当前位置:   article > 正文

搜狗开源框架Workflow网络模型分析

搜狗开源框架Workflow网络模型分析

workflow是一个比较轻量化的后端服务框架,支持Linux/Mac/Windows主流平台,其网络模块是框架的核心。在workflow-windows分支上可以看到对windows的IOCP的封装,对于学习windows IOCP网络编程有很好的启发意义。因此,有必要对该网络模块的工作原理进行分析(源码位置:workflow-windows/src/kernel_win/)。

IOCP完成端口主要封装在WinPoller类中,提供网络IO的异步读写,强制唤醒,提前终止等操作。除此之外,WinPoller内部还封装了一个定时器,用于处理延时任务,还支持用户事件转发,用来配合外部任务流的运行。

为进一步了解该网络模块的工作原理,需要理清workflow中的服务框架的组成关系,搞明白一个服务是怎么被拉起的,何时结束的。从官方给出http_server使用示例,可以看到服务启动的整个流程,如下图,显示了WinPoller在workflow网络服务框架中所处的位置,以及一些主要的接口方法。站在Communicator的角度上,可以非常直观地看到对WinPoller的调用过程。

 通过WinPoller的put_io接口,可以投递多种不同类型的异步事件,比如读IO事件,写IO事件,建立连接事件,接受连接事件,延时事件(即sleep事件)以及用户自定义事件(即user事件),并可以通过get_io_result拿到异步事件的结果,完成后续操作。

下面给出一个简单的TCP Server(或UDP Server)示例以供参考,支持多客户端并发连接(只是简单的实现了accept、connect和read等事件的处理,主要借鉴communicator内部实现):


