当前位置:   article > 正文

muduo库的模拟实现——Reactor部分

muduo库的模拟实现——Reactor部分

一、Channel模块

事实上Channel模块并不算是一个单独的模块,顶多算是一个单独的类。Channel模块和EventLoop模块、Poller模块是紧密关联的,其中通过EventLoop模块创建出来的对象就是一个Reactor,EventLoop对象里面有一个Poller对象,用来监控事件。至于监控什么样的事件,是可读事件还是可写事件,以及各种事件触发以后该调用什么回调函数去处理,这就是由EventLoop对象通过Channel来设置的了。

Channel模块一共管理五类事件,分别是:可读事件、可写事件、错误事件、连接断开事件以及任意事件。每种事件都要设置事件触发后的回调函数。所以Channel模块的成员变量需要这五类事件触发后的回调函数。另外还需要绑定一个EventLoop对象,也就是说一个Channel对象是绑定一个EventLoop对象的,因为设置好了事件以后,需要通过EventLoop对象的Poller对象将Channel事件监控起来。当然了,Channel对象监控的文件描述符以及需要监控的事件也必须要有,所以这就是Channel类的成员变量:

private:
    int _fd;                       // socket文件描述符
    uint32_t _events;              // 当前需要监控的事件
    uint32_t _revents;             // 当前连接触发的事件
    EventCallBack _read_callback;  // 可读事件被触发的回调函数
    EventCallBack _write_callback; // 可写事件被触发的回调函数
    EventCallBack _error_callback; // 错误事件被触发的回调函数
    EventCallBack _close_callback; // 连接断开事件被触发的回调函数
    EventCallBack _event_callback; // 任意事件被触发的回调函数
    EventLoop *_eventLoop;
};
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

