当前位置:   article > 正文

63 epoll服务器 (ET模式)

63 epoll服务器 (ET模式)

基于LT模式修改,并加入前面的应用层计算器,实现稍完整的服务器功能
1.修改tcp_socket.hpp,新增非阻塞读和非阻塞写接口
2.对于accept返回的new_sock加上EPOLLET这样的选项

注意:此代码暂时未考虑listen_sock ET的情况,如果将listen_sock设为ET,则需要非阻塞轮询的方式accept,否则会导致同一时刻大量的客户端同时连接的情况,只能accept一次的问题

目录

  1. 整体结构
  2. 流程
  3. 运行示例
  4. 改进
  5. Reactor的理论

1. 整体结构

在这里插入图片描述
TpcServer服务器类,维护_listen套接字,用来获取连接和监听读写事件,map用套接字做键值,rev数组作为epoll_wait的输出参数
在这里插入图片描述

每一个连接都是一个session结构,包含读写缓冲区,ip和端口方便调试,读写错误的回调处理函数,指针回指服务器
在这里插入图片描述
nocopy类用来给某些类提供无法拷贝的功能
socket类,提供套接字的创建,监听等功能
协议类和计算类为前面章节的内容,用来对收到的数据处理返回结果
Epoll类提供epoll多路转接功能
comm类单独拎出来的设置非阻塞功能,因为很多地方都要用到
在这里插入图片描述

2. 流程

服务端

TpcServer.hpp
继承enable_shared_from_this类可以解决智能指针不能用this构造的问题,使用智能指针对象需要用shared_from_this()功能来获取
在这里插入图片描述
定义两个函数模板,构造时传入报文处理回调
在这里插入图片描述

在这里插入图片描述
Init函数初始化套接字,设置非阻塞,AddConnection函数添加listen套接字到关注事件中,绑定事件分配函数Accepter
在这里插入图片描述

AddConnection函数参数传入要设置的套接字,事件,三个函数,ip和端口用来调试

在这里插入图片描述

Connection
构造时传入sock初始化成员变量
在这里插入图片描述
作为连接管理类,需要管理每个连接的发送和接收缓冲区,所以提供存入缓冲区数据的和返回缓冲区内容的功能,再提供初始化自己成员函数的功能
在这里插入图片描述在这里插入图片描述在这里插入图片描述
TpcServer.hpp
继续说明AddConnection函数,这个函数的作用为每个连接初始化session,添加关注事件和管理,后面每个新链接都要用这个函数

构造一个Connection的临时对象
设置成员TpcServer和回调函数,ip和port
添加对象到map结构里,添加listen的事件,listen关注读

Accpeter连接管理器函数,参数是事件就绪的会话
在这里插入图片描述

不一定只有一个连接到来,所以需要循环读取。用accept获取就绪连接,设置非阻塞后,调用AddConnection函数加入会话管理,作为连接会话三个回调函数分别是读写错误

当错误码是EWOULDBLOCK的时候,说明已经获取完,退出循环,EINTR表示系统调用被信号中断,所以继续读取,其他情况退出

Recver数据读取函数,用来提供读取数据添加到Connection缓冲区的功能
在这里插入图片描述在这里插入图片描述

首先判断了连接的生命周期,如果消亡就退出。通过lock获取一个shared指针对象。
因为是ET模式,所以一次性需要读完所有数据,用recv函数,返回值n大于0表示读取到数据,添加到接收缓冲区中,等于0对方客户度退出,调用错误处理函数,小于0和上面一样判断是否读完,不是就走错误处理

最后将读取到的数据交给处理函数,所有报文情况都由它处理

Sender函数,获取连接的发送缓冲区发送,一次性将数据都发送,返回值大于0发送成功,将发送了的内容删除,判断如果发送缓冲区为空就退出。0表示没发送任何内容也退出,其他情况判断是否走错误处理
在这里插入图片描述

epoll/select/poll,因为写事件经常都是就绪的,发送缓冲区基本会有空间,如果设置了写关心,每次都会就绪,经常返回浪费cpu资源。所以对于读,需要设置常关心,写,按需求设置
当发送完后,检查缓冲区不为空,没发送完就对写事件开启关心,发送完将事件关闭

EnableEvent函数,设置套接字的读和写,根据传入的参数,判断有没有读和写,通过三木运算符,有就加入event,最后修改套接字的事件
在这里插入图片描述
Excepter函数,错误处理函数,遇到错误就是关闭这个链接。如果连接在读和写时发生错误,用这个函数。取消这个套接字的所有关心,关闭文件,map中移除
在这里插入图片描述

