当前位置:   article > 正文

httpclient源码分析之MainClientExec

mainclientexec.execute

MainClientExec是HTTP请求处理链中最后一个请求执行环节,负责与另一终端的请求/响应交互,也是很重要的类。

源码版本是4.5.2,主要看execute方法,并在里面添加注释。接着详细说下获取连接的过程。
### execute方法

  1. @Override
  2. public CloseableHttpResponse execute(
  3. final HttpRoute route,
  4. final HttpRequestWrapper request,
  5. final HttpClientContext context,
  6. final HttpExecutionAware execAware) throws IOException, HttpException {
  7. Args.notNull(route, "HTTP route");
  8. Args.notNull(request, "HTTP request");
  9. Args.notNull(context, "HTTP context");
  10. //Auth相关,这里没关注
  11. AuthState targetAuthState = context.getTargetAuthState();
  12. if (targetAuthState == null) {
  13. targetAuthState = new AuthState();
  14. context.setAttribute(HttpClientContext.TARGET_AUTH_STATE, targetAuthState);
  15. }
  16. AuthState proxyAuthState = context.getProxyAuthState();
  17. if (proxyAuthState == null) {
  18. proxyAuthState = new AuthState();
  19. context.setAttribute(HttpClientContext.PROXY_AUTH_STATE, proxyAuthState);
  20. }
  21. if (request instanceof HttpEntityEnclosingRequest) {
  22. RequestEntityProxy.enhance((HttpEntityEnclosingRequest) request);
  23. }
  24. //userToken后面作为state,用来从连接池中获取连接的时候使用,默认是null。
  25. //如果设置了值,会设置到连接中,再次获取的时候,则优先取status相等的连接
  26. Object userToken = context.getUserToken();
  27. //ConnectionRequest用来获取HttpClientConnection
  28. //为每一个route设置一个连接池,大小可以配置,默认为2
  29. //从route连接池获取一个连接,优先取status等于userToken的。
  30. //这里没有实质的操作,只是创建一个ConnectionRequest,并将获取连接的操作封装在ConnectionRequest中。
  31. final ConnectionRequest connRequest = connManager.requestConnection(route, userToken);
  32. if (execAware != null) {
  33. if (execAware.isAborted()) {
  34. connRequest.cancel();
  35. throw new RequestAbortedException("Request aborted");
  36. } else {
  37. execAware.setCancellable(connRequest);
  38. }
  39. }
  40. final RequestConfig config = context.getRequestConfig();
  41. final HttpClientConnection managedConn;
  42. try {
  43. final int timeout = config.getConnectionRequestTimeout();
  44. //获取连接,这里才执行从连接池中阻塞获取连接的操作,并设置超时时间。
  45. //这里返回的connection,不一定是有效的socket连接,长短连接处理方式不同。
  46. //如果连接没有打开或者不可用,后面会重新建立socket连接。
  47. managedConn = connRequest.get(timeout > 0 ? timeout : 0, TimeUnit.MILLISECONDS);
  48. } catch(final InterruptedException interrupted) {
  49. Thread.currentThread().interrupt();
  50. throw new RequestAbortedException("Request aborted", interrupted);
  51. } catch(final ExecutionException ex) {
  52. Throwable cause = ex.getCause();
  53. if (cause == null) {
  54. cause = ex;
  55. }
  56. throw new RequestAbortedException("Request execution failed", cause);
  57. }
  58. //将连接加入上下文中,暴露连接。
  59. //context就是一个大容器,收藏各种东西,如果觉得有什么资源是需要在别的地方用到的,那就放入context吧。
  60. context.setAttribute(HttpCoreContext.HTTP_CONNECTION, managedConn);
  61. //是否检查连接的有效性。如果检查不可用,就关闭连接。对于关闭的连接,后面会从三次握手开始,重新建立socket连接。
  62. //如果配置检查,就相当于一个悲观锁,每次请求都会消耗最多30ms来检测,影响性能。4.4版本开始就过时了。
  63. if (config.isStaleConnectionCheckEnabled()) {
  64. // validate connection,首先判断连接是否是打开的
  65. if (managedConn.isOpen()) {
  66. this.log.debug("Stale connection check");
  67. //如果是打开的,进一步判断是否可用
  68. if (managedConn.isStale()) {
  69. this.log.debug("Stale connection detected");
  70. //不可用的时候,需要关闭连接,后面再重新建立连接
  71. managedConn.close();
  72. }
  73. }
  74. }
  75. final ConnectionHolder connHolder = new ConnectionHolder(this.log, this.connManager, managedConn);
  76. try {
  77. if (execAware != null) {
  78. execAware.setCancellable(connHolder);
  79. }
  80. HttpResponse response;
  81. for (int execCount = 1;; execCount++) {
  82. //请求是否幂等的,如果不是,则不能retry,抛异常
  83. if (execCount > 1 && !RequestEntityProxy.isRepeatable(request)) {
  84. throw new NonRepeatableRequestException("Cannot retry request " +
  85. "with a non-repeatable request entity.");
  86. }
  87. if (execAware != null && execAware.isAborted()) {
  88. throw new RequestAbortedException("Request aborted");
  89. }
  90. //如果连接没有打开,即连接使用的socket为null,则重新建立连接。
  91. if (!managedConn.isOpen()) {
  92. this.log.debug("Opening connection " + route);
  93. try {
  94. //建立socket连接。
  95. //遍历地址集,成功建立socket连接,就返回,封装在connection中
  96. establishRoute(proxyAuthState, managedConn, route, request, context);
  97. } catch (final TunnelRefusedException ex) {
  98. if (this.log.isDebugEnabled()) {
  99. this.log.debug(ex.getMessage());
  100. }
  101. response = ex.getResponse();
  102. break;
  103. }
  104. }
  105. final int timeout = config.getSocketTimeout();
  106. if (timeout >= 0) {
  107. //设置socketTimeout
  108. managedConn.setSocketTimeout(timeout);
  109. }
  110. if (execAware != null && execAware.isAborted()) {
  111. throw new RequestAbortedException("Request aborted");
  112. }
  113. if (this.log.isDebugEnabled()) {
  114. this.log.debug("Executing request " + request.getRequestLine());
  115. }
  116. if (!request.containsHeader(AUTH.WWW_AUTH_RESP)) {
  117. if (this.log.isDebugEnabled()) {
  118. this.log.debug("Target auth state: " + targetAuthState.getState());
  119. }
  120. this.authenticator.generateAuthResponse(request, targetAuthState, context);
  121. }
  122. if (!request.containsHeader(AUTH.PROXY_AUTH_RESP) && !route.isTunnelled()) {
  123. if (this.log.isDebugEnabled()) {
  124. this.log.debug("Proxy auth state: " + proxyAuthState.getState());
  125. }
  126. this.authenticator.generateAuthResponse(request, proxyAuthState, context);
  127. }
  128. //和服务器具体交互,发送请求头,如果有响应,再接收响应。
  129. response = requestExecutor.execute(request, managedConn, context);
  130. //根据配置的策略,判断是否保持连接,永久还是一段时长
  131. // The connection is in or can be brought to a re-usable state.
  132. if (reuseStrategy.keepAlive(response, context)) {
  133. // Set the idle duration of this connection
  134. final long duration = keepAliveStrategy.getKeepAliveDuration(response, context);
  135. if (this.log.isDebugEnabled()) {
  136. final String s;
  137. if (duration > 0) {
  138. s = "for " + duration + " " + TimeUnit.MILLISECONDS;
  139. } else {
  140. s = "indefinitely";
  141. }
  142. this.log.debug("Connection can be kept alive " + s);
  143. }
  144. connHolder.setValidFor(duration, TimeUnit.MILLISECONDS);
  145. connHolder.markReusable();
  146. } else {
  147. connHolder.markNonReusable();
  148. }
  149. //跳过
  150. if (needAuthentication(
  151. targetAuthState, proxyAuthState, route, response, context)) {
  152. // Make sure the response body is fully consumed, if present
  153. final HttpEntity entity = response.getEntity();
  154. if (connHolder.isReusable()) {
  155. EntityUtils.consume(entity);
  156. } else {
  157. managedConn.close();
  158. if (proxyAuthState.getState() == AuthProtocolState.SUCCESS
  159. && proxyAuthState.getAuthScheme() != null
  160. && proxyAuthState.getAuthScheme().isConnectionBased()) {
  161. this.log.debug("Resetting proxy auth state");
  162. proxyAuthState.reset();
  163. }
  164. if (targetAuthState.getState() == AuthProtocolState.SUCCESS
  165. && targetAuthState.getAuthScheme() != null
  166. && targetAuthState.getAuthScheme().isConnectionBased()) {
  167. this.log.debug("Resetting target auth state");
  168. targetAuthState.reset();
  169. }
  170. }
  171. // discard previous auth headers
  172. final HttpRequest original = request.getOriginal();
  173. if (!original.containsHeader(AUTH.WWW_AUTH_RESP)) {
  174. request.removeHeaders(AUTH.WWW_AUTH_RESP);
  175. }
  176. if (!original.containsHeader(AUTH.PROXY_AUTH_RESP)) {
  177. request.removeHeaders(AUTH.PROXY_AUTH_RESP);
  178. }
  179. } else {
  180. break;
  181. }
  182. }
  183. if (userToken == null) {
  184. userToken = userTokenHandler.getUserToken(context);
  185. context.setAttribute(HttpClientContext.USER_TOKEN, userToken);
  186. }
  187. if (userToken != null) {
  188. connHolder.setState(userToken);
  189. }
  190. // check for entity, release connection if possible
  191. //判断是否读取了全部的响应,如果是,则释放连接回连接池,
  192. //否则,也要返回连接,以便后面继续从流中读取响应。
  193. final HttpEntity entity = response.getEntity();
  194. if (entity == null || !entity.isStreaming()) {
  195. // connection not needed and (assumed to be) in re-usable state
  196. connHolder.releaseConnection();
  197. return new HttpResponseProxy(response, null);
  198. } else {
  199. return new HttpResponseProxy(response, connHolder);
  200. }
  201. } catch (final ConnectionShutdownException ex) {
  202. final InterruptedIOException ioex = new InterruptedIOException(
  203. "Connection has been shut down");
  204. ioex.initCause(ex);
  205. throw ioex;
  206. } catch (final HttpException ex) {
  207. connHolder.abortConnection();
  208. throw ex;
  209. } catch (final IOException ex) {
  210. connHolder.abortConnection();
  211. throw ex;
  212. } catch (final RuntimeException ex) {
  213. connHolder.abortConnection();
  214. throw ex;
  215. }
  216. }

