当前位置:   article > 正文

Elasticsearch原理学习--Rest请求处理流程_elasticsearch 请求处理类是哪个

elasticsearch 请求处理类是哪个

         之前我们已经通过博客《Elasticsearch学习--ES源码下载、导入及运行》了解学习到如何在本地将Elasticsearch服务运行起来,整个2018年下半年过去了,由于在学习其他知识,对于ES学习这块就暂时先放下的,接下来我们就对ES有一个系统的学习。

这篇博客我们通过访问:http://localhost:9200/ 来简单了解一下ES对请求的处理流程。 

1、Netty4HttpRequestHandler

  ES提供Netty4HttpRequestHandler作为Http请求的处理器,基于Netty(高性能网络框架)实现,提供channelRead0方法来接收请求。在channelRead0方法中主要创建对象Netty4HttpRequest和Netty4HttpChannel,接下来交由Netty4HttpServerTransport去处理。

  1. @Override
  2. protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
  3. //省略部分代码
  4. final Netty4HttpRequest httpRequest;
  5. try {
  6. httpRequest = new Netty4HttpRequest(serverTransport.xContentRegistry, copy, ctx.channel());
  7. } catch (Exception ex) {
  8. if (pipelinedRequest != null) {
  9. pipelinedRequest.release();
  10. }
  11. throw ex;
  12. }
  13. final Netty4HttpChannel channel =
  14. new Netty4HttpChannel(serverTransport, httpRequest, pipelinedRequest, detailedErrorsEnabled, threadContext);
  15. if (request.decoderResult().isSuccess()) {
  16. serverTransport.dispatchRequest(httpRequest, channel);
  17. } else {
  18. assert request.decoderResult().isFailure();
  19. serverTransport.dispatchBadRequest(httpRequest, channel, request.decoderResult().cause());
  20. }
  21. }

2、Netty4HttpServerTransport

在Netty4HttpServerTransport的dispatchRequest中没有做过多的业务处理,然后交由Dispatcher去处理。

  1. void dispatchRequest(final RestRequest request, final RestChannel channel) {
  2. final ThreadContext threadContext = threadPool.getThreadContext();
  3. try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
  4. dispatcher.dispatchRequest(request, channel, threadContext);
  5. }
  6. }

3、Dispatcher

  Dispatcher的实现类RestController用来处理所有的Http请求,在tryAllHandlers中会查找所有的请求处理器来处理请求。

  1. @Override
  2. public void dispatchRequest(RestRequest request, RestChannel channel, ThreadContext threadContext) {
  3. if (request.rawPath().equals("/favicon.ico")) {
  4. handleFavicon(request, channel);
  5. return;
  6. }
  7. try {
  8. tryAllHandlers(request, channel, threadContext);
  9. } catch (Exception e) {
  10. try {
  11. channel.sendResponse(new BytesRestResponse(channel, e));
  12. } catch (Exception inner) {
  13. inner.addSuppressed(e);
  14. logger.error((Supplier<?>) () ->
  15. new ParameterizedMessage("failed to send failure response for uri [{}]", request.uri()), inner);
  16. }
  17. }
  18. }

在tryAllHandlers中会查找RestHandler处理器

  1. void tryAllHandlers(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) throws Exception {
  2. //省略部分代码
  3. //根据请求查找所有的处理器
  4. Iterator<MethodHandlers> allHandlers = getAllHandlers(request);
  5. for (Iterator<MethodHandlers> it = allHandlers; it.hasNext(); ) {
  6. final Optional<RestHandler> mHandler = Optional.ofNullable(it.next()).flatMap(mh -> mh.getHandler(request.method()));
  7. requestHandled = dispatchRequest(request, channel, client, mHandler);
  8. if (requestHandled) {
  9. break;
  10. }
  11. }
  12. // If request has not been handled, fallback to a bad request error.
  13. if (requestHandled == false) {
  14. handleBadRequest(request, channel);
  15. }
  16. }

在dispatchRequest方法中会根据查找对应的RestHandler的实现类,这样调用RestHandler的handleRequest方法,处理Http请求。

  1. boolean dispatchRequest(final RestRequest request, final RestChannel channel, final NodeClient client,
  2. final Optional<RestHandler> mHandler) throws Exception {
  3. //省略部分代码
  4. final RestHandler wrappedHandler = mHandler.map(h -> handlerWrapper.apply(h)).get();
  5. wrappedHandler.handleRequest(request, responseChannel, client);
  6. //省略部分代码
  7. return requestHandled;
  8. }

 RestHandler的实现类RestMainAction

