libevent添加一个间隔1s持续触发的定时器如下:
struct event_base *base = event_base_new();
struct event *timer_ev = event_new(base , -1, EV_PERSIST, cb_func, NULL );
struct timeval one_sec = {1,0};
event_add(timer_ev , &one_sec);
event_base_dispatch(base );
就是new出一个不与任何fd关联的event,然后以1s做参数添加进入事件系统。而其中定时器的原理就是维护一个最小堆队列,key就是每一个定时器的绝对时间,每次事件循环取出最小超时时间,减去当前时间得到的TimeOut值作为参数传给io多路复用等待。当超时后,从最小堆中pop出那些超时的event,调用回调函数。针对添加定时器,event_add的实现细节是:event_add->event_add_internal->event_queue_insert,event_queue_insert函数的实现是把一个定时器event push到全局的最小堆中。event_base_dispatch是事件循环执行入口,主要是在一个超时的调用上等待, 只看定时器,把event_base_loop代码可以简化成如下
int
event_base_loop( struct event_base *base, int flags)
{
while (!done) {
timeout\_next(base, &tv\_p); // 从全局最小堆中取出最小超时时间
res = evsel->dispatch(base, tv\_p); // 以最小超时时间等待
timeout\_process(base); // 取出所有超时的event放入active events list
int n = event_process_active(base); // 遍历active events list,回调每一个active event的回调函数
}
}
上述是主要流程,省略了一些细节。比如会去校验当前时间,如果发现当前时间往后跑了,要去调整最小堆中所有事件的时间(timeout_correct的实现),还有就是被标记为persistent的event,在触发后要马上重新添加进全局最小堆中,等待下次触发。
libevent为了把一个io fd关联到event,定义了evmap_io结构体和evmap_io_map:
/** An entry for an evmap_io list: notes all the events that want to read or
write on a given fd, and the number of each.
*/
struct evmap_io {
struct event_list events;
ev_uint16_t nread ;
ev_uint16_t nwrite ;
};
/* Used to map signal numbers to a list of events. If EVMAP_USE_HT is not
defined, this structure is also used as event_io_map, which maps fds to a
list of events.
*/
struct event_signal_map {
/* An array of evmap_io * or of evmap_signal *; empty entries are
\* set to NULL. \*/
void **entries ;
/* The number of entries available in entries */
int nentries ;
};
因为一个io可以关联多个事件,所以evmap_io就是一个event链表。evmap_io_map就是具体的 io --> evmap_io的映射,这里是一个数组是基于fd是比较小的相互靠近的整数这个事实,像在windows下就不是这么回事了,io句柄是类似指针的东西,这时候就要用到hash表了。
针对evmap就有evmap_io_add操作来建立一个io到event的映射,实现比较简单:在evmap_io_map中找到对应的event_list,然后把新的event插到event_list后面。过程中如果添加的event使得libevent要首次关心这个io的可读(可写)状态,还要把这个io的可读(可写)加到多路复用的等待集合中去。
关于多路复用的封装,event_base中的const struct eventop * evsel 指定哪一个具体的多路复用的实现,这是一个策略模式,策略选择在event_init的时候做出,代码显示是可以配置使用哪种实现。eventop是一些接口原型:
/** Structure to define the backend of a given event_base. */
struct eventop {
/** The name of this backend. */
const char *name;
/** Function to set up an event_base to use this backend. It should
\* create a new structure holding whatever information is needed to
\* run the backend, and return it. The returned pointer will get
\* stored by event\_init into the event\_base.evbase field. On failure,
\* this function should return NULL. \*/
void *(*init)(struct event_base *);
/** Enable reading/writing on a given fd or signal. 'events' will be
\* the events that we're trying to enable: one or more of EV\_READ,
\* EV\_WRITE, EV\_SIGNAL, and EV\_ET. 'old' will be those events that
\* were enabled on this fd previously. 'fdinfo' will be a structure
\* associated with the fd by the evmap; its size is defined by the
\* fdinfo field below. It will be set to 0 the first time the fd is
\* added. The function should return 0 on success and -1 on error.
\*/
int (*add)(struct event_base *, evutil_socket_t fd, short old, short events, void *fdinfo);
/** As "add", except 'events' contains the events we mean to disable. */
int (*del)(struct event_base *, evutil_socket_t fd, short old, short events, void *fdinfo);
/** Function to implement the core of an event loop. It must see which
added events are ready, and cause event\_active to be called for each
active event (usually via event\_io\_active or such). It should
return 0 on success and -1 on error.
\*/
int (*dispatch)(struct event_base *, struct timeval *);
/** Function to clean up and free our data from the event_base. */
void (*dealloc)(struct event_base *);
/** Flag: set if we need to reinitialize the event base after we fork.
\*/
int need_reinit;
/** Bit-array of supported event_method_features that this backend can
\* provide. \*/
enum event_method_feature features;
/** Length of the extra information we should record for each fd that
has one or more active events. This information is recorded
as part of the evmap entry for each fd, and passed as an argument
to the add and del functions above.
\*/
size\_t fdinfo\_len;
};
这里以linux上用的最多的epoll来说明,epoll对应之上的接口实现为:
static const struct eventop epollops_changelist = {
"epoll (with changelist)",
epoll_init,
event_changelist_add,
event_changelist_del,
epoll_dispatch,
epoll_dealloc,
1, /* need reinit */
EV_FEATURE_ET|EV_FEATURE_O1,
EVENT_CHANGELIST_FDINFO_SIZE
};
epoll_init是创建一个epoll fd, event_changelist_add是注册一个io事件。event_change是添加删除fd可读(可写)事件的时候一个保存变量,event_change_list是change的一个map,基本来说是一个evmap_io对应一个change了,其实也就是一个io fd对应其event和change。event_changelist_add, event_changelist_del是改变change的操作。epoll_wait之前会更据这个fd对应的chang来调用相应的epoll_ctl,这样不管用户在一个IO上调用多少次添加删除事件,最后只会累积成一次epoll_ctl调用。
io event添加的过程是这样,event_add触发evmap_io_add, 把这个event与其fd对应起来,之后在事件循环中等待,如果fd激活了,就把该fd对应的激活event放到全局的active events list中,这和timer event一样,之后会在event_process_active统一处理掉。
信号事件在libevent的实现是这样,socketpair创建unix域套接字,sigaction注册对应的signo,当信号发生时进入统一的回调处理函数,这个函数使用之前创建的unix域套接字一端给另一端发送激活信号,接收的那一端socket根据收到的信号,调用用户的回调函数。
这个和通常的想法,直接注册信号回调到自己的回调实现不一样,看起来libevent这样做就把signal event转成io event和其他event统一起来通过dispatch等待,之后都在active events list中处理激活event。
libevent现在是支持在不同线程中添加事件了,因为加了锁。但是在激活回调用户实现的时候没法实现异步,只能同步串行化处理,这样就不能充分利用多核CPU了。有两种方法可以解决这个问题,一个就是在不同线程中分别创建不同的event_base和各自dispatch;另一个就是实现那还种半同步半异步模型,维护一个线程池和一个任务队列,libevent同步添加激活的回调放到任务队列,线程池工作线程异步并行处理任务。
手机扫一扫
移动阅读更方便
你可能感兴趣的文章