IsSafeConnection函数,检查链接是否合法,遍历map,是否存在
在这里插入图片描述

主逻辑
Loop函数,服务器的运行循环,传入超时时间,不断调用事件分配函数和打印连接函数
在这里插入图片描述

PrintConnection函数,打印出map中所有的fd,用来调试
在这里插入图片描述

Dispatcher函数,timeout等待时间是上一个函数传入。不断wait监听revs数组添加了的套接字,n会返回就绪的个数,取到套接字和事件,将异常转为读写统一处理。如果是读事件就绪,并且连接合法,就调用读取函数,写事件调用写函数

在这里插入图片描述

TpcServer.cc
全局的计算类对象,DefaultOnMessage函数是默认的报文处理函数,对报文的完整性判断,计算返回结果并发送
调用计算类的函数,判断返回的字符串是否为空,为空说明报文不完整或有错误。如果处理完成,将结果加入到发送缓冲区,用tcpserver对象发送

在这里插入图片描述
main函数创建svr对象,传入报文处理函数,启动服务器
在这里插入图片描述

客户端

客户端链接服务器,生成5个随机报文发送接收结果打印
是前面章节的网络计算器
网络计算器

3. 运行示例

在这里插入图片描述

4. 全

TcpServer.hpp

#pragma ocne
#include <iostream>
#include <memory>
#include <functional>
#include <unordered_map>
#include "Comm.hpp"
#include "log.hpp"
#include "Epoll.hpp"
#include "Socket.hpp"

class Connection;
class TpcServer;
using func_t = std::function<void(std::weak_ptr<Connection>)>; // 用户缓冲区处理函数模板
using except_func_t = std::function<void(std::weak_ptr<Connection>)>;
static const uint16_t port = 8000;
static const int g_buff_size = 128;
// 设置et
uint32_t EVENT_IN = (EPOLLIN | EPOLLET);
uint32_t EVENt_OUT = (EPOLLOUT | EPOLLET);

class Connection
{
public:
    Connection(int sock)
    {
        _sock = sock;
    }

    ~Connection()
    {

    }

    void AppendInbuff(const std::string& message)
    {
        _inbuff += message;
    }

    void AppendOutbuff(const std::string& message)
    {
        _outbuff += message;
    }

    int Fd()
    {
        return _sock;
    }

    std::string& Inbuffer()  // for debug
    {
        return _inbuff;
    }

    std::string& Outbuffer()  // for debug
    {
        return _outbuff;
    }

    void SetHandler(func_t recv_cb, func_t send_cb, except_func_t except_cb)
    {
        _recv_cb = recv_cb;
        _send_cb = send_cb;
        _except_cb = except_cb;
    }

    void SetWeakPtr(std::weak_ptr<TpcServer> tcp_setver_ptr)
    {
        _tcp_server_ptr = tcp_setver_ptr;
    }

private:
    int _sock;
    std::string _inbuff;  // string不能二进制, 需要vector
    std::string _outbuff;
public:
    func_t _recv_cb;
    func_t _send_cb;
    except_func_t _except_cb;

    std::weak_ptr<TpcServer> _tcp_server_ptr;  // 回指向服务器
    std::string _ip;
    uint16_t _port;
};

class TpcServer :public std::enable_shared_from_this<TpcServer>, public nocopy
{
    static const int num = 64;

public:
    TpcServer(func_t OnMessage)
        : _listensocket_ptr(new Sock())
        , _epoll_ptr(new Epoll())
        , _OnMessage(OnMessage)
        , _quit(true)
    {
    }

    void AddConnection(int sock, uint32_t event, func_t recv_cb, func_t send_cb, \
                        except_func_t except_cb, const std::string& ip = "0.0.0.0", 
                        uint16_t port = 0)
    {
        // 1. 给sock创建connection对象, 将lstensock添加到connection中
        // 同时,listeinsock和connection放入_connections
        std::shared_ptr<Connection> new_con(new Connection(sock));
        new_con->SetWeakPtr(shared_from_this());  // 返回当前对象的shared_ptr
        new_con->SetHandler(recv_cb, send_cb, except_cb);
        new_con->_ip = ip;
        new_con->_port = port;
        // 2. 添加到map
        _connections.insert(std::make_pair(sock, new_con));
        // 3. 添加对应事件
        _epoll_ptr->EpollUpdate(EPOLL_CTL_ADD, sock, event);

    }

