Der Inhalt dieses Abschnitts steht in engem Zusammenhang mit der Event-Management-Kapselung im vorherigen Abschnitt. Der in diesem Abschnitt enthaltene Hauptcode befindet sich in Verbindung{.h, .cc}.
Hier gibt es zwei Hauptklassen: die Verbindungsklasse und die tcpsconn-Klasse, die hauptsächlich einen einzelnen Socket bedient, einschließlich des Lesens und Schreibens von Daten auf dem Socket, während die tcpsconn-Klasse die Socket-Sammlung bedient, z. B. das Empfangen von Verbindungen. Aktualisieren ungültiger Sockets usw. Schauen wir uns insbesondere die Header-Datei an.
<span style="color: #0000ff;">class</span><span style="color: #000000;"> chanmgr { </span><span style="color: #0000ff;">public</span><span style="color: #000000;">: </span><span style="color: #0000ff;">virtual</span> <span style="color: #0000ff;">bool</span> got_pdu(connection *c, <span style="color: #0000ff;">char</span> *b, <span style="color: #0000ff;">int</span> sz) = <span style="color: #800080;">0</span><span style="color: #000000;">; </span><span style="color: #0000ff;">virtual</span> ~<span style="color: #000000;">chanmgr() {} };</span>
Das erste, was wir sehen, ist diese virtuelle Basisklasse, die in den Verbindungs- und TCPCONN-Klassen in Form einer Delegation verwendet wird. Sie hat nur eine Methode, got_pdu, die eine wichtige Rolle bei der RPC-Implementierung spielt Wird später noch einmal verwendet.
Verbindungsklasse
<span style="color: #008080;"> 1</span> <span style="color: #0000ff;">class</span> connection : <span style="color: #0000ff;">public</span><span style="color: #000000;"> aio_callback { </span><span style="color: #008080;"> 2</span> <span style="color: #0000ff;">public</span><span style="color: #000000;">: </span><span style="color: #008080;"> 3</span> <span style="color: #008000;">//</span><span style="color: #008000;">内部buffer类,主要用于接收/写入数据的buffer</span> <span style="color: #008080;"> 4</span> <span style="color: #0000ff;">struct</span><span style="color: #000000;"> charbuf { </span><span style="color: #008080;"> 5</span> charbuf(): buf(NULL), sz(<span style="color: #800080;">0</span>), solong(<span style="color: #800080;">0</span><span style="color: #000000;">) {} </span><span style="color: #008080;"> 6</span> charbuf (<span style="color: #0000ff;">char</span> *b, <span style="color: #0000ff;">int</span> s) : buf(b), sz(s), solong(<span style="color: #800080;">0</span><span style="color: #000000;">){} </span><span style="color: #008080;"> 7</span> <span style="color: #0000ff;">char</span> *<span style="color: #000000;">buf; </span><span style="color: #008080;"> 8</span> <span style="color: #0000ff;">int</span><span style="color: #000000;"> sz; </span><span style="color: #008080;"> 9</span> <span style="color: #0000ff;">int</span> solong; <span style="color: #008000;">//</span><span style="color: #008000;">amount of bytes written or read so far</span> <span style="color: #008080;">10</span> <span style="color: #000000;"> }; </span><span style="color: #008080;">11</span> <span style="color: #008000;">//</span><span style="color: #008000;">m1: chanmgr, f1: socket or file, </span> <span style="color: #008080;">12</span> connection(chanmgr *m1, <span style="color: #0000ff;">int</span> f1, <span style="color: #0000ff;">int</span> lossytest=<span style="color: #800080;">0</span><span style="color: #000000;">); </span><span style="color: #008080;">13</span> ~<span style="color: #000000;">connection(); </span><span style="color: #008080;">14</span> <span style="color: #008080;">15</span> <span style="color: #0000ff;">int</span> channo() { <span style="color: #0000ff;">return</span><span style="color: #000000;"> fd_; } </span><span style="color: #008080;">16</span> <span style="color: #0000ff;">bool</span><span style="color: #000000;"> isdead(); </span><span style="color: #008080;">17</span> <span style="color: #0000ff;">void</span><span style="color: #000000;"> closeconn(); </span><span style="color: #008080;">18</span> <span style="color: #008080;">19</span> <span style="color: #0000ff;">bool</span> send(<span style="color: #0000ff;">char</span> *b, <span style="color: #0000ff;">int</span><span style="color: #000000;"> sz); </span><span style="color: #008080;">20</span> <span style="color: #0000ff;">void</span> write_cb(<span style="color: #0000ff;">int</span><span style="color: #000000;"> s); </span><span style="color: #008080;">21</span> <span style="color: #0000ff;">void</span> read_cb(<span style="color: #0000ff;">int</span><span style="color: #000000;"> s); </span><span style="color: #008080;">22</span> <span style="color: #008000;">//</span><span style="color: #008000;">增加/减少引用计数</span> <span style="color: #008080;">23</span> <span style="color: #0000ff;">void</span><span style="color: #000000;"> incref(); </span><span style="color: #008080;">24</span> <span style="color: #0000ff;">void</span><span style="color: #000000;"> decref(); </span><span style="color: #008080;">25</span> <span style="color: #0000ff;">int</span> <span style="color: #0000ff;">ref</span><span style="color: #000000;">(); </span><span style="color: #008080;">26</span> <span style="color: #008080;">27</span> <span style="color: #0000ff;">int</span> compare(connection *<span style="color: #000000;">another); </span><span style="color: #008080;">28</span> <span style="color: #0000ff;">private</span><span style="color: #000000;">: </span><span style="color: #008080;">29</span> <span style="color: #008080;">30</span> <span style="color: #0000ff;">bool</span><span style="color: #000000;"> readpdu(); </span><span style="color: #008080;">31</span> <span style="color: #0000ff;">bool</span><span style="color: #000000;"> writepdu(); </span><span style="color: #008080;">32</span> <span style="color: #008080;">33</span> chanmgr *<span style="color: #000000;">mgr_; </span><span style="color: #008080;">34</span> <span style="color: #0000ff;">const</span> <span style="color: #0000ff;">int</span><span style="color: #000000;"> fd_; </span><span style="color: #008080;">35</span> <span style="color: #0000ff;">bool</span><span style="color: #000000;"> dead_; </span><span style="color: #008080;">36</span> <span style="color: #008080;">37</span> charbuf wpdu_; <span style="color: #008000;">//</span><span style="color: #008000;">write pdu</span> <span style="color: #008080;">38</span> charbuf rpdu_; <span style="color: #008000;">//</span><span style="color: #008000;">read pdu</span> <span style="color: #008080;">39</span> <span style="color: #008080;">40</span> <span style="color: #0000ff;">struct</span><span style="color: #000000;"> timeval create_time_; </span><span style="color: #008080;">41</span> <span style="color: #008080;">42</span> <span style="color: #0000ff;">int</span><span style="color: #000000;"> waiters_; </span><span style="color: #008080;">43</span> <span style="color: #0000ff;">int</span><span style="color: #000000;"> refno_; </span><span style="color: #008080;">44</span> <span style="color: #0000ff;">const</span> <span style="color: #0000ff;">int</span><span style="color: #000000;"> lossy_; </span><span style="color: #008080;">45</span> <span style="color: #008080;">46</span> <span style="color: #000000;"> pthread_mutex_t m_; </span><span style="color: #008080;">47</span> pthread_mutex_t ref_m_; <span style="color: #008000;">//</span><span style="color: #008000;">保护更新引用计数的安全性</span> <span style="color: #008080;">48</span> <span style="color: #000000;"> pthread_cond_t send_complete_; </span><span style="color: #008080;">49</span> <span style="color: #000000;"> pthread_cond_t send_wait_; </span><span style="color: #008080;">50</span> };
Dieser Code ist die Definition der Verbindungsklasse. Wie im vorherigen Abschnitt erwähnt, wird aio_callback als Rückrufklasse zum Lesen oder Schreiben von Daten verwendet eine Art Rückruf.
Wir können es vom Konstruktor der Verbindung erfahren.
connection::connection(chanmgr *m1, <span style="color: #0000ff;">int</span> f1, <span style="color: #0000ff;">int</span><span style="color: #000000;"> l1) : mgr_(m1), fd_(f1), dead_(</span><span style="color: #0000ff;">false</span>),waiters_(<span style="color: #800080;">0</span>), refno_(<span style="color: #800080;">1</span><span style="color: #000000;">),lossy_(l1) { </span><span style="color: #0000ff;">int</span> flags =<span style="color: #000000;"> fcntl(fd_, F_GETFL, NULL); flags </span>|= O_NONBLOCK; <span style="color: #008000;">//</span><span style="color: #008000;">no blocking</span> <span style="color: #000000;"> fcntl(fd_, F_SETFL, flags); </span><span style="color: #008000;">//</span><span style="color: #008000;">ignore信号</span> <span style="color: #000000;"> signal(SIGPIPE, SIG_IGN); VERIFY(pthread_mutex_init(</span>&m_,<span style="color: #800080;">0</span>)==<span style="color: #800080;">0</span><span style="color: #000000;">); VERIFY(pthread_mutex_init(</span>&ref_m_,<span style="color: #800080;">0</span>)==<span style="color: #800080;">0</span><span style="color: #000000;">); VERIFY(pthread_cond_init(</span>&send_wait_,<span style="color: #800080;">0</span>)==<span style="color: #800080;">0</span><span style="color: #000000;">); VERIFY(pthread_cond_init(</span>&send_complete_,<span style="color: #800080;">0</span>)==<span style="color: #800080;">0</span><span style="color: #000000;">); VERIFY(gettimeofday(</span>&create_time_, NULL) == <span style="color: #800080;">0</span><span style="color: #000000;">); </span><span style="color: #008000;">//</span><span style="color: #008000;">事件管理类将本类作为回调类添加到相应的事件管理数组中</span> PollMgr::Instance()->add_callback(fd_, CB_RDONLY, <span style="color: #0000ff;">this</span><span style="color: #000000;">); }</span>
Was ist also die spezifische Funktion dieser Klasse? Tatsächlich wird es zum Senden von Daten verwendet, bis die Daten gesendet werden. Wenn die Daten nicht gesendet werden, wird das Ereignis zur Ereignisverwaltung hinzugefügt und weiterhin gesendet In der nächsten Runde der Ereignisschleife können wir dies an der Sendefunktion erkennen:
<span style="color: #0000ff;">bool</span><span style="color: #000000;"> connection::send(</span><span style="color: #0000ff;">char</span> *b, <span style="color: #0000ff;">int</span><span style="color: #000000;"> sz) { ScopedLock ml(</span>&<span style="color: #000000;">m_); waiters_</span>++<span style="color: #000000;">; </span><span style="color: #008000;">//</span><span style="color: #008000;">当活着,且write pdu中还有数据时等待数据清空(发送完)</span> <span style="color: #0000ff;">while</span> (!dead_ &&<span style="color: #000000;"> wpdu_.buf) { VERIFY(pthread_cond_wait(</span>&send_wait_, &m_)==<span style="color: #800080;">0</span><span style="color: #000000;">); } waiters_</span>--<span style="color: #000000;">; </span><span style="color: #0000ff;">if</span><span style="color: #000000;"> (dead_) { </span><span style="color: #0000ff;">return</span> <span style="color: #0000ff;">false</span><span style="color: #000000;">; } wpdu_.buf </span>=<span style="color: #000000;"> b; wpdu_.sz </span>=<span style="color: #000000;"> sz; wpdu_.solong </span>= <span style="color: #800080;">0</span><span style="color: #000000;">; </span><span style="color: #0000ff;">if</span><span style="color: #000000;"> (lossy_) { </span><span style="color: #0000ff;">if</span> ((random()%<span style="color: #800080;">100</span>) <<span style="color: #000000;"> lossy_) { jsl_log(JSL_DBG_1, </span><span style="color: #800000;">"</span><span style="color: #800000;">connection::send LOSSY TEST shutdown fd_ %d\n</span><span style="color: #800000;">"</span><span style="color: #000000;">, fd_); shutdown(fd_,SHUT_RDWR); } } </span><span style="color: #008000;">//</span><span style="color: #008000;">发送失败时</span> <span style="color: #0000ff;">if</span> (!<span style="color: #000000;">writepdu()) { dead_ </span>= <span style="color: #0000ff;">true</span><span style="color: #000000;">; VERIFY(pthread_mutex_unlock(</span>&m_) == <span style="color: #800080;">0</span><span style="color: #000000;">); PollMgr::Instance()</span>-><span style="color: #000000;">block_remove_fd(fd_); VERIFY(pthread_mutex_lock(</span>&m_) == <span style="color: #800080;">0</span><span style="color: #000000;">); }</span><span style="color: #0000ff;">else</span><span style="color: #000000;">{ </span><span style="color: #0000ff;">if</span> (wpdu_.solong ==<span style="color: #000000;"> wpdu_.sz) { }</span><span style="color: #0000ff;">else</span><span style="color: #000000;">{ </span><span style="color: #008000;">//</span><span style="color: #008000;">should be rare to need to explicitly add write callback </span><span style="color: #008000;">//</span><span style="color: #008000;">这会继续写,因为这会添加本类(回调),然后调用里面的回调函数write_cb, </span><span style="color: #008000;">//</span><span style="color: #008000;">就像是一个递归</span> PollMgr::Instance()->add_callback(fd_, CB_WRONLY, <span style="color: #0000ff;">this</span><span style="color: #000000;">); </span><span style="color: #0000ff;">while</span> (!dead_ && wpdu_.solong >= <span style="color: #800080;">0</span> && wpdu_.solong <<span style="color: #000000;"> wpdu_.sz) { VERIFY(pthread_cond_wait(</span>&send_complete_,&m_) == <span style="color: #800080;">0</span><span style="color: #000000;">); } } } </span><span style="color: #008000;">//</span><span style="color: #008000;">清空写buffer</span> <span style="color: #0000ff;">bool</span> ret = (!dead_ && wpdu_.solong ==<span style="color: #000000;"> wpdu_.sz); wpdu_.solong </span>= wpdu_.sz = <span style="color: #800080;">0</span><span style="color: #000000;">; wpdu_.buf </span>=<span style="color: #000000;"> NULL; </span><span style="color: #0000ff;">if</span> (waiters_ > <span style="color: #800080;">0</span><span style="color: #000000;">) pthread_cond_broadcast(</span>&send_wait_); <span style="color: #008000;">//</span><span style="color: #008000;">唤醒上面的等待</span> <span style="color: #0000ff;">return</span><span style="color: #000000;"> ret; }</span>
Zum Lesen von Daten fahren Sie mit dem Lesen fort, wenn rpdu_ (Lesepuffer) nicht voll ist. Verwenden Sie nach Abschluss des Lesens got_pdu der chanmgr-Klasse, um die gelesenen Daten zu verarbeiten.
Beachten Sie, dass beim Senden von Daten/Empfangen von Daten zuerst die Datengröße gesendet/Datengröße empfangen wird und dann die anschließende Arbeit des Sendens/Empfangens von Daten ausgeführt wird.
Zusätzlich zur Funktion des Sendens/Empfangens von Daten der Verbindungsklasse sehen wir auch eine private Variable refno_, die für die Referenzzählung verwendet wird. Die Referenzzählung ist eine sehr verbreitete Programmiertechnik, beispielsweise die Referenzzählung in Python wird für die Objektverwaltung verwendet. Wenn der Referenzzähler 0 ist, gilt das Gleiche für die Referenzzählung, die hier aus der Decref-Funktion ersichtlich ist
<span style="color: #0000ff;">void</span><span style="color: #000000;"> connection::decref() { VERIFY(pthread_mutex_lock(</span>&ref_m_)==<span style="color: #800080;">0</span><span style="color: #000000;">); refno_ </span>--<span style="color: #000000;">; VERIFY(refno_</span>>=<span style="color: #800080;">0</span><span style="color: #000000;">); </span><span style="color: #008000;">//</span><span style="color: #008000;">当引用计数为0时,销毁对象</span> <span style="color: #0000ff;">if</span> (refno_==<span style="color: #800080;">0</span><span style="color: #000000;">) { VERIFY(pthread_mutex_lock(</span>&m_)==<span style="color: #800080;">0</span><span style="color: #000000;">); </span><span style="color: #0000ff;">if</span><span style="color: #000000;"> (dead_) { VERIFY(pthread_mutex_unlock(</span>&ref_m_)==<span style="color: #800080;">0</span><span style="color: #000000;">); VERIFY(pthread_mutex_unlock(</span>&m_)==<span style="color: #800080;">0</span><span style="color: #000000;">); </span><span style="color: #0000ff;">delete</span> <span style="color: #0000ff;">this</span><span style="color: #000000;">; </span><span style="color: #0000ff;">return</span><span style="color: #000000;">; } VERIFY(pthread_mutex_unlock(</span>&m_)==<span style="color: #800080;">0</span><span style="color: #000000;">); } pthread_mutex_unlock(</span>&<span style="color: #000000;">ref_m_); }</span>
Diese Klasse wird zum Verwalten von Verbindungen verwendet. Schauen wir uns zunächst ihre Definition an
<span style="color: #008000;">/*</span><span style="color: #008000;">* * 管理客户连接,将连接放入一个map中map<int, connection*> * </span><span style="color: #008000;">*/</span> <span style="color: #0000ff;">class</span><span style="color: #000000;"> tcpsconn { </span><span style="color: #0000ff;">public</span><span style="color: #000000;">: tcpsconn(chanmgr </span>*m1, <span style="color: #0000ff;">int</span> port, <span style="color: #0000ff;">int</span> lossytest=<span style="color: #800080;">0</span><span style="color: #000000;">); </span>~<span style="color: #000000;">tcpsconn(); </span><span style="color: #0000ff;">void</span><span style="color: #000000;"> accept_conn(); </span><span style="color: #0000ff;">private</span><span style="color: #000000;">: pthread_mutex_t m_; pthread_t th_; </span><span style="color: #0000ff;">int</span> pipe_[<span style="color: #800080;">2</span><span style="color: #000000;">]; </span><span style="color: #0000ff;">int</span> tcp_; <span style="color: #008000;">//</span><span style="color: #008000;">file desciptor for accepting connection</span> chanmgr *<span style="color: #000000;">mgr_; </span><span style="color: #0000ff;">int</span><span style="color: #000000;"> lossy_; std::map</span><<span style="color: #0000ff;">int</span>, connection *><span style="color: #000000;"> conns_; </span><span style="color: #0000ff;">void</span><span style="color: #000000;"> process_accept(); };</span>
tcpsconn::tcpsconn(chanmgr *m1, <span style="color: #0000ff;">int</span> port, <span style="color: #0000ff;">int</span><span style="color: #000000;"> lossytest) : mgr_(m1), lossy_(lossytest) { VERIFY(pthread_mutex_init(</span>&m_,NULL) == <span style="color: #800080;">0</span><span style="color: #000000;">); </span><span style="color: #0000ff;">struct</span><span style="color: #000000;"> sockaddr_in sin; memset(</span>&sin, <span style="color: #800080;">0</span>, <span style="color: #0000ff;">sizeof</span><span style="color: #000000;">(sin)); sin.sin_family </span>=<span style="color: #000000;"> AF_INET; sin.sin_port </span>=<span style="color: #000000;"> htons(port); tcp_ </span>= socket(AF_INET, SOCK_STREAM, <span style="color: #800080;">0</span><span style="color: #000000;">); </span><span style="color: #0000ff;">if</span>(tcp_ < <span style="color: #800080;">0</span><span style="color: #000000;">){ perror(</span><span style="color: #800000;">"</span><span style="color: #800000;">tcpsconn::tcpsconn accept_loop socket:</span><span style="color: #800000;">"</span><span style="color: #000000;">); VERIFY(</span><span style="color: #800080;">0</span><span style="color: #000000;">); } </span><span style="color: #0000ff;">int</span> yes = <span style="color: #800080;">1</span><span style="color: #000000;">; </span><span style="color: #008000;">//</span><span style="color: #008000;">设置TCP参数, reuseaddr, nodelay</span> setsockopt(tcp_, SOL_SOCKET, SO_REUSEADDR, &yes, <span style="color: #0000ff;">sizeof</span><span style="color: #000000;">(yes)); setsockopt(tcp_, IPPROTO_TCP, TCP_NODELAY, </span>&yes, <span style="color: #0000ff;">sizeof</span><span style="color: #000000;">(yes)); </span><span style="color: #0000ff;">if</span>(bind(tcp_, (sockaddr *)&sin, <span style="color: #0000ff;">sizeof</span>(sin)) < <span style="color: #800080;">0</span><span style="color: #000000;">){ perror(</span><span style="color: #800000;">"</span><span style="color: #800000;">accept_loop tcp bind:</span><span style="color: #800000;">"</span><span style="color: #000000;">); VERIFY(</span><span style="color: #800080;">0</span><span style="color: #000000;">); } </span><span style="color: #0000ff;">if</span>(listen(tcp_, <span style="color: #800080;">1000</span>) < <span style="color: #800080;">0</span><span style="color: #000000;">) { perror(</span><span style="color: #800000;">"</span><span style="color: #800000;">tcpsconn::tcpsconn listen:</span><span style="color: #800000;">"</span><span style="color: #000000;">); VERIFY(</span><span style="color: #800080;">0</span><span style="color: #000000;">); } jsl_log(JSL_DBG_2, </span><span style="color: #800000;">"</span><span style="color: #800000;">tcpsconn::tcpsconn listen on %d %d\n</span><span style="color: #800000;">"</span><span style="color: #000000;">, port, sin.sin_port); </span><span style="color: #0000ff;">if</span> (pipe(pipe_) < <span style="color: #800080;">0</span><span style="color: #000000;">) { perror(</span><span style="color: #800000;">"</span><span style="color: #800000;">accept_loop pipe:</span><span style="color: #800000;">"</span><span style="color: #000000;">); VERIFY(</span><span style="color: #800080;">0</span><span style="color: #000000;">); } </span><span style="color: #0000ff;">int</span> flags = fcntl(pipe_[<span style="color: #800080;">0</span><span style="color: #000000;">], F_GETFL, NULL); flags </span>|=<span style="color: #000000;"> O_NONBLOCK; fcntl(pipe_[</span><span style="color: #800080;">0</span>], F_SETFL, flags); <span style="color: #008000;">//</span><span style="color: #008000;">无阻塞管道</span> <span style="color: #000000;"> VERIFY((th_ </span>= method_thread(<span style="color: #0000ff;">this</span>, <span style="color: #0000ff;">false</span>, &tcpsconn::accept_conn)) != <span style="color: #800080;">0</span><span style="color: #000000;">); }</span>