赞
踩
之前我们已经通过博客《Elasticsearch学习--ES源码下载、导入及运行》了解学习到如何在本地将Elasticsearch服务运行起来,整个2018年下半年过去了,由于在学习其他知识,对于ES学习这块就暂时先放下的,接下来我们就对ES有一个系统的学习。
这篇博客我们通过访问:http://localhost:9200/ 来简单了解一下ES对请求的处理流程。
1、Netty4HttpRequestHandler
ES提供Netty4HttpRequestHandler作为Http请求的处理器,基于Netty(高性能网络框架)实现,提供channelRead0方法来接收请求。在channelRead0方法中主要创建对象Netty4HttpRequest和Netty4HttpChannel,接下来交由Netty4HttpServerTransport去处理。
- @Override
- protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
-
- //省略部分代码
-
- final Netty4HttpRequest httpRequest;
- try {
- httpRequest = new Netty4HttpRequest(serverTransport.xContentRegistry, copy, ctx.channel());
- } catch (Exception ex) {
- if (pipelinedRequest != null) {
- pipelinedRequest.release();
- }
- throw ex;
- }
- final Netty4HttpChannel channel =
- new Netty4HttpChannel(serverTransport, httpRequest, pipelinedRequest, detailedErrorsEnabled, threadContext);
-
- if (request.decoderResult().isSuccess()) {
- serverTransport.dispatchRequest(httpRequest, channel);
- } else {
- assert request.decoderResult().isFailure();
- serverTransport.dispatchBadRequest(httpRequest, channel, request.decoderResult().cause());
- }
- }
2、Netty4HttpServerTransport
在Netty4HttpServerTransport的dispatchRequest中没有做过多的业务处理,然后交由Dispatcher去处理。
- void dispatchRequest(final RestRequest request, final RestChannel channel) {
- final ThreadContext threadContext = threadPool.getThreadContext();
- try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
- dispatcher.dispatchRequest(request, channel, threadContext);
- }
- }
3、Dispatcher
Dispatcher的实现类RestController用来处理所有的Http请求,在tryAllHandlers中会查找所有的请求处理器来处理请求。
- @Override
- public void dispatchRequest(RestRequest request, RestChannel channel, ThreadContext threadContext) {
- if (request.rawPath().equals("/favicon.ico")) {
- handleFavicon(request, channel);
- return;
- }
- try {
- tryAllHandlers(request, channel, threadContext);
- } catch (Exception e) {
- try {
- channel.sendResponse(new BytesRestResponse(channel, e));
- } catch (Exception inner) {
- inner.addSuppressed(e);
- logger.error((Supplier<?>) () ->
- new ParameterizedMessage("failed to send failure response for uri [{}]", request.uri()), inner);
- }
- }
- }
在tryAllHandlers中会查找RestHandler处理器
- void tryAllHandlers(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) throws Exception {
-
- //省略部分代码
-
- //根据请求查找所有的处理器
- Iterator<MethodHandlers> allHandlers = getAllHandlers(request);
- for (Iterator<MethodHandlers> it = allHandlers; it.hasNext(); ) {
- final Optional<RestHandler> mHandler = Optional.ofNullable(it.next()).flatMap(mh -> mh.getHandler(request.method()));
- requestHandled = dispatchRequest(request, channel, client, mHandler);
- if (requestHandled) {
- break;
- }
- }
-
- // If request has not been handled, fallback to a bad request error.
- if (requestHandled == false) {
- handleBadRequest(request, channel);
- }
- }
在dispatchRequest方法中会根据查找对应的RestHandler的实现类,这样调用RestHandler的handleRequest方法,处理Http请求。
- boolean dispatchRequest(final RestRequest request, final RestChannel channel, final NodeClient client,
- final Optional<RestHandler> mHandler) throws Exception {
-
- //省略部分代码
- final RestHandler wrappedHandler = mHandler.map(h -> handlerWrapper.apply(h)).get();
- wrappedHandler.handleRequest(request, responseChannel, client);
-
- //省略部分代码
-
- return requestHandled;
- }
RestHandler的实现类RestMainAction
4、RestHandler
handleRequest的实现方法在抽象类BaseRestHandler中,在这个方法中调用抽象方法prepareRequest获取RestChannelConsumer,然后调用action.accept(channel)方法,这里ES采用了 lambda编程,最终的实现及调用者在prepareRequest方法中。
- @Override
- public final void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws Exception {
-
- //构建调用者
- final RestChannelConsumer action = prepareRequest(request, client);
-
-
- // execute the action
- action.accept(channel);
- }
prepareRequest在具体的Action的实现类中,在这里就是在RestMainAction中,最终会调用NodeClient的execute方法。
- @Override
- public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
- //执行NodeClient的execute方法
- return channel -> client.execute(MainAction.INSTANCE, new MainRequest(), new RestBuilderListener<MainResponse>(channel) {
- @Override
- public RestResponse buildResponse(MainResponse mainResponse, XContentBuilder builder) throws Exception {
- return convertMainResponse(mainResponse, request, builder);
- }
- });
- }
5、NodeClient
NodeClient的execute方法在父类AbstractClient中,会创建监听器Listener,执行doExecute方法。
- @Override
- public final <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>> void execute(
- Action<Request, Response, RequestBuilder> action, Request request, ActionListener<Response> listener) {
- listener = threadedWrapper.wrap(listener);
- doExecute(action, request, listener);
- }
在doExecute方法中会根据action获取TransportAction的实现类,这里获取的是
- public < Request extends ActionRequest,
- Response extends ActionResponse
- > Task executeLocally(GenericAction<Request, Response> action, Request request, ActionListener<Response> listener) {
- return transportAction(action).execute(request, listener);
- }
6、TransportAction
获取TransportAction的实现类首先调用execute方法
- public final Task execute(Request request, ActionListener<Response> listener) {
-
- //注册任务
- Task task = taskManager.register("transport", actionName, request);
- if (task == null) {
- execute(null, request, listener);
- } else {
- //执行任务
- execute(task, request, new ActionListener<Response>() {
- @Override
- public void onResponse(Response response) {
- taskManager.unregister(task);
- listener.onResponse(response);
- }
-
- @Override
- public void onFailure(Exception e) {
- taskManager.unregister(task);
- listener.onFailure(e);
- }
- });
- }
- return task;
- }
在执行execute方法时会获取请求拦截器链,然后执行拦截器链,类似Web服务的Filter过滤器。
- public final void execute(Task task, Request request, ActionListener<Response> listener) {
- ActionRequestValidationException validationException = request.validate();
- if (validationException != null) {
- listener.onFailure(validationException);
- return;
- }
-
- if (task != null && request.getShouldStoreResult()) {
- listener = new TaskResultStoringActionListener<>(taskManager, task, listener);
- }
-
- RequestFilterChain<Request, Response> requestFilterChain = new RequestFilterChain<>(this, logger);
- requestFilterChain.proceed(task, actionName, request, listener);
- }
在RequestFilterChain中执行proceed方法,最终还是执行action的doExecute方法。
- @Override
- public void proceed(Task task, String actionName, Request request, ActionListener<Response> listener) {
- int i = index.getAndIncrement();
- try {
- if (i < this.action.filters.length) {
- this.action.filters[i].apply(task, actionName, request, listener, this);
- } else if (i == this.action.filters.length) {
- this.action.doExecute(task, request, listener);
- } else {
- listener.onFailure(new IllegalStateException("proceed was called too many times"));
- }
- } catch(Exception e) {
- logger.trace("Error during transport action execution.", e);
- listener.onFailure(e);
- }
- }
TransportAction的实现类TransportMainAction调用doExecute,在此方法中组装数据然后调用ActionListener的onResponse方法返回数据。
- @Override
- protected void doExecute(MainRequest request, ActionListener<MainResponse> listener) {
- ClusterState clusterState = clusterService.state();
- assert Node.NODE_NAME_SETTING.exists(settings);
- final boolean available = clusterState.getBlocks().hasGlobalBlock(RestStatus.SERVICE_UNAVAILABLE) == false;
- listener.onResponse(
- new MainResponse(Node.NODE_NAME_SETTING.get(settings), Version.CURRENT, clusterState.getClusterName(),
- clusterState.metaData().clusterUUID(), Build.CURRENT, available));
- }
7、ActionListener
ActionListener的实现类RestActionListener提供onResponse方法
- @Override
- public final void onResponse(Response response) {
- try {
- processResponse(response);
- } catch (Exception e) {
- onFailure(e);
- }
- }
在子类RestResponseListener中会构建返回值,调用Netty4HttpChannel的sendResponse方法返回数据
- @Override
- protected final void processResponse(Response response) throws Exception {
- //返回数据
- channel.sendResponse(buildResponse(response));
- }
对应查询:http://localhost:9200地址返回的数据在RestBuilderListener中构建
- new RestBuilderListener<MainResponse>(channel) {
- @Override
- public RestResponse buildResponse(MainResponse mainResponse, XContentBuilder builder) throws Exception {
- return convertMainResponse(mainResponse, request, builder);
- }
- });
8、 Netty4HttpChannel
在Nett4HttpChannel中调用sendResponse方法返回数据。
- @Override
- public void sendResponse(RestResponse response) {
- // if the response object was created upstream, then use it;
- // otherwise, create a new one
- ByteBuf buffer = Netty4Utils.toByteBuf(response.content());
- final FullHttpResponse resp;
- if (HttpMethod.HEAD.equals(nettyRequest.method())) {
- resp = newResponse(Unpooled.EMPTY_BUFFER);
- } else {
- resp = newResponse(buffer);
- }
- resp.setStatus(getStatus(response.status()));
-
- Netty4CorsHandler.setCorsResponseHeaders(nettyRequest, resp, transport.getCorsConfig());
-
- String opaque = nettyRequest.headers().get("X-Opaque-Id");
- if (opaque != null) {
- setHeaderField(resp, "X-Opaque-Id", opaque);
- }
-
- // Add all custom headers
- addCustomHeaders(resp, response.getHeaders());
- addCustomHeaders(resp, threadContext.getResponseHeaders());
-
- BytesReference content = response.content();
- boolean releaseContent = content instanceof Releasable;
- boolean releaseBytesStreamOutput = bytesOutputOrNull() instanceof ReleasableBytesStreamOutput;
- try {
- // If our response doesn't specify a content-type header, set one
- setHeaderField(resp, HttpHeaderNames.CONTENT_TYPE.toString(), response.contentType(), false);
- // If our response has no content-length, calculate and set one
- setHeaderField(resp, HttpHeaderNames.CONTENT_LENGTH.toString(), String.valueOf(buffer.readableBytes()), false);
-
- addCookies(resp);
-
- final ChannelPromise promise = channel.newPromise();
-
- if (releaseContent) {
- promise.addListener(f -> ((Releasable)content).close());
- }
-
- if (releaseBytesStreamOutput) {
- promise.addListener(f -> bytesOutputOrNull().close());
- }
-
- if (isCloseConnection()) {
- promise.addListener(ChannelFutureListener.CLOSE);
- }
-
- final Object msg;
- if (pipelinedRequest != null) {
- msg = pipelinedRequest.createHttpResponse(resp, promise);
- } else {
- msg = resp;
- }
- channel.writeAndFlush(msg, promise);
- releaseContent = false;
- releaseBytesStreamOutput = false;
- } finally {
- if (releaseContent) {
- ((Releasable) content).close();
- }
- if (releaseBytesStreamOutput) {
- bytesOutputOrNull().close();
- }
- if (pipelinedRequest != null) {
- pipelinedRequest.release();
- }
- }
- }
9、请求执行流程图:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。