    void Init()
    {
        _listensocket_ptr->Socket();
        int opt = 1;
        setsockopt(_listensocket_ptr->Fd(), SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, 
                   &opt, sizeof(opt));
        // et模式需要非阻塞
        SetNonBlock(_listensocket_ptr->Fd());
        lg.logmessage(info, "listensock create success:%d", _listensocket_ptr->Fd());

        _listensocket_ptr->Bind(port);
        _listensocket_ptr->Listen();
        // 关联connection
        AddConnection(_listensocket_ptr->Fd(), EVENT_IN,
                      std::bind(&TpcServer::Accepter, this, std::placeholders::_1), 
                      nullptr, nullptr);
    }

    void Accepter(std::weak_ptr<Connection> con)
    {
        // 获取强引用对象, 检查是否销毁
        auto connection = con.lock();
        // 获取新链接
        while (true)
        {
            struct sockaddr_in peer;
            socklen_t len = sizeof(peer);
            // ::调用原生函数
            int sock = ::accept(connection->Fd(), (struct sockaddr *)&peer, &len);
            if (sock > 0)
            {
                char ipbuf[128];
                inet_ntop(AF_INET, &peer.sin_addr.s_addr, ipbuf, sizeof(ipbuf));
                uint16_t port = ntohs(peer.sin_port);
                lg.logmessage(info, "get a new clinet[%s:%d]:%d", ipbuf, port, sock);
                // 设置非阻塞
                SetNonBlock(sock);
                // 添加连接事件
                AddConnection(sock, EVENT_IN,\
                              std::bind(&TpcServer::Recver, this, std::placeholders::_1),\
                              std::bind(&TpcServer::Sender, this, std::placeholders::_1),\
                              std::bind(&TpcServer::Excepter, this, std::placeholders::_1),\
                              ipbuf, port);
            }
            else 
            {
                if (errno == EWOULDBLOCK)
                {
                    break;
                }
                else if (errno == EINTR)  // 信号中断
                {
                    continue;
                }
                else
                {
                    break;
                }
            }
        }
    }

    void Recver(std::weak_ptr<Connection> con)
    {
        if (con.expired())
            return;
        auto connec = con.lock();
        int sock = connec->Fd();

        while (true)
        {
            char buff[g_buff_size];
            memset(buff, 0, sizeof(buff));
            ssize_t n = recv(sock, buff, sizeof(buff) - 1, 0);  // 非阻塞读取
            if (n > 0)
            {
                connec->AppendInbuff(buff);
            }
            else if (n == 0)  // 错误处理
            {
                lg.logmessage(info, "sockfd:%d, client[%s:%d] quit", sock, connec->_ip.c_str(), connec->_port);
                connec->_except_cb(connec);
                return;
            }
            else
            {
                if (errno == EWOULDBLOCK)  // 读完
                {
                    break;
                }
                else if (errno == EINTR)
                {
                    continue;
                }
                else
                {
                    lg.logmessage(warning, "sockfd:%d, client[%s:%d] recv error", sock, connec->_ip.c_str(), connec->_port);
                    connec->_except_cb(connec);
                    return;
                }
            }
        }

        // 数据有了, 不一定安全 1.检测 2.如果有完整报文,处理
        _OnMessage(connec);
    }

    void Sender(std::weak_ptr<Connection> con)
    {
        if (con.expired())
            return;
        auto connection = con.lock();
        auto &outbuff = connection->Outbuffer();
        while (true)
        {
            ssize_t n = send(connection->Fd(), outbuff.c_str(), outbuff.size(), 0);
            if (n > 0)
            {
                outbuff.erase(0, n);
                if (outbuff.empty())
                {
                    break;
                }
            }
            else if (n == 0)
            {
                return;  // 没有发
            }
            else
            {
                if (errno == EWOULDBLOCK)
                {
                    break;
                }
                else if (errno == EINTR)
                {
                    continue;
                }
                else
                {
                    lg.logmessage(info, "sockfd:%d, client[%s:%d] recv error", connection->Fd(), connection->_ip.c_str(), connection->_port);
                    connection->_except_cb(connection);
                    return;
                }
            }
        }

        // 没发完, 开启对写事件关心
        if (!outbuff.empty())
        {
            EnableEvent(connection->Fd(), true, true);
        }
        else
        {
            EnableEvent(connection->Fd(), true, false);
        }
    }

    void EnableEvent(int sock, bool readable, bool writeable)
    {
        uint32_t evnet = 0;
        evnet |= ((readable ? EPOLLIN : 0) | (writeable ? EPOLLOUT : 0) | EPOLLET);
        _epoll_ptr->EpollUpdate(EPOLL_CTL_MOD, sock, evnet);
    }

