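This section is closely tied to the event-management wrapper from the previous section. The code it covers lives in connection{.h, .cc}.
Two classes matter most here: connection and tcpsconn. connection serves a single socket, handling reads and writes on it. tcpsconn serves a set of sockets: it accepts connections and cleans up dead ones. Let's look at the header file.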
    class chanmgr {
     public:
      virtual bool got_pdu(connection *c, char *b, int sz) = 0;
      virtual ~chanmgr() {}
    };
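The first thing we see is this abstract base class. connection and tcpsconn use it as a delegate. It has a single method, got_pdu, which plays an important role in the RPC implementation; we will come back to it when it is used later.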
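To make the delegate idea concrete, here is a minimal sketch of a class implementing the chanmgr interface. The names recording_mgr and deliver are hypothetical and exist only for illustration; the sketch also assumes the manager copies the bytes it receives, which the original code may handle differently.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

class connection;  // forward declaration; only a pointer is needed here

// Same interface as the chanmgr class shown in the header.
class chanmgr {
 public:
  virtual bool got_pdu(connection *c, char *b, int sz) = 0;
  virtual ~chanmgr() {}
};

// Hypothetical manager (illustration only): records every PDU it is given.
class recording_mgr : public chanmgr {
 public:
  bool got_pdu(connection *c, char *b, int sz) override {
    (void)c;  // the connection is unused in this sketch
    if (b == nullptr || sz < 0) return false;
    pdus.emplace_back(b, static_cast<std::size_t>(sz));  // copy the bytes
    return true;
  }
  std::vector<std::string> pdus;
};

// Stands in for the I/O layer: it knows only the chanmgr interface,
// not the concrete type behind it.
bool deliver(chanmgr *mgr, connection *c, char *b, int sz) {
  assert(mgr != nullptr);
  return mgr->got_pdu(c, b, sz);
}
```

Because the I/O side depends only on the interface, any component that wants to receive complete messages can plug itself in by deriving from chanmgr.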
The connection class
    class connection : public aio_callback {
     public:
      // Internal buffer type, used for data being read or written
      struct charbuf {
        charbuf() : buf(NULL), sz(0), solong(0) {}
        charbuf(char *b, int s) : buf(b), sz(s), solong(0) {}
        char *buf;
        int sz;
        int solong;  // amount of bytes written or read so far
      };
      // m1: chanmgr, f1: socket or file
      connection(chanmgr *m1, int f1, int lossytest = 0);
      ~connection();

      int channo() { return fd_; }
      bool isdead();
      void closeconn();

      bool send(char *b, int sz);
      void write_cb(int s);
      void read_cb(int s);
      // Increment / decrement the reference count
      void incref();
      void decref();
      int ref();

      int compare(connection *another);

     private:
      bool readpdu();
      bool writepdu();

      chanmgr *mgr_;
      const int fd_;
      bool dead_;

      charbuf wpdu_;  // write pdu
      charbuf rpdu_;  // read pdu

      struct timeval create_time_;

      int waiters_;
      int refno_;
      const int lossy_;

      pthread_mutex_t m_;
      pthread_mutex_t ref_m_;  // protects updates to the reference count
      pthread_cond_t send_complete_;
      pthread_cond_t send_wait_;
    };
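This is the definition of the connection class. It inherits from aio_callback. As the previous section explained, the event manager uses aio_callback as its callback interface for reading and writing data, so connection is itself a callback class.
The constructor makes this clear.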
    connection::connection(chanmgr *m1, int f1, int l1)
        : mgr_(m1), fd_(f1), dead_(false), waiters_(0), refno_(1), lossy_(l1) {
      int flags = fcntl(fd_, F_GETFL, NULL);
      flags |= O_NONBLOCK;  // non-blocking
      fcntl(fd_, F_SETFL, flags);
      // ignore SIGPIPE
      signal(SIGPIPE, SIG_IGN);
      VERIFY(pthread_mutex_init(&m_, 0) == 0);
      VERIFY(pthread_mutex_init(&ref_m_, 0) == 0);
      VERIFY(pthread_cond_init(&send_wait_, 0) == 0);
      VERIFY(pthread_cond_init(&send_complete_, 0) == 0);

      VERIFY(gettimeofday(&create_time_, NULL) == 0);
      // Register this object with the event manager as the read callback for fd_
      PollMgr::Instance()->add_callback(fd_, CB_RDONLY, this);
    }
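So what does this class actually do? It handles communication on a given socket. When sending, it keeps going until all the data has been sent. If a write does not complete, it registers a write event with the event manager and continues sending in the next round of the event loop. The send function shows this: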
    bool connection::send(char *b, int sz) {
      ScopedLock ml(&m_);
      waiters_++;
      // While the connection is alive and wpdu_ still holds data,
      // wait for the previous send to drain
      while (!dead_ && wpdu_.buf) {
        VERIFY(pthread_cond_wait(&send_wait_, &m_) == 0);
      }
      waiters_--;
      if (dead_) {
        return false;
      }
      wpdu_.buf = b;
      wpdu_.sz = sz;
      wpdu_.solong = 0;

      if (lossy_) {
        if ((random() % 100) < lossy_) {
          jsl_log(JSL_DBG_1, "connection::send LOSSY TEST shutdown fd_ %d\n", fd_);
          shutdown(fd_, SHUT_RDWR);
        }
      }
      // The write failed
      if (!writepdu()) {
        dead_ = true;
        VERIFY(pthread_mutex_unlock(&m_) == 0);
        PollMgr::Instance()->block_remove_fd(fd_);
        VERIFY(pthread_mutex_lock(&m_) == 0);
      } else {
        if (wpdu_.solong == wpdu_.sz) {
        } else {
          // should be rare to need to explicitly add write callback
          // Writing continues: this registers the object as the write callback,
          // so the event loop calls write_cb, somewhat like a recursion
          PollMgr::Instance()->add_callback(fd_, CB_WRONLY, this);
          while (!dead_ && wpdu_.solong >= 0 && wpdu_.solong < wpdu_.sz) {
            VERIFY(pthread_cond_wait(&send_complete_, &m_) == 0);
          }
        }
      }
      // Reset the write buffer
      bool ret = (!dead_ && wpdu_.solong == wpdu_.sz);
      wpdu_.solong = wpdu_.sz = 0;
      wpdu_.buf = NULL;
      if (waiters_ > 0)
        pthread_cond_broadcast(&send_wait_);  // wake the senders waiting above
      return ret;
    }
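The "send what you can now, finish later" behaviour described above can be sketched in isolation. This is not the original writepdu; outbuf, set_nonblocking and write_some are hypothetical names, and the sketch assumes that EAGAIN/EWOULDBLOCK means "retry on a later event-loop round" while any other failure is fatal.

```cpp
#include <cassert>
#include <cerrno>
#include <fcntl.h>
#include <unistd.h>

// Hypothetical counterpart of charbuf: solong counts bytes already written.
struct outbuf {
  const char *buf;
  int sz;
  int solong;
};

// Sets O_NONBLOCK on fd; returns false if fcntl fails.
bool set_nonblocking(int fd) {
  int flags = fcntl(fd, F_GETFL, 0);
  if (flags < 0) return false;
  return fcntl(fd, F_SETFL, flags | O_NONBLOCK) == 0;
}

// Writes as much as the descriptor accepts right now and advances solong.
// Returns true if the buffer is complete or the caller should retry later;
// returns false only on a real error.
bool write_some(int fd, outbuf &w) {
  assert(w.solong >= 0 && w.solong <= w.sz);
  while (w.solong < w.sz) {
    ssize_t n = write(fd, w.buf + w.solong, w.sz - w.solong);
    if (n > 0) {
      w.solong += static_cast<int>(n);
      continue;
    }
    if (n < 0 && errno == EINTR) continue;  // interrupted, try again
    if (n < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) return true;
    return false;  // genuine failure
  }
  return true;
}
```

After a call, the caller compares solong with sz: if they differ, the remaining bytes are still pending and a write callback is needed, which mirrors the check send performs after writepdu.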
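For reads, it keeps reading while rpdu_ (the read buffer) is not yet full. Once a read completes, the data is handed to got_pdu of the chanmgr class for processing.
Note that both sending and receiving start with the size of the data; the data itself follows.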
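The size-then-data convention can be illustrated with a small framing sketch. The exact wire layout is an assumption here: a 4-byte big-endian length followed by the payload. The helper names frame and unframe are hypothetical and do not appear in the original code.

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>
#include <string>
#include <arpa/inet.h>  // htonl, ntohl

// Assumed wire format: 4-byte big-endian payload length, then the payload.
std::string frame(const std::string &payload) {
  assert(payload.size() <= UINT32_MAX);
  uint32_t n = htonl(static_cast<uint32_t>(payload.size()));
  std::string out(reinterpret_cast<const char *>(&n), sizeof(n));
  out += payload;
  return out;
}

// Tries to extract one complete message from the front of `in`.
// Returns true and fills `payload` when a whole frame is available;
// returns false and leaves `in` untouched when more bytes are needed.
bool unframe(std::string &in, std::string &payload) {
  uint32_t n;
  if (in.size() < sizeof(n)) return false;      // length not complete yet
  std::memcpy(&n, in.data(), sizeof(n));
  n = ntohl(n);
  if (in.size() - sizeof(n) < n) return false;  // payload not complete yet
  payload.assign(in, sizeof(n), n);
  in.erase(0, sizeof(n) + n);
  return true;
}
```

Knowing the size up front is what lets the reader decide when a buffer is "full" and a complete message can be passed on.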
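Besides sending and receiving data, connection has a private variable refno_ that is used for reference counting. Reference counting is a very common programming technique. Python, for example, uses it to manage objects: an object is destroyed when its count drops to 0. The same idea applies here, with one extra condition: decref deletes the object only when the count reaches 0 and the connection is already dead, as the decref function shows: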
    void connection::decref() {
      VERIFY(pthread_mutex_lock(&ref_m_) == 0);
      refno_--;
      VERIFY(refno_ >= 0);
      // When the count reaches zero, destroy the object (only if it is dead)
      if (refno_ == 0) {
        VERIFY(pthread_mutex_lock(&m_) == 0);
        if (dead_) {
          VERIFY(pthread_mutex_unlock(&ref_m_) == 0);
          VERIFY(pthread_mutex_unlock(&m_) == 0);
          delete this;
          return;
        }
        VERIFY(pthread_mutex_unlock(&m_) == 0);
      }
      pthread_mutex_unlock(&ref_m_);
    }
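The lifetime rule in decref can be reduced to a small standalone sketch. It swaps the pthread mutexes for std::mutex to stay short, and the class refcounted, its mark_dead method and the destroyed flag are hypothetical, added only so the deletion can be observed.

```cpp
#include <cassert>
#include <mutex>

// Hypothetical stand-in for connection that keeps only the lifetime logic.
class refcounted {
 public:
  explicit refcounted(bool *destroyed) : destroyed_(destroyed) {}

  void incref() {
    std::lock_guard<std::mutex> l(ref_m_);
    ++refno_;
  }
  int ref() {
    std::lock_guard<std::mutex> l(ref_m_);
    return refno_;
  }
  void mark_dead() {
    std::lock_guard<std::mutex> l(m_);
    dead_ = true;
  }

  // Drops one reference; deletes the object once no references remain
  // and it has been marked dead.
  void decref() {
    bool destroy = false;
    {
      std::lock_guard<std::mutex> rl(ref_m_);
      --refno_;
      assert(refno_ >= 0);
      if (refno_ == 0) {
        std::lock_guard<std::mutex> l(m_);
        destroy = dead_;
      }
    }  // both locks are released before the object is destroyed
    if (destroy) delete this;
  }

 private:
  ~refcounted() { *destroyed_ = true; }  // heap-only: destroyed via decref()

  bool *destroyed_;
  int refno_ = 1;      // the creator holds the first reference
  bool dead_ = false;
  std::mutex ref_m_;   // protects refno_
  std::mutex m_;       // protects dead_
};
```

As in the original, the locks must be released before delete this runs, because destroying a locked mutex is undefined behaviour.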
The tcpsconn class
This class manages connection objects. Here is its definition:
    /**
     * Manages client connections, keeping them in a map<int, connection*>
     */
    class tcpsconn {
     public:
      tcpsconn(chanmgr *m1, int port, int lossytest = 0);
      ~tcpsconn();

      void accept_conn();

     private:
      pthread_mutex_t m_;
      pthread_t th_;
      int pipe_[2];

      int tcp_;  // file descriptor for accepting connections
      chanmgr *mgr_;
      int lossy_;
      std::map<int, connection *> conns_;

      void process_accept();
    };
It defines a map whose key is the socket descriptor of the connection that the value points to. Now let's look at the constructor:
    tcpsconn::tcpsconn(chanmgr *m1, int port, int lossytest)
        : mgr_(m1), lossy_(lossytest) {
      VERIFY(pthread_mutex_init(&m_, NULL) == 0);

      struct sockaddr_in sin;
      memset(&sin, 0, sizeof(sin));
      sin.sin_family = AF_INET;
      sin.sin_port = htons(port);

      tcp_ = socket(AF_INET, SOCK_STREAM, 0);
      if (tcp_ < 0) {
        perror("tcpsconn::tcpsconn accept_loop socket:");
        VERIFY(0);
      }

      int yes = 1;
      // Set socket options: SO_REUSEADDR and TCP_NODELAY
      setsockopt(tcp_, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));
      setsockopt(tcp_, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes));

      if (bind(tcp_, (sockaddr *)&sin, sizeof(sin)) < 0) {
        perror("accept_loop tcp bind:");
        VERIFY(0);
      }

      if (listen(tcp_, 1000) < 0) {
        perror("tcpsconn::tcpsconn listen:");
        VERIFY(0);
      }

      jsl_log(JSL_DBG_2, "tcpsconn::tcpsconn listen on %d %d\n", port, sin.sin_port);

      if (pipe(pipe_) < 0) {
        perror("accept_loop pipe:");
        VERIFY(0);
      }

      int flags = fcntl(pipe_[0], F_GETFL, NULL);
      flags |= O_NONBLOCK;
      fcntl(pipe_[0], F_SETFL, flags);  // non-blocking pipe

      VERIFY((th_ = method_thread(this, false, &tcpsconn::accept_conn)) != 0);
    }
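The constructor sets up the server-side listening socket and then starts a thread that waits for client connections. When a client connection is handled later, its socket is added to the conns_ map, which establishes the mapping from socket to connection pointer. conns_ is then traversed and dead connections are removed, so dead connections are cleaned up promptly.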
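The register-then-sweep step described above might look roughly like the sketch below. It is not the original process_accept: fake_conn and register_and_sweep are hypothetical, and releasing the map's reference with decref when an entry is dropped is an assumption about how the sweep interacts with reference counting.

```cpp
#include <cassert>
#include <map>

// Hypothetical stand-in for connection, for illustration only.
struct fake_conn {
  bool dead = false;
  int refs = 1;
  bool isdead() const { return dead; }
  void decref() { --refs; }
};

// Registers a new connection under its descriptor, then removes every
// entry whose connection is dead. Returns the number of entries removed.
int register_and_sweep(std::map<int, fake_conn *> &conns, int fd, fake_conn *c) {
  assert(conns.find(fd) == conns.end());
  conns[fd] = c;

  int removed = 0;
  for (auto it = conns.begin(); it != conns.end();) {
    if (it->second->isdead()) {
      it->second->decref();  // assumed: release the map's reference
      it = conns.erase(it);  // erase returns the next valid iterator
      ++removed;
    } else {
      ++it;
    }
  }
  return removed;
}
```

Sweeping on every accept keeps the map from accumulating stale descriptors without needing a separate cleanup thread.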