在认证成功后Cobar会将该连接的回调处理函数由FrontendAuthenticator(前端认证处理器)设置成FrontendCommandHanler(前端命令处理器)。 所以在客户端再次向Cobar发送请求报文的时候,前端命令处理器会处理该连接。下面详细分析一下简单select语句的执行过
在认证成功后Cobar会将该连接的回调处理函数由FrontendAuthenticator(前端认证处理器)设置成FrontendCommandHanler(前端命令处理器)。
所以在客户端再次向Cobar发送请求报文的时候,前端命令处理器会处理该连接。下面详细分析一下简单select语句的执行过程。
1、事件的产生
NIOReactor的R线程一直在监听selector上的每个连接的感兴趣事件是否发生,当客户端发送了一条select * from tb1,select函数会返回,然后获取到该连接SelectionKey,并且该SelectKey的兴趣事件是OP_READ。此时会调用read(NIOConnection)函数。
01 |
public <code class="Brush keyword">void <code class="Brush plain">run() { |
02 |
<code class="Brush keyword">final <code class="Brush plain">Selector selector = <code class="Brush keyword">this<code class="Brush plain">.selector; |
03 |
<code class="Brush keyword">for <code class="Brush plain">(;;) { |
04 |
<code class="Brush plain">++reactCount; |
05 |
<code class="Brush keyword">try <code class="Brush plain">{ |
06 |
<code class="Brush keyword">int <code class="Brush plain">res = selector.select(); |
07 |
<code class="Brush plain">LOGGER.debug(reactCount + <code class="Brush string">">>NIOReactor接受连接数:" <code class="Brush plain">+ res); |
08 |
<code class="Brush plain">register(selector); |
09 |
<code class="Brush plain">Set<selectionkey> keys = selector.selectedKeys();</selectionkey> |
10 |
<code class="Brush keyword">try <code class="Brush plain">{ |
11 |
<code class="Brush keyword">for <code class="Brush plain">(SelectionKey key : keys) { |
12 |
<code class="Brush plain">Object att = key.attachment(); |
13 |
<code class="Brush keyword">if <code class="Brush plain">(att != <code class="Brush keyword">null <code class="Brush plain">&& key.isValid()) { |
14 |
<code class="Brush keyword">int <code class="Brush plain">readyOps = key.readyOps(); |
15 |
<code class="Brush keyword">if <code class="Brush plain">((readyOps & SelectionKey.OP_READ) != <code class="Brush value">0<code class="Brush plain">) { |
16 |
<code class="Brush plain">LOGGER.debug(<code class="Brush string">"select读事件"<code class="Brush plain">); |
17 |
<code class="Brush plain">read((NIOConnection) att); |
18 |
<code class="Brush plain">.............................. |
19 |
<code class="Brush plain">} |
20 |
<code class="Brush plain">........................... |
21 |
<code class="Brush plain">} |
22 |
<code class="Brush plain">} .................. |
23 |
<code class="Brush plain">} ............ |
24 |
<code class="Brush plain">} |
25 |
<code class="Brush plain">} |
2、调用该连接的read函数进行处理
该函数在上一篇中提到过,该函数的实现在AbstractConnection中,实现从channel中读取数据到缓冲区,然后从缓冲区完整的取出整包数据交给FrontendConnection类的handle()函数处理。
该函数交给processor进行异步处理。从processor中的线程池获取一个线程来执行该任务。这里调用具体的handler来进行处理。
刚开始提到的,当认证成功后,Cobar将连接的回调处理函数设置为FrontendCommandHandler。所以这里会调用前端命令处理器的handler函数进行数据的处理。
在这里需要先了解MySQL数据包的格式:
MySQL客户端命令请求报文
该处理函数如下:
01 |
public <code class="Brush keyword">void <code class="Brush plain">handle(<code class="Brush keyword">byte<code class="Brush plain">[] data) { |
02 |
<code class="Brush plain">LOGGER.info(<code class="Brush string">"data[4]:"<code class="Brush plain">+data[<code class="Brush value">4<code class="Brush plain">]); |
03 |
<code class="Brush keyword">switch <code class="Brush plain">(data[<code class="Brush value">4<code class="Brush plain">]) { |
04 |
<code class="Brush keyword">case <code class="Brush plain">MySQLPacket.COM_INIT_DB: |
05 |
<code class="Brush plain">commands.doInitDB(); |
06 |
<code class="Brush plain">source.initDB(data); |
07 |
<code class="Brush keyword">break<code class="Brush plain">; |
08 |
<code class="Brush keyword">case <code class="Brush plain">MySQLPacket.COM_QUERY: |
09 |
<code class="Brush plain">commands.doQuery(); |
10 |
<code class="Brush plain">source.query(data); |
11 |
<code class="Brush keyword">break<code class="Brush plain">; |
12 |
<code class="Brush keyword">case <code class="Brush plain">MySQLPacket.COM_PING: |
13 |
<code class="Brush plain">commands.doPing(); |
14 |
<code class="Brush plain">source.ping(); |
15 |
<code class="Brush keyword">break<code class="Brush plain">; |
16 |
<code class="Brush keyword">case <code class="Brush plain">MySQLPacket.COM_QUIT: |
17 |
<code class="Brush plain">commands.doQuit(); |
18 |
<code class="Brush plain">source.close(); |
19 |
<code class="Brush keyword">break<code class="Brush plain">; |
20 |
<code class="Brush keyword">case <code class="Brush plain">MySQLPacket.COM_PROCESS_KILL: |
21 |
<code class="Brush plain">commands.doKill(); |
22 |
<code class="Brush plain">source.kill(data); |
23 |
<code class="Brush keyword">break<code class="Brush plain">; |
24 |
<code class="Brush keyword">case <code class="Brush plain">MySQLPacket.COM_STMT_PREPARE: |
25 |
<code class="Brush plain">commands.doStmtPrepare(); |
26 |
<code class="Brush plain">source.stmtPrepare(data); |
27 |
<code class="Brush keyword">break<code class="Brush plain">; |
28 |
<code class="Brush keyword">case <code class="Brush plain">MySQLPacket.COM_STMT_EXECUTE: |
29 |
<code class="Brush plain">commands.doStmtExecute(); |
30 |
<code class="Brush plain">source.stmtExecute(data); |
31 |
<code class="Brush keyword">break<code class="Brush plain">; |
32 |
<code class="Brush keyword">case <code class="Brush plain">MySQLPacket.COM_STMT_CLOSE: |
33 |
<code class="Brush plain">commands.doStmtClose(); |
34 |
<code class="Brush plain">source.stmtClose(data); |
35 |
<code class="Brush keyword">break<code class="Brush plain">; |
36 |
<code class="Brush keyword">case <code class="Brush plain">MySQLPacket.COM_HEARTBEAT: |
37 |
<code class="Brush plain">commands.doHeartbeat(); |
38 |
<code class="Brush plain">source.heartbeat(data); |
39 |
<code class="Brush keyword">break<code class="Brush plain">; |
40 |
<code class="Brush keyword">default<code class="Brush plain">: |
41 |
<code class="Brush plain">commands.doOther(); |
42 |
<code class="Brush plain">source.writeErrMessage(ErrorCode.ER_UNKNOWN_COM_ERROR, <code class="Brush string">"Unknown command"<code class="Brush plain">); |
43 |
<code class="Brush plain">} |
44 |
<code class="Brush plain">} |
由于每个报文都有消息头,消息头固定的是4个字节,前3个字节是消息长度,后面的一个字节是报文序号,如下所示
所以data[4]是第五个字节。也就是消息体的第一个字节。客户端向Cobar端发送的是命令报文,第一个字节是具体的命令。
如果是select语句,那么data[4]就是COM_QUERY,然后会调用具体连接的query成员函数,其定义在FrontendConnection类中。
01 |
public <code class="Brush keyword">void <code class="Brush plain">query(<code class="Brush keyword">byte<code class="Brush plain">[] data) { |
02 |
<code class="Brush keyword">if <code class="Brush plain">(queryHandler != <code class="Brush keyword">null<code class="Brush plain">) { |
03 |
<code class="Brush comments">// 取得语句 |
04 |
<code class="Brush plain">MySQLMessage mm = <code class="Brush keyword">new <code class="Brush plain">MySQLMessage(data); |
05 |
<code class="Brush plain">mm.position(<code class="Brush value">5<code class="Brush plain">); |
06 |
<code class="Brush plain">String sql = <code class="Brush keyword">null<code class="Brush plain">; |
07 |
<code class="Brush keyword">try <code class="Brush plain">{ |
08 |
<code class="Brush plain">sql = mm.readString(charset); |
09 |
<code class="Brush plain">} catch (UnsupportedEncodingException e) { |
10 |
<code class="Brush plain">writeErrMessage(ErrorCode.ER_UNKNOWN_CHARACTER_SET, <code class="Brush string">"Unknown charset '" <code class="Brush plain">+ charset + <code class="Brush string">"'"<code class="Brush plain">); |
11 |
<code class="Brush keyword">return<code class="Brush plain">; |
12 |
<code class="Brush plain">} |
13 |
<code class="Brush keyword">if <code class="Brush plain">(sql == <code class="Brush keyword">null <code class="Brush plain">|| sql.length() == <code class="Brush value">0<code class="Brush plain">) { |
14 |
<code class="Brush plain">writeErrMessage(ErrorCode.ER_NOT_ALLOWED_COMMAND, <code class="Brush string">"Empty SQL"<code class="Brush plain">); |
15 |
<code class="Brush keyword">return<code class="Brush plain">; |
16 |
<code class="Brush plain">} |
17 |
<code class="Brush plain">LOGGER.debug(<code class="Brush string">"解析的SQL语句:"<code class="Brush plain">+sql); |
18 |
<code class="Brush comments">// 执行查询 |
19 |
<code class="Brush plain">queryHandler.query(sql); |
20 |
<code class="Brush plain">} else { |
21 |
<code class="Brush plain">writeErrMessage(ErrorCode.ER_UNKNOWN_COM_ERROR, <code class="Brush string">"Query unsupported!"<code class="Brush plain">); |
22 |
<code class="Brush plain">} |
23 |
<code class="Brush plain">} |
首先新建一个MySQLMessage对象,将数据包的索引位置定位到第6个字节位置处。然后将后面的所有的字节读取成指定编码格式的SQL语句,这里就形成了完整的SQL语句。
查询的时候Cobar控制台输出如下内容:
11:35:33,392 INFO data[4]:3
解析出SQL语句后交给queryHandler处理。该对象是在新建连接的时候设置的ServerQueryHandler类,其实现的query函数如下:
01 |
public <code class="Brush keyword">void <code class="Brush plain">query(String sql) { |
02 |
<code class="Brush comments">//这里就得到了完整的SQL语句,接收自客户端 |
03 |
<code class="Brush plain">ServerConnection c = <code class="Brush keyword">this<code class="Brush plain">.source; |
04 |
<code class="Brush keyword">if <code class="Brush plain">(LOGGER.isDebugEnabled()) { |
05 |
<code class="Brush plain">LOGGER.debug(<code class="Brush keyword">new <code class="Brush plain">StringBuilder().append(c).append(sql).toString()); |
06 |
<code class="Brush plain">} |
07 |
<code class="Brush comments">//该函数对SQL语句的语法和语义进行分析,并返回SQL语句的对于类型,执行相应的操作 |
08 |
<code class="Brush keyword">int <code class="Brush plain">rs = ServerParse.parse(sql); |
09 |
<code class="Brush keyword">switch <code class="Brush plain">(rs & <code class="Brush value">0xff<code class="Brush plain">) { |
10 |
<code class="Brush plain">....................... |
11 |
<code class="Brush keyword">case <code class="Brush plain">ServerParse.SELECT: |
12 |
<code class="Brush comments">//select操作执行 |
13 |
<code class="Brush plain">SelectHandler.handle(sql, c, rs >>> <code class="Brush value">8<code class="Brush plain">); |
14 |
<code class="Brush keyword">break<code class="Brush plain">; |
15 |
<code class="Brush plain">....................... |
16 |
<code class="Brush plain">} |
17 |
<code class="Brush plain">} |
首先对SQL语句进程解析,通过parse函数对语句解析后返回语句类型的编号。
如果语句没有语法错误,则直接交给SelectHandler进行处理。如果是一般的select语句,则直接调用ServerConnection的execute执行sql
c.execute(stmt, ServerParse.SELECT);
在ServerConnection中的execute函数中需要进行路由检查,因为select的数据不一定在一个数据库中,需要按拆分的规则进行路由的检查。
2 |
RouteResultset rrs = <code class="Brush keyword">null<code class="Brush plain">; |
3 |
<code class="Brush keyword">try <code class="Brush plain">{ |
4 |
<code class="Brush plain">rrs = ServerRouter.route(schema, sql, <code class="Brush keyword">this<code class="Brush plain">.charset, <code class="Brush keyword">this<code class="Brush plain">); |
5 |
<code class="Brush plain">LOGGER.debug(<code class="Brush string">"路由计算结果:"<code class="Brush plain">+rrs.toString()); |
6 |
<code class="Brush plain">} |
具体的路由算法也是比较复杂,以后会专门分析。
Cobar的DEBUG控制台输出路由的计算结果如下:
11:35:33,392 DEBUG 路由计算结果:select * from tb2, route={
该条SQL语句的select内容分布在dnTset2和dnTest3中,所以要分别向这两个数据库进行查询。
经过比较复杂的资源处理最后在每个后端数据库上执行函数execute0。
01 |
private <code class="Brush keyword">void <code class="Brush plain">execute0(RouteResultsetNode rrn, Channel c, <code class="Brush keyword">boolean <code class="Brush plain">autocommit, BlockingSession ss, <code class="Brush keyword">int <code class="Brush plain">flag) { |
02 |
<code class="Brush plain">ServerConnection sc = ss.getSource(); |
03 |
<code class="Brush plain">......................... |
04 |
<code class="Brush keyword">try <code class="Brush plain">{ |
05 |
<code class="Brush comments">// 执行并等待返回 |
06 |
<code class="Brush plain">BinaryPacket bin = ((MySQLChannel) c).execute(rrn, sc, autocommit); |
07 |
<code class="Brush comments">// 接收和处理数据,执行到这里就说明上面的执行已经得到执行结果的返回 |
08 |
<code class="Brush keyword">final <code class="Brush plain">ReentrantLock lock = MultiNodeExecutor.<code class="Brush keyword">this<code class="Brush plain">.lock; |
09 |
<code class="Brush plain">lock.lock(); |
10 |
<code class="Brush keyword">try <code class="Brush plain">{ |
11 |
<code class="Brush keyword">switch <code class="Brush plain">(bin.data[<code class="Brush value">0<code class="Brush plain">]) { |
12 |
<code class="Brush keyword">case <code class="Brush plain">ErrorPacket.FIELD_COUNT: |
13 |
<code class="Brush plain">c.setRunning(<code class="Brush keyword">false<code class="Brush plain">); |
14 |
<code class="Brush plain">handleFailure(ss, rrn, <code class="Brush keyword">new <code class="Brush plain">BinaryErrInfo((MySQLChannel) c, bin, sc, rrn)); |
15 |
<code class="Brush keyword">break<code class="Brush plain">; |
16 |
<code class="Brush keyword">case <code class="Brush plain">OkPacket.FIELD_COUNT: |
17 |
<code class="Brush plain">OkPacket ok = <code class="Brush keyword">new <code class="Brush plain">OkPacket(); |
18 |
<code class="Brush plain">ok.read(bin); |
19 |
<code class="Brush plain">affectedRows += ok.affectedRows; |
20 |
<code class="Brush comments">// set lastInsertId |
21 |
<code class="Brush keyword">if <code class="Brush plain">(ok.insertId > <code class="Brush value">0<code class="Brush plain">) { |
22 |
<code class="Brush plain">insertId = (insertId == <code class="Brush value">0<code class="Brush plain">) ? ok.insertId : Math.min(insertId, ok.insertId); |
23 |
<code class="Brush plain">} |
24 |
<code class="Brush plain">c.setRunning(<code class="Brush keyword">false<code class="Brush plain">); |
25 |
<code class="Brush plain">handleSuccessOK(ss, rrn, autocommit, ok); |
26 |
<code class="Brush keyword">break<code class="Brush plain">; |
27 |
<code class="Brush keyword">default<code class="Brush plain">: |
28 |
<code class="Brush keyword">final <code class="Brush plain">MySQLChannel mc = (MySQLChannel) c; |
29 |
<code class="Brush keyword">if <code class="Brush plain">(fieldEOF) { |
30 |
<code class="Brush keyword">for <code class="Brush plain">(;;) { |
31 |
<code class="Brush plain">bin = mc.receive(); |
32 |
<code class="Brush keyword">switch <code class="Brush plain">(bin.data[<code class="Brush value">0<code class="Brush plain">]) { |
33 |
<code class="Brush keyword">case <code class="Brush plain">ErrorPacket.FIELD_COUNT: |
34 |
<code class="Brush plain">c.setRunning(<code class="Brush keyword">false<code class="Brush plain">); |
35 |
<code class="Brush plain">handleFailure(ss, rrn, <code class="Brush keyword">new <code class="Brush plain">BinaryErrInfo(mc, bin, sc, rrn)); |
36 |
<code class="Brush keyword">return<code class="Brush plain">; |
37 |
<code class="Brush keyword">case <code class="Brush plain">EOFPacket.FIELD_COUNT: |
38 |
<code class="Brush plain">handleRowData(rrn, c, ss); |
39 |
<code class="Brush keyword">return<code class="Brush plain">; |
40 |
<code class="Brush keyword">default<code class="Brush plain">: |
41 |
<code class="Brush keyword">continue<code class="Brush plain">; |
42 |
<code class="Brush plain">} |
43 |
<code class="Brush plain">} |
44 |
<code class="Brush plain">} else { |
45 |
<code class="Brush plain">bin.packetId = ++packetId;<code class="Brush comments">// HEADER |
46 |
<code class="Brush plain">List<mysqlpacket> headerList = <code class="Brush keyword">new <code class="Brush plain">LinkedList<mysqlpacket>();</mysqlpacket> |
47 |
<code class="Brush plain">headerList.add(bin); |
48 |
<code class="Brush keyword">for <code class="Brush plain">(;;) { |
49 |
<code class="Brush plain">bin = mc.receive(); |
50 |
<code class="Brush keyword">switch <code class="Brush plain">(bin.data[<code class="Brush value">0<code class="Brush plain">]) { |
51 |
<code class="Brush keyword">case <code class="Brush plain">ErrorPacket.FIELD_COUNT: |
52 |
<code class="Brush plain">c.setRunning(<code class="Brush keyword">false<code class="Brush plain">); |
53 |
<code class="Brush plain">handleFailure(ss, rrn, <code class="Brush keyword">new <code class="Brush plain">BinaryErrInfo(mc, bin, sc, rrn)); |
54 |
<code class="Brush keyword">return<code class="Brush plain">; |
55 |
<code class="Brush keyword">case <code class="Brush plain">EOFPacket.FIELD_COUNT: |
56 |
<code class="Brush plain">bin.packetId = ++packetId;<code class="Brush comments">// FIELD_EOF |
57 |
<code class="Brush keyword">for <code class="Brush plain">(MySQLPacket packet : headerList) { |
58 |
<code class="Brush plain">buffer = packet.write(buffer, sc); |
59 |
<code class="Brush plain">} |
60 |
<code class="Brush plain">headerList = <code class="Brush keyword">null<code class="Brush plain">; |
61 |
<code class="Brush plain">buffer = bin.write(buffer, sc); |
62 |
<code class="Brush plain">fieldEOF = <code class="Brush keyword">true<code class="Brush plain">; |
63 |
<code class="Brush plain">handleRowData(rrn, c, ss); |
64 |
<code class="Brush keyword">return<code class="Brush plain">; |
65 |
<code class="Brush keyword">default<code class="Brush plain">: |
66 |
<code class="Brush plain">bin.packetId = ++packetId;<code class="Brush comments">// FIELDS |
67 |
<code class="Brush keyword">switch <code class="Brush plain">(flag) { |
68 |
<code class="Brush keyword">case <code class="Brush plain">RouteResultset.REWRITE_FIELD: |
69 |
<code class="Brush plain">StringBuilder fieldName = <code class="Brush keyword">new <code class="Brush plain">StringBuilder(); |
70 |
<code class="Brush plain">fieldName.append(<code class="Brush string">"Tables_in_"<code class="Brush plain">).append(ss.getSource().getSchema()); |
71 |
<code class="Brush plain">FieldPacket field = PacketUtil.getField(bin, fieldName.toString()); |
72 |
<code class="Brush plain">headerList.add(field); |
73 |
<code class="Brush keyword">break<code class="Brush plain">; |
74 |
<code class="Brush keyword">default<code class="Brush plain">: |
75 |
<code class="Brush plain">headerList.add(bin); |
76 |
<code class="Brush plain">} |
77 |
<code class="Brush plain">} |
78 |
<code class="Brush plain">} |
79 |
<code class="Brush plain">} |
80 |
<code class="Brush plain">} |
81 |
<code class="Brush plain">} finally { |
82 |
<code class="Brush plain">lock.unlock(); |
83 |
<code class="Brush plain">} |
84 |
<code class="Brush plain">} |
85 |
<code class="Brush plain">} |
这里真正的执行SQL语句,然后等待后端执行语句的返回数据,在成功获取后端Mysql返回的结果后,该函数返回的数据包是结果集数据包。
当客户端发起认证请求或命令请求后,服务器会返回相应的执行结果给客户端。客户端在收到响应报文后,需要首先检查第1个字节的值,来区分响应报文的类型。
响应报文类型 |
第1个字节取值范围 |
OK 响应报文 |
0×00 |
Error 响应报文 |
0xFF |
Result Set 报文 |
0×01 – 0xFA |
Field 报文 |
0×01 – 0xFA |
Row Data 报文 |
0×01 – 0xFA |
EOF 报文 |
0xFE |
注:响应报文的第1个字节在不同类型中含义不同,比如在OK报文中,该字节并没有实际意义,值恒为0×00;而在Result Set报文中,该字节又是长度编码的二进制数据结构(Length Coded Binary)中的第1字节。
Result Set 消息分为五部分,结构如下:
结构 |
说明 |
[Result Set Header] |
列数量 |
[Field] |
列信息(多个) |
[EOF] |
列结束 |
[Row Data] |
行数据(多个) |
[EOF] |
数据结束 |
函数执行完成后,返回的结果都放入LinkedList中,当读取结果完成后放入多节点执行器的缓冲区。如果buffer满了,就通过前端连接写出给客户端。