当前位置:   article > 正文

C++高并发网络架构与实现——第六篇_c++客户端服务器架构

c++客户端服务器架构

目录

一,将客户端改为多线程

二,为服务器添加多线程处理消息


今日实现任务

  1. 将客户端改为多线程
  2. 为服务器添加多线程处理消息
  3. 添加客户端退出事件,并且对代码进行注解,补充,代码调整
  4. 实现阶段目标,一秒发送200万包,能够稳定处理,并且对代码也进行了优化等

1,客户端架构

即启动多个线程,然后在每个线程中建立客户端,然后与服务器进行连接,进行发送消息的活动。

eg:

之前的方法是在主线程中建立1000个客户端,然后进行连接和发送消息。

现在创立四个线程,然后将1000个客户端分散到四个线程中,每个线程处理250个从而减轻压力,并且提升速度。

2,服务器架构

启动多个线程,在客户端连接之后,将新连接的客户端分配到数量比较少的线程中,然后在每个线程中与客户端进行接收消息的活动。

eg:

之前的方法是在主线程中处理连接后的1000个客户端,然后进行连接和发送消息。

现在创立四个线程,然后将连接的1000个客户端分散到四个线程中,每个线程处理250个达到减轻压力,并且提升速度的目的。

3,整体架构

现在还有待提升的地方就是服务器连接客户端的过程,提升一秒接收客户端的连接数量。