    void Excepter(std::weak_ptr<Connection> con)
    {
        if (con.expired())
            return;
        auto connection = con.lock();
        int fd = connection->Fd();
        lg.logmessage(warning, "Excepter handler:%d, client[%s:%d] excepter error", connection->Fd(), connection->_ip.c_str(), connection->_port);

        // 1. 移除关心
        _epoll_ptr->EpollUpdate(EPOLL_CTL_DEL, fd, 0);
        // 2. 关闭文件
        lg.logmessage(debug, "close %d...", fd);
        close(fd);
        // 3. unordered_map中移除
        lg.logmessage(debug, "remove connection %d", fd);
        _connections.erase(fd);
    }

    bool IsSafeConnection(int sock)
    {
        auto it = _connections.find(sock);
        if (it == _connections.end())
        {
            return false;
        }
        else
        {
            return true;
        }  
    }

    void Dispatcher(int timeout)
    {
        int n = _epoll_ptr->EpollWait(_revs, num, timeout);
        for (int i = 0; i < n; i++)
        {
            int sock = _revs[i].data.fd;
            uint32_t event = _revs[i].events;

            // 统一将异常转换为读写问题
            if (event & EPOLLERR)
            {
                event |= (EPOLLIN | EPOLLOUT);
            }

            if (event & EPOLLHUP)
            {
                event |= (EPOLLIN | EPOLLOUT);
            }

            if ((event & EPOLLIN) && IsSafeConnection(sock))
            {
                if (_connections[sock]->_recv_cb)
                {
                    _connections[sock]->_recv_cb(_connections[sock]);
                }
            }

            if ((event & EPOLLOUT) && IsSafeConnection(sock))
            {
                if (_connections[sock]->_send_cb)
                {
                    _connections[sock]->_send_cb(_connections[sock]);
                }
            }
        }
    }

    void Loop()
    {
        _quit = false;

        while (!_quit)
        {
            Dispatcher(3000);
            PrintConnection();
        }

        _quit = true;
    }

    void PrintConnection()
    {
        std::cout << "_connection list: ";
        for (auto &con: _connections)
        {
            std::cout << con.second->Fd() << ",";
        }

        std::cout << std::endl;
    }

private:
    std::shared_ptr<Sock> _listensocket_ptr;  // 监听socket, 可以移到外部
    std::shared_ptr<Epoll> _epoll_ptr;  // 内核
    std::unordered_map<int, std::shared_ptr<Connection>> _connections;
    struct epoll_event _revs[num];
    func_t _OnMessage;
    bool _quit;
};
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166
  • 167
  • 168
  • 169
  • 170
  • 171
  • 172
  • 173
  • 174
  • 175
  • 176
  • 177
  • 178
  • 179
  • 180
  • 181
  • 182
  • 183
  • 184
  • 185
  • 186
  • 187
  • 188
  • 189
  • 190
  • 191
  • 192
  • 193
  • 194
  • 195
  • 196
  • 197
  • 198
  • 199
  • 200
  • 201
  • 202
  • 203
  • 204
  • 205
  • 206
  • 207
  • 208
  • 209
  • 210
  • 211
  • 212
  • 213
  • 214
  • 215
  • 216
  • 217
  • 218
  • 219
  • 220
  • 221
  • 222
  • 223
  • 224
  • 225
  • 226
  • 227
  • 228
  • 229
  • 230
  • 231
  • 232
  • 233
  • 234
  • 235
  • 236
  • 237
  • 238
  • 239
  • 240
  • 241
  • 242
  • 243
  • 244
  • 245
  • 246
  • 247
  • 248
  • 249
  • 250
  • 251
  • 252
  • 253
  • 254
  • 255
  • 256
  • 257
  • 258
  • 259
  • 260
  • 261
  • 262
  • 263
  • 264
  • 265
  • 266
  • 267
  • 268
  • 269
  • 270
  • 271
  • 272
  • 273
  • 274
  • 275
  • 276
  • 277
  • 278
  • 279
  • 280
  • 281
  • 282
  • 283
  • 284
  • 285
  • 286
  • 287
  • 288
  • 289
  • 290
  • 291
  • 292
  • 293
  • 294
  • 295
  • 296
  • 297
  • 298
  • 299
  • 300
  • 301
  • 302
  • 303
  • 304
  • 305
  • 306
  • 307
  • 308
  • 309
  • 310
  • 311
  • 312
  • 313
  • 314
  • 315
  • 316
  • 317
  • 318
  • 319
  • 320
  • 321
  • 322
  • 323
  • 324
  • 325
  • 326
  • 327
  • 328
  • 329
  • 330
  • 331
  • 332
  • 333
  • 334
  • 335
  • 336
  • 337
  • 338
  • 339
  • 340
  • 341
  • 342
  • 343
  • 344
  • 345
  • 346
  • 347
  • 348
  • 349
  • 350
  • 351
  • 352
  • 353
  • 354
  • 355
  • 356
  • 357
  • 358
  • 359
  • 360
  • 361
  • 362
  • 363
  • 364
  • 365
  • 366
  • 367
  • 368
  • 369
  • 370
  • 371
  • 372
  • 373
  • 374
  • 375
  • 376
  • 377
  • 378
  • 379
  • 380

