Impala的SQL解析与执行计划生成部分是由impala-frontend(Java)实现的,监听端口是21000。用户通过Beeswax接口BeeswaxService.query()提交一个请求,在impalad端的处理逻辑是由void ImpalaServer::query(QueryHandle query_handle, const Query query)这个
Impala的SQL解析与执行计划生成部分是由impala-frontend(Java)实现的,监听端口是21000。用户通过Beeswax接口BeeswaxService.query()提交一个请求,在impalad端的处理逻辑是由void ImpalaServer::query(QueryHandle& query_handle, const Query& query)这个函数(在impala-beeswax-server.cc中实现)完成的。
在impala中一条SQL语句先后经历BeeswaxService.Query->TClientRequest->TExecRequest,最后把TExecRequest交由impala-coordinator分发给多个backend处理。本文主要讲一条SQL语句是怎么一步一步变成TExecRequest的。
本文以下内容都以这样的一个SQL为例说明:
select jobinfo.dt,user, max(taskinfo.finish_time-taskinfo.start_time), max(jobinfo.finish_time-jobinfo.submit_time) from taskinfo join jobinfo on jobinfo.jobid=taskinfo.jobid where jobinfo.job_status='SUCCESS' and taskinfo.task_status='SUCCESS' group by jobinfo.dt,user
通过调用Status ImpalaServer::GetExecRequest(const TClientRequest& request, TExecRequest* result) 函数把TClientRequest转化成TExecRequest
在这个函数里通过JNI接口调用frontend.createExecRequest()生成TExecRequest。首先调用AnalysisContext.analyze(String stmt)分析提交的SQL语句。
注释:Analyzer对象是个存放这个SQL所涉及到的所有信息(包含Table, conjunct, slot,slotRefMap, eqJoinConjuncts等)的知识库,所有跟这个SQL有关的东西都会存到Analyzer对象里面。
AnalysisContext.analyze(String stmt)会调用SelectStmt.analyze()函数,这个函数就是对SQL的analyze和向中央知识库Analyzer register各种信息。
(1)处理这个SQL所涉及到的Table(即TableRefs),这些Table是在from从句中提取出来的(包含关键字from, join, on/using)。注意JOIN操作以及on/using条件是存储在参与JOIN操作的右边的表的TableRef中并分析的。依次analyze()每个TableRef,向Analyzer注册registerBaseTableRef(填充TupleDescriptor)。如果对应的TableRef涉及到JOIN操作,还要analyzeJoin()。在analyzeJoin()时会向Analyzer registerConjunct()填充Analyzer的一些成员变量:conjuncts,tuplePredicates(TupleId与conjunct的映射),slotPredicates(SlotId与conjunct的映射),eqJoinConjuncts。本例中on从句是一种BinaryPredicate,然后onClause.analyze(analyzer)会递归analyze这个on从句里的各种组件。
(2)处理select从句(包含关键字select, MAX(), AVG()等聚集函数):分析这个SQL都select了哪几项,每一项都是个Expr类型的子类对象,把这几项填入resultExprs数组和colLabels。然后把resultExprs里面的Expr都递归analyze一下,要分析到树的最底层,向Analyzer注册SlotRef等。
(3)分析where从句(关键字where),首先递归Analyze从句中Expr组成的树,然后向Analyzer registerConjunct()填充Analyzer的一些成员变量(同1,此外还要填充whereClauseConjuncts) 。
(4)处理sort相关信息(关键字order by)。先是解析aliases和ordinals,然后从order by后面的从句中提取Expr填入orderingExprs,接着递归Analyze从句中Expr组成的树,最后创建SortInfo对象。
(5)处理aggregation相关信息(关键字group by, having, avg, max等)。首先递归分析group by从句里的Expr,然后如果有having从句就像where从句一样,先是analyze having从句中Expr组成的树,然后向Analyzer registerConjunct()等。
(6)处理InlineView。
关于SQL解析中所涉及到的各种数据结构表示如下:
至此词法分析,语法分析结束,有点像一个小的编译器。我们现在回到frontend.createExecRequest()函数中。调用完AnalysisContext.analyze()之后,就开始填充TExecRequest内的成员变量。
(1)如果是DDL命令(use, show tables, show databases, describe),那么调用createDdlExecRequest();
(2)另外一种情况就是Query或者DML命令,那么就得创建和填充TQueryExecRequest了。
下面就是用Planner把SQL解析出的语法树转换成Plan fragments,后者能在各个backend被执行。
Planner planner = new Planner();
ArrayListfragments =
planner.createPlanFragments(analysisResult, request.queryOptions);
这个createPlanFragments()函数是frontend最重要的函数:根据SQL解析的结果和client传入的query options,生成执行计划。执行计划是用PlanFragment的数组表示的,最后会序列化到TQueryExecRequest.fragments然后传给backend的coordinator去调度执行。
下面进入Planner.createPlanFragments()函数看看执行计划是怎么生成的:
首先要搞清楚两个概念:PlanNode和PlanFragment。
PlanNode是SQL解析出来的逻辑功能节点;PlanFragment是真正的执行计划节点。
PlanNode singleNodePlan =
createQueryPlan(queryStmt, analyzer, queryOptions.getDefault_order_by_limit());
(1)这个函数首先根据from从句中的第一个TableRef创建一个PlanNode,一般为ScanNode(HdfsScanNode或者HBaseScanNode)。这个ScanNode关联一个ValueRange的数组(由多个cluster column取值区间组成)表示要读取的Table的范围,还关联一个conjunct(where从句)。
(2)这个SQL语句中TableRef中剩下的其他Table就需要建立HashJoinNode了。进入Planner.createHashJoinNode()函数:首先为这个Table建立ScanNode(同上),然后调用getHashLookupJoinConjuncts()获取两表或者多表JOIN的eqJoinConjuncts和eqJoinPredicates,利用这两个条件创建HashJoinNode。每个HashJoinNode也是树状的,会有孩子节点,对于我们举例的两表JOIN,孩子节点分别是两个表对应的ScanNode。(注意目前impala只支持一大一小两个表的JOIN,默认是左大右小,是通过把右边的小表分发到每个节点的内存中分别于左边大表的一个区间进行JOIN过滤实现的。)
(3)如果有group by从句,创建AggregationNode,并把刚才的HashJoinNode设为它的孩子。这里暂时不考虑DISTINCT aggregation function。
(4)如果有order by… limit从句,创建SortNode。
这样createQueryPlan()函数执行完毕,PlanNode组成的execution tree形成如下:
接下来就看impala backend节点数目有多少,如果只有一个节点,那么整棵执行树都在同一个impalad上执行;否则调用createPlanFragments(singleNodePlan, isPartitioned, false, fragments)把PlanNode组成的执行树转换成PlanFragment组成的执行计划。
下面进入createPlanFragments()这个函数:
这是一个递归函数,沿着PlanNode组成的执行树递归下去,分别创建对应的Fragment。
(1)如果是ScanNode,创建一个PlanFragment(这个PlanFragment的root node是这个ScanNode,而且这个PlanFragment只包含一个PlanNode)。
(2)如果是HashJoinNode,并不是创建一个新的PlanFragment,而是修改leftChildFragment(是一个ScanNode)为以HashJoinNode作为root node的PlanFragment。因为对于HashJoinNode一般有两个ScanNode孩子,在处理HashJoinNode之前已经把这两个ScanNode变成了对应的PlanFragment。那么此时要得到HashJoinNode作为root node的PlanFragment是通过Planner.createHashJoinFragment()函数完成的:首先把当前HashJoinNode作为HashJoinFragment的root node;然后把leftChildFragment中的root PlanNode(也就是参与JOIN的两个表中左边的那个表对应的ScanNode)作为HashJoinNode的左孩子;通过调用Planner.connectChildFragment()函数把HashJoinNode的右孩子设置为一个ExchangeNode(这个ExchangeNode表示一个1:n的数据流的receiver);同时把rightChildFragment(ScanNode作为root node)的destination设置为这个ExchangeNode。
(3)如果是AggregationNode,聚集操作很复杂了。以我们的例子来说明:如果这个AggregationNode不是DISTINCT aggregation的2nd phase(因为本例中的AggregationNode的孩子是HashJoinNode而不是另外一个AggregationNode),首先把刚才生成的HashJoinNode作为root node对应的PlanFragment的root node设置为该AggregationNode,并把原来的root node(即HashJoinNode)设为新root node的孩子。然后通过Planner.createParentFragment()创建一个包含ExchangeNode作为root node的新的PlanFragment。并把孩子PlanFragment的destination设置为这个ExchangeNode。然后在这个新的PlanFragment中创建一个新的AggregationNode作为新的root node并把刚才的ExchangeNode作为其孩子节点。
至此,createPlanFragments()调用完成,生成的三个PlanFragment如下:
通过createPlanFragments(singleNodePlan, isPartitioned, false, fragments)获取了所以执行计划PlanFragment组成的数组fragments,这个数组的最后一个元素就是根节点PlanFragment。然后就是调用PlanFragment.finalize()把这个执行计划finalize(递归finalize每个PlanNode)同时为每个PlanFragment指定 DataStreamSink。
然后回到frontend.createExecRequest()函数中。执行完Planner.createPlanFragments()返回的ArrayList就是完整的执行计划了。然后就是一次调用PlanFragment.toThrift()把它序列化到TQueryExecRequest。填充TQueryExecRequest的相关变量:dest_fragment_idx,per_node_scan_ranges,query_globals,result_set_metadata等。最后返回TExecRequest型的对象给backend执行。
Impala-backend(C++代码)拿到这个TExecRequest对象,有coordinator在各个backend之间分发执行,这是下一篇文章的内容了。
吐槽:从中还是能够看到MapReduce的影子的。。。对于每个PlanFragment有个DataStreamSink,会指向其他PlanFragment中的ExchangeNode,是个1对N的关系。。。所以分布式系统的瓶颈还是Data Shuffle,不管是MapReduce模型还是impala。这也说明其实Tez/Stinger Initiative 对Hive的优化还是很值得期待的。
参考文献:http://blog.csdn.net/wind5shy/article/details/8563355
原文地址:Impala源代码分析(2)-SQL解析与执行计划生成, 感谢原作者分享。