当前位置:   article > 正文

Muduo网络库源码分析(三)线程间使用eventfd通信和EventLoop::runInLoop系列函数_muduo eventfd

muduo eventfd

先说第一点,线程(进程)间通信有很多种方式(pipe,socketpair),为什么这里选择eventfd?


eventfd 是一个比 pipe 更高效的线程间事件通知机制,一方面它比 pipe 少用一个 file descripor,节省了资源;另一方面,eventfd 的缓冲区管理也简单得多,全部“buffer” 只有定长8 bytes,不像 pipe 那样可能有不定长的真正 buffer。

最重要的一点:当我们想要编写并发型服务器的时候,eventfd 可以完美取代 pipe去通知(唤醒)其他的进程(线程)。比如经典的异步IO reactor/selector 应用场景,去唤醒select的调用。可以和事件通知机制完美的的结合。


(一)eventfd

  1. #include <sys/eventfd.h>
  2. int eventfd(unsigned int initval, intflags);


简单的应用示例:
  1. #include <sys/eventfd.h>
  2. #include <unistd.h>
  3. #include <stdlib.h>
  4. #include <stdio.h>
  5. #include <stdint.h> /* Definition of uint64_t */
  6. #define handle_error(msg) \
  7. do { perror(msg); exit(EXIT_FAILURE); } while (0)
  8. int
  9. main(int argc, char *argv[])
  10. {
  11. uint64_t u;
  12. int efd = eventfd(10, 0);
  13. if (efd == -1)
  14. handle_error("eventfd");
  15. int ret = fork();
  16. if(ret == 0)
  17. {
  18. for (int j = 1; j < argc; j++) {
  19. printf("Child writing %s to efd\n", argv[j]);
  20. u = atoll(argv[j]);
  21. ssize_t s = write(efd, &u, sizeof(uint64_t));
  22. if (s != sizeof(uint64_t))
  23. handle_error("write");
  24. }
  25. printf("Child completed write loop\n");
  26. exit(EXIT_SUCCESS);
  27. }
  28. else
  29. {
  30. sleep(2);
  31. ssize_t s = read(efd, &u, sizeof(uint64_t));
  32. if (s != sizeof(uint64_t))
  33. handle_error("read");
  34. printf("Parent read %llu from efd\n",(unsigned long long)u);
  35. exit(EXIT_SUCCESS);
  36. }
  37. }

(二)EventLoop::loop、runInLoop、queueInLoop、doPendingFunctors


先看一下这四个函数总体的流程图:



依次解释:

  1. // 该函数可以跨线程调用
  2. void EventLoop::quit()
  3. {
  4. quit_ = true;
  5. if (!isInLoopThread())
  6. {
  7. wakeup();
  8. }
  9. }
  10. //使用eventfd唤醒
  11. void EventLoop::wakeup()
  12. {
  13. uint64_t one = 1;
  14. //ssize_t n = sockets::write(wakeupFd_, &one, sizeof one);
  15. ssize_t n = ::write(wakeupFd_, &one, sizeof one);
  16. if (n != sizeof one)
  17. {
  18. LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8";
  19. }
  20. }

如果不是当前IO线程调用quit,则需要唤醒(wakeup())当前IO线程,因为它可能还阻塞在poll的位置(EventLoop::loop()),这样再次循环判断 while (!quit_) 才能退出循环。

  1. // 事件循环,该函数不能跨线程调用
  2. // 只能在创建该对象的线程中调用
  3. void EventLoop::loop()
  4. {// 断言当前处于创建该对象的线程中
  5. assertInLoopThread();
  6. while (!quit_)
  7. {
  8. pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
  9. eventHandling_ = true;
  10. for (ChannelList::iterator it = activeChannels_.begin();
  11. it != activeChannels_.end(); ++it)
  12. {
  13. currentActiveChannel_ = *it;
  14. currentActiveChannel_->handleEvent(pollReturnTime_);
  15. }
  16. currentActiveChannel_ = NULL;
  17. eventHandling_ = false;
  18. <span style="color:#ff0000;"> doPendingFunctors();</span>
  19. }
  20. }

  1. // 为了使IO线程在空闲时也能处理一些计算任务
  2. // 在I/O线程中执行某个回调函数,该函数可以跨线程调用
  3. void EventLoop::runInLoop(const Functor& cb)
  4. {
  5. if (isInLoopThread())
  6. {
  7. // 如果是当前IO线程调用runInLoop,则同步调用cb
  8. cb();
  9. }
  10. else
  11. {
  12. // 如果是其它线程调用runInLoop,则异步地将cb添加到队列,让IO线程处理
  13. queueInLoop(cb);
  14. }
  15. }

  1. void EventLoop::queueInLoop(const Functor& cb)
  2. {
  3. {
  4. MutexLockGuard lock(mutex_);
  5. pendingFunctors_.push_back(cb);
  6. }
  7. // 调用queueInLoop的线程不是当前IO线程则需要唤醒当前IO线程,才能及时执行doPendingFunctors();
  8. // 或者调用queueInLoop的线程是当前IO线程(比如在doPendingFunctors()中执行functors[i]() 时又调用了queueInLoop())
  9. // 并且此时正在调用pending functor,需要唤醒当前IO线程
  10. // 因为在此时doPendingFunctors() 过程中又添加了任务,故循环回去poll的时候需要被唤醒返回,进而继续执行doPendingFunctors()
  11. // 只有当前IO线程的事件回调中调用queueInLoop才不需要唤醒
  12. // 即在handleEvent()中调用queueInLoop 不需要唤醒,因为接下来马上就会执行doPendingFunctors();
  13. if (!isInLoopThread() || callingPendingFunctors_)
  14. {
  15. wakeup();
  16. }
  17. }

  1. // 该函数只会被当前IO线程调用
  2. void EventLoop::doPendingFunctors()
  3. {
  4. std::vector<Functor> functors;
  5. callingPendingFunctors_ = true;
  6. {
  7. MutexLockGuard lock(mutex_);
  8. functors.swap(pendingFunctors_);
  9. }
  10. for (size_t i = 0; i < functors.size(); ++i)
  11. {
  12. functors[i]();
  13. }
  14. callingPendingFunctors_ = false;
  15. }
关于doPendingFunctors 的补充说明

1、不是简单地在临界区内依次调用Functor,而是把回调列表swap到functors中,这样一方面减小了临界区的长度(意味着不会阻塞其它线程的queueInLoop()),另一方面,也避免了死锁(因为Functor可能再次调用queueInLoop())
2、由于doPendingFunctors()调用的Functor可能再次调用queueInLoop(cb),这时,queueInLoop()就必须wakeup(),否则新增的cb可能就不能及时调用了
3、muduo没有反复执行doPendingFunctors()直到pendingFunctors_为空而是每次poll 返回就执行一次,这是有意的,否则IO线程可能陷入死循环,无法处理IO事件。

总结一下就是:
假设我们有这样的调用:loop->runInLoop(run),说明想让IO线程执行一定的计算任务,此时若是在当前的IO线程,就马上执行run();如果是其他线程调用的,那么就执行queueInLoop(run),将run异步添加到队列,当loop内处理完事件后,就执行doPendingFunctors(),也就执行到了run();最后想要结束线程的话,执行quit。


参考:
《linux多线程服务端编程》
http://blog.csdn.net/yusiguyuan/article/details/40593721?utm_source=tuicool&utm_medium=referral


声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Monodyee/article/detail/647525
推荐阅读
相关标签
  

闽ICP备14008679号