赞
踩
事实上Channel模块并不算是一个单独的模块,顶多算是一个单独的类。Channel模块和EventLoop模块、Poller模块是紧密关联的,其中通过EventLoop模块创建出来的对象就是一个Reactor,EventLoop对象里面有一个Poller对象,用来监控事件。至于监控什么样的事件,是可读事件还是可写事件,以及各种事件触发以后该调用什么回调函数去处理,这就是由EventLoop对象通过Channel来设置的了。
Channel模块一共管理五类事件,分别是:可读事件、可写事件、错误事件、连接断开事件以及任意事件。每种事件都要设置事件触发后的回调函数。所以Channel模块的成员变量需要这五类事件触发后的回调函数。另外还需要绑定一个EventLoop对象,也就是说一个Channel对象是绑定一个EventLoop对象的,因为设置好了事件以后,需要通过EventLoop对象的Poller对象将Channel事件监控起来。当然了,Channel对象监控的文件描述符以及需要监控的事件也必须要有,所以这就是Channel类的成员变量:
private:
int _fd; // socket文件描述符
uint32_t _events; // 当前需要监控的事件
uint32_t _revents; // 当前连接触发的事件
EventCallBack _read_callback; // 可读事件被触发的回调函数
EventCallBack _write_callback; // 可写事件被触发的回调函数
EventCallBack _error_callback; // 错误事件被触发的回调函数
EventCallBack _close_callback; // 连接断开事件被触发的回调函数
EventCallBack _event_callback; // 任意事件被触发的回调函数
EventLoop *_eventLoop;
};
Channel类比较重要的一个接口是handlerEvent函数接口,这个接口是真正的事件处理接口,一旦事件触发了,就会调用这个函数。也就是说这个函数是被上层调用的,具体的调用逻辑是:当Poller模块监控到有事件触发时,就将触发事件的文件描述符对应的Channel对象返回,返回给EventLoop对象,EventLoop对象再调用该Channel对象的handlerEvent函数,去执行触发事件对应的回调函数。
// 事件处理,一旦连接触发了事件,就调用这个函数 void handlerEvent() { // EPOLLIN: 可读事件触发 // EPOLLRDHUP: 对端关闭了连接,表示读关闭 // ?思考为什么对端关闭连接要调用可读事件回调函数,猜测与缓冲区还有数据有关 // EPOLLPRI:表示有紧急数据需要读取 if ((_revents & EPOLLIN) || (_revents & EPOLLRDHUP) || (_revents & EPOLLPRI)) { // // 不管是什么事件,都要调用任意事件回调函数 // if (_event_callback) // { // _event_callback(); // } if (_read_callback) { _read_callback(); } } // EPOLLOUT:表示写事件触发 if (_revents & EPOLLOUT) { if (_write_callback) { _write_callback(); } } // EPOLLERR:表示出错 else if (_revents & EPOLLERR) { if (_error_callback) { _error_callback(); } } // EPOLLHUP:表示连接关闭 else if (_revents & EPOLLHUP) { if (_close_callback) { _close_callback(); } } // 不管是什么事件,都要调用任意事件回调函数 if (_event_callback) { _event_callback(); } }
其它的实现都比较简单,就是在设置对应的事件以及事件触发的回调函数,这里就不过多介绍了,看代码就能看得懂,Channel类完整代码如下:
using EventCallBack = std::function<void()>; class EventLoop; /// @brief Channel是事件管理类,用来管理eventLoop对象所关心的所有事件 class Channel { public: /// @brief 构造函数 /// @param eventLoop 需要传入EventLoop对象来初始化Channel,因为Channel管理的是EventLoop对象的事件, /// EventLoop对象就是一个reactor,EventLoop对象里面有一个Poller对象用来监控事件 /// 至于监控什么事件,这就是由EventLoop对象通过Channel来设置了,这就是三者之间的关系 /// @param fd 还需要用文件描述符来初始化,因为必须要知道Channel要管理的事件是哪一个文件描述符的事件 Channel(EventLoop *eventLoop, int fd) : _fd(fd), _events(0), _revents(0), _eventLoop(eventLoop) { } int getFd() { return _fd; } void setRevents(uint32_t events) { _revents = events; } // 当前是否监控了可读 bool isReadAble() { return (_events & EPOLLIN); } // 当前是否监控了可写 bool isWriteAble() { return (_events & EPOLLOUT); } // 启动读事件监控 void startReadAbleEvent() { _events |= EPOLLIN; // 这里需要调用updateEvent函数接口,用来更新_events // 这个接口是Channel类实现的,但是这个接口其实调用的是eventLoop对象的updateEvent函数接口 // 它的逻辑链是Channel更新了_event事件,就要交给Poller监控模块进行监控 // 但是这不能跳过eventLoop对象,因为reactor才是操作Channel和Poller的主体 // 所以必须调用eventLoop对象的更新事件函数接口,然后eventLoop对象再调用Poller类的更新事件接口 // 这样就能将事件更新到Poller模块,实现新事件的监控 updateEvent(); } // 启动写事件监控 void startWriteAbleEvent() { _events |= EPOLLOUT; updateEvent(); } // 关闭读事件监控 void closeReadAbleEvent() { _events &= ~EPOLLIN; updateEvent(); } // 关闭写事件监控 void closeWriteAbleEvent() { _events &= ~EPOLLOUT; updateEvent(); } // 关闭所有事件监控 void closeAllEvent() { _events = 0; updateEvent(); } void updateEvent(); // 移除监控 void removeEvent(); // 移除所有监控 void removeAllEvents(); // 设置可读事件回调函数 // 这个回调函数由外界传入,当revent事件就绪时,会首先检查就绪的事件是什么类型的事件 // 这里设置可读事件、可写事件等等的回调函数,目的是在对应事件就绪时,可以调用对应的回调函数去处理 void setReadAbleCallBack(const EventCallBack &callBack) { _read_callback = callBack; } // 设置可写事件回调函数 void setWriteAbleCallBack(const EventCallBack &callBack) { _write_callback = callBack; } // 设置错误事件回调函数 void setErrorCallBack(const EventCallBack &callBack) { _error_callback = callBack; } // 设置连接关闭事件回调函数 void setCloseCallBack(const EventCallBack &callBack) { _close_callback = callBack; } // 设置任意事件回调函数 void setEventCallBack(const EventCallBack &callBack) { _event_callback = callBack; } // 事件处理,一旦连接触发了事件,就调用这个函数 void handlerEvent() { // EPOLLIN: 可读事件触发 // EPOLLRDHUP: 对端关闭了连接,表示读关闭 // ?思考为什么对端关闭连接要调用可读事件回调函数,猜测与缓冲区还有数据有关 // EPOLLPRI:表示有紧急数据需要读取 if ((_revents & EPOLLIN) || (_revents & EPOLLRDHUP) || (_revents & EPOLLPRI)) { // // 不管是什么事件,都要调用任意事件回调函数 // if (_event_callback) // { // _event_callback(); // } if (_read_callback) { _read_callback(); } } // EPOLLOUT:表示写事件触发 if (_revents & EPOLLOUT) { if (_write_callback) { _write_callback(); } } // EPOLLERR:表示出错 else if (_revents & EPOLLERR) { if (_error_callback) { _error_callback(); } } // EPOLLHUP:表示连接关闭 else if (_revents & EPOLLHUP) { if (_close_callback) { _close_callback(); } } // 不管是什么事件,都要调用任意事件回调函数 if (_event_callback) { _event_callback(); } } uint32_t getEvent() { return _events; } private: int _fd; // socket文件描述符 uint32_t _events; // 当前需要监控的事件 uint32_t _revents; // 当前连接触发的事件 EventCallBack _read_callback; // 可读事件被触发的回调函数 EventCallBack _write_callback; // 可写事件被触发的回调函数 EventCallBack _error_callback; // 错误事件被触发的回调函数 EventCallBack _close_callback; // 连接断开事件被触发的回调函数 EventCallBack _event_callback; // 任意事件被触发的回调函数 EventLoop *_eventLoop; }; void Channel::removeEvent() { _eventLoop->removeEvent(this); } void Channel::updateEvent() { _eventLoop->updateEvent(this); }
Poller模块负责的是监控事件,事实上就是封装了epoll多路转接那一套操作,它的成员变量如下:这里可以简单介绍一下_channels这个哈希表结构,之所以需要这个结构,是因为EventLoop模块向Poller模块新增事件的时候,Poller模块也需要通过_channels这个结构将这些Channel对象管理起来,当Poller监控到事件触发的时候,可以直接从_channels里找到对应的Channel对象返回给上层,上层再直接调用Channel对象的handlerEvent函数即可。
private:
int _epfd; // epoll模型对应的epollfd
epoll_event _events[MAX_EPOLLEVENTS]; // 用来接收就绪事件的数组
// 用来保存Channel对象的哈希表,key值是Channel对象管理的事件对应的文件描述符,value值是Channel对象
// Poller类是与EventLoop类关联起来的,EventLoop对象里面有一个Poller对象
// Poller对象里面管理的事件不止一个,管理的文件描述符也不止一个,同一个文件描述符下的不同事件通过Channel对象来管理
// 不同的文件描述符对应的Channel对象就通过这个_channels成员变量来管理
std::unordered_map<int, Channel *> _channels;
Poller模块的实现也比较简单,因为就是epoll那一套操作,这里简单介绍一下几个函数接口的作用即可。首先在Poller类的构造函数处调用epoll_create创建epoll模型,然后updateEvent函数是给外部调用来添加或者修改监控事件的,removeEvent则是移除监控事件,最后poll函数是外部调用来启动epoll监控的,这个函数会调用epoll_wait函数接口去等待事件就绪,有事件就绪以后就从_channels里面找到对应的Channel对象返回。Poller类的完整代码如下:
#define MAX_EPOLLEVENTS 1024 class Poller { public: /// @brief 构造函数,需要调用epoll_create创建epoll模型 Poller() { _epfd = epoll_create(MAX_EPOLLEVENTS); if (_epfd < 0) { LOG("epoll_create error"); abort(); } } // 添加或修改监控事件 void updateEvent(Channel *channel) { // 首先判断形参传递进来的Channel对象是否已经存在在_channels哈希表中 // 如果存在就直接更新事件即可,如果不存在就先增加到哈希表中再更新事件 bool isAddChannelRes = isAddChannel(channel); if (isAddChannelRes == false) { // 不存在则添加 _channels.insert(std::make_pair(channel->getFd(), channel)); if (!update(channel, EPOLL_CTL_ADD)) { LOG("update error"); } } else { if (!update(channel, EPOLL_CTL_MOD)) { LOG("update error"); } } } // 移除监控 void removeEvent(Channel *channel) { // 首先要检查_channels哈希表中是否存在对应的channel对象,如果存在就要将其删除 auto iter = _channels.find(channel->getFd()); if (iter != _channels.end()) { _channels.erase(iter); } // 然后更新事件 if (!update(channel, EPOLL_CTL_DEL)) { LOG("update error"); } } // 开始监控,返回活跃连接 void poll(std::vector<Channel *> *active) { // 调用epoll_wait函数接口,开始监控,就绪事件保存在_events数组中,返回值nfds表示就绪事件的个数 int nfds = epoll_wait(_epfd, _events, MAX_EPOLLEVENTS, -1); if (nfds < 0) { if (errno == EINTR) { return; } LOG("epoll_wait error"); abort(); } // 遍历_events数组中的所有就绪事件 for (int i = 0; i < nfds; i++) { // 先通过就绪事件的文件描述符到_channels哈希表中查找 auto iter = _channels.find(_events[i].data.fd); // 必须保证能够在_channels哈希表中找到对应的channel对象,找不到是不合理的 assert(iter != _channels.end()); // 找到了以后拿到该文件描述符对应的channel对象,设置给对象的revent就绪事件 iter->second->setRevents(_events[i].events); // 然后将就绪事件加入active活跃连接数组中返回给外界 active->push_back(iter->second); } } void clearChannels() { _channels.clear(); } private: // 对epoll的直接操作 // 更新epoll模型中的事件,这个事件通过形参的channel对象获取 // 可以是新增事件、修改事件、删除事件,通过op决定 bool update(Channel *channel, int op) { int fd = channel->getFd(); epoll_event event; event.data.fd = fd; event.events = channel->getEvent(); int epollCtlRes = epoll_ctl(_epfd, op, fd, &event); if (epollCtlRes < 0) { LOG("epoll_ctl error"); return false; } return true; } // 判断一个Channel是否已经添加了事件监控 bool isAddChannel(Channel *channel) { auto iter = _channels.find(channel->getFd()); if (iter == _channels.end()) { return false; } return true; } private: int _epfd; // epoll模型对应的epollfd在这里插入代码片 epoll_event _events[MAX_EPOLLEVENTS]; // 用来接收就绪事件的数组 // 用来保存Channel对象的哈希表,key值是Channel对象管理的事件对应的文件描述符,value值是Channel对象 // Poller类是与EventLoop类关联起来的,EventLoop对象里面有一个Poller对象 // Poller对象里面管理的事件不止一个,管理的文件描述符也不止一个,同一个文件描述符下的不同事件通过Channel对象来管理 // 不同的文件描述符对应的Channel对象就通过这个_channels成员变量来管理 std::unordered_map<int, Channel *> _channels; };
EventLoop类是封装Reactor操作的类,我们先来看一下EventLoop类的成员变量:
private:
std::thread::id _thread_id; // 线程ID
int _event_fd; // eventfd唤醒IO事件监控有可能导致的阻塞
Poller _poller; // 进行所有描述符的事件监控
std::vector<Functor> _tasks; // 任务池
std::mutex _mutex; // 实现任务池操作的线程安全
std::unique_ptr<Channel> _event_channel; // 这是管理eventfd的Channel对象
TimerWheel _timerWheel;
接下来我们就要介绍一下EventLoop里很重要的一个函数——runInLoop函数,这个函数确保我们在不加锁的情况下也能保证线程安全,原因就是one loop one thread,我们将EventLoop对象与线程绑定起来了,而且一个线程只有一个EventLoop对象。runInLoop的实现非常简单,只需要传递进来需要执行的任务,这个任务其实就是一个执行函数,比如新增定时器函数、刷新定时任务函数、取消定时任务函数等等。runInLoop函数首先会判断当前执行的线程是否是EventLoop对象绑定的线程,如果是的话,就直接执行这个任务,如果不是的话就不能执行,不能让其它线程执行这个任务,否则会线程不安全,所以只能将其加入到任务池中等待执行。
// 用于判断当前线程是否是EventLoop对应的线程 bool isInLoop() { return (_thread_id == std::this_thread::get_id()); } // 判断将要执行的任务是否在当前线程中,如果是则执行,如果不是则压入任务队列 void runInLoop(const Functor &callBack) { if (isInLoop()) { callBack(); return; } pushTaskQueue(callBack); }
将任务加入到任务池之后,前面说了,为了防止连接没有发送数据而导致线程阻塞等待在epoll_wait处,所以需要利用eventfd立即唤醒线程。
void weakupEventFd() { uint64_t val = 1; // 向eventfd文件描述符中写入数据,写入的数据也不重要 // 因为这个文件描述符已经被epoll监控起来了 // 只要有数据到来就会触发可读事件,那epoll_wait就不会阻塞了 // 这样就可以唤醒可能因为没有事件就绪而阻塞的线程 int writeRes = write(_event_fd, &val, sizeof(val)); if (writeRes < 0) { if (errno == EINTR) { return; } LOG("write eventfd error"); abort(); } } // 将任务压入任务队列 void pushTaskQueue(const Functor &callBack) { // 这里需要加锁保护_tasks的插入 // 因为_tasks是临界资源,这里有可能会出现当前线程 { std::unique_lock<std::mutex> _lock(_mutex); _tasks.push_back(callBack); } weakupEventFd(); }
这就是runInLoop函数的逻辑,接下来介绍EventLoop另一个很重要的函数接口,这个接口就是外部通过EventLoop启动事件监控的接口,该接口的实现是,首先调用Poller的poll函数启动事件监控,这个函数会返回所有触发事件的Channel对象,通过这个Channel对象去调用handlerEvent函数,就能调用触发事件对应的回调函数。最后再执行任务池里的所有函数即可。
// 事件监控->就绪事件处理->执行任务
void start()
{
while (true)
{
std::vector<Channel *> actives;
_poller.poll(&actives);
for (auto &channel : actives)
{
channel->handlerEvent();
}
runAllTasks();
}
}
那么Poller里监控的事件是什么事件呢?是由谁设置的事件呢?所以EventLoop还有两个接口是updateEvent和removeEvent,分别用来更新事件和移除事件,外部需要构造一个Channel对象,通过这两个接口来向Poller的epoll模型更新或者删除事件监控。
// 添加/修改描述符的事件监控
void updateEvent(Channel *channel)
{
_poller.updateEvent(channel);
}
// 移除描述符的监控
void removeEvent(Channel *channel)
{
_poller.removeEvent(channel);
}
至此,EventLoop与底层各个关联模块的逻辑就理清楚了,这里可以直接给出EventLoop的完整代码:
/// @brief 这是封装reactor操作的类,服务器实现的是多reactor多线程的IO模型 class EventLoop { public: void readEventFd() { uint64_t res = 0; // 将eventfd文件描述符的数据读取处理 // 读取到的数据我们并不关心,因为eventfd是为了唤醒可能因为没有事件就绪而阻塞在epoll_wait函数调用的线程 int readRes = read(_event_fd, &res, sizeof(res)); if (readRes < 0) { if (errno == EINTR || errno == EAGAIN) { return; } LOG("read eventfd error"); abort(); } } EventLoop() : _thread_id(std::this_thread::get_id()), _event_fd(createEventFd()), _event_channel(new Channel(this, _event_fd)), _timerWheel(this) { // 初始化eventfd文件描述符的可读事件回调函数,设置为readEventFd _event_channel->setReadAbleCallBack(std::bind(&EventLoop::readEventFd, this)); // 开启可读事件监控 _event_channel->startReadAbleEvent(); } static int createEventFd() { // 创建eventfd文件描述符,将该文件描述符设置为非阻塞 int efd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK); if (efd < 0) { LOG("eventfd error"); abort(); } return efd; } void weakupEventFd() { uint64_t val = 1; // 向eventfd文件描述符中写入数据,写入的数据也不重要 // 因为这个文件描述符已经被epoll监控起来了 // 只要有数据到来就会触发可读事件,那epoll_wait就不会阻塞了 // 这样就可以唤醒可能因为没有事件就绪而阻塞的线程 int writeRes = write(_event_fd, &val, sizeof(val)); if (writeRes < 0) { if (errno == EINTR) { return; } LOG("write eventfd error"); abort(); } } // 断言当前线程是否是eventLoop对应的线程 // 因为很多函数会触及对临界资源的修改,如果多线程去调用的话是会有线程安全问题的 // 可以加锁解决,但是加锁会很消耗资源,影响效率 // 所以使用将每一个eventLoop的操作都放在一个线程里,如果不是该线程就不能调用该eventLoop对象的函数 // 这样就不会有线程安全的问题了 void assertInLoop() { assert(_thread_id == std::this_thread::get_id()); } // 判断将要执行的任务是否在当前线程中,如果是则执行,如果不是则压入任务队列 void runInLoop(const Functor &callBack) { if (isInLoop()) { callBack(); return; } pushTaskQueue(callBack); } // 将任务压入任务队列 void pushTaskQueue(const Functor &callBack) { // 这里需要加锁保护_tasks的插入 // 因为_tasks是临界资源,这里有可能会出现当前线程 { std::unique_lock<std::mutex> _lock(_mutex); _tasks.push_back(callBack); } weakupEventFd(); } // 用于判断当前线程是否是EventLoop对应的线程 bool isInLoop() { return (_thread_id == std::this_thread::get_id()); } // 添加/修改描述符的事件监控 void updateEvent(Channel *channel) { _poller.updateEvent(channel); } // 移除描述符的监控 void removeEvent(Channel *channel) { _poller.removeEvent(channel); } // 执行所有任务池中的任务 void runAllTasks() { std::vector<Functor> functor; { std::unique_lock<std::mutex> _lock(_mutex); _tasks.swap(functor); } for (auto &f : functor) { f(); } } // 事件监控->就绪事件处理->执行任务 void start() { while (true) { std::vector<Channel *> actives; _poller.poll(&actives); for (auto &channel : actives) { channel->handlerEvent(); } runAllTasks(); } } void addTimer(uint64_t id, uint32_t delay, const TaskFunc &task) { _timerWheel.addTimer(id, delay, task); } void refreshTimer(uint64_t id) { _timerWheel.refreshTimer(id); } void cancelTimer(uint64_t id) { _timerWheel.cancelTimer(id); } bool hasTimer(uint64_t id) { return _timerWheel.hasTimer(id); } private: std::thread::id _thread_id; // 线程ID int _event_fd; // eventfd唤醒IO事件监控有可能导致的阻塞 Poller _poller; // 进行所有描述符的事件监控 std::vector<Functor> _tasks; // 任务池 std::mutex _mutex; // 实现任务池操作的线程安全 std::unique_ptr<Channel> _event_channel; // 这是管理eventfd的Channel对象 TimerWheel _timerWheel; };
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。