一,将客户端改为多线程

  • EasyTcpClient.hpp
  1. #include "EasyTcpClient.hpp"
  2. #include<thread>
  3. #include<mutex>
  4. bool g_bRun = true;
  5. mutex _mutexx;
  6. void cmdThread()//(EasyTcpClient* client)
  7. {
  8. while (1)
  9. {
  10. char cmdBuf[256] = {};
  11. cin >> cmdBuf;
  12. if (strcmp(cmdBuf, "exit") == 0){
  13. g_bRun = false;
  14. cout << "退出子线程" << endl;
  15. return;
  16. }
  17. else{
  18. cout << "不支持的命令" << endl;
  19. }
  20. }
  21. }
  22. //客户端数量
  23. const int cCount = 100;
  24. //发送线程数量
  25. const int tCount = 4;
  26. //客户端数组
  27. EasyTcpClient* client[cCount];
  28. void sendThread(int id)
  29. {
  30. {
  31. lock_guard<mutex> lock(_mutexx);
  32. cout << "thread " << id << " start" << endl;
  33. }
  34. //四个线程ID
  35. int c = cCount / tCount;
  36. int begin = (id - 1)*c;
  37. int end = id*c;
  38. for (int i = begin; i < end; i++)
  39. {
  40. client[i] = new EasyTcpClient();
  41. }
  42. for (int i = begin; i < end; i++)
  43. {
  44. client[i]->Connect((char *)"127.0.0.1", 4567); //192.168.247.128
  45. }
  46. cout << "thread=" << id << " Connect=" << begin << " end= " << end << endl;
  47. chrono::milliseconds t(3000);
  48. this_thread::sleep_for(t);
  49. Login login[1];
  50. for (int i = 0; i < 10; i++)
  51. {
  52. strcpy(login[i].userName, "lyd");
  53. strcpy(login[i].PassWord, "lydmm");
  54. }
  55. const int nLen = sizeof(login);
  56. int _true = 1;
  57. while (g_bRun && _true)
  58. {
  59. for (int i = begin; i < end; i++)
  60. {
  61. if (client[i]->SendData(login,nLen) == SOCKET_ERROR)
  62. {
  63. _true = 0;
  64. break;
  65. }
  66. client[i]->OnRun();
  67. }
  68. }
  69. for (int i = begin; i < end; i++)
  70. {
  71. delete client[i];
  72. client[i]->Close();
  73. }
  74. g_bRun = false;
  75. cout << "thread " << id << " exit" << endl;
  76. }
  77. int main()
  78. {
  79. //启动UI线程
  80. thread t1(cmdThread); //启动线程函数
  81. t1.detach();
  82. //启动发送线程
  83. for (int n = 0; n < tCount; n++)
  84. {
  85. thread t1(sendThread, n + 1);
  86. t1.detach();
  87. }
  88. while (g_bRun)
  89. {
  90. Sleep(100);
  91. }
  92. cout << "已退出" << endl;
  93. system("pause");
  94. return 0;
  95. }
  • MessageHeader.hpp
  1. enum CMD
  2. {
  3. CMD_LOGIN, //登入
  4. CMD_LOGIN_RESULT,
  5. CMD_LOGOUT, //登出
  6. CMD_LOGOUT_RESULT,
  7. CMD_NEW_USER_JOIN, //新的用户加入
  8. CMD_ERROR, //错误
  9. };
  10. struct DataHeader
  11. {
  12. DataHeader()
  13. {
  14. dataLength = sizeof(DataHeader);
  15. cmd = CMD_ERROR;
  16. }
  17. short dataLength;
  18. short cmd;
  19. };
  20. //匹配四个消息结构体
  21. struct Login : public DataHeader
  22. {
  23. Login()
  24. {
  25. dataLength = sizeof(Login);
  26. cmd = CMD_LOGIN;
  27. }
  28. char userName[32];
  29. char PassWord[32];
  30. //char data[932];
  31. };
  32. struct LoginResult : public DataHeader
  33. {
  34. LoginResult()
  35. {
  36. dataLength = sizeof(LoginResult);
  37. cmd = CMD_LOGIN_RESULT;
  38. result = 0;
  39. }
  40. int result;
  41. //char data[992];
  42. };
  43. struct Logout : public DataHeader
  44. {
  45. Logout()
  46. {
  47. dataLength = sizeof(Logout);
  48. cmd = CMD_LOGOUT;
  49. }
  50. char userName[32];
  51. };
  52. struct LogoutResult : public DataHeader
  53. {
  54. LogoutResult()
  55. {
  56. dataLength = sizeof(LogoutResult);
  57. cmd = CMD_LOGOUT_RESULT;
  58. result = 0;
  59. }
  60. int result;
  61. };
  62. struct NewUserJoin :public DataHeader
  63. {
  64. NewUserJoin()
  65. {
  66. dataLength = sizeof(NewUserJoin);
  67. cmd = CMD_NEW_USER_JOIN;
  68. sock = 0;
  69. }
  70. int sock;
  71. };
  • client.cpp
  1. #include "EasyTcpClient.hpp"
  2. #include<thread>
  3. #include<mutex>
  4. bool g_bRun = true;
  5. mutex _mutexx;
  6. void cmdThread()//(EasyTcpClient* client)
  7. {
  8. while (1)
  9. {
  10. char cmdBuf[256] = {};
  11. cin >> cmdBuf;
  12. if (strcmp(cmdBuf, "exit") == 0){
  13. g_bRun = false;
  14. cout << "退出子线程" << endl;
  15. return;
  16. }
  17. else{
  18. cout << "不支持的命令" << endl;
  19. }
  20. }
  21. }
  22. //客户端数量
  23. const int cCount = 100;
  24. //发送线程数量
  25. const int tCount = 4;
  26. //客户端数组
  27. EasyTcpClient* client[cCount];
  28. void sendThread(int id)
  29. {
  30. {
  31. lock_guard<mutex> lock(_mutexx);
  32. cout << "thread " << id << " start" << endl;
  33. }
  34. //四个线程ID
  35. int c = cCount / tCount;
  36. int begin = (id - 1)*c;
  37. int end = id*c;
  38. for (int i = begin; i < end; i++)
  39. {
  40. client[i] = new EasyTcpClient();
  41. }
  42. for (int i = begin; i < end; i++)
  43. {
  44. client[i]->Connect((char *)"127.0.0.1", 4567); //192.168.247.128
  45. }
  46. cout << "thread=" << id << " Connect=" << begin << " end= " << end << endl;
  47. chrono::milliseconds t(3000);
  48. this_thread::sleep_for(t);
  49. Login login[1];
  50. for (int i = 0; i < 10; i++)
  51. {
  52. strcpy(login[i].userName, "lyd");
  53. strcpy(login[i].PassWord, "lydmm");
  54. }
  55. const int nLen = sizeof(login);
  56. int _true = 1;
  57. while (g_bRun && _true)
  58. {
  59. for (int i = begin; i < end; i++)
  60. {
  61. if (client[i]->SendData(login,nLen) == SOCKET_ERROR)
  62. {
  63. _true = 0;
  64. break;
  65. }
  66. client[i]->OnRun();
  67. }
  68. }
  69. for (int i = begin; i < end; i++)
  70. {
  71. delete client[i];
  72. client[i]->Close();
  73. }
  74. g_bRun = false;
  75. cout << "thread " << id << " exit" << endl;
  76. }
  77. int main()
  78. {
  79. //启动UI线程
  80. thread t1(cmdThread); //启动线程函数
  81. t1.detach();
  82. //启动发送线程
  83. for (int n = 0; n < tCount; n++)
  84. {
  85. thread t1(sendThread, n + 1);
  86. t1.detach();
  87. }
  88. while (g_bRun)
  89. {
  90. Sleep(100);
  91. }
  92. cout << "已退出" << endl;
  93. system("pause");
  94. return 0;
  95. }

