当前位置:   article > 正文

深入浅出理解libevent——2万字总结_libev 堆

libev 堆

概述

libevent,libev,libuv都是c实现的异步事件库,注册异步事件,检测异步事件,根据事件的触发先后顺序,调用相对应回调函数处理事件。处理的事件包括:网络 io 事件、定时事件以及信号事件。这三个事件驱动着服务器的运行。

  1. 网络io事件:
    linux:epoll、poll、select
    mac:kqueue
    window:iocp
  2. 定时事件:
    红黑树
    最小堆:二叉树、四叉树
    跳表
    时间轮
  3. 信号事件

libevent 和 libev 主要封装了异步事件库与操作系统的交互简单的事件管理接口,让用户无需关注平台检测处理事件的机制的差异,只需关注事件的具体处理。

从设计理念出发,libev 是为了改进 libevent 中的一些架构决策;例如:全局变量的使用使得在多线程环境中很难安全地使用 libevent;event 的数据结构设计太大,它包含了 io、时间以及信号处理全封装在一个结构体中,额外的组件如 http、dns、openssl 等实现质量差(容易产生安全问题),计时器不精确,不能很好地处理时间事件;

libev 通过完全去除全局变量的使用,而是通过回调传参来传递上下文(后面libevent也这样做了);并且根据不同事件类型构建不同的数据结构,以此来减低事件耦合性;计时器使用最小四叉堆。libev 小而高效;只关注事件处理。

libevent 和 libev 对 window 支持比较差,由此产生了 libuv 库;libuv 基于 libev,在window 平台上更好的封装了 iocp;node.js 基于 libuv;

libevent的优点

上来当然要先夸奖啦,Libevent 有几个显著的亮点:
  => 事件驱动(event-driven),高性能;
  => 轻量级,专注于网络,不如ACE那么臃肿庞大;
  => 源代码相当精炼、易读;
  => 跨平台,支持Windows、Linux、*BSD和Mac Os;
  => 支持多种I/O多路复用技术, epoll、poll、dev/poll、select和kqueue等;
  => 支持I/O,定时器和信号等事件;
  => 注册事件优先级;

Libevent已经被广泛的应用,作为底层的网络库;比如memcached、Vomit、Nylon、Netchat等等。

Libevent当前的最新稳定版是1.4.13;这也是本文参照的版本。

工作流程图:
libevent的封装层次

 如果不想自己操作IO事件,那么我们就将IO读写的操作交给libevent进行管理,让其帮我们去处理边界问题。从较高的封装层次去使用libevent,我们只需要在libevent完成读写I/O的处理后自己仅需从缓冲区中读数据来完成事件的逻辑处理,至于边界的问题,我们不需要操心。下面会有更详细的介绍
 

IO事件检测的封装与api介绍


libevent封装了两个层次,一个是事件检测,一个是事件操作。事件检测是低层次的封装,由libevent完成事件的检测,然后调用者自己完成IO操作,类似于将底层的epoll,select,poll的细节隐藏掉。该层次封装了事件管理器的操作和事件本身的操作接口。

事件管理器event_base
构建事件管理器event_base_new

使用libevent 函数之前需要分配一个或者多个 event_base 结构体, 每个event_base结构体持有一个事件集合, 可以检测以确定哪个事件是激活的, event_base结构相当于epoll红黑树的树根节点, 每个event_base都有一种用于检测某种事件已经就绪的 “方法”
 

  1. struct event_base *event_base_new(void);
  2. 函数说明: 获得event_base结构,当于epoll红黑树的树根节点
  3. 参数说明: 无
  4. 返回值:
  5. 成功返回event_base结构体指针;
  6. 失败返回NULL;
释放事件管理器event_base_free
  1. void event_base_free(struct event_base *);
  2. 函数说明: 释放event_base指针
event_reinit
  1. int event_reinit(struct event_base *base);
  2. 函数说明: 如果有子进程, 且子进程也要使用base, 则子进程需要对event_base重新初始化,
  3. 此时需要调用event_reinit函数.
  4. 函数参数: 由event_base_new返回的执行event_base结构的指针
  5. 返回值: 成功返回0, 失败返回-1
  6. 对于不同系统而言, event_base就是调用不同的多路IO接口去判断事件是否已经被激活,
  7. 对于linux系统而言, 核心调用的就是epoll, 同时支持poll和select.
event_get_supported_methods
  1. const char **event_get_supported_methods(void);
  2. 函数说明: 获得当前系统(或者称为平台)支持的方法有哪些
  3. 参数: 无
  4. 返回值: 返回二维数组, 类似与main函数的第二个参数**argv.
