我是靠谱客的博主 傲娇柠檬,最近开发中收集的这篇文章主要介绍libevent深入浅出引子前置知识I/O多路复用的实现及存在的问题Reactor模式libevent总结参考链接,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

引子

图1 一种简单的服务架构

在介绍libevent之前,首先抛出一个问题,如果让你设计一个高性能的服务架构,要求服务端能够处理多个客户端连接并响应客户端的请求,如图1所示,你会如何设计?

一个比较直观的想法是在服务端,每来一个客户端连接,即开启一个线程去处理并响应,这种设计的优点是简单易懂,缺点也很明显,如果客户端连接比较多,需要开启多个线程去处理,而操作系统开启线程是需要一定代价的,服务端在多个线程间切换也是比较消耗资源的;另外,多个线程操作临界资源时会带来锁竞争问题,当连接数比较多时竞争会非常激烈。那么接下来,我将介绍一种比较经典的处理方式,学习完之后,也许对于这个系统的设计会有不同的答案。

前置知识

在介绍之前,首先说明几个概念。

  1. I/O多路复用
    进程需要一种预先告知内核的能力,使得内核一旦发现进程指定的一个或多个I/O条件就绪(也就是说输入已准备好,或者描述符已经承接更多的输出),它就通知进程,这种能力称为I/O复用。IO多路复用是一种同步IO模型,实现一个线程可以监视多个文件句柄;一旦某个文件句柄就绪,就能够通知应用程序进行相应的读写操作;没有文件句柄就绪时会阻塞应用程序,交出cpu。多路是指网络连接,复用指的是同一个线程。

  2. 用户空间和内核空间
    现在操作系统都是采用虚拟存储器,那么对32位操作系统而言,它的寻址空间(虚拟存储空间)为4G(2的32次方)。操作系统的核心是内核,独立于普通的应用程序,可以访问受保护的内存空间,也有访问底层硬件设备的所有权限。为了保证用户进程不能直接操作内核(kernel),保证内核的安全,操心系统将虚拟空间划分为两部分,一部分为内核空间,一部分为用户空间。针对linux操作系统而言,将最高的1G字节(从虚拟地址0xC0000000到0xFFFFFFFF),供内核使用,称为内核空间,而将较低的3G字节(从虚拟地址0x00000000到0xBFFFFFFF),供各个进程使用,称为用户空间,如图2所示。这里还有一个比较重要的概念,叫DMA(Direct Memory Access直接存储器访问),它的作用是处理各种I/O,包括网络I/O和磁盘I/O。CPU是不会直接处理I/O的,这是因为CPU非常宝贵,而I/O是比较耗时的,如果CPU一直等待某一次I/O事件完成,会带来极大的浪费,且性能会急剧下降,因此需要一种机制能够完成I/O,并通知CPU,DMA即是这个角色。

在这里插入图片描述

图2 用户空间和内核空间

从图2中可以看到,对于一次I/O访问,数据会先被拷贝到内核空间中,然后才会拷贝到用户空间中供应用程序使用,所以在这个过程中会经历两个阶段(对应图2中的1,2两个步骤):

  1. 等待数据准备
  2. 将数据从内核空间拷贝到进程中

正是因为这两个阶段,linux系统中产生了两大类网络模式

  • 同步I/O
    • 阻塞I/O
    • 非阻塞IO
    • I/O多路复用
    • 信号驱动I/O(使用较少)
  • 异步I/O

同步I/O和异步I/O主要区别在于:
用户进程在发起I/O操作之后可以立刻去做其他事情,数据拷贝由硬件拷贝到内核空间、从内核空间拷贝到用户空间都不阻塞,这种就是异步I/O;两个步骤中有任何一步发生阻塞,就是同步I/O。

同步I/O又细分了很多模式,它们的区别是:

同步I/O模式详细说明
阻塞I/O在步骤1即阻塞
非阻塞I/O步骤1不阻塞,需轮询;步骤2阻塞
I/O多路复用与阻塞I/O一样,优点是可以处理多个连接

阻塞I/O典型代码(大部分socket接口都是阻塞型的,当然可以设置为非阻塞,以python语言为例,主要是代码简单清晰,其他语言类似)

