赞
踩
首先,在阅读本章之前,我们需要搞清楚为什么EventLoop类这么复杂
其次,我们还需要再强调一次关于mainLoop唤醒subLoop的流程(可以看完该类代码后再回顾该流程):
为什么需要唤醒 subLoop?
subLoop(通常指的是工作线程中的 EventLoop)可能会被阻塞在 poller 的等待调用上,例如 epoll_wait。当主线程或其他线程需要向 subLoop 传递新任务或事件时,需要唤醒 subLoop,使其能够及时处理新提交的任务或事件。
subLoop 被阻塞在哪里?
subLoop 通常被阻塞在 poller 的等待调用上,如 epoll_wait、poll 或 select。这些系统调用会在没有事件发生时使线程进入阻塞状态,从而节省 CPU 资源。
为什么要有唤醒这个流程?
举一个例子,我们运行整个系统后,我们同时运行了一个mainLoop
,和3个subLoop
,我们其中一个subLoop1
正在执行相关事件的回调操作,subLoop2
、subLoop3
已经干完活了,被阻塞到 loop()方法的poller_->poll
调用上(也就是epoll_wait),现在我们的mianLoop
又来了新连接,那么minLoop
就会封装一个wakeupFd的channel和其他新的cfd的channle,那么mainLoop
就通过负载均衡算法(轮询)唤醒特定的、被阻塞的 subLoop,它被wakeupFd唤醒之后就开始真正干活了。
书接上回
首先我们定义好全局函数:
//防止一个线程创建多个EventLoop 作用相当于thread_local
__thread EventLoop *t_loopInThisThread = nullptr;
//定义默认的Poller IO复用接口的超时时间
const int kPollTimeMs = 10000;
//创建wakeupfd, 用来notify唤醒subReactor处理新来的channel
int createEventfd() {
int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
if (evtfd < 0) {
LOG_FATAL("eventfd error: %d \n", errno);
}
return evtfd;
}
在这里我们封装了定义wakeupFd_的函数,主要内容就是封装一个eventfd()系统调用。
EventLoop::EventLoop() : looping_(false) , quit_(false) , callingPendingFunctors_(false) , threadId_(CurrentThread::tid()) , poller_(Poller::newDefaultPoller(this)) , wakeupFd_(createEventfd()) , wakeupChannel_(new Channel(this, wakeupFd_)) { LOG_DEBUG("EventLoop create %p in thread %d \n", this, threadId_); if (t_loopInThisThread) { LOG_FATAL("Another EventLoop %p exists in this thread %d \n", t_loopInThisThread, threadId_); } else { t_loopInThisThread = this; } //设置wakeupFd的事件类型以及发生事件后的回调操作 wakeupChannel_->setReadCallback(std::bind(&EventLoop::handleRead, this)); // 每一个eventloop都将监听wakeupChannel的EPOLLIN读事件 wakeupChannel_->enableReading(); } void EventLoop::handleRead() { uint64_t one = 1; ssize_t n = read(wakeupFd_, &one, sizeof one); if (n != sizeof one) { LOG_ERROR("EventLoop::handleRead() reads %lu bytes instead of 8", n); } } EventLoop::~EventLoop() { wakeupChannel_->disableAll(); wakeupChannel_->remove(); ::close(wakeupFd_); t_loopInThisThread = nullptr; }
: looping_(false)
, quit_(false)
, callingPendingFunctors_(false)
, threadId_(CurrentThread::tid())
, poller_(Poller::newDefaultPoller(this))
, wakeupFd_(createEventfd())
, wakeupChannel_(new Channel(this, wakeupFd_))
handleRead
中发送的东西并不重要,只是让subReactor感知到我们的fd上面有读事件发生,我就睡醒去干活了,就能去拿到新用户连接的channel了。 ...
// 设置wakeupfd的事件类型以及发生事件后的回调操作
wakeupChannel_->setReadCallback(std::bind(&EventLoop::handleRead, this));
// 每一个eventloop都将监听wakeupchannel的EPOLLIN读事件了
wakeupChannel_->enableReading();
}
void EventLoop::handleRead()
{
uint64_t one = 1;
ssize_t n = read(wakeupFd_, &one, sizeof one);
if (n != sizeof one)
{
LOG_ERROR("EventLoop::handleRead() reads %lu bytes instead of 8", n);
}
}
最后必须谈一下我们的线程绑定:
{
...
if (t_loopInThisThread)
{
LOG_FATAL("Another EventLoop %p exists in this thread %d \n", t_loopInThisThread, threadId_);
}
else
{
t_loopInThisThread = this;
}
...
}
析构函数的主要作用是清理资源,关闭文件描述符,并解除 EventLoop 与线程的绑定。
wakeupChannel_->disableAll(); //禁用 wakeupChannel_ 上的所有事件。
wakeupChannel_->remove(); // 将 wakeupChannel_ 从 Poller 中移除。
::close(wakeupFd_); //关闭用于唤醒的文件描述符 wakeupFd_,释放资源。
t_loopInThisThread = nullptr;//解除 EventLoop 与线程的绑定
剩下的资源基本都是由智能指针进行管理,不需要我们来手动操作了,比如说:
std::unique_ptr<Poller> poller_;
std::unique_ptr<Channel> wakeupChannel_;
该函数用来开启事件循环,也是我们EventLoop最核心的函数,它的主要任务就是用来调度底层的Poller开启事件分发器,开始监听事件。
先定义好状态位置,也就是说该EventLoop开启,非退出状态。
void EventLoop::loop()
{
looping_ = true;
quit_ = false;
LOG_INFO("EventLoop %p start looping \n", this);
...
}
然后开启了我们的while循环,这个while死循环熟不熟悉!这段代码务必结合poller->poll一起来看,我们通过传递给poll一个空的activeChannels,让他来代劳监听任务,其实就可以理解为,之前我们在写网络编程时直接调用了一个epoll_wait
,只不过现在被封装好了:
while(!quit_)
{
activeChannels_.clear();
// 监听两类fd 一种是client的fd,一种wakeupfd
pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
...
}
我们站在EventLoop的角色来看,当底层的epoll发生事件以后,activeChannels_这个vector里面放的就是所有发生事件的channel。
在此之后,我们得到了发生事件的channels,那我现在就应该去处理它:
while(!quit_)
{
activeChannels_.clear();
pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
for (Channel * channel : ativeChannels_)
//Poller监听哪些channel发生了事件,上报给EventLoop,通知channel处理相应的事件
channel->handleEvent(pollReturnTime_);
}
//执行当前EventLoop事件循环需要处理的回调操作
doPendingFunctors();
这里我们的Poller监听到了发生事件的channel,然后立马上报给EventLoop,通知channel处理相应的事件。这里的handleEvent
无非就对应了那些读、写、错误、关闭等回调函数。
随后我们调用了doPendingFunctors()
,这里的函数表示执行当前EventLoop事件循环需要处理的回调操作,这里是什么意思呢?
这里梳理一下整个流程来帮助理解doPendingFunctors()操作:
- 首先我们的IO线程mainLoop,它主要用来做accept的工作,就是来接受新用户的连接,然后accept会返回一个通信用的fd,我们肯定会用一个channel打包fd的。
- 由于我们的mainLoop只管理新用户的连接工作,打包好的fd,必须得分发给subLoop,如果我们从未调用过muduo库的setThreadNum(该函数后续会讲),也就是我们目前只有一个loop也就是我们的mainLoop,也就是说到时候我们的mainLoop不仅要监听新用户的连接,还要负责已连接用户的读写事件。
- 如果我们调用了setThreadNum(并且作为服务器我们肯定会调用setThreadNum的),所以这里我们肯定会起一定数量的subloop,那么mainLoop拿到跟新用户通信的channel之后,就会唤醒某一个subloop。
- 所以mainLoop会实现注册一个回调cb(CallBackFunction),这个回调需要subloop来执行。那么我们现在把目前的
loop
函数想象成一个subloop的loop
调用,但是问题是这个subloop还在睡觉呢,还没起床。- 现在需要我们的mainLoop wakeup该subloop之后,起来以后它做的事情首先就是执行doPendingFunctors(),也就是执行回调,其回调都在
std::vector<Functor> pendingFunctors_
中写着,那么这个回调就是之前mainLoop注册的cb操作,这个cb可能是1个,也可能是多个。
- 这就是doPendingFunctors()存在的意义。随后我们会讲解doPendingFunctors()如何实现(一般它与我们的
queueInLoop
配合使用)
void EventLoop::quit()
这里的quit()函数也非常讲究:
Poller_->poll
了,然后在loop()函数中将不再满足while(!quit)
的条件,所以整个loop()调用就正常结束了。Poller_->poll
里返回回来了,它再回到while将不再满足while(!quit)
,从而正常结束loop()的调用。//用来唤醒loop所在的线程 向wakeupfd_写一个数据,wakeupChannel就发生读事件,当前loop线程就会被唤醒 void EventLoop::wakeup() { uint64_t one = 1; ssize_t n = write(wakeupFd_, &one, sizeof one); if (n != sizeof one) { LOG_ERROR("EventLoop::wakeup writes %lu bytes instead of 8", n); } } // EventLoop的方法==》Poller的方法 void EventLoop::updateChannel(Channel *channel) { poller_->updateChannel(channel); } void EventLoop::removeChannel(Channel *channel) { poller_->removeChannel(channel); } bool EventLoop::hasChannel(Channel *channel){ return poller_->hasChannel(channel); }
//在当前loop中执行cb void EventLoop::runInLoop(Functor cb) { if (isInLoopThread()) { //在当前的loop线程中执行callback cb(); } else { //在非loop线程中执行cb,就需要唤醒loop所在线程,执行cb queueInLoop(cb); } } // 把cb放入队列中,唤醒loop所在的线程,执行cb void EventLoop::queueInLoop(Functor cb) { { std::unique_lock<std::mutex> lock(mutex_); pendingFunctors_.emplace_back(cb); } //唤醒相应的,需要执行上面回调操作的loop的线程了 if (!isInLoopThread() || callingPendingFunctors_) { wakeup(); //唤醒loop所在线程 } }
runInLoop
方法用于在 EventLoop 所在的线程中直接执行一个回调函数。如果当前线程是 EventLoop 所属的线程,那么直接执行回调函数;否则,将回调函数添加到队列,并唤醒 EventLoop 线程来执行回调函数。queueInLoop
方法将回调函数添加到 pendingFunctors_ 队列,并唤醒 EventLoop 线程来处理这些回调函数。这种方法用于异步任务的执行。|| callingPendingFunctors_
呢?我们需要先搞清楚doPendingFunctors()
的逻辑void EventLoop::doPendingFunctors() {//执行回调
std::vector<Functor> functors;
callingPendingFunctors_ = true; // 表示需要执行回调
{
std::unique_lock<std::mutex> lock(mutex_);
functors.swap(pendingFunctors_);
}
for (const Functor &functor : functors)
functor(); //执行当前loop需要执行的回调操作
callingPendingFunctors_ = false; //回调执行完了,开始新一轮循环
}
它首先定义了一个局部的 std::vector<Functor> functors
,来装回调函数,然后把callingPendingFunctors_
置为true
;
然后我们之前在queueInLoop()
中执行了往pendingFunctors
里装了回调函数,现在我们把它放到了一个局部定义的新的functors
中,并且把pendingFunctors_
置为空,为什么要这么做呢?
因为我们如果不这样做,直接在pendingFunctors
上操作,那么我们就得变执行回调函数,边从pendingFunctors
上取出回调函数,但是这样的话别的loop有可能还在往这上面注册回调函数呢,那我们是加锁还是不加锁呢,加锁回阻塞我们的mainloop线程可能导致它无法去监听新连接,不加锁那我们的pendingFunctors
岂不是乱套了?
现在我们也可以解释
EventLoop::queueInLoop(Functor cb)
中:if (!isInLoopThread() || callingPendingFunctors_) { wakeup(); }
- 1
- 2
- 3
这里的callingPendingFunctors_就是表示我当前的subReactor正在执行回调「也就是说在while(!quit_)循环体内」的同时,某个线程调用
EventLoop::queueInLoop(Functor cb)
又给我的pendingFunctors_
里写了新的回调函数,那么我肯定得再唤醒一次,不然subReactor在loop()
函数中会被被阻塞到poller_->poll()
处。但是有了wakeup()
之后,就不会发生这个事情了!
如果我们在mainloop和subLoop之间放一个生产者消费者的线程安全的队列,这样的话我们的逻辑会相当好处理。
/*
mainLoop
========================生产者消费者的线程安全队列
subLoop1 subLoop1 subLoop1
*/
但是在我们的muduo库中是不存在这个队列,mainLoop和各个subLoop是直接通过我们的wakeupFd_来进行线程间的通信。
所以在这里我们函数在执行的时候,逻辑相当巧妙,这里的EventLoop类的代码逻辑非常非常巧妙。
EventLoop.h代码:
#pragma once #include <functional> #include <vector> #include <atomic> #include <memory> #include <mutex> #include "Timestamp.h" #include "noncopyable.h" #include "CurrentThread.h" class Channel; class Poller; //事件循环类 主要包含了两个大模块channel Pollor(epoll的抽象) class EventLoop : noncopyable { public: using Functor = std::function<void()>; EventLoop(); ~EventLoop(); //开启事件循环 void loop(); //退出事件循环 void quit(); Timestamp pollReturnTime() const { return pollReturnTime_; } // 在当前loop中执行cb void runInLoop(Functor cb); //把cb放入队列中,唤醒loop所在的线程后再去执行cb void queueInLoop(Functor cb); //用来唤醒loop所在的线程 void wakeup(); // EventLoop的方法==》Poller的方法 void updateChannel(Channel *channel); void removeChannel(Channel *channel); bool hasChannel(Channel *channel); //判断EventLoop对象是否已经在自己的线程里面 bool isInLoopThread() const { return threadId_ == CurrentThread::tid(); } private: void handleRead(); //wake up void doPendingFunctors(); using ChannelList = std::vector<Channel*>; std::atomic_bool looping_; //原子操作,通过CAS实现 std::atomic_bool quit_; //标识退出loop循环 const pid_t threadId_; //记录当前loop所在线程的id Timestamp pollReturnTime_; //poller返回事件的channels的时间点 std::unique_ptr<Poller> poller_; int wakeupFd_; std::unique_ptr<Channel> wakeupChannel_; ChannelList activeChannels_; std::atomic_bool callingPendingFunctors_; //标识当前loop是否有需要执行的回调操作 std::vector<Functor> pendingFunctors_; //存储loop需要执行的所有回调操作 std::mutex mutex_; //互斥锁,用来保护上面vector容器的线程安全操作 };
EventLoop.cc:
#include "EventLoop.h" #include "Logger.h" #include "Poller.h" #include "Channel.h" #include <unistd.h> #include <sys/eventfd.h> //防止一个线程创建多个EventLoop 作用相当于thread_local __thread EventLoop *t_loopInThisThread = nullptr; //定义默认的Poller IO复用接口的超时时间 const int kPollTimeMs = 10000; //创建wakeupfd, 用来notify唤醒subReactor处理新来的channel int createEventfd() { int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); if (evtfd < 0) { LOG_FATAL("eventfd error: %d \n", errno); } return evtfd; } EventLoop::EventLoop() : looping_(false) , quit_(false) , callingPendingFunctors_(false) , threadId_(CurrentThread::tid()) , poller_(Poller::newDefaultPoller(this)) , wakeupFd_(createEventfd()) , wakeupChannel_(new Channel(this, wakeupFd_)) { LOG_DEBUG("EventLoop create %p in thread %d \n", this, threadId_); if (t_loopInThisThread) { LOG_FATAL("Another EventLoop %p exists in this thread %d \n", t_loopInThisThread, threadId_); } else { t_loopInThisThread = this; } //设置wakeupFd的事件类型以及发生事件后的回调操作 wakeupChannel_->setReadCallback(std::bind(&EventLoop::handleRead, this)); // 每一个eventloop都将监听wakeupChannel的EPOLLIN读事件 wakeupChannel_->enableReading(); } void EventLoop::handleRead() { uint64_t one = 1; ssize_t n = read(wakeupFd_, &one, sizeof one); if (n != sizeof one) { LOG_ERROR("EventLoop::handleRead() reads %lu bytes instead of 8", n); } } EventLoop::~EventLoop() { wakeupChannel_->disableAll(); wakeupChannel_->remove(); ::close(wakeupFd_); t_loopInThisThread = nullptr; } // 开启事件循环 void EventLoop::loop() { looping_ = true; quit_ = false; LOG_INFO("EventLoop %p start looping \n", this); while (!quit_) { activeChannels_.clear(); //监听两类fd,一种是client的fd,一种是wakeupFd pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_); for (Channel* channel : activeChannels_) { //Poller监听那些channel发生了事件,然后上报给EventLoop,通知channel处理相应的事件 channel->handleEvent(pollReturnTime_); } //执行当前EventLoop事件循环需要处理的回调操作 doPendingFunctors(); } LOG_INFO("EventLoop %p stop looping. \n", this); looping_ = false; } //退出事件循环 void EventLoop::quit() { quit_ = true; if (!isInLoopThread()) { wakeup(); } } //在当前loop中执行cb void EventLoop::runInLoop(Functor cb) { if (isInLoopThread()) { //在当前的loop线程中执行callback cb(); } else { //在非loop线程中执行cb,就需要唤醒loop所在线程,执行cb queueInLoop(cb); } } // 把cb放入队列中,唤醒loop所在的线程,执行cb void EventLoop::queueInLoop(Functor cb) { { std::unique_lock<std::mutex> lock(mutex_); pendingFunctors_.emplace_back(cb); } //唤醒相应的,需要执行上面回调操作的loop的线程了 if (!isInLoopThread() || callingPendingFunctors_) { wakeup(); //唤醒loop所在线程 } } //用来唤醒loop所在的线程 向wakeupfd_写一个数据,wakeupChannel就发生读事件,当前loop线程就会被唤醒 void EventLoop::wakeup() { uint64_t one = 1; ssize_t n = write(wakeupFd_, &one, sizeof one); if (n != sizeof one) { LOG_ERROR("EventLoop::wakeup writes %lu bytes instead of 8", n); } } // EventLoop的方法==》Poller的方法 void EventLoop::updateChannel(Channel *channel) { poller_->updateChannel(channel); } void EventLoop::removeChannel(Channel *channel) { poller_->removeChannel(channel); } bool EventLoop::hasChannel(Channel *channel){ return poller_->hasChannel(channel); } void EventLoop::doPendingFunctors() {//执行回调 std::vector<Functor> functors; callingPendingFunctors_ = true; { std::unique_lock<std::mutex> lock(mutex_); functors.swap(pendingFunctors_); } for (const Functor &functor : functors) functor(); //执行当前loop需要执行的回调操作 callingPendingFunctors_ = false; }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。