event_base_get_method
  1. const char * event_base_get_method(const struct event_base *base);
  2. 函数说明: 获得当前base节点使用的多路io方法
  3. 函数参数: event_base结构的base指针.
  4. 返回值: 获得当前base节点使用的多路io方法的指针
 event_set()
  1. void event_set(struct event *ev, int fd, short events, void (*callback)(int, short, void *), void *arg)
  2. event_set 初始化事件event,设置回调函数和关注的事件。
  3. 参数说明:
  4. ev:执行要初始化的event对象;
  5. fd:该event绑定的“句柄”,对于信号事件,它就是关注的信号;
  6. events:在该fd上关注的事件类型,它可以是EV_READ, EV_WRITE, EV_SIGNAL;
  7. callback:这是一个函数指针,当fd上的事件event发生时,调用该函数执行处理,它有三个参数,调用时由event_base负责传入,按顺序,实际上就是event_set时的fd, event和arg;
  8. arg:传递给callback函数指针的参数;
  9. 定时事件说明:evtimer_set(&ev, timer_cb, NULL) = event_set(&ev, -1, 0, timer_cb, NULL)
  10. 由于定时事件不需要fd,并且定时事件是根据添加时(event_add)的超时值设定的,因此这里event也不需要设置。
  11. 这一步相当于初始化一个event handler,在libevent中事件类型保存在event结构体中。
  12. 注意:libevent并不会管理event事件集合,这需要应用程序自行管理;

  1. #include <event.h>
  2. #include <stdio.h>
  3. #include <string.h>
  4. int main()
  5. {
  6. const char** p = event_get_supported_methods();
  7. //获取当前系统支持的方法有哪些
  8. int i = 0;
  9. while(p[i] != NULL)
  10. {
  11. printf("[%s] ",p[i]);
  12. }
  13. printf("\n");
  14. struct event_base* base = event_base_new();
  15. if(base == NULL) printf("event_base_new error\n");
  16. printf("[%s]\n",event_base_get_method(base));
  17. event_base_free(base);
  18. return 0;
  19. }
 struct event结构体分析
  1. struct event {
  2. TAILQ_ENTRY (event) ev_next;
  3. TAILQ_ENTRY (event) ev_active_next;
  4. TAILQ_ENTRY (event) ev_signal_next;
  5. unsigned int min_heap_idx; /* for managing timeouts 用于管理超时默认是-1 */
  6. struct event_base *ev_base; //属于哪个一event_base
  7. int ev_fd; //设置事件监听对象,也就是监听句柄
  8. short ev_events; //设置监听对象触发的动作:EV_READ, EV_WRITE, EV_SIGNAL,EV_TIMEOUT,EV_PERSIST
  9. short ev_ncalls; //事件被调用了几次
  10. short *ev_pncalls; /* Allows deletes in callback */
  11. struct timeval ev_timeout;
  12. int ev_pri; /* smaller numbers are higher priority */
  13. //设置事件的回调函数
  14. void (*ev_callback)(int, short, void *arg);
  15. //设置事件的回调函数的参数
  16. void *ev_arg;
  17. int ev_res; /* result passed to event callback */
  18. int ev_flags; //事件的状态,EVLIST_INIT,EVLIST_INTERNAL,EVLIST_ACTIVE,EVLIST_SIGNAL,EVLIST_INSERTED,EVLIST_TIMEOUT
  19. };
事件循环event_base_dispatch和event_base_loop


libevent在event_base_new好之后, 需要等待事件的产生, 也就是等待事件被激活, 所以程序不能退出, 对于epoll来说, 我们需要自己控制循环, 而在libevent中也给我们提供了API接口, 类似where(1)的功能.
 

  1. //这个函数一般不用, 而大多数都调用libevent给我们提供的另外一个API:
  2. int event_base_loop(struct event_base *base, int flags);
  3. 函数说明: 进入循环等待事件
  4. 参数说明:
  5. base: 由event_base_new函数返回的指向event_base结构的指针
  6. flags的取值:
  7. #define EVLOOP_ONCE 0x01
  8. 只触发一次, 如果事件没有被触发, 阻塞等待
  9. #define EVLOOP_NONBLOCK 0x02
  10. 非阻塞方式检测事件是否被触发, 不管事件触发与否, 都会立即返回.
  1. int event_base_dispatch(struct event_base *base);
  2. 函数说明: 进入循环等待事件
  3. 参数说明:由event_base_new函数返回的指向event_base结构的指针
  4. 调用该函数, 相当于没有设置标志位的event_base_loop。程序将会一直运行,
  5. 直到没有需要检测的事件了, 或者被结束循环的API终止。
 事件循环推出event_base_loopbreak和event_base_loopexit
  1. int event_base_loopexit(struct event_base *base, const struct timeval *tv);
  2. int event_base_loopbreak(struct event_base *base);
  3. struct timeval {
  4. long tv_sec;
  5. long tv_usec;
  6. };

两个函数的区别是如果正在执行激活事件的回调函数, 那么event_base_loopexit将在事件回调执行结束后终止循环(如果tv时间非NULL, 那么将等待tv设置的时间后立即结束循环), 而event_base_loopbreak会立即终止循环。

event_process_active

主要是处理激活队列中的数据

  1. static void
  2. event_process_active(struct event_base *base)
  3. {
  4. struct event *ev;
  5. struct event_list *activeq = NULL;
  6. int i;
  7. short ncalls;
  8. 获得就绪链表中有就绪事件并且高优先级的表头
  9. for (i = 0; i < base->nactivequeues; ++i) {
  10. if (TAILQ_FIRST(base->activequeues[i]) != NULL) {
  11. activeq = base->activequeues[i];
  12. break;
  13. }
  14. }
  15. assert(activeq != NULL);
  16. for (ev = TAILQ_FIRST(activeq); ev; ev = TAILQ_FIRST(activeq)) {
  17. if (ev->ev_events & EV_PERSIST)
  18. event_queue_remove(base, ev, EVLIST_ACTIVE);
  19. else
  20. event_del(ev);//如果不是永久事件则需要进行一系统的删除工作,包括移除注册在事件链表的事件等
  21. /* Allows deletes to work */
  22. ncalls = ev->ev_ncalls;
  23. ev->ev_pncalls = &ncalls;
  24. while (ncalls) {
  25. ncalls--;
  26. ev->ev_ncalls = ncalls;
  27. //根据回调次数调用回调函数
  28. (*ev->ev_callback)((int)ev->ev_fd, ev->ev_res, ev->ev_arg);
  29. if (event_gotsig || base->event_break)
  30. return;
  31. }
  32. }
  33. }
事件对象
  1. typedef void (*event_callback_fn)(evutil_socket_t fd, short events, void *arg);
  2. struct event *event_new(struct event_base *base, evutil_socket_t fd,
  3. short events, event_callback_fn cb, void *arg);
  4. #define evsignal_new(b, x, cb, arg) event_new((b), (x), EV_SIGNAL|EV_PERSIST, (cb), (arg))
  5. 函数说明: event_new负责创建event结构指针, 同时指定对应的base(epfd), 还有对应的文件描述符
  6. , 事件, 以及回调函数和回调函数的参数。
  7. 参数说明:
  8. base: 对应的根节点--epfd
  9. fd: 要监听的文件描述符
  10. events:要监听的事件
  11. #define EV_TIMEOUT 0x01 //超时事件
  12. #define EV_READ 0x02 //读事件
  13. #define EV_WRITE 0x04 //写事件
  14. #define EV_SIGNAL 0x08 //信号事件
  15. #define EV_PERSIST 0x10 //周期性触发
  16. #define EV_ET 0x20 //边缘触发, 如果底层模型支持设置 则有效, 若不支持则无效.
  17. 若要想设置持续的读事件则: EV_READ | EV_PERSIST
  18. cb: 回调函数, 原型如下:
  19. typedef void (*event_callback_fn)(evutil_socket_t fd, short events, void *arg);
  20. 注意: 回调函数的参数就对应于event_new函数的fd, event和arg
