赞
踩
先说第一点,线程(进程)间通信有很多种方式(pipe,socketpair),为什么这里选择eventfd?
eventfd 是一个比 pipe 更高效的线程间事件通知机制,一方面它比 pipe 少用一个 file descripor,节省了资源;另一方面,eventfd 的缓冲区管理也简单得多,全部“buffer” 只有定长8 bytes,不像 pipe 那样可能有不定长的真正 buffer。
最重要的一点:当我们想要编写并发型服务器的时候,eventfd 可以完美取代 pipe去通知(唤醒)其他的进程(线程)。比如经典的异步IO reactor/selector 应用场景,去唤醒select的调用。可以和事件通知机制完美的的结合。
(一)eventfd
- #include <sys/eventfd.h>
- int eventfd(unsigned int initval, intflags);
- #include <sys/eventfd.h>
- #include <unistd.h>
- #include <stdlib.h>
- #include <stdio.h>
- #include <stdint.h> /* Definition of uint64_t */
-
- #define handle_error(msg) \
- do { perror(msg); exit(EXIT_FAILURE); } while (0)
-
- int
- main(int argc, char *argv[])
- {
- uint64_t u;
-
- int efd = eventfd(10, 0);
- if (efd == -1)
- handle_error("eventfd");
-
- int ret = fork();
- if(ret == 0)
- {
- for (int j = 1; j < argc; j++) {
- printf("Child writing %s to efd\n", argv[j]);
- u = atoll(argv[j]);
- ssize_t s = write(efd, &u, sizeof(uint64_t));
- if (s != sizeof(uint64_t))
- handle_error("write");
- }
- printf("Child completed write loop\n");
-
- exit(EXIT_SUCCESS);
- }
- else
- {
- sleep(2);
-
- ssize_t s = read(efd, &u, sizeof(uint64_t));
- if (s != sizeof(uint64_t))
- handle_error("read");
- printf("Parent read %llu from efd\n",(unsigned long long)u);
- exit(EXIT_SUCCESS);
- }
- }
先看一下这四个函数总体的流程图:
依次解释:
- // 该函数可以跨线程调用
- void EventLoop::quit()
- {
- quit_ = true;
- if (!isInLoopThread())
- {
- wakeup();
- }
- }
-
- //使用eventfd唤醒
- void EventLoop::wakeup()
- {
- uint64_t one = 1;
- //ssize_t n = sockets::write(wakeupFd_, &one, sizeof one);
- ssize_t n = ::write(wakeupFd_, &one, sizeof one);
- if (n != sizeof one)
- {
- LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8";
- }
- }
- // 事件循环,该函数不能跨线程调用
- // 只能在创建该对象的线程中调用
- void EventLoop::loop()
- {// 断言当前处于创建该对象的线程中
- assertInLoopThread();
- while (!quit_)
- {
- pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
-
- eventHandling_ = true;
- for (ChannelList::iterator it = activeChannels_.begin();
- it != activeChannels_.end(); ++it)
- {
- currentActiveChannel_ = *it;
- currentActiveChannel_->handleEvent(pollReturnTime_);
- }
- currentActiveChannel_ = NULL;
- eventHandling_ = false;
- <span style="color:#ff0000;"> doPendingFunctors();</span>
- }
- }
- // 为了使IO线程在空闲时也能处理一些计算任务
- // 在I/O线程中执行某个回调函数,该函数可以跨线程调用
- void EventLoop::runInLoop(const Functor& cb)
- {
- if (isInLoopThread())
- {
- // 如果是当前IO线程调用runInLoop,则同步调用cb
- cb();
- }
- else
- {
- // 如果是其它线程调用runInLoop,则异步地将cb添加到队列,让IO线程处理
- queueInLoop(cb);
- }
- }
- void EventLoop::queueInLoop(const Functor& cb)
- {
- {
- MutexLockGuard lock(mutex_);
- pendingFunctors_.push_back(cb);
- }
-
- // 调用queueInLoop的线程不是当前IO线程则需要唤醒当前IO线程,才能及时执行doPendingFunctors();
-
- // 或者调用queueInLoop的线程是当前IO线程(比如在doPendingFunctors()中执行functors[i]() 时又调用了queueInLoop())
- // 并且此时正在调用pending functor,需要唤醒当前IO线程
- // 因为在此时doPendingFunctors() 过程中又添加了任务,故循环回去poll的时候需要被唤醒返回,进而继续执行doPendingFunctors()
-
- // 只有当前IO线程的事件回调中调用queueInLoop才不需要唤醒
- // 即在handleEvent()中调用queueInLoop 不需要唤醒,因为接下来马上就会执行doPendingFunctors();
- if (!isInLoopThread() || callingPendingFunctors_)
- {
- wakeup();
- }
- }
- // 该函数只会被当前IO线程调用
- void EventLoop::doPendingFunctors()
- {
- std::vector<Functor> functors;
- callingPendingFunctors_ = true;
-
- {
- MutexLockGuard lock(mutex_);
- functors.swap(pendingFunctors_);
- }
-
- for (size_t i = 0; i < functors.size(); ++i)
- {
- functors[i]();
- }
- callingPendingFunctors_ = false;
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。