二,为服务器添加多线程处理消息

  • CELLTimestamp.hpp
  1. #ifndef CELLTimestamp_hpp_
  2. #define CELLTimestamp_hpp_
  3. #pragma once
  4. #include<chrono>
  5. using namespace std::chrono;
  6. class CELLTimestamp
  7. {
  8. public:
  9. CELLTimestamp()
  10. {
  11. update();
  12. }
  13. ~CELLTimestamp()
  14. {
  15. }
  16. void update()
  17. {
  18. _begin = high_resolution_clock::now();
  19. }
  20. //获取当前秒
  21. double getElapsedSecond()
  22. {
  23. return this->getElapsedTimeInMicroSec() * 0.000001;
  24. }
  25. //获取毫秒
  26. double getElapsedTimeInMilliSec()
  27. {
  28. return this->getElapsedTimeInMicroSec() * 0.001;
  29. }
  30. //获取微秒
  31. long long getElapsedTimeInMicroSec()
  32. {
  33. return duration_cast<microseconds>(high_resolution_clock::now() - _begin).count();
  34. }
  35. protected:
  36. time_point<high_resolution_clock> _begin;
  37. };
  38. #endif
  • EasyTcpServer.hpp
  1. #ifndef _EasyTcpServer_hpp_
  2. #define _EasyTcpServer_hpp_
  3. #ifdef _WIN32
  4. #define FD_SETSIZE 10240
  5. #define WIN32_LEAN_AND_MEAN
  6. #include<windows.h>
  7. #include<Winsock2.h>
  8. #pragma comment(lib,"ws2_32.lib")
  9. #else
  10. #include<unistd.h>
  11. #include<arpa/inet.h>
  12. #include<string.h>
  13. #define SOCKET int
  14. #define INVALID_SOCKET (SOCKET)(~0)
  15. #define SOCKET_ERROR (-1)
  16. #endif
  17. #include<iostream>
  18. #include<vector>
  19. #include<thread>
  20. #include<mutex>
  21. #include<atomic>
  22. #include"MessageHeader.hpp"
  23. #include"CELLTimestamp.hpp"
  24. using namespace std;
  25. //缓冲区最小单元大小
  26. #ifndef RECV_BUFF_SIZE
  27. #define RECV_BUFF_SIZE 10240
  28. #endif
  29. //#define _CellServer_THREAD_COUNT 4
  30. //客户端数据类型
  31. class ClientSocket
  32. {
  33. public:
  34. ClientSocket(SOCKET sockfd = INVALID_SOCKET)
  35. {
  36. _sockfd = sockfd;
  37. memset(_szMsgBuf, 0, sizeof(_szMsgBuf));
  38. _lastPos = 0;
  39. }
  40. SOCKET sockfd()
  41. {
  42. return _sockfd;
  43. }
  44. char *msgBuf()
  45. {
  46. return _szMsgBuf;
  47. }
  48. int getLastPos()
  49. {
  50. return _lastPos;
  51. }
  52. void setLastPos(int pos)
  53. {
  54. _lastPos = pos;
  55. }
  56. //发送数据
  57. int SendData(DataHeader *header)
  58. {
  59. if (header)
  60. {
  61. return (int)send(_sockfd, (const char *)header, header->dataLength, 0);
  62. }
  63. return SOCKET_ERROR;
  64. }
  65. private:
  66. SOCKET _sockfd;
  67. //第二缓冲区,消息缓冲区
  68. char _szMsgBuf[RECV_BUFF_SIZE * 10];
  69. //消息缓冲区的数据尾部位置
  70. int _lastPos = 0;
  71. };
  72. //网络事件接口
  73. class INetEvent
  74. {
  75. public:
  76. //客户端加入事件
  77. virtual void OnNetJoin(ClientSocket *pClient) = 0;
  78. //客户端离开事件
  79. virtual void OnNetLeave(ClientSocket *pClient) = 0;
  80. //客户端消息事件
  81. virtual void OnNetMsg(ClientSocket *pClient, DataHeader *header) = 0;
  82. private:
  83. };
  84. class CellServer
  85. {
  86. public:
  87. CellServer(SOCKET sock = INVALID_SOCKET)
  88. {
  89. _sock = sock;
  90. _pNetEvent = nullptr;
  91. }
  92. ~CellServer()
  93. {
  94. Close();
  95. _sock = INVALID_SOCKET;
  96. }
  97. //是否工作中
  98. bool isRun()
  99. {
  100. return _sock != INVALID_SOCKET;
  101. }
  102. void setEventObj(INetEvent *event)
  103. {
  104. _pNetEvent = event;
  105. }
  106. //关闭Socket
  107. void Close()
  108. {
  109. if (_sock != INVALID_SOCKET)
  110. {
  111. #ifdef _WIN32
  112. for (int n = (int)_clients.size() - 1; n >= 0; n--)
  113. {
  114. closesocket(_clients[n]->sockfd());
  115. delete _clients[n];
  116. }
  117. //关闭套接字closesocket
  118. closesocket(_sock);
  119. //-----------
  120. //清除Windows socket环境
  121. //WSACleanup();
  122. #else
  123. for (int n = (int)_clients.size() - 1; n >= 0; n--)
  124. {
  125. close(_clients[n]->sockfd());
  126. delete _clients[n];
  127. }
  128. //关闭套接字closesocket
  129. close(_sock);
  130. #endif
  131. _clients.clear();
  132. }
  133. }
  134. //int mun = 0;
  135. //处理网络消息
  136. bool OnRun()
  137. {
  138. while (isRun())
  139. {
  140. //从缓冲区中取出客户数据
  141. if (_clientsBuff.size() >0)
  142. {
  143. lock_guard<mutex> lock(_mutex);
  144. for (auto pClient : _clientsBuff)
  145. {
  146. _clients.push_back(pClient);
  147. }
  148. _clientsBuff.clear();
  149. }
  150. //如果没有需要处理的客户端就跳过
  151. if (_clients.empty())
  152. {
  153. chrono::milliseconds t(1);
  154. this_thread::sleep_for(t);
  155. continue;
  156. //return true;
  157. }
  158. //伯克利套接字
  159. fd_set fdRead;
  160. //fd_set fdWrite;
  161. //fd_set fdExp;
  162. //清理集合
  163. FD_ZERO(&fdRead);
  164. //FD_ZERO(&fdWrite);
  165. //FD_ZERO(&fdExp);
  166. //将描述符(socket)加入集合
  167. //FD_SET(_sock, &fdRead);
  168. //FD_SET(_sock, &fdWrite);
  169. //FD_SET(_sock, &fdExp);
  170. //将描述符(socket)加入集合
  171. SOCKET maxSock = _clients[0]->sockfd();
  172. int nn = (int)_clients.size() - 1;
  173. for (int n = (int)_clients.size() - 1; n >= 0; n--)
  174. {
  175. FD_SET(_clients[n]->sockfd(), &fdRead);
  176. if (maxSock < _clients[n]->sockfd())
  177. {
  178. maxSock = _clients[n]->sockfd();
  179. }
  180. }
  181. //nfds 是一个整数值,是指fd_set集合中所有描述符(socket)的范围,而不是数量
  182. //即是所有文件描述符最大值+1,在Windows中这个参数可以写0
  183. //添加非阻塞
  184. //timeval t = { 1, 0 };
  185. int ret = select(maxSock + 1, &fdRead, 0, 0, nullptr);
  186. if (ret < 0)
  187. {
  188. //cout << "select任务结束 " <<nn<< endl;
  189. //Close();
  190. continue;
  191. }
  192. for (int n = (int)_clients.size() - 1; n >= 0; n--)
  193. {
  194. if (FD_ISSET(_clients[n]->sockfd(), &fdRead))
  195. {
  196. if (RecvData(_clients[n]) == -1)
  197. {
  198. auto iter = _clients.begin() + n;
  199. if (iter != _clients.end())
  200. {
  201. if (_pNetEvent)
  202. {
  203. _pNetEvent->OnNetLeave(_clients[n]);
  204. }
  205. ///顺序不能变
  206. delete _clients[n];
  207. _clients.erase(iter);
  208. }
  209. }
  210. }
  211. }
  212. }
  213. return true;
  214. }
  215. //缓冲区
  216. char _szRecv[RECV_BUFF_SIZE];// = {};
  217. //接收数据 处理粘包哦拆分包
  218. int RecvData(ClientSocket* pClient)
  219. {
  220. //5,接受客户端的请求数据
  221. int nLen = (int)recv(pClient->sockfd(), (char*)&_szRecv, RECV_BUFF_SIZE, 0);
  222. if (nLen <= 0)
  223. {
  224. //cout <<pClient->sockfd()<< " 客户端已经退出,任务结束" << endl;
  225. return -1;
  226. }
  227. //将收取到的消息拷贝到消息缓冲区
  228. memcpy(pClient->msgBuf() + pClient->getLastPos(), _szRecv, nLen);
  229. //消息缓冲区的数据尾部后移
  230. pClient->setLastPos(pClient->getLastPos() + nLen);
  231. //判断消息缓冲区的数据长度大于消息头DataHeader长度
  232. while (pClient->getLastPos() >= sizeof(DataHeader))
  233. {
  234. //这个时候就知道当前消息长度
  235. DataHeader *header = (DataHeader*)pClient->msgBuf();
  236. //判断消息缓冲区的长度大于消息长度
  237. if (pClient->getLastPos() >= header->dataLength)
  238. {
  239. //消息缓冲区剩余未处理数据的长度
  240. int nSize = pClient->getLastPos() - header->dataLength;
  241. //处理网络消息
  242. OnNetMsg(pClient, header);
  243. //将消息缓冲区剩余未处理数据前移
  244. memcpy(pClient->msgBuf(), pClient->msgBuf() + header->dataLength, nSize);
  245. pClient->setLastPos(nSize);
  246. }
  247. else{
  248. //消息缓冲区剩余数据不够一条完整消息
  249. break;
  250. }
  251. }
  252. return 0;
  253. }
  254. //响应网络消息
  255. virtual void OnNetMsg(ClientSocket *pClient, DataHeader *header)
  256. {
  257. _pNetEvent->OnNetMsg(pClient, header);
  258. }
  259. void addClient(ClientSocket *pClient)
  260. {
  261. lock_guard<mutex> lock(_mutex);
  262. //_mutex.lock();
  263. _clientsBuff.push_back(pClient);
  264. //_mutex.unlock();
  265. }
  266. void Start()
  267. {
  268. _thread = thread(mem_fn(&CellServer::OnRun), this);
  269. }
  270. size_t getClientCount()
  271. {
  272. return _clients.size()+_clientsBuff.size();
  273. }
  274. private:
  275. SOCKET _sock;
  276. //正式客户队列
  277. vector<ClientSocket*> _clients;
  278. //客户缓冲队列
  279. vector<ClientSocket*> _clientsBuff;
  280. //缓冲队列的锁
  281. mutex _mutex;
  282. thread _thread;
  283. //网络事件对象
  284. INetEvent *_pNetEvent;
  285. };
  286. class EasyTcpServer:public INetEvent
  287. {
  288. private:
  289. SOCKET _sock;
  290. //vector<ClientSocket*> _clients;
  291. //消息处理对象,内部会创建线程
  292. vector<CellServer*> _cellServers;
  293. //每秒消息计时
  294. CELLTimestamp _tTime;
  295. //收到消息计数
  296. public:
  297. atomic<int> _recvCount;
  298. //客户端计数
  299. atomic<int> _clientCount;
  300. public:
  301. EasyTcpServer()
  302. {
  303. _sock = INVALID_SOCKET;
  304. _recvCount = 0;
  305. _clientCount = 0;
  306. }
  307. virtual ~EasyTcpServer()
  308. {
  309. Close();
  310. }
  311. //初始化Socket
  312. SOCKET InitSocket()
  313. {
  314. #ifdef _WIN32
  315. //启动Windows socket 2.x环境
  316. WORD ver = MAKEWORD(2, 2);
  317. WSADATA dat;
  318. WSAStartup(ver, &dat);
  319. #endif
  320. //1,用Socket API建立建立TCP客户端
  321. if (_sock != INVALID_SOCKET)
  322. {
  323. cout << "_sock=" << (int)_sock << "关闭旧连接" << endl;
  324. Close();
  325. }
  326. _sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
  327. if (_sock == INVALID_SOCKET)
  328. {
  329. cout << "建立socket失败" << endl;
  330. }
  331. else
  332. {
  333. cout << (int)_sock << " 建立socket成功" << endl;
  334. }
  335. return _sock;
  336. }
  337. //绑定IP和端口号
  338. int Bind(const char *ip, unsigned short port)
  339. {
  340. //if (_sock != INVALID_SOCKET)
  341. //{
  342. // InitSocket();
  343. //}
  344. //2,bind 绑定用于接受客户端连接的网络接口
  345. sockaddr_in _sin = {};
  346. _sin.sin_family = AF_INET;
  347. _sin.sin_port = htons(port);
  348. #ifdef _WIN32
  349. if (ip){
  350. _sin.sin_addr.S_un.S_addr = inet_addr(ip);
  351. }
  352. else{
  353. _sin.sin_addr.S_un.S_addr = INADDR_ANY;
  354. }
  355. #else
  356. if (ip){
  357. _sin.sin_addr.s_addr = inet_addr(ip);
  358. }
  359. else{
  360. _sin.sin_addr.s_addr = INADDR_ANY;
  361. }
  362. #endif
  363. int ret = ::bind(_sock, (sockaddr*)&_sin, sizeof(_sin));
  364. if (SOCKET_ERROR == ret)
  365. {
  366. cout << (int)_sock << "错误,绑定网络端口失败" << endl;
  367. }
  368. else
  369. {
  370. cout << "绑定网络端口成功" << port << endl;
  371. }
  372. return ret;
  373. }
  374. //监听端口号
  375. int Listen(int n)
  376. {
  377. int ret = listen(_sock, n);
  378. if (SOCKET_ERROR == ret)
  379. {
  380. cout << (int)_sock << " 错误,监听网络端口失败" << endl;
  381. }
  382. else
  383. {
  384. cout << (int)_sock << " 监听网络端口成功" << endl;
  385. }
  386. return ret;
  387. }
  388. //接受客户端连接
  389. SOCKET Accept()
  390. {
  391. //4accept 等待客户端连接
  392. sockaddr_in clientAddr = {};
  393. int nAddrlen = sizeof(clientAddr);
  394. SOCKET cSock = INVALID_SOCKET;
  395. #ifdef _WIN32
  396. cSock = accept(_sock, (sockaddr *)&clientAddr, &nAddrlen);
  397. #else
  398. cSock = accept(_sock, (sockaddr *)&clientAddr, (socklen_t *)&nAddrlen);
  399. #endif
  400. if (INVALID_SOCKET == cSock)
  401. {
  402. cout << (int)_sock << " 错误,接受到无效的客户端连接" << endl;
  403. }
  404. else
  405. {
  406. //NewUserJoin userJoin;
  407. //SendDataToAll(&userJoin);
  408. //将新客户端分配给客户端数量最少的cellServer
  409. addClientToCellServer(new ClientSocket(cSock));
  410. //获取IP地址 inet_ntoa(clientAddr.sin_addr)
  411. }
  412. return _sock;
  413. }
  414. void addClientToCellServer(ClientSocket *pClient)
  415. {
  416. //_clients.push_back(pClient);
  417. //查找客户数量最少的CellServer消息处理对象
  418. auto pMinServer = _cellServers[0];
  419. for (auto pCellServer : _cellServers)
  420. {
  421. if (pMinServer->getClientCount() > pCellServer->getClientCount())
  422. {
  423. pMinServer = pCellServer;
  424. }
  425. }
  426. pMinServer->addClient(pClient);
  427. OnNetJoin(pClient);
  428. }
  429. void Start(int _CellServer_THREAD_COUNT)
  430. {
  431. for (int n = 0; n < _CellServer_THREAD_COUNT; n++)
  432. {
  433. auto ser = new CellServer(_sock);
  434. _cellServers.push_back(ser);
  435. //注册网络事件接收对象
  436. ser->setEventObj(this);
  437. //启动消息处理线程
  438. ser->Start();
  439. }
  440. }
  441. //关闭Socket
  442. void Close()
  443. {
  444. if (_sock != INVALID_SOCKET)
  445. {
  446. #ifdef _WIN32
  447. //for (int n = (int)_clients.size() - 1; n >= 0; n--)
  448. //{
  449. // closesocket(_clients[n]->sockfd());
  450. // delete _clients[n];
  451. //}
  452. //关闭套接字closesocket
  453. closesocket(_sock);
  454. //-------------
  455. //清除Windows socket环境
  456. WSACleanup();
  457. #else
  458. for (int n = (int)_clients.size() - 1; n >= 0; n--)
  459. {
  460. close(_clients[n]->sockfd());
  461. delete _clients[n];
  462. }
  463. //8,关闭套接字closesocket
  464. close(_sock);
  465. #endif
  466. //_clients.clear();
  467. }
  468. }
  469. //处理网络消息
  470. bool OnRun()
  471. {
  472. if (isRun())
  473. {
  474. time4msg();
  475. //伯克利套接字
  476. fd_set fdRead;
  477. //fd_set fdWrite;
  478. //fd_set fdExp;
  479. FD_ZERO(&fdRead);
  480. //FD_ZERO(&fdWrite);
  481. //FD_ZERO(&fdExp);
  482. FD_SET(_sock, &fdRead);
  483. //FD_SET(_sock, &fdWrite);
  484. //FD_SET(_sock, &fdExp);
  485. //nfds 是一个整数值,是指fd_set集合中所有描述符(socket)的范围,而不是数量
  486. //即是所有文件描述符最大值+1,在Windows中这个参数可以写0
  487. //添加非阻塞
  488. timeval t = { 0, 10 };
  489. int ret = select(_sock + 1, &fdRead, 0, 0, &t); //
  490. if (ret < 0)
  491. {
  492. cout << "Accept Select任务结束" << endl;
  493. Close();
  494. return false;
  495. }
  496. if (FD_ISSET(_sock, &fdRead))
  497. {
  498. FD_CLR(_sock, &fdRead);
  499. Accept();
  500. return true;
  501. }
  502. return true;
  503. }
  504. return false;
  505. }
  506. //是否工作中
  507. bool isRun()
  508. {
  509. return _sock != INVALID_SOCKET;
  510. }
  511. //响应网络消息
  512. void time4msg()
  513. {
  514. auto t1 = _tTime.getElapsedSecond();
  515. if (_tTime.getElapsedSecond() >= 1.0)
  516. {
  517. //cout << "thread "<<_cellServers.size()<<" tTime " << t1 <<" client " <<(int)_clients.size()<<" socket " << _sock << " _recvCount " << _recvCount<< endl;
  518. cout << "thread " << _cellServers.size() << " tTime " << t1 << " client " << (int)_clientCount << " socket " << _sock << " _recvCount " << _recvCount << endl;
  519. _recvCount = 0;
  520. _tTime.update();
  521. }
  522. }
  523. //群发消息
  524. //void SendDataToAll(DataHeader *header)
  525. //{
  526. // for (int n = (int)_clients.size() - 1; n >= 0; n--)
  527. // {
  528. // SendData(_clients[n]->sockfd(), header);
  529. // }
  530. //}
  531. //只会被一个线程调用 安全
  532. virtual void OnNetJoin(ClientSocket *pClient)
  533. {
  534. _clientCount++;
  535. }
  536. //4 多个线程触发不安全
  537. //如果只开启一个cellServer就是安全的
  538. virtual void OnNetLeave(ClientSocket *pClient)
  539. {
  540. _clientCount--;
  541. //for (int n = (int)_clients.size() - 1; n >= 0; n--)
  542. //{
  543. // if (_clients[n] == pClient)
  544. // {
  545. // auto iter = _clients.begin() + n;
  546. // if (iter != _clients.end())
  547. // {
  548. // _clients.erase(iter);
  549. // }
  550. // }
  551. //}
  552. }
  553. //4 多个线程触发不安全
  554. //如果只开启一个cellServer就是安全的
  555. virtual void OnNetMsg(ClientSocket *pClient, DataHeader *header)
  556. {
  557. _recvCount++;
  558. }
  559. };
  560. #endif
  • MessageHeader.hpp
  1. enum CMD
  2. {
  3. CMD_LOGIN, //登入
  4. CMD_LOGIN_RESULT,
  5. CMD_LOGOUT, //登出
  6. CMD_LOGOUT_RESULT,
  7. CMD_NEW_USER_JOIN, //新的用户加入
  8. CMD_ERROR, //错误
  9. };
  10. struct DataHeader
  11. {
  12. DataHeader()
  13. {
  14. dataLength = sizeof(DataHeader);
  15. cmd = CMD_ERROR;
  16. }
  17. short dataLength;
  18. short cmd;
  19. };
  20. //匹配四个消息结构体
  21. struct Login : public DataHeader
  22. {
  23. Login()
  24. {
  25. dataLength = sizeof(Login);
  26. cmd = CMD_LOGIN;
  27. }
  28. char userName[32];
  29. char PassWord[32];
  30. //char data[932];
  31. };
  32. struct LoginResult : public DataHeader
  33. {
  34. LoginResult()
  35. {
  36. dataLength = sizeof(LoginResult);
  37. cmd = CMD_LOGIN_RESULT;
  38. result = 0;
  39. }
  40. int result;
  41. //char data[992];
  42. };
  43. struct Logout : public DataHeader
  44. {
  45. Logout()
  46. {
  47. dataLength = sizeof(Logout);
  48. cmd = CMD_LOGOUT;
  49. }
  50. char userName[32];
  51. };
  52. struct LogoutResult : public DataHeader
  53. {
  54. LogoutResult()
  55. {
  56. dataLength = sizeof(LogoutResult);
  57. cmd = CMD_LOGOUT_RESULT;
  58. result = 0;
  59. }
  60. int result;
  61. };
  62. struct NewUserJoin :public DataHeader
  63. {
  64. NewUserJoin()
  65. {
  66. dataLength = sizeof(NewUserJoin);
  67. cmd = CMD_NEW_USER_JOIN;
  68. sock = 0;
  69. }
  70. int sock;
  71. };
  • server.cpp
  1. #include"EasyTcpServer.hpp"
  2. #include<thread>
  3. class MyServer :public EasyTcpServer
  4. {
  5. public:
  6. //只会被一个线程调用 安全
  7. virtual void OnNetJoin(ClientSocket *pClient)
  8. {
  9. _clientCount++;
  10. //cout << "client " << pClient->sockfd() << "join" << endl;
  11. }
  12. //4 多个线程触发不安全
  13. //如果只开启一个cellServer就是安全的
  14. virtual void OnNetLeave(ClientSocket *pClient)
  15. {
  16. _clientCount--;
  17. //cout << "client " << pClient->sockfd() << "leave" << endl;
  18. }
  19. //4 多个线程触发不安全
  20. //如果只开启一个cellServer就是安全的
  21. virtual void OnNetMsg(ClientSocket *pClient, DataHeader *header)
  22. {
  23. _recvCount++;
  24. switch (header->cmd)
  25. {
  26. case CMD_LOGIN:
  27. {
  28. //做数据偏移
  29. Login *login = (Login*)header;
  30. //cout << "收到命令:CMD_LOGIN, 数据长度:" << login->dataLength;
  31. //cout << " UserName:" << login->userName << " PassWord:" << login->PassWord << endl;
  32. //忽略判断用户名密码是否正确的过程
  33. LoginResult ret;
  34. pClient->SendData(&ret);
  35. }
  36. break;
  37. case CMD_LOGOUT:
  38. {
  39. Login *logout = (Login*)header;
  40. cout << "收到命令:CMD_LOGIN, 数据长度:" << logout->dataLength;
  41. cout << " UserName:" << logout->userName << endl;
  42. //忽略判断用户名密码是否正确的过程
  43. LogoutResult ret;
  44. //SendData(_cSock, (DataHeader *)&ret);
  45. }
  46. break;
  47. default:
  48. {
  49. cout << pClient->sockfd() << " 收到未定义的消息,数据长度 " << header->dataLength;
  50. DataHeader ret;
  51. //SendData(_cSock, (DataHeader *)&ret);
  52. }
  53. break;
  54. }
  55. }
  56. private:
  57. };
  58. int main()
  59. {
  60. MyServer server;
  61. server.InitSocket();
  62. server.Bind(nullptr, 4567);
  63. server.Listen(5);
  64. server.Start(4);
  65. while (server.isRun())
  66. {
  67. server.OnRun();
  68. }
  69. server.Close();
  70. cout << "已退出" << endl;
  71. system("pause");
  72. return 0;
  73. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/花生_TL007/article/detail/162830
推荐阅读
相关标签
  

闽ICP备14008679号