nginx 源码学习笔记(二十一)—— event 模块二 ——事件驱动核心ngx_process_events_and_timers

WBOY
发布: 2016-07-29 09:11:02
原创
1730 人浏览过

首先继续回忆下,之前子线程执行操作里面有一个未涉及的内容ngx_process_events_and_timers,今天我们就来研究下这个函数。

本篇文章来自于:http://blog.csdn.net/lengzijian/article/details/7601730

先来看一下第十九节的部分截图:

nginx 源码学习笔记(二十一)—— event 模块二 ——事件驱动核心ngx_process_events_and_timers

今天主要讲解的就是事件驱动函数,图中的红色部分:

[cpp] view plaincopyprint?

  1. src/event/ngx_event.c  
  2.   
  3. void  
  4. ngx_process_events_and_timers(ngx_cycle_t *cycle)  
  5. {  
  6.     ngx_uint_t  flags;  
  7.     ngx_msec_t  timer, delta;  
  8.   
  9.     if (ngx_timer_resolution) {  
  10.         timer = NGX_TIMER_INFINITE;  
  11.         flags = 0;  
  12.   
  13.     } else {  
  14.         timer = ngx_event_find_timer();  
  15.         flags = NGX_UPDATE_TIME;  
  16.     }  
  17.       
  18.     /* 
  19.     ngx_use_accept_mutex变量代表是否使用accept互斥体 
  20.     默认是使用,可以通过accept_mutex off;指令关闭; 
  21.     accept mutex 的作用就是避免惊群,同时实现负载均衡 
  22.     */  
  23.     if (ngx_use_accept_mutex) {  
  24.           
  25.         /* 
  26.         ngx_accept_disabled变量在ngx_event_accept函数中计算。 
  27.         如果ngx_accept_disabled大于0,就表示该进程接受的链接过多, 
  28.         因此放弃一次争抢accept mutex的机会,同时将自己减一。 
  29.         然后,继续处理已有连接上的事件。 
  30.         nginx就利用这一点实现了继承关于连接的基本负载均衡。 
  31.         */  
  32.         if (ngx_accept_disabled > 0) {  
  33.             ngx_accept_disabled--;  
  34.   
  35.         } else {  
  36.             /* 
  37.             尝试锁accept mutex,只有成功获取锁的进程,才会将listen套接字放到epoll中。 
  38.             因此,这就保证了只有一个进程拥有监听套接口,故所有进程阻塞在epoll_wait时, 
  39.             才不会惊群现象。 
  40.             */  
  41.             if (ngx_trylock_accept_mutex(cycle) == NGX_ERROR) {  
  42.                 return;  
  43.             }  
  44.   
  45.             if (ngx_accept_mutex_held) {  
  46.                 /* 
  47.                 如果进程获得了锁,将添加一个 NGX_POST_EVENTS 标志。 
  48.                 这个标志的作用是将所有产生的事件放入一个队列中,等释放后,在慢慢来处理事件。 
  49.                 因为,处理时间可能会很耗时,如果不先施放锁再处理的话,该进程就长时间霸占了锁, 
  50.                 导致其他进程无法获取锁,这样accept的效率就低了。 
  51.                 */  
  52.                 flags |= NGX_POST_EVENTS;  
  53.   
  54.             } else {  
  55.                 /* 
  56.                 没有获得所得进程,当然不需要NGX_POST_EVENTS标志。 
  57.                 但需要设置延时多长时间,再去争抢锁。 
  58.                 */  
  59.                 if (timer == NGX_TIMER_INFINITE  
  60.                     || timer > ngx_accept_mutex_delay)  
  61.                 {  
  62.                     timer = ngx_accept_mutex_delay;  
  63.                 }  
  64.             }  
  65.         }  
  66.     }  
  67.   
  68.     delta = ngx_current_msec;  
  69.       
  70.     /*接下来,epoll要开始wait事件, 
  71.     ngx_process_events的具体实现是对应到epoll模块中的ngx_epoll_process_events函数 
  72.     这里之后会详细讲解的哦 
  73.     */  
  74.     (void) ngx_process_events(cycle, timer, flags);  
  75.     //统计本次wait事件的耗时  
  76.     delta = ngx_current_msec - delta;  
  77.   
  78.     ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,  
  79.                    "timer delta: %M", delta);  
  80.   
  81.     /* 
  82.     ngx_posted_accept_events是一个事件队列,暂存epoll从监听套接口wait到的accept事件。 
  83.     前文提到的NGX_POST_EVENTS标志被使用后,会将所有的accept事件暂存到这个队列 
  84.     */  
  85.     if (ngx_posted_accept_events) {  
  86.         ngx_event_process_posted(cycle, &ngx_posted_accept_events);  
  87.     }  
  88.     //所有accept事件处理完之后,如果持有锁的话,就释放掉。  
  89.     if (ngx_accept_mutex_held) {  
  90.         ngx_shmtx_unlock(&ngx_accept_mutex);  
  91.     }  
  92.       
  93.     /* 
  94.     delta是之前统计的耗时,存在毫秒级的耗时,就对所有时间的timer进行检查, 
  95.     如果timeout 就从time rbtree中删除到期的timer,同时调用相应事件的handler函数处理 
  96.     */  
  97.     if (delta) {  
  98.         ngx_event_expire_timers();  
  99.     }  
  100.   
  101.     ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,  
  102.                    "posted events %p", ngx_posted_events);  
  103.   
  104.     /* 
  105.     处理普通事件(连接上获得的读写事件), 
  106.     因为每个事件都有自己的handler方法, 
  107.     */  
  108.     if (ngx_posted_events) {  
  109.         if (ngx_threaded) {  
  110.             ngx_wakeup_worker_thread(cycle);  
  111.   
  112.         } else {  
  113.             ngx_event_process_posted(cycle, &ngx_posted_events);  
  114.         }  
  115.     }  
  116. }  

