当前位置:   article > 正文

Libevent库源码介绍及TCP后端服务器_libevent开发服务器

libevent开发服务器

Libevent库介绍

Libevent 是一个轻量级的开源高性能事件通知库,主要有以下几个亮点:事件驱动,高性能;轻量级,专注于网络,源代码相当精炼、易读;跨平台,支持 Windows、 Linux、 *BSD 和 Mac Os;支持多种 I/O 多路复用技术, epoll、 poll、 dev/poll、 select 和 kqueue 等;支持 I/O,定时器和信号等事件;注册事件优先级.

Libevent模型

常用API

1.event_init()   --  event_base_new() 的封装

event_init()函数主要调用event_base_new()函数,返回event_base结构体;我们直接调用event_base_new()函数也是可以的;

  1. struct event_base *
  2. event_init(void)
  3. {
  4. struct event_base *base = event_base_new();
  5. if (base != NULL)
  6. current_base = base;
  7. return (base);
  8. }

2.event_base_new()  初始化event_base

event_base_new()主要就是对结构体event_base进行初始化,设置一些参数;

  1. struct event_base *
  2. event_base_new(void)
  3. {
  4. int i;
  5. struct event_base *base;
  6. //用calloc而不用malloc的原因?
  7. //calloc动态分配完内存后,自动初始化该内存为零
  8. if ((base = calloc(1, sizeof(struct event_base))) == NULL)
  9. event_err(1, "%s: calloc", __func__);
  10. event_sigcb = NULL;
  11. event_gotsig = 0;
  12. detect_monotonic();//设置use_monotonic
  13. gettime(base, &base->event_tv);
  14. //初始化定时事件的小根堆
  15. min_heap_ctor(&base->timeheap);
  16. //初始化事件链表,头 == 尾
  17. TAILQ_INIT(&base->eventqueue);
  18. //初始化信号
  19. base->sig.ev_signal_pair[0] = -1;
  20. base->sig.ev_signal_pair[1] = -1;
  21. //初始化IO多路复用机制
  22. base->evbase = NULL;
  23. //选取以NULL 结尾,初始化
  24. for (i = 0; eventops[i] && !base->evbase; i++) {
  25. base->evsel = eventops[i];
  26. base->evbase = base->evsel->init(base);
  27. }
  28. //如果没有IO多路复用
  29. if (base->evbase == NULL)
  30. event_errx(1, "%s: no event mechanism available", __func__);
  31. if (evutil_getenv("EVENT_SHOW_METHOD"))
  32. event_msgx("libevent using: %s\n",
  33. base->evsel->name);
  34. /* allocate a single active event queue */
  35. //设置优先级
  36. //活跃事件链表中,优先级值越小,越优先
  37. event_base_priority_init(base, 1);
  38. return (base);
  39. }

3.event_add()    将event添加到事件链表上,注册事件

根据时间类型添加到不同的列表中;