销毁事件对象event_free
  1. void event_free(struct event *ev);
  2. 函数说明: 释放由event_new申请的event节点。
注册事件event_add(类似于epoll_ctl)
  1. int event_add(struct event *ev, const struct timeval *timeout);
  2. 函数说明: 将非未决态事件转为未决态, 相当于调用epoll_ctl函数(EPOLL_CTL_ADD),
  3. 开始监听事件是否产生, 相当于epoll的上树操作.
  4. 参数说明:
  5. ev: 调用event_new创建的事件
  6. timeout: 限时等待事件的产生(定时事件使用), 也可以设置为NULL, 没有限时。
注销事件event_del(类似于epoll的del)
  1. int event_del(struct event *ev);
  2. 函数说明: 将事件从未决态变为非未决态, 相当于epoll的下树(epoll_ctl调用EPOLL_CTL_DEL操作)操作。
  3. 参数说明: ev指的是由event_new创建的事件.
事件驱动event介绍
事件驱动是libevent的核心思想

比较重要,但是学过epoll和reactor的话,学起来还是比较简单

主要几个状态:

无效的指针: 此时仅仅是定义了 struct event *ptr;
非未决:相当于创建了事件, 但是事件还没有处于被监听状态, 类似于我们使用epoll的时候定义了struct epoll_event ev并且对ev的两个字段进行了赋值, 但是此时尚未调用epoll_ctl对事件上树.
未决:就是对事件开始监听, 暂时未有事件产生。相当于调用epoll_ctl对要监听的事件上树, 但是没有事件产生.
激活:代表监听的事件已经产生, 这时需要处理, 相当于调用epoll_wait函数有返回, 当事件被激活以后, libevent会调用该事件对应的回调函数.

只用libevent事件检测,io操作自己来处理demo

像memcached它就是用这种层次(只使用libevent检测,io操作自己写)。我们从下面Demo中看到,使用libevent就像操作reactor一样,只需要传递回调函数,在回调函数里面去写io操作的逻辑。

  1. #include <event.h>
  2. #include <event2/listener.h>
  3. #include <stdio.h>
  4. #include <unistd.h>
  5. #include <string.h>
  6. #include <netinet/in.h>
  7. #include <sys/socket.h>
  8. #include <arpa/inet.h>
  9. //为什么events是short是因为库里定义过了
  10. // #define EV_TIMEOUT 0x01 //超时事件
  11. // #define EV_READ 0x02 //读事件
  12. // #define EV_WRITE 0x04 //写事件
  13. // #define EV_SIGNAL 0x08 //信号事件
  14. // #define EV_PERSIST 0x10 //周期性触发
  15. // #define EV_ET 0x20 //边缘触发, 如果底层模型支持设置
  16. // int event_assign(struct event *ev, struct event_base *base, evutil_socket_t fd, short events, event_callback_fn callback, void *arg);
  17. // ev:要进行初始化的事件结构体指针。
  18. // base:事件所关联的事件基础。
  19. // fd:文件描述符或套接字,表示这个事件与哪个描述符相关联。
  20. // events:指定事件类型,比如读取、写入等。可以使用 EV_READ、EV_WRITE 等常量。
  21. // callback:是事件触发时调用的函数。
  22. // arg:是传递给回调函数 callback 的可选参数。
  23. void socket_read_cb(int fd,short events,void* arg);
  24. void socket_accept_cb(int fd,short events,void* arg)
  25. {
  26. sleep(1);
  27. struct sockaddr_in addr;
  28. socklen_t len = sizeof(addr);
  29. int clientfd = accept(fd, (struct sockaddr *) &addr, &len);
  30. evutil_make_socket_nonblocking(clientfd);
  31. //将该fd设置成非阻塞的
  32. printf("client_fd:%d",clientfd);
  33. struct event_base* base = (struct event_base*)arg;//这个是reactor对象
  34. struct event *ev = event_new(NULL, -1, 0, NULL, NULL);
  35. //建立一个event
  36. //注册这个事件,写到base里
  37. event_assign(ev,base,clientfd,EV_READ | EV_PERSIST,socket_read_cb,(void*)ev);//只是写入了数据
  38. //这次的arg我们传的是事件结构体
  39. event_add(ev,NULL);//跟epoll_ctl一样,但是也不是很相同
  40. }
  41. void socket_read_cb(int fd,short events,void* arg)
  42. {
  43. char msg[1024];
  44. struct event* ev = (struct event*)arg;
  45. int len = read(fd,msg,sizeof(msg) - 1);
  46. if (len <= 0) {
  47. printf("client fd:%d disconnect\n", fd);
  48. event_free(ev);
  49. close(fd);
  50. return;
  51. }
  52. msg[len] = '\0';
  53. printf("recv the client msg: %s",msg);
  54. char reply_msg[1024] = "recvieced msg: ";
  55. strcat(reply_msg + strlen(reply_msg), msg);
  56. write(fd, reply_msg, strlen(reply_msg));
  57. }
  58. int main()
  59. {
  60. int listenfd = socket(AF_INET,SOCK_STREAM,0);
  61. if(listenfd == -1) printf("socket error\n");
  62. struct sockaddr_in addr;
  63. memset(&addr, 0, sizeof(addr));
  64. addr.sin_family = AF_INET;
  65. addr.sin_port = htons(8080);
  66. addr.sin_addr.s_addr = htonl(INADDR_ANY);
  67. if(bind(listenfd ,(struct sockaddr*)&addr,sizeof(addr)) == -1)printf("bind error\n");
  68. listen(listenfd ,3);
  69. struct event_base* base = event_base_new();
  70. printf("create base\n");
  71. struct event* ev_listen = event_new(base,listenfd,EV_READ | EV_PERSIST,socket_accept_cb,base);
  72. //返回是struct event*得到这个事件结构体的指针
  73. /*
  74. event_new 等价于
  75. struct event ev_listen;
  76. event_set(&ev_listen, listenfd, EV_READ | EV_PERSIST, socket_accept_cb, base);
  77. event_base_set(base, &ev_listen);
  78. */
  79. event_add(ev_listen,NULL);//相当于epoll_ctl
  80. event_base_dispatch(base);//循环等待事件
  81. return 0;
  82. }