Channel类比较重要的一个接口是handlerEvent函数接口,这个接口是真正的事件处理接口,一旦事件触发了,就会调用这个函数。也就是说这个函数是被上层调用的,具体的调用逻辑是:当Poller模块监控到有事件触发时,就将触发事件的文件描述符对应的Channel对象返回,返回给EventLoop对象,EventLoop对象再调用该Channel对象的handlerEvent函数,去执行触发事件对应的回调函数。

    // 事件处理,一旦连接触发了事件,就调用这个函数
    void handlerEvent()
    {
        // EPOLLIN: 可读事件触发
        // EPOLLRDHUP: 对端关闭了连接,表示读关闭
        // ?思考为什么对端关闭连接要调用可读事件回调函数,猜测与缓冲区还有数据有关
        // EPOLLPRI:表示有紧急数据需要读取
        if ((_revents & EPOLLIN) || (_revents & EPOLLRDHUP) || (_revents & EPOLLPRI))
        {
            // // 不管是什么事件,都要调用任意事件回调函数
            // if (_event_callback)
            // {
            //     _event_callback();
            // }
            if (_read_callback)
            {
                _read_callback();
            }
        }
        // EPOLLOUT:表示写事件触发
        if (_revents & EPOLLOUT)
        {
            if (_write_callback)
            {
                _write_callback();
            }
        }
        // EPOLLERR:表示出错
        else if (_revents & EPOLLERR)
        {
            if (_error_callback)
            {
                _error_callback();
            }
        }
        // EPOLLHUP:表示连接关闭
        else if (_revents & EPOLLHUP)
        {
            if (_close_callback)
            {
                _close_callback();
            }
        }
        // 不管是什么事件,都要调用任意事件回调函数
        if (_event_callback)
        {
            _event_callback();
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49

其它的实现都比较简单,就是在设置对应的事件以及事件触发的回调函数,这里就不过多介绍了,看代码就能看得懂,Channel类完整代码如下:

using EventCallBack = std::function<void()>;

class EventLoop;

/// @brief Channel是事件管理类,用来管理eventLoop对象所关心的所有事件
class Channel
{
public:
    /// @brief 构造函数
    /// @param eventLoop 需要传入EventLoop对象来初始化Channel,因为Channel管理的是EventLoop对象的事件,
    ///                  EventLoop对象就是一个reactor,EventLoop对象里面有一个Poller对象用来监控事件
    ///                  至于监控什么事件,这就是由EventLoop对象通过Channel来设置了,这就是三者之间的关系
    /// @param fd 还需要用文件描述符来初始化,因为必须要知道Channel要管理的事件是哪一个文件描述符的事件
    Channel(EventLoop *eventLoop, int fd)
        : _fd(fd), _events(0), _revents(0), _eventLoop(eventLoop)
    {
    }

    int getFd()
    {
        return _fd;
    }

    void setRevents(uint32_t events)
    {
        _revents = events;
    }

    // 当前是否监控了可读
    bool isReadAble()
    {
        return (_events & EPOLLIN);
    }

    // 当前是否监控了可写
    bool isWriteAble()
    {
        return (_events & EPOLLOUT);
    }

    // 启动读事件监控
    void startReadAbleEvent()
    {
        _events |= EPOLLIN;
        // 这里需要调用updateEvent函数接口,用来更新_events
        // 这个接口是Channel类实现的,但是这个接口其实调用的是eventLoop对象的updateEvent函数接口
        // 它的逻辑链是Channel更新了_event事件,就要交给Poller监控模块进行监控
        // 但是这不能跳过eventLoop对象,因为reactor才是操作Channel和Poller的主体
        // 所以必须调用eventLoop对象的更新事件函数接口,然后eventLoop对象再调用Poller类的更新事件接口
        // 这样就能将事件更新到Poller模块,实现新事件的监控
        updateEvent();
    }

    // 启动写事件监控
    void startWriteAbleEvent()
    {
        _events |= EPOLLOUT;
        updateEvent();
    }

    // 关闭读事件监控
    void closeReadAbleEvent()
    {
        _events &= ~EPOLLIN;
        updateEvent();
    }

    // 关闭写事件监控
    void closeWriteAbleEvent()
    {
        _events &= ~EPOLLOUT;
        updateEvent();
    }

    // 关闭所有事件监控
    void closeAllEvent()
    {
        _events = 0;
        updateEvent();
    }

    void updateEvent();

    // 移除监控
    void removeEvent();

    // 移除所有监控
    void removeAllEvents();

    // 设置可读事件回调函数
    // 这个回调函数由外界传入,当revent事件就绪时,会首先检查就绪的事件是什么类型的事件
    // 这里设置可读事件、可写事件等等的回调函数,目的是在对应事件就绪时,可以调用对应的回调函数去处理
    void setReadAbleCallBack(const EventCallBack &callBack)
    {
        _read_callback = callBack;
    }

    // 设置可写事件回调函数
    void setWriteAbleCallBack(const EventCallBack &callBack)
    {
        _write_callback = callBack;
    }

    // 设置错误事件回调函数
    void setErrorCallBack(const EventCallBack &callBack)
    {
        _error_callback = callBack;
    }

    // 设置连接关闭事件回调函数
    void setCloseCallBack(const EventCallBack &callBack)
    {
        _close_callback = callBack;
    }

    // 设置任意事件回调函数
    void setEventCallBack(const EventCallBack &callBack)
    {
        _event_callback = callBack;
    }

    // 事件处理,一旦连接触发了事件,就调用这个函数
    void handlerEvent()
    {
        // EPOLLIN: 可读事件触发
        // EPOLLRDHUP: 对端关闭了连接,表示读关闭
        // ?思考为什么对端关闭连接要调用可读事件回调函数,猜测与缓冲区还有数据有关
        // EPOLLPRI:表示有紧急数据需要读取
        if ((_revents & EPOLLIN) || (_revents & EPOLLRDHUP) || (_revents & EPOLLPRI))
        {
            // // 不管是什么事件,都要调用任意事件回调函数
            // if (_event_callback)
            // {
            //     _event_callback();
            // }
            if (_read_callback)
            {
                _read_callback();
            }
        }
        // EPOLLOUT:表示写事件触发
        if (_revents & EPOLLOUT)
        {
            if (_write_callback)
            {
                _write_callback();
            }
        }
        // EPOLLERR:表示出错
        else if (_revents & EPOLLERR)
        {
            if (_error_callback)
            {
                _error_callback();
            }
        }
        // EPOLLHUP:表示连接关闭
        else if (_revents & EPOLLHUP)
        {
            if (_close_callback)
            {
                _close_callback();
            }
        }
        // 不管是什么事件,都要调用任意事件回调函数
        if (_event_callback)
        {
            _event_callback();
        }
    }

    uint32_t getEvent()
    {
        return _events;
    }

private:
    int _fd;                       // socket文件描述符
    uint32_t _events;              // 当前需要监控的事件
    uint32_t _revents;             // 当前连接触发的事件
    EventCallBack _read_callback;  // 可读事件被触发的回调函数
    EventCallBack _write_callback; // 可写事件被触发的回调函数
    EventCallBack _error_callback; // 错误事件被触发的回调函数
    EventCallBack _close_callback; // 连接断开事件被触发的回调函数
    EventCallBack _event_callback; // 任意事件被触发的回调函数
    EventLoop *_eventLoop;
};

void Channel::removeEvent()
{
    _eventLoop->removeEvent(this);
}

void Channel::updateEvent()
{
    _eventLoop->updateEvent(this);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166
  • 167
  • 168
  • 169
  • 170
  • 171
  • 172
  • 173
  • 174
  • 175
  • 176
  • 177
  • 178
  • 179
  • 180
  • 181
  • 182
  • 183
  • 184
  • 185
  • 186
  • 187
  • 188
  • 189
  • 190
  • 191
  • 192
  • 193
  • 194
  • 195
  • 196
  • 197

二、Poller模块

Poller模块负责的是监控事件,事实上就是封装了epoll多路转接那一套操作,它的成员变量如下:这里可以简单介绍一下_channels这个哈希表结构,之所以需要这个结构,是因为EventLoop模块向Poller模块新增事件的时候,Poller模块也需要通过_channels这个结构将这些Channel对象管理起来,当Poller监控到事件触发的时候,可以直接从_channels里找到对应的Channel对象返回给上层,上层再直接调用Channel对象的handlerEvent函数即可。

private:
    int _epfd;                            // epoll模型对应的epollfd
    epoll_event _events[MAX_EPOLLEVENTS]; // 用来接收就绪事件的数组
    // 用来保存Channel对象的哈希表,key值是Channel对象管理的事件对应的文件描述符,value值是Channel对象
    // Poller类是与EventLoop类关联起来的,EventLoop对象里面有一个Poller对象
    // Poller对象里面管理的事件不止一个,管理的文件描述符也不止一个,同一个文件描述符下的不同事件通过Channel对象来管理
    // 不同的文件描述符对应的Channel对象就通过这个_channels成员变量来管理
    std::unordered_map<int, Channel *> _channels;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

Poller模块的实现也比较简单,因为就是epoll那一套操作,这里简单介绍一下几个函数接口的作用即可。首先在Poller类的构造函数处调用epoll_create创建epoll模型,然后updateEvent函数是给外部调用来添加或者修改监控事件的,removeEvent则是移除监控事件,最后poll函数是外部调用来启动epoll监控的,这个函数会调用epoll_wait函数接口去等待事件就绪,有事件就绪以后就从_channels里面找到对应的Channel对象返回。Poller类的完整代码如下:

#define MAX_EPOLLEVENTS 1024

class Poller
{
public:
    /// @brief 构造函数,需要调用epoll_create创建epoll模型
    Poller()
    {
        _epfd = epoll_create(MAX_EPOLLEVENTS);
        if (_epfd < 0)
        {
            LOG("epoll_create error");
            abort();
        }
    }

    // 添加或修改监控事件
    void updateEvent(Channel *channel)
    {
        // 首先判断形参传递进来的Channel对象是否已经存在在_channels哈希表中
        // 如果存在就直接更新事件即可,如果不存在就先增加到哈希表中再更新事件
        bool isAddChannelRes = isAddChannel(channel);
        if (isAddChannelRes == false)
        {
            // 不存在则添加
            _channels.insert(std::make_pair(channel->getFd(), channel));
            if (!update(channel, EPOLL_CTL_ADD))
            {
                LOG("update error");
            }
        }
        else
        {
            if (!update(channel, EPOLL_CTL_MOD))
            {
                LOG("update error");
            }
        }
    }

    // 移除监控
    void removeEvent(Channel *channel)
    {
        // 首先要检查_channels哈希表中是否存在对应的channel对象,如果存在就要将其删除
        auto iter = _channels.find(channel->getFd());
        if (iter != _channels.end())
        {
            _channels.erase(iter);
        }
        // 然后更新事件
        if (!update(channel, EPOLL_CTL_DEL))
        {
            LOG("update error");
        }
    }

    // 开始监控,返回活跃连接
    void poll(std::vector<Channel *> *active)
    {
        // 调用epoll_wait函数接口,开始监控,就绪事件保存在_events数组中,返回值nfds表示就绪事件的个数
        int nfds = epoll_wait(_epfd, _events, MAX_EPOLLEVENTS, -1);
        if (nfds < 0)
        {
            if (errno == EINTR)
            {
                return;
            }
            LOG("epoll_wait error");
            abort();
        }
        // 遍历_events数组中的所有就绪事件
        for (int i = 0; i < nfds; i++)
        {
            // 先通过就绪事件的文件描述符到_channels哈希表中查找
            auto iter = _channels.find(_events[i].data.fd);
            // 必须保证能够在_channels哈希表中找到对应的channel对象,找不到是不合理的
            assert(iter != _channels.end());
            // 找到了以后拿到该文件描述符对应的channel对象,设置给对象的revent就绪事件
            iter->second->setRevents(_events[i].events);
            // 然后将就绪事件加入active活跃连接数组中返回给外界
            active->push_back(iter->second);
        }
    }

    void clearChannels()
    {
        _channels.clear();
    }

private:
    // 对epoll的直接操作
    // 更新epoll模型中的事件,这个事件通过形参的channel对象获取
    // 可以是新增事件、修改事件、删除事件,通过op决定
    bool update(Channel *channel, int op)
    {
        int fd = channel->getFd();
        epoll_event event;
        event.data.fd = fd;
        event.events = channel->getEvent();
        int epollCtlRes = epoll_ctl(_epfd, op, fd, &event);
        if (epollCtlRes < 0)
        {
            LOG("epoll_ctl error");
            return false;
        }
        return true;
    }

    // 判断一个Channel是否已经添加了事件监控
    bool isAddChannel(Channel *channel)
    {
        auto iter = _channels.find(channel->getFd());
        if (iter == _channels.end())
        {
            return false;
        }
        return true;
    }

private:
    int _epfd;                            // epoll模型对应的epollfd在这里插入代码片
    epoll_event _events[MAX_EPOLLEVENTS]; // 用来接收就绪事件的数组
    // 用来保存Channel对象的哈希表,key值是Channel对象管理的事件对应的文件描述符,value值是Channel对象
    // Poller类是与EventLoop类关联起来的,EventLoop对象里面有一个Poller对象
    // Poller对象里面管理的事件不止一个,管理的文件描述符也不止一个,同一个文件描述符下的不同事件通过Channel对象来管理
    // 不同的文件描述符对应的Channel对象就通过这个_channels成员变量来管理
    std::unordered_map<int, Channel *> _channels;
};
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128

3.EventLoop模块

EventLoop类是封装Reactor操作的类,我们先来看一下EventLoop类的成员变量:

  1. _thread_id: 线程ID,由于我们要实现one loop one thread,即一个EventLoop对象绑定一个线程,一个线程创建一个EventLoop对象,所以我们需要一个线程ID来记录该EventLoop对象绑定的线程。
  2. _tasks: 每一个EventLoop对象都有一个任务池,只有那些不允许被其它线程调用的任务会被加入到EventLoop的任务池中执行,如果被其他线程调用则可能会出现线程安全的问题。muduo解决这些线程安全问题的方案并不是加锁,因为不想因为加锁释放锁而影响效率,而是采用了EventLoop中一个很重要的函数——runInLoop,也就是将这些任务与EventLoop对象绑定起来,限制这些任务只能在这个EventLoop对象中被执行,而EventLoop对象又是与线程绑定的,所以也就限制了这些任务只能被一个线程执行,其它线程无法执行,这样就不存在线程安全问题了,这其实就是one loop one thread的思想。
  3. _event_fd:eventfd是Linux内核提供的用来进行事件通知的文件描述符,它其实是内核维护的一个计数器。当用户向eventfd文件描述符写入数据时,eventfd的计数器就加一;当用户读取数据时,就会返回eventfd文件描述符里的值,这个值代表距离上一次读取数据,一共通知了多少次。muduo在这里使用eventfd机制,其实是想通过epoll模型将eventfd文件描述符监控起来,因为EventLoop对象会在有IO事件触发的时候才去执行任务池里的任务,但有些时候可能对方没有发数据过来,但有一个任务需要立即被处理,就有可能因为阻塞在等待epoll_wait事件就绪而导致任务无法处理。比如说有一个连接快要超时了,用户又刷新了连接,但并没有发送数据,这时为了不然服务器阻塞在等待epoll_wait的返回,我们就可以向eventfd文件描述符里写入一个数据,目的是通知epoll模型有事件到来,然后服务器从epoll_wait处返回,进而去执行刷新定时器的任务。这就是eventfd的用处。
  4. _timerWheel:该成员变量就是时间轮对象,每一个EventLoop对象都要有一个时间轮对象,因为每个时间轮对象必须与EventLoop对象绑定,这样在执行新增定时器、刷新定时器任务、取消定时器任务这些函数时不会出现线程安全问题。想象一下,如果所有线程共用一个时间轮对象,就会有线程安全的问题,比如有一个线程刚准备去刷新定时器任务,就被切换走了,新线程上了执行了定时器销毁的动作,那就出问题了,因为按理说这个定时器任务应该被刷新不应该现在被销毁的。所以为了避免这种线程不安全的问题,我们让每个EventLoop对象用自己的时间轮对象,由于runInLoop函数的作用,这些定时器操作的函数不会被其它线程调用,也就不会出现线程不安全的问题了。
private:
    std::thread::id _thread_id;              // 线程ID
    int _event_fd;                           // eventfd唤醒IO事件监控有可能导致的阻塞
    Poller _poller;                          // 进行所有描述符的事件监控
    std::vector<Functor> _tasks;             // 任务池
    std::mutex _mutex;                       // 实现任务池操作的线程安全
    std::unique_ptr<Channel> _event_channel; // 这是管理eventfd的Channel对象
    TimerWheel _timerWheel;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

接下来我们就要介绍一下EventLoop里很重要的一个函数——runInLoop函数,这个函数确保我们在不加锁的情况下也能保证线程安全,原因就是one loop one thread,我们将EventLoop对象与线程绑定起来了,而且一个线程只有一个EventLoop对象。runInLoop的实现非常简单,只需要传递进来需要执行的任务,这个任务其实就是一个执行函数,比如新增定时器函数、刷新定时任务函数、取消定时任务函数等等。runInLoop函数首先会判断当前执行的线程是否是EventLoop对象绑定的线程,如果是的话,就直接执行这个任务,如果不是的话就不能执行,不能让其它线程执行这个任务,否则会线程不安全,所以只能将其加入到任务池中等待执行。

    // 用于判断当前线程是否是EventLoop对应的线程
    bool isInLoop()
    {
        return (_thread_id == std::this_thread::get_id());
    }
    
    // 判断将要执行的任务是否在当前线程中,如果是则执行,如果不是则压入任务队列
    void runInLoop(const Functor &callBack)
    {
        if (isInLoop())
        {
            callBack();
            return;
        }
        pushTaskQueue(callBack);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

将任务加入到任务池之后,前面说了,为了防止连接没有发送数据而导致线程阻塞等待在epoll_wait处,所以需要利用eventfd立即唤醒线程。

    void weakupEventFd()
    {
        uint64_t val = 1;
        // 向eventfd文件描述符中写入数据,写入的数据也不重要
        // 因为这个文件描述符已经被epoll监控起来了
        // 只要有数据到来就会触发可读事件,那epoll_wait就不会阻塞了
        // 这样就可以唤醒可能因为没有事件就绪而阻塞的线程
        int writeRes = write(_event_fd, &val, sizeof(val));
        if (writeRes < 0)
        {
            if (errno == EINTR)
            {
                return;
            }
            LOG("write eventfd error");
            abort();
        }
    }

    // 将任务压入任务队列
    void pushTaskQueue(const Functor &callBack)
    {
        // 这里需要加锁保护_tasks的插入
        // 因为_tasks是临界资源,这里有可能会出现当前线程
        {
            std::unique_lock<std::mutex> _lock(_mutex);
            _tasks.push_back(callBack);
        }
        weakupEventFd();
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30

这就是runInLoop函数的逻辑,接下来介绍EventLoop另一个很重要的函数接口,这个接口就是外部通过EventLoop启动事件监控的接口,该接口的实现是,首先调用Poller的poll函数启动事件监控,这个函数会返回所有触发事件的Channel对象,通过这个Channel对象去调用handlerEvent函数,就能调用触发事件对应的回调函数。最后再执行任务池里的所有函数即可。

    // 事件监控->就绪事件处理->执行任务
    void start()
    {
        while (true)
        {
            std::vector<Channel *> actives;
            _poller.poll(&actives);
            for (auto &channel : actives)
            {
                channel->handlerEvent();
            }
            runAllTasks();
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

那么Poller里监控的事件是什么事件呢?是由谁设置的事件呢?所以EventLoop还有两个接口是updateEvent和removeEvent,分别用来更新事件和移除事件,外部需要构造一个Channel对象,通过这两个接口来向Poller的epoll模型更新或者删除事件监控。

    // 添加/修改描述符的事件监控
    void updateEvent(Channel *channel)
    {
        _poller.updateEvent(channel);
    }

    // 移除描述符的监控
    void removeEvent(Channel *channel)
    {
        _poller.removeEvent(channel);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

至此,EventLoop与底层各个关联模块的逻辑就理清楚了,这里可以直接给出EventLoop的完整代码:

/// @brief 这是封装reactor操作的类,服务器实现的是多reactor多线程的IO模型
class EventLoop
{
public:
    void readEventFd()
    {
        uint64_t res = 0;
        // 将eventfd文件描述符的数据读取处理
        // 读取到的数据我们并不关心,因为eventfd是为了唤醒可能因为没有事件就绪而阻塞在epoll_wait函数调用的线程
        int readRes = read(_event_fd, &res, sizeof(res));
        if (readRes < 0)
        {
            if (errno == EINTR || errno == EAGAIN)
            {
                return;
            }
            LOG("read eventfd error");
            abort();
        }
    }

    EventLoop()
        : _thread_id(std::this_thread::get_id()),
          _event_fd(createEventFd()),
          _event_channel(new Channel(this, _event_fd)),
          _timerWheel(this)
    {
        // 初始化eventfd文件描述符的可读事件回调函数,设置为readEventFd
        _event_channel->setReadAbleCallBack(std::bind(&EventLoop::readEventFd, this));
        // 开启可读事件监控
        _event_channel->startReadAbleEvent();
    }

    static int createEventFd()
    {
        // 创建eventfd文件描述符,将该文件描述符设置为非阻塞
        int efd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
        if (efd < 0)
        {
            LOG("eventfd error");
            abort();
        }
        return efd;
    }

    void weakupEventFd()
    {
        uint64_t val = 1;
        // 向eventfd文件描述符中写入数据,写入的数据也不重要
        // 因为这个文件描述符已经被epoll监控起来了
        // 只要有数据到来就会触发可读事件,那epoll_wait就不会阻塞了
        // 这样就可以唤醒可能因为没有事件就绪而阻塞的线程
        int writeRes = write(_event_fd, &val, sizeof(val));
        if (writeRes < 0)
        {
            if (errno == EINTR)
            {
                return;
            }
            LOG("write eventfd error");
            abort();
        }
    }

    // 断言当前线程是否是eventLoop对应的线程
    // 因为很多函数会触及对临界资源的修改,如果多线程去调用的话是会有线程安全问题的
    // 可以加锁解决,但是加锁会很消耗资源,影响效率
    // 所以使用将每一个eventLoop的操作都放在一个线程里,如果不是该线程就不能调用该eventLoop对象的函数
    // 这样就不会有线程安全的问题了
    void assertInLoop()
    {
        assert(_thread_id == std::this_thread::get_id());
    }

    // 判断将要执行的任务是否在当前线程中,如果是则执行,如果不是则压入任务队列
    void runInLoop(const Functor &callBack)
    {
        if (isInLoop())
        {
            callBack();
            return;
        }
        pushTaskQueue(callBack);
    }

    // 将任务压入任务队列
    void pushTaskQueue(const Functor &callBack)
    {
        // 这里需要加锁保护_tasks的插入
        // 因为_tasks是临界资源,这里有可能会出现当前线程
        {
            std::unique_lock<std::mutex> _lock(_mutex);
            _tasks.push_back(callBack);
        }
        weakupEventFd();
    }

    // 用于判断当前线程是否是EventLoop对应的线程
    bool isInLoop()
    {
        return (_thread_id == std::this_thread::get_id());
    }

    // 添加/修改描述符的事件监控
    void updateEvent(Channel *channel)
    {
        _poller.updateEvent(channel);
    }

    // 移除描述符的监控
    void removeEvent(Channel *channel)
    {
        _poller.removeEvent(channel);
    }

    // 执行所有任务池中的任务
    void runAllTasks()
    {
        std::vector<Functor> functor;
        {
            std::unique_lock<std::mutex> _lock(_mutex);
            _tasks.swap(functor);
        }
        for (auto &f : functor)
        {
            f();
        }
    }

    // 事件监控->就绪事件处理->执行任务
    void start()
    {
        while (true)
        {
            std::vector<Channel *> actives;
            _poller.poll(&actives);
            for (auto &channel : actives)
            {
                channel->handlerEvent();
            }
            runAllTasks();
        }
    }

    void addTimer(uint64_t id, uint32_t delay, const TaskFunc &task)
    {
        _timerWheel.addTimer(id, delay, task);
    }

    void refreshTimer(uint64_t id)
    {
        _timerWheel.refreshTimer(id);
    }

    void cancelTimer(uint64_t id)
    {
        _timerWheel.cancelTimer(id);
    }

    bool hasTimer(uint64_t id)
    {
        return _timerWheel.hasTimer(id);
    }

private:
    std::thread::id _thread_id;              // 线程ID
    int _event_fd;                           // eventfd唤醒IO事件监控有可能导致的阻塞
    Poller _poller;                          // 进行所有描述符的事件监控
    std::vector<Functor> _tasks;             // 任务池
    std::mutex _mutex;                       // 实现任务池操作的线程安全
    std::unique_ptr<Channel> _event_channel; // 这是管理eventfd的Channel对象
    TimerWheel _timerWheel;
};
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166
  • 167
  • 168
  • 169
  • 170
  • 171
  • 172
  • 173
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/从前慢现在也慢/article/detail/65798
推荐阅读
相关标签
  

闽ICP备14008679号