1,前言
Libevent是一个轻量级的开源高性能网络库,使用者众多,研究者更甚,相关文章也不少。写这一系列文章的用意在于,一则分享心得;二则对libevent代码和设计思想做系统的、更深层次的分析,写出来,也可供后来者参考。
附带一句:Libevent是用c语言编写的(MS大牛们都偏爱c语言哪),而且几乎是无处不函数指针,学习其源代码也需要相当的c语言基础。
2,libevent简介
上来当然要先夸奖啦,Libevent 有几个显著的亮点:
=> 事件驱动(event-driven),高性能;
=> 轻量级,专注于网络,不如ACE那么臃肿庞大;
=> 源代码相当精炼、易读;
=> 跨平台,支持Windows、Linux、*BSD和Mac Os;
=> 支持多种I/O多路复用技术, epoll、poll、dev/poll、select和kqueue等;
=> 支持I/O,定时器和信号等事件;
=> 注册事件优先级;
Libevent已经被广泛的应用,作为底层的网络库;比如memcached、Vomit、Nylon、Netchat等等。
Libevent当前的最新稳定版是1.4.13;这也是本文参照的版本。
3,学习的好处
学习libevent有助于提升程序设计功力,除了网络程序设计方面外,Libevent的代码里有很多有用的设计技巧和基础数据结构,比如信息隐藏、函数指针、c语言的多态支持、链表和堆等等,都有助于提升自身的程序功力。
程序设计不止要了解框架,很多细节之处恰恰也是事关整个系统成败的关键。只对libevent本身的框架大概了解,那或许仅仅是一知半解,不深入代码分析,就难以了解其设计的精巧之处,也就难以为自己所用。
事实上Libevent本身就是一个典型的Reactor模型,理解Reactor模式是理解libevent的基石;因此下一节将介绍典型的事件驱动设计模式——Reactor模式。
参考资料:Libevent: http://monkey.org/~provos/libevent/
第二章
前面讲到,整个libevent本身就是一个Reactor,因此本节将专门对Reactor模式进行必要的介绍,并列出libevnet中的几个重要组件和Reactor的对应关系,在后面的章节中可能还会提到本节介绍的基本概念。
首先来回想一下普通函数调用的机制:程序调用某函数?函数执行,程序等待?函数将结果和控制权返回给程序?程序继续处理。
Reactor释义“反应堆”,是一种事件驱动机制。和普通函数调用的不同之处在于:应用程序不是主动的调用某个API完成处理,而是恰恰相反,Reactor逆置了事件处理流程,应用程序需要提供相应的接口并注册到Reactor上,如果相应的时间发生,Reactor将主动调用应用程序注册的接口,这些接口又称为“回调函数”。使用Libevent也是想Libevent框架注册相应的事件和回调函数;当这些时间发声时,Libevent会调用这些回调函数处理相应的事件(I/O读写、定时和信号)。
用“好莱坞原则”来形容Reactor再合适不过了:不要打电话给我们,我们会打电话通知你。
举个例子:你去应聘某xx公司,面试结束后。
“普通函数调用机制”公司HR比较懒,不会记你的联系方式,那怎么办呢,你只能面试完后自己打电话去问结果;有没有被录取啊,还是被据了;
“Reactor”公司HR就记下了你的联系方式,结果出来后会主动打电话通知你:有没有被录取啊,还是被据了;你不用自己打电话去问结果,事实上也不能,你没有HR的留联系方式。
Reactor模式是编写高性能网络服务器的必备技术之一,它具有如下的优点
1)响应快,不必为单个同步时间所阻塞,虽然Reactor本身依然是同步的;
2)编程相对简单,可以最大程度的避免复杂的多线程及同步问题,并且避免了多线程/进程的切换开销;
3)可扩展性,可以方便的通过增加Reactor实例个数来充分利用CPU资源;
4)可复用性,reactor框架本身与具体事件处理逻辑无关,具有很高的复用性;
使用Reactor模型,必备的几个组件:事件源、Reactor框架、多路复用机制和事件处理程序,先来看看Reactor模型的整体框架,接下来再对每个组件做逐一说明。
1) 事件源
Linux上是文件描述符,Windows上就是Socket或者Handle了,这里统一称为“句柄集”;程序在指定的句柄上注册关心的事件,比如I/O事件。
2) event demultiplexer——事件多路分发机制
由操作系统提供的I/O多路复用机制,比如select和epoll。
程序首先将其关心的句柄(事件源)及其事件注册到event demultiplexer上;
当有事件到达时,event demultiplexer会发出通知“在已经注册的句柄集中,一个或多个句柄的事件已经就绪”;
程序收到通知后,就可以在非阻塞的情况下对事件进行处理了。
对应到libevent中,依然是select、poll、epoll等,但是libevent使用结构体eventop进行了封装,以统一的接口来支持这些I/O多路复用机制,达到了对外隐藏底层系统机制的目的。
3) Reactor——反应器
Reactor,是事件管理的接口,内部使用event demultiplexer注册、注销事件;并运行事件循环,当有事件进入“就绪”状态时,调用注册事件的回调函数处理事件。
对应到libevent中,就是event_base结构体。
一个典型的Reactor声明方式
1 class Reactor
2 {
3 public:
4 int register_handler(Event_Handler *pHandler, int event);
5 int remove_handler(Event_Handler *pHandler, int event);
6 void handle_events(timeval *ptv);
7 // …
8 };
4) Event Handler——事件处理程序
事件处理程序提供了一组接口,每个接口对应了一种类型的事件,供Reactor在相应的事件发生时调用,执行相应的事件处理。通常它会绑定一个有效的句柄。
对应到libevent中,就是event结构体。
下面是两种典型的Event Handler类声明方式,二者互有优缺点。
1 class Event_Handler
2 {
3 public:
4 virtual void handle_read() = 0;
5 virtual void handle_write() = 0;
6 virtual void handle_timeout() = 0;
7 virtual void handle_close() = 0;
8 virtual HANDLE get_handle() = 0;
9 // …
10 };
11 class Event_Handler
12 {
13 public:
14 // events maybe read/write/timeout/close .etc
15 virtual void handle_events(int events) = 0;
16 virtual HANDLE get_handle() = 0;
17 // …
18 };
前面说过Reactor将事件流“逆置”了,那么使用Reactor模式后,事件控制流是什么样子呢?
可以参见下面的序列图。
上面讲到了Reactor的基本概念、框架和处理流程,对Reactor有个基本清晰的了解后,再来对比看libevent就会更容易理解了,接下来就正式进入到libevent的代码世界了,加油!
参考资料:
Pattern-Oriented Software Architecture, Patterns for Concurrent and Networked Objects, Volume 2
第三章
学习源代码该从哪里入手?我觉得从程序的基本使用场景和代码的整体处理流程入手是个不错的方法,至少从个人的经验上讲,用此方法分析libevent是比较有效的。
基本应用场景也是使用libevnet的基本流程,下面来考虑一个最简单的场景,使用livevent设置定时器,应用程序只需要执行下面几个简单的步骤即可。
1)首先初始化libevent库,并保存返回的指针
1 struct event_base * base = event_init();
实际上这一步相当于初始化一个Reactor实例;在初始化libevent后,就可以注册事件了。
2)初始化事件event,设置回调函数和关注的事件
1 evtimer_set(&ev, timer_cb, NULL);
事实上这等价于调用
1 event_set(&ev, -1, 0, timer_cb, NULL);
event_set的函数原型是:
1 void event_set(struct event *ev, int fd, short event, void (*cb)(int, short, void *), void *arg)
ev:执行要初始化的event对象;
fd:该event绑定的“句柄”,对于信号事件,它就是关注的信号;
event:在该fd上关注的事件类型,它可以是EV_READ, EV_WRITE, EV_SIGNAL;
cb:这是一个函数指针,当fd上的事件event发生时,调用该函数执行处理,它有三个参数,调用时由event_base负责传入,按顺序,实际上就是event_set时的fd, event和arg;
arg:传递给cb函数指针的参数;
由于定时事件不需要fd,并且定时事件是根据添加时(event_add)的超时值设定的,因此这里event也不需要设置。
这一步相当于初始化一个event handler,在libevent中事件类型保存在event结构体中。
注意:libevent并不会管理event事件集合,这需要应用程序自行管理;
3)设置event从属的event_base
1 event_base_set(base, &ev);
这一步相当于指明event要注册到哪个event_base实例上;
4)是正式的添加事件的时候了
1 event_add(&ev, timeout);
基本信息都已设置完成,只要简单的调用event_add()函数即可完成,其中timeout是定时值;
这一步相当于调用Reactor::register_handler()函数注册事件。
5)程序进入无限循环,等待就绪事件并执行事件处理
1 event_base_dispatch(base);
上面例子的程序代码如下所示
1 struct event ev;
2 struct timeval tv;
3 void time_cb(int fd, short event, void *argc)
4 {
5 printf("timer wakeup/n");
6 event_add(&ev, &tv); // reschedule timer
7 }
8 int main()
9 {
10 struct event_base *base = event_init();
11 tv.tv_sec = 10; // 10s period
12 tv.tv_usec = 0;
13 evtimer_set(&ev, time_cb, NULL);
14 event_add(&ev, &tv);
15 event_base_dispatch(base);
16 }
当应用程序向libevent注册一个事件后,libevent内部是怎么样进行处理的呢?下面的图就给出了这一基本流程。
1)首先应用程序准备并初始化event,设置好事件类型和回调函数;这对应于前面第步骤2和3;
2)向libevent添加该事件event。对于定时事件,libevent使用一个小根堆管理,key为超时时间;对于Signal和I/O事件,libevent将其放入到等待链表(wait list)中,这是一个双向链表结构;
3)程序调用event_base_dispatch()系列函数进入无限循环,等待事件,以select()函数为例;每次循环前libevent会检查定时事件的最小超时时间tv,根据tv设置select()的最大等待时间,以便于后面及时处理超
时事件;
当select()返回后,首先检查超时事件,然后检查I/O事件;
Libevent将所有的就绪事件,放入到激活链表中;
然后对激活链表中的事件,调用事件的回调函数执行事件处理;
本节介绍了libevent的简单实用场景,并旋风般的介绍了libevent的事件处理流程,读者应该对libevent有了基本的印象,下面将会详细介绍libevent的事件管理框架(Reactor模式中的Reactor框架)做详细的介绍,在此之前会对源代码文件做简单的分类。
第四章
libevent源代码文件组织
详细分析源代码之前,如果能对其代码文件的基本结构有个大概的认识和分类,对于代码的分析将是大有裨益的。本节内容不多,我想并不是说它不重要!
Libevent的源代码虽然都在一层文件夹下面,但是其代码分类还是相当清晰的,主要可分为头文件、内部使用的头文件、辅助功能函数、日志、libevent框架、对系统I/O多路复用机制的封装、信号管理、定时事件管理、缓冲区管理、基本数据结构和基于libevent的两个实用库等几个部分,有些部分可能就是一个源文件。
源代码中的test部分就不在我们关注的范畴了。
1)头文件
主要就是event.h:事件宏定义、接口函数声明,主要结构体event的声明;
2)内部头文件
xxx-internal.h:内部数据结构和函数,对外不可见,以达到信息隐藏的目的;
3)libevent框架
event.c:event整体框架的代码实现;
4)对系统I/O多路复用机制的封装
epoll.c:对epoll的封装;
select.c:对select的封装;
devpoll.c:对dev/poll的封装;
kqueue.c:对kqueue的封装;
5)定时事件管理
min-heap.h:其实就是一个以时间作为key的小根堆结构;
6)信号管理
signal.c:对信号事件的处理;
7)辅助功能函数
evutil.h 和evutil.c:一些辅助功能函数,包括创建socket pair和一些时间操作函数:加、减和比较等。
8)日志
log.h和log.c:log日志函数
9)缓冲区管理
evbuffer.c和buffer.c:libevent对缓冲区的封装;
10)基本数据结构
compat/sys下的两个源文件:queue.h是libevent基本数据结构的实现,包括链表,双向链表,队列等;_libevent_time.h:一些用于时间操作的结构体定义、函数和宏定义;
11)实用网络库
http和evdns:是基于libevent实现的http服务器和异步dns查询库;
本节介绍了libevent的组织和分类,下面将会详细介绍libevent的核心部分event结构。
第五章 libevent的核心:事件event
对事件处理流程有了高层的认识后,本节将详细介绍libevent的核心结构event,以及libevent对event的管理。
Libevent是基于事件驱动(event-driven)的,从名字也可以看到event是整个库的核心。event就是Reactor框架中的事件处理程序组件;它提供了函数接口,供Reactor在事件发生时调用,以执行相应的事件处理,通常它会绑定一个有效的句柄。
首先给出event结构体的声明,它位于event.h文件中:
1 struct event {
2 TAILQ_ENTRY (event) ev_next;
3 TAILQ_ENTRY (event) ev_active_next;
4 TAILQ_ENTRY (event) ev_signal_next;
5 unsigned int min_heap_idx; /* for managing timeouts */
6 struct event_base *ev_base;
7 int ev_fd;
8 short ev_events;
9 short ev_ncalls;
10 short *ev_pncalls; /* Allows deletes in callback */
11 struct timeval ev_timeout;
12 int ev_pri; /* smaller numbers are higher priority */
13 void (*ev_callback)(int, short, void *arg);
14 void *ev_arg;
15 int ev_res; /* result passed to event callback */
16 int ev_flags;
17 };
下面简单解释一下结构体中各字段的含义。
1)ev_events:event关注的事件类型,它可以是以下3种类型:
I/O事件: EV_WRITE和EV_READ
定时事件:EV_TIMEOUT
信号: EV_SIGNAL
辅助选项:EV_PERSIST,表明是一个永久事件
Libevent中的定义为:
1 #define EV_TIMEOUT 0x01
2 #define EV_READ 0x02
3 #define EV_WRITE 0x04
4 #define EV_SIGNAL 0x08
5 #define EV_PERSIST 0x10 /* Persistant event */
可以看出事件类型可以使用“|”运算符进行组合,需要说明的是,信号和I/O事件不能同时设置;
还可以看出libevent使用event结构体将这3种事件的处理统一起来;
2)ev_next,ev_active_next和ev_signal_next都是双向链表节点指针;它们是libevent对不同事件类型和在不同的时期,对事件的管理时使用到的字段。
libevent使用双向链表保存所有注册的I/O和Signal事件,ev_next就是该I/O事件在链表中的位置;称此链表为“已注册事件链表”;
同样ev_signal_next就是signal事件在signal事件链表中的位置;
ev_active_next:libevent将所有的激活事件放入到链表active list中,然后遍历active list执行调度,ev_active_next就指明了event在active list中的位置;
3)min_heap_idx和ev_timeout,如果是timeout事件,它们是event在小根堆中的索引和超时值,libevent使用小根堆来管理定时事件,这将在后面定时事件处理时专门讲解
4)ev_base该事件所属的反应堆实例,这是一个event_base结构体,下一节将会详细讲解;
5)ev_fd,对于I/O事件,是绑定的文件描述符;对于signal事件,是绑定的信号;
6)ev_callback,event的回调函数,被ev_base调用,执行事件处理程序,这是一个函数指针,原型为:
1 void (*ev_callback)(int fd, short events, void *arg)
其中参数fd对应于ev_fd;events对应于ev_events;arg对应于ev_arg;
7)ev_arg:void*,表明可以是任意类型的数据,在设置event时指定;
8)eb_flags:libevent用于标记event信息的字段,表明其当前的状态,可能的值有:
1 #define EVLIST_TIMEOUT 0x01 // event在time堆中
2 #define EVLIST_INSERTED 0x02 // event在已注册事件链表中
3 #define EVLIST_SIGNAL 0x04 // 未见使用
4 #define EVLIST_ACTIVE 0x08 // event在激活链表中
5 #define EVLIST_INTERNAL 0x10 // 内部使用标记
6 #define EVLIST_INIT 0x80 // event已被初始化
9)ev_ncalls:事件就绪执行时,调用ev_callback的次数,通常为1;
10)ev_pncalls:指针,通常指向ev_ncalls或者为NULL;
11)ev_res:记录了当前激活事件的类型;
从event结构体中的3个链表节点指针和一个堆索引出发,大体上也能窥出libevent对event的管理方法了,可以参见下面的示意图:
每次当有事件event转变为就绪状态时,libevent就会把它移入到active event list[priority]中,其中priority是event的优先级;
接着libevent会根据自己的调度策略选择就绪事件,调用其cb_callback()函数执行事件处理;并根据就绪的句柄和事件类型填充cb_callback函数的参数。
要向libevent添加一个事件,需要首先设置event对象,这通过调用libevent提供的函数有:event_set(), event_base_set(), event_priority_set()来完成;下面分别进行讲解。
1 void event_set(struct event *ev, int fd, short events, void (*callback)(int, short, void *), void *arg)
1.设置事件ev绑定的文件描述符或者信号,对于定时事件,设为-1即可;
2.设置事件类型,比如EV_READ|EV_PERSIST, EV_WRITE, EV_SIGNAL等;
3.设置事件的回调函数以及参数arg;
4.初始化其它字段,比如缺省的event_base和优先级;
1 int event_base_set(struct event_base *base, struct event *ev)
设置event ev将要注册到的event_base;
libevent有一个全局event_base指针current_base,默认情况下事件ev将被注册到current_base上,使用该函数可以指定不同的event_base;
如果一个进程中存在多个libevent实例,则必须要调用该函数为event设置不同的event_base;
1 int event_priority_set(struct event *ev, int pri)
设置event ev的优先级,没什么可说的,注意的一点就是:当ev正处于就绪状态时,不能设置,返回-1。
本节讲述了libevent的核心event结构,以及libevent支持的事件类型和libevent对event的管理模型;接下来将会描述libevent的事件处理框架,以及其中使用的重要的结构体event_base;
第六章 初见事件处理框架
前面已经对libevent的事件处理框架和event结构体做了描述,现在是时候剖析libevent对事件的详细处理流程了,本节将分析libevent的事件处理框架event_base和libevent注册、删除事件的具体流程,可结合前一节libevent对event的管理。
回想Reactor模式的几个基本组件,本节讲解的部分对应于Reactor框架组件。在libevent中,这就表现为event_base结构体,结构体声明如下,它位于event-internal.h文件中:
1 struct event_base {
2 const struct eventop *evsel;
3 void *evbase;
4 int event_count; /* counts number of total events */
5 int event_count_active; /* counts number of active events */
6 int event_gotterm; /* Set to terminate loop */
7 int event_break; /* Set to terminate loop immediately */
8 /* active event management */
9 struct event_list **activequeues;
10 int nactivequeues;
11 /* signal handling info */
12 struct evsignal_info sig;
13 struct event_list eventqueue;
14 struct timeval event_tv;
15 struct min_heap timeheap;
16 struct timeval tv_cache;
17 };
下面详细解释一下结构体中各字段的含义。
1)evsel和evbase这两个字段的设置可能会让人有些迷惑,这里你可以把evsel和evbase看作是类和静态函数的关系,
比如添加事件时的调用行为:evsel->add(evbase, ev),实际执行操作的是evbase;这相当于class::add(instance, ev),instance就是class的一个对象实例。
evsel指向了全局变量static const struct eventop *eventops[]中的一个;
前面也说过,libevent将系统提供的I/O demultiplex机制统一封装成了eventop结构;因此eventops[]包含了select、poll、kequeue和epoll等等其中的若干个全局实例对象。
evbase实际上是一个eventop实例对象;
先来看看eventop结构体,它的成员是一系列的函数指针, 在event-internal.h文件中:
1 struct eventop {
2 const char *name;
3 void *(*init)(struct event_base *); // 初始化
4 int (*add)(void *, struct event *); // 注册事件
5 int (*del)(void *, struct event *); // 删除事件
6 int (*dispatch)(struct event_base *, void *, struct timeval *); // 事件分发
7 void (*dealloc)(struct event_base *, void *); // 注销,释放资源
8 /* set if we need to reinitialize the event base */
9 int need_reinit;
10 };
也就是说,在libevent中,每种I/O demultiplex机制的实现都必须提供这五个函数接口,来完成自身的初始化、销毁释放;对事件的注册、注销和分发。
比如对于epoll,libevent实现了5个对应的接口函数,并在初始化时并将eventop的5个函数指针指向这5个函数,那么程序就可以使用epoll作为I/O demultiplex机制了,这个在后面会再次提到。
2)activequeues是一个二级指针,前面讲过libevent支持事件优先级,因此你可以把它看作是数组,其中的元素activequeues[priority]是一个链表,链表的每个节点指向一个优先级为priority的就绪事件event。
3)eventqueue,链表,保存了所有的注册事件event的指针。
4)sig是由来管理信号的结构体,将在后面信号处理时专门讲解;
5)timeheap是管理定时事件的小根堆,将在后面定时事件处理时专门讲解;
6)event_tv和tv_cache是libevent用于时间管理的变量,将在后面讲到;
其它各个变量都能因名知意,就不再啰嗦了。
创建一个event_base对象也既是创建了一个新的libevent实例,程序需要通过调用event_init()(内部调用event_base_new函数执行具体操作)函数来创建,该函数同时还对新生成的libevent实例进行了初始化。
该函数首先为event_base实例申请空间,然后初始化timer mini-heap,选择并初始化合适的系统I/O 的demultiplexer机制,初始化各事件链表;
函数还检测了系统的时间设置,为后面的时间管理打下基础。
前面提到Reactor框架的作用就是提供事件的注册、注销接口;根据系统提供的事件多路分发机制执行事件循环,当有事件进入“就绪”状态时,调用注册事件的回调函数来处理事件。
Libevent中对应的接口函数主要就是:
1 int event_add(struct event *ev, const struct timeval *timeout);
2 int event_del(struct event *ev);
3 int event_base_loop(struct event_base *base, int loops);
4 void event_active(struct event *event, int res, short events);
5 void event_process_active(struct event_base *base);
本节将按介绍事件注册和删除的代码流程,libevent的事件循环框架将在下一节再具体描述。
对于定时事件,这些函数将调用timer
heap管理接口执行插入和删除操作;对于I/O和Signal事件将调用eventopadd和delete接口函数执行插入和删除操作(eventop会对Signal事件调用Signal处理接口执行操作);这些组件将在后面的内容描述。
1)注册事件
函数原型:
1 int event_add(struct event *ev, const struct timeval *tv)
参数:
ev:指向要注册的事件;
tv:超时时间;
函数将ev注册到ev->ev_base上,事件类型由ev->ev_events指明,如果注册成功,ev将被插入到已注册链表中;如果tv不是NULL,则会同时注册定时事件,将ev添加到timer堆上;
如果其中有一步操作失败,那么函数保证没有事件会被注册,可以讲这相当于一个原子操作。这个函数也体现了libevent细节之处的巧妙设计,且仔细看程序代码,部分有省略,注释直接附在代码中。
1 int event_add(struct event *ev, const struct timeval *tv)
2 {
3 struct event_base *base = ev->ev_base; // 要注册到的event_base
4 const struct eventop *evsel = base->evsel;
5 void *evbase = base->evbase; // base使用的系统I/O策略
6 // 新的timer事件,调用timer heap接口在堆上预留一个位置
7 // 注:这样能保证该操作的原子性:
8 // 向系统I/O机制注册可能会失败,而当在堆上预留成功后,
9 // 定时事件的添加将肯定不会失败;
10 // 而预留位置的可能结果是堆扩充,但是内部元素并不会改变
11 if (tv != NULL && !(ev->ev_flags & EVLIST_TIMEOUT)) {
12 if (min_heap_reserve(&base->timeheap,
13 1 + min_heap_size(&base->timeheap)) == -1)
14 return (-1); /* ENOMEM == errno */
15 }
16 // 如果事件ev不在已注册或者激活链表中,则调用evbase注册事件
17 if ((ev->ev_events & (EV_READ|EV_WRITE|EV_SIGNAL)) &&
18 !(ev->ev_flags & (EVLIST_INSERTED|EVLIST_ACTIVE))) {
19 res = evsel->add(evbase, ev);
20 if (res != -1) // 注册成功,插入event到已注册链表中
21 event_queue_insert(base, ev, EVLIST_INSERTED);
22 }
23 // 准备添加定时事件
24 if (res != -1 && tv != NULL) {
25 struct timeval now;
26 // EVLIST_TIMEOUT表明event已经在定时器堆中了,删除旧的
27 if (ev->ev_flags & EVLIST_TIMEOUT)
28 event_queue_remove(base, ev, EVLIST_TIMEOUT);
29 // 如果事件已经是就绪状态则从激活链表中删除
30 if ((ev->ev_flags & EVLIST_ACTIVE) &&
31 (ev->ev_res & EV_TIMEOUT)) {
32 // 将ev_callback调用次数设置为0
33 if (ev->ev_ncalls && ev->ev_pncalls) {
34 *ev->ev_pncalls = 0;
35 }
36 event_queue_remove(base, ev, EVLIST_ACTIVE);
37 }
38 // 计算时间,并插入到timer小根堆中
39 gettime(base, &now);
40 evutil_timeradd(&now, tv, &ev->ev_timeout);
41 event_queue_insert(base, ev, EVLIST_TIMEOUT);
42 }
43 return (res);
44 }
45
46 event_queue_insert()负责将事件插入到对应的链表中,下面是程序代码;
47 event_queue_remove()负责将事件从对应的链表中删除,这里就不再重复贴代码了;
48 void event_queue_insert(struct event_base *base, struct event *ev, int queue)
49 {
50 // ev可能已经在激活列表中了,避免重复插入
51 if (ev->ev_flags & queue) {
52 if (queue & EVLIST_ACTIVE)
53 return;
54 }
55 // …
56 ev->ev_flags |= queue; // 记录queue标记
57 switch (queue) {
58 case EVLIST_INSERTED: // I/O或Signal事件,加入已注册事件链表
59 TAILQ_INSERT_TAIL(&base->eventqueue, ev, ev_next);
60 break;
61 case EVLIST_ACTIVE: // 就绪事件,加入激活链表
62 base->event_count_active++;
63 TAILQ_INSERT_TAIL(base->activequeues[ev->ev_pri], ev, ev_active_next);
64 break;
65 case EVLIST_TIMEOUT: // 定时事件,加入堆
66 min_heap_push(&base->timeheap, ev);
67 break;
68 }
69 }
2)删除事件:
函数原型为:
1 int event_del(struct event *ev);
函数将删除事件ev,对于I/O事件,从I/O 的demultiplexer上将事件注销;对于Signal事件,将从Signal事件链表中删除;对于定时事件,将从堆上删除;
同样删除事件的操作则不一定是原子的,比如删除时间事件之后,有可能从系统I/O机制中注销会失败。
1 int event_del(struct event *ev)
2 {
3 struct event_base *base;
4 const struct eventop *evsel;
5 void *evbase;
6 // ev_base为NULL,表明ev没有被注册
7 if (ev->ev_base == NULL)
8 return (-1);
9 // 取得ev注册的event_base和eventop指针
10 base = ev->ev_base;
11 evsel = base->evsel;
12 evbase = base->evbase;
13 // 将ev_callback调用次数设置为
14 if (ev->ev_ncalls && ev->ev_pncalls) {
15 *ev->ev_pncalls = 0;
16 }
17
18 // 从对应的链表中删除
19 if (ev->ev_flags & EVLIST_TIMEOUT)
20 event_queue_remove(base, ev, EVLIST_TIMEOUT);
21 if (ev->ev_flags & EVLIST_ACTIVE)
22 event_queue_remove(base, ev, EVLIST_ACTIVE);
23 if (ev->ev_flags & EVLIST_INSERTED) {
24 event_queue_remove(base, ev, EVLIST_INSERTED);
25 // EVLIST_INSERTED表明是I/O或者Signal事件,
26 // 需要调用I/O demultiplexer注销事件
27 return (evsel->del(evbase, ev));
28 }
29 return (0);
30 }
4 ,小节
分析了event_base这一重要结构体,初步看到了libevent对系统的I/O demultiplex机制的封装event_op结构,并结合源代码分析了事件的注册和删除处理,下面将会接着分析事件管理框架中的主事件循环部分。
第七章 事件主循环
现在我们已经初步了解了libevent的Reactor组件——event_base和事件管理框架,接下来就是libevent事件处理的中心部分——事件主循环,根据系统提供的事件多路分发机制执行事件循环,对已注册的就绪事件,调用注册事件的回调函数来处理事件。
Libevent的事件主循环主要是通过event_base_loop ()函数完成的,其主要操作如下面的流程图所示,event_base_loop所作的就是持续执行下面的循环。
清楚了event_base_loop所作的主要操作,就可以对比源代码看个究竟了,代码结构还是相当清晰的。
1 int event_base_loop(struct event_base *base, int flags)
2 {
3 const struct eventop *evsel = base->evsel;
4 void *evbase = base->evbase;
5 struct timeval tv;
6 struct timeval *tv_p;
7 int res, done;
8 // 清空时间缓存
9 base->tv_cache.tv_sec = 0;
10 // evsignal_base是全局变量,在处理signal时,用于指名signal所属的event_base实例
11 if (base->sig.ev_signal_added)
12 evsignal_base = base;
13 done = 0;
14 while (!done) { // 事件主循环
15 // 查看是否需要跳出循环,程序可以调用event_loopexit_cb()设置event_gotterm标记
16 // 调用event_base_loopbreak()设置event_break标记
17 if (base->event_gotterm) {
18 base->event_gotterm = 0;
19 break;
20 }
21 if (base->event_break) {
22 base->event_break = 0;
23 break;
24 }
25 // 校正系统时间,如果系统使用的是非MONOTONIC时间,用户可能会向后调整了系统时间
26 // 在timeout_correct函数里,比较last wait time和当前时间,如果当前时间< last wait time
27 // 表明时间有问题,这是需要更新timer_heap中所有定时事件的超时时间。
28 timeout_correct(base, &tv);
29
30 // 根据timer heap中事件的最小超时时间,计算系统I/O demultiplexer的最大等待时间
31 tv_p = &tv;
32 if (!base->event_count_active && !(flags & EVLOOP_NONBLOCK)) {
33 timeout_next(base, &tv_p);
34 } else {
35 // 依然有未处理的就绪时间,就让I/O demultiplexer立即返回,不必等待
36 // 下面会提到,在libevent中,低优先级的就绪事件可能不能立即被处理
37 evutil_timerclear(&tv);
38 }
39 // 如果当前没有注册事件,就退出
40 if (!event_haveevents(base)) {
41 event_debug(("%s: no events registered.", __func__));
42 return (1);
43 }
44 // 更新last wait time,并清空time cache
45 gettime(base, &base->event_tv);
46 base->tv_cache.tv_sec = 0;
47 // 调用系统I/O demultiplexer等待就绪I/O events,可能是epoll_wait,或者select等;
48 // 在evsel->dispatch()中,会把就绪signal event、I/O event插入到激活链表中
49 res = evsel->dispatch(base, evbase, tv_p);
50 if (res == -1)
51 return (-1);
52 // 将time cache赋值为当前系统时间
53 gettime(base, &base->tv_cache);
54 // 检查heap中的timer events,将就绪的timer event从heap上删除,并插入到激活链表中
55 timeout_process(base);
56 // 调用event_process_active()处理激活链表中的就绪event,调用其回调函数执行事件处理
57 // 该函数会寻找最高优先级(priority值越小优先级越高)的激活事件链表,
58 // 然后处理链表中的所有就绪事件;
59 // 因此低优先级的就绪事件可能得不到及时处理;
60 if (base->event_count_active) {
61 event_process_active(base);
62 if (!base->event_count_active && (flags & EVLOOP_ONCE))
63 done = 1;
64 } else if (flags & EVLOOP_NONBLOCK)
65 done = 1;
66 }
67 // 循环结束,清空时间缓存
68 base->tv_cache.tv_sec = 0;
69 event_debug(("%s: asked to terminate loop.", __func__));
70 return (0);
71 }
Libevent将Timer和Signal事件都统一到了系统的I/O 的demultiplex机制中了,相信读者从上面的流程和代码中也能窥出一斑了,下面就再啰嗦一次了。
首先将Timer事件融合到系统I/O多路复用机制中,还是相当清晰的,因为系统的I/O机制像select()和epoll_wait()都允许程序制定一个最大等待时间(也称为最大超时时间)timeout,即使没有I/O事件发生,它们也保证能在timeout时间内返回。
那么根据所有Timer事件的最小超时时间来设置系统I/O的timeout时间;当系统I/O返回时,再激活所有就绪的Timer事件就可以了,这样就能将Timer事件完美的融合到系统的I/O机制中了。
这是在Reactor和Proactor模式(主动器模式,比如Windows上的IOCP)中处理Timer事件的经典方法了,ACE采用的也是这种方法,大家可以参考POSA vol2书中的Reactor模式一节。
堆是一种经典的数据结构,向堆中插入、删除元素时间复杂度都是O(lgN),N为堆中元素的个数,而获取最小key值(小根堆)的复杂度为O(1);因此变成了管理Timer事件的绝佳人选(当然是非唯一的),libevent就是采用的堆结构。
Signal是异步事件的经典事例,将Signal事件统一到系统的I/O多路复用中就不像Timer事件那么自然了,Signal事件的出现对于进程来讲是完全随机的,进程不能只是测试一个变量来判别是否发生了一个信号,而是必须告诉内核“在此信号发生时,请执行如下的操作”。
如果当Signal发生时,并不立即调用event的callback函数处理信号,而是设法通知系统的I/O机制,让其返回,然后再统一和I/O事件以及Timer一起处理,不就可以了嘛。是的,这也是libevent中使用的方法。
问题的核心在于,当Signal发生时,如何通知系统的I/O多路复用机制,这里先买个小关子,放到信号处理一节再详细说明,我想读者肯定也能想出通知的方法,比如使用pipe。
介绍了libevent的事件主循环,描述了libevent是如何处理就绪的I/O事件、定时器和信号事件,以及如何将它们无缝的融合到一起。
第八章 集成信号处理
现在我们已经了解了libevent的基本框架:事件管理框架和事件主循环。上节提到了libevent中I/O事件和Signal以及Timer事件的集成,这一节将分析如何将Signal集成到事件主循环的框架中。
前一节已经做了足够多的介绍了,基本方法就是采用“消息机制”。在libevent中这是通过socket pair完成的,下面就来详细分析一下。
Socket pair就是一个socket对,包含两个socket,一个读socket,一个写socket。工作方式如下图所示:
创建一个socket pair并不是复杂的操作,可以参见下面的流程图,清晰起见,其中忽略了一些错误处理和检查。
Libevent提供了辅助函数evutil_socketpair()来创建一个socket pair,可以结合上面的创建流程来分析该函数。
Socket
pair创建好了,可是libevent的事件主循环还是不知道Signal是否发生了啊,看来我们还差了最后一步,那就是:为socket
pair的读socket在libevent的event_base实例上注册一个persist的读事件。
这样当向写socket写入数据时,读socket就会得到通知,触发读事件,从而event_base就能相应的得到通知了。
前面提到过,Libevent会在事件主循环中检查标记,来确定是否有触发的signal,如果标记被设置就处理这些signal,这段代码在各个具体的I/O机制中,以Epoll为例,在epoll_dispatch()函数中,代码片段如
下:
1 res = epoll_wait(epollop->epfd, events, epollop->nevents, timeout);
2 if (res == -1) {
3 if (errno != EINTR) {
4 event_warn("epoll_wait");
5 return (-1);
6 }
7 evsignal_process(base);// 处理signal事件
8 return (0);
9 } else if (base->sig.evsignal_caught) {
10 evsignal_process(base);// 处理signal事件
11 }
完整的处理框架如下所示:
注1:libevent中,初始化阶段并不注册读socket的读事件,而是在注册信号阶段才会测试并注册;
注2:libevent中,检查I/O事件是在各系统I/O机制的dispatch()函数中完成的,该dispatch()函数在event_base_loop()函数中被调用;
Libevent中Signal事件的管理是通过结构体evsignal_info完成的,结构体位于evsignal.h文件中,定义如下:
1 struct evsignal_info {
2 struct event ev_signal;
3 int ev_signal_pair[2];
4 int ev_signal_added;
5 volatile sig_atomic_t evsignal_caught;
6 struct event_list evsigevents[NSIG];
7 sig_atomic_t evsigcaught[NSIG];
8 #ifdef HAVE_SIGACTION
9 struct sigaction **sh_old;
10 #else
11 ev_sighandler_t **sh_old;
12 #endif
13 int sh_old_max;
14 };
下面详细介绍一下个字段的含义和作用:
1)ev_signal, 为socket pair的读socket向event_base注册读事件时使用的event结构体;
2)ev_signal_pair,socket pair对,作用见第一节的介绍;
3)ev_signal_added,记录ev_signal事件是否已经注册了;
4)evsignal_caught,是否有信号发生的标记;是volatile类型,因为它会在另外的线程中被修改;
5)evsigvents[NSIG],数组,evsigevents[signo]表示注册到信号signo的事件链表;
6)evsigcaught[NSIG],具体记录每个信号触发的次数,evsigcaught[signo]是记录信号signo被触发的次数;
7)sh_old记录了原来的signal处理函数指针,当信号signo注册的event被清空时,需要重新设置其处理函数;
evsignal_info的初始化包括,创建socket pair,设置ev_signal事件(但并没有注册,而是等到有信号注册时才检查并注册),并将所有标记置零,初始化信号的注册事件链表指针等。
注册signal事件是通过evsignal_add(struct event *ev)函数完成的,libevent对所有的信号注册同一个处理函数evsignal_handler(),该函数将在下一段介绍,注册过程如下:
1 取得ev要注册到的信号signo;
2 如果信号signo未被注册,那么就为signo注册信号处理函数evsignal_handler();
3 如果事件ev_signal还没哟注册,就注册ev_signal事件;
4 将事件ev添加到signo的event链表中;
从signo上注销一个已注册的signal事件就更简单了,直接从其已注册事件的链表中移除即可。如果事件链表已空,那么就恢复旧的处理函数;
下面的讲解都以signal()函数为例,sigaction()函数的处理和signal()相似。
处理函数evsignal_handler()函数做的事情很简单,就是记录信号的发生次数,并通知event_base有信号触发,需要处理:
1 static void evsignal_handler(int sig)
2 {
3 int save_errno = errno; // 不覆盖原来的错误代码
4 if (evsignal_base == NULL) {
5 event_warn("%s: received signal %d, but have no base configured", __func__, sig);
6 return;
7 }
8 // 记录信号sig的触发次数,并设置event触发标记
9 evsignal_base->sig.evsigcaught[sig]++;
10 evsignal_base->sig.evsignal_caught = 1;
11 #ifndef HAVE_SIGACTION
12 signal(sig, evsignal_handler); // 重新注册信号
13 #endif
14 // 向写socket写一个字节数据,触发event_base的I/O事件,从而通知其有信号触发,需要处理
15 send(evsignal_base->sig.ev_signal_pair[0], "a", 1, 0);
16 errno = save_errno; // 错误代码
17 }
5,小节
本节介绍了libevent对signal事件的具体处理框架,包括事件注册、删除和socket pair通知机制,以及是如何将Signal事件集成到事件主循环之中的。
第九章 集成定时器事件
现在再来详细分析libevent中I/O事件和Timer事件的集成,与Signal相比,Timer事件的集成会直观和简单很多。Libevent对堆的调整操作做了一些优化,本节还会描述这些优化方法。
因为系统的I/O机制像select()和epoll_wait()都允许程序制定一个最大等待时间(也称为最大超时时间)timeout,即使没有I/O事件发生,它们也保证能在timeout时间内返回。
那么根据所有Timer事件的最小超时时间来设置系统I/O的timeout时间;当系统I/O返回时,再激活所有就绪的Timer事件就可以了,这样就能将Timer事件完美的融合到系统的I/O机制中了。
具体的代码在源文件event.c的event_base_loop()中,现在就对比代码来看看这一处理方法:
1 if (!base->event_count_active && !(flags & EVLOOP_NONBLOCK)) {
2 // 根据Timer事件计算evsel->dispatch的最大等待时间
3 timeout_next(base, &tv_p);
4 } else {
5 // 如果还有活动事件,就不要等待,让evsel->dispatch立即返回
6 evutil_timerclear(&tv);
7 }
8 // …
9 // 调用select() or epoll_wait() 等待就绪I/O事件
10 res = evsel->dispatch(base, evbase, tv_p);
11 // …
12 // 处理超时事件,将超时事件插入到激活链表中
13 timeout_process(base);
timeout_next()函数根据堆中具有最小超时值的事件和当前时间来计算等待时间,下面看看代码:
1 static int timeout_next(struct event_base *base, struct timeval **tv_p)
2 {
3 struct timeval now;
4 struct event *ev;
5 struct timeval *tv = *tv_p;
6 // 堆的首元素具有最小的超时值
7 if ((ev = min_heap_top(&base->timeheap)) == NULL) {
8 // 如果没有定时事件,将等待时间设置为NULL,表示一直阻塞直到有I/O事件发生
9 *tv_p = NULL;
10 return (0);
11 }
12 // 取得当前时间
13 gettime(base, &now);
14 // 如果超时时间<=当前值,不能等待,需要立即返回
15 if (evutil_timercmp(&ev->ev_timeout, &now, <=)) {
16 evutil_timerclear(tv);
17 return (0);
18 }
19 // 计算等待的时间=当前时间-最小的超时时间
20 evutil_timersub(&ev->ev_timeout, &now, tv);
21 return (0);
22 }
Libevent使用堆来管理Timer事件,其key值就是事件的超时时间,源代码位于文件min_heap.h中。
所有的数据结构书中都有关于堆的详细介绍,向堆中插入、删除元素时间复杂度都是O(lgN),N为堆中元素的个数,而获取最小key值(小根堆)的复杂度为O(1)。堆是一个完全二叉树,基本存储方式是一个数组。
Libevent实现的堆还是比较轻巧的,虽然我不喜欢这种编码方式(搞一些复杂的表达式)。轻巧到什么地方呢,就以插入元素为例,来对比说明,下面伪代码中的size表示当前堆的元素个数:
典型的代码逻辑如下:
1 Heap[size++] = new; // 先放到数组末尾,元素个数+1
2 // 下面就是shift_up()的代码逻辑,不断的将new向上调整
3 _child = size;
4 while(_child>0) // 循环
5 {
6 _parent = (_child-1)/2; // 计算parent
7 if(Heap[_parent].key < Heap[_child].key)
8 break; // 调整结束,跳出循环
9 swap(_parent, _child); // 交换parent和child
10 }
而libevent的heap代码对这一过程做了优化,在插入新元素时,只是为新元素预留了一个位置hole(初始时hole位于数组尾部),但并不立刻将新元素插入到hole上,而是不断向上调整hole的值,将父节点向下调整,最后确认hole就是新元素的所在位置时,才会真正的将新元素插入到hole上,因此在调整过程中就比上面的代码少了一次赋值的操作,代码逻辑是:
下面就是shift_up()的代码逻辑,不断的将new的“预留位置”向上调整
1 // 下面就是shift_up()的代码逻辑,不断的将new的“预留位置”向上调整
2 _hole = size; // _hole就是为new预留的位置,但并不立刻将new放上
3 while(_hole>0) // 循环
4 {
5 _parent = (_hole-1)/2; // 计算parent
6 if(Heap[_parent].key < new.key)
7 break; // 调整结束,跳出循环
8 Heap[_hole] = Heap[_parent]; // 将parent向下调整
9 _hole = _parent; // 将_hole调整到_parent
10 }
11 Heap[_hole] = new; // 调整结束,将new插入到_hole指示的位置
12 size++; // 元素个数+1
由于每次调整都少做一次赋值操作,在调整路径比较长时,调整效率会比第一种有所提高。libevent中的min_heap_shift_up_()函数就是上面逻辑的具体实现,对应的向下调整函数是min_heap_shift_down_()。
举个例子,向一个小根堆3, 5, 8, 7, 12中插入新元素2,使用第一中典型的代码逻辑,其调整过程如下图所示:
使用libevent中的堆调整逻辑,调整过程如下图所示:
对于删除和元素修改操作,也遵从相同的逻辑,就不再罗嗦了。
通过设置系统I/O机制的wait时间,从而简捷的集成Timer事件;主要分析了libevent对堆调整操作的优化。
第十章 支持I/O多路复用技术
Libevent的核心是事件驱动、同步非阻塞,为了达到这一目标,必须采用系统提供的I/O多路复用技术,而这些在Windows、Linux、Unix等不同平台上却各有不同,如何能提供优雅而统一的支持方式,是首要关键的问题,这其实不难,本节就来分析一下。
Libevent支持多种I/O多路复用技术的关键就在于结构体eventop,这个结构体前面也曾提到过,它的成员是一系列的函数指针, 定义在event-internal.h文件中:
1 struct eventop {
2 const char *name;
3 void *(*init)(struct event_base *); // 初始化
4 int (*add)(void *, struct event *); // 注册事件
5 int (*del)(void *, struct event *); // 删除事件
6 int (*dispatch)(struct event_base *, void *, struct timeval *); // 事件分发
7 void (*dealloc)(struct event_base *, void *); // 注销,释放资源
8 /* set if we need to reinitialize the event base */
9 int need_reinit;
10 };
在libevent中,每种I/O demultiplex机制的实现都必须提供这五个函数接口,来完成自身的初始化、销毁释放;对事件的注册、注销和分发。
比如对于epoll,libevent实现了5个对应的接口函数,并在初始化时并将eventop的5个函数指针指向这5个函数,那么程序就可以使用epoll作为I/O demultiplex机制了。
Libevent把所有支持的I/O demultiplex机制存储在一个全局静态数组eventops中,并在初始化时选择使用何种机制,数组内容根据优先级顺序声明如下:
1 /* In order of preference */
2 static const struct eventop *eventops[] = {
3 #ifdef HAVE_EVENT_PORTS
4 &evportops,
5 #endif
6 #ifdef HAVE_WORKING_KQUEUE
7 &kqops,
8 #endif
9 #ifdef HAVE_EPOLL
10 &epollops,
11 #endif
12 #ifdef HAVE_DEVPOLL
13 &devpollops,
14 #endif
15 #ifdef HAVE_POLL
16 &pollops,
17 #endif
18 #ifdef HAVE_SELECT
19 &selectops,
20 #endif
21 #ifdef WIN32
22 &win32ops,
23 #endif
24 NULL
25 };
然后libevent根据系统配置和编译选项决定使用哪一种I/O demultiplex机制,这段代码在函数event_base_new()中:
1 base->evbase = NULL;
2 for (i = 0; eventops[i] && !base->evbase; i++) {
3 base->evsel = eventops[i];
4 base->evbase = base->evsel->init(base);
5 }
可以看出,libevent在编译阶段选择系统的I/O demultiplex机制,而不支持在运行阶段根据配置再次选择。
以Linux下面的epoll为例,实现在源文件epoll.c中,eventops对象epollops定义如下:
1 const struct eventop epollops = {
2 "epoll",
3 epoll_init,
4 epoll_add,
5 epoll_del,
6 epoll_dispatch,
7 epoll_dealloc,
8 1 /* need reinit */
9 };
变量epollops中的函数指针具体声明如下,注意到其返回值和参数都和eventop中的定义严格一致,这是函数指针的语法限制。
1 static void *epoll_init (struct event_base *);
2 static int epoll_add (void *, struct event *);
3 static int epoll_del (void *, struct event *);
4 static int epoll_dispatch(struct event_base *, void *, struct timeval *);
5 static void epoll_dealloc (struct event_base *, void *);
那么如果选择的是epoll,那么调用结构体eventop的init和dispatch函数指针时,实际调用的函数就是epoll的初始化函数epoll_init()和事件分发函数epoll_dispatch()了;
关于epoll的具体用法这里就不多说了,可以参见介绍epoll的文章(本人的哈哈):http://blog.csdn.net/sparkliang/archive/2009/11/05/4770655.aspx
C++语言提供了虚函数来实现多态,在C语言中,这是通过函数指针实现的。对于各类函数指针的详细说明可以参见文章:http://blog.csdn.net/sparkliang/archive/2009/06/09/4254115.aspx
同样的,上面epollops以及epoll的各种函数都直接定义在了epoll.c源文件中,对外都是不可见的。对于libevent的使用者而言,完全不会知道它们的存在,对epoll的使用也是通过eventop来完成的,达到了信息隐藏的目的。
支持多种I/O demultiplex机制的方法其实挺简单的,借助于函数指针就OK了。通过对源代码的分析也可以看出,Libevent是在编译阶段选择系统的I/O demultiplex机制的,而不支持在运行阶段根据配置再次选择。
第十一章 时间管理
为了支持定时器,Libevent必须和系统时间打交道,这一部分的内容也比较简单,主要涉及到时间的加减辅助函数、时间缓存、时间校正和定时器堆的时间值调整等。下面就结合源代码来分析一下。
Libevent在初始化时会检测系统时间的类型,通过调用函数detect_monotonic()完成,它通过调用clock_gettime()来检测系统是否支持monotonic时钟类型:
1 static void detect_monotonic(void)
2 {
3 #if defined(HAVE_CLOCK_GETTIME) && defined(CLOCK_MONOTONIC)
4 struct timespec ts;
5 if (clock_gettime(CLOCK_MONOTONIC, &ts) == 0)
6 use_monotonic = 1; // 系统支持monotonic时间
7 #endif
8 }
Monotonic时间指示的是系统从boot后到现在所经过的时间,如果系统支持Monotonic时间就将全局变量use_monotonic设置为1,设置use_monotonic到底有什么用,这个在后面说到时间校正时就能看出来了。
结构体event_base中的tv_cache,用来记录时间缓存。这个还要从函数gettime()说起,先来看看该函数的代码:
1 static int gettime(struct event_base *base, struct timeval *tp)
2 {
3 // 如果tv_cache时间缓存已设置,就直接使用
4 if (base->tv_cache.tv_sec) {
5 *tp = base->tv_cache;
6 return (0);
7 }
8 // 如果支持monotonic,就用clock_gettime获取monotonic时间
9 #if defined(HAVE_CLOCK_GETTIME) && defined(CLOCK_MONOTONIC)
10 if (use_monotonic) {
11 struct timespec ts;
12 if (clock_gettime(CLOCK_MONOTONIC, &ts) == -1)
13 return (-1);
14 tp->tv_sec = ts.tv_sec;
15 tp->tv_usec = ts.tv_nsec / 1000;
16 return (0);
17 }
18 #endif
19 // 否则只能取得系统当前时间
20 return (evutil_gettimeofday(tp, NULL));
21 }
如果tv_cache已经设置,那么就直接使用缓存的时间;否则需要再次执行系统调用获取系统时间。
函数evutil_gettimeofday()用来获取当前系统时间,在Linux下其实就是系统调用gettimeofday();Windows没有提供函数gettimeofday,而是通过调用_ftime()来完成的。
在每次系统事件循环中,时间缓存tv_cache将会被相应的清空和设置,再次来看看下面event_base_loop的主要代码逻辑:
1 int event_base_loop(struct event_base *base, int flags)
2 {
3 // 清空时间缓存
4 base->tv_cache.tv_sec = 0;
5 while(!done){
6 timeout_correct(base, &tv); // 时间校正
7 // 更新event_tv到tv_cache指示的时间或者当前时间(第一次)
8 // event_tv <--- tv\_cache
9 gettime(base, &base->event_tv);
10 // 清空时间缓存-- 时间点1
11 base->tv_cache.tv_sec = 0;
12 // 等待I/O事件就绪
13 res = evsel->dispatch(base, evbase, tv_p);
14 // 缓存tv_cache存储了当前时间的值-- 时间点2
15 // tv_cache <--- now
16 gettime(base, &base->tv_cache);
17 // .. 处理就绪事件
18 }
19 // 退出时也要清空时间缓存
20 base->tv_cache.tv_sec = 0;
21 return (0);
22 }
时间event_tv指示了dispatch()上次返回,也就是I/O事件就绪时的时间,第一次进入循环时,由于tv_cache被清空,因此gettime()执行系统调用获取当前系统时间;而后将会更新为tv_cache指示的时间。
时间tv_cache在dispatch()返回后被设置为当前系统时间,因此它缓存了本次I/O事件就绪时的时间(event_tv)。
从代码逻辑里可以看出event_tv取得的是tv_cache上一次的值,因此event_tv应该小于tv_cache的值。
设置时间缓存的优点是不必每次获取时间都执行系统调用,这是个相对费时的操作;在上面标注的时间点2到时间点1的这段时间(处理就绪事件时),调用gettime()取得的都是tv_cache缓存的时间。
如果系统支持monotonic时间,该时间是系统从boot后到现在所经过的时间,因此不需要执行校正。
根据前面的代码逻辑,如果系统不支持monotonic时间,用户可能会手动的调整时间,如果时间被向前调整了(MS前面第7部分讲成了向后调整,要改正),比如从5点调整到了3点,那么在时间点2取得的值可能
会小于上次的时间,这就需要调整了,下面来看看校正的具体代码,由函数timeout_correct()完成:
1 static void timeout_correct(struct event_base *base, struct timeval *tv)
2 {
3 struct event **pev;
4 unsigned int size;
5 struct timeval off;
6 if (use_monotonic) // monotonic时间就直接返回,无需调整
7 return;
8 gettime(base, tv); // tv <---tv_cache
9 // 根据前面的分析可以知道event_tv应该小于tv_cache
10 // 如果tv < event_tv表明用户向前调整时间了,需要校正时间
11 if (evutil_timercmp(tv, &base->event_tv, >=)) {
12 base->event_tv = *tv;
13 return;
14 }
15 // 计算时间差值
16 evutil_timersub(&base->event_tv, tv, &off);
17 // 调整定时事件小根堆
18 pev = base->timeheap.p;
19 size = base->timeheap.n;
20 for (; size-- > 0; ++pev) {
21 struct timeval *ev_tv = &(**pev).ev_timeout;
22 evutil_timersub(ev_tv, &off, ev_tv);
23 }
24 base->event_tv = *tv; // 更新event_tv为tv_cache
25 }
在调整小根堆时,因为所有定时事件的时间值都会被减去相同的值,因此虽然堆中元素的时间键值改变了,但是相对关系并没有改变,不会改变堆的整体结构。因此只需要遍历堆中的所有元素,将每个元素的时间键值减去相同的值即可完成调整,不需要重新调整堆的结构。
当然调整完后,要将event_tv值重新设置为tv_cache值了。
主要分析了一下libevent对系统时间的处理,时间缓存、时间校正和定时堆的时间值调整等,逻辑还是很简单的,时间的加减、设置等辅助函数则非常简单,主要在头文件evutil.h中,就不再多说了
第十二章 让libevent支持多线程
Libevent本身不是多线程安全的,在多核的时代,如何能充分利用CPU的能力呢,这一节来说说如何在多线程环境中使用libevent,跟源代码并没有太大的关系,纯粹是使用上的技巧。
在多核的CPU上只使用一个线程始终是对不起CPU的处理能力啊,那好吧,那就多创建几个线程,比如下面的简单服务器场景
1> 主线程创建工作线程1;
2> 接着主线程监听在端口上,等待新的连接;
3> 在线程1中执行event事件循环,等待事件到来;
4> 新连接到来,主线程调用libevent接口event_add将新连接注册到libevent上;
上面的逻辑看起来没什么错误,在很多服务器设计中都可能用到主线程和工作线程的模式….
可是就在线程1注册事件时,主线程很可能也在操作事件,比如删除,修改,通过libevent的源代码也能看到,没有同步保护机制,问题麻烦了,看起来不能这样做啊,难道只能使用单线程不成!?
Libevent并不是线程安全的,但这不代表libevent不支持多线程模式,其实方法在前面已经将signal事件处理时就接触到了,那就是消息通知机制。
一句话,“你发消息通知我,然后再由我在合适的时间来处理”;
说到这就再多说几句,再打个比方,把你自己比作一个工作线程,而你的头是主线程,你有一个消息信箱来接收别人发给你的消息,当时头有个新任务要指派给你。
那么第一节中使用的多线程方法相当下面的流程:
1> 当时你正在做事,比如在写文档;
2> 你的头找到了一个任务,要指派给你,比如帮他搞个PPT,哈;
3> 头命令你马上搞PPT,你这是不得不停止手头的工作,把PPT搞定了再接着写文档;
那么基于纯粹的消息通知机制的多线程方式就像下面这样:
1> 当时你正在写文档;
2> 你的头找到了一个任务,要指派给你,帮他搞个PPT;
3> 头发个消息到你信箱,有个PPT要帮他搞定,这时你并不鸟他;
4> 你写好文档,接着检查消息发现头有个PPT要你搞定,你开始搞PPT;
第一种的好处是消息可以立即得到处理,但是很方法很粗暴,你必须立即处理这个消息,所以你必须处理好切换问题,省得把文档上的内容不小心写到PPT里。在操作系统的进程通信中,消息队列(消息信箱)都是操作系统维护的,你不必关心。
第二种的优点是通过消息通知,切换问题省心了,不过消息是不能立即处理的(基于消息通知机制,这个总是难免的),而且所有的内容都通过消息发送,比如PPT的格式、内容等等信息,这无疑增加了通信开销。
有个折中机制可以减少消息通信的开销,就是提取一个同步层,还拿上面的例子来说,你把工作安排都存放在一个工作队列中,而且你能够保证“任何人把新任务扔到这个队列”,“自己取出当前第一个任务”等这些操作都能够保证不会把队列搞乱(其实就是个加锁的队列容器)。
再来看看处理过程和上面有什么不同:
1> 当时你正在写文档;
2> 你的头找到了一个任务,要指派给你,帮他搞个PPT;
3> 头有个PPT要你搞定,他把任务push到你的工作队列中,包括了PPT的格式、内容等信息;
4> 头发个消息(一个字节)到你信箱,有个PPT要帮他搞定,这时你并不鸟他;
5> 你写好文档,发现有新消息(这预示着有新任务来了),检查工作队列知道头有个PPT要你搞定,你开始搞PPT;
…
工作队列其实就是一个加锁的容器(队列、链表等等),这个很容易实现实现;而消息通知仅需要一个字节,具体的任务都push到了在工作队列中,因此想比2.2减少了不少通信开销。
多线程编程有很多陷阱,线程间资源的同步互斥不是一两句能说得清的,而且出现bug很难跟踪调试;这也有很多的经验和教训,因此如果让我选择,在绝大多数情况下都会选择机制3作为实现多线程的方法。
Memcached中的网络部分就是基于libevent完成的,其中的多线程模型就是典型的消息通知+同步层机制。下面的图足够说明其多线程模型了,其中有详细的文字说明。
本节更是libevent的使用方面的技巧,讨论了一下如何让libevent支持多线程,以及几种支持多线程的机制,和memcached使用libevent的多线程模型
第十三章 libevent 信号处理注意点
前面讲到了 libevent 实现多线程的方法,然而在多线程的环境中注册信号事件,还是有一些情况需要小心处理,那就是不能在多个 libevent 实例上注册信号事件。依然冠名追加到 libevent 系列。
以 2 个线程为例,做简单的场景分析。
1> 首先是创建并初始化线程 1 的 libevent 实例 base1 ,线程 1 的 libevent 实例 base2 ;
2 >在 base1 上注册 SIGALRM 信号;在 base2 上注册 SIGINT 信号;
3 >假设当前 base1 和 base2 上都没有注册其他的事件;
4 >线程 1 和 2 都进入 event_base_loop 事件循环:
1 event_base_loop(base1) event_base_loop(base2)
2
3 { {
4
5 if (base2->sig.ev_signal_added) if (base2->sig.ev_signal_added)
6
7 evsignal_base = base1; evsignal_base = base2;
8
9 while(!done) while(!done)
10
11 { {
12
13 … …
14
15 evsel->dispatch(…); evsel->dispatch(…);
16
17 … …
18
19 } }
20
21 } }
5> 假设线程 1 先进入 event_base_loop ,并设置 evsignal_base = base1 ;并等待;
6> 接着线程 2 也进入 event_base_loop ,并设置 evsignal_base = base2 ;并等待;
于是 evsignal_base 就指向了 base2 ;
7> 信号 ALARM 触发,调用服务例程:
1 static void evsignal_handler(int sig)
2
3 {
4
5 …
6
7 evsignal_base->sig.evsigcaught[sig]++;
8
9 evsignal_base->sig.evsignal_caught = 1;
10
11 /* Wake up our notification mechanism */
12
13 send(evsignal_base->sig.ev_signal_pair[0], "a", 1, 0);
14
15 …
16
17 }
于是 base2 得到通知 ALARM 信号发生了,而实际上 ALARM 是注册在 base1 上的, base2 上的 ALARM 注册 event 是空的,于是处理函数将不能得到调用;
因此在 libevent 中,如果需要处理信号,只能将信号注册到一个 libevent 实例上。
memcached 就没有使用 libevent 提供的 signal 接口,而是直接使用系统提供的原生 API ,看起来这样更简洁。
手机扫一扫
移动阅读更方便
你可能感兴趣的文章