之前有说过accept事件,其实他就是监听套接口上是否有新来的事件,下面介绍下accept时间的handler方法:

ngx_event_accept:

[cpp] view plaincopyprint?

  1. src/event/ngx_event_accept.c  
  2.   
  3. void  
  4. ngx_event_accept(ngx_event_t *ev)  
  5. {  
  6.     socklen_t          socklen;  
  7.     ngx_err_t          err;  
  8.     ngx_log_t         *log;  
  9.     ngx_socket_t       s;  
  10.     ngx_event_t       *rev, *wev;  
  11.     ngx_listening_t   *ls;  
  12.     ngx_connection_t  *c, *lc;  
  13.     ngx_event_conf_t  *ecf;  
  14.     u_char             sa[NGX_SOCKADDRLEN];  
  15.       
  16.     //省略部分代码  
  17.   
  18.     lc = ev->data;  
  19.     ls = lc->listening;  
  20.     ev->ready = 0;  
  21.   
  22.     ngx_log_debug2(NGX_LOG_DEBUG_EVENT, ev->log, 0,  
  23.                    "accept on %V, ready: %d", &ls->addr_text, ev->available);  
  24.   
  25.     do {  
  26.         socklen = NGX_SOCKADDRLEN;  
  27.         //accept一个新的连接  
  28.         s = accept(lc->fd, (struct sockaddr *) sa, &socklen);  
  29.         //省略部分代码  
  30.           
  31.         /* 
  32.         accept到一个新的连接后,就重新计算ngx_accept_disabled的值, 
  33.         它主要是用来做负载均衡,之前有提过。 
  34.          
  35.         这里,我们可以看到他的就只方式 
  36.         “总连接数的八分之一   -   剩余的连接数“ 
  37.         总连接指每个进程设定的最大连接数,这个数字可以再配置文件中指定。 
  38.          
  39.         所以每个进程到总连接数的7/8后,ngx_accept_disabled就大于零,连接超载了 
  40.          
  41.         */  
  42.   
  43.         ngx_accept_disabled = ngx_cycle->connection_n / 8  
  44.                               - ngx_cycle->free_connection_n;  
  45.           
  46.         //获取一个connection  
  47.         c = ngx_get_connection(s, ev->log);  
  48.   
  49.         //为新的链接创建起一个memory pool  
  50.         //连接关闭的时候,才释放pool  
  51.   
  52.         c->pool = ngx_create_pool(ls->pool_size, ev->log);  
  53.         if (c->pool == NULL) {  
  54.             ngx_close_accepted_connection(c);  
  55.             return;  
  56.         }  
  57.   
  58.         c->sockaddr = ngx_palloc(c->pool, socklen);  
  59.         if (c->sockaddr == NULL) {  
  60.             ngx_close_accepted_connection(c);  
  61.             return;  
  62.         }  
  63.   
  64.         ngx_memcpy(c->sockaddr, sa, socklen);  
  65.   
  66.         log = ngx_palloc(c->pool, sizeof(ngx_log_t));  
  67.         if (log == NULL) {  
  68.             ngx_close_accepted_connection(c);  
  69.             return;  
  70.         }  
  71.   
  72.         /* set a blocking mode for aio and non-blocking mode for others */  
  73.   
  74.         if (ngx_inherited_nonblocking) {  
  75.             if (ngx_event_flags & NGX_USE_AIO_EVENT) {  
  76.                 if (ngx_blocking(s) == -1) {  
  77.                     ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_socket_errno,  
  78.                                   ngx_blocking_n " failed");  
  79.                     ngx_close_accepted_connection(c);  
  80.                     return;  
  81.                 }  
  82.             }  
  83.   
  84.         } else {  
  85.             //我们使用epoll模型,这里我们设置连接为nonblocking  
  86.             if (!(ngx_event_flags & (NGX_USE_AIO_EVENT|NGX_USE_RTSIG_EVENT))) {  
  87.                 if (ngx_nonblocking(s) == -1) {  
  88.                     ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_socket_errno,  
  89.                                   ngx_nonblocking_n " failed");  
  90.                     ngx_close_accepted_connection(c);  
  91.                     return;  
  92.                 }  
  93.             }  
  94.         }  
  95.   
  96.         *log = ls->log;  
  97.         //初始化新的连接  
  98.         c->recv = ngx_recv;  
  99.         c->send = ngx_send;  
  100.         c->recv_chain = ngx_recv_chain;  
  101.         c->send_chain = ngx_send_chain;  
  102.   
  103.         c->log = log;  
  104.         c->pool->log = log;  
  105.   
  106.         c->socklen = socklen;  
  107.         c->listening = ls;  
  108.         c->local_sockaddr = ls->sockaddr;  
  109.   
  110.         c->unexpected_eof = 1;  
  111.   
  112. #if (NGX_HAVE_UNIX_DOMAIN)  
  113.         if (c->sockaddr->sa_family == AF_UNIX) {  
  114.             c->tcp_nopush = NGX_TCP_NOPUSH_DISABLED;  
  115.             c->tcp_nodelay = NGX_TCP_NODELAY_DISABLED;  
  116. #if (NGX_SOLARIS)  
  117.             /* Solaris's sendfilev() supports AF_NCA, AF_INET, and AF_INET6 */  
  118.             c->sendfile = 0;  
  119. #endif  
  120.         }  
  121. #endif  
  122.   
  123.         rev = c->read;  
  124.         wev = c->write;  
  125.   
  126.         wev->ready = 1;  
  127.   
  128.         if (ngx_event_flags & (NGX_USE_AIO_EVENT|NGX_USE_RTSIG_EVENT)) {  
  129.             /* rtsig, aio, iocp */  
  130.             rev->ready = 1;  
  131.         }  
  132.   
  133.         if (ev->deferred_accept) {  
  134.             rev->ready = 1;  
  135. #if (NGX_HAVE_KQUEUE)  
  136.             rev->available = 1;  
  137. #endif  
  138.         }  
  139.   
  140.         rev->log = log;  
  141.         wev->log = log;  
  142.   
  143.         /* 
  144.          * TODO: MT: - ngx_atomic_fetch_add() 
  145.          *             or protection by critical section or light mutex 
  146.          * 
  147.          * TODO: MP: - allocated in a shared memory 
  148.          *           - ngx_atomic_fetch_add() 
  149.          *             or protection by critical section or light mutex 
  150.          */  
  151.   
  152.         c->number = ngx_atomic_fetch_add(ngx_connection_counter, 1);  
  153.           
  154.         if (ngx_add_conn && (ngx_event_flags & NGX_USE_EPOLL_EVENT) == 0) {  
  155.             if (ngx_add_conn(c) == NGX_ERROR) {  
  156.                 ngx_close_accepted_connection(c);  
  157.                 return;  
  158.             }  
  159.         }  
  160.   
  161.         log->data = NULL;  
  162.         log->handler = NULL;  
  163.           
  164.         /* 
  165.         这里listen handler很重要,它将完成新连接的最后初始化工作, 
  166.         同时将accept到的新的连接放入epoll中;挂在这个handler上的函数, 
  167.         就是ngx_http_init_connection 在之后http模块中在详细介绍 
  168.         */  
  169.         ls->handler(c);  
  170.   
  171.         if (ngx_event_flags & NGX_USE_KQUEUE_EVENT) {  
  172.             ev->available--;  
  173.         }  
  174.   
  175.     } while (ev->available);  
  176. }  

accpt事件的handler方法也就是如此了。之后就是每个连接的读写事件handler方法,这一部分会直接将我们引入http模块,我们还不急,还要学习下nginx经典模块epoll。

以上就介绍了nginx 源码学习笔记(二十一)—— event 模块二 ——事件驱动核心ngx_process_events_and_timers,包括了队列方面的内容,希望对PHP教程有兴趣的朋友有所帮助。

相关标签:
来源:php.cn
本站声明
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn
热门教程
更多>
最新下载
更多>
网站特效
网站源码
网站素材
前端模板
关于我们 免责声明 Sitemap
PHP中文网:公益在线PHP培训,帮助PHP学习者快速成长!