赞
踩
epoll主要有三个核心函数:
epoll_create:创建一个epoll实例
epoll_ctl:在epoll中增加、修改、删除文件描述符
epoll_wait:等待epoll中的文件描述符就绪,然后返回就绪的事件
//poller.c
/*
 * Create the epoll instance that backs one poller.
 * Returns whatever epoll_create() returns: the new epoll file descriptor,
 * or -1 on failure.
 */
static inline int __poller_create_pfd()
{
	/* epoll_create's size argument is only a hint, but it must be positive. */
	int epfd = epoll_create(1);

	return epfd;
}
//poller.c
/*
 * Allocate a poller_t and create its epoll descriptor.
 * NOTE: the middle of this function is elided in this excerpt (the line of
 * full-width dots below); nodes_buf, params and ret are not used in the
 * visible portion, and the success return is not shown.
 * Returns NULL when malloc fails.
 */
poller_t *__poller_create(void **nodes_buf, const struct poller_params *params)
{
poller_t *poller = (poller_t *)malloc(sizeof (poller_t));
int ret;
if (!poller)
return NULL;
// Create the epoll descriptor for this poller (see __poller_create_pfd)
poller->pfd = __poller_create_pfd();
。。。
// NOTE(review): presumably the failure path of the elided code - release the
// poller and report failure; confirm against the full source.
free(poller);
return NULL;
}
/*
 * Create the pollers owned by an mpoller.
 * When nodes_buf is non-NULL, __poller_create() is called once per index
 * below mpoller->nthreads and the result is stored in mpoller->poller[i];
 * the loop stops at the first poller that fails to be created.
 * NOTE: parts of the body are elided ("...") in this excerpt, so the only
 * return visible here is -1; the success path is not shown.
 */
static int __mpoller_create(const struct poller_params *params, mpoller_t *mpoller) { ... if (nodes_buf) { for (i = 0; i < mpoller->nthreads; i++) { mpoller->poller[i] = __poller_create(nodes_buf, params); if (!mpoller->poller[i]) break; } ... } return -1; }
/*
 * Start every poller owned by the mpoller.
 * Calls poller_start() on mpoller->poller[i] for each index below
 * mpoller->nthreads and stops at the first call that returns a negative value.
 * NOTE: the code before and after the loop (including the return value) is
 * elided ("...") in this excerpt.
 */
int mpoller_start(mpoller_t *mpoller)
{
...
for (i = 0; i < mpoller->nthreads; i++)
{
// a negative return from poller_start() aborts the start-up loop
if (poller_start(mpoller->poller[i]) < 0)
break;
}
...;
}
int poller_start(poller_t *poller) { pthread_t tid; int ret; pthread_mutex_lock(&poller->mutex); if (__poller_open_pipe(poller) >= 0) { //创建线程,在里面跑poller ret = pthread_create(&tid, NULL, __poller_thread_routine, poller); if (ret == 0) { poller->tid = tid; poller->stopped = 0; } else { errno = ret; close(poller->pipe_wr); close(poller->pipe_rd); } } 。。。 }
//poller.c static void *__poller_thread_routine(void *arg) { poller_t *poller = (poller_t *)arg; 。。。 while (1) { __poller_set_timer(poller); //处理epoll事件 nevents = __poller_wait(events, POLLER_EVENTS_MAX, poller); clock_gettime(CLOCK_MONOTONIC, &time_node.timeout); has_pipe_event = 0; //epoll监听到有事件发生,处理事件 for (i = 0; i < nevents; i++) { node = (struct __poller_node *)__poller_event_data(&events[i]); if (node <= (struct __poller_node *)1) { if (node == (struct __poller_node *)1) has_pipe_event = 1; continue; } switch (node->data.operation) { case PD_OP_READ: __poller_handle_read(node, poller); break; case PD_OP_WRITE: __poller_handle_write(node, poller); break; ... } } 。。。 } return NULL; }
/*
 * Block on the poller's epoll descriptor until events are available.
 * Fills `events` with at most `maxevents` entries and returns epoll_wait()'s
 * result unchanged.
 */
static inline int __poller_wait(__poller_event_t *events, int maxevents,
								poller_t *poller)
{
	/* A timeout of -1 makes epoll_wait block with no time limit. */
	const int timeout = -1;
	int nready = epoll_wait(poller->pfd, events, maxevents, timeout);

	return nready;
}
/*
 * Register `fd` with the poller's epoll descriptor.
 * `event` is the epoll event mask and `data` is the pointer stored with the
 * registration. Returns epoll_ctl()'s result unchanged.
 */
static inline int __poller_add_fd(int fd, int event, void *data,
								  poller_t *poller)
{
	struct epoll_event ev = { 0 };

	ev.events = event;
	ev.data.ptr = data;
	return epoll_ctl(poller->pfd, EPOLL_CTL_ADD, fd, &ev);
}
/*
 * Remove `fd` from the poller's epoll descriptor.
 * `event` is accepted but not used. Returns epoll_ctl()'s result unchanged.
 */
static inline int __poller_del_fd(int fd, int event, poller_t *poller)
{
	int rc;

	(void)event;	/* not used for EPOLL_CTL_DEL */
	rc = epoll_ctl(poller->pfd, EPOLL_CTL_DEL, fd, NULL);
	return rc;
}
综上,workflow 通过mpoller_create()创建poller,通过mpoller_start()启动poller。
WFGlobal::get_scheduler()
// Excerpt of WFGlobal: only the scheduler accessor is shown, the remaining
// members are elided ("...").
class WFGlobal
{
public:
	// Static accessor returning a pointer to a CommScheduler.
	static CommScheduler *get_scheduler();
	...
};
CommScheduler
// Excerpt of CommScheduler: a wrapper that owns a Communicator and forwards
// initialisation to it. Other members are elided ("...").
class CommScheduler
{
public:
// Forwards both thread counts to Communicator::init and returns its result.
int init(size_t poller_threads, size_t handler_threads)
{
return this->comm.init(poller_threads, handler_threads);
}
...
private:
// The Communicator that init() delegates to.
Communicator comm;
};
Communicator::init
// Excerpt of Communicator::init: the only step shown is the call to
// create_poller(); the surrounding code and the return value are elided
// ("...."), and handler_threads is not used in the visible portion.
int Communicator::init(size_t poller_threads, size_t handler_threads)
{
....
create_poller(poller_threads); // create the poller threads
....
}
// Simplified outline of Communicator::create_poller (pseudo-code: arguments,
// return-value checks and the success return are elided in this excerpt).
// Steps shown: create the message queue, create the mpoller with
// `poller_threads` pollers, then start it.
int Communicator::create_poller(size_t poller_threads)
{
	struct poller_params params = 默认参数;	// placeholder: the default parameters
	msgqueue_create();						// create the msgqueue
	mpoller_create(&params, poller_threads);	// create the pollers
	mpoller_start();						// start the pollers
	return -1;
}
这样,每个poller都运行在各自的线程中,线程循环监听epoll上的事件,一旦有事件发生就进行相应的处理。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。