TcpServer.cc

#include <memory>
#include "TpcServer.hpp"
#include "Calculator.hpp"

Calculator calculator;
void DefaultOnMessage(std::weak_ptr<Connection> con)
{
     if(con.expired()) return;
     auto connection_ptr = con.lock();
     std::cout << connection_ptr->Inbuffer() << std::endl;
     std::string response_str = calculator.Handler(connection_ptr->Inbuffer());  // 业务逻辑简单,如果复杂,需要拿到结果单独线程处理
     if (response_str.empty())
     {
         return;
     }

     lg.logmessage(debug, "%s", response_str.c_str());
     connection_ptr->AppendOutbuff(response_str);
     //connection_ptr->_send_cb(connection_ptr); 
     auto tcpserver = connection_ptr->_tcp_server_ptr.lock();  
     tcpserver->Sender(connection_ptr);
}

int main()
{
    std::shared_ptr<TpcServer> svr(new TpcServer(DefaultOnMessage));
    svr->Init();
    svr->Loop();
    return 0;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30

Clinet.cc

#include <time.h>
#include <unistd.h>
#include <assert.h>
#include "Socket.hpp"
#include "Protocol.hpp"

int main()
{
    srand(time(NULL));
    std::cout << "准备连接" << std::endl;
    uint16_t serverport = 8000;
    string serverip = "106.54.46.147";
    struct sockaddr_in server;
    bzero(&server, sizeof(server));
    server.sin_family = AF_INET;
    server.sin_addr.s_addr = inet_addr(serverip.c_str());
    server.sin_port = htons(serverport);

    const string opers = "+-*/%=^";

    Sock socket;
    socket.Socket();
    bool r = socket.Connect(serverip, serverport);
    if (!r)
        return 1;

    std::cout << "连接成功, 开始发送数据" << std::endl;
    int cnt = 1;
    while (cnt <= 5)
    {
       std::cout << "=============第" << cnt << "次测试...." << "============" << std::endl;
       string package;
       int x = rand() % 100;
       int y = rand() % 100 + 1;
       char op = opers[rand() % opers.size()];

       Request req(x, y, op);
       req.DebugPrint();

       req.Serialize(&package);
       package = Encode(package);
       std::cout << package << std::endl;
       write(socket._sockfd, package.c_str(), package.size());

       char buff[1024];
       int n = read(socket._sockfd, buff, sizeof(buff));

       string inbuff_stream;
       if (n > 0)
       {
           buff[n] = 0;
           inbuff_stream += buff;
           std::cout << inbuff_stream << std::endl;

           string content;
           bool r = Decode(inbuff_stream, &content);
           assert(r);

           Response resp;
           r = resp.Deserialize(content);
           assert(r);

           resp.DebugPrint();
       }

        std::cout << "=======================================" << std::endl;
        sleep(1);
        cnt++;
    }

    socket.Close();
    return 0;

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75

Comm.hpp

#pragma once
#include <fcntl.h>
#include <unistd.h>
#include "Socket.hpp"

void SetNonBlock(int sock)
{
    int f1 = fcntl(sock, F_GETFL);
    if (f1 < 0)
    {
        exit(NONBLOCKERR);
    }
    fcntl(sock, F_SETFL, f1 | O_NONBLOCK);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

Epoll.hpp

#pragma once
#include <sys/epoll.h>
#include "nocopy.hpp"
#include "log.hpp"

class Epoll : public nocopy
{

    static const int size = 128;
    Log log;

public:
    Epoll()
    {
        _epfd = epoll_create(size);
        if (_epfd == -1)
        {
            log.logmessage(ERROR, "epoll create error:%s", strerror(errno));
        }
        else
        {
            log.logmessage(info, "epoll create success:%d", _epfd);
        }
    }

    int EpollWait(struct epoll_event revents[], int num, int timeout)
    {
        int n = epoll_wait(_epfd, revents, num, timeout);
        return n;
    }

    int EpollUpdate(int oper, int sock, uint32_t event)
    {
        int n = 0;
        if (oper == EPOLL_CTL_DEL)
        {
            n = epoll_ctl(_epfd, oper, sock, nullptr);
            if (n != 0)
            {
                log.logmessage(ERROR, "epoll_ctl delete error");
            }
        }
        else
        {
            struct epoll_event ev;
            ev.events = event;
            ev.data.fd = sock;

            n = epoll_ctl(_epfd, oper, sock, &ev);
            if (n != 0)
            {
                log.logmessage(ERROR, "epoll_ctl add error");
            }
        }
    }

    ~Epoll()
    {
        if (_epfd >= 0)
        {
            close(_epfd);
        }
    }

private:
    int _epfd;
    int _timeout{3000};
};
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68

nocopy.hpp

#pragma once

class nocopy
{
public:
    nocopy(){}
    nocopy(const nocopy &) = delete;
    nocopy& operator=(const nocopy&) = delete;
};
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

Socket.hpp

#pragma once
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <stdlib.h>
#include <fcntl.h>
#include "log.hpp"

enum
{
    SOCKERR = 1,
    BINDERR,
    LISERR,
    NONBLOCKERR
};

Log lg;
const int backlog = 5;
class Sock
{
public:
    Sock()
    {

    }

    void Socket()
    {
        _sockfd = socket(AF_INET, SOCK_STREAM, 0);
        if (_sockfd < 0)
        {
            lg.logmessage(fatal, "socket error");
            exit(SOCKERR);
        }

        int opt = 1;
        setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt)); // 防止偶发性的服务器无法进行立即重启(tcp协议的时候再说)
    }

    void Bind(uint16_t port)
    {
        struct sockaddr_in local;
        memset(&local, 0, sizeof(local));
        local.sin_family = AF_INET;
        local.sin_addr.s_addr = INADDR_ANY;
        local.sin_port = htons(port);

        int bret = bind(_sockfd, (const struct sockaddr*)&local, sizeof(local));
        if (bret < 0)
        {
            lg.logmessage(fatal, "bind error");
            exit(BINDERR);
        }
    }

    void Listen()
    {
        int lret = listen(_sockfd, backlog);
        if (lret < 0)
        {
            lg.logmessage(fatal, "listen error");
            exit(LISERR);
        }
    }
    
    int Accept(string* clientip, uint16_t* clientport)
    {
        sockaddr_in peer;
        socklen_t len = sizeof(peer);
        int newfd = accept(_sockfd, (sockaddr*)&peer, &len);
        if (newfd < 0)
        {
            lg.logmessage(warning, "accept error");
            return -1;
        }

        char ipstr[64];
        inet_ntop(AF_INET, &peer.sin_addr, ipstr, sizeof(ipstr));
        *clientip = ipstr;
        *clientport = ntohs(peer.sin_port);

        return newfd;
    }

    bool Connect(const string ip, const uint16_t port)
    {
        sockaddr_in peer;
        memset(&peer, 0, sizeof(peer));
        peer.sin_family = AF_INET;
        inet_pton(AF_INET, ip.c_str(), &peer.sin_addr);
        peer.sin_port = htons(port);

        int cret = connect(_sockfd, (const struct sockaddr*)&peer, sizeof(peer));
        if (cret == -1)
        {
            lg.logmessage(warning, "connect error");
            return false;
        }

        return true;
    }

    void Close()
    {
        close(_sockfd);
    }

    int Fd()
    {
        return _sockfd;
    }
    ~Sock()
    {

    }
public:
    int _sockfd;
};

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121

Protocol.hpp

#pragma once
#include <string>
#include <jsoncpp/json/json.h>

//#define MYSELF 1
//分隔符
const std::string black_sep = " ";
const std::string protocol_sep = "\n";

//解决报文外部格式
 //len\n正文\n
    std::string Encode(std::string& message)
    {
        std::string package = std::to_string(message.size());
        package += protocol_sep;
        package += message;
        package += protocol_sep;

        return package;
    }

    //len\na + b\n
    bool Decode(std::string& message, std::string* content)
    {
        std::size_t pos = message.find(protocol_sep);
        if (pos == std::string::npos)
        {
            return false;
        }

        std::string len_str = message.substr(0, pos);
        std::size_t len = std::stoi(len_str);
        std::size_t total_len = len_str.size() + len + 2;

        //检查长度
        if (message.size() < total_len)
        {
            return false;
        }

        *content = message.substr(pos + 1, len);
        //earse 移除报文
        message.erase(0, total_len);

        return true;
    }

class Request
{
public:
    Request(){}
    Request(int a, int b, char oper)
    {
        _num1 = a;
        _num2 = b;
        _op = oper;
    }
    //a + b
    bool Serialize(std::string* out)
    {
#ifdef MYSELF
        //构建报文有效载荷
        std::string str;
        str += std::to_string(_num1);
        str += black_sep;
        str += _op;
        str += black_sep;
        str += std::to_string(_num2);

        *out = str;
        return true;
#else
        Json::Value root;
        root["x"] = _num1;
        root["y"] = _num2;
        root["op"] = _op;
        Json::FastWriter w;
        *out = w.write(root);

        return true;
#endif
    }

    //a + b
    bool Deserialize(std::string& in)
    {
#ifdef MYSELF
        //a
        std::size_t left = in.find(black_sep);
        if (left == std::string::npos)
        {
            return false;         
        }
        std::string part_a = in.substr(0, left);
        // b
        std::size_t right = in.rfind(black_sep);
        if (right == std::string::npos)
        {
            return false;     
        }
        std::string part_b = in.substr(right + 1);
        //+
        if (left + 2 != right)
        {
            return false;
        }

        _op = in[left+1];
        _num1 = std::stoi(part_a);
        _num2 = std::stoi(part_b);

        return true;
#else      
        Json::Value root;
        Json::Reader r;
        r.parse(in, root);
        _num1 = root["x"].asInt();
        _num2 = root["y"].asInt();
        _op = root["op"].asInt();

        return true;
#endif
    }

    void DebugPrint()
    {
        std::cout << "新请求构建完成:" << _num1 << _op << _num2 << std::endl;
    }

public:
    int _num1;
    int _num2;
    char _op;
};

class Response
{
public:
    Response(){}
    Response(int res, int cod)
    {
        _result = res;
        _code = cod;
    }

    //1000 0
    bool Serialize(std::string* out)
    {
#ifdef MYSELF
        string str = std::to_string(_result);
        str += black_sep;
        str += std::to_string(_code);
        *out = str;

        return true;
#else 
        Json::Value root;
        root["res"] = _result;
        root["code"] = _code;
        Json::FastWriter w;
        *out = w.write(root);

        return true;
#endif
    }

    //1000 0
    bool Deserialize(std::string& in)
    {
#ifdef MYSELF

        std::size_t pos = in.find(black_sep);
        if (pos  == std::string::npos)
        {
            return false;
        }

        std::string left = in.substr(0, pos);
        std::string right = in.substr(pos + 1);

        _result = std::stoi(left);
        _code = std::stoi(right);

        return true;
#else 
        Json::Value root;
        Json::Reader r;
        r.parse(in, root);
        _result = root["res"].asInt();
        _code = root["code"].asInt();

        return true;
#endif
    }

    void DebugPrint()
    {
        std::cout << "结果响应完成,result:" << _result << ",code:" << _code << std::endl;
    }

public:
    int _result;
    int _code;   //0可信,否则表明对应的错误
};

#define MySelf 1


  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166
  • 167
  • 168
  • 169
  • 170
  • 171
  • 172
  • 173
  • 174
  • 175
  • 176
  • 177
  • 178
  • 179
  • 180
  • 181
  • 182
  • 183
  • 184
  • 185
  • 186
  • 187
  • 188
  • 189
  • 190
  • 191
  • 192
  • 193
  • 194
  • 195
  • 196
  • 197
  • 198
  • 199
  • 200
  • 201
  • 202
  • 203
  • 204
  • 205
  • 206
  • 207
  • 208

Calcluator.hpp

#pragma once
#include "Protocol.hpp"

enum
{
    DIVZERO = 1,
    MODZERO,
    OTHER_OPER
};

class Calculator
{
public:
    Calculator()
    {

    }

    Response CalculatorHelp(const Request& req)
    {
        Response res(0, 0);
        switch (req._op)
        {
        case '+':
            res._result = req._num1 + req._num2;
            break;
        case '-':
            res._result = req._num1 - req._num2;
            break;
        case '*':
            res._result = req._num1 * req._num2;
            break;
        case '/':
            if (req._num2 == 0)
            {
                res._code = DIVZERO;
            }
            else
            {
                res._result = req._num1 / req._num2;
            }
            break;
        case '%':
            if (req._num2 == 0)
            {
                res._code = MODZERO;
            }
            else
            {
                res._result = req._num1 % req._num2;
                break;
            }
            
        default:
            res._code = OTHER_OPER;
            break;
        }

        return res;
    }

    std::string Handler(std::string& package)
    {
        std::string content;
        bool r = Decode(package, &content);
        if (!r)
        {
            return "";
        }
        Request req;
        r = req.Deserialize(content);
        if (!r)
        {
            return "";
        }
        req.DebugPrint();
        content = "";
        Response res = CalculatorHelp(req);
        res.DebugPrint();
        res.Serialize(&content);
        content = Encode(content); // len\n正文\n

        return content;
    }

    ~Calculator()
    {

    }
};


  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92

Log.hpp

#pragma once
#include <stdarg.h>
#include <iostream>
#include <stdio.h>
#include <cstring>
#include <time.h>
#include <cerrno>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>

using namespace std;

#define info 0
#define debug 1
#define warning 2
#define ERROR 3
#define fatal 4

#define screen 1
#define onefile 2
#define classfile 3

#define PATH "log.txt"

class Log
{
public:

    Log(int style = screen)
    {
        printstyle = style;
        dir = "log/";
    }

    void enable(int method)
    {
        printstyle = method;
    }

    const char *leveltostring(int level)
    {
        switch (level)
        {
        case 0:
            return "info";
            break;
        case 1:
            return "debug";
            break;
        case 2:
            return "warning";
            break;
        case 3:
            return "error";
            break;
        case 4:
            return "fatal";
            break;
        default:
            return "none";
            break;
        }
    }

    void printlog(int level, const string &logtxt)
    {
        switch (printstyle)
        {
        case screen:
            cout << logtxt;
            break;
        case onefile:
            printonefile(PATH, logtxt);
            break;
        case classfile:
            printclassfile(level, logtxt);
            break;
        }
    }

    void logmessage(int level, const char *format, ...)
    {
        time_t t = time(0);
        tm *ctime = localtime(&t);
        char leftbuff[1024];
        sprintf(leftbuff, "[%s]%d-%d-%d %d:%d:%d:", leveltostring(level), ctime->tm_year + 1900,
                ctime->tm_mon + 1, ctime->tm_mday, ctime->tm_hour, ctime->tm_min, ctime->tm_sec);

        char rightbuff[1024];
        va_list s;
        va_start(s, format);
        vsprintf(rightbuff, format, s);
        va_end(s);
        char logtext[2048];
        sprintf(logtext, "%s %s\n", leftbuff, rightbuff);
        //printf(logtext);
        printlog(level, logtext);
    }
 

    void printonefile(const string& logname, const string& logtxt)
    {
        int fd = open(logname.c_str(), O_WRONLY | O_CREAT | O_APPEND, 0666);  
        if (fd < 0)
        {
            return;
        }

        write(fd, logtxt.c_str(), logtxt.size());
        close(fd);
    }

    void printclassfile(int level, const string &logtxt)
    {
        //log.txt.info
        string filename = dir + PATH;
        filename += ".";
        filename += leveltostring(level);
        printonefile(filename, logtxt);
    }

    ~Log(){};

private:
    int printstyle;
    string dir; //分类日志,放入目录中
};

// int sum(int n, ...)
// {
//     int sum = 0;
//     va_list s;
//     va_start(s, n);

//     while (n)
//     {
//         sum = sum + va_arg(s, int);
//         n--;
//     }

//     return sum;
// }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144

CMakeLists.txt

cmake_minimum_required(VERSION 2.8.12.2)
project(TpcServer)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11")
set(CMAKE_CXX_STANDARD_REQUIRED ON) # 确保必须使用指定的标准

add_executable(TpcServer TpcServer.cc)
target_link_libraries(TpcServer jsoncpp)

add_executable(Client Client.cc)
target_link_libraries(Client jsoncpp)

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

5. 改进

listen分离

将TpcServer里的listen套接字提出来单独成一个类,将Accpeter获取连接的函数放到这个类,通过AddCpnnection函数加入关注,这个类就做listen的连接管理。这样做的好处是可以将listen连接也作为一个正常连接统一处理
在这里插入图片描述

后续的设计可以加入多线程,主线程维护vector,里面是获得的连接fd,线程之间通信,争取这个fd会话,每一个线程也是一个Ractor,也可以负载均衡式的分配连接。这样就可以加入多线程,同时每个连接的报文内容处理如果比较复杂,可以交由线程池来处理,只需要拿到结果
这种一个连接一个reacotr,叫one thread one loop

连接管理

对于一些不活跃的连接需要处理。使用一个定时器类,里面保存一个最小堆,存每个连接的超时时间,可以回指connection和tpcserver做更多开发。在主循环每次事件分配后可以做一些其他事情,就是查询超时连接,如果有可以走错误处理,关闭这个链接。同时,事件分配的等待时间可以设置为堆顶的超时时间

6. Reactor的理论

它是一个半同步半异步的模型,类似于打地鼠,监测哪个链接有事件就处理哪个。同步体现在事件的就绪是需要等,异步体现在回调函数,如果不想自己做,可以交由线程处理,比如报文的处理可以由线程来,直接取得结果。这种叫反应堆

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/928025
推荐阅读
相关标签
  

闽ICP备14008679号