赞
踩
先复习下 epoll 的用法。
如下的代码中,先用 epoll_create 创建一个 epoll 文件描述符 epfd,再通过 epoll_ctl 将需要监听的 socket 添加到 epfd 中,最后调用 epoll_wait 等待数据。
int s = socket(AF_INET, SOCK_STREAM, 0); bind(s, ...); listen(s, ...) int epfd = epoll_create(...); epoll_ctl(epfd, ...); // 将所有需要监听的 socket 添加到 epfd 中。 #define MAX_EVENTS 10 struct epoll_event events[MAX_EVENTS]; while(1) { int n = epoll_wait(epfd, events, ...); // 返回就绪的 socket 数量。 for(int i = 0; i < n; ++i) { if (events[n].data.fd == listen_sock) { // 处理 } } }
创建 socket 时,操作系统会创建一个由文件系统管理的 socket 对象。这个 socket 对象包含了发送缓冲区、接收缓冲区、等待队列等成员。等待队列是个非常重要的结构,它指向所有需要等待该 socket 事件的进程。
要使用 epoll 首先需要调用 epoll_create() 函数创建一个 epoll 的文件描述符,函数原型如下:
int epoll_create(int size);
参数 size 是由于历史原因遗留下来的,自 Linux 2.6.8 以来,已不起作用,但必须大于零。
当用户调用 epoll_create() 函数时,会进入到内核空间,并且调用 do_epoll_create() 内核函数来创建 epoll 句柄,do_epoll_create() 函数代码如下:
/* * Open an eventpoll file descriptor. */ static int do_epoll_create(int flags) { int error, fd; struct eventpoll *ep = NULL; struct file *file; /* Check the EPOLL_* constant for consistency. */ BUILD_BUG_ON(EPOLL_CLOEXEC != O_CLOEXEC); if (flags & ~EPOLL_CLOEXEC) return -EINVAL; /* * Create the internal data structure ("struct eventpoll"). */ error = ep_alloc(&ep); if (error < 0) return error; /* * Creates all the items needed to setup an eventpoll file. That is, * a file structure and a free file descriptor. */ fd = get_unused_fd_flags(O_RDWR | (flags & O_CLOEXEC)); if (fd < 0) { error = fd; goto out_free_ep; } file = anon_inode_getfile("[eventpoll]", &eventpoll_fops, ep, O_RDWR | (flags & O_CLOEXEC)); if (IS_ERR(file)) { error = PTR_ERR(file); goto out_free_fd; } ep->file = file; fd_install(fd, file); return fd; out_free_fd: put_unused_fd(fd); out_free_ep: ep_free(ep); return error; }
sys_epoll_create() 主要做两件事情:
从 do_epoll_create() 源码可以看出,epoll 对象实际上是一个 eventpoll,定义如下:
struct eventpoll { /* Protect the access to this structure */ spinlock_t lock; /* * This mutex is used to ensure that files are not removed * while epoll is using them. This is held during the event * collection loop, the file cleanup path, the epoll file exit * code and the ctl operations. */ struct mutex mtx; /* Wait queue used by sys_epoll_wait() */ wait_queue_head_t wq; /* Wait queue used by file->poll() */ wait_queue_head_t poll_wait; /* List of ready file descriptors */ struct list_head rdllist; /* RB tree root used to store monitored fd structs */ struct rb_root rbr; /* * This is a single linked list that chains all the "struct epitem" that * happened while transferring ready events to userspace w/out * holding ->lock. */ struct epitem *ovflist; /* wakeup_source used when ep_scan_ready_list is running */ struct wakeup_source *ws; /* The user that created the eventpoll descriptor */ struct user_struct *user; struct file *file; /* used to optimize loop detection check */ int visited; struct list_head visited_list_link; }
其中有几个成员需要重点关注:
(1)被监听的 Socket 列表(rbr )。
通过 epoll_ctl(2) 将监控的 socket 添加到以红黑树保存的列表中。红黑树结点键是文件描述符,而值是与文件描述符相关的信息 epitem。
(2)等待队列(wq)。
和 socket 一样,它也会有等待队列,当调用 epoll_wait(epfd, ...)
时会把进程添加到 eventpoll 对象的 wq 等待队列中。
(3)就绪 socket 列表(rdllist)。
保存已经就绪的 socket 文件描述符列表。当程序执行到 epoll_wait 时,如果 rdlist 已经存在就绪的 socket,那么 epoll_wait 直接返回,如果 rdlist 为空,阻塞进程。
下图展示了 eventpoll 对象与被监听的文件关系:
由于被监听的文件是通过 epitem 对象来管理的,所以上图中的节点都是以 epitem 对象的形式存在的。为什么要使用红黑树来管理被监听的文件呢?这是为了能够通过文件描述符快速查找到其对应的 epitem 对象。
/* * Each file descriptor added to the eventpoll interface will * have an entry of this type linked to the "rbr" RB tree. * Avoid increasing the size of this struct, there can be many thousands * of these on a server and we do not want this to take another cache line. */ struct epitem { union { /* RB tree node links this structure to the eventpoll RB tree */ struct rb_node rbn; /* Used to free the struct epitem */ struct rcu_head rcu; }; /* List header used to link this structure to the eventpoll ready list */ struct list_head rdllink; /* * Works together "struct eventpoll"->ovflist in keeping the * single linked chain of items. */ struct epitem *next; /* The file descriptor information this item refers to */ struct epoll_filefd ffd; /* * Protected by file->f_lock, true for to-be-released epitem already * removed from the "struct file" items list; together with * eventpoll->refcount orchestrates "struct eventpoll" disposal */ bool dying; /* List containing poll wait queues */ struct eppoll_entry *pwqlist; /* The "container" of this item */ struct eventpoll *ep; /* List header used to link this item to the "struct file" items list */ struct hlist_node fllink; /* wakeup_source used when EPOLLWAKEUP is set */ struct wakeup_source __rcu *ws; /* The structure that describe the interested events and the source fd */ struct epoll_event event; };
epitem 用于表示 epoll 中的每个被监视的文件描述符的信息。以下是一些重要的字段:
前面介绍了怎么创建 epoll,接下来介绍一下怎么向 epoll 添加要监听的 socket。
通过调用 epoll_ctl() 函数可以向 epoll 添加要监听的文件,其原型如下:
long epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
下面说明一下各个参数的作用:
struct epoll_event {
__uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};
events 可以是以下几个宏的集合:
data 用来保存用户自定义数据。
epoll_ctl() 函数会调用 do_epoll_ctl() 内核函数,do_epoll_ctl() 的实现如下:
int do_epoll_ctl(int epfd, int op, int fd, struct epoll_event *epds, bool nonblock) { ... f = fdget(epfd); if (!f.file) goto error_return; /* Get the "struct file *" for the target file */ tf = fdget(fd); if (!tf.file) goto error_fput; ... /* * At this point it is safe to assume that the "private_data" contains * our own data structure. */ ep = f.file->private_data; error = epoll_mutex_lock(&ep->mtx, 0, nonblock); if (error) goto error_tgt_fput; ... /* * Try to lookup the file inside our RB tree. Since we grabbed "mtx" * above, we can be sure to be able to use the item looked up by * ep_find() till we release the mutex. */ epi = ep_find(ep, tf.file, fd); error = -EINVAL; switch (op) { case EPOLL_CTL_ADD: if (!epi) { epds->events |= EPOLLERR | EPOLLHUP; error = ep_insert(ep, epds, tf.file, fd, full_check); } else error = -EEXIST; break; case EPOLL_CTL_DEL: if (epi) error = ep_remove(ep, epi); else error = -ENOENT; break; case EPOLL_CTL_MOD: if (epi) { if (!(epi->event.events & EPOLLEXCLUSIVE)) { epds->events |= EPOLLERR | EPOLLHUP; error = ep_modify(ep, epi, epds); } } else error = -ENOENT; break; } mutex_unlock(&ep->mtx); error_tgt_fput: if (full_check) { clear_tfile_check_list(); loop_check_gen++; mutex_unlock(&epmutex); } fdput(tf); error_fput: fdput(f); error_return: return error; }
sys_epoll_ctl() 函数会根据传入不同 op 的值来进行不同操作,比如传入 EPOLL_CTL_ADD 表示要进行添加操作,那么就调用 ep_insert() 函数来进行添加操作。
我们继续来分析添加操作 ep_insert() 函数的实现:
static int ep_insert(struct eventpoll *ep, struct epoll_event *event, struct file *tfile, int fd) { ... error = -ENOMEM; // 申请一个 epitem 对象 if (!(epi = kmem_cache_alloc(epi_cache, GFP_KERNEL))) goto error_return; // 初始化 epitem 对象 INIT_LIST_HEAD(&epi->rdllink); INIT_LIST_HEAD(&epi->fllink); INIT_LIST_HEAD(&epi->pwqlist); epi->ep = ep; ep_set_ffd(&epi->ffd, tfile, fd); epi->event = *event; epi->nwait = 0; epi->next = EP_UNACTIVE_PTR; epq.epi = epi; // 等价于: epq.pt->qproc = ep_ptable_queue_proc init_poll_funcptr(&epq.pt, ep_ptable_queue_proc); // 调用被监听文件的 poll 接口. // 这个接口由各自文件系统实现, 如 socket 的 tcp_poll(). revents = tfile->f_op->poll(tfile, &epq.pt); ... ep_rbtree_insert(ep, epi); // 把 epitem 对象添加到epoll的红黑树中进行管理 spin_lock_irqsave(&ep->lock, flags); // 如果被监听的文件已经可以进行对应的读写操作 // 那么就把文件添加到 epoll 的就绪队列 rdllist 中, 并且唤醒调用 epoll_wait() 的进程. if ((revents & event->events) && !ep_is_linked(&epi->rdllink)) { list_add_tail(&epi->rdllink, &ep->rdllist); if (waitqueue_active(&ep->wq)) wake_up_locked(&ep->wq); if (waitqueue_active(&ep->poll_wait)) pwake++; } spin_unlock_irqrestore(&ep->lock, flags); ... return 0; ... }
被监听的文件是通过 epitem 对象进行管理的,也就是说被监听的文件会被封装成 epitem 对象,然后会被添加到 eventpoll 对象的红黑树中进行管理(如上述代码中的 ep_rbtree_insert(ep, epi))。
tfile->f_op->poll(tfile, &epq.pt) 这行代码的作用是调用被监听文件的 poll() 接口,如果被监听的文件是一个 socket 句柄,那么就会调用 tcp_poll(),我们来看看 tcp_poll() 做了什么操作:
unsigned int tcp_poll(struct file *file, struct socket *sock, poll_table *wait)
{
struct sock *sk = sock->sk;
...
poll_wait(file, sk->sk_sleep, wait);
...
return mask;
}
每个 socket 对象都有个等待队列用于存放等待 socket 状态更改的进程。
从上述代码可以知道,tcp_poll() 调用了 poll_wait() 函数,而 poll_wait() 最终会调用 ep_ptable_queue_proc() 函数,其实现如下:
static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead, poll_table *pt) { struct epitem *epi = ep_item_from_epqueue(pt); struct eppoll_entry *pwq; if (epi->nwait >= 0 && (pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL))) { init_waitqueue_func_entry(&pwq->wait, ep_poll_callback); pwq->whead = whead; pwq->base = epi; add_wait_queue(whead, &pwq->wait); list_add_tail(&pwq->llink, &epi->pwqlist); epi->nwait++; } else { epi->nwait = -1; } }
ep_ptable_queue_proc() 函数主要工作是把当前 epitem 对象添加到 socket 对象的等待队列中,并且设置唤醒函数为 ep_poll_callback(),也就是说,当 socket 状态发生变化时,会触发调用 ep_poll_callback() 函数。
ep_poll_callback() 函数实现如下:
static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key)
{
...
// 把就绪的文件添加到就绪队列中
list_add_tail(&epi->rdllink, &ep->rdllist);
is_linked:
// 唤醒调用 epoll_wait() 而被阻塞的进程。
if (waitqueue_active(&ep->wq))
wake_up_locked(&ep->wq);
...
return 1;
}
ep_poll_callback() 函数的主要工作是把就绪的文件描述符添加到 eventepoll 对象的就绪队列中,然后唤醒调用 epoll_wait() 被阻塞的进程。
把被监听的文件描述符添加到 epoll 后,就可以通过调用 epoll_wait() 等待被监听的文件状态发生改变。epoll_wait() 调用会阻塞当前进程,当被监听的文件状态发生改变时,epoll_wait() 调用便会返回。
epoll_wait() 系统调用的原型如下:
long epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
参数说明:
关于返回值:
成功时,epoll_wait() 返回做好准备的文件描述符的数量,如果在请求的超时毫秒内没有文件描述符准备就绪,则返回零。 当发生错误时,epoll_wait()返回-1,并适当设置errno。
epoll_wait() 函数会调用 sys_epoll_wait() 内核函数,而 sys_epoll_wait() 函数最终会调用 ep_poll() 函数,我们来看看 ep_poll() 函数的实现:
static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events, int maxevents, long timeout) { ... // 如果就绪文件列表为空 if (list_empty(&ep->rdllist)) { // 把当前进程添加到epoll的等待队列中 init_waitqueue_entry(&wait, current); wait.flags |= WQ_FLAG_EXCLUSIVE; __add_wait_queue(&ep->wq, &wait); for (;;) { set_current_state(TASK_INTERRUPTIBLE); // 把当前进程设置为睡眠状态 if (!list_empty(&ep->rdllist) || !jtimeout) // 如果有就绪文件或者超时, 退出循环 break; if (signal_pending(current)) { // 接收到信号也要退出 res = -EINTR; break; } spin_unlock_irqrestore(&ep->lock, flags); jtimeout = schedule_timeout(jtimeout); // 让出CPU, 切换到其他进程进行执行 spin_lock_irqsave(&ep->lock, flags); } // 有3种情况会执行到这里: // 1. 被监听的文件集合中有就绪的文件 // 2. 设置了超时时间并且超时了 // 3. 接收到信号 __remove_wait_queue(&ep->wq, &wait); set_current_state(TASK_RUNNING); } /* 是否有就绪的文件? */ eavail = !list_empty(&ep->rdllist); spin_unlock_irqrestore(&ep->lock, flags); if (!res && eavail && !(res = ep_send_events(ep, events, maxevents)) && jtimeout) goto retry; return res; }
ep_poll() 函数主要做以下几件事:
下面通过文字来描述一下这个 epoll 实现 IO 多路复用的整个过程:
epoll_create(2) - Linux manual page - man7.org
linux内核Epoll 实现原理
Linux source code (v6.0) - Elixir Bootlin
如果这篇文章说不清epoll的本质,那就过来掐死我吧!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。