1.将event注册到event_base的I/O多路复用要监听的事件链表中
2.将event注册到event_base的已注册事件链表中
3.如果传入了超时时间,则删除旧的超时时间,重新设置,并将event添加到event_base的小根堆中;

  1. int
  2. event_add(struct event *ev, const struct timeval *tv)
  3. {
  4. struct event_base *base = ev->ev_base;
  5. const struct eventop *evsel = base->evsel;
  6. void *evbase = base->evbase;
  7. int res = 0;
  8. event_debug((
  9. "event_add: event: %p, %s%s%scall %p",
  10. ev,
  11. ev->ev_events & EV_READ ? "EV_READ " : " ",
  12. ev->ev_events & EV_WRITE ? "EV_WRITE " : " ",
  13. tv ? "EV_TIMEOUT " : " ",
  14. ev->ev_callback));
  15. assert(!(ev->ev_flags & ~EVLIST_ALL));
  16. /*
  17. * prepare for timeout insertion further below, if we get a
  18. * failure on any step, we should not change any state.
  19. */
  20. if (tv != NULL && !(ev->ev_flags & EVLIST_TIMEOUT)) {
  21. if (min_heap_reserve(&base->timeheap,
  22. 1 + min_heap_size(&base->timeheap)) == -1)
  23. return (-1); /* ENOMEM == errno */
  24. }
  25. if ((ev->ev_events & (EV_READ|EV_WRITE|EV_SIGNAL)) &&
  26. !(ev->ev_flags & (EVLIST_INSERTED|EVLIST_ACTIVE))) {
  27. res = evsel->add(evbase, ev);
  28. if (res != -1)
  29. event_queue_insert(base, ev, EVLIST_INSERTED);
  30. }
  31. /*
  32. * we should change the timout state only if the previous event
  33. * addition succeeded.
  34. */
  35. if (res != -1 && tv != NULL) {
  36. struct timeval now;
  37. /*
  38. * we already reserved memory above for the case where we
  39. * are not replacing an exisiting timeout.
  40. */
  41. if (ev->ev_flags & EVLIST_TIMEOUT)
  42. event_queue_remove(base, ev, EVLIST_TIMEOUT);
  43. /* Check if it is active due to a timeout. Rescheduling
  44. * this timeout before the callback can be executed
  45. * removes it from the active list. */
  46. if ((ev->ev_flags & EVLIST_ACTIVE) &&
  47. (ev->ev_res & EV_TIMEOUT)) {
  48. /* See if we are just active executing this
  49. * event in a loop
  50. */
  51. if (ev->ev_ncalls && ev->ev_pncalls) {
  52. /* Abort loop */
  53. *ev->ev_pncalls = 0;
  54. }
  55. event_queue_remove(base, ev, EVLIST_ACTIVE);
  56. }
  57. gettime(base, &now);
  58. evutil_timeradd(&now, tv, &ev->ev_timeout);
  59. event_debug((
  60. "event_add: timeout in %ld seconds, call %p",
  61. tv->tv_sec, ev->ev_callback));
  62. event_queue_insert(base, ev, EVLIST_TIMEOUT);
  63. }
  64. return (res);
  65. }

4.event_base_dispatch()   循环、检测、分发事件

event_base_dispatch仅调用了event_base_loop函数;

  1. int
  2. event_base_dispatch(struct event_base *event_base)
  3. {
  4. return (event_base_loop(event_base, 0));
  5. }

5.event_base_loop()

event_base_loop()主要就是循环、检测、分发事件

1.信号标记被设置,则调用信号的回调函数
2.根据定时器最小时间,设置I/O多路复用的最大等待时间,这样即使没有I/O事件发生,也能在最小定时器超时时返回。
3.调用I/O多路复用,监听事件,将活跃事件添加到活跃事件链表中
4.检查定时事件,将就绪的定时事件从小根堆中删除,插入到活跃事件链表中