4、RestHandler

handleRequest的实现方法在抽象类BaseRestHandler中,在这个方法中调用抽象方法prepareRequest获取RestChannelConsumer,然后调用action.accept(channel)方法,这里ES采用了 lambda编程,最终的实现及调用者在prepareRequest方法中。

  1. @Override
  2. public final void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws Exception {
  3. //构建调用者
  4. final RestChannelConsumer action = prepareRequest(request, client);
  5. // execute the action
  6. action.accept(channel);
  7. }

prepareRequest在具体的Action的实现类中,在这里就是在RestMainAction中,最终会调用NodeClient的execute方法。

  1. @Override
  2. public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
  3. //执行NodeClient的execute方法
  4. return channel -> client.execute(MainAction.INSTANCE, new MainRequest(), new RestBuilderListener<MainResponse>(channel) {
  5. @Override
  6. public RestResponse buildResponse(MainResponse mainResponse, XContentBuilder builder) throws Exception {
  7. return convertMainResponse(mainResponse, request, builder);
  8. }
  9. });
  10. }

5、NodeClient

NodeClient的execute方法在父类AbstractClient中,会创建监听器Listener,执行doExecute方法。

  1. @Override
  2. public final <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>> void execute(
  3. Action<Request, Response, RequestBuilder> action, Request request, ActionListener<Response> listener) {
  4. listener = threadedWrapper.wrap(listener);
  5. doExecute(action, request, listener);
  6. }

在doExecute方法中会根据action获取TransportAction的实现类,这里获取的是

  1. public < Request extends ActionRequest,
  2. Response extends ActionResponse
  3. > Task executeLocally(GenericAction<Request, Response> action, Request request, ActionListener<Response> listener) {
  4. return transportAction(action).execute(request, listener);
  5. }

6、TransportAction

获取TransportAction的实现类首先调用execute方法

  1. public final Task execute(Request request, ActionListener<Response> listener) {
  2. //注册任务
  3. Task task = taskManager.register("transport", actionName, request);
  4. if (task == null) {
  5. execute(null, request, listener);
  6. } else {
  7. //执行任务
  8. execute(task, request, new ActionListener<Response>() {
  9. @Override
  10. public void onResponse(Response response) {
  11. taskManager.unregister(task);
  12. listener.onResponse(response);
  13. }
  14. @Override
  15. public void onFailure(Exception e) {
  16. taskManager.unregister(task);
  17. listener.onFailure(e);
  18. }
  19. });
  20. }
  21. return task;
  22. }

在执行execute方法时会获取请求拦截器链,然后执行拦截器链,类似Web服务的Filter过滤器。

  1. public final void execute(Task task, Request request, ActionListener<Response> listener) {
  2. ActionRequestValidationException validationException = request.validate();
  3. if (validationException != null) {
  4. listener.onFailure(validationException);
  5. return;
  6. }
  7. if (task != null && request.getShouldStoreResult()) {
  8. listener = new TaskResultStoringActionListener<>(taskManager, task, listener);
  9. }
  10. RequestFilterChain<Request, Response> requestFilterChain = new RequestFilterChain<>(this, logger);
  11. requestFilterChain.proceed(task, actionName, request, listener);
  12. }

 在RequestFilterChain中执行proceed方法,最终还是执行action的doExecute方法。

  1. @Override
  2. public void proceed(Task task, String actionName, Request request, ActionListener<Response> listener) {
  3. int i = index.getAndIncrement();
  4. try {
  5. if (i < this.action.filters.length) {
  6. this.action.filters[i].apply(task, actionName, request, listener, this);
  7. } else if (i == this.action.filters.length) {
  8. this.action.doExecute(task, request, listener);
  9. } else {
  10. listener.onFailure(new IllegalStateException("proceed was called too many times"));
  11. }
  12. } catch(Exception e) {
  13. logger.trace("Error during transport action execution.", e);
  14. listener.onFailure(e);
  15. }
  16. }

 TransportAction的实现类TransportMainAction调用doExecute,在此方法中组装数据然后调用ActionListener的onResponse方法返回数据。

  1. @Override
  2. protected void doExecute(MainRequest request, ActionListener<MainResponse> listener) {
  3. ClusterState clusterState = clusterService.state();
  4. assert Node.NODE_NAME_SETTING.exists(settings);
  5. final boolean available = clusterState.getBlocks().hasGlobalBlock(RestStatus.SERVICE_UNAVAILABLE) == false;
  6. listener.onResponse(
  7. new MainResponse(Node.NODE_NAME_SETTING.get(settings), Version.CURRENT, clusterState.getClusterName(),
  8. clusterState.metaData().clusterUUID(), Build.CURRENT, available));
  9. }

 7、ActionListener

