Impala总共分为3个组件:impalad, statestored, client/impala-shell。关于这三个组件的基本功能在这篇文章中已经介绍过了。 Client?: 可以是Python CLI(官方提供的impala_shell.py),JDBC/ODBC或者Hue。无论哪个其实就是一个Thrift的client,连接到impala
Impala总共分为3个组件:impalad, statestored, client/impala-shell。关于这三个组件的基本功能在这篇文章中已经介绍过了。
Client?: 可以是Python CLI(官方提供的impala_shell.py),JDBC/ODBC或者Hue。无论哪个其实就是一个Thrift的client,连接到impalad的21000端口。
Impalad: 分为frontend和backend两部分,这个进程有三个ThriftServer(beeswax_server, hs2_server, be_server)对系统外和系统内提供服务。
Statestored: 集群内各个backend service的数据交换中心,每个backend会在statestored注册,以后statestored会与所有注册过的backend交换update消息。
Component | Service | Port | Access Requirement | Comment |
ImpalaDaemon | Impala Daemon Backend Port | 22000 | Internal | ImpalaBackendService export |
Impala Daemon Frontend Port | 21000 | External | ImpalaService export | |
Impala Daemon HTTP Server Port | 25000 | External | Impala debug web server | |
StateStoreSubscriber Service Port | 23000 | Internal | StateStoreSubscriberService | |
?ImpalaStateStore Daemon | StateStore HTTP Server Port | 25010 | External | StateStore debug web server |
StateStore Service Port | 24000 | Internal | StateStoreService export |
下面介绍三个组件之间的Thrift RPC(“”前面的表示RPC client,“”后面的表示RPC server)
BeeswaxService(beeswax.thrift): client通过query()提交SQL请求,然后异步调用get_state()监听该SQL的查询进度,一旦完成,调用fetch()取回结果。
TCLIService(cli_service.thrift): client提交SQL请求,功能和上面类似,更丰富的就是对DDL操作的支持,例如GetTables()返回指定table的元数据。
ImpalaService和ImpalaHiveServer2Service(ImpalaService.thrift)分别是上面两个类的子类,各自丰富了点功能而已,核心功能没啥大变化。
StateStoreService(StateStoreService.thrift): statestored保存整个系统所有backend service状态的全局数据库,这里是个单节点中央数据交换中心(该节点保存的状态是soft state,一旦宕机,保存的状态信息就没了)。例如每个impala backend启动的时候会调用StateStoreService.RegisterService()向statestored注册自己(其实是通过跟这个backend service捆绑在一起的StateStoreSubscriber标识的),然后再调用StateStoreService.RegisterSubscription()表明这个StateStoreSubscriber接收来自statestored的update。
StateStoreSubscriberService(StateStoreSubscriberService.thrift): backend向statestored调用RegisterSubscription之后,statestored就会定期向backend这边捆绑的StateStoreSubscriber发送该backend的状态更新信息。然后backend这边调用StateStoreSubscriberService.UpdateState()更新相关状态。同时这个UpdateState()调用在impalad backend/StateStoreSubscriber这端还会返回该backend的一些update信息给statestored。
ImpalaInternalService(ImpalaInternalService.thrift):某个backend的coordinator要向其他backend的execute engine发送执行某个plan fragment的请求(提交ExecPlanFragment并要求返回ReportExecStatus)。这部分功能会在backend分析中详细讨论。
ImpalaPlanService(ImpalaPlanService.thrift):可以由其他形式的frontend生成TExecRequest然后交给backend执行。
另外,Impala frontend是用Java写的,而backend使用C++写的。Frontend负责把输入的SQL解析,然后生成执行计划,之后通过Thrift的序列化/反序列化的方式传给backend。TExecRequest(frontend.thrift)是中间传输的数据结构,表示了一个Query/DML/DDL的查询请求,也是SQL执行过程中在frontend和backend之间的数据接口。所以我们可以把impala-frontend换掉,用其他的形式拼凑出这个TExecRequest就可以传给backend执行,这也就是前面说的ImpalaPlanService干的事。
client就可以通过Beeswax和HiveServer2的Thrift API向Impala提交query。这两种访问接口的作用是一样的(都是用于client提交query,返回query result)。
Impala_shell.py是通过Beeswax方式访问impala的,下面我们看看impala_shell.py是怎么向impalad提交query的。
(1)通过OptionParser()解析命令行参数。如果参数中有—query或者—query_file,则执行execute_queries_non_interactive_mode(options),这是非交互查询(也就是就查询一个SQL或者一个写满SQL的文件);否则进入ImpalaShell.cmdloop (intro)循环。
(2)进入命令行循环后,一般是先connect某一个impalad,输入”connect localhost:21000”,进入do_connect(self, args)函数。这个函数根据用户指定的host和port,生成与相应的impalad的socket连接。最重要的就是这行代码:
self.imp_service = ImpalaService.Client(protocol)
至此imp_service就是client端的代理了,所有请求都通过它提交。
(3)下面以select命令为例说明,如果client输入这样的命令”select col1, col2 from tbl”,则进入do_select(self, args)函数。在这个函数里首先生成BeeswaxService.Query对象,向这个对象填充query statement和configuration。然后进入__query_with_result()函数通过imp_service.query(query)提交query。注意ImpalaService都是异步的,提交之后返回一个QueryHandle,然后就是在一个while循环里不断__get_query_state()查询状态。如果发现这个SQL的状态是FINISHED,那么就通过fetch() RPC获取结果。
Statestored进程对外提供StateStoreService RPC服务,而StateStoreSubscriberService RPC服务是在impalad进程中提供的。StateStoreService这个RPC的逻辑实现是在StateStore这个类里面实现的。
Statestored收到backend发送的RegisterService RPC请求时,调用StateStore::RegisterService()处理,主要做两件事:
(1)根据TRegisterServiceRequest提供的service_id把该service加入StateStore.service_instances_。
通常在整个impala集群只存在名为“impala_backend_service”这一个服务,所以service_id=”impala_backend_service”。而每个backend捆绑的
(2)Impalad backend在向statestored RegisterService的时候,会把subscriber_address发送过去。在statestored端,会根据这个subscriber_address生成对应的Subscriber对象(表示与该Subscriber捆绑的backend)。把与该backend绑定的Subscriber加入StateStore.subscribers_这个map里。每个Subscriber有个唯一的id,这样分布在集群内的impala backend就有了全局唯一id了。
这样如果以后某个backend/StateStoreSubscriber fail或者其中运行的SQL任务出了问题,在statestored这里就会有体现了,那么就会通知给其他相关的backend。
那么每个backend是怎么update的呢?StateStore::UpdateLoop()负责定期向各个backend推送其所订阅的service的所有成员的更新,目前的更新策略是全量更新,未来会考虑增量更新。
Impalad进程的服务被wrapper在ImpalaServer这个类中。ImpalaServer包括fe和be的功能,实现了ImpalaService(Beeswax), ImpalaHiveServer2Service(HiveServer2)和ImpalaInternelService API。
全局函数CreateImpalaServer()创建了一个ImpalaServer其中包含了多个ThriftServer:
(1)创建一个名为beeswax_server的ThriftServer对系统外提供ImpalaService(Beeswax)服务,主要服务于Query查询,是fe/frontend的核心服务,端口21000
(2)创建一个名为hs2_server的ThriftServer对系统外提供ImpalaHiveServer2Service服务,提供Query, DML, DDL相关操作,端口21050
(3)创建一个名为be_server的ThriftServer对系统内其他impalad提供ImpalaInternalService,端口22000
(4)创建ImpalaServer对象,前面三个ThriftServer的TProcessor被赋值这个ImpalaServer对象,所以对前面三个Thrift服务的RPC请求都交由这个ImpalaServer对象处理。最典型的例子就是我们通过Beeswax接口提交了一个BeeswaxService.query()请求,在impalad端的处理逻辑是由void ImpalaServer::query(QueryHandle& query_handle, const Query& query)这个函数(在impala-beeswax-server.cc中实现)完成的。
下面是impalad-main.cc的主函数:
int main(int argc, char** argv) { //参数解析,开启日志(基于Google gflags和glog) InitDaemon(argc, argv); LlvmCodeGen::InitializeLlvm(); // Enable Kerberos security if requested. if (!FLAGS_principal.empty()) { EXIT_IF_ERROR(InitKerberos("Impalad")); } //因为frontend, HBase等相关组件是由Java开发的,所以下面这几行都是初始化JNI相关的reference和method id JniUtil::InitLibhdfs(); EXIT_IF_ERROR(JniUtil::Init()); EXIT_IF_ERROR(HBaseTableScanner::Init()); EXIT_IF_ERROR(HBaseTableCache::Init()); InitFeSupport(); //ExecEnv类是impalad backend上Query/PlanFragment的执行环境。 //生成SubscriptionManager, SimpleScheduler和各种Cache ExecEnv exec_env; //生成Beeswax, hive-server2和backend三种ThriftServer用于接收client请求,不过这三种服务的后端真正的处理逻辑都是ImpalaServer* server这个对象。 ThriftServer* beeswax_server = NULL; ThriftServer* hs2_server = NULL; ThriftServer* be_server = NULL; ImpalaServer* server = CreateImpalaServer(&exec_env, FLAGS_fe_port, FLAGS_hs2_port, FLAGS_be_port, &beeswax_server, &hs2_server, &be_server); //因为be_server是对系统内提供服务的,先启动它。 be_server->Start(); //这里面关键是启动了SubscriptionManager和Scheduler Status status = exec_env.StartServices(); if (!status.ok()) { LOG(ERROR) RegisterService(IMPALA_SERVICE_ID, host_port); unordered_set services; services.insert(IMPALA_SERVICE_ID); //注册callback函数,每当StateStoreSubscriber接收到来自statestored的update之后调用该函数。 cb.reset(new SubscriptionManager::UpdateCallback( bind(mem_fn(&ImpalaServer::MembershipCallback), server, _1))); exec_env.subscription_mgr()->RegisterSubscription(services, "impala.server", cb.get()); if (!status.ok()) { LOG(ERROR) Start(); hs2_server->Start(); beeswax_server->Join(); hs2_server->Join(); delete be_server; delete beeswax_server; delete hs2_server; }
exec_env.StartServices()调用SubscriptionManager.Start(),进一步调用StateStoreSubscriber.Start()启动一个ThriftServer。
StateStoreSubscriber实现了StateStoreSubscriberService(StateStoreSubscriberService.thrift中定义),用于接收来自statestored的update,并把与这个StateStoreSubscriber捆绑的backend的update反馈给statestored。这样这个backend就可以对其他backend可见,这样就可以接受其他impala backend发来的任务更新了(当然,接收backend更新是通过statestored中转的)。
参考文献:
http://www.sizeofvoid.net/wp-content/uploads/ImpalaIntroduction2.pdf
原文地址:Impala源代码分析(1)-Impala架构和RPC, 感谢原作者分享。