当前位置:   article > 正文

一个muduo的简单实现_muduo服务器的简单实现

muduo服务器的简单实现

前篇大致介绍了一下muduo,这篇探讨一下大致的代码实现。

muduo是基于反应堆reactor,且one loop per thread模式的。网络服务器两大类型的socket是监听socket和连接socket,我们可以在两种各自的线程里面对他们进行操作,每个线程里有事件循环来处理各种io事件。

我们知道reactor模式是基于事件回调的,为了处理好每个事件的回调,我们先构造一个事件结构体。他必包含事件对应的文件描述符fd,读写回调函数以及事件本身相关属性,如事件的类型,发生时间等等。

如何等待事件的到来,在linux上,最高效的网络模型必属epoll。又如何驱动事件呢?我们可以在每个线程里封装一个循环,专门负责事件的获取和分发。

有那么多网络事件,我们如何获取感兴趣的事件呢?例如对于监听socket,我们一般只获取连接事件,对于建立连接的socket,我们读写事件,甚至错误事件都得监听。在reactor模式里,这个过程就叫注册事件。

 

我们可以在创建事件的时候就注册。线程是事先创建好的,也可以利用线程池。注册事件是在某个线程上进行的。对于epoll,注册实际上是调用epoll_ctl函数进行添加。示例代码:

  1. static void event_accept_callback(int listenfd, event *ev, void *arg)
  2. {
  3. listener *ls = (listener *)arg;
  4. inet_address client_addr;
  5. socklen_t clilen = sizeof(client_addr.addr);
  6. int ret;
  7. int connfd;
  8. static int i;
  9. connfd = accept(listenfd, (struct sockaddr *)&client_addr.addr, &clilen);
  10. if (connfd < 0)
  11. {
  12. debug_ret("file: %s, line: %d", __FILE__, __LINE__);
  13. int save = errno;
  14. if (save == EAGAIN || save == ECONNABORTED || save == EINTR
  15. || save == EPROTO || save == EPERM || save == EMFILE)
  16. {
  17. return;
  18. }
  19. else
  20. {
  21. debug_sys("file: %s, line: %d", __FILE__, __LINE__);
  22. }
  23. }
  24. fcntl(connfd, F_SETFL, fcntl(connfd, F_GETFL) | O_NONBLOCK);
  25. int opt = 1;
  26. setsockopt(connfd, IPPROTO_TCP, TCP_NODELAY, &opt, sizeof(opt));
  27. /* epoll是线程安全的 */
  28. if (i == MAX_LOOP)
  29. i = 0;
  30. connection *conn = connection_create(loops[i], connfd, ls->readable_callback);
  31. if (conn == NULL)
  32. {
  33. debug_quit("file: %s, line: %d", __FILE__, __LINE__);
  34. }
  35. i++;
  36. /* 用户回调函数 */
  37. if (ls->new_connection_callback)
  38. ls->new_connection_callback(conn);
  39. }
  40. listener *listener_create(server_manager *manager, inet_address ls_addr,
  41. connection_callback_pt read_cb, connection_callback_pt new_conn_cb)
  42. {
  43. int ret;
  44. listener *ls;
  45. ls = malloc(sizeof(listener));
  46. if (ls == NULL)
  47. {
  48. debug_ret("file: %s, line: %d", __FILE__, __LINE__);
  49. return NULL;
  50. }
  51. ls->listen_addr = ls_addr;
  52. ls->readable_callback = read_cb;
  53. ls->new_connection_callback = new_conn_cb;
  54. /* 创建非阻塞套接字 */
  55. int listen_fd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
  56. if (listen_fd < 0)
  57. {
  58. debug_ret("file: %s, line: %d", __FILE__, __LINE__);
  59. free(ls);
  60. return NULL;
  61. }
  62. int opt = 1;
  63. setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
  64. ret = bind(listen_fd, (struct sockaddr *)&ls_addr.addr, sizeof(ls_addr.addr));
  65. if (ret < 0)
  66. {
  67. close(listen_fd);
  68. free(ls);
  69. return NULL;
  70. }
  71. ret = listen(listen_fd, SOMAXCONN);
  72. if (ret < 0)
  73. {
  74. debug_ret("file: %s, line: %d", __FILE__, __LINE__);
  75. close(listen_fd);
  76. free(ls);
  77. return NULL;
  78. }
  79. //创建监听事件
  80. ls->ls_event = event_create(listen_fd, EPOLLIN | EPOLLPRI,
  81. event_accept_callback, ls, NULL, NULL);
  82. if (ls->ls_event == NULL)
  83. {
  84. debug_ret("file: %s, line: %d", __FILE__, __LINE__);
  85. close(listen_fd);
  86. free(ls);
  87. return NULL;
  88. }
  89. //注册事件
  90. accept_event_add(manager, ls->ls_event);
  91. event_start(ls->ls_event);
  92. return ls;
  93. }
  1. connection *connection_create(event_loop *loop, int connfd, connection_callback_pt read_cb)
  2. {
  3. connection *conn = malloc(sizeof(connection));
  4. if (conn == NULL)
  5. {
  6. return NULL;
  7. }
  8. conn->fd = connfd;
  9. conn->readable_callback = read_cb;
  10. /* 默认大小参考muduo */
  11. conn->input_buffer = array_create(1024, sizeof(char));
  12. if (conn->input_buffer == NULL)
  13. {
  14. debug_ret("file: %s, line: %d", __FILE__, __LINE__);
  15. free(conn);
  16. return NULL;
  17. }
  18. conn->output_buffer = array_create(1024, sizeof(char));
  19. if (conn->output_buffer == NULL)
  20. {
  21. array_free(conn->input_buffer);
  22. free(conn);
  23. return NULL;
  24. }
  25. //创建连接事件
  26. conn->conn_event = event_create(connfd, EPOLLIN | EPOLLPRI,
  27. event_readable_callback, conn, event_writable_callback, conn);
  28. if (conn->conn_event == NULL)
  29. {
  30. array_free(conn->input_buffer);
  31. array_free(conn->output_buffer);
  32. free(conn);
  33. return NULL;
  34. }
  35. //注册连接事件
  36. io_event_add(loop, conn->conn_event);
  37. /* 跟TCP相关的event才需要对应的connection */
  38. conn->conn_event->conn = conn;
  39. event_start(conn->conn_event);
  40. return conn;
  41. }