ActionListener的实现类RestActionListener提供onResponse方法

  1. @Override
  2. public final void onResponse(Response response) {
  3. try {
  4. processResponse(response);
  5. } catch (Exception e) {
  6. onFailure(e);
  7. }
  8. }

 在子类RestResponseListener中会构建返回值,调用Netty4HttpChannel的sendResponse方法返回数据

  1. @Override
  2. protected final void processResponse(Response response) throws Exception {
  3. //返回数据
  4. channel.sendResponse(buildResponse(response));
  5. }

对应查询:http://localhost:9200地址返回的数据在RestBuilderListener中构建

  1. new RestBuilderListener<MainResponse>(channel) {
  2. @Override
  3. public RestResponse buildResponse(MainResponse mainResponse, XContentBuilder builder) throws Exception {
  4. return convertMainResponse(mainResponse, request, builder);
  5. }
  6. });

8、 Netty4HttpChannel

在Nett4HttpChannel中调用sendResponse方法返回数据。

  1. @Override
  2. public void sendResponse(RestResponse response) {
  3. // if the response object was created upstream, then use it;
  4. // otherwise, create a new one
  5. ByteBuf buffer = Netty4Utils.toByteBuf(response.content());
  6. final FullHttpResponse resp;
  7. if (HttpMethod.HEAD.equals(nettyRequest.method())) {
  8. resp = newResponse(Unpooled.EMPTY_BUFFER);
  9. } else {
  10. resp = newResponse(buffer);
  11. }
  12. resp.setStatus(getStatus(response.status()));
  13. Netty4CorsHandler.setCorsResponseHeaders(nettyRequest, resp, transport.getCorsConfig());
  14. String opaque = nettyRequest.headers().get("X-Opaque-Id");
  15. if (opaque != null) {
  16. setHeaderField(resp, "X-Opaque-Id", opaque);
  17. }
  18. // Add all custom headers
  19. addCustomHeaders(resp, response.getHeaders());
  20. addCustomHeaders(resp, threadContext.getResponseHeaders());
  21. BytesReference content = response.content();
  22. boolean releaseContent = content instanceof Releasable;
  23. boolean releaseBytesStreamOutput = bytesOutputOrNull() instanceof ReleasableBytesStreamOutput;
  24. try {
  25. // If our response doesn't specify a content-type header, set one
  26. setHeaderField(resp, HttpHeaderNames.CONTENT_TYPE.toString(), response.contentType(), false);
  27. // If our response has no content-length, calculate and set one
  28. setHeaderField(resp, HttpHeaderNames.CONTENT_LENGTH.toString(), String.valueOf(buffer.readableBytes()), false);
  29. addCookies(resp);
  30. final ChannelPromise promise = channel.newPromise();
  31. if (releaseContent) {
  32. promise.addListener(f -> ((Releasable)content).close());
  33. }
  34. if (releaseBytesStreamOutput) {
  35. promise.addListener(f -> bytesOutputOrNull().close());
  36. }
  37. if (isCloseConnection()) {
  38. promise.addListener(ChannelFutureListener.CLOSE);
  39. }
  40. final Object msg;
  41. if (pipelinedRequest != null) {
  42. msg = pipelinedRequest.createHttpResponse(resp, promise);
  43. } else {
  44. msg = resp;
  45. }
  46. channel.writeAndFlush(msg, promise);
  47. releaseContent = false;
  48. releaseBytesStreamOutput = false;
  49. } finally {
  50. if (releaseContent) {
  51. ((Releasable) content).close();
  52. }
  53. if (releaseBytesStreamOutput) {
  54. bytesOutputOrNull().close();
  55. }
  56. if (pipelinedRequest != null) {
  57. pipelinedRequest.release();
  58. }
  59. }
  60. }

9、请求执行流程图:

请求处理流程

 

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/961105
推荐阅读
相关标签
  

闽ICP备14008679号