当前位置:   article > 正文

workflow源码解析:epoll

workflow源码解析:epoll

一、Linux下的epoll

epoll主要有三个核心函数:
epoll_create:创建一个epoll实例,返回对应的文件描述符
epoll_ctl:在epoll实例中添加、修改或删除被监听的文件描述符
epoll_wait:等待被监听的文件描述符上有事件就绪,然后返回就绪的事件

二、workflow对epoll的封装

1. 使用epoll_create

//poller.c
/*
 * Create the epoll instance that backs one poller.
 *
 * Returns the new epoll file descriptor, or -1 with errno set by
 * epoll_create() on failure. The size argument is only a hint that modern
 * kernels ignore; it just has to be greater than zero.
 */
static inline int __poller_create_pfd(void)
{
	return epoll_create(1);
}
  • 1
  • 2
  • 3
  • 4
  • 5
//poller.c
/*
 * Allocate a poller_t and create its epoll descriptor.
 * (Excerpt: the middle of the function is elided in the article.)
 *
 * Returns NULL when malloc() fails. The visible tail frees the poller and
 * returns NULL as well. NOTE(review): presumably that tail is the failure
 * path and the successful return sits in the elided part -- confirm against
 * the full poller.c.
 */
poller_t *__poller_create(void **nodes_buf, const struct poller_params *params)
{
	poller_t *poller = (poller_t *)malloc(sizeof (poller_t));
	int ret;

	if (!poller)
		return NULL;
	// Create the epoll descriptor and store it in poller->pfd.
	poller->pfd = __poller_create_pfd();
	// (code elided in the original excerpt)
	。。。

	free(poller);
	return NULL;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
/*
 * Fill mpoller->poller[] with one poller per thread.
 * (Excerpt: setup and the success/cleanup code are elided in the article.)
 *
 * The visible tail returns -1. NOTE(review): presumably the failure result,
 * with the success return in the elided part -- confirm against mpoller.c.
 */
static int __mpoller_create(const struct poller_params *params,
							mpoller_t *mpoller)
{
    ...
	if (nodes_buf)
	{
		// Create mpoller->nthreads pollers, each given the same nodes_buf
		// and params; stop at the first creation failure.
		for (i = 0; i < mpoller->nthreads; i++)
		{
			mpoller->poller[i] = __poller_create(nodes_buf, params);
			if (!mpoller->poller[i])
				break;
		}
	...
	}

	return -1;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

2. 使用epoll_wait

/*
 * Start every poller owned by the mpoller.
 * (Excerpt: the code before and after the loop is elided in the article,
 * so the return value is not visible here.)
 */
int mpoller_start(mpoller_t *mpoller)
{
	...
	// Start the pollers in index order; leave the loop as soon as one
	// poller_start() call reports failure (< 0).
	for (i = 0; i < mpoller->nthreads; i++)
	{
		if (poller_start(mpoller->poller[i]) < 0)
			break;
	}
	...;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
/*
 * Start one poller by spawning the thread that runs its event loop.
 * (Excerpt: the end of the function, including the unlock and the return
 * statement, is elided in the article.)
 */
int poller_start(poller_t *poller)
{
	pthread_t tid;
	int ret;

	pthread_mutex_lock(&poller->mutex);
	if (__poller_open_pipe(poller) >= 0)
	{
		// Create the thread that runs the poller loop
		// (__poller_thread_routine) with this poller as its argument.
		ret = pthread_create(&tid, NULL, __poller_thread_routine, poller);
		if (ret == 0)
		{
			// Thread is running: remember its id and clear the stopped flag.
			poller->tid = tid;
			poller->stopped = 0;
		}
		else
		{
			// pthread_create() returns the error number instead of setting
			// errno, so copy it into errno before closing both pipe ends.
			errno = ret;
			close(poller->pipe_wr);
			close(poller->pipe_rd);
		}
	}
	。。。
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
//poller.c
/*
 * Thread entry point of a poller: loop forever, waiting for events and
 * dispatching each one by its operation type.
 * (Excerpt: local declarations, further switch cases and the code after the
 * dispatch loop are elided in the article.)
 *
 * arg is the poller_t passed to pthread_create().
 */
static void *__poller_thread_routine(void *arg)
{
    poller_t *poller = (poller_t *)arg;
	。。。
	while (1)
	{
		__poller_set_timer(poller);
		// Wait for events; at most POLLER_EVENTS_MAX are returned per call.
		nevents = __poller_wait(events, POLLER_EVENTS_MAX, poller);
		clock_gettime(CLOCK_MONOTONIC, &time_node.timeout);
		has_pipe_event = 0;
		// Handle every event that was reported.
		for (i = 0; i < nevents; i++)
		{
			node = (struct __poller_node *)__poller_event_data(&events[i]);
			// Pointer values 0 and 1 are not real nodes and are skipped;
			// the value 1 additionally records that a pipe event arrived.
			if (node <= (struct __poller_node *)1)
			{
				if (node == (struct __poller_node *)1)
					has_pipe_event = 1;
				continue;
			}

			// Dispatch on the operation stored in the node.
			switch (node->data.operation)
			{
			case PD_OP_READ:
				__poller_handle_read(node, poller);
				break;
			case PD_OP_WRITE:
				__poller_handle_write(node, poller);
				break;
			...
			}
		}
		。。。
	}

	return NULL;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
/*
 * Wait on the poller's epoll descriptor for ready events.
 *
 * events    - output array that receives the ready events
 * maxevents - capacity of the events array
 * poller    - poller whose pfd is waited on
 *
 * Returns whatever epoll_wait() returns.
 */
static inline int __poller_wait(__poller_event_t *events, int maxevents,
								poller_t *poller)
{
	const int no_timeout = -1;	/* -1: block with no timeout */
	int nready = epoll_wait(poller->pfd, events, maxevents, no_timeout);

	return nready;
}
  • 1
  • 2
  • 3
  • 4

3. 使用epoll_ctl

/*
 * Register fd with the poller's epoll descriptor.
 *
 * fd     - descriptor to add
 * event  - epoll event mask to watch for
 * data   - user pointer stored in the event's data.ptr field
 * poller - poller whose pfd receives the registration
 *
 * Returns whatever epoll_ctl() returns.
 */
static inline int __poller_add_fd(int fd, int event, void *data,
								  poller_t *poller)
{
	struct epoll_event ev = { 0 };

	ev.events = event;
	ev.data.ptr = data;

	return epoll_ctl(poller->pfd, EPOLL_CTL_ADD, fd, &ev);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
/*
 * Remove fd from the poller's epoll descriptor.
 *
 * fd     - descriptor to remove
 * event  - accepted but not used by this implementation
 * poller - poller whose pfd the descriptor is removed from
 *
 * Returns whatever epoll_ctl() returns.
 */
static inline int __poller_del_fd(int fd, int event, poller_t *poller)
{
	int epfd = poller->pfd;

	(void)event;	/* not needed for EPOLL_CTL_DEL */
	return epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
}

  • 1
  • 2
  • 3
  • 4
  • 5

综上,workflow 通过mpoller_create()创建poller,通过mpoller_start()启动poller。

三、workflow如何使用epoll

WFGlobal::get_scheduler()

// Excerpt of WFGlobal; the remaining members are elided in the article.
class WFGlobal
{
public:
	// Static accessor that returns a pointer to a CommScheduler.
	// NOTE(review): presumably the process-wide scheduler instance --
	// confirm against the full WFGlobal source.
	static CommScheduler *get_scheduler();
    ...
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

CommScheduler

// Excerpt of CommScheduler; the remaining members are elided in the article.
// The class owns a Communicator and forwards initialization to it.
class CommScheduler
{
public:
	// Initialize the scheduler by forwarding both thread counts to
	// Communicator::init() and returning its result.
	int init(size_t poller_threads, size_t handler_threads)
	{
		return this->comm.init(poller_threads, handler_threads);
	}

    ...
private:
	// The communicator that init() delegates to.
	Communicator comm;
};
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

Communicator::init

// Excerpt of Communicator::init(); only the create_poller() call is shown.
// The surrounding code, including the return statement and any use of
// handler_threads, is elided in the article.
int Communicator::init(size_t poller_threads, size_t handler_threads)
{
	....
	create_poller(poller_threads);   // create the poller threads
	....
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
// Simplified sketch of Communicator::create_poller(), not literal code: the
// params initializer is a placeholder meaning "default parameters" and the
// calls are shown without their real arguments and result checks.
// NOTE(review): the visible `return -1` is presumably the failure result;
// the success path is not shown -- confirm against Communicator.cc.
int Communicator::create_poller(size_t poller_threads)
{
	struct poller_params params = 默认参数;

	msgqueue_create();  // create the message queue
	mpoller_create(&params, poller_threads);  // create the pollers
	mpoller_start();  // start the pollers

	return -1;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

这样,每个poller都运行在各自的线程中,线程循环调用epoll_wait监听事件,一旦有事件发生就进行相应处理。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Gausst松鼠会/article/detail/66856
推荐阅读
相关标签
  

闽ICP备14008679号