IO事件操作的封装与api介绍(主要是evconnlistener和bufferevent)

 自带buffer的事件-bufferevent


bufferevent实际上也是一个event, 只不过比普通的event高级一些, 它的内部有两个缓冲区, 以及一个文件描述符(网络套接字)。一个网络套接字有读和写两个缓冲区, bufferevent同样也带有两个缓冲区, 还有就是libevent事件驱动的核心回调函数, 那么四个缓冲区以及触发回调的关系如下:

从图中可以得知, 一个bufferevent对应两个缓冲区, 三个回调函数, 分别是写回调, 读回调和事件回调

bufferevent有三个回调函数:

读回调 – 当bufferevent将底层读缓冲区的数据读到自身的读缓冲区时触发读事件回调.
写回调 – 当bufferevent将自身写缓冲的数据写到底层写缓冲区的时候触发写事件回调, 由于数据最终是写入了内核的写缓冲区中, 应用程序已经无法控制, 这个事件对于应用程序来说基本没什么用, 只是通知功能.
事件回调 – 当bufferevent绑定的socket连接, 断开或者异常的时候触发事件回调.
 

构建bufferevent对象 
  1. struct bufferevent *bufferevent_socket_new(struct event_base *base, evutil_socket_t fd, int options);
  2. 函数说明: bufferevent_socket_new 对已经存在socket创建bufferevent事件, 可用于
  3. 后面讲到的连接监听器的回调函数中.
  4. 参数说明:
  5. base :对应根节点
  6. fd :文件描述符
  7. options : bufferevent的选项
  8. BEV_OPT_CLOSE_ON_FREE -- 释放bufferevent自动关闭底层接口
  9. (当bufferevent被释放以后, 文件描述符也随之被close)
  10. BEV_OPT_THREADSAFE -- 使bufferevent能够在多线程下是安全的
销毁bufferevent对象
  1. void bufferevent_free(struct bufferevent *bufev);
  2. 函数说明: 释放bufferevent
连接操作bufferevent_socket_connect
  1. int bufferevent_socket_connect(struct bufferevent *bev, struct sockaddr *serv, int socklen);
  2. 函数说明: 该函数封装了底层的socket与connect接口, 通过调用此函数, 可以将bufferevent事件与通信的socket进行绑定, 参数如下:
  3. bev – 需要提前初始化的bufferevent事件
  4. serv – 对端(一般指服务端)的ip地址, 端口, 协议的结构指针
  5. socklen – 描述serv的长度
  6. 说明: 调用此函数以后, 通信的socket与bufferevent缓冲区做了绑定, 后面调用了bufferevent_setcb函数以后, 会对bufferevent缓冲区的读写操作的事件设置回调函数, 当往缓冲区中写数据的时候会触发写回调函数, 当数据从socket的内核缓冲区读到bufferevent读缓冲区中的时候会触发读回调函数.
设置bufferevent回调与bufferevent_setcb
  1. void bufferevent_setcb(struct bufferevent *bufev,
  2. bufferevent_data_cb readcb,
  3. bufferevent_data_cb writecb,
  4. bufferevent_event_cb eventcb,
  5. void *cbarg
  6. );
  7. 函数说明: bufferevent_setcb用于设置bufferevent的回调函数,
  8. readcb, writecb,eventcb分别对应了读回调, 写回调, 事件回调,
  9. cbarg代表回调函数的参数。
回调函数的原型:
  1. typedef void (*bufferevent_data_cb)(struct bufferevent *bev, void *ctx);
  2. typedef void (*bufferevent_event_cb)(struct bufferevent *bev, short what, void *ctx);
  3. What 代表 对应的事件
  4. BEV_EVENT_EOF--遇到文件结束指示
  5. BEV_EVENT_ERROR--发生错误
  6. BEV_EVENT_TIMEOUT--发生超时
  7. BEV_EVENT_CONNECTED--请求的过程中连接已经完成
写数据到写缓冲区bufferevent_write
  1. int bufferevent_write(struct bufferevent *bufev, const void *data, size_t size);
  2. int bufferevent_write_buffer(struct bufferevent *bufev, struct evbuffer *buf);
  3. bufferevent_write是将data的数据写到bufferevent的写缓冲区,bufferevent_write_buffer
  4. 是将数据写到写缓冲区另外一个写法, 实际上bufferevent的内部的两个缓冲区结构就是struct evbuffer
从读缓冲区读数据bufferevent_read
  1. size_t bufferevent_read(struct bufferevent *bufev, void *data, size_t size);
  2. int bufferevent_read_buffer(struct bufferevent *bufev, struct evbuffer *buf);
  3. bufferevent_read 是将bufferevent的读缓冲区数据读到data中, 同时将读到的数据从
  4. bufferevent的读缓冲清除。
  5. bufferevent_read_buffer 将bufferevent读缓冲数据读到buf中, 接口的另外一种。
注册与注销事件类型bufferevent_enable/disable
  1. int bufferevent_enable(struct bufferevent *bufev, short event);
  2. int bufferevent_disable(struct bufferevent *bufev, short event);
  3. bufferevent_enable与bufferevent_disable是设置事件是否生效, 如果设置为disable,
  4. 事件回调将不会被触发。
获取读写缓冲区bufferevent_get_input和bufferevent_get_oupput
  1. struct evbuffer *bufferevent_get_input(struct bufferevent *bufev)
  2. struct evbuffer *bufferevent_get_output(struct bufferevent *bufev)
  3. 获取bufferevent的读缓冲区和写缓冲区
分割字符读evbuffer_readln与固定长度读evbuffer_remove
  1. char *evbuffer_readln(struct evbuffer *buffer, size_t *n_read_out, enum evbuffer_eol_style eol_style);
  2. int evbuffer_remove(struct evbuffer *buf, void *data, size_t datlen);
  3. 分割字符读evbuffer_readln
  4. 固定长度读evbuffer_remove
