赞
踩
前篇大致介绍了一下muduo,这篇探讨一下大致的代码实现。
muduo是基于反应堆reactor,且one loop per thread模式的。网络服务器两大类型的socket是监听socket和连接socket,我们可以在两种各自的线程里面对他们进行操作,每个线程里有事件循环来处理各种io事件。
我们知道reactor模式是基于事件回调的,为了处理好每个事件的回调,我们先构造一个事件结构体。他必包含事件对应的文件描述符fd,读写回调函数以及事件本身相关属性,如事件的类型,发生时间等等。
如何等待事件的到来,在linux上,最高效的网络模型必属epoll。又如何驱动事件呢?我们可以在每个线程里封装一个循环,专门负责事件的获取和分发。
有那么多网络事件,我们如何获取感兴趣的事件呢?例如对于监听socket,我们一般只获取连接事件,对于建立连接的socket,我们读写事件,甚至错误事件都得监听。在reactor模式里,这个过程就叫注册事件。
我们可以在创建事件的时候就注册。线程是事先创建好的,也可以利用线程池。注册事件是在某个线程上进行的。对于epoll,注册实际上是调用epoll_ctl函数进行添加。示例代码:
- static void event_accept_callback(int listenfd, event *ev, void *arg)
- {
- listener *ls = (listener *)arg;
- inet_address client_addr;
- socklen_t clilen = sizeof(client_addr.addr);
- int ret;
- int connfd;
- static int i;
-
- connfd = accept(listenfd, (struct sockaddr *)&client_addr.addr, &clilen);
- if (connfd < 0)
- {
- debug_ret("file: %s, line: %d", __FILE__, __LINE__);
-
- int save = errno;
- if (save == EAGAIN || save == ECONNABORTED || save == EINTR
- || save == EPROTO || save == EPERM || save == EMFILE)
- {
- return;
- }
- else
- {
- debug_sys("file: %s, line: %d", __FILE__, __LINE__);
- }
- }
-
- fcntl(connfd, F_SETFL, fcntl(connfd, F_GETFL) | O_NONBLOCK);
-
- int opt = 1;
- setsockopt(connfd, IPPROTO_TCP, TCP_NODELAY, &opt, sizeof(opt));
-
- /* epoll是线程安全的 */
- if (i == MAX_LOOP)
- i = 0;
-
- connection *conn = connection_create(loops[i], connfd, ls->readable_callback);
- if (conn == NULL)
- {
- debug_quit("file: %s, line: %d", __FILE__, __LINE__);
- }
-
- i++;
-
- /* 用户回调函数 */
- if (ls->new_connection_callback)
- ls->new_connection_callback(conn);
-
- }
-
- listener *listener_create(server_manager *manager, inet_address ls_addr,
- connection_callback_pt read_cb, connection_callback_pt new_conn_cb)
- {
- int ret;
- listener *ls;
-
- ls = malloc(sizeof(listener));
- if (ls == NULL)
- {
- debug_ret("file: %s, line: %d", __FILE__, __LINE__);
- return NULL;
- }
-
- ls->listen_addr = ls_addr;
- ls->readable_callback = read_cb;
- ls->new_connection_callback = new_conn_cb;
-
- /* 创建非阻塞套接字 */
- int listen_fd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
- if (listen_fd < 0)
- {
- debug_ret("file: %s, line: %d", __FILE__, __LINE__);
- free(ls);
- return NULL;
- }
-
- int opt = 1;
- setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
-
- ret = bind(listen_fd, (struct sockaddr *)&ls_addr.addr, sizeof(ls_addr.addr));
- if (ret < 0)
- {
- close(listen_fd);
- free(ls);
- return NULL;
- }
-
- ret = listen(listen_fd, SOMAXCONN);
- if (ret < 0)
- {
- debug_ret("file: %s, line: %d", __FILE__, __LINE__);
- close(listen_fd);
- free(ls);
- return NULL;
- }
-
- //创建监听事件
- ls->ls_event = event_create(listen_fd, EPOLLIN | EPOLLPRI,
- event_accept_callback, ls, NULL, NULL);
- if (ls->ls_event == NULL)
- {
- debug_ret("file: %s, line: %d", __FILE__, __LINE__);
- close(listen_fd);
- free(ls);
- return NULL;
- }
-
-
- //注册事件
- accept_event_add(manager, ls->ls_event);
- event_start(ls->ls_event);
-
- return ls;
- }
-
-
- connection *connection_create(event_loop *loop, int connfd, connection_callback_pt read_cb)
- {
- connection *conn = malloc(sizeof(connection));
- if (conn == NULL)
- {
- return NULL;
- }
-
- conn->fd = connfd;
- conn->readable_callback = read_cb;
-
- /* 默认大小参考muduo */
- conn->input_buffer = array_create(1024, sizeof(char));
- if (conn->input_buffer == NULL)
- {
- debug_ret("file: %s, line: %d", __FILE__, __LINE__);
- free(conn);
- return NULL;
- }
-
- conn->output_buffer = array_create(1024, sizeof(char));
- if (conn->output_buffer == NULL)
- {
- array_free(conn->input_buffer);
- free(conn);
- return NULL;
- }
-
- //创建连接事件
- conn->conn_event = event_create(connfd, EPOLLIN | EPOLLPRI,
- event_readable_callback, conn, event_writable_callback, conn);
- if (conn->conn_event == NULL)
- {
- array_free(conn->input_buffer);
- array_free(conn->output_buffer);
- free(conn);
- return NULL;
- }
-
- //注册连接事件
- io_event_add(loop, conn->conn_event);
-
- /* 跟TCP相关的event才需要对应的connection */
- conn->conn_event->conn = conn;
- event_start(conn->conn_event);
-
-
- return conn;
- }
这里可以在主线程中循环处理监听socket的事件以及其他诸如定时器等事件,在其他线程里处理连接socket的事件,最终他们都会如下调用:
- struct timeval epoller_dispatch(int epoll_fd, int timeout)
- {
- int i;
- event *ev;
- struct epoll_event events[MAX_EVENTS];
- struct timeval now;
-
- /* 就绪事件在epoll内部排队,当就绪事件大于MAX_EVENTS时
- * 剩余就绪事件可在下次epoll_wait时获得
- */
- int nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, timeout);
-
- gettimeofday(&now, NULL);
- //debug_msg("tid = %ld", (long int)gettid());
-
- if (nfds == 0)
- {
- /* timeout */
- }
-
- /* 取出event对象 */
- for (i = 0; i < nfds; i++)
- {
- ev = events[i].data.ptr;
- ev->time = now;
- ev->actives = events[i].events;
- event_handler(ev);
- }
-
- return now;
- }
本质上,上面就是reactor模式的典型写法,然后加入了多线程处理。muduo也是上面的思路,只不过是面向对象的思想,把每个实体都抽象为一个对象。例如Acceptor,Connector对象都包含Channel对象,他是一条具体连接的抽象。
代码在git上,赏个星星
https://github.com/shonm520/mu_event
欢迎加入QQ群 858791125 讨论skynet,游戏后台开发,lua脚本语言等问题。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。