总结一下关心的大致流程:

  • 创建连接请求
  • 根据连接请求的参数,从连接池中获取一个连接
  • 配置是否需要校验连接可用性。如果检查不可用,就关闭连接。
  • 如果连接没有打开,则创建一个底层的socket连接。
  • 发送请求头部(如果请求中带有entity,则发送)
  • 如果有响应,接收响应(先接收头部,如果有请求主体,则接收)

这里有一点注意一下:
检测连接有效性的时候,报的是SocketTimeOut异常,而真正读响应的时候,报的是Connection reset异常。为什么不一样呢?我还没找到方法验证,但这里很可能是检测的时间很短,只有1ms,首先触发了SocketTimeOut异常,而实际读响应的时候,是不会这么短时间的。

获取连接

接下来详细说说根据ConnectionRequest获取HttpClientConnection。即:
managedConn = connRequest.get(timeout > 0 ? timeout : 0, TimeUnit.MILLISECONDS);

首先看ConnectionRequest为何物:

  1. //org.apache.http.impl.conn.PoolingHttpClientConnectionManager
  2. @Override
  3. public ConnectionRequest requestConnection(
  4. final HttpRoute route,
  5. final Object state) {
  6. Args.notNull(route, "HTTP route");
  7. if (this.log.isDebugEnabled()) {
  8. this.log.debug("Connection request: " + format(route, state) + formatStats(route));
  9. }
  10. //从连接池中获取一个CPoolEntry(Connection的包装类)
  11. final Future<CPoolEntry> future = this.pool.lease(route, state, null);
  12. return new ConnectionRequest() {
  13. @Override
  14. public boolean cancel() {
  15. return future.cancel(true);
  16. }
  17. // ConnectionRequest的get方法。调用leaseConnection方法,并且传入future(CPoolEntry的封装(connection的封装))
  18. @Override
  19. public HttpClientConnection get(
  20. final long timeout,
  21. final TimeUnit tunit) throws InterruptedException, ExecutionException, ConnectionPoolTimeoutException {
  22. return leaseConnection(future, timeout, tunit);
  23. }
  24. };
  25. }

所以,ConnectionRequest的get方法,实际是调用PoolingHttpClientConnectionManager的leaseConnection,返回一个HttpClientConnection。
关于获取connection的更多详细信息,可以参考这篇文章,详细讲述了PoolingHttpClientConnectionManager的获取连接给用户的方法。

补充

  • 关于config.isStaleConnectionCheckEnabled():
    如果设置每次请求检查连接是否可用,会影响性能。4.4版本开始过时,但官方推荐使用org.apache.http.impl.conn.PoolingHttpClientConnectionManager#getValidateAfterInactivity()。详细了解看这篇最后补充的校验连接有效的方法,有一个案例分析。

  • 最后返回的HttpResponseProxy带上ConnectionHolder(响应没有一次读完),这篇文章有一个案例了解,查看成功日志的最后几个步骤。RestTemplate读取扩展字段,第二次读取数据。

另一篇关于此段源码的解读,见这里

转载于:https://www.cnblogs.com/shoren/p/httpclient-MainClientExec.html

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/爱喝兽奶帝天荒/article/detail/790898
推荐阅读
相关标签
  

闽ICP备14008679号