bufferevent总结


对于bufferevent来说, 一个文件描述符, 2个缓冲区, 3个回调函数。文件描述符是用于和客户端进行通信的通信文件描述符, 并不是监听的文件描述符。

2个缓冲区是指: 一个bufferevent包括读缓冲区和写缓冲区。
3个回调函数指: 读回调函数 写回调函数 和事件回调函数
读回调函数的触发时机:

当socket的内核socket读缓冲区中有数据的时候, bufferevent会将内核缓冲区中的数据读到自身的读缓冲区, 会触发bufferevent的读操作, 此时会调用bufferevent的读回调函数.

写回调函数的触发时机:

当往bufferevent的写缓冲区写入数据的时候, bufferevent底层会把缓冲区中的数据写入到内核的socket的写缓冲区中, 此时会触发bufferevent的写回调函数, 最后由内核的驱动程序将数据发送出去.

事件(异常)回调函数的触发时机:

客户端关闭连接或者是被信号终止进程会触发事件回调函数

连接监听器-evconnlistener


链接监听器封装了底层的socket通信相关函数, 比如socket, bind, listen, accept这几个函数。链接监听器创建后实际上相当于调用了socket, bind, listen, 此时等待新的客户端连接到来, 如果有新的客户端连接, 那么内部先进行调用accept处理, 然后调用用户指定的回调函数。可以先看看函数原型, 了解一下它是怎么运作的:
 

构建连接监听器evconnlistener_new_bind
  1. struct evconnlistener *evconnlistener_new_bind(
  2. struct event_base *base,evconnlistener_cb cb,
  3. void *ptr, unsigned flags, int backlog,
  4. const struct sockaddr *sa, int socklen
  5. );
  6. 函数说明:
  7. 是在当前没有套接字的情况下对链接监听器进行初始化, 看最后2个参数实际上就是bind使用的关键参数,
  8. backlog是listen函数的关键参数(略有不同的是, 如果backlog是-1, 那么监听器会自动选择一个合适的值,
  9. 如果填0, 那么监听器会认为listen函数已经被调用过了), ptr是回调函数的参数, cb是有新连接之后的回调函数,
  10. 但是注意这个回调函数触发的时候, 链接器已经处理好新连接了, 并将与新连接通信的描述符交给回调函数。
  11. flags 需要参考几个值:
  12. LEV_OPT_LEAVE_SOCKETS_BLOCKING 文件描述符为阻塞的
  13. LEV_OPT_CLOSE_ON_FREE 关闭时自动释放
  14. LEV_OPT_REUSEABLE 端口复用
  15. LEV_OPT_THREADSAFE 分配锁, 线程安全
  1. struct evconnlistener *evconnlistener_new(
  2. struct event_base *base,
  3. evconnlistener_cb cb, void *ptr,
  4. unsigned flags, int backlog,
  5. evutil_socket_t fd
  6. );

evconnlistener_new函数与前一个函数不同的地方在与后2个参数, 使用本函数时, 认为socket已经初始化好, 并且bind完成, 甚至也可以做完listen, 所以大多数时候, 我们都可以使用第一个函数。

accept的回调函数evconnlistener_cb
typedef void (*evconnlistener_cb)(struct evconnlistener *evl, evutil_socket_t fd, struct sockaddr *cliaddr, int socklen, void *ptr);

回调函数fd参数是与客户端通信的描述符, 并非是等待连接的监听的那个描述符, 所以cliaddr对应的也是新连接的对端地址信息, 已经是accept处理好的。

销毁连接监听器evconnlistener_free
void evconnlistener_free(struct evconnlistener *lev);