注: 在一台十几年前快报废的办公本上大概压力测了下,吞吐量貌似还不错,轻松过5000。

  1. #include <iostream>
  2. #include <assert.h>
  3. #include <atomic>
  4. #include "WinPoller.h"
  5. #pragma comment(lib, "Ws2_32.lib")
  6. #pragma comment(lib,"Mswsock.lib")
  7. class ReadContextEx
  8. {
  9. public:
  10. char* buffer_internal_cached;
  11. DWORD msgsize;
  12. WSABUF buffer;
  13. const DWORD max_msg_size = 64 * 1024;
  14. ReadContextEx(size_t buffer_size)
  15. {
  16. if (buffer_size > max_msg_size)
  17. {
  18. buffer_internal_cached = (char*)malloc(max_msg_size);
  19. msgsize = max_msg_size;
  20. }
  21. else
  22. {
  23. buffer_internal_cached = (char*)malloc(buffer_size);
  24. msgsize = buffer_size;
  25. }
  26. if (buffer_internal_cached != nullptr)
  27. {
  28. buffer.buf = buffer_internal_cached;
  29. buffer.len = msgsize;
  30. }
  31. }
  32. ~ReadContextEx()
  33. {
  34. free(buffer_internal_cached);
  35. }
  36. };
  37. class ConnectContextEx
  38. {
  39. public:
  40. void* entry;
  41. struct sockaddr* addr;
  42. socklen_t addrlen;
  43. struct sockaddr_in addr_in;
  44. ConnectContextEx(void* e)
  45. {
  46. entry = e;
  47. addrlen = sizeof(struct sockaddr);
  48. memset(&addr_in, 0x00, addrlen);
  49. addr = (struct sockaddr*)&addr_in;
  50. }
  51. };
  52. static inline int __set_fd_nonblock(SOCKET fd)
  53. {
  54. unsigned long mode = 1;
  55. int ret = ioctlsocket(fd, FIONBIO, &mode);
  56. if (ret == SOCKET_ERROR)
  57. {
  58. errno = WSAGetLastError();
  59. return -1;
  60. }
  61. return 0;
  62. }
  63. static int __bind_and_listen(SOCKET listen_sockfd, const struct sockaddr* addr, socklen_t addrlen)
  64. {
  65. struct sockaddr_storage ss;
  66. socklen_t len = sizeof(struct sockaddr_storage);
  67. if (getsockname(listen_sockfd, (struct sockaddr*)&ss, &len) == SOCKET_ERROR)
  68. {
  69. if (WSAGetLastError() == WSAEINVAL)
  70. {
  71. if (bind(listen_sockfd, addr, addrlen) == SOCKET_ERROR)
  72. return -1;
  73. }
  74. }
  75. if (listen(listen_sockfd, SOMAXCONN) == SOCKET_ERROR)
  76. return -1;
  77. return 0;
  78. }
  79. static int __bind_local(SOCKET sockfd, const struct sockaddr* addr, socklen_t addrlen)
  80. {
  81. struct sockaddr_storage ss;
  82. socklen_t len = sizeof(struct sockaddr_storage);
  83. if (getsockname(sockfd, (struct sockaddr*)&ss, &len) == SOCKET_ERROR)
  84. {
  85. if (WSAGetLastError() == WSAEINVAL)
  86. {
  87. if (bind(sockfd, addr, addrlen) == SOCKET_ERROR)
  88. return -1;
  89. }
  90. }
  91. return 0;
  92. }
  93. static int __bind_any(SOCKET sockfd, int sa_family)
  94. {
  95. struct sockaddr_storage addr;
  96. socklen_t addrlen;
  97. memset(&addr, 0, sizeof(struct sockaddr_storage));
  98. addr.ss_family = sa_family;
  99. if (sa_family == AF_INET)
  100. {
  101. struct sockaddr_in* sin = (struct sockaddr_in*)&addr;
  102. sin->sin_addr.s_addr = INADDR_ANY;
  103. sin->sin_port = 0;
  104. addrlen = sizeof(struct sockaddr_in);
  105. }
  106. else if (sa_family == AF_INET6)
  107. {
  108. struct sockaddr_in6* sin6 = (struct sockaddr_in6*)&addr;
  109. sin6->sin6_addr = in6addr_any;
  110. sin6->sin6_port = 0;
  111. addrlen = sizeof(struct sockaddr_in6);
  112. }
  113. else
  114. addrlen = sizeof(struct sockaddr_storage);
  115. if (bind(sockfd, (struct sockaddr*)&addr, addrlen) == SOCKET_ERROR)
  116. return -1;
  117. return 0;
  118. }
  119. static int __sync_send(SOCKET sockfd, const void* buf, size_t size)
  120. {
  121. int ret;
  122. if (size == 0 || !buf)
  123. return 0;
  124. ret = send(sockfd, (const char*)buf, size, 0);
  125. if (ret == size)
  126. return size;
  127. if (ret > 0)
  128. {
  129. errno = ENOBUFS;
  130. ret = -1;
  131. }
  132. return ret;
  133. }
  134. int __create_stream_socket(unsigned short address_family, int type, bool is_blocked = false)
  135. {
  136. SOCKET sock = (int)socket(address_family, type, 0);
  137. if (sock != INVALID_SOCKET)
  138. {
  139. if (!is_blocked && __set_fd_nonblock(sock) < 0)
  140. {
  141. closesocket(sock);
  142. return -1;
  143. }
  144. }
  145. else
  146. return -1;
  147. return (int)sock;
  148. }
  149. void handle_accept_result(WinPoller* poller, struct poller_result* res)
  150. {
  151. AcceptConext* ctx = (AcceptConext*)res->data.context;
  152. SOCKET listen_fd = (SOCKET)res->data.handle;
  153. SOCKET sockfd = ctx->accept_sockfd;
  154. switch (res->state)
  155. {
  156. case PR_ST_SUCCESS://todo error???
  157. case PR_ST_FINISHED:
  158. if (sockfd != INVALID_SOCKET)
  159. {
  160. if (poller->bind((HANDLE)sockfd) >= 0)
  161. {
  162. struct poller_data data;
  163. int timeout;
  164. auto* new_ctx = new ReadContextEx(1024);
  165. data.operation = PD_OP_READ;
  166. data.handle = (HANDLE)sockfd;
  167. data.context = new_ctx;
  168. if (poller->put_io(&data, -1) < 0)
  169. {
  170. delete new_ctx;
  171. poller->unbind_socket(sockfd);
  172. }
  173. else
  174. {
  175. ctx->remote, ctx->remote_len;
  176. char buf[20] = { 0 };
  177. inet_ntop(AF_INET, &((sockaddr_in*)ctx->remote)->sin_addr, buf, sizeof(buf));
  178. printf(" Accept a new connection: ip=[%s], port=%d.\n", buf, ntohs(((sockaddr_in*)ctx->remote)->sin_port));
  179. }
  180. }
  181. else
  182. {
  183. closesocket(sockfd);
  184. ctx->accept_sockfd = INVALID_SOCKET;
  185. }
  186. }
  187. break;
  188. case PR_ST_ERROR:
  189. case PR_ST_STOPPED:
  190. case PR_ST_TIMEOUT:
  191. {
  192. closesocket(sockfd);
  193. //poller->unbind_socket(listen_fd);// terminate server
  194. //listen_fd = INVALID_SOCKET;
  195. }
  196. break;
  197. default:
  198. assert(0);
  199. break;
  200. }
  201. ctx->accept_sockfd = __create_stream_socket(AF_INET,SOCK_STREAM);
  202. if (listen_fd != INVALID_SOCKET && ctx->accept_sockfd)
  203. {
  204. if (poller->put_io(&res->data, -1) >= 0)
  205. return;//reuse context
  206. closesocket(ctx->accept_sockfd);
  207. ctx->accept_sockfd = INVALID_SOCKET;
  208. }
  209. if (listen_fd != INVALID_SOCKET)
  210. poller->unbind_socket(listen_fd);
  211. delete ctx;
  212. }
  213. void handle_connect_result(WinPoller* poller, struct poller_result* res)
  214. {
  215. ConnectContextEx* ctx = (ConnectContextEx*)res->data.context;
  216. struct sockaddr_in target_address = *(struct sockaddr_in*)(ctx->addr);
  217. SOCKET handle = (SOCKET)res->data.handle;
  218. delete ctx;
  219. char target_ip_str[30] = {};
  220. switch (res->state)
  221. {
  222. case PR_ST_SUCCESS://todo error???
  223. case PR_ST_FINISHED:
  224. if (handle != INVALID_SOCKET)
  225. {
  226. inet_ntop(AF_INET, &target_address.sin_addr, target_ip_str, 30);
  227. // greet message.
  228. printf("connect to server success[%s].\n", target_ip_str);
  229. __sync_send(handle, "", 0);
  230. auto* new_ctx = new ReadContextEx(1024);
  231. struct poller_data data;
  232. data.operation = PD_OP_READ;
  233. data.handle = (HANDLE)handle;
  234. data.context = new_ctx;
  235. if (poller->put_io(&data, -1) < 0)
  236. {
  237. delete new_ctx;
  238. poller->unbind_socket(handle);
  239. }
  240. else
  241. {
  242. return;
  243. }
  244. }
  245. res->error = errno;
  246. break;
  247. case PR_ST_ERROR:
  248. {
  249. inet_ntop(AF_INET, &target_address.sin_addr, target_ip_str, 30);
  250. printf("connect to %s failed, error=%d.\n", target_ip_str, res->error);
  251. }
  252. break;
  253. case PR_ST_TIMEOUT:
  254. {
  255. poller->unbind_socket(handle);
  256. printf("connect timeout, error=%d.\n", res->error);
  257. }
  258. break;
  259. case PR_ST_STOPPED:
  260. poller->unbind_socket(handle);
  261. break;
  262. default:
  263. assert(0);
  264. break;
  265. }
  266. }
  267. void handle_read_result(WinPoller* poller, struct poller_result* res)
  268. {
  269. ReadContextEx* ctx = (ReadContextEx*)res->data.context;
  270. std::string buf;
  271. int timeout;
  272. switch (res->state)
  273. {
  274. case PR_ST_SUCCESS:
  275. {
  276. buf = std::string(ctx->buffer.buf, res->iobytes);
  277. if (buf.size() > 0 && buf[buf.size() - 1] != '\n')
  278. {
  279. buf = buf + "\n";
  280. }
  281. else if (buf.size() == 0)
  282. buf = "\n";
  283. printf(" Recv data from client: dataLen=%d, msg body=%s", res->iobytes, buf.c_str());
  284. /*
  285. * 处理数据,并将处理结果发出去
  286. * 需要在此处可投递异步写事件(一般而言写消息用同步方式更高效,除非一次性要发送大量的数据,否则应该用同步接口)
  287. *
  288. */
  289. char send_msg[100] = {};
  290. int sz = sprintf_s(send_msg, 100, "Hello there, already get your msg : %s.\n", buf.c_str());
  291. __sync_send((SOCKET)res->data.handle, send_msg, sz);
  292. //继续投递读事件
  293. res->data.operation = PD_OP_READ;
  294. if (poller->put_io(&res->data, -1) >= 0)
  295. {
  296. ctx = NULL;//reuse context
  297. }
  298. else
  299. {
  300. printf("Internal error.");
  301. poller->unbind_socket((SOCKET)res->data.handle);
  302. }
  303. }
  304. break;
  305. case PR_ST_FINISHED:
  306. case PR_ST_TIMEOUT:
  307. case PR_ST_ERROR:
  308. {
  309. printf("client disconnet or dead. \n");
  310. poller->unbind_socket((SOCKET)res->data.handle);
  311. }
  312. break;
  313. case PR_ST_STOPPED:
  314. {
  315. poller->unbind_socket((SOCKET)res->data.handle);
  316. printf("client has been kicked off. \n");//socket本地主动关闭,最常见的比如超时机制踢掉不活跃的连接
  317. }
  318. break;
  319. default:
  320. assert(0);
  321. break;
  322. }
  323. delete ctx;
  324. }
  325. void handle_sleep_result(WinPoller* poller, struct poller_result* res)
  326. {
  327. int io_type=0;
  328. socklen_t optlen = sizeof(io_type);
  329. int ret = getsockopt((SOCKET)res->data.handle, SOL_SOCKET, SO_TYPE,(char*)&io_type, &optlen);
  330. if(ret >= 0 && (io_type == SOCK_DGRAM || io_type == SOCK_STREAM))
  331. {
  332. printf("Network Timer Event Triggered.\n");//网络定时事件触发
  333. poller->unbind_socket((SOCKET)res->data.handle);
  334. }
  335. }
  336. int lanuch_async_connect(WinPoller* poller, const char* target_ip, unsigned int target_port, int timeout = -1, unsigned int local_port = 0)
  337. {
  338. SOCKET sockfd = __create_stream_socket(AF_INET,SOCK_STREAM);
  339. if (sockfd != INVALID_SOCKET)
  340. {
  341. if (poller->bind((HANDLE)sockfd) >= 0)
  342. {
  343. int bind_local_result = 0;
  344. if (local_port == 0)
  345. {
  346. bind_local_result = __bind_any(sockfd, AF_INET);
  347. }
  348. else
  349. {
  350. bind_local_result = __bind_any(sockfd, AF_INET);// TODO: real bind to local_port
  351. target_port = 0;
  352. }
  353. if (bind_local_result >= 0)
  354. {
  355. poller_data data;
  356. auto* new_ctx = new ConnectContextEx(nullptr);
  357. data.operation = PD_OP_CONNECT;
  358. data.handle = (HANDLE)sockfd;
  359. data.context = new_ctx;
  360. new_ctx->addr_in.sin_family = AF_INET;
  361. inet_pton(AF_INET, target_ip, &new_ctx->addr_in.sin_addr);
  362. new_ctx->addr_in.sin_port = htons(target_port);
  363. int err = poller->put_io(&data, timeout);
  364. if (err >= 0)
  365. return sockfd;
  366. else
  367. {
  368. printf(" put async connect event failed: error=%d.\n", errno);
  369. }
  370. delete new_ctx;
  371. poller->unbind_socket(sockfd);
  372. }
  373. else
  374. {
  375. poller->unbind_socket(sockfd);
  376. }
  377. }
  378. closesocket(sockfd);
  379. }
  380. return -1;
  381. }
  382. void handle_udp_accept_result(WinPoller* poller, struct poller_result* res)
  383. {
  384. char client_ip_str[30] = {};
  385. UdpAcceptCtx* ctx = (UdpAcceptCtx*)res->data.context;
  386. SOCKET listen_fd = (SOCKET)res->data.handle;
  387. //SOCKET sock = WSASocket(AF_INET, SOCK_DGRAM, IPPROTO_UDP, NULL, 0, WSA_FLAG_OVERLAPPED);
  388. SOCKET sock = __create_stream_socket(AF_INET, SOCK_DGRAM);
  389. sockaddr_in address;
  390. address.sin_family = AF_INET;
  391. address.sin_addr.s_addr = htonl(INADDR_ANY);
  392. address.sin_port = 0;//htons(UDP_SERVER_PORT);
  393. //int reuse = 1;
  394. //setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (const char*)&reuse, sizeof(int));
  395. int ret = __bind_local(sock, (SOCKADDR*)&address, sizeof(SOCKADDR));
  396. if (ret >= 0 && sock!=INVALID_SOCKET)
  397. {
  398. ret = connect(sock, (struct sockaddr*)&ctx->remoteAddr, ctx->remoteAddrLen);
  399. if (poller->bind((HANDLE)sock) >= 0)
  400. {
  401. poller_data data;
  402. data.handle = (HANDLE)sock;
  403. data.operation = PD_OP_READ;
  404. auto* new_ctx = new ReadContextEx(1024);
  405. data.context = new_ctx;
  406. if (poller->put_io(&data, 5000) < 0)// if client don't reply in 5 seconds, consider it as a dead connection.
  407. {
  408. delete new_ctx;
  409. poller->unbind_socket(sock);
  410. }
  411. else
  412. {
  413. char greet_msg[100] = {};
  414. int sz = sprintf_s(greet_msg, 99, "hello there, get your first msg: %s, length=%d.\n", ctx->wsaBuf.buf,res->iobytes);
  415. //sendto(sock, greet_msg, sz, 0,(const sockaddr*)&ctx->remoteAddr, ctx->remoteAddrLen);
  416. __sync_send(sock, greet_msg,sz);
  417. inet_ntop(AF_INET, &ctx->remoteAddr.sin_addr, client_ip_str, 30);
  418. int port = ntohs(ctx->remoteAddr.sin_port);
  419. printf(" udp client reached: %s, %d.\n", client_ip_str, port);
  420. }
  421. }
  422. }
  423. else
  424. {
  425. if(sock != INVALID_SOCKET)
  426. closesocket(sock);
  427. }
  428. if (listen_fd != INVALID_SOCKET)
  429. {
  430. if (poller->put_io(&res->data, -1) >= 0)
  431. return;//reuse context
  432. }
  433. if (listen_fd != INVALID_SOCKET)
  434. poller->unbind_socket(listen_fd);
  435. delete ctx;
  436. return;
  437. }
  438. int main()
  439. {
  440. WSADATA wsaData;
  441. int port = 9218;
  442. const char* ip_str = "127.0.0.1";
  443. int ret = WSAStartup(MAKEWORD(2, 2), &wsaData);
  444. if (ret != 0)
  445. return ret;
  446. struct sockaddr_in bind_addr;
  447. bind_addr.sin_family = AF_INET;
  448. inet_pton(AF_INET, ip_str, &bind_addr.sin_addr);
  449. bind_addr.sin_port = htons(port);
  450. int type = SOCK_DGRAM;
  451. SOCKET listen_fd = __create_stream_socket(AF_INET,type);
  452. if (listen_fd < 0)
  453. return -1;
  454. WinPoller* poller = new WinPoller(1);
  455. bool server_succ = false, server_bind_ok = false;;
  456. if (poller->bind((HANDLE)listen_fd) >= 0)
  457. {
  458. server_bind_ok = true;
  459. if (type == SOCK_STREAM&&__bind_and_listen(listen_fd, (struct sockaddr*)(&bind_addr), sizeof sockaddr) >= 0)
  460. {
  461. poller_data data;
  462. auto* new_ctx = new AcceptConext(nullptr);
  463. data.operation = PD_OP_ACCEPT;
  464. data.handle = (HANDLE)listen_fd;
  465. data.context = new_ctx;
  466. new_ctx->accept_sockfd = __create_stream_socket(AF_INET,SOCK_STREAM);
  467. if (new_ctx->accept_sockfd <= 0)
  468. {
  469. delete new_ctx;
  470. }
  471. else
  472. {
  473. if (poller->put_io(&data, -1) < 0)
  474. {
  475. closesocket(new_ctx->accept_sockfd);
  476. delete new_ctx;
  477. }
  478. else
  479. {
  480. server_succ = true;
  481. }
  482. }
  483. }
  484. if (type == SOCK_DGRAM && __bind_local(listen_fd, (struct sockaddr*)(&bind_addr), sizeof sockaddr) >= 0)
  485. {
  486. poller_data data;
  487. data.handle= (HANDLE)listen_fd;
  488. data.operation = PD_OP_ACCEPT + 100;
  489. UdpAcceptCtx* new_ctx = new UdpAcceptCtx(1024);
  490. data.context = new_ctx;
  491. if (poller->put_io(&data, -1) < 0)
  492. {
  493. delete new_ctx;
  494. }
  495. else
  496. {
  497. server_succ = true;
  498. }
  499. }
  500. }
  501. if (server_succ)
  502. {
  503. if(type == SOCK_STREAM)
  504. printf(" TCP server launched success: address = %s, port = %d.\n", ip_str, port);
  505. if(type == SOCK_DGRAM)
  506. printf(" UDP server launched success: address = %s, port = %d.\n", ip_str, port);
  507. }
  508. else
  509. {
  510. if (server_bind_ok)
  511. poller->unbind_socket(listen_fd);
  512. else
  513. closesocket(listen_fd);
  514. delete poller;
  515. WSACleanup();
  516. return -1;
  517. }
  518. std::cout << "Hello World!\n";
  519. poller->start();//start timer, otherwise timeout mechanism will make no sense
  520. //lanuch_async_connect(poller,"127.0.0.1",8277);// connect to another server if you like.
  521. poller_result res;
  522. while (1)
  523. {
  524. int ret = poller->get_io_result(&res, -1);
  525. if (ret < 0)// poller->stop() has been called somewhere, maybe in another thread
  526. {
  527. break;
  528. }
  529. else if (ret > 0)
  530. {
  531. //printf("%lld %d\n", res.data.handle, res.data.operation);
  532. switch (res.data.operation & 0xFF)
  533. {
  534. case PD_OP_READ:
  535. {
  536. handle_read_result(poller, &res);
  537. }
  538. break;
  539. case PD_OP_WRITE:
  540. //handle_write_result(&res);
  541. break;
  542. case PD_OP_CONNECT:
  543. {
  544. handle_connect_result(poller, &res);
  545. }
  546. break;
  547. case PD_OP_ACCEPT:
  548. {
  549. handle_accept_result(poller, &res);
  550. }
  551. break;
  552. case PD_OP_SLEEP:
  553. handle_sleep_result(poller, &res);
  554. break;
  555. case PD_OP_USER:
  556. //handle_event_result(&res);
  557. break;
  558. case 100+ PD_OP_ACCEPT:
  559. {
  560. handle_udp_accept_result(poller,&res);
  561. }
  562. break;
  563. default:
  564. assert(0);
  565. break;
  566. }
  567. }
  568. }
  569. WSACleanup();
  570. poller->unbind_socket(listen_fd);
  571. delete poller;
  572. return 0;
  573. }