## 客户端
import socket
sk = socket.socket()
sk.connect(('0.0.0.0', 9600))
while True:
    msg = input('>>>>')
    sk.send(msg.encode())
    if msg.lower() == 'quit':
        break
    ret = sk.recv(1024)
    if ret.lower() == 'quit':
        break
    print(ret.decode())
sk.close()
#########################################################################
## 服务端
import socket
# 默认TCP,参数可以不写
sk = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM)  
sk.bind(('0.0.0.0', 9600))
sk.listen(1)
while True:
    conn, ip_addr = sk.accept()
    while True:
        msg = conn.recv(1024).decode()
        if msg.lower() == 'quit':
            break
        print(msg)
        send_msg = input('>>>>')
        conn.send(send_msg.encode())
        if send_msg.lower() == 'quit':
            break
    conn.close()
sk.close()

I/O多路复用的实现及存在的问题

介绍到这里,我们对网络模型的分类以及实现有了基本的认识,接下来重点介绍几种非常经典的I/O多路复用机制。
在介绍之前,还是先抛出一个问题,如果让你写一个程序,要求监听5个文件描述符,当有事件发生时,调用相应的函数,你会怎么写?我们可能写出如下的代码:

while True:
    for fd in fd_list:
        if has_event(fd):
            invoke(fd)
    time.sleep(10)

这段代码的问题是需要不断地去迭代这几个文件描述符,然后检查他们是否有相应的事件到达,如果是则调用处理函数,否则继续遍历。CPU花费了大量的时间在检查上,即使用C语言写也会有很大的性能损耗。优化的方式也很直观,不要无休止地去遍历这几个文件描述符,而是当这几个文件描述符有事件到达时,通知CPU,然后再进行处理,select、poll、epoll多路复用机制就是在此基础上设计的。

select

  • 函数原型:
    int select (int nfds, fd_set * readfds,
                       fd_set * writefds,
                       fd_set *exceptfds,
                       struct timeval *timeout);
    
  • 参数介绍
参数名称类型含义
nfdsint文件描述符最大值+1
readfdsfd_set *读事件文件描述符集合
writefdsfd_set *写事件文件描述符集合
exceptfdsfd_set *异常事件文件描述符集合
timeouttimeval *超时时间,超过该时间即使没有事件
到达,select也会返回,避免无休止地等待
  • 代码分析(以read事件为例)
    sockfd = socket(AF_INET, SOCK_STREAM, 0);
      memset(&addr, 0, sizeof (addr));
      addr.sin_family = AF_INET;
      addr.sin_port = htons(2000);
      addr.sin_addr.s_addr = INADDR_ANY;
      bind(sockfd,(struct sockaddr*)&addr ,sizeof(addr));
      listen (sockfd, 5); 
      for (i=0;i<5;i++) 
      {
        memset(&client, 0, sizeof (client));
        addrlen = sizeof(client);
        fds[i] = accept(sockfd,(struct sockaddr*)&client, &addrlen);
        if(fds[i] > max)
        	max = fds[i];
      }
      // ####################################################################
      while(1){
    	FD_ZERO(&rset);     // 1. 初始化/重置rset
      	for (i = 0; i< 5; i++ ) {
      		FD_SET(fds[i],&rset); // 2. 为rset赋值
      	}
     
    	select(max+1, &rset, NULL, NULL, NULL); // 3. select阻塞
     
    	for(i=0;i<5;i++) {
    		if (FD_ISSET(fds[i], &rset)){
    			memset(buffer,0,MAXBUF);
    			read(fds[i], buffer, MAXBUF);
    			puts(buffer);
    		}
    	}	
      }
      return 0;
    

用"#"符号分隔成了上下两部分,上面部分主要进行初始化工作,准备文件描述符集合,文件描述符是一个非负数,假设此时fds=[1,3,4,5,8],从这里也可以看到文件描述符并不连续,此时文件描述符的最大值max=8;下面部分是select的重点,rset是一个长度为1024的bit数组,它表示哪一个文件描述符被占用,被占用的部分置为1,fds和rset的取值情况如图3
在这里插入图片描述