使用libevent的事件检测与事件操作demo 

  1. #include <netinet/in.h>
  2. #include <sys/socket.h>
  3. #include <stdio.h>
  4. #include <signal.h>
  5. #include <event.h>
  6. #include <event2/buffer.h>
  7. #include <time.h>
  8. #include <string.h>
  9. #include <stdlib.h>
  10. #include <event2/bufferevent.h>
  11. #include <event2/listener.h>
  12. void socket_read_callback(struct bufferevent* bev,void* arg)
  13. {
  14. //操作读缓冲当中过的数据,通过该函数得到读缓冲的地址
  15. struct evbuffer* evbuf = bufferevent_get_input(bev);
  16. char* msg = evbuffer_readln(evbuf,NULL,EVBUFFER_EOL_LF);
  17. //我们的bufferevent里肯定有读缓冲
  18. // 也可以直接用 bufferevent_read 读数据
  19. // bufferevent_read(struct bufferevent *bufev, void *data, size_t size)
  20. if(!msg) return;
  21. printf("server read the data: %s\n",msg);
  22. char reply[1024] = {0};
  23. sprintf(reply, "recvieced msg: %s\n", msg);//echo
  24. //需要自己释放资源,很重要
  25. free(msg);
  26. bufferevent_write(bev,reply,strlen(reply));
  27. }
  28. void socket_event_callback(struct bufferevent *bev, short events, void *arg)
  29. {
  30. if(events & BEV_EVENT_EOF)
  31. printf("connection closed\n");
  32. else if (events & BEV_EVENT_ERROR)
  33. printf("some other error\n");
  34. else if (events & BEV_EVENT_TIMEOUT)
  35. printf("timeout\n");
  36. bufferevent_free(bev);
  37. }
  38. void listener_callback(struct evconnlistener *listener,evutil_socket_t fd,
  39. struct sockaddr *sock,int socklen, void *arg)
  40. {
  41. char ip[32] = {0};
  42. evutil_inet_ntop(AF_INET,sock,ip,sizeof(ip) - 1);
  43. //该函数的作用是将网络字节序表示的 IPv4 地址转换为可读的字符串格式,并将结果存储在提供的缓冲区中。
  44. //这样可以方便地将 IP 地址以人可读的形式输出,比如在日志中记录连接的来源。
  45. printf("accept a client fd:%d ip:%s\n",fd,ip);
  46. //也就是说,监听到之后,触发回调,然后会自动把新连接的fd传进来吗
  47. //也就是相当于我们前面设置了一个监听套接字的bufferevent,当内核中的socket有链接到来的时候
  48. //也就是内核中的读缓冲有数据的时候,那么就会触发bufferevent回调,写到bufferevent的读缓冲区
  49. //然后把数据传到我们用户层,就不需要我们自己去事件处理了
  50. struct event_base* base = (struct event_base*)arg;//把reactor对象传进来了
  51. //创建一个bufferevent,构建bufferevent对象
  52. struct bufferevent* bev = bufferevent_socket_new(base,fd,BEV_OPT_CLOSE_ON_FREE);
  53. //函数说明: bufferevent_socket_new 对已经存在socket创建bufferevent事件, 可用于
  54. //后面讲到的连接监听器的回调函数中.我这里的fd是accept是用于和客户端交互的fd
  55. //所以这个fd是我们设置到bufferevent中,让bufferevent对象帮我们处理
  56. //选了这个选项会当bufferevent释放后,里面的fd什么的也会自动关闭和释放
  57. // 设置读、写、以及异常时的回调函数
  58. bufferevent_setcb(bev,socket_read_callback,NULL,socket_event_callback,NULL);
  59. bufferevent_enable(bev,EV_READ | EV_PERSIST);//注册事件
  60. }
  61. void stdin_callback(struct bufferevent* bev,void* arg)
  62. {
  63. struct evbuffer* evbuf = bufferevent_get_input(bev);
  64. struct event_base* base = (struct event_base*)arg;//这个就是上下文
  65. char* msg = evbuffer_readln(evbuf,NULL,EVBUFFER_EOL_LF);
  66. if(!msg) return;
  67. if (strcmp(msg, "quit") == 0) {
  68. printf("safe exit!!!\n");
  69. event_base_loopbreak(arg);//中断事件循环
  70. }
  71. printf("stdio read the data: %s\n", msg);
  72. }
  73. void do_timer(int fd,short events,void* arg)
  74. {
  75. struct event *timer = (struct event *) arg;
  76. time_t now = time(NULL);
  77. printf("do_timer %s", (char *) ctime(&now));
  78. }
  79. void do_sig_int(int fd,short events,void* arg)
  80. {
  81. struct event *si = (struct event *)arg;
  82. event_del(si);
  83. printf("do_sig_int SIGINT\n");//CTRL + C
  84. }
  85. int main()
  86. {
  87. struct sockaddr_in sin;
  88. memset(&sin,0,sizeof(sin));
  89. sin.sin_family = AF_INET;
  90. sin.sin_port = htons(8088);
  91. sin.sin_addr.s_addr = htonl(INADDR_ANY);
  92. struct event_base* base = event_base_new();
  93. //链接监听器封装了底层的socket通信相关函数, 比如socket, bind, listen, accept这几个函数。
  94. struct evconnlistener* listener = evconnlistener_new_bind(base,listener_callback,
  95. base,LEV_OPT_REUSEABLE | LEV_OPT_CLOSE_ON_FREE,10,(struct sockaddr*)&sin,sizeof(sin));
  96. //: 创建一个监听器对象,该监听器将监听指定的地址和端口,
  97. //并在有新连接时调用 listener_callback 函数进行处理。参数说明如下:
  98. // base: 事件基础结构,将监听器与此关联。
  99. // listener_callback: 当有新连接建立时将调用的回调函数。
  100. // base: 将传递给回调函数的参数,这里是事件基础结构。
  101. // LEV_OPT_REUSEABLE | LEV_OPT_CLOSE_ON_FREE: 一组标志,指定监听器的行为,包括启用地址重用和在释放时关闭底层文件描述符。
  102. // 10: 允许在套接字上排队等待连接的最大数量。
  103. // (struct sockaddr*)&sin: 要监听的地址和端口信息。
  104. // sizeof(sin): 提供地址结构体的大小。
  105. //对stdin的io事件进行处理
  106. //stdin的文件描述符是0
  107. struct bufferevent* ioev = bufferevent_socket_new(base,0,BEV_OPT_CLOSE_ON_FREE);
  108. bufferevent_setcb(ioev,stdin_callback,NULL,NULL,base);//设置读写事件回调
  109. bufferevent_enable(ioev,EV_READ | EV_PERSIST);//设置事件属性并开启
  110. //定时事件
  111. struct event evtimer;
  112. struct timeval tv = {1,0};
  113. event_set(&evtimer,-1,EV_PERSIST,do_timer,&evtimer);//初始化
  114. // &evtimer: 是一个指向 struct event 结构体的指针,表示你要设置的事件对象。
  115. // -1: 是事件的文件描述符,定时器事件不需要一个真实的文件描述符,因此通常设置为 -1。
  116. // EV_PERSIST: 是事件的标志,表示这是一个持久性事件,即它会在每次触发后自动重新添加到事件循环中,使得它可以周期性地触发。
  117. // do_timer: 是事件触发时执行的回调函数。
  118. // &evtimer: 是传递给回调函数的用户数据。
  119. event_base_set(base,&evtimer);
  120. //设置event从属的event_base,这一步相当于指明event要注册到哪个event_base实例上。
  121. event_add(&evtimer,&tv);
  122. //信号事件
  123. struct event ev_sig_int;
  124. event_set(&ev_sig_int,-1,EV_PERSIST,do_sig_int,&ev_sig_int);
  125. event_base_set(base,&ev_sig_int);
  126. event_add(&ev_sig_int,NULL);
  127. //开启事件主循环
  128. event_base_dispatch(base);
  129. /* 结束释放资源 */
  130. evconnlistener_free(listener);
  131. event_base_free(base);
  132. return 0;
  133. }

libevent事件原理剖析 

信号事件剖析(这个我们先不看了)

定时事件和网络事件剖析

Timer小根堆


