赞
踩
sylar作者在本站的地址为 这里,也可以查看 作者主页,也有视频教程可以 点击这里。此外,也可以看一下赵路强大佬的关于sylar协程模块的博客 点击这里,我本人在阅读sylar源码的时候也是参考了赵路强大佬的解析 可以点击这里。
各位看官也可以加我的qq和我讨论2511010742
sylar大人在协程调度模块中封装了epoll,对于每一个需要监听的文件描述符fd,都支持可读和可写事件。这部分操作是十分复杂的,需要读者对协程调度模块和epoll模型十分了解,接下来我会尽我所能向大家介绍清楚这部分内容。在协程调度模块中,当没有任务执行时就会在idle状态下忙等待,在本节中就利用了idle状态去做一些需要阻塞等待的任务,比如epoll_wait等。下面,我先介绍一下epoll的相关信息。
epoll 是 Linux 系统提供的一种事件通知机制,主要用于处理大量文件描述符的 I/O 事件。它是一种高效的 I/O 多路复用机制,相比于传统的 select 和 poll,epoll 在处理大量连接时有更好的性能。
1.1 创建 epoll 实例
int epfd = epoll_create(int size);
这个函数用处是创建一个 epoll 实例,size 本意为希望监听的文件描述符的数量,不用太在意它。
1.2 控制 epoll 上的事件
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
1.3 等待事件发生
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
2.1 eventpoll
epoll 在内核中主要使用两个结构体:struct eventpoll 和 struct epitem。这两个结构体用于管理 epoll 实例中的文件描述符和事件。eventpoll 里成员有很多,我们讲一些常用的。
struct eventpoll { spinlock_t lock; struct mutex mtx; struct file *file; struct wait_queue_head *wq; struct wait_queue_entry wait; wait_queue_t poll_wait; struct list_head rdllist; struct list_head ovflist; struct rb_root_cached rbr; struct list_head poll_siblings; struct list_head fasync_list; unsigned int ovflist_length; struct ep_pqueue **poll_table; struct user_struct *user; unsigned long gen; wait_queue_head_t proc_wait; struct wakeup_source __rcu *ws; unsigned int user_size; int wakeup_pipe; };
主要字段包括:
2.2 epitem
struct epitem 表示 epoll 实例中的一个事件项,它包含了与文件描述符相关的信息和事件状态。
struct epitem {
struct rb_node rbn;
struct list_head rdllink;
struct list_head fllink;
struct file *file;
struct eventpoll *ep;
struct wait_queue_head wq;
struct epoll_event event;
unsigned long last_wakeup;
spinlock_t lock;
};
主要字段包括:
这两个结构体是 epoll 内核实现中的核心数据结构,通过它们实现了高效的事件管理和处理机制。struct eventpoll 用于维护 epoll 实例的状态,而 struct epitem 用于表示每个文件描述符的事件状态。
epoll_event 结构体用于描述文件描述符上的事件,它是在 epoll 操作中用到的关键数据结构。以下是 epoll_event 结构体的定义:
struct epoll_event {
uint32_t events; // 事件类型,可以是 EPOLLIN、EPOLLOUT、EPOLLERR 等
epoll_data_t data; // 用户数据,可以携带额外信息
};
events:表示事件的类型,可以是以下几个宏的组合:
data:是一个联合体,用于携带额外的信息。epoll_data_t 的定义如下:
typedef union epoll_data {
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;
epoll_data 可以用于携带用户数据,具体使用取决于应用的需要。如果用户程序希望在事件发生时携带一些额外的信息,可以通过设置 data 字段。sylar就使用了其中的 ptr 指针来保存事件上下文信息。
首先,通过 epoll_create 创建一个 epoll 实例,得到一个文件描述符,然后,通过 epoll_ctl 将需要监听的文件描述符注册到 epoll 实例中,并指定关注的事件类型。这个过程会将我们添加的文件描述符添加到 epoll 句柄的红黑树中,然后执行 epoll_wait 函数则会阻塞等待这些文件描述符所关心的事件发生,当有关注的事件发生时,epoll_wait 返回,需要处理的事件都会保存在 events 数组里,遍历并处理这些就绪的事件。
为了高效地管理已经就绪的事件项,epoll 使用双向链表维护一个就绪事件链表。当文件描述符上的事件发生时,对应的事件项会被从红黑树中找到,然后加入到就绪链表中等待用户程序处理。用户程序调用 epoll_wait 进行阻塞等待事件的发生。内核检查红黑树上的文件描述符,如果有就绪的文件描述符,将相应的事件项加入到就绪链表中。epoll_wait 返回时,将就绪链表上的事件项传递给用户程序,用户程序可以遍历链表处理已经就绪的事件。
简单介绍完之后,让我们进入今天的主题,读一下 sylar 这部分源码。
对于IO协程调度来说,我们只关心它的 fd、事件类型和对应的回调函数,其中 epoll 负责 fd 和事件之间的联系,回调函数则是需要协程调度去调度执行。sylar 建立了一个 FdContext 结构来保存对应的文件描述符,以及事件信息和回调函数。并将其保存在epoll_event的私有数据指针data.ptr中,当 wait 之后,遍历就绪事件就可以拿到这些信息,从而去执行相应的操作。
调度器在 idle 时会 epoll_wait 所有已经注册的 fd,返回时就可以拿到对应的 FdContext。然后将对应事件的回调函数加入到调度器的任务列表中,在 idle 退出后就可以调度这些回调函数了。下面我们看一下头文件信息。
class IOManager : public Scheduler, public TimerManager { public: typedef std::shared_ptr<IOManager> ptr; typedef RWMutex RWMutexType; // IO事件 enum Event { // 无事件 NONE = 0x0, // 读事件 READ = 0x1, // 写事件 WRITE = 0x4, }; private: // Socket事件上下文类 struct FdContext { typedef Mutex MutexType; // 事件上下文类 struct EventContext { // 这个事件执行的调度器 Scheduler* scheduler = nullptr; // 事件协程 Fiber::ptr fiber; // 事件回调函数 std::function<void()> cb; }; // 获取事件上下文类 EventContext& getContext(Event event); // 重新设置事件上下文 void resetContext(EventContext& ctx); // 触发事件 void triggerEvent(Event event); // 定义一读一写事件 EventContext read, write; // 事件关联的句柄 int fd = 0; // 当前描述符所关注事件 Event events = NONE; // 事件的锁 MutexType mutex; }; public: IOManager(size_t threads = 1, bool use_caller = true, const std::string& name = ""); ~IOManager(); /** * @brief 添加事件 * @param[in] fd socket句柄 * @param[in] event 事件类型 * @param[in] cb 事件回调函数 * @return 添加成功返回0,失败返回-1 */ int addEvent(int fd, Event event, std::function<void()> cb = nullptr); /** * @brief 删除事件 * @param[in] fd socket句柄 * @param[in] event 事件类型 * @attention 不会触发事件 */ bool delEvent(int fd, Event event); // 取消 fd上的某个事件 bool cancelEvent(int fd, Event event); // 取消 fd上的所有事件 bool cancelAll(int fd); // 返回当前IOManager static IOManager* GetThis(); protected: void tickle() override; bool stopping() override; void idle() override; void onTimerInsertedAtFront() override; void contextResize(size_t size); /** * @brief 判断是否可以停止 * @param[out] timeout 最近要出发的定时器事件间隔 * @return 返回是否可以停止 */ bool stopping(uint64_t& timeout); private: // epoll文件句柄 int m_epfd = 0; // pipe 句柄 int m_tickleFds[2]; // 当前等待执行的事件总数 std::atomic<size_t> m_pendingEventCount = {0}; // IOManager的锁 RWMutexType m_mutex; // socket事件上下文的容器 std::vector<FdContext*> m_fdContexts; };
可以看到,这里对于每一个文件描述符都关联了相关的事件,以及回调。换句话说就是每一个文件描述符 fd 都可以关注三种事件类型,读写以及无事件,每种事件都可以注册一个专属的回调函数,并且还支持删除和取消事件。
下面来阅读一下 sylar 关于这部分的函数定义,我们先来看一下主体部分。
IOManager::IOManager(size_t threads, bool use_caller, const std::string& name) :Scheduler(threads, use_caller, name) { std::cout << "[DEBUG] IOManager create" << std::endl; // 创建 epoll实例 m_epfd = epoll_create(5000); //SYLAR_ASSERT(m_epfd > 0); // 初始化管道 /* 这里 sylar使用管道提醒其他线程有任务来了 */ int rt = pipe(m_tickleFds); //SYLAR_ASSERT(!rt); // 为管道读端注册可读事件 epoll_event event; memset(&event, 0, sizeof(epoll_event)); // 监听事件,边沿触发 event.events = EPOLLIN | EPOLLET; event.data.fd = m_tickleFds[0]; rt = fcntl(m_tickleFds[0], F_SETFL, O_NONBLOCK); //SYLAR_ASSERT(!rt); rt = epoll_ctl(m_epfd, EPOLL_CTL_ADD, m_tickleFds[0], &event); //SYLAR_ASSERT(!rt); contextResize(32); // 直接开启调度 start(); }
sylar 使用管道来通知有任务要调度
void IOManager::tickle() {
std::cout << "[INFO] IOManager::tickle" << std::endl;
if(!hasIdleThreads()) {
return;
}
// 向管道写段写入数据
int rt = write(m_tickleFds[1], "T", 1);
//SYLAR_ASSERT(rt == 1);
}
接下来是 idle 协程的重写了
void IOManager::idle() { std::cout << "[DEBUG] IOManager::idle" << std::endl; const uint64_t MAX_EVENTS = 256; // 动态分配一个包含MAX_EVENTS个epoll_event的数组,并初始化为零 epoll_event* events = new epoll_event[MAX_EVENTS](); // 使用std::shared_ptr来管理这个动态分配的数组的内存 std::shared_ptr<epoll_event> shared_events(events, [](epoll_event* ptr){ // lambda表达式,用于在shared_ptr释放内存时调用 // 删除动态分配的epoll_event数组 delete[] ptr; }); while(true) { uint64_t next_timeout = 0; if(stopping(next_timeout)) { break; } int rt = 0; do { static const int MAX_TIMEOUT = 3000; if(next_timeout != ~0ull) { next_timeout = (int)next_timeout > MAX_TIMEOUT ? MAX_TIMEOUT : next_timeout; } else { next_timeout = MAX_TIMEOUT; } rt = epoll_wait(m_epfd, events, MAX_EVENTS, (int)next_timeout); if(rt < 0 && errno == EINTR) { // 在中断时不执行任何操作 } else { break; } } while(true); std::vector<std::function<void()> > cbs; // 填充过期的回调 listExpiredCb(cbs); if(!cbs.empty()) { schedule(cbs.begin(), cbs.end()); cbs.clear(); } // 上面这一部分关于定时器的内容目前不必理会 for(int i = 0; i < rt; ++i) { epoll_event& event = events[i]; if(event.data.fd == m_tickleFds[0]) { uint8_t dummy[256]; while(read(m_tickleFds[0], dummy, sizeof(dummy)) > 0); continue; // 处理一个特殊情况:如果事件对应于tickle管道的读取端 (m_tickleFds),则从中读取以清空管道。 } // 拿到对应 fd的上下文信息 FdContext* fd_ctx = (FdContext*)event.data.ptr; FdContext::MutexType::Lock lock(fd_ctx->mutex); if(event.events & (EPOLLERR | EPOLLHUP)) { // 如果事件是EPOLLERR | EPOLLHUP,就更新事件类型信息 event.events |= (EPOLLIN | EPOLLOUT) & fd_ctx->events; } // 定义真实发生的事件 int real_events = NONE; if(event.events & EPOLLIN) { real_events |= READ; } if(event.events & EPOLLOUT) { real_events |= WRITE; } // 以上,确定实际发生的事件 if((fd_ctx->events & real_events) == NONE) { continue; } /* 下面这一行首先计算剩余的事件。fd_ctx->events 表示原始的关注事件集合, 而 real_events 表示实际发生的事件集合。 ~real_events 对 real_events 取反,即将所有位取反,然后使用按位与操作 &, 将原始关注事件集合中对应实际发生事件的位清零。 这样就得到了剩余的未发生事件的集合,存储在 left_events 变量中。 */ int left_events = (fd_ctx->events & ~real_events); int op = left_events ? EPOLL_CTL_MOD : EPOLL_CTL_DEL; event.events = EPOLLET | left_events; /* 接下来,根据剩余的未发生事件集合 left_events 的情况, 确定是要修改(EPOLL_CTL_MOD)还是删除(EPOLL_CTL_DEL)事件。 如果 left_events 不为零,表示仍然有关注的事件,那么操作类型 op 就是 EPOLL_CTL_MOD; 否则,操作类型 op 就是 EPOLL_CTL_DEL。 最后,将 event.events 设置为 EPOLLET | left_events, 将事件设置为边缘触发模式(EPOLLET)以及剩余的未发生事件, 用于后续调用 epoll_ctl 进行修改或删除监听。 */ int rt2 = epoll_ctl(m_epfd, op, fd_ctx->fd, &event); if(rt2) { std::cout << "[ERROR] epoll_ctl(" << m_epfd << ", " << (EpollCtlOp)op << ", " << fd_ctx->fd << ", " << (EPOLL_EVENTS)event.events << "):" << rt2 << " (" << errno << ") (" << strerror(errno) << ")"; continue; } if(real_events & READ) { fd_ctx->triggerEvent(READ); --m_pendingEventCount; } if(real_events & WRITE) { fd_ctx->triggerEvent(WRITE); --m_pendingEventCount; } // 触发相关事件 } // 退出 idle协程 Fiber::ptr cur = Fiber::GetThis(); auto raw_ptr = cur.get(); cur.reset(); raw_ptr->swapOut(); //raw_ptr->back(); } std::cout << "[DEBUG] IOManager::idle over" << std::endl; }
接下来就是剩下的部分了,我会给出全部的代码,并尽我所能给出详细的注释。
// 获得当前事件上下文 IOManager::FdContext::EventContext& IOManager::FdContext::getContext(IOManager::Event event) { switch(event) { case IOManager::READ: return read; case IOManager::WRITE: return write; default: //SYLAR_ASSERT2(false, "getContext"); ; } // 一般是使用无效参数调用函数抛出异常 throw std::invalid_argument("getContext invalid event"); } // 重置事件上下文信息 void IOManager::FdContext::resetContext(EventContext& ctx) { ctx.scheduler = nullptr; ctx.fiber.reset(); ctx.cb = nullptr; } // 触发这些事件的回调函数 void IOManager::FdContext::triggerEvent(IOManager::Event event) { //SYLAR_ASSERT(events & event); events = (Event)(events & ~event); EventContext& ctx = getContext(event); // 将对应的回调或协程加入调度器 if(ctx.cb) { ctx.scheduler->schedule(&ctx.cb); } else { ctx.scheduler->schedule(&ctx.fiber); } ctx.scheduler = nullptr; return; } IOManager::IOManager(size_t threads, bool use_caller, const std::string& name) :Scheduler(threads, use_caller, name) { std::cout << "[DEBUG] IOManager create" << std::endl; m_epfd = epoll_create(5000); //SYLAR_ASSERT(m_epfd > 0); int rt = pipe(m_tickleFds); //SYLAR_ASSERT(!rt); epoll_event event; memset(&event, 0, sizeof(epoll_event)); // 监听事件,边沿触发 event.events = EPOLLIN | EPOLLET; event.data.fd = m_tickleFds[0]; rt = fcntl(m_tickleFds[0], F_SETFL, O_NONBLOCK); //SYLAR_ASSERT(!rt); rt = epoll_ctl(m_epfd, EPOLL_CTL_ADD, m_tickleFds[0], &event); //SYLAR_ASSERT(!rt); contextResize(32); start(); } IOManager::~IOManager() { std::cout << "[INFO] ~IOManager" << std::endl; stop(); close(m_epfd); close(m_tickleFds[0]); close(m_tickleFds[1]); for(size_t i = 0; i < m_fdContexts.size(); ++i) { if(m_fdContexts[i]) { delete m_fdContexts[i]; } } } // 重置 fd上下文容器 void IOManager::contextResize(size_t size) { m_fdContexts.resize(size); for(size_t i = 0; i < m_fdContexts.size(); ++i) { if(!m_fdContexts[i]) { m_fdContexts[i] = new FdContext; m_fdContexts[i]->fd = i; } } } // 向某个文件描述符上添加事件 int IOManager::addEvent(int fd, Event event, std::function<void()> cb) { FdContext* fd_ctx = nullptr; RWMutexType::ReadLock lock(m_mutex); // 如果 m_fdContexts大小不够的话,就扩容 if((int)m_fdContexts.size() > fd) { fd_ctx = m_fdContexts[fd]; lock.unlock(); } else { lock.unlock(); RWMutexType::WriteLock lock2(m_mutex); contextResize(fd * 1.5); fd_ctx = m_fdContexts[fd]; } FdContext::MutexType::Lock lock2(fd_ctx->mutex); // 确定是添加事件(EPOLL_CTL_ADD)还是修改事件(EPOLL_CTL_MOD)。 // 如果 fd_ctx->events 不为零,说明文件描述符已经关注了其他事件,需要进行修改。 int op = fd_ctx->events ? EPOLL_CTL_MOD : EPOLL_CTL_ADD; epoll_event epevent; // 边缘触发 + 原有的事件 + 新加入的事件 epevent.events = EPOLLET | fd_ctx->events | event; // data.ptr 设置为 fd_ctx,用于在事件发生时追溯到对应的文件描述符上下文。 epevent.data.ptr = fd_ctx; int rt = epoll_ctl(m_epfd, op, fd, &epevent); if(rt) { std::cout << "[ERROR] epoll_ctl(" << m_epfd << ", " << (EpollCtlOp)op << ", " << fd << ", " << (EPOLL_EVENTS)epevent.events << "):" << rt << " (" << errno << ") (" << strerror(errno) << ") fd_ctx->events=" << (EPOLL_EVENTS)fd_ctx->events; return -1; } // 更新 IOManager 中待处理事件的计数,并更新文件描述符上下文的关注事件集合。 ++m_pendingEventCount; fd_ctx->events = (Event)(fd_ctx->events | event); FdContext::EventContext& event_ctx = fd_ctx->getContext(event); // 设置调度器 event_ctx.scheduler = Scheduler::GetThis(); if(cb) { event_ctx.cb.swap(cb); } else { event_ctx.fiber = Fiber::GetThis(); } return 0; } // 删除事件 bool IOManager::delEvent(int fd, Event event) { RWMutexType::ReadLock lock(m_mutex); if((int)m_fdContexts.size() <= fd) { return false; } FdContext* fd_ctx = m_fdContexts[fd]; lock.unlock(); FdContext::MutexType::Lock lock2(fd_ctx->mutex); if(!(fd_ctx->events & event)) { return false; } // 计算新的关注事件集合,通过将指定事件从原有集合中去除。 Event new_events = (Event)(fd_ctx->events & ~event); int op = new_events ? EPOLL_CTL_MOD : EPOLL_CTL_DEL; epoll_event epevent; epevent.events = EPOLLET | new_events; epevent.data.ptr = fd_ctx; int rt = epoll_ctl(m_epfd, op, fd, &epevent); if(rt) { std::cout << "[ERROR] epoll_ctl(" << m_epfd << ", " << (EpollCtlOp)op << ", " << fd << ", " << (EPOLL_EVENTS)epevent.events << "):" << rt << " (" << errno << ") (" << strerror(errno) << ")"; return false; } --m_pendingEventCount; fd_ctx->events = new_events; FdContext::EventContext& event_ctx = fd_ctx->getContext(event); fd_ctx->resetContext(event_ctx); // 获取特定事件类型的上下文信息,并重置该上下文信息。重置的目的是清除可能存在的回调函数和协程等相关信息。 return true; } // 取消 fd的某个事件 bool IOManager::cancelEvent(int fd, Event event) { RWMutexType::ReadLock lock(m_mutex); if((int)m_fdContexts.size() <= fd) { return false; } FdContext* fd_ctx = m_fdContexts[fd]; lock.unlock(); FdContext::MutexType::Lock lock2(fd_ctx->mutex); if(!(fd_ctx->events & event)) { return false; } Event new_events = (Event)(fd_ctx->events & ~event); int op = new_events ? EPOLL_CTL_MOD : EPOLL_CTL_DEL; epoll_event epevent; epevent.events = EPOLLET | new_events; epevent.data.ptr = fd_ctx; int rt = epoll_ctl(m_epfd, op, fd, &epevent); if(rt) { std::cout << "[ERROR] epoll_ctl(" << m_epfd << ", " << (EpollCtlOp)op << ", " << fd << ", " << (EPOLL_EVENTS)epevent.events << "):" << rt << " (" << errno << ") (" << strerror(errno) << ")"; return false; } // 触发事件,执行相应的回调函数或唤醒关联的协程。 fd_ctx->triggerEvent(event); --m_pendingEventCount; return true; } // 取消某个描述符上的所有事件 bool IOManager::cancelAll(int fd) { RWMutexType::ReadLock lock(m_mutex); if((int)m_fdContexts.size() <= fd) { return false; } FdContext* fd_ctx = m_fdContexts[fd]; lock.unlock(); FdContext::MutexType::Lock lock2(fd_ctx->mutex); if(!(fd_ctx->events)) { return false; } int op = EPOLL_CTL_DEL; epoll_event epevent; epevent.events = 0; epevent.data.ptr = fd_ctx; int rt = epoll_ctl(m_epfd, op, fd, &epevent); if(rt) { std::cout << "[ERROR] epoll_ctl(" << m_epfd << ", " << (EpollCtlOp)op << ", " << fd << ", " << (EPOLL_EVENTS)epevent.events << "):" << rt << " (" << errno << ") (" << strerror(errno) << ")"; return false; } if(fd_ctx->events & READ) { fd_ctx->triggerEvent(READ); --m_pendingEventCount; } if(fd_ctx->events & WRITE) { fd_ctx->triggerEvent(WRITE); --m_pendingEventCount; } // 如果文件描述符上下文关注了读或写事件,则触发相应的事件,并更新 IOManager 中待处理事件的计数。 //SYLAR_ASSERT(fd_ctx->events == 0); return true; } IOManager* IOManager::GetThis() { // dynamic_cast 是 C++ 中的一个运算符,用于在运行时执行安全的类型转换。 // 它主要用于进行基类和派生类之间的安全转型。 return dynamic_cast<IOManager*> (Scheduler::GetThis()); } void IOManager::tickle() { std::cout << "[INFO] IOManager::tickle" << std::endl; if(!hasIdleThreads()) { return; } int rt = write(m_tickleFds[1], "T", 1); //SYLAR_ASSERT(rt == 1); } bool IOManager::stopping(uint64_t& timeout) { timeout = getNextTimer(); return timeout == ~0ull && m_pendingEventCount == 0 && Scheduler::stopping(); } bool IOManager::stopping() { uint64_t timeout = 0; return stopping(timeout); } void IOManager::idle() { std::cout << "[DEBUG] IOManager::idle" << std::endl; const uint64_t MAX_EVENTS = 256; // 动态分配一个包含MAX_EVENTS个epoll_event的数组,并初始化为零 epoll_event* events = new epoll_event[MAX_EVENTS](); // 使用std::shared_ptr来管理这个动态分配的数组的内存 std::shared_ptr<epoll_event> shared_events(events, [](epoll_event* ptr){ // lambda表达式,用于在shared_ptr释放内存时调用 // 删除动态分配的epoll_event数组 delete[] ptr; }); while(true) { uint64_t next_timeout = 0; // if(SYLAR_UNLIKELY(stopping(next_timeout))) { // SYLAR_LOG_INFO(g_logger) << "name=" << getName() // << " idle stopping exit"; // break; // } if(stopping(next_timeout)) { break; } int rt = 0; do { static const int MAX_TIMEOUT = 3000; if(next_timeout != ~0ull) { next_timeout = (int)next_timeout > MAX_TIMEOUT ? MAX_TIMEOUT : next_timeout; } else { next_timeout = MAX_TIMEOUT; } rt = epoll_wait(m_epfd, events, MAX_EVENTS, (int)next_timeout); if(rt < 0 && errno == EINTR) { // 在中断时不执行任何操作 } else { break; } } while(true); std::vector<std::function<void()> > cbs; // 填充过期的回调 listExpiredCb(cbs); if(!cbs.empty()) { schedule(cbs.begin(), cbs.end()); cbs.clear(); } for(int i = 0; i < rt; ++i) { epoll_event& event = events[i]; if(event.data.fd == m_tickleFds[0]) { uint8_t dummy[256]; while(read(m_tickleFds[0], dummy, sizeof(dummy)) > 0); continue; // 处理一个特殊情况:如果事件对应于tickle管道的读取端 (m_tickleFds),则从中读取以清空管道。 } FdContext* fd_ctx = (FdContext*)event.data.ptr; FdContext::MutexType::Lock lock(fd_ctx->mutex); if(event.events & (EPOLLERR | EPOLLHUP)) { event.events |= (EPOLLIN | EPOLLOUT) & fd_ctx->events; } int real_events = NONE; if(event.events & EPOLLIN) { real_events |= READ; } if(event.events & EPOLLOUT) { real_events |= WRITE; } // 以上,确定实际发生的事件 if((fd_ctx->events & real_events) == NONE) { continue; } /* 下面这一行首先计算剩余的事件。fd_ctx->events 表示原始的关注事件集合, 而 real_events 表示实际发生的事件集合。 ~real_events 对 real_events 取反,即将所有位取反,然后使用按位与操作 &, 将原始关注事件集合中对应实际发生事件的位清零。 这样就得到了剩余的未发生事件的集合,存储在 left_events 变量中。 */ int left_events = (fd_ctx->events & ~real_events); int op = left_events ? EPOLL_CTL_MOD : EPOLL_CTL_DEL; event.events = EPOLLET | left_events; /* 接下来,根据剩余的未发生事件集合 left_events 的情况, 确定是要修改(EPOLL_CTL_MOD)还是删除(EPOLL_CTL_DEL)事件。 如果 left_events 不为零,表示仍然有关注的事件,那么操作类型 op 就是 EPOLL_CTL_MOD; 否则,操作类型 op 就是 EPOLL_CTL_DEL。 最后,将 event.events 设置为 EPOLLET | left_events, 将事件设置为边缘触发模式(EPOLLET)以及剩余的未发生事件, 用于后续调用 epoll_ctl 进行修改或删除监听。 */ int rt2 = epoll_ctl(m_epfd, op, fd_ctx->fd, &event); if(rt2) { std::cout << "[ERROR] epoll_ctl(" << m_epfd << ", " << (EpollCtlOp)op << ", " << fd_ctx->fd << ", " << (EPOLL_EVENTS)event.events << "):" << rt2 << " (" << errno << ") (" << strerror(errno) << ")"; continue; } //SYLAR_LOG_INFO(g_logger) << " fd=" << fd_ctx->fd << " events=" << fd_ctx->events // << " real_events=" << real_events; if(real_events & READ) { fd_ctx->triggerEvent(READ); --m_pendingEventCount; } if(real_events & WRITE) { fd_ctx->triggerEvent(WRITE); --m_pendingEventCount; } // 触发相关事件 } Fiber::ptr cur = Fiber::GetThis(); auto raw_ptr = cur.get(); cur.reset(); raw_ptr->swapOut(); //raw_ptr->back(); } std::cout << "[DEBUG] IOManager::idle over" << std::endl; } void IOManager::onTimerInsertedAtFront() { tickle(); }
总的来说这部分内容还是很抽象的,我的个人能力有限,只能表达出这些。关于这部分内容,我也跑了一些测试。测试效果也如预想一样,这次时间也有点些许的匆忙,这次测试结果,我会在下一节定时器中一起呈上来。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。