这里可以在主线程中循环处理监听socket的事件以及其他诸如定时器等事件,在其他线程里处理连接socket的事件,最终他们都会如下调用:

  1. struct timeval epoller_dispatch(int epoll_fd, int timeout)
  2. {
  3. int i;
  4. event *ev;
  5. struct epoll_event events[MAX_EVENTS];
  6. struct timeval now;
  7. /* 就绪事件在epoll内部排队,当就绪事件大于MAX_EVENTS时
  8. * 剩余就绪事件可在下次epoll_wait时获得
  9. */
  10. int nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, timeout);
  11. gettimeofday(&now, NULL);
  12. //debug_msg("tid = %ld", (long int)gettid());
  13. if (nfds == 0)
  14. {
  15. /* timeout */
  16. }
  17. /* 取出event对象 */
  18. for (i = 0; i < nfds; i++)
  19. {
  20. ev = events[i].data.ptr;
  21. ev->time = now;
  22. ev->actives = events[i].events;
  23. event_handler(ev);
  24. }
  25. return now;
  26. }

 本质上,上面就是reactor模式的典型写法,然后加入了多线程处理。muduo也是上面的思路,只不过是面向对象的思想,把每个实体都抽象为一个对象。例如Acceptor,Connector对象都包含Channel对象,他是一条具体连接的抽象。

 

代码在git上,赏个星星

https://github.com/shonm520/mu_event

 

欢迎加入QQ群 858791125 讨论skynet,游戏后台开发,lua脚本语言等问题。

 

 

 

 

 

 

 

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/知新_RL/article/detail/634117
推荐阅读
相关标签
  

闽ICP备14008679号