libevent定时器的机制是最小堆+epoll_wait的机制,event_base_dispatch内部调用的是event_base_loop,我们进入主循环看看,发现它先是去最小堆找timeout参数,然后执行epoll_wait。之后再将所有的超时任务取出timeout_process放到就绪队列,我们发现现在网络事件和定时事件都被加入到就绪队列中了,然后按照优先级进行处理,调用对应的回调函数。

  1. while (!done) {
  2. ......
  3. tv_p = &tv;
  4. if (!N_ACTIVE_CALLBACKS(base) && !(flags & EVLOOP_NONBLOCK)) {
  5. timeout_next(base, &tv_p); // 返回的 tv_p 即是 最小堆实现的定时器中第一个事件的剩余等待时间
  6. }
  7. ......
  8. clear_time_cache(base);
  9. res = evsel->dispatch(base, tv_p); // 以tv_p作为 epoll_wait 的超时时间。这里相当于epoll_wait(),收集网络事件
  10. ......
  11. update_time_cache(base); // 更新 time_cache,time_cache的作用在于不必每次都从系统调用获取时间值
  12. ......
  13. timeout_process(base); // 将所有已超时的任务从最小堆中取出,插入到就绪队列(有优先级)收集定时事件
  14. if (N_ACTIVE_CALLBACKS(base)) {
  15. int n = event_process_active(base); // 处理这些就绪的任务,调用其回调函数
  16. ......
  17. }
  18. }
  19. /* Activate every event whose timeout has elapsed. */
  20. static void timeout_process(struct event_base *base)
  21. {
  22. /* Caller must hold lock. */
  23. struct timeval now;
  24. struct event *ev;
  25. if (min_heap_empty_(&base->timeheap)) {
  26. return;
  27. }
  28. gettime(base, &now);
  29. while ((ev = min_heap_top_(&base->timeheap))) {
  30. if (evutil_timercmp(&ev->ev_timeout, &now, >)) // 从堆中取出所有 ev_timeout 已达到 now 的事件
  31. break;
  32. /* delete this event from the I/O queues */
  33. event_del_nolock_(ev, EVENT_DEL_NOBLOCK); // 从所在的 event_base 中删除该事件
  34. event_active_nolock_(ev, EV_TIMEOUT, 1); // 激活该事件,即 插入到就绪队列
  35. }
  36. }

 event_active_nolock_()底层将调用event_queue_insert_active()将事件插入到event_base下的就绪队列activequeues中,这个就绪队列实际上是有nactivequeues个元素的队列数组,数组下标越小的队列优先级越高,每次我们新建一个event时默认的优先级ev_pri都是nactivequeues / 2(by default, we put new events into the middle priority),而注册事件到event_base前可以通过该函数来手动设置优先级:
 

/* Set's the priority of an event - if an event is already scheduled
 * changing the priority is going to fail. */
int event_priority_set(struct event *ev, int pri)
 

读写缓冲区evbuffer的实现(重点理解)


我们在读写网络IO的时候,我们是不能确保一次读取,就是一个完整的数据包。比如我们写入size,但是实际写入n<size,数据没有全部写出去,那剩下的数据怎么办呢?我们需要缓存起来等待下次写数据触发,读数据同理。所以因为这个原因,我们需要设置缓冲区来解决这个问题。常用的解决方案有三种

fix buffer :char rbuf[16 * 1024 * 1024];char wbuf[16 * 1024 * 1024] ,但是这样会造成两个新的问题,1. 存在空间浪费 2. 数据移动频繁
ringbuffer:环形缓冲区,解决了数据移动频繁的问题,但是数据空间浪费的问题没有解决
libevent中的evbuffer。下面开始介绍evbuffer。
evbuffer 是 libevent 底层实现的一种链式缓冲区,当我们使用bufferevent来管理事件时,就会从每个事件的 evbuffer 中读写数据。每个 evbuffer 实质是一个缓冲区链表,其中的每个元素为 struct evbuffer_chain。一个struct evbuffer中的关键成员定义如下:
 

  1. struct evbuffer {
  2. /** The first chain in this buffer's linked list of chains. */
  3. struct evbuffer_chain *first;
  4. /** The last chain in this buffer's linked list of chains. */
  5. struct evbuffer_chain *last;
  6. /** Pointer to the next pointer pointing at the 'last_with_data' chain. */
  7. struct evbuffer_chain **last_with_datap; // 指针指向最后一个可写的 chain
  8. /** Total amount of bytes stored in all chains.*/
  9. size_t total_len;
  10. ...... // 以上为关键成员
  11. }

每个evbuffer_chain的定义又如下所示:

  1. /** A single item in an evbuffer. */
  2. struct evbuffer_chain {
  3. /** points to next buffer in the chain */
  4. struct evbuffer_chain *next; // 指向下一个 evbuffer_chain
  5. /** total allocation available in the buffer field. */
  6. size_t buffer_len; // buffer 的长度
  7. /** unused space at the beginning of buffer or an offset into a file for sendfile buffers. */
  8. ev_misalign_t misalign; // 实际数据在 buffer 中的偏移
  9. /** Offset into buffer + misalign at which to start writing.
  10. * In other words, the total number of bytes actually stored in buffer. */
  11. size_t off; // buffer 中有效数据的末尾,接下来的数据从这个位置开始填入(该位置即 buffer + misalign + off)
  12. /** number of references to this chain */
  13. int refcnt; // 这个 buffer的引用计数
  14. /** Usually points to the read-write memory belonging to this buffer allocated as part of the evbuffer_chain allocation.
  15. * For mmap, this can be a read-only buffer and EVBUFFER_IMMUTABLE will be set in flags. For sendfile, it may point to NULL. */
  16. unsigned char *buffer; // 指向实际数据存储的位置,这是真正的 buffer
  17. };

misaligin是什么意思呢?是已经被读取的数据,下一段有效数据是从【buffer+misaligin , buffer+misaligin +off】这一段off的长,是我们待取的有效数据。而【buffer,buffer+misaligin 】这一段是之前就已经被读取过了,所以这里是失效的数据。所以misaligin 就解决了数据移动频繁的问题。而我们的evbuffer_chain是链表形式,所以又解决了数据空间浪费的问题。所以说evbuffer的设计是非常巧妙的。

bufferevent_write
当我们调用bufferevent_write往写缓冲区写数据时,实际上是调用了evbuffer_add,在写入后libevent自动帮我们写到内核缓冲区,之后会触发写回调函数。

