
A Brief Look at gRPC Client Connection Management


Background

  1. The client SDK uses gRPC as its communication protocol and sends a pingServer request to the server at a fixed interval (roughly every 120s).
  2. The server listens on port 80, e.g. xxx:80.

Problem

  1. We noticed that the client kept disconnecting from and reconnecting to the server.
  2. Checked the connections with netstat -antp:

[Figure: netstat -antp output, with the connection to the server in TIME_WAIT highlighted and a new ESTABLISHED connection below it]

  1. As shown in the figure, the connection to the server address highlighted in red is in TIME_WAIT, and below it there is a newly established connection (ESTABLISHED) to the same server.
  2. The TIME_WAIT state means the client side actively closed the connection.
  3. This conflicted with my understanding: gRPC is supposed to keep a long-lived connection, so why is it torn down every time? Doesn't that effectively turn it into short-lived connections?
  4. And since the client is the side that closes, could something be wrong on the client?
  5. With these questions in mind, I ran a packet capture on the client.
  6. It turned out the client always receives a packet of length 17 and then starts sending FIN, going through the normal TCP teardown.
  7. Opening the tcpdump output in Wireshark shows that this length-17 packet is a GOAWAY frame.

As shown below:

[Figure: Wireshark view of the tcpdump capture, decoding the length-17 packet as an HTTP/2 GOAWAY frame]

  1. GOAWAY is the mechanism HTTP/2 defines for shutting a connection down "gracefully".
  2. The HTTP/2 specification describes the GOAWAY frame in detail; the decoding sketch below shows why a minimal GOAWAY packet is exactly 17 bytes.

HTTP/2 GOAWAY (defined in RFC 7540, Section 6.8)
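To make the "length 17" observation concrete, here is a minimal, self-contained decoding sketch. The byte values are illustrative, not taken from the actual capture. Every HTTP/2 frame starts with a 9-byte header, and the smallest GOAWAY payload is 8 bytes (last-stream-id plus error code), so a bare GOAWAY frame is 9 + 8 = 17 bytes on the wire, which matches the packet size seen in the trace.

#include <cstdint>
#include <cstdio>
#include <vector>

// An HTTP/2 frame starts with a 9-byte header: 24-bit payload length,
// 8-bit type, 8-bit flags, and a 31-bit stream id. GOAWAY is frame type 0x7
// and its payload is at least 8 bytes (last-stream-id + error code), so the
// smallest possible GOAWAY frame is 9 + 8 = 17 bytes.
struct FrameHeader {
  uint32_t length;     // payload length (excludes the 9 header bytes)
  uint8_t type;        // 0x7 == GOAWAY
  uint8_t flags;
  uint32_t stream_id;  // always 0 for GOAWAY
};

static FrameHeader DecodeFrameHeader(const uint8_t* p) {
  FrameHeader h;
  h.length = (uint32_t(p[0]) << 16) | (uint32_t(p[1]) << 8) | p[2];
  h.type = p[3];
  h.flags = p[4];
  h.stream_id = ((uint32_t(p[5]) & 0x7f) << 24) | (uint32_t(p[6]) << 16) |
                (uint32_t(p[7]) << 8) | p[8];
  return h;
}

int main() {
  // Illustrative bytes of a minimal GOAWAY frame: payload length 8, type 0x7,
  // flags 0, stream id 0, last-stream-id 0, error code 0 (NO_ERROR).
  std::vector<uint8_t> pkt = {0x00, 0x00, 0x08, 0x07, 0x00, 0x00, 0x00, 0x00, 0x00,
                              0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00};
  FrameHeader h = DecodeFrameHeader(pkt.data());
  std::printf("frame is %zu bytes total, payload %u, type 0x%x (%s)\n",
              pkt.size(), h.length, h.type, h.type == 0x07 ? "GOAWAY" : "other");
  return 0;
}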

  1. From what I already knew about gRPC, the client resolves the domain name and then maintains an lb (load balancer) over the resolved addresses.
  2. So this looks like gRPC's idle-connection management: the pingServer interval is 120s, but gRPC considers the connection idle in between, and therefore tells the client to close the idle connection?
  3. To verify this idea, I modified one of gRPC's demos. Since our client uses gRPC's C++ asynchronous API, I wrote a simple async_client, based on the gRPC async demo, that calls our server.

Code:

#include <chrono>
#include <iostream>
#include <memory>
#include <string>
#include <thread>

#include <grpcpp/grpcpp.h>
#include <grpc/support/log.h>

#include "gateway.grpc.pb.h"

using grpc::Channel;
using grpc::ClientAsyncResponseReader;
using grpc::ClientContext;
using grpc::CompletionQueue;
using grpc::Status;
using yournamespace::PingReq;
using yournamespace::PingResp;
using yournamespace::srv;

class GatewayClient {
 public:
  explicit GatewayClient(std::shared_ptr<Channel> channel)
      : stub_(srv::NewStub(channel)) {}

  // Assembles the client's payload and sends it to the server.
  void PingServer() {
    // Data we are sending to the server.
    PingReq request;
    request.set_peerid("1111111111111113");
    request.set_clientinfo("");
    request.set_capability(1);
    request.add_iplist(4197554190);
    request.set_tcpport(8080);
    request.set_udpport(8080);
    request.set_upnpip(4197554190);
    request.set_upnpport(8080);
    request.set_connectnum(10000);
    request.set_downloadingspeed(100);
    request.set_uploadingspeed(10);
    request.set_maxdownloadspeed(0);
    request.set_maxuploadspeed(0);

    // Call object to store rpc data.
    AsyncClientCall* call = new AsyncClientCall;

    // stub_->AsyncPing() creates and starts the RPC, returning an instance to
    // store in "call". Because we are using the asynchronous API, we need to
    // hold on to the "call" instance in order to get updates on the ongoing RPC.
    call->response_reader = stub_->AsyncPing(&call->context, request, &cq_);

    // Request that, upon completion of the RPC, "reply" be updated with the
    // server's response and "status" with the indication of whether the
    // operation was successful. Tag the request with the memory address of
    // the call object.
    call->response_reader->Finish(&call->reply, &call->status, (void*)call);
  }

  // Loop while listening for completed responses.
  // Prints out the response from the server.
  void AsyncCompleteRpc() {
    void* got_tag;
    bool ok = false;

    // Block until the next result is available in the completion queue "cq_".
    while (cq_.Next(&got_tag, &ok)) {
      // The tag in this example is the memory location of the call object.
      AsyncClientCall* call = static_cast<AsyncClientCall*>(got_tag);

      // Verify that the request was completed successfully. Note that "ok"
      // corresponds solely to the request for updates introduced by Finish().
      GPR_ASSERT(ok);

      if (call->status.ok())
        std::cout << "xNetClient received: " << call->reply.code()
                  << " task:" << call->reply.tasks_size()
                  << " pinginterval:" << call->reply.pinginterval() << std::endl;
      else
        std::cout << ": status = " << call->status.error_code()
                  << " (" << call->status.error_message() << ")" << std::endl;

      // Once we're complete, deallocate the call object.
      delete call;
    }
  }

 private:
  // Struct for keeping state and data information.
  struct AsyncClientCall {
    // Container for the data we expect from the server.
    PingResp reply;
    // Context for the client. It could be used to convey extra information to
    // the server and/or tweak certain RPC behaviors.
    ClientContext context;
    // Storage for the status of the RPC upon completion.
    Status status;
    std::unique_ptr<ClientAsyncResponseReader<PingResp>> response_reader;
  };

  // Out of the passed in Channel comes the stub, stored here, our view of the
  // server's exposed services.
  std::unique_ptr<srv::Stub> stub_;

  // The producer-consumer queue we use to communicate asynchronously with the
  // gRPC runtime.
  CompletionQueue cq_;
};

int main(int argc, char** argv) {
  if (argc < 2) {
    std::cout << "usage: " << argv[0] << " domain:port" << std::endl;
    std::cout << "eg: " << argv[0] << " gw.xnet.xcloud.sandai.net:80" << std::endl;
    return 0;
  }

  // Instantiate the client. It requires a channel, out of which the actual
  // RPCs are created. The channel models a connection to the endpoint given
  // on the command line, and it isn't authenticated
  // (use of InsecureChannelCredentials()).
  GatewayClient xNetClient(
      grpc::CreateChannel(argv[1], grpc::InsecureChannelCredentials()));

  // Spawn reader thread that loops indefinitely.
  std::thread thread_ = std::thread(&GatewayClient::AsyncCompleteRpc, &xNetClient);

  for (int i = 0; i < 1000; i++) {
    xNetClient.PingServer();  // The actual RPC call!
    std::this_thread::sleep_for(std::chrono::seconds(120));
  }

  std::cout << "Press control-c to quit" << std::endl << std::endl;
  thread_.join();  // blocks forever
  return 0;
}

What comes next is simple: run it.
Watching with netstat -natp, the behavior reproduces: async_client also disconnects and then reconnects.
Further experimenting shows that with the send interval changed to 10s the connection stays up, while with intervals much above 10s the connection is almost always torn down.
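Rather than shortening the application-level pingServer interval to 10s, one option is to let gRPC itself keep the transport busy with HTTP/2 keepalive pings. The sketch below is not part of the original test program; it uses the keepalive channel arguments exposed by gRPC's C++ API (GRPC_ARG_KEEPALIVE_TIME_MS and related settings). The timing values are only examples, and whether the server tolerates pings at this rate, or still sends GOAWAY, depends on its own settings, which I have not verified.

#include <memory>
#include <string>
#include <grpcpp/grpcpp.h>

// Create a channel that sends HTTP/2 keepalive pings on an otherwise idle
// connection, so there is periodic traffic even if pingServer is only called
// every 120s. The numeric values below are illustrative, not recommendations.
std::shared_ptr<grpc::Channel> CreateKeepaliveChannel(const std::string& target) {
  grpc::ChannelArguments args;
  args.SetInt(GRPC_ARG_KEEPALIVE_TIME_MS, 10 * 1000);       // ping after 10s of inactivity
  args.SetInt(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, 5 * 1000);     // wait 5s for the ping ack
  args.SetInt(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 1);  // ping even with no active RPC
  return grpc::CreateCustomChannel(target, grpc::InsecureChannelCredentials(), args);
}

With this helper, main() above would construct the client as GatewayClient xNetClient(CreateKeepaliveChannel(argv[1])); instead of calling grpc::CreateChannel directly.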

Summary

To sum up:
This is how gRPC manages the connection here: by default, once no data has been sent for more than 10s, gRPC treats the connection as idle. The server then sends the client a GOAWAY frame; on receiving it, the client actively closes the connection, and the next time it needs to send data it establishes a new connection.

I don't yet know whether there is a configuration option to change this value; I'm not very familiar with gRPC's internals yet, so I'll dig into it later.
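For what it's worth, gRPC's C++ server API does expose a channel argument, GRPC_ARG_MAX_CONNECTION_IDLE_MS, that controls how long a connection may stay idle before the server sends GOAWAY and closes it. Whether that is what produces the 10s behavior observed here is an assumption on my part; the server in this test is not under my control, and the limit could just as well come from a proxy or some other configuration in front of it. A minimal sketch of how a server would set it:

#include <grpcpp/grpcpp.h>
#include <grpcpp/server_builder.h>

// Sketch only: allow connections to sit idle for up to 5 minutes before the
// server sends GOAWAY. The 5-minute figure is an arbitrary example value.
void ConfigureIdleTimeout(grpc::ServerBuilder& builder) {
  builder.AddChannelArgument(GRPC_ARG_MAX_CONNECTION_IDLE_MS, 5 * 60 * 1000);
}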