图3 select中fds和rset的取值

随后就是调用select函数,因为网络I/O中主要是读事件,这里只传入rset,写事件、异常事件文件描述符集合都传NULL,因为select函数有默认超时时间,因此timeout也传NULL,那么,在select函数内部是怎么进行处理的呢,请看图4
在这里插入图片描述

图4 内核检测rset

内核会将传入的rset由用户态拷贝到内核态,然后在内核中判断每一个置位的rset是否有数据到达(将rset拷贝到内核态是因为在内核中判断是否有数据到达比在用户态判断效率高很多),如果有,则标记这一位,如果没有,继续往后判断,除非到达select的超时时间,所以select是一个阻塞的函数。select函数中第一个参数nfds有什么用呢?select在对rset遍历时,会截取0~nfds,因为大于nfds是不会有文件描述符的;当select函数返回时,循环遍历这5个文件描述符,然后检测每一个文件描述符是否被标记,如果标记了,说明有read事件发生,则读取缓存中的数据。这里需要注意某一个文件描述符是否有数据是随机的,可能有多个文件描述符同时有数据到达,因此需要执行for遍历每一个文件描述符。

从select的实现代码及逻辑中可以发现select也存在许多问题,主要有以下几点:

  1. len(rset)=1024,所以最多只能监听1024个文件描述符。这个是操作系统进行的限制,虽然可以更改,但是有上限;
  2. rset不可重用。从代码中可以看到,每次循环时,都需要调用FD_ZERO(&rest),重置rset,这是因为内核检测是否有数据时会更改rset,如果不重置,上一次select返回的结果会对下一次内核判断某一个文件描述符是否有数据产生影响;
  3. 解除select阻塞后需遍历每一个fd。这是因为内核判断是否有数据后会直接返回,但是并没有返回究竟是哪一个文件描述符有数据。

poll

poll的实现和select非常相似,这里采用同样的方法进行分析。

  • 函数原型
    int poll (struct pollfd *fds, unsigned int nfds, int timeout);
    
    struct pollfd {
          int fd;
          short events;  // 用户感兴趣的事件
          short revents; // 系统出发的事件
    };
    
  • 参数介绍
参数名称类型含义
fdspollfd *文件描述符数组,和select不同,fds中
每一个元素不是一个数字,而是pollfd类型
nfdsunsigned int监听的文件描述符数量
timeoutint超时时间,含义和select中timeout类似
  • 代码分析
      for (i=0;i<5;i++) 
      {
        memset(&client, 0, sizeof (client));
        addrlen = sizeof(client);
        pollfds[i].fd = accept(sockfd,(struct sockaddr*)&client, &addrlen);
        pollfds[i].events = POLLIN;
      }
      sleep(1);
      // ######################################################################
      while(1){
    	poll(pollfds, 5, 50000);
    
    	for(i=0;i<5;i++) {
    		if (pollfds[i].revents & POLLIN){
    			pollfds[i].revents = 0;
    			memset(buffer,0,MAXBUF);
    			read(pollfds[i].fd, buffer, MAXBUF);
    			puts(buffer);
    		}
    	}
      }
    

这里同样用"#"分隔成上下两部分,poll的原理和select非常类似,也是将pollfds拷贝到内核空间。不同的是,当内核判断某个pollfd有数据时,会将pollfd.revents置位,然后解除poll的阻塞。因此,for中遍历每一个pollfd先检测该文件描述符是否被置位,如果是说明有数据,则将置位的revents恢复成默认,随后读取该数据。
poll解决了select中rset无法重用的问题,通过自定义的pollfd结构,巧妙地完成了"置位-恢复"的操作;而且,poll中文件描述符不是一个数字而是pollfd结构,可以监听更多的文件描述符,也就突破了select中len(rst)=1024的限制,但是也存在一个问题:

  • poll解除阻塞后依旧需要循环遍历所有的pollfds,然后才知道具体哪一个文件描述符有数据

epoll

epoll在poll的基础上,通过双向链表的结构,解决了poll中存在的问题,接下来看epoll的具体实现

  • 代码实现
      struct epoll_event events[4];
      int epfd = epoll_create(10);
      // ...
      // ...
      for (i=0;i<4;i++) 
      {
        static struct epoll_event ev;
        memset(&client, 0, sizeof (client));
        addrlen = sizeof(client);
        ev.data.fd = accept(sockfd,(struct sockaddr*)&client, &addrlen);
        ev.events = EPOLLIN;
        epoll_ctl(epfd, EPOLL_CTL_ADD, ev.data.fd, &ev); 
      }
      // ###################################
      while(1){
      	puts("round again");
      	nfds = epoll_wait(epfd, events, 4, 10000);
    	
    	for(i=0;i<nfds;i++) {
    			memset(buffer,0,MAXBUF);
    			read(events[i].data.fd, buffer, MAXBUF);
    			puts(buffer);
    	}
      }
    

epoll不像select和poll那样只有一个同名的函数,epoll的实现是通过3个主要的函数完成的,对应上述代码中的

int epfd = epoll_create(10); // 这里的参数10没有什么意义,事实上,通过源码可以知道这里只要传入一个正数就OK
epoll_ctl(epfd, EPOLL_CTL_ADD, ev.data.fd, &ev);
nfds = epoll_wait(epfd, events, 4, 10000);

上述3个函数对应epoll的3个步骤(请结合图5)

  1. 生成epoll面板epfd(epoll_create函数的功能,对应图5中的橙黄色部分)
  2. 在 epfd中将文件描述符和事件绑定(epoll_ctl函数的功能,对应图中的fd[x]–>events)
  3. 阻塞并监听文件描述符(epoll_wait函数的功能)
    在这里插入图片描述
图5 epoll的原理

图5还介绍了内核空间部分是如何处理的,当某一个文件描述符有数据到达时,内核空间会将该文件描述符调整到最前面,且nfds+1,对应图中绿色部分,表示fd3和fd2有数据,此时nfds=2,解除epoll_wait函数的阻塞并返回nfds的值。在for进行遍历时,只需要遍历0~nfds即可,也就解决了select和poll中需要遍历整个文件描述符数组的问题。

epoll除了解决select和poll中的问题外,还细化了事件的出发机制

  1. 边缘触发(edge triggered ET)
    对于边缘出发,epoll_wait()只返回一次,即只在该读写事件发生时返回,也就是说如果事件处理函数只读取了该文件描述缓冲区的部分内容时返回,再次调用epoll_wait(),虽然此时该描述符对应缓冲区中还有数据,但epoll_wait()函数不会返回。
  2. 水平触发(level triggered LT)
    对于水平出发,它不管是否有事件反生,只要文件描述符对应的缓冲区中有数据可读写,epoll_wait()就会返回。

Reactor模式

上文用了大量的篇幅介绍I/O多路复用机制,本节将介绍Reactor模式,它是libevent的核心,因此有必要对其进行介绍,有了上文中select、poll、epoll的基础,Reactor模式就比较好理解了。
Reactor释义"反应堆"(中文翻译的名称和它的功能几乎没有什么联系,因此下文还是采用英文名),Reactor模式的是一种基于事件驱动的异步回调机制。一般的函数调用是压栈式调用,函数之间调用需要同步等待,而Reactor模式不同,应用程序提供相应的接口并注册到Reactor上,如果相应的事件发生,Reactor将主动调用应用程序注册的接口(也就是回调函数)。所以,上文中的select、poll、epoll其实都是Reactor模式。

Reactor模式框架

在这里插入图片描述

图6 Reactor模式框架

主要的代码如下:

class Reactor
{
 public:
    int register_handler(Event_Handler *pHandler, int event);
    int remove_handler(Event_Handler *pHeadler, int event);
    void handle_event(timeval *ptv);
}

class Event_Handler
{
public:
    // events maybe read/write/tiomeout .etc
    virtual void handle_events(int events) = 0;
    virtual HANDLE get_handle() = 0;
}

结合代码与图6,下面对Reactor的组件进行介绍

  1. 事件源(图6中的Handle)
    在linux系统中是文件描述符,在window上是socket或Handle,下文统称为"句柄"
  2. 事件多路分发机制(图6中的Event Demultiplexer)
    由操作系统提供的I/O多路分发机制,比如上文所述的select、poll、epoll
  3. 反应器(图6中的Reactor)
    Reactor模式中的事件管理接口,提供注册、注销事件和事件循环,它的声明参见代码中的class Reactor
  4. 事件处理程序(图6中的Event_Handler和Concrete_Event_Handler)
    包括抽象处理程序和具体处理程序,主要是考虑扩展性,它的声明参见class Event_Handler

到此,Reactor就介绍完了,它是libevent的核心,有了上文select等多路复用机制的基础,掌握Reactor模式并不困难。

libevent

讲到这里,本文的主角终于登场,上文所述都是为其做铺垫,因为libevent需要一些基础知识,如果不了解,不利于有效掌握libevent,本文中所引用的libevent版本为1.4.13。

libevent入门案例:定时器的实现

首先我们来看一个比较简单的案例:使用libevent实现一个定时器,只需简短的几行代码即可实现,代码虽短,但是可以了解libevent的大体流程。

  • 代码示例
struct event ev;
struct timeval tv;
void time_cb(int fd, short event, void *args){
    printf("timer wakeupn");
    event_add(&ev, &tv); // reschedule timer
}

int main(){

    // 1. 初始化libevent,相当于初始化Reactor实例
    struct event_base *base = event_init();
    tv.tv_sec = 10; // 10s period
    tv.tv_usec = 0;
    
    // 2. 初始化时间event,设置回调函数和关注的事件
    evtimer_set(&ev, time_cb, NULL);
    
    // 3. 设置event从属的event_base
    event_base_set(base, &ev);
    
    // 4. 正式添加事件
    event_add(&ev, &tv);
    
    // 5. 等待事件就绪,阻塞
    event_base_dispatch(base);
}

打开libevent源码,可以看到#define evtimer_set(ev, cb, arg) event_set(ev, -1, 0, cb, arg),这里对event_set函数进行介绍

void
event_set(struct event *ev, int fd, short events,
	  void (*callback)(int, short, void *), void *arg);

参数意义及取值介绍

参数名称类型含义及取值
evevent *执行要初始化的event对象
fdint文件描述符,如果是信号事件,它就是关注的信号;
定时器由于没有文件描述符,因此fd=-1
cb-函数指针,它是事件的回调函数
argvoid *回调函数需要的参数

给出代码示例以及逻辑介绍后,对于libevent的使用有了大体的认识,当然这里并不需要完全弄懂每一行代码含义,当介绍完libevent的框架及流程再反观这个小案例会有更深刻的认识。

libevent源代码组织结构

libevent源代码分布在同一级目录下,好在文件的命名比较有讲究,因此实际看起来也没有那么吃力,这里简单列举一下代码包含的内容。

  • 头文件(主要是event.h)

    • 事件宏定义
    • 接口函数声明
    • 主要结构体event声明
  • 内部头文件(xxx-internal.h)
    内部数据结构和函数

  • libevent框架(event.c)
    event整体框架的代码实现

  • 对系统I/O多路复用的封装

    • epoll.c
    • devpoll.c
    • select.c
    • kqueue.c
  • 定时事件管理(min-heap.h)
    以时间为key的小根堆

  • 信号管理(signal.c)

    • 对信号的管理
  • 辅助功能函数

    • evutil.h
    • evutil.c 辅助功能函数的实现
  • 时间操作(时间的加减比较等)

  • 日志

    • log.h
    • log.c 日志函数具体实现
  • 缓冲区管理(libevent对缓冲区的封装)

    • evbuffer.c
    • buffer.c
  • 基本数据结构 compat/sys下源文件

    • queue.h
      libevent基本数据结构的实现, 包括链表,双向链表,队列

    • _libevent_time.h

  • 一些用于实践操作的结构体定义/宏定义/函数

  • 实用网络库

    • http
      基于libevent实现的http服务器

    • evdns
      异步dns查询库

libevent的核心:事件event

libevent的核心结构就是event,下面就来一探event究竟。

event结构分析

在这里插入图片描述

图7 event的结构

图7给出了event的结构,这里对其中比较关键的几个结构做进一步介绍,剩下的参数含义比较简单,从命名即可知道含义,有些参数的含义在图中也已经注释了。

  • 双向链表事件(使用双向链表结构保存了event的所有事件)
    • event_next(保存所有已注册的I/O事件)
    • ev_signal_next(保存所有已注册的信号事件)
    • ev_active_next(保存所有已激活的事件,激活的含义就是文件描述符有数据)
  • ev_base(该事件所属的Reactor实例,是一个event_base的指针,关于event_base,后面将详细介绍)
  • ev_events(它是event关注的事件类型,有多种取值,且可以"|“组合,比如"EV_ERAD|EV_PERSIST”,当然,定时器事件不能和I/O事件组合在一起,可以考虑下这是为什么)

事件管理

在这里插入图片描述

图8 libevent事件管理

图8给出了libevent事件管理的结构,左侧红色部分表示I/O事件和signal的结构,它们都是由若干事件组成的双向链表,图中把它们画在一起是因为它们结构相同,但实际上它们是两个双向链表;中间蓝色部分表示时间事件,时间事件在底层是一个以超时时间为key的小根堆,由于libevent需要频繁获取最近的时间事件,因此采用小根堆存储,小根堆在底层是用数组实现的;右边绿色部分表示激活事件的结构,它的底层也是双向链表,但它与I/O事件的双向链表不同,它是根据事件的priority进行排序的,优先级高的在前面,优先被执行。因此,有可能出现优先级比较低的事件一直得不到执行。libevent提供了设置事件优先级的api,如果不设置,会给一个默认的优先级。

事件接口函数

关于事件的接口主要有以下3个,含义也比较好理解,相关参数以及函数的功能参见下面的代码。

// 事件设置接口函数
void event_set(struct event *ev, int fd, short events,
void (*callback)(int,short,void *), void *arg)

// fd:事件ev绑定的文件描述符或信号,如果是超时事件,fd=-1
// events:设置事件的类型 EV_READ|EV_PERSIST, EV_WRITE


int event_base_set(struct event_base *base, struct event *ev)
// 设置event的ev到event_base上
// libevent 有一个全局的event_base指针,
// 使用该函数可以将ev注册到自定义的event_base上


int event_priority_set(struct event *ev, int pri)
// 设置ev的优先级

事件处理框架

上一节对libevent时间处理框架的event结构做了描述,接下来将详细介绍event结构中最重要的结构成员:event_base。

event_base结构

在这里插入图片描述

图9 event_base结构

event_base定义位于event-internal.h下,它的结构如图9所示,这里对于图9中一些比较重要的字段进行介绍。

  • 核心变量
    核心变量包括evsel和evbase。添加事件的调用关系为evsel->add(evbase, ev),evsel只负责调用,将具体的事件ev注册到evbase中。
  • 事件指针
    主要包括activequeues和event_queue,分别保存带有优先级的激活事件和所有注册时间的event指针。activequeues[priority]是一个链表,链表中的每个节点指向一个优先级为priority的激活事件event。
  • 时间管理变量
    包括时间缓存tv_cache和event_tv,下文会继续介绍
  • 其他变量
    主要是各个结构数量的统计变量,比如event_count是注册事件的总和,nactivequeues是激活事件的总量。

这里还要值得一提的是evsel的类型,即struct eventop*,现在介绍一下它的结构体,它的成员是一系列的函数指针,在event-internal.h文件中。

struct eventop {
	const char *name;
	void *(*init)(struct event_base *);
	int (*add)(void *, struct event *);
	int (*del)(void *, struct event *);
	int (*dispatch)(struct event_base *, void *, struct timeval *); // 事件分发
	void (*dealloc)(struct event_base *, void *);
	/* set if we need to reinitialize the event base */
	int need_reinit;
};

上文说过libevent将系统提供的I/O多路复用机制统一封装成了eventop结构;因此eventops[]包含了select、poll、epoll等全局实例对象,libevent所有对select、poll、epoll的封装都是eventop类型,可以参考libevent封装poll的代码:

const struct eventop pollops = {
	"poll",
	poll_init,
	poll_add,
	poll_del,
	poll_dispatch,
	poll_dealloc,
    0
};

接口函数

libevent提供的对于事件的注册、注销以及当事件处于激活状态时调用回到接口的api如下。

int event_add(struct event *ev, const struct timeval *timeout);
int event_del(struct event *ev);
int event_base_loop(struct event_base *base, int loops);
void event_active(struct event *event, int res, short events);
void event_process_active(struct event_base *base);

这里着重介绍event_add函数

int event_add(struct event *ev, const struct timeval *tv){
    // ev:指向要注册的事件;tv:超时时间
    struct event_base *base = ev->base;
    // event_base使用的系统IO策略
    const struct eventop *evsel = base->evsel; 
    void *evbase = base->evbase;
    
    // 新的timer事件,在小根堆上预留一个位置
    if (tv != NULL && !(ev->ev_flags & EVLIST_TIMEOUT)) {
		if (min_heap_reserve(&base->timeheap,
			1 + min_heap_size(&base->timeheap)) == -1)
			return (-1);  /* ENOMEM == errno */
	}

	if ((ev->ev_events & (EV_READ|EV_WRITE|EV_SIGNAL)) &&
	    !(ev->ev_flags & (EVLIST_INSERTED|EVLIST_ACTIVE))) {
		res = evsel->add(evbase, ev);
		if (res != -1) // 如果事件添加成功则插入到eventqueue中
			event_queue_insert(base, ev, EVLIST_INSERTED);
	}
	
	if (res != -1 && tv != NULL) {
		struct timeval now;
		if (ev->ev_flags & EVLIST_TIMEOUT) // EVLIST_TIMEOUT表示该事件已经存在与定时器堆中,则删除旧的
			event_queue_remove(base, ev, EVLIST_TIMEOUT);
			
        gettime(base, &now);
        
        if ((ev->ev_flags & EVLIST_ACTIVE) &&
		    (ev->ev_res & EV_TIMEOUT)) { // 如果事件已经就绪则从激活链表中删除
			event_queue_remove(base, ev, EVLIST_ACTIVE);
		}
	// 插入定时器事件
        event_queue_insert(base, ev, EVLIST_TIMEOUT);
	}
	return (res);
}

在该函数中多次调用了event_queue_insert函数,这个函数的定义如下:

void event_queue_insert(struct event_base *base,
struct event *ev, int queue){
    if (ev->ev_flags & queue) {
		if (queue & EVLIST_ACTIVE)
			return;
	}

	if (~ev->ev_flags & EVLIST_INTERNAL)
		base->event_count++;

	ev->ev_flags |= queue;
	switch (queue) {
	case EVLIST_INSERTED:
		TAILQ_INSERT_TAIL(&base->eventqueue, ev, ev_next);
		break;
	case EVLIST_ACTIVE:
		base->event_count_active++;
		TAILQ_INSERT_TAIL(base->activequeues[ev->ev_pri],
		    ev,ev_active_next);
		break;
	case EVLIST_TIMEOUT: {
		min_heap_push(&base->timeheap, ev);
		break;
	}
	default:
		event_errx(1, "%s: unknown queue %x", __func__, queue);
	}
}

event_queue_insert函数就好理解得多,看了源码基本上就能理解。删除事件的逻辑和插入类似,这里就不继续展开,基本思想就是将事件从激活链表、I/O链表以及时间小根堆中删除。

事件循环

事件循环时libevent非常重要的一部分,它的主要职责就是阻塞等待事件到来并调用事件绑定的回到函数。

事件循环逻辑结构

在这里插入图片描述

图10 事件主循环

事件循环实现

int event_base_loop(struct event_base *base, int flags)
{
const struct eventop *evsel = base->evsel;
void *evbase = base->evbase;
struct timeval tv;
struct timeval *tv_p;
int res, done;
// 清空时间缓存
base->tv_cache.tv_sec = 0;
// evsignal_base是全局变量,在处理signal时,用于指名signal所属的event_base实例
if (base->sig.ev_signal_added)
	evsignal_base = base;
done = 0;
while (!done) { // 事件主循环
	// 校正系统时间
	timeout_correct(base, &tv);
	// 根据timer heap中事件的最小超时时间,计算系统I/O demultiplexer的最大等待时间
	tv_p = &tv;
	if (!base->event_count_active && !(flags & EVLOOP_NONBLOCK)) {
		timeout_next(base, &tv_p);
	} else {
		// 依然有未处理的就绪时间,就让I/O demultiplexer立即返回,不必等待
			// 下面会提到,在libevent中,低优先级的就绪事件可能不能立即被处理
			evutil_timerclear(&tv);
	}
	// 如果当前没有注册事件,就退出
	if (!event_haveevents(base)) {
		event_debug(("%s: no events registered.", __func__));
		return (1);
	}
	// 更新last wait time,并清空time cache
	gettime(base, &base->event_tv);
	base->tv_cache.tv_sec = 0;
	// 调用系统I/O demultiplexer等待就绪I/O events,可能是epoll_wait,或者select等;
	// 在evsel->dispatch()中,会把就绪signal event、I/O event插入到激活链表中
	res = evsel->dispatch(base, evbase, tv_p);
	if (res == -1)
		return (-1);
	// 将time cache赋值为当前系统时间
	gettime(base, &base->tv_cache);
	// 检查heap中的timer events,将就绪的timer event从heap上删除,并插入到激活链表中
	timeout_process(base);
	// 调用event_process_active()处理激活链表中的就绪event,调用其回调函数执行事件处理
	// 该函数会寻找最高优先级(priority值越小优先级越高)的激活事件链表,
	// 然后处理链表中的所有就绪事件;
	// 因此低优先级的就绪事件可能得不到及时处理;
	if (base->event_count_active) {
		event_process_active(base);
		if (!base->event_count_active && (flags & EVLOOP_ONCE))
			done = 1;
	} else if (flags & EVLOOP_NONBLOCK)
		done = 1;
}
// 循环结束,清空时间缓存
base->tv_cache.tv_sec = 0;
event_debug(("%s: asked to terminate loop.", __func__));
return (0);

libevent函数的一大亮点就是将I/O和timer事件统一起来了,这里就有个疑问,为什么将IO事件的timeout设置为所有定时器事件的最小时间,就能将timer事件完美融合到系统IO机制中?原因是timer事件主要目标是定时器到期后需要触发一个回调,IO机制是已经激活的event才会触发回调函数,所以,只要有一种机制可以将到期的timer放到激活链表中就可以。libevent将IO事件的timeout设置为定时器事件的最小时间,那么当IO事件激活时,从timer堆中取出最小的timer判断是否到期,如果到期将其也放入激活链表中;如果IO时间在timeout内没有激活,则会触发超时,此时也检查最小的timer是否到期,这样就能将timer事件很好地放到激活事件中。也就实现了将I/O和timer事件统一起来。

总结

本文的主要目的是剖析libevent的源码,为了能较好地理解和掌握libevent内部的实现逻辑,本文用了大量的篇幅介绍了I/O多路复用机制的分类以及各种实现,同时也介绍了libevent采用的Reactor模式,之后便是libevent细节深究。当然,libevent的源码绝非文中列举的那样简单,仅靠几个函数便能完成,本文只是挑出几个重要的结构以及接口进行介绍。

作者水平有限,文中出现纰漏在所难免,恳请大家批评指正,不胜感激。同时欢迎大家一起讨论,交流,学习。

参考链接

select,poll,epoll多路复用
Linux IO multiplexing select vs poll vs epoll
I/O multiplexing:the select and poll functions
epoll内核源码详解
libevent源码深度剖析pdf

最后

以上就是傲娇柠檬为你收集整理的libevent深入浅出引子前置知识I/O多路复用的实现及存在的问题Reactor模式libevent总结参考链接的全部内容,希望文章能够帮你解决libevent深入浅出引子前置知识I/O多路复用的实现及存在的问题Reactor模式libevent总结参考链接所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(70)

评论列表共有 0 条评论

立即
投稿
返回
顶部