libevent的核心就event_base_loop();在这其中检测和分发通过I/O多路复用来完成,比如我们经常使用的poll和epoll,通过epoll.c就可以看到源码;其实原理与我们之前学习到的epoll编程是很类似的,只是多了一部分的处理方式,达到与整合系统互相呼应的效果;

  1. int
  2. event_base_loop(struct event_base *base, int flags)
  3. {
  4. //IO复用方式
  5. const struct eventop *evsel = base->evsel;
  6. void *evbase = base->evbase;
  7. struct timeval tv;
  8. struct timeval *tv_p;
  9. int res, done;
  10. /* clear time cache */
  11. base->tv_cache.tv_sec = 0;
  12. if (base->sig.ev_signal_added)
  13. evsignal_base = base;
  14. done = 0;
  15. while (!done) {
  16. /* Terminate the loop if we have been asked to */
  17. if (base->event_gotterm) {
  18. //设置中止循环
  19. base->event_gotterm = 0;
  20. break;
  21. }
  22. if (base->event_break) {
  23. base->event_break = 0;
  24. break;
  25. }
  26. /* You cannot use this interface for multi-threaded apps */
  27. while (event_gotsig) {
  28. event_gotsig = 0;
  29. if (event_sigcb) {
  30. res = (*event_sigcb)();
  31. if (res == -1) {
  32. errno = EINTR;
  33. return (-1);
  34. }
  35. }
  36. }
  37. timeout_correct(base, &tv);
  38. tv_p = &tv;
  39. if (!base->event_count_active && !(flags & EVLOOP_NONBLOCK)) {
  40. timeout_next(base, &tv_p);
  41. } else {
  42. /*
  43. * if we have active events, we just poll new events
  44. * without waiting.
  45. */
  46. evutil_timerclear(&tv);
  47. }
  48. /* If we have no events, we just exit */
  49. if (!event_haveevents(base)) {
  50. event_debug(("%s: no events registered.", __func__));
  51. return (1);
  52. }
  53. /* update last old time */
  54. gettime(base, &base->event_tv);
  55. /* clear time cache */
  56. base->tv_cache.tv_sec = 0;
  57. res = evsel->dispatch(base, evbase, tv_p);
  58. if (res == -1)
  59. return (-1);
  60. gettime(base, &base->tv_cache);
  61. timeout_process(base);
  62. //有就绪事件则调用事件注册的回调函数
  63. if (base->event_count_active) {
  64. event_process_active(base);
  65. if (!base->event_count_active && (flags & EVLOOP_ONCE))
  66. done = 1;
  67. } else if (flags & EVLOOP_NONBLOCK)
  68. done = 1;
  69. }
  70. /* clear time cache */
  71. base->tv_cache.tv_sec = 0;
  72. event_debug(("%s: asked to terminate loop.", __func__));
  73. return (0);
  74. }

TCP服务器模型

1.evconnlistener

基于event和event_base已经可以写一个C/S模型了。但是对于服务器端来说,仍然需要用户自行调用socket、bind、listen、accept等步骤。这个过程有点繁琐,并且一些细节可能考虑不全,为此Libevent推出了一些对应的封装函数,简化了整个监听的流程,用户仅仅需要在对应回调函数里面处理已完成连接的套接字即可。

常用API

evconnlistener_new_bind:
通过evconnlistener_new_bind传递回调函数,在accept成功后,在回调函数里面处理已连接的套接字。evconnlistener其实是对even_base和event的封装。

evconnlistener具体使用可参考:

Libevent之evconnlistener详解_evconnlistener_new_bind_阿卡基YUAN的博客-CSDN博客

evconnlistener有关介绍与使用_Xiezongyi的博客-CSDN博客

2.bufferevent

Bufferevent主要是用来管理和调度IO事件,负责数据的read和write, 因为do_read方法作为一个事件会一直被循环, 当客户端连接断开的时候,do_read方法还是在循环,根本不知道客户端已经断开socket的连接。如果要解决这个问题,我们可能要做大量的工作来维护这些socket的连接状态,读取状态。而Libevent的Bufferevent帮我们解决了这些问题。Bufferevent封装了recv和send函数,并且设置了水位,有两种水位:低水位和高水位。低水位为0是默认值,表示收到了就读了,当设置了低水位(下界)的值,收到了这么多的大小才会去处理,没有到达低水位的字节数的话就一直不处理。高水位的默认值也是0,设置高水位(上界)超过这个值的话就要分批处理。另外, Bufferevent还解决了各种粘包和拆包问题。

  1. struct bufferevent* evbuf=bufferevent_socket_new(connectedbase,fd,BEV_OPT_CLOSE_ON_FREE|BEV_OPT_THREADSAFE);
  2.  bufferevent_setcb(evbuf,receive_read_cb,NULL,event_error_cb,NULL);
  3.  bufferevent_setwatermark(evbuf,EV_WRITE,0,SEND_BUFFERCACHESIZE);
  4.  bufferevent_setwatermark(evbuf,EV_READ,0,RECEIVE_BUFFERCACHESIZE);
  5. // 0就是默认值,收到了就读