WinPoller可以直接拿来作为作为一个核心部件,实现各种网络服务框架,也可以用作客户端,封装各种协议与其他服务框架交互。

为了支持UDP并发服务,对WinPoller进行了一下扩充,源代码如下:

  1. /*
  2. Copyright (c) 2019 Sogou, Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. Authors: Wu Jiaxu (wujiaxu@sogou-inc.com)
  13. */
  14. #ifndef _WPOLLER_H_
  15. #define _WPOLLER_H_
  16. #include <thread>
  17. #include <mutex>
  18. # include <Ws2tcpip.h>
  19. # include <Ws2def.h>
  20. #define ACCEPT_ADDR_SIZE (sizeof (struct sockaddr_storage) + 16)
  21. struct poller_data
  22. {
  23. HANDLE handle;
  24. void *context;
  25. #define PD_OP_READ 1
  26. #define PD_OP_WRITE 2
  27. #define PD_OP_ACCEPT 3
  28. #define PD_OP_CONNECT 4
  29. #define PD_OP_SLEEP 5
  30. #define PD_OP_USER 16
  31. uint16_t operation;
  32. };
  33. struct poller_result
  34. {
  35. #define PR_ST_SUCCESS 0
  36. #define PR_ST_FINISHED 1
  37. #define PR_ST_ERROR 2
  38. #define PR_ST_STOPPED 5
  39. #define PR_ST_TIMEOUT 6
  40. int state;
  41. int error;
  42. DWORD iobytes;
  43. struct poller_data data;
  44. };
  45. class AcceptConext
  46. {
  47. public:
  48. void *service;
  49. SOCKET accept_sockfd;
  50. char *buf;
  51. struct sockaddr *remote;
  52. int remote_len;
  53. AcceptConext(void *sc)
  54. {
  55. service = sc;
  56. buf = new char[ACCEPT_ADDR_SIZE * 2];
  57. }
  58. ~AcceptConext()
  59. {
  60. delete []buf;
  61. }
  62. };
  63. class UdpAcceptCtx {
  64. public:
  65. SOCKADDR_IN remoteAddr;
  66. int remoteAddrLen;
  67. char* greet_buff;
  68. size_t buff_size;
  69. WSABUF wsaBuf;
  70. UdpAcceptCtx(size_t size)
  71. {
  72. if (size > 1024)
  73. buff_size = 1024;
  74. else
  75. buff_size = size;
  76. greet_buff = new char[buff_size];
  77. memset(greet_buff,0x00, buff_size);
  78. remoteAddrLen = sizeof(SOCKADDR_IN);
  79. memset(&remoteAddr, 0x00, remoteAddrLen);
  80. wsaBuf.buf = greet_buff;
  81. wsaBuf.len = buff_size;
  82. }
  83. ~UdpAcceptCtx()
  84. {
  85. delete[] greet_buff;
  86. }
  87. };
  88. class ConnectContext
  89. {
  90. public:
  91. void *entry;
  92. struct sockaddr *addr;
  93. socklen_t addrlen;
  94. ConnectContext(void *e, struct sockaddr *a, socklen_t l)
  95. {
  96. entry = e;
  97. addr = a;
  98. addrlen = l;
  99. }
  100. };
  101. class ReadContext
  102. {
  103. public:
  104. void *entry;
  105. DWORD msgsize;
  106. WSABUF buffer;
  107. ReadContext(void *e)
  108. {
  109. entry = e;
  110. msgsize = 0;
  111. }
  112. };
  113. class WriteContext
  114. {
  115. public:
  116. char *buf;
  117. void *entry;
  118. WSABUF *buffers;
  119. DWORD count;
  120. WriteContext(void *e)
  121. {
  122. buf = NULL;
  123. entry = e;
  124. }
  125. ~WriteContext()
  126. {
  127. delete []buf;
  128. }
  129. };
  130. class WinPoller
  131. {
  132. public:
  133. WinPoller(size_t poller_threads);
  134. ~WinPoller();
  135. int start();
  136. void stop();
  137. int bind(HANDLE handle);
  138. void unbind_socket(SOCKET sockfd) const;
  139. int transfer(const struct poller_data *data, DWORD iobytes);
  140. int put_io(const struct poller_data *data, int timeout);
  141. int get_io_result(struct poller_result *res, int timeout);
  142. int cancel_pending_io(HANDLE handle) const;
  143. void timer_routine();
  144. private:
  145. void *timer_queue_;
  146. std::mutex timer_mutex_;
  147. std::thread *timer_thread_;
  148. HANDLE timer_handle_;
  149. HANDLE iocp_;
  150. SOCKET lpfn_sockfd_;
  151. void *lpfn_connectex_;
  152. //void *lpfn_disconnectex_;
  153. volatile bool stop_;
  154. };
  155. #endif
  1. /*
  2. Copyright (c) 2019 Sogou, Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. Authors: Wu Jiaxu (wujiaxu@sogou-inc.com)
  13. */
  14. #include <Winsock2.h>
  15. #include <Ioapiset.h>
  16. #include <Mswsock.h>
  17. #include <Synchapi.h>
  18. #include <stdint.h>
  19. #include <string.h>
  20. #include <atomic>
  21. #include <chrono>
  22. #include <set>
  23. #include "WinPoller.h"
  24. #define GET_CURRENT_MS std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count()
  25. #define IOCP_KEY_HANDLE 1
  26. #define IOCP_KEY_STOP 2
  27. static OVERLAPPED __stop_overlap;
  28. class IOCPData
  29. {
  30. public:
  31. poller_data data;
  32. OVERLAPPED overlap;
  33. int64_t deadline;
  34. bool cancel_by_timer;
  35. bool in_rbtree;
  36. bool queue_out;
  37. IOCPData(const struct poller_data *d, int t)
  38. {
  39. data = *d;
  40. memset(&overlap, 0, sizeof (OVERLAPPED));
  41. deadline = t;
  42. cancel_by_timer = false;
  43. in_rbtree = false;
  44. queue_out = false;
  45. ref = 1;
  46. }
  47. void incref()
  48. {
  49. ref++;
  50. }
  51. void decref()
  52. {
  53. if (--ref == 0)
  54. delete this;
  55. }
  56. private:
  57. ~IOCPData() { }
  58. std::atomic<int> ref;
  59. };
  60. static inline bool operator<(const IOCPData& x, const IOCPData& y)
  61. {
  62. if (x.deadline != y.deadline)
  63. return x.deadline < y.deadline;
  64. return (const ULONG_PTR)(&x.overlap) < (const ULONG_PTR)(&y.overlap);
  65. }
  66. class CMP
  67. {
  68. public:
  69. bool operator() (IOCPData *x, IOCPData *y) const
  70. {
  71. return *x < *y;
  72. }
  73. };
  74. WinPoller::WinPoller(size_t poller_threads)
  75. {
  76. timer_queue_ = new std::set<IOCPData *, CMP>();
  77. timer_thread_ = NULL;
  78. stop_ = false;
  79. timer_handle_ = CreateWaitableTimer(NULL, FALSE, NULL);
  80. iocp_ = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, (DWORD)poller_threads);
  81. GUID GuidConnectEx = WSAID_CONNECTEX;
  82. //GUID GuidDisconnectEx = WSAID_DISCONNECTEX;
  83. DWORD dwBytes;
  84. lpfn_sockfd_ = socket(AF_INET, SOCK_STREAM, 0);
  85. if (WSAIoctl(lpfn_sockfd_, SIO_GET_EXTENSION_FUNCTION_POINTER,
  86. &GuidConnectEx, sizeof(GuidConnectEx),
  87. &lpfn_connectex_, sizeof(lpfn_connectex_),
  88. &dwBytes, NULL, NULL) == SOCKET_ERROR)
  89. lpfn_connectex_ = NULL;
  90. /*
  91. if (WSAIoctl(lpfn_sockfd_, SIO_GET_EXTENSION_FUNCTION_POINTER,
  92. &GuidDisconnectEx, sizeof(GuidDisconnectEx),
  93. &lpfn_disconnectex_, sizeof(lpfn_disconnectex_),
  94. &dwBytes, NULL, NULL) == SOCKET_ERROR)
  95. lpfn_disconnectex_ = NULL;*/
  96. if (!timer_handle_ || !iocp_ || !lpfn_connectex_)
  97. abort();
  98. }
  99. WinPoller::~WinPoller()
  100. {
  101. closesocket(lpfn_sockfd_);
  102. CloseHandle(iocp_);
  103. CloseHandle(timer_handle_);
  104. delete (std::set<IOCPData *, CMP> *)timer_queue_;
  105. }
  106. int WinPoller::start()
  107. {
  108. timer_thread_ = new std::thread(&WinPoller::timer_routine, this);
  109. stop_ = false;
  110. return 0;
  111. }
  112. void WinPoller::stop()
  113. {
  114. LARGE_INTEGER due;
  115. due.QuadPart = -1;
  116. stop_ = true;
  117. SetWaitableTimer(timer_handle_, &due, 0, NULL, NULL, FALSE);//通知定时器线程1ns后退出等待
  118. if (timer_thread_)
  119. {
  120. timer_thread_->join();
  121. delete timer_thread_;
  122. timer_thread_ = NULL;
  123. }
  124. PostQueuedCompletionStatus(iocp_, sizeof (OVERLAPPED),
  125. IOCP_KEY_STOP, &__stop_overlap);
  126. }
  127. void WinPoller::timer_routine()
  128. {
  129. auto *timer_queue = (std::set<IOCPData *, CMP> *)timer_queue_;
  130. while (!stop_)
  131. {
  132. if (WaitForSingleObject(timer_handle_, INFINITE) == WAIT_OBJECT_0)
  133. {
  134. std::lock_guard<std::mutex> lock(timer_mutex_);
  135. if (timer_queue->empty())
  136. continue;
  137. int64_t cur_ms = GET_CURRENT_MS;
  138. while (!timer_queue->empty())
  139. {
  140. const auto it = timer_queue->cbegin();
  141. IOCPData *iocp_data = *it;
  142. if (cur_ms < iocp_data->deadline)
  143. {
  144. LARGE_INTEGER due;
  145. due.QuadPart = iocp_data->deadline - cur_ms;
  146. due.QuadPart *= -10000;
  147. SetWaitableTimer(timer_handle_, &due, 0, NULL, NULL, FALSE);
  148. break;
  149. }
  150. iocp_data->in_rbtree = false;
  151. iocp_data->cancel_by_timer = true;
  152. if (iocp_data->data.operation == PD_OP_SLEEP)
  153. PostQueuedCompletionStatus(iocp_, sizeof IOCPData, IOCP_KEY_HANDLE, &iocp_data->overlap);
  154. else if (CancelIoEx(iocp_data->data.handle, &iocp_data->overlap) == 0 && GetLastError() == ERROR_NOT_FOUND)
  155. iocp_data->cancel_by_timer = false;
  156. timer_queue->erase(it);
  157. iocp_data->decref();
  158. }
  159. }
  160. }
  161. std::lock_guard<std::mutex> lock(timer_mutex_);
  162. while (!timer_queue->empty())
  163. {
  164. const auto it = timer_queue->cbegin();
  165. IOCPData *iocp_data = *it;
  166. iocp_data->in_rbtree = false;
  167. if (iocp_data->data.operation == PD_OP_SLEEP)
  168. PostQueuedCompletionStatus(iocp_, sizeof IOCPData, IOCP_KEY_HANDLE, &iocp_data->overlap);
  169. else
  170. CancelIoEx(iocp_data->data.handle, &iocp_data->overlap);
  171. timer_queue->erase(it);
  172. iocp_data->decref();
  173. }
  174. }
  175. int WinPoller::bind(HANDLE handle)
  176. {
  177. if (CreateIoCompletionPort(handle, iocp_, IOCP_KEY_HANDLE, 0) != NULL)
  178. return 0;
  179. errno = GetLastError();
  180. return -1;
  181. }
  182. void WinPoller::unbind_socket(SOCKET sockfd) const
  183. {
  184. CancelIoEx((HANDLE)sockfd, NULL);
  185. shutdown(sockfd, SD_BOTH);
  186. }
  187. int WinPoller::cancel_pending_io(HANDLE handle) const
  188. {
  189. if (CancelIoEx(handle, NULL) != 0)
  190. return 0;
  191. errno = GetLastError();
  192. return -1;
  193. }
  194. static int __accept_io(IOCPData *iocp_data, int timeout)
  195. {
  196. AcceptConext *ctx = (AcceptConext *)iocp_data->data.context;
  197. DWORD dwBytes;
  198. BOOL ret = AcceptEx((SOCKET)iocp_data->data.handle, ctx->accept_sockfd,
  199. ctx->buf, 0, ACCEPT_ADDR_SIZE, ACCEPT_ADDR_SIZE,
  200. &dwBytes, &iocp_data->overlap);
  201. if (ret == TRUE || WSAGetLastError() == ERROR_IO_PENDING)
  202. {
  203. if (ret != TRUE && timeout == 0)
  204. CancelIoEx(iocp_data->data.handle, &iocp_data->overlap);
  205. return 0;
  206. }
  207. else
  208. errno = WSAGetLastError();
  209. return -1;
  210. }
  211. static int __accept_udp_io(IOCPData* iocp_data, int timeout)
  212. {
  213. UdpAcceptCtx* ctx = (UdpAcceptCtx*)iocp_data->data.context;
  214. DWORD dwFlag = 0, dwRecv = 0;
  215. int ret = WSARecvFrom((SOCKET)iocp_data->data.handle, &(ctx->wsaBuf), 1, &dwRecv, &dwFlag, (struct sockaddr*)&(ctx->remoteAddr), &ctx->remoteAddrLen, &iocp_data->overlap, NULL);
  216. int err = WSAGetLastError();
  217. if (ret == TRUE || err == ERROR_IO_PENDING)
  218. {
  219. if (ret != TRUE && timeout == 0)
  220. CancelIoEx(iocp_data->data.handle, &iocp_data->overlap);
  221. return 0;
  222. }
  223. else
  224. errno = WSAGetLastError();
  225. return -1;
  226. }
  227. static int __connect_io(IOCPData *iocp_data, int timeout, void *lpfn)
  228. {
  229. ConnectContext *ctx = (ConnectContext *)iocp_data->data.context;
  230. LPFN_CONNECTEX lpfn_connectex = (LPFN_CONNECTEX)lpfn;
  231. BOOL ret = lpfn_connectex((SOCKET)iocp_data->data.handle,
  232. ctx->addr, ctx->addrlen, NULL, 0, NULL,
  233. &iocp_data->overlap);
  234. if (ret == TRUE || WSAGetLastError() == ERROR_IO_PENDING)
  235. {
  236. if (ret != TRUE && timeout == 0)
  237. CancelIoEx(iocp_data->data.handle, &iocp_data->overlap);
  238. return 0;
  239. }
  240. errno = WSAGetLastError();
  241. return -1;
  242. }
  243. static int __read_io(IOCPData *iocp_data, int timeout)
  244. {
  245. ReadContext *ctx = (ReadContext *)iocp_data->data.context;
  246. DWORD Flags = 0;
  247. int ret = WSARecv((SOCKET)iocp_data->data.handle, &ctx->buffer, 1, NULL, &Flags, &iocp_data->overlap, NULL);
  248. if (ret == 0 || WSAGetLastError() == WSA_IO_PENDING)
  249. {
  250. if (ret != 0 && timeout == 0)
  251. CancelIoEx(iocp_data->data.handle, &iocp_data->overlap);
  252. return 0;
  253. }
  254. errno = WSAGetLastError();
  255. return -1;
  256. }
  257. static int __write_io(IOCPData *iocp_data, int timeout)
  258. {
  259. WriteContext *ctx = (WriteContext *)iocp_data->data.context;
  260. int ret = WSASend((SOCKET)iocp_data->data.handle, ctx->buffers, ctx->count, NULL, 0, &iocp_data->overlap, NULL);
  261. if (ret == 0 || WSAGetLastError() == WSA_IO_PENDING)
  262. {
  263. if (ret != 0 && timeout == 0)
  264. CancelIoEx(iocp_data->data.handle, &iocp_data->overlap);
  265. return 0;
  266. }
  267. errno = WSAGetLastError();
  268. return -1;
  269. }
  270. static int __sleep_io(IOCPData *iocp_data, int timeout, HANDLE iocp)
  271. {
  272. if (timeout == 0)
  273. {
  274. if (PostQueuedCompletionStatus(iocp, sizeof IOCPData, IOCP_KEY_HANDLE, &iocp_data->overlap) != 0)
  275. return 0;
  276. errno = GetLastError();
  277. return -1;
  278. }
  279. return 0;
  280. }
  281. int WinPoller::transfer(const struct poller_data *data, DWORD iobytes)
  282. {
  283. if (data->operation != PD_OP_USER)
  284. {
  285. errno = EINVAL;
  286. return -1;
  287. }
  288. IOCPData *iocp_data = new IOCPData(data, -1);
  289. if (PostQueuedCompletionStatus(iocp_, iobytes, IOCP_KEY_HANDLE, &iocp_data->overlap) != 0)
  290. return 0;
  291. iocp_data->decref();
  292. errno = GetLastError();
  293. return -1;
  294. }
  295. int WinPoller::put_io(const struct poller_data *data, int timeout)
  296. {
  297. auto *timer_queue = (std::set<IOCPData *, CMP> *)timer_queue_;
  298. IOCPData *iocp_data = new IOCPData(data, timeout);
  299. bool succ;
  300. iocp_data->incref();//for timeout
  301. switch (data->operation & 0xFF)
  302. {
  303. case PD_OP_READ:
  304. succ = (__read_io(iocp_data, timeout) >= 0);
  305. break;
  306. case PD_OP_WRITE:
  307. succ = (__write_io(iocp_data, timeout) >= 0);
  308. break;
  309. case PD_OP_ACCEPT:
  310. succ = (__accept_io(iocp_data, timeout) >= 0);
  311. break;
  312. case PD_OP_ACCEPT+100:
  313. succ = (__accept_udp_io(iocp_data, timeout) >= 0);
  314. break;
  315. case PD_OP_CONNECT:
  316. succ = (__connect_io(iocp_data, timeout, lpfn_connectex_) >= 0);
  317. break;
  318. case PD_OP_SLEEP:
  319. succ = (__sleep_io(iocp_data, timeout, iocp_) >= 0);
  320. break;
  321. default:
  322. succ = false;
  323. errno = EINVAL;
  324. break;
  325. }
  326. if (timeout <= 0)
  327. iocp_data->decref();
  328. if (!succ)
  329. {
  330. iocp_data->decref();
  331. return -1;
  332. }
  333. if (timeout > 0)
  334. {
  335. iocp_data->deadline += GET_CURRENT_MS;
  336. timer_mutex_.lock();
  337. if (!iocp_data->queue_out)
  338. {
  339. iocp_data->in_rbtree = true;
  340. timer_queue->insert(iocp_data);
  341. if (*timer_queue->cbegin() == iocp_data)
  342. {
  343. LARGE_INTEGER due;
  344. due.QuadPart = timeout;
  345. due.QuadPart *= -10000;
  346. SetWaitableTimer(timer_handle_, &due, 0, NULL, NULL, FALSE);
  347. }
  348. }
  349. timer_mutex_.unlock();
  350. }
  351. return 0;
  352. }
  353. static void __accept_on_success(struct poller_result *res)
  354. {
  355. SOCKET listen_sockfd = (SOCKET)res->data.handle;
  356. AcceptConext *ctx = (AcceptConext *)res->data.context;
  357. struct sockaddr *local;
  358. struct sockaddr *remote;
  359. int local_len = sizeof (struct sockaddr);
  360. int remote_len = sizeof (struct sockaddr);
  361. int seconds;
  362. int seconds_len = sizeof (int);
  363. if (getsockopt(ctx->accept_sockfd, SOL_SOCKET, SO_CONNECT_TIME, (char *)&seconds, &seconds_len) == 0)
  364. {
  365. if (setsockopt(ctx->accept_sockfd, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, (char*)&listen_sockfd, sizeof (listen_sockfd)) == 0)
  366. {
  367. GetAcceptExSockaddrs(ctx->buf, 0, ACCEPT_ADDR_SIZE, ACCEPT_ADDR_SIZE, &local, &local_len, &remote, &remote_len);
  368. ctx->remote = remote;
  369. ctx->remote_len = remote_len;
  370. return;
  371. }
  372. }
  373. res->state = PR_ST_ERROR;
  374. res->error = WSAGetLastError();
  375. }
  376. static void __connect_on_success(struct poller_result *res)
  377. {
  378. SOCKET sockfd = (SOCKET)res->data.handle;
  379. ConnectContext *ctx = (ConnectContext *)res->data.context;
  380. int seconds;
  381. int seconds_len = sizeof (int);
  382. if (getsockopt(sockfd, SOL_SOCKET, SO_CONNECT_TIME, (char *)&seconds, &seconds_len) == 0)
  383. {
  384. //if (seconds == 0xFFFFFFFF) error?
  385. if (setsockopt(sockfd, SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, NULL, 0) == 0)
  386. return;
  387. }
  388. res->state = PR_ST_ERROR;
  389. res->error = WSAGetLastError();
  390. }
  391. int WinPoller::get_io_result(struct poller_result *res, int timeout)
  392. {
  393. DWORD bytes_transferred;
  394. ULONG_PTR completion_key;
  395. OVERLAPPED* pOverlapped;
  396. DWORD dwMilliseconds;
  397. if (stop_)
  398. dwMilliseconds = 100;
  399. else if (timeout >= 0)
  400. dwMilliseconds = timeout;
  401. else
  402. dwMilliseconds = INFINITE;
  403. if (GetQueuedCompletionStatus(iocp_, &bytes_transferred, &completion_key,
  404. &pOverlapped, dwMilliseconds) == FALSE)
  405. {
  406. res->state = PR_ST_ERROR;
  407. res->error = GetLastError();
  408. if (pOverlapped == NULL && res->error == ERROR_ABANDONED_WAIT_0)
  409. return -1;// IOCP closed
  410. if (res->error == ERROR_OPERATION_ABORTED)
  411. res->state = PR_ST_STOPPED;
  412. }
  413. else if (pOverlapped == NULL)
  414. {
  415. // An unrecoverable error occurred in the completion port.
  416. // Wait for the next notification
  417. res->state = PR_ST_ERROR;
  418. res->error = ENOENT;
  419. }
  420. else if (bytes_transferred == 0)
  421. {
  422. res->state = PR_ST_FINISHED;
  423. res->error = ECONNRESET;
  424. }
  425. else
  426. {
  427. res->state = PR_ST_SUCCESS;
  428. res->error = 0;
  429. }
  430. if (!pOverlapped)
  431. return 0;
  432. res->iobytes = bytes_transferred;
  433. if (completion_key == IOCP_KEY_STOP)
  434. {
  435. PostQueuedCompletionStatus(iocp_, sizeof (OVERLAPPED),
  436. IOCP_KEY_STOP, &__stop_overlap);
  437. //return 0;
  438. return -1;// Thread over
  439. }
  440. IOCPData *iocp_data = CONTAINING_RECORD(pOverlapped, IOCPData, overlap);
  441. if (iocp_data->deadline > 0)// timeout > 0
  442. {
  443. timer_mutex_.lock();
  444. iocp_data->queue_out = true;
  445. if (iocp_data->in_rbtree)
  446. {
  447. iocp_data->in_rbtree = false;
  448. ((std::set<IOCPData *, CMP> *)timer_queue_)->erase(iocp_data);
  449. iocp_data->decref();
  450. }
  451. timer_mutex_.unlock();
  452. if (res->state == PR_ST_STOPPED)
  453. {
  454. std::lock_guard<std::mutex> lock(timer_mutex_);
  455. if (iocp_data->cancel_by_timer)
  456. {
  457. res->state = PR_ST_TIMEOUT;
  458. res->error = ETIMEDOUT;
  459. }
  460. }
  461. }
  462. else if (iocp_data->deadline == 0 && res->state == PR_ST_STOPPED)// timeout == 0
  463. {
  464. res->state = PR_ST_TIMEOUT;
  465. res->error = ETIMEDOUT;
  466. }
  467. res->data = iocp_data->data;
  468. if (res->state == PR_ST_SUCCESS || res->state == PR_ST_FINISHED)
  469. {
  470. switch (res->data.operation)
  471. {
  472. case PD_OP_ACCEPT:
  473. __accept_on_success(res);
  474. break;
  475. case PD_OP_CONNECT:
  476. __connect_on_success(res);
  477. break;
  478. }
  479. }
  480. iocp_data->decref();
  481. return 1;
  482. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/2023面试高手/article/detail/64931
推荐阅读
相关标签
  

闽ICP备14008679号