我们已经大致知道event_base的初始化过程,上一节的最后给出了一张event_base中管理的各种数据结构的图示。本节就是要探究event_base是如何利用这些结构处理事件的。
I/O事件管理 首先前面提到event_base成员struct event_io_map io
是一个用数组实现的hashmap,用来保存未决的I/O事件(struct event_signal_map
同样,用来保存signal事件)。
libevent通过evmap_io_add_/evmap_io_del_函数往该hashmap里调加/删除event。以evmap_io_add_为例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 int evmap_io_add_ (struct event_base *base, evutil_socket_t fd, struct event *ev) { const struct eventop *evsel = base->evsel; struct event_io_map *io = &base->io; struct evmap_io *ctx = NULL ; int nread, nwrite, nclose, retval = 0 ; short res = 0 , old = 0 ; struct event *old_ev ; EVUTIL_ASSERT(fd == ev->ev_fd); if (fd < 0 ) return 0 ; GET_IO_SLOT_AND_CTOR(ctx, io, fd, evmap_io, evmap_io_init, evsel->fdinfo_len); nread = ctx->nread; nwrite = ctx->nwrite; nclose = ctx->nclose; if (nread) old |= EV_READ; if (nwrite) old |= EV_WRITE; if (nclose) old |= EV_CLOSED; if (ev->ev_events & EV_READ) { if (++nread == 1 ) res |= EV_READ; } if (ev->ev_events & EV_WRITE) { if (++nwrite == 1 ) res |= EV_WRITE; } if (ev->ev_events & EV_CLOSED) { if (++nclose == 1 ) res |= EV_CLOSED; } if (EVUTIL_UNLIKELY(nread > 0xffff || nwrite > 0xffff || nclose > 0xffff )) { event_warnx("Too many events reading or writing on fd %d" , (int )fd); return -1 ; } if (EVENT_DEBUG_MODE_IS_ON() && (old_ev = LIST_FIRST(&ctx->events)) && (old_ev->ev_events&EV_ET) != (ev->ev_events&EV_ET)) { event_warnx("Tried to mix edge-triggered and non-edge-triggered" " events on fd %d" , (int )fd); return -1 ; } if (res) { void *extra = ((char *)ctx) + sizeof (struct evmap_io); if (evsel->add(base, ev->ev_fd, old, (ev->ev_events & EV_ET) | res, extra) == -1 ) return (-1 ); retval = 1 ; } ctx->nread = (ev_uint16_t ) nread; ctx->nwrite = (ev_uint16_t ) nwrite; ctx->nclose = (ev_uint16_t ) nclose; LIST_INSERT_HEAD(&ctx->events, ev, ev_io_next); return (retval); }
也就是说使用libevent时,对新建的I/O事件调用event_add后,首先会将这个事件添加到event_io_map这样的hashmap里,然后才会将对应的文件描述符添加到后端的多路复用方法epoll之中。
信号和定时器管理 对于一个网络库而言,除了I/O事件之外还要处理signal事件以及timer事件。其中signal事件和I/O事件采用的管理容器基本一致,都是用的hashmap,上图里的fd
替换为signal value
即为event_signal_map的示意图。
timer事件采用的是一个最小堆
的结构存储,并且巧妙的将其转化为I/O事件统一在epoll处理了。
更新:最近我更新的两个博文说明了eventloop中是如何处理信号和定时器的,链接分别是:Effective epoll 和Linux下定时器的设计与实现 。
事件主循环 现在我们开始分析libevent里最重要的一个函数(没有之一)。前面”reactor模式”一章,已经用libevent-like风格简单封装了一个reactor模式的dispatcher——event_base_dispatch()函数。
1 2 3 4 struct epoll_event { uint32_t events; epoll_data_t data; };
当时利用struct epoll_event
的data
域存放了文件描述符对应的事件回调函数的指针,当监控的事件发生时,再从epoll_event里取回并调用回调函数。在libevent里并不是这么做的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 int event_base_dispatch (struct event_base *event_base) { return (event_base_loop(event_base, 0 )); } int event_base_loop (struct event_base *base, int flags) { ... while (!done) { tv_p = &tv; if (!N_ACTIVE_CALLBACKS(base) && !(flags & EVLOOP_NONBLOCK)) { timeout_next(base, &tv_p); } else { evutil_timerclear(&tv); } if (0 ==(flags&EVLOOP_NO_EXIT_ON_EMPTY) && !event_haveevents(base) && !N_ACTIVE_CALLBACKS(base)) { event_debug(("%s: no events registered." , __func__)); retval = 1 ; goto done; } event_queue_make_later_events_active(base); clear_time_cache(base); res = evsel->dispatch(base, tv_p); if (res == -1 ) { event_debug(("%s: dispatch returned unsuccessfully." , __func__)); retval = -1 ; goto done; } update_time_cache(base); timeout_process(base); if (N_ACTIVE_CALLBACKS(base)) { int n = event_process_active(base); if ((flags & EVLOOP_ONCE) && N_ACTIVE_CALLBACKS(base) == 0 && n != 0 ) done = 1 ; } else if (flags & EVLOOP_NONBLOCK) done = 1 ; } ... }
总结一下,它主要工作流程大体上是这样的:
设置超时时间
开始epoll_dispatch(对epoll_wait的封装)
处理超时事件
处理激活的I/O事件
标记激活事件 再来分析一下event_base_dispatch()里的evsel->dispatch(base, tv_p);
是如何标记激活的事件的,看epoll_dispatch()这个函数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 static int epoll_dispatch (struct event_base *base, struct timeval *tv) { ... res = epoll_wait(epollop->epfd, events, epollop->nevents, timeout); ... for (i = 0 ; i < res; i++) { int what = events[i].events; short ev = 0 ; ... evmap_io_active_(base, events[i].data.fd, ev | EV_ET); } ... return (0 ); } void evmap_io_active_ (struct event_base *base, evutil_socket_t fd, short events) { struct event_io_map *io = &base->io; struct evmap_io *ctx ; struct event *ev ; GET_IO_SLOT(ctx, io, fd, evmap_io); if (NULL == ctx) return ; LIST_FOREACH(ev, &ctx->events, ev_io_next) { if (ev->ev_events & events) event_active_nolock_(ev, ev->ev_events & events, 1 ); } } void event_active_nolock_ (struct event *ev, int res, short ncalls) { ... if (ev->ev_pri < base->event_running_priority) base->event_continue = 1 ; event_callback_activate_nolock_(base, event_to_event_callback(ev)); } int event_callback_activate_nolock_ (struct event_base *base, struct event_callback *evcb) { ... event_queue_insert_active(base, evcb); ... } static void event_queue_insert_active (struct event_base *base, struct event_callback *evcb) { ... base->event_count_active++; MAX_EVENT_COUNT(base->event_count_active_max, base->event_count_active); EVUTIL_ASSERT(evcb->evcb_pri < base->nactivequeues); TAILQ_INSERT_TAIL(&base->activequeues[evcb->evcb_pri], evcb, evcb_active_next); }
标记激活事件的步骤相对来说还是比较麻烦的,主要流程归纳如下:
epoll_wait返回,拿到活动的fd
取出fd对应的所有的event,可能不止一个,循环以下步骤。
将事件对应的回调函数取出
将回调函数插入到对应优先级的链表上
事件优先级 libevent的事件是支持优先级的,优先级越高的事件越先被处理。
event_base中activequeues
是一个数组,数组的每一个元素是一个struct evcallback_list*
指针,数组下标表示优先级,下标越小,优先级越高。
初始化事件时,如果不显示指定事件的优先级,会默认被指为优先级总数的一半。
处理激活事件 再回来看event_base_loop主循环的最后一部分,调用了event_process_active()
处理激活事件。
处理事件的时候,会用到event_base里有三个成员,它们分别是:
1 2 3 4 5 6 struct event_base { struct timeval max_dispatch_time ; int max_dispatch_callbacks; int limit_callbacks_after_prio; };
默认情况下,它们的值分别是:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 struct event_config* event_config_new (void ) { struct event_config *cfg = mm_calloc(1 , sizeof (*cfg)); if (cfg == NULL ) return (NULL ); TAILQ_INIT(&cfg->entries); cfg->max_dispatch_interval.tv_sec = -1 ; cfg->max_dispatch_callbacks = INT_MAX; cfg->limit_callbacks_after_prio = 1 ; return (cfg); }
有了这三个标志位位,现在就可以处理所有激活的回调函数了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 static int event_process_active (struct event_base *base) { ... if (base->max_dispatch_time.tv_sec >= 0 ) { update_time_cache(base); gettime(base, &tv); evutil_timeradd(&base->max_dispatch_time, &tv, &tv); endtime = &tv; } else { endtime = NULL ; } for (i = 0 ; i < base->nactivequeues; ++i) { if (TAILQ_FIRST(&base->activequeues[i]) != NULL ) { base->event_running_priority = i; activeq = &base->activequeues[i]; if (i < limit_after_prio) c = event_process_active_single_queue(base, activeq, INT_MAX, NULL ); else c = event_process_active_single_queue(base, activeq, maxcb, endtime); if (c < 0 ) { goto done; } else if (c > 0 ) break ; } } done: base->event_running_priority = -1 ; return c; }
至于event_process_active_single_queue
就不继续往下跟了,就是调用回调函数,并将其从队列里移除。
处理激活事件的大致流程总结:
设置超时时间
遍历每一个优先级的队列
优先级数值小于 limit_after_prio 的,确保全部处理完毕
优先级数值大于 limit_after_prio 的,尽量的多处理,不一定能够全部处理掉,处理不完的下一轮dispatch中再处理
默认情况下,libevent会确保所有的事件都会得到处理,然后如果你配置了这个三个事件处理的标志位后,就要当心了,因为这里存在一种饥饿(Starvation)的可能,低优先级事件过多的时候(数值大于limit_after_prio),最低等级的事件的回调函数可能永远也不会被调用。
总结 到这里 event_base,event结构,reactor模式,以及事件处理流程已经都分析完毕了,它们是libevent的最基础的原理。后边的章节将会介绍libevent的一些高级功能。