nginx epoll event model
nginx is an asynchronous, event-driven web server; on Linux it uses epoll by default whenever the kernel supports it, since epoll handles large numbers of events efficiently. nginx represents an event with the ngx_event_t structure. Its members are annotated below, followed by a short usage sketch:
struct ngx_event_s {
    void            *data;      // object associated with the event; usually points to the ngx_connection_t the event belongs to

    unsigned         write:1;   // writable flag: 1 means the corresponding TCP connection is writable

    unsigned         accept:1;  // 1 means the connection is a listening connection, i.e. it can accept new connections

    /* used to detect the stale events in kqueue, rtsig, and epoll */
    unsigned         instance:1;    // used to tell whether the event is stale

    /*
     * the event was passed or would be passed to a kernel;
     * in aio mode - operation was posted.
     */
    unsigned         active:1;  // 1 means the event is active, i.e. it has been added to epoll

    unsigned         disabled:1;    // not used by epoll

    /* the ready event; in aio mode 0 means that no operation can be posted */
    unsigned         ready:1;   // the event is ready (readable or writable)

    unsigned         oneshot:1; // not used by epoll

    /* aio operation is complete */
    unsigned         complete:1;    // used by aio: the asynchronous operation for this event has completed (io_getevents has returned successfully)

    unsigned         eof:1;     // 1 means the byte stream being processed has ended, e.g. it is set when recv() on the connection returns 0

    unsigned         error:1;   // 1 means an error occurred while handling the event

    unsigned         timedout:1;    // timeout flag: 1 means the event has timed out, so the associated request needs no further processing (the http module simply closes the request)

    unsigned         timer_set:1;   // 1 means the event is in the timer tree

    unsigned         delayed:1; // 1 means the event must be processed with a delay, typically used for rate limiting

    unsigned         deferred_accept:1; // deferred accept: the connection is only really established after the peer sends data on it

    /* the pending eof reported by kqueue, epoll or in aio chain operation */
    unsigned         pending_eof:1; // 1 means the peer has closed its end of the TCP connection, i.e. epoll reported EPOLLRDHUP

#if !(NGX_THREADS)
    unsigned         posted_ready:1;    // in the 1.5.5 sources this flag is only set in ngx_epoll_process_events and is not used anywhere else
#endif
#if (NGX_WIN32)
    /* setsockopt(SO_UPDATE_ACCEPT_CONTEXT) was successful */
    unsigned         accept_context_updated:1;
#endif

#if (NGX_HAVE_KQUEUE)
    unsigned         kq_vnode:1;

    /* the pending errno reported by kqueue */
    int              kq_errno;
#endif

    /*
     * kqueue only:
     *   accept:     number of sockets that wait to be accepted
     *   read:       bytes to read when event is ready
     *               or lowat when event is set with NGX_LOWAT_EVENT flag
     *   write:      available space in buffer when event is ready
     *               or lowat when event is set with NGX_LOWAT_EVENT flag
     *
     * iocp: TODO
     *
     * otherwise:
     *   accept:     1 if accept many, 0 otherwise
     */

#if (NGX_HAVE_KQUEUE) || (NGX_HAVE_IOCP)
    int              available;
#else
    unsigned         available:1;   // 1 means accept() as many TCP connections as possible per call; corresponds to the multi_accept directive
#endif
    ngx_event_handler_pt  handler;  // callback invoked when the event fires

#if (NGX_HAVE_AIO)
#if (NGX_HAVE_IOCP)
    ngx_event_ovlp_t  ovlp;
#else
    struct aiocb      aiocb;
#endif
#endif

    ngx_uint_t        index;    // not used by epoll

    ngx_log_t        *log;      // log object

    ngx_rbtree_node_t timer;

    unsigned          closed:1; // 1 means the event has been closed

    /* to test on worker exit */
    unsigned          channel:1;    // only set in ngx_add_channel_event, not used elsewhere
    unsigned          resolver:1;   // used by the resolver?

#if (NGX_THREADS)
    unsigned          locked:1;

    unsigned          posted_ready:1;
    unsigned          posted_timedout:1;
    unsigned          posted_eof:1;

#if (NGX_HAVE_KQUEUE)
    /* the pending errno reported by kqueue */
    int               posted_errno;
#endif

#if (NGX_HAVE_KQUEUE) || (NGX_HAVE_IOCP)
    int               posted_available;
#else
    unsigned          posted_available:1;
#endif

    ngx_atomic_t     *lock;
    ngx_atomic_t     *own_lock;
#endif

    /* the links of the posted queue */
    ngx_event_t      *next;
    ngx_event_t     **prev;
#if 0

    /* the threads support */

    /*
     * the event thread context, we store it here
     * if $(CC) does not understand __thread declaration
     * and pthread_getspecific() is too costly
     */
    void             *thr_ctx;

#if (NGX_EVENT_T_PADDING)
    /* event should not cross cache line in SMP */
    uint32_t          padding[NGX_EVENT_T_PADDING];
#endif
#endif
};
#if (NGX_HAVE_FILE_AIO)

struct ngx_event_aio_s {
    void                      *data;
    ngx_event_handler_pt       handler;
    ngx_file_t                *file;

    ngx_fd_t                   fd;

#if (NGX_HAVE_EVENTFD)
    int64_t                    res;
#if (NGX_TEST_BUILD_EPOLL)
    ngx_err_t                  err;
    size_t                     nbytes;
#endif
#else
    ngx_err_t                  err;
    size_t                     nbytes;
#endif

#if (NGX_HAVE_AIO_SENDFILE)
    off_t                      last_offset;
#endif

    ngx_aiocb_t                aiocb;
    ngx_event_t                event;
};

#endif
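As a rough illustration of how these fields are used in practice, the sketch below shows the common pattern of attaching a handler to a connection's read event and registering it with the event mechanism. It is a minimal sketch, not taken from the nginx sources; my_read_handler is a hypothetical name, while ngx_handle_read_event and ngx_close_connection are real nginx helpers.

// Minimal sketch (hypothetical handler name); assumes the usual nginx pattern
// where ev->data points back to the ngx_connection_t.
static void
my_read_handler(ngx_event_t *rev)
{
    ngx_connection_t  *c = rev->data;   // the connection owning this event

    if (rev->timedout) {                // the timer expired: give up on the connection
        ngx_close_connection(c);
        return;
    }

    // ... recv() from c->fd and process the bytes; rev->ready was set by epoll ...
}

// after the connection c has been accepted:
c->read->handler = my_read_handler;
if (ngx_handle_read_event(c->read, 0) != NGX_OK) {  // registers the event (epoll_ctl under the hood)
    ngx_close_connection(c);
}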
nginx wraps the epoll mechanism in the ngx_epoll_module module, which handles all event processing when epoll is in use. ngx_epoll_module is only interested in two configuration directives; its ngx_command_t array is shown below (a minimal nginx.conf example follows the listing):
static ngx_command_t  ngx_epoll_commands[] = {

    /* the epoll_events directive sets the maximum number of events epoll_wait() may return
       at once; ngx_epoll_init preallocates that many epoll_event structures */
    { ngx_string("epoll_events"),
      NGX_EVENT_CONF|NGX_CONF_TAKE1,
      ngx_conf_set_num_slot,
      0,
      offsetof(ngx_epoll_conf_t, events),
      NULL },

    /* the worker_aio_requests directive sets how many asynchronous I/O events the created
       aio context can handle concurrently, i.e. the first argument to io_setup() */
    { ngx_string("worker_aio_requests"),
      NGX_EVENT_CONF|NGX_CONF_TAKE1,
      ngx_conf_set_num_slot,
      0,
      offsetof(ngx_epoll_conf_t, aio_requests),
      NULL },

      ngx_null_command
};
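For reference, both directives belong in the events block of nginx.conf. The values shown below are the defaults that ngx_epoll_init_conf falls back to anyway, so this snippet only makes them explicit.

events {
    use epoll;
    epoll_events        512;   # size of the epoll_wait() result array
    worker_aio_requests 32;    # aio context depth passed to io_setup()
}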
The ngx_event_module_t structure of ngx_epoll_module:
ngx_event_module_t  ngx_epoll_module_ctx = {
    &epoll_name,
    ngx_epoll_create_conf,               /* create configuration */
    ngx_epoll_init_conf,                 /* init configuration */

    {
        ngx_epoll_add_event,             /* add an event: called to add an event to epoll */
        ngx_epoll_del_event,             /* delete an event: called to remove an event from epoll */
        /* epoll has no notion of enabling/disabling a single event, so these default to the add/delete functions */
        ngx_epoll_add_event,             /* enable an event */
        ngx_epoll_del_event,             /* disable an event */
        ngx_epoll_add_connection,        /* add an connection: adds a TCP connection, i.e. its read and write events, to epoll */
        ngx_epoll_del_connection,        /* delete an connection: removes a connection from epoll */
        NULL,                            /* process the changes */
        ngx_epoll_process_events,        /* process the events: the epoll event loop */
        ngx_epoll_init,                  /* init the events: module initialization */
        ngx_epoll_done,                  /* done the events: module cleanup, only called in the multi-threaded model */
    }
};
ngx_epoll_create_conf is called before the directives are parsed and initializes the configuration structure. ngx_epoll_init_conf is called after parsing is finished; if the epoll_events or worker_aio_requests directives are missing from the configuration file, it sets epoll_events to its default of 512 and worker_aio_requests to 32. The last ten function pointers of ngx_epoll_module_ctx make up an ngx_event_actions_t structure, which only event modules have. When is ngx_epoll_init called? During nginx startup it runs after each worker process starts, invoked from ngx_event_process_init in ngx_event_core_module.
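For completeness, ngx_epoll_init_conf boils down to two default assignments; the sketch below is roughly what the 1.5.x implementation looks like (ngx_conf_init_uint_value is the standard nginx macro that only assigns a value if the field is still NGX_CONF_UNSET_UINT).

static char *
ngx_epoll_init_conf(ngx_cycle_t *cycle, void *conf)
{
    ngx_epoll_conf_t  *epcf = conf;

    // only applied when the corresponding directive is absent from nginx.conf
    ngx_conf_init_uint_value(epcf->events, 512);
    ngx_conf_init_uint_value(epcf->aio_requests, 32);

    return NGX_CONF_OK;
}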
ngx_epoll_module source code analysis
The ngx_epoll_init function:
static ngx_int_t
ngx_epoll_init(ngx_cycle_t *cycle, ngx_msec_t timer)
{
    ngx_epoll_conf_t  *epcf;

    // fetch the structure holding ngx_epoll_module's configuration
    epcf = ngx_event_get_conf(cycle->conf_ctx, ngx_epoll_module);

    if (ep == -1) {
        // create the epoll instance; returns a descriptor on success, -1 on failure
        ep = epoll_create(cycle->connection_n / 2);

        if (ep == -1) {
            ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno,
                          "epoll_create() failed");
            return NGX_ERROR;
        }

        /* if the system supports file AIO, initialize it here */
#if (NGX_HAVE_FILE_AIO)
        ngx_epoll_aio_init(cycle, epcf);
#endif
    }

    /* preallocate epcf->events epoll_event structures; epcf->events comes from the
       epoll_events directive and defaults to 512 */
    if (nevents < epcf->events) {
        if (event_list) {
            ngx_free(event_list);
        }

        event_list = ngx_alloc(sizeof(struct epoll_event) * epcf->events,
                               cycle->log);
        if (event_list == NULL) {
            return NGX_ERROR;
        }
    }

    nevents = epcf->events;

    // select the I/O read/write methods
    ngx_io = ngx_os_io;

    // install the ngx_event_actions interface; from now on the epoll module's methods are invoked through it
    ngx_event_actions = ngx_epoll_module_ctx.actions;

    /* when nginx uses the epoll event model the NGX_HAVE_CLEAR_EVENT macro is defined;
       NGX_USE_CLEAR_EVENT means epoll runs in edge-triggered (ET) mode */
#if (NGX_HAVE_CLEAR_EVENT)
    ngx_event_flags = NGX_USE_CLEAR_EVENT
#else
    ngx_event_flags = NGX_USE_LEVEL_EVENT
#endif
                      |NGX_USE_GREEDY_EVENT
                      |NGX_USE_EPOLL_EVENT;

    return NGX_OK;
}
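Since NGX_USE_CLEAR_EVENT puts epoll into edge-triggered mode, a readable socket triggers only one notification per state change, so the reader must drain it until EAGAIN. The fragment below is a generic illustration of that contract, not nginx code (drain_socket is a hypothetical name).

// Generic edge-triggered read loop (illustrative only): in ET mode a socket
// must be drained until EAGAIN, or epoll will not report it as readable again.
#include <errno.h>
#include <sys/socket.h>

static void
drain_socket(int fd)
{
    char     buf[4096];
    ssize_t  n;

    for ( ;; ) {
        n = recv(fd, buf, sizeof(buf), 0);

        if (n > 0) {
            continue;               /* consume the data, keep draining */
        }
        if (n == 0) {
            break;                  /* peer closed the connection (EOF) */
        }
        if (errno == EAGAIN || errno == EWOULDBLOCK) {
            break;                  /* fully drained; wait for the next notification */
        }
        break;                      /* real error: caller should close the connection */
    }
}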
The ngx_epoll_add_event function:
static ngx_int_t
ngx_epoll_add_event(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags)
{
    int                  op;
    uint32_t             events, prev;
    ngx_event_t         *e;
    ngx_connection_t    *c;
    struct epoll_event   ee;

    // the connection the event belongs to
    c = ev->data;

    events = (uint32_t) event;

    /* the event parameter tells us whether a read or a write event is being added */
    if (event == NGX_READ_EVENT) {
        e = c->write;
        prev = EPOLLOUT;
#if (NGX_READ_EVENT != EPOLLIN|EPOLLRDHUP)
        events = EPOLLIN|EPOLLRDHUP;
#endif

    } else {
        e = c->read;
        prev = EPOLLIN|EPOLLRDHUP;
#if (NGX_WRITE_EVENT != EPOLLOUT)
        events = EPOLLOUT;
#endif
    }

    /* when adding the read event, the active flag of the same connection's write event tells us
       whether that write event is already registered with epoll (and vice versa); if it is,
       modify the existing registration instead of adding a new one */
    if (e->active) {
        op = EPOLL_CTL_MOD;
        events |= prev;

    } else {
        op = EPOLL_CTL_ADD;
    }

    // merge the flags parameter into the epoll flags
    ee.events = events | (uint32_t) flags;

    /* ptr stores both the connection object (ngx_connection_t *) and the instance (staleness) bit;
       on Linux the lowest bit of any suitably aligned object's address is always zero */
    ee.data.ptr = (void *) ((uintptr_t) c | ev->instance);

    ngx_log_debug3(NGX_LOG_DEBUG_EVENT, ev->log, 0,
                   "epoll add event: fd:%d op:%d ev:%08XD",
                   c->fd, op, ee.events);

    // register the event with epoll
    if (epoll_ctl(ep, op, c->fd, &ee) == -1) {
        ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_errno,
                      "epoll_ctl(%d, %d) failed", op, c->fd);
        return NGX_ERROR;
    }

    // mark the event as active
    ev->active = 1;
#if 0
    ev->oneshot = (flags & NGX_ONESHOT_EVENT) ? 1 : 0;
#endif

    return NGX_OK;
}
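The pointer-tagging trick used for ee.data.ptr above works because ngx_connection_t objects are at least pointer-aligned, so bit 0 of their address is free to carry the instance flag. The standalone demo below (hypothetical conn_t type, not nginx code) shows the store and extract steps in isolation.

#include <assert.h>
#include <stdint.h>

typedef struct { int fd; } conn_t;      // stand-in for ngx_connection_t

int main(void)
{
    conn_t    c = { 42 };
    unsigned  instance = 1;

    // store: pack the instance bit into the unused low bit of the pointer
    void *tagged = (void *) ((uintptr_t) &c | instance);

    // extract: the low bit is the instance flag, the rest is the object address
    unsigned  got_instance = (uintptr_t) tagged & 1;
    conn_t   *got_conn = (conn_t *) ((uintptr_t) tagged & ~(uintptr_t) 1);

    assert(got_instance == 1 && got_conn->fd == 42);
    return 0;
}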
The ngx_epoll_del_event function:
static ngx_int_t
ngx_epoll_del_event(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags)
{
    int                  op;
    uint32_t             prev;
    ngx_event_t         *e;
    ngx_connection_t    *c;
    struct epoll_event   ee;

    /*
     * when the file descriptor is closed, the epoll automatically deletes
     * it from its queue, so we do not need to delete explicitly the event
     * before the closing the file descriptor
     */

    /* as the comment above says: once the file descriptor is closed, epoll removes it automatically */
    if (flags & NGX_CLOSE_EVENT) {
        ev->active = 0;
        return NGX_OK;
    }

    // the connection the event belongs to
    c = ev->data;

    /* the event parameter tells us whether a read or a write event is being deleted */
    if (event == NGX_READ_EVENT) {
        e = c->write;
        prev = EPOLLOUT;

    } else {
        e = c->read;
        prev = EPOLLIN|EPOLLRDHUP;
    }

    /* see ngx_epoll_add_event: if the other event of the connection is still active,
       modify the registration to keep it; otherwise remove the descriptor entirely */
    if (e->active) {
        op = EPOLL_CTL_MOD;
        ee.events = prev | (uint32_t) flags;
        ee.data.ptr = (void *) ((uintptr_t) c | ev->instance);

    } else {
        op = EPOLL_CTL_DEL;
        ee.events = 0;
        ee.data.ptr = NULL;
    }

    ngx_log_debug3(NGX_LOG_DEBUG_EVENT, ev->log, 0,
                   "epoll del event: fd:%d op:%d ev:%08XD",
                   c->fd, op, ee.events);

    // remove the event from epoll
    if (epoll_ctl(ep, op, c->fd, &ee) == -1) {
        ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_errno,
                      "epoll_ctl(%d, %d) failed", op, c->fd);
        return NGX_ERROR;
    }

    // clear the active flag
    ev->active = 0;

    return NGX_OK;
}
The ngx_epoll_add_connection and ngx_epoll_del_connection functions
Both functions are straightforward: they also call epoll_ctl, but they register (or drop) the read and write events of a connection together. The full source is not reproduced here; a sketch of the add path is given below.
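Roughly what ngx_epoll_add_connection does in the 1.5.x sources (treat this as a sketch, not a verbatim copy): EPOLLIN and EPOLLOUT are registered in one call, together with EPOLLET (edge-triggered) and EPOLLRDHUP, and the instance bit is packed into data.ptr exactly as in ngx_epoll_add_event.

static ngx_int_t
ngx_epoll_add_connection(ngx_connection_t *c)
{
    struct epoll_event  ee;

    // register read and write interest at once, in edge-triggered mode
    ee.events = EPOLLIN|EPOLLOUT|EPOLLET|EPOLLRDHUP;
    ee.data.ptr = (void *) ((uintptr_t) c | c->read->instance);

    if (epoll_ctl(ep, EPOLL_CTL_ADD, c->fd, &ee) == -1) {
        ngx_log_error(NGX_LOG_ALERT, c->log, ngx_errno,
                      "epoll_ctl(EPOLL_CTL_ADD, %d) failed", c->fd);
        return NGX_ERROR;
    }

    // both events of the connection are now registered with epoll
    c->read->active = 1;
    c->write->active = 1;

    return NGX_OK;
}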
The ngx_epoll_process_events function:
static ngx_int_t
ngx_epoll_process_events(ngx_cycle_t *cycle, ngx_msec_t timer, ngx_uint_t flags)
{
    int                events;
    uint32_t           revents;
    ngx_int_t          instance, i;
    ngx_uint_t         level;
    ngx_err_t          err;
    ngx_event_t       *rev, *wev, **queue;
    ngx_connection_t  *c;

    /* NGX_TIMER_INFINITE == INFTIM */

    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                   "epoll timer: %M", timer);

    // call epoll_wait to collect the ready events
    events = epoll_wait(ep, event_list, (int) nevents, timer);

    err = (events == -1) ? ngx_errno : 0;

    /* NGX_UPDATE_TIME is set when the timer_resolution directive is absent and means the cached
       time is refreshed after every epoll_wait return. ngx_event_timer_alarm is used when
       timer_resolution is set: every timer_resolution interval it is set to 1 to request a
       time update. */
    if (flags & NGX_UPDATE_TIME || ngx_event_timer_alarm) {
        ngx_time_update();
    }

    // a non-zero err means epoll_wait failed
    if (err) {
        if (err == NGX_EINTR) {

            if (ngx_event_timer_alarm) {
                ngx_event_timer_alarm = 0;
                return NGX_OK;
            }

            level = NGX_LOG_INFO;

        } else {
            level = NGX_LOG_ALERT;
        }

        ngx_log_error(level, cycle->log, err, "epoll_wait() failed");
        return NGX_ERROR;
    }

    if (events == 0) {
        if (timer != NGX_TIMER_INFINITE) {
            return NGX_OK;
        }

        ngx_log_error(NGX_LOG_ALERT, cycle->log, 0,
                      "epoll_wait() returned no events without timeout");
        return NGX_ERROR;
    }

    // this lock only has effect in the multi-threaded build
    ngx_mutex_lock(ngx_posted_events_mutex);
    /* process each ready event in turn */
    for (i = 0; i < events; i++) {

        // the connection associated with the event; the lowest bit of the address holds
        // the instance (staleness) bit stored when the event was added
        c = event_list[i].data.ptr;

        // extract the instance bit
        instance = (uintptr_t) c & 1;
        // mask off the lowest bit to recover the connection pointer
        c = (ngx_connection_t *) ((uintptr_t) c & (uintptr_t) ~1);

        rev = c->read;

        /* the read and write events of a connection carry the same instance value; since the read
           event is handled first, its instance bit is used here to detect a stale connection.
           fd == -1 also marks the connection as stale. */
        if (c->fd == -1 || rev->instance != instance) {

            /*
             * the stale event from a file descriptor
             * that was just closed in this iteration
             */

            ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                           "epoll: stale event %p", c);
            continue;
        }

        // the event types that are ready on this connection
        revents = event_list[i].events;

        ngx_log_debug3(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                       "epoll: fd:%d ev:%04XD d:%p",
                       c->fd, revents, event_list[i].data.ptr);

        /* an error occurred on the connection; EPOLLHUP is typically reported after the peer
           resets the connection (RST). Even with these flags set there may still be unread
           data on the TCP connection. */
        if (revents & (EPOLLERR|EPOLLHUP)) {
            ngx_log_debug2(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                           "epoll_wait() error on fd:%d ev:%04XD",
                           c->fd, revents);
        }

#if 0
        if (revents & ~(EPOLLIN|EPOLLOUT|EPOLLERR|EPOLLHUP)) {
            ngx_log_error(NGX_LOG_ALERT, cycle->log, 0,
                          "strange epoll_wait() events fd:%d ev:%04XD",
                          c->fd, revents);
        }
#endif

        /* if an error is reported without EPOLLIN or EPOLLOUT, add both flags so that the
           read/write handlers are still invoked and can discover the error; without them the
           handlers would never run and the connection could never be processed (or cleaned up) */
        if ((revents & (EPOLLERR|EPOLLHUP))
            && (revents & (EPOLLIN|EPOLLOUT)) == 0)
        {
            /*
             * if the error events were returned without EPOLLIN or EPOLLOUT,
             * then add these flags to handle the events at least in one
             * active handler
             */

            revents |= EPOLLIN|EPOLLOUT;
        }
        /* the connection is readable and its read event is active */
        if ((revents & EPOLLIN) && rev->active) {

#if (NGX_HAVE_EPOLLRDHUP)
            // EPOLLRDHUP: the peer has shut down its writing side, so reads will hit EOF
            if (revents & EPOLLRDHUP) {
                rev->pending_eof = 1;
            }
#endif

            // NGX_POST_THREAD_EVENTS is not set in the default (non-threaded) build
            if ((flags & NGX_POST_THREAD_EVENTS) && !rev->accept) {
                rev->posted_ready = 1;

            } else {
                // mark the event as ready
                rev->ready = 1;
            }

            /* NGX_POST_EVENTS means the event must be processed later; depending on the accept
               flag it is queued on the accept queue or the regular posted-events queue */
            if (flags & NGX_POST_EVENTS) {
                queue = (ngx_event_t **) (rev->accept ?
                               &ngx_posted_accept_events : &ngx_posted_events);

                ngx_locked_post_event(rev, queue);

            } else {
                // invoke the event's callback
                rev->handler(rev);
            }
        }

        wev = c->write;

        /* the connection is writable and its write event is active */
        if ((revents & EPOLLOUT) && wev->active) {

            // re-check for staleness: the connection may have been closed while the read event was being handled
            if (c->fd == -1 || wev->instance != instance) {

                /*
                 * the stale event from a file descriptor
                 * that was just closed in this iteration
                 */

                ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                               "epoll: stale event %p", c);
                continue;
            }

            if (flags & NGX_POST_THREAD_EVENTS) {
                wev->posted_ready = 1;

            } else {
                wev->ready = 1;
            }

            if (flags & NGX_POST_EVENTS) {
                ngx_locked_post_event(wev, &ngx_posted_events);

            } else {
                wev->handler(wev);
            }
        }
    }

    ngx_mutex_unlock(ngx_posted_events_mutex);

    return NGX_OK;
}