bufferevent具体使用可参考:

libevent学习——buffevent事件及低水位高水位设置_bufferevent_setwatermark_dxgzg的博客-CSDN博客

基于Libevent实现的TCP服务器的部分代码: 

  1. int TCPServer::initServer()
  2. {
  3. cout<<" init tcp server"<<endl;
  4. int status=0;
  5. // event_enable_debug_mode();
  6. evthread_use_pthreads();//启动线程安全
  7. ebase= event_base_new();//初始化base
  8. if(ebase)
  9. {
  10. cout<<" event init success"<<endl;
  11. memset(&esin,0,sizeof(esin));
  12. esin.sin_family=AF_INET;
  13. esin.sin_addr.s_addr=htonl(0);
  14. esin.sin_port=htons(ServerMemoryCache::getInstance()->configinfor->port);
  15. elistener=evconnlistener_new_bind(ebase,accept_conn_cb,NULL,LEV_OPT_CLOSE_ON_FREE|LEV_OPT_REUSEABLE,-1,
  16. (struct sockaddr*)&esin,sizeof(esin));
  17. if(elistener)
  18. {
  19. cout<<" evconnlistener_new_bind success"<<endl;
  20. cout<<" evconnlistener_set_error_cb ..."<<endl;
  21. evconnlistener_set_error_cb(elistener,accept_error_cb);
  22. cout<<" evconnlistener_set_error_cb success"<<endl;
  23. status=0;
  24. }
  25. else
  26. {
  27. cout<<" evconnlistener_new_bind failed"<<endl;
  28. status=2;
  29. puts("event lister 创建失败!");
  30. }
  31. }
  32. else
  33. {
  34. cout<<"event init failed"<<endl;
  35. puts("event 打开失败!");
  36. status=1;
  37. }
  38. return status;
  39. }
  40. void TCPServer::accept_conn_cb(struct evconnlistener *listener,evutil_socket_t fd,
  41. struct sockaddr* address,int socklen,void *ctx)
  42. {
  43. cout<<" receivce connect..."<<endl;
  44. struct event_base* connectedbase=evconnlistener_get_base(listener);
  45. struct bufferevent* evbuf=bufferevent_socket_new(connectedbase,fd,BEV_OPT_CLOSE_ON_FREE|BEV_OPT_THREADSAFE);
  46. ClientConnectinfo* coninfo=new ClientConnectinfo();
  47. time_t curtime;
  48. time(&curtime);
  49. bufferevent_setcb(evbuf,receive_read_cb,NULL,event_error_cb,NULL);
  50. bufferevent_setwatermark(evbuf,EV_WRITE,0,SEND_BUFFERCACHESIZE);
  51. bufferevent_setwatermark(evbuf,EV_READ,0,RECEIVE_BUFFERCACHESIZE);
  52. bufferevent_enable(evbuf,EV_WRITE|EV_READ);
  53. struct timeval tv;
  54. tv.tv_sec=BUFFEROVERTIME;
  55. tv.tv_usec=0;
  56. bufferevent_set_timeouts(evbuf,&tv,&tv);
  57. coninfo->lastreceivetime=curtime;
  58. coninfo->clientbuffer=evbuf;
  59. coninfo->fd=fd;
  60. ServerMemoryCache::getInstance()->connectedbuffers.push_back(coninfo);
  61. unique_lock<mutex> lkhaveclient(ServerMemoryCache::getInstance()->ishaveconnectedclientmtx);
  62. lkhaveclient.unlock();
  63. ServerMemoryCache::getInstance()->ishaveconnectedclientcv.notify_one();
  64. cout<<" new connect deal with finish..."<<endl;
  65. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/盐析白兔/article/detail/458495
推荐阅读
  

闽ICP备14008679号