若这个evbuffer中没有一个 chain 可以写入数据,则需要根据写入的数据大小新申请一个 chain 挂到链表末尾,然后往这个chain中写数据,所以每个 chain 的 buffer 大小是不定的。还有更多细节内容我写到注释里面了,读者自行阅读。

  1. int evbuffer_add(struct evbuffer *buf, const void *data_in, size_t datlen) {
  2. //...
  3. //如果大于限定的容量
  4. if (datlen > EV_SIZE_MAX - buf->total_len) {
  5. goto done;
  6. }
  7. //使chain指向之后一个链表
  8. if (*buf->last_with_datap == NULL) {
  9. chain = buf->last;
  10. }
  11. else {
  12. chain = *buf->last_with_datap;
  13. }
  14. //...
  15. //如果没有chain,那么创建一个datlen大小的返回即可
  16. if (chain == NULL) {
  17. chain = evbuffer_chain_insert_new(buf, datlen);
  18. if (!chain)
  19. goto done;
  20. }
  21. if ((chain->flags & EVBUFFER_IMMUTABLE) == 0) {
  22. //...
  23. //remain为当前可用剩余空间还有多少
  24. remain = chain->buffer_len - (size_t) chain->misalign - chain->off;
  25. //如果剩余空间大于需求,那么直接分配即可
  26. if (remain >= datlen) {
  27. /* there's enough space to hold all the data in the
  28. * current last chain */
  29. memcpy(chain->buffer + chain->misalign + chain->off,
  30. data, datlen);
  31. chain->off += datlen;
  32. buf->total_len += datlen;
  33. buf->n_add_for_cb += datlen;
  34. goto out;
  35. }
  36. //否则看一看剩余空间+misalign是否大于需求,大于则移动off数据
  37. else if (!CHAIN_PINNED(chain) &&
  38. //里面涉及别的一些细节,这里不展开
  39. evbuffer_chain_should_realign(chain, datlen)) {
  40. /* we can fit the data into the misalignment */
  41. evbuffer_chain_align(chain);
  42. memcpy(chain->buffer + chain->off, data, datlen);
  43. chain->off += datlen;
  44. buf->total_len += datlen;
  45. buf->n_add_for_cb += datlen;
  46. goto out;
  47. }
  48. }
  49. else {
  50. /* we cannot write any data to the last chain */
  51. remain = 0;
  52. }
  53. //走到这里代表一个chain不能满足datlen,那么预分配一个tmp chain
  54. /* we need to add another chain */
  55. to_alloc = chain->buffer_len;
  56. if (to_alloc <= EVBUFFER_CHAIN_MAX_AUTO_SIZE / 2)
  57. to_alloc <<= 1;
  58. if (datlen > to_alloc)
  59. to_alloc = datlen;
  60. tmp = evbuffer_chain_new_membuf(to_alloc);
  61. if (tmp == NULL)
  62. goto done;
  63. //把当前chain给分配完
  64. if (remain) {
  65. memcpy(chain->buffer + chain->misalign + chain->off,
  66. data, remain);
  67. chain->off += remain;
  68. buf->total_len += remain;
  69. buf->n_add_for_cb += remain;
  70. }
  71. //还需要多少大小从新的tmp里面分配
  72. data += remain;
  73. datlen -= remain;
  74. memcpy(tmp->buffer, data, datlen);
  75. tmp->off = datlen;
  76. evbuffer_chain_insert(buf, tmp);
  77. buf->n_add_for_cb += datlen;
  78. out:
  79. evbuffer_invoke_callbacks_(buf);
  80. result = 0;
  81. done:
  82. EVBUFFER_UNLOCK(buf);
  83. return result;
  84. }
bufferevent_read

bufferevent_read()底层调用evbuffer_remove这代表它按照指定长度去读,其又调用了evbuffer_copyout_from,具体细节就不展开了,我们知道了怎么写,那么怎么读我们也就知道了。

  1. /* Reads data from an event buffer and drains the bytes read */
  2. int evbuffer_remove(struct evbuffer *buf, void *data_out, size_t datlen)
  3. {
  4. ev_ssize_t n;
  5. EVBUFFER_LOCK(buf);
  6. n = evbuffer_copyout_from(buf, NULL, data_out, datlen); // 拷贝数据
  7. if (n > 0) {
  8. if (evbuffer_drain(buf, n)<0) // drain 就是丢弃已读走的数据,即 调整当前 chain 的 misalign 或 直接释放数据已全部读走的 chain
  9. n = -1;
  10. }
  11. EVBUFFER_UNLOCK(buf);
  12. return (int)n;
  13. }
evbuffer的缺点


上面我们说了evbuffer的优点,那么evbuffer的缺点呢?其实也很明显,即我们的数据是存储在不连续的内存上面(例如我们读20B,结果着20B分别在两个chain里面),内存不连续会带来多次io,我们可能需要多次io才能把数据读完整。对于内存不连续的问题,Linux内核提供了一个接口,readv和writev,解决内存不连续的读写问题

readv:将读缓冲区的数据读到不连续的内存中
writev:将不连续的内存数据写到写缓冲区
 

  1. man 2 readv
  2. # 第二个参数是数组,第三个参数是数组的长度
  3. ssize_t readv(int fd, const struct iovec *iov, int iovcnt);
  4. ssize_t writev(int fd, const struct iovec *iov, int iovcnt);
  5. struct iovec {
  6. void *iov_base; /* Starting address 起始地址*/
  7. size_t iov_len; /* Number of bytes to transfer 长度*/
  8. };

 图片摘自零声教育

因为我们的内核缓冲区是连续的,而我们的libevent的bufferevent的缓冲区采用的是链式缓冲区

 当面我们数据大的时候,可以放好几个chain的时候,如果我们还是采用write和read的话那么势必会导致内核切换过多,因为要分配好几次到chain上面

解决方法:

采用readv和writev,将bufferevent的所有的chain地址传进去,然后在每个chain上进行分配,

那么此时我们就只需要一次用户态和内核态的切换,大大提高了效率

解决了网络编程中那些痛点?

高效的网络缓冲区

io函数使用与网络原理,

多线程环境下,buffer加锁时,读要读出一个完整的包,写也是一样​​​​​​​

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Guff_9hys/article/detail/742792
推荐阅读
相关标签
  

闽ICP备14008679号