当前位置:   article > 正文

(四)elasticsearch 源码之索引流程分析_elasticsearch索引应用mappings源码分析

elasticsearch索引应用mappings源码分析

https://www.cnblogs.com/darcy-yuan/p/17024341.html

1.概览

前面我们讨论了es是如何启动,本文研究下es是如何索引文档的。

下面是索引流程图,我们按照流程图的顺序依次描述。

 

其中主要类的关系如下:

2. 索引流程 (primary)

我们用postman发送请求,创建一个文档

我们发送的是http请求,es也有一套http请求处理逻辑,和spring的mvc类似

  1. // org.elasticsearch.rest.RestController: checks the request's content type, accounts its size against the in-flight request breaker, then hands it to the handler
  2. private void dispatchRequest(RestRequest request, RestChannel channel, RestHandler handler) throws Exception {
  3. final int contentLength = request.content().length();
  4. if (contentLength > 0) {
  5. final XContentType xContentType = request.getXContentType(); // validate the content-type
  6. if (xContentType == null) {
  7. sendContentTypeErrorMessage(request.getAllHeaderValues("Content-Type"), channel);
  8. return;
  9. }
  10. if (handler.supportsContentStream() && xContentType != XContentType.JSON && xContentType != XContentType.SMILE) { // stream-capable handlers accept only JSON or SMILE
  11. channel.sendResponse(BytesRestResponse.createSimpleErrorResponse(channel, RestStatus.NOT_ACCEPTABLE,
  12. "Content-Type [" + xContentType + "] does not support stream parsing. Use JSON or SMILE instead"));
  13. return;
  14. }
  15. }
  16. RestChannel responseChannel = channel;
  17. try {
  18. if (handler.canTripCircuitBreaker()) {
  19. inFlightRequestsBreaker(circuitBreakerService).addEstimateBytesAndMaybeBreak(contentLength, "<http_request>");
  20. } else {
  21. inFlightRequestsBreaker(circuitBreakerService).addWithoutBreaking(contentLength);
  22. }
  23. // iff we could reserve bytes for the request we need to send the response also over this channel
  24. responseChannel = new ResourceHandlingHttpChannel(channel, circuitBreakerService, contentLength);
  25. handler.handleRequest(request, responseChannel, client);
  26. } catch (Exception e) {
  27. responseChannel.sendResponse(new BytesRestResponse(responseChannel, e)); // report any failure to the caller as an error response
  28. }
  29. }
  1. // org.elasticsearch.rest.BaseRestHandler: prepares the action, rejects unrecognized parameters or an unconsumed body, then runs the action
  2. @Override
  3. public final void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws Exception {
  4. // prepare the request for execution; has the side effect of touching the request parameters
  5. final RestChannelConsumer action = prepareRequest(request, client);
  6. // validate unconsumed params, but we must exclude params used to format the response
  7. // use a sorted set so the unconsumed parameters appear in a reliable sorted order
  8. final SortedSet<String> unconsumedParams =
  9. request.unconsumedParams().stream().filter(p -> !responseParams().contains(p)).collect(Collectors.toCollection(TreeSet::new));
  10. // validate the non-response params
  11. if (!unconsumedParams.isEmpty()) {
  12. final Set<String> candidateParams = new HashSet<>();
  13. candidateParams.addAll(request.consumedParams());
  14. candidateParams.addAll(responseParams());
  15. throw new IllegalArgumentException(unrecognized(request, unconsumedParams, candidateParams, "parameter"));
  16. }
  17. if (request.hasContent() && request.isContentConsumed() == false) {
  18. throw new IllegalArgumentException("request [" + request.method() + " " + request.path() + "] does not support having a body");
  19. }
  20. usageCount.increment();
  21. // execute the action
  22. action.accept(channel); // run the prepared action
  23. }
  1. // org.elasticsearch.rest.action.document.RestIndexAction: builds an IndexRequest from the REST parameters and body, and returns a consumer that submits it through the client
  2. public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
  3. IndexRequest indexRequest;
  4. final String type = request.param("type");
  5. if (type != null && type.equals(MapperService.SINGLE_MAPPING_NAME) == false) {
  6. deprecationLogger.deprecatedAndMaybeLog("index_with_types", TYPES_DEPRECATION_MESSAGE); // types are deprecated
  7. indexRequest = new IndexRequest(request.param("index"), type, request.param("id"));
  8. } else {
  9. indexRequest = new IndexRequest(request.param("index"));
  10. indexRequest.id(request.param("id"));
  11. }
  12. indexRequest.routing(request.param("routing"));
  13. indexRequest.setPipeline(request.param("pipeline"));
  14. indexRequest.source(request.requiredContent(), request.getXContentType());
  15. indexRequest.timeout(request.paramAsTime("timeout", IndexRequest.DEFAULT_TIMEOUT));
  16. indexRequest.setRefreshPolicy(request.param("refresh"));
  17. indexRequest.version(RestActions.parseVersion(request));
  18. indexRequest.versionType(VersionType.fromString(request.param("version_type"), indexRequest.versionType()));
  19. indexRequest.setIfSeqNo(request.paramAsLong("if_seq_no", indexRequest.ifSeqNo()));
  20. indexRequest.setIfPrimaryTerm(request.paramAsLong("if_primary_term", indexRequest.ifPrimaryTerm()));
  21. String sOpType = request.param("op_type");
  22. String waitForActiveShards = request.param("wait_for_active_shards");
  23. if (waitForActiveShards != null) {
  24. indexRequest.waitForActiveShards(ActiveShardCount.parseString(waitForActiveShards));
  25. }
  26. if (sOpType != null) {
  27. indexRequest.opType(sOpType);
  28. }
  29. return channel ->
  30. client.index(indexRequest, new RestStatusToXContentListener<>(channel, r -> r.getLocation(indexRequest.routing()))); // consumer that performs the index operation
  31. }

然后我们来看index操作具体是怎么处理的,主要由TransportAction管理

  1. // org.elasticsearch.action.support.TransportAction: registers a task for the request and unregisters it before notifying the caller's listener
  2. public final Task execute(Request request, ActionListener<Response> listener) {
  3. /*
  4. * While this version of execute could delegate to the TaskListener
  5. * version of execute that'd add yet another layer of wrapping on the
  6. * listener and prevent us from using the listener bare if there isn't a
  7. * task. That just seems like too many objects. Thus the two versions of
  8. * this method.
  9. */
  10. Task task = taskManager.register("transport", actionName, request); // register the call with the task manager (call -> task)
  11. execute(task, request, new ActionListener<Response>() { // wrap the listener so the task is unregistered first
  12. @Override
  13. public void onResponse(Response response) {
  14. try {
  15. taskManager.unregister(task);
  16. } finally {
  17. listener.onResponse(response);
  18. }
  19. }
  20. @Override
  21. public void onFailure(Exception e) {
  22. try {
  23. taskManager.unregister(task);
  24. } finally {
  25. listener.onFailure(e);
  26. }
  27. }
  28. });
  29. return task;
  30. }
  31. ...
  32. public final void execute(Task task, Request request, ActionListener<Response> listener) { // validates the request, then starts the filter chain
  33. ActionRequestValidationException validationException = request.validate();
  34. if (validationException != null) {
  35. listener.onFailure(validationException);
  36. return;
  37. }
  38. if (task != null && request.getShouldStoreResult()) {
  39. listener = new TaskResultStoringActionListener<>(taskManager, task, listener);
  40. }
  41. RequestFilterChain<Request, Response> requestFilterChain = new RequestFilterChain<>(this, logger); // chain-style processing
  42. requestFilterChain.proceed(task, actionName, request, listener);
  43. }
  44. ...
  45. public void proceed(Task task, String actionName, Request request, ActionListener<Response> listener) { // each call advances one position: filters first, then the action itself
  46. int i = index.getAndIncrement();
  47. try {
  48. if (i < this.action.filters.length) {
  49. this.action.filters[i].apply(task, actionName, request, listener, this); // apply the filters first
  50. } else if (i == this.action.filters.length) {
  51. this.action.doExecute(task, request, listener); // then execute the action
  52. } else {
  53. listener.onFailure(new IllegalStateException("proceed was called too many times"));
  54. }
  55. } catch(Exception e) {
  56. logger.trace("Error during transport action execution.", e);
  57. listener.onFailure(e);
  58. }
  59. }

单个文档的index请求会被包装成只含一项的bulk请求,所以实际上是TransportBulkAction执行具体操作

  1. // org.elasticsearch.action.bulk.TransportBulkAction: resolves each request's pipeline, sends pipeline requests through ingest processing, creates missing indices, then runs the bulk
  2. protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<BulkResponse> listener) {
  3. final long startTime = relativeTime();
  4. final AtomicArray<BulkItemResponse> responses = new AtomicArray<>(bulkRequest.requests.size());
  5. boolean hasIndexRequestsWithPipelines = false;
  6. final MetaData metaData = clusterService.state().getMetaData();
  7. ImmutableOpenMap<String, IndexMetaData> indicesMetaData = metaData.indices();
  8. for (DocWriteRequest<?> actionRequest : bulkRequest.requests) {
  9. IndexRequest indexRequest = getIndexWriteRequest(actionRequest);
  10. if (indexRequest != null) {
  11. // get pipeline from request
  12. String pipeline = indexRequest.getPipeline();
  13. if (pipeline == null) { // no pipeline set on the request
  14. // start to look for default pipeline via settings found in the index meta data
  15. IndexMetaData indexMetaData = indicesMetaData.get(actionRequest.index());
  16. // check the alias for the index request (this is how normal index requests are modeled)
  17. if (indexMetaData == null && indexRequest.index() != null) {
  18. AliasOrIndex indexOrAlias = metaData.getAliasAndIndexLookup().get(indexRequest.index()); // look the name up as an alias
  19. if (indexOrAlias != null && indexOrAlias.isAlias()) {
  20. AliasOrIndex.Alias alias = (AliasOrIndex.Alias) indexOrAlias;
  21. indexMetaData = alias.getWriteIndex();
  22. }
  23. }
  24. // check the alias for the action request (this is how upserts are modeled)
  25. if (indexMetaData == null && actionRequest.index() != null) {
  26. AliasOrIndex indexOrAlias = metaData.getAliasAndIndexLookup().get(actionRequest.index());
  27. if (indexOrAlias != null && indexOrAlias.isAlias()) {
  28. AliasOrIndex.Alias alias = (AliasOrIndex.Alias) indexOrAlias;
  29. indexMetaData = alias.getWriteIndex();
  30. }
  31. }
  32. if (indexMetaData != null) {
  33. // Find the default pipeline if one is defined from and existing index.
  34. String defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(indexMetaData.getSettings());
  35. indexRequest.setPipeline(defaultPipeline);
  36. if (IngestService.NOOP_PIPELINE_NAME.equals(defaultPipeline) == false) {
  37. hasIndexRequestsWithPipelines = true;
  38. }
  39. } else if (indexRequest.index() != null) {
  40. // No index exists yet (and is valid request), so matching index templates to look for a default pipeline
  41. List<IndexTemplateMetaData> templates = MetaDataIndexTemplateService.findTemplates(metaData, indexRequest.index());
  42. assert (templates != null);
  43. String defaultPipeline = IngestService.NOOP_PIPELINE_NAME;
  44. // order of templates are highest order first, break if we find a default_pipeline
  45. for (IndexTemplateMetaData template : templates) {
  46. final Settings settings = template.settings();
  47. if (IndexSettings.DEFAULT_PIPELINE.exists(settings)) {
  48. defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(settings);
  49. break;
  50. }
  51. }
  52. indexRequest.setPipeline(defaultPipeline);
  53. if (IngestService.NOOP_PIPELINE_NAME.equals(defaultPipeline) == false) {
  54. hasIndexRequestsWithPipelines = true;
  55. }
  56. }
  57. } else if (IngestService.NOOP_PIPELINE_NAME.equals(pipeline) == false) {
  58. hasIndexRequestsWithPipelines = true;
  59. }
  60. }
  61. }
  62. if (hasIndexRequestsWithPipelines) {
  63. // this method (doExecute) will be called again, but with the bulk requests updated from the ingest node processing but
  64. // also with IngestService.NOOP_PIPELINE_NAME on each request. This ensures that this on the second time through this method,
  65. // this path is never taken.
  66. try {
  67. if (clusterService.localNode().isIngestNode()) {
  68. processBulkIndexIngestRequest(task, bulkRequest, listener);
  69. } else {
  70. ingestForwarder.forwardIngestRequest(BulkAction.INSTANCE, bulkRequest, listener);
  71. }
  72. } catch (Exception e) {
  73. listener.onFailure(e);
  74. }
  75. return;
  76. }
  77. if (needToCheck()) { // auto-create the indices the bulk request needs so the later writes can proceed
  78. // Attempt to create all the indices that we're going to need during the bulk before we start.
  79. // Step 1: collect all the indices in the request
  80. final Set<String> indices = bulkRequest.requests.stream()
  81. // delete requests should not attempt to create the index (if the index does not
  82. // exists), unless an external versioning is used
  83. .filter(request -> request.opType() != DocWriteRequest.OpType.DELETE
  84. || request.versionType() == VersionType.EXTERNAL
  85. || request.versionType() == VersionType.EXTERNAL_GTE)
  86. .map(DocWriteRequest::index)
  87. .collect(Collectors.toSet());
  88. /* Step 2: filter that to indices that don't exist and we can create. At the same time build a map of indices we can't create
  89. * that we'll use when we try to run the requests. */
  90. final Map<String, IndexNotFoundException> indicesThatCannotBeCreated = new HashMap<>();
  91. Set<String> autoCreateIndices = new HashSet<>();
  92. ClusterState state = clusterService.state();
  93. for (String index : indices) {
  94. boolean shouldAutoCreate;
  95. try {
  96. shouldAutoCreate = shouldAutoCreate(index, state);
  97. } catch (IndexNotFoundException e) {
  98. shouldAutoCreate = false;
  99. indicesThatCannotBeCreated.put(index, e);
  100. }
  101. if (shouldAutoCreate) {
  102. autoCreateIndices.add(index);
  103. }
  104. }
  105. // Step 3: create all the indices that are missing, if there are any missing. start the bulk after all the creates come back.
  106. if (autoCreateIndices.isEmpty()) {
  107. executeBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated); // run the bulk (index) operation
  108. } else {
  109. final AtomicInteger counter = new AtomicInteger(autoCreateIndices.size());
  110. for (String index : autoCreateIndices) {
  111. createIndex(index, bulkRequest.timeout(), new ActionListener<CreateIndexResponse>() {
  112. @Override
  113. public void onResponse(CreateIndexResponse result) {
  114. if (counter.decrementAndGet() == 0) {
  115. threadPool.executor(ThreadPool.Names.WRITE).execute(
  116. () -> executeBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated));
  117. }
  118. }
  119. @Override
  120. public void onFailure(Exception e) {
  121. if (!(ExceptionsHelper.unwrapCause(e) instanceof ResourceAlreadyExistsException)) {
  122. // fail all requests involving this index, if create didn't work
  123. for (int i = 0; i < bulkRequest.requests.size(); i++) {
  124. DocWriteRequest<?> request = bulkRequest.requests.get(i);
  125. if (request != null && setResponseFailureIfIndexMatches(responses, i, request, index, e)) {
  126. bulkRequest.requests.set(i, null);
  127. }
  128. }
  129. }
  130. if (counter.decrementAndGet() == 0) {
  131. executeBulk(task, bulkRequest, startTime, ActionListener.wrap(listener::onResponse, inner -> {
  132. inner.addSuppressed(e);
  133. listener.onFailure(inner);
  134. }), responses, indicesThatCannotBeCreated);
  135. }
  136. }
  137. });
  138. }
  139. }
  140. } else {
  141. executeBulk(task, bulkRequest, startTime, listener, responses, emptyMap());
  142. }
  143. }

接下来, BulkOperation将 BulkRequest 转换成 BulkShardRequest,也就是具体在哪个分片上执行操作

  1. // org.elasticsearch.action.bulk.TransportBulkAction (presumably the BulkOperation inner class named in the article text; confirm): groups the bulk items by shard and sends one BulkShardRequest per shard
  2. protected void doRun() {
  3. final ClusterState clusterState = observer.setAndGetObservedState();
  4. if (handleBlockExceptions(clusterState)) {
  5. return;
  6. }
  7. final ConcreteIndices concreteIndices = new ConcreteIndices(clusterState, indexNameExpressionResolver);
  8. MetaData metaData = clusterState.metaData();
  9. for (int i = 0; i < bulkRequest.requests.size(); i++) {
  10. DocWriteRequest<?> docWriteRequest = bulkRequest.requests.get(i);
  11. //the request can only be null because we set it to null in the previous step, so it gets ignored
  12. if (docWriteRequest == null) {
  13. continue;
  14. }
  15. if (addFailureIfIndexIsUnavailable(docWriteRequest, i, concreteIndices, metaData)) {
  16. continue;
  17. }
  18. Index concreteIndex = concreteIndices.resolveIfAbsent(docWriteRequest); // resolve the concrete index
  19. try {
  20. switch (docWriteRequest.opType()) {
  21. case CREATE:
  22. case INDEX:
  23. IndexRequest indexRequest = (IndexRequest) docWriteRequest;
  24. final IndexMetaData indexMetaData = metaData.index(concreteIndex);
  25. MappingMetaData mappingMd = indexMetaData.mappingOrDefault();
  26. Version indexCreated = indexMetaData.getCreationVersion();
  27. indexRequest.resolveRouting(metaData);
  28. indexRequest.process(indexCreated, mappingMd, concreteIndex.getName()); // validate the index request and auto-generate an id (per the article author)
  29. break;
  30. case UPDATE:
  31. TransportUpdateAction.resolveAndValidateRouting(metaData, concreteIndex.getName(),
  32. (UpdateRequest) docWriteRequest);
  33. break;
  34. case DELETE:
  35. docWriteRequest.routing(metaData.resolveWriteIndexRouting(docWriteRequest.routing(), docWriteRequest.index()));
  36. // check if routing is required, if so, throw error if routing wasn't specified
  37. if (docWriteRequest.routing() == null && metaData.routingRequired(concreteIndex.getName())) {
  38. throw new RoutingMissingException(concreteIndex.getName(), docWriteRequest.type(), docWriteRequest.id());
  39. }
  40. break;
  41. default: throw new AssertionError("request type not supported: [" + docWriteRequest.opType() + "]");
  42. }
  43. } catch (ElasticsearchParseException | IllegalArgumentException | RoutingMissingException e) {
  44. BulkItemResponse.Failure failure = new BulkItemResponse.Failure(concreteIndex.getName(), docWriteRequest.type(),
  45. docWriteRequest.id(), e);
  46. BulkItemResponse bulkItemResponse = new BulkItemResponse(i, docWriteRequest.opType(), failure);
  47. responses.set(i, bulkItemResponse);
  48. // make sure the request gets never processed again
  49. bulkRequest.requests.set(i, null);
  50. }
  51. }
  52. // first, go over all the requests and create a ShardId -> Operations mapping
  53. Map<ShardId, List<BulkItemRequest>> requestsByShard = new HashMap<>();
  54. for (int i = 0; i < bulkRequest.requests.size(); i++) {
  55. DocWriteRequest<?> request = bulkRequest.requests.get(i);
  56. if (request == null) {
  57. continue;
  58. }
  59. String concreteIndex = concreteIndices.getConcreteIndex(request.index()).getName();
  60. ShardId shardId = clusterService.operationRouting().indexShards(clusterState, concreteIndex, request.id(),
  61. request.routing()).shardId(); // pick the target shard from the document id / routing value
  62. List<BulkItemRequest> shardRequests = requestsByShard.computeIfAbsent(shardId, shard -> new ArrayList<>());
  63. shardRequests.add(new BulkItemRequest(i, request));
  64. }
  65. if (requestsByShard.isEmpty()) {
  66. listener.onResponse(new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]),
  67. buildTookInMillis(startTimeNanos)));
  68. return;
  69. }
  70. final AtomicInteger counter = new AtomicInteger(requestsByShard.size());
  71. String nodeId = clusterService.localNode().getId();
  72. for (Map.Entry<ShardId, List<BulkItemRequest>> entry : requestsByShard.entrySet()) {
  73. final ShardId shardId = entry.getKey();
  74. final List<BulkItemRequest> requests = entry.getValue();
  75. BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, bulkRequest.getRefreshPolicy(), // build the BulkShardRequest
  76. requests.toArray(new BulkItemRequest[requests.size()]));
  77. bulkShardRequest.waitForActiveShards(bulkRequest.waitForActiveShards());
  78. bulkShardRequest.timeout(bulkRequest.timeout());
  79. if (task != null) {
  80. bulkShardRequest.setParentTask(nodeId, task.getId());
  81. }
  82. shardBulkAction.execute(bulkShardRequest, new ActionListener<BulkShardResponse>() {
  83. @Override
  84. public void onResponse(BulkShardResponse bulkShardResponse) {
  85. for (BulkItemResponse bulkItemResponse : bulkShardResponse.getResponses()) {
  86. // we may have no response if item failed
  87. if (bulkItemResponse.getResponse() != null) {
  88. bulkItemResponse.getResponse().setShardInfo(bulkShardResponse.getShardInfo());
  89. }
  90. responses.set(bulkItemResponse.getItemId(), bulkItemResponse);
  91. }
  92. if (counter.decrementAndGet() == 0) {
  93. finishHim();
  94. }
  95. }
  96. @Override
  97. public void onFailure(Exception e) {
  98. // create failures for all relevant requests
  99. for (BulkItemRequest request : requests) {
  100. final String indexName = concreteIndices.getConcreteIndex(request.index()).getName();
  101. DocWriteRequest<?> docWriteRequest = request.request();
  102. responses.set(request.id(), new BulkItemResponse(request.id(), docWriteRequest.opType(),
  103. new BulkItemResponse.Failure(indexName, docWriteRequest.type(), docWriteRequest.id(), e)));
  104. }
  105. if (counter.decrementAndGet() == 0) {
  106. finishHim();
  107. }
  108. }
  109. private void finishHim() {
  110. listener.onResponse(new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]),
  111. buildTookInMillis(startTimeNanos)));
  112. }
  113. });
  114. }
  115. }

看下id路由逻辑

  1. // org.elasticsearch.cluster.routing.OperationRouting: routes by the routing value when one is given, otherwise by the document id
  2. public static int generateShardId(IndexMetaData indexMetaData, @Nullable String id, @Nullable String routing) {
  3. final String effectiveRouting;
  4. final int partitionOffset;
  5. if (routing == null) {
  6. assert(indexMetaData.isRoutingPartitionedIndex() == false) : "A routing value is required for gets from a partitioned index";
  7. effectiveRouting = id;
  8. } else {
  9. effectiveRouting = routing;
  10. }
  11. if (indexMetaData.isRoutingPartitionedIndex()) {
  12. partitionOffset = Math.floorMod(Murmur3HashFunction.hash(id), indexMetaData.getRoutingPartitionSize()); // for partitioned indices the id hash selects an offset within the partition
  13. } else {
  14. // we would have still got 0 above but this check just saves us an unnecessary hash calculation
  15. partitionOffset = 0;
  16. }
  17. return calculateScaledShardId(indexMetaData, effectiveRouting, partitionOffset);
  18. }
  19. private static int calculateScaledShardId(IndexMetaData indexMetaData, String effectiveRouting, int partitionOffset) { // shard id = floorMod(hash + offset, routingNumShards) / routingFactor
  20. final int hash = Murmur3HashFunction.hash(effectiveRouting) + partitionOffset; // hash
  21. // we don't use IMD#getNumberOfShards since the index might have been shrunk such that we need to use the size
  22. // of original index to hash documents
  23. return Math.floorMod(hash, indexMetaData.getRoutingNumShards()) / indexMetaData.getRoutingFactor();
  24. }

然后看看此分片是在当前节点,还是远程节点上,现在进入routing阶段。(笔者这里只启动了一个节点,我们就看下本地节点的逻辑)

  1. // org.elasticsearch.action.support.replication.TransportReplicationAction: routing phase; resolves the primary shard, then runs the action locally or forwards it to the node holding the primary
  2. protected void doRun() {
  3. setPhase(task, "routing");
  4. final ClusterState state = observer.setAndGetObservedState();
  5. final String concreteIndex = concreteIndex(state, request);
  6. final ClusterBlockException blockException = blockExceptions(state, concreteIndex);
  7. if (blockException != null) {
  8. if (blockException.retryable()) {
  9. logger.trace("cluster is blocked, scheduling a retry", blockException);
  10. retry(blockException);
  11. } else {
  12. finishAsFailed(blockException);
  13. }
  14. } else {
  15. // request does not have a shardId yet, we need to pass the concrete index to resolve shardId
  16. final IndexMetaData indexMetaData = state.metaData().index(concreteIndex);
  17. if (indexMetaData == null) {
  18. retry(new IndexNotFoundException(concreteIndex));
  19. return;
  20. }
  21. if (indexMetaData.getState() == IndexMetaData.State.CLOSE) {
  22. throw new IndexClosedException(indexMetaData.getIndex());
  23. }
  24. // resolve all derived request fields, so we can route and apply it
  25. resolveRequest(indexMetaData, request);
  26. assert request.waitForActiveShards() != ActiveShardCount.DEFAULT :
  27. "request waitForActiveShards must be set in resolveRequest";
  28. final ShardRouting primary = primary(state);
  29. if (retryIfUnavailable(state, primary)) {
  30. return;
  31. }
  32. final DiscoveryNode node = state.nodes().get(primary.currentNodeId());
  33. if (primary.currentNodeId().equals(state.nodes().getLocalNodeId())) { // find which node holds the primary from the routing, then compare it with the local node
  34. performLocalAction(state, primary, node, indexMetaData);
  35. } else {
  36. performRemoteAction(state, primary, node);
  37. }
  38. }
  39. }

既然是当前节点,那就是发送内部请求

  1. // org.elasticsearch.transport.TransportService: registers a response handler (and an optional timeout) for the request, then sends it over the connection
  2. private <T extends TransportResponse> void sendRequestInternal(final Transport.Connection connection, final String action,
  3. final TransportRequest request,
  4. final TransportRequestOptions options,
  5. TransportResponseHandler<T> handler) {
  6. if (connection == null) {
  7. throw new IllegalStateException("can't send request to a null connection");
  8. }
  9. DiscoveryNode node = connection.getNode();
  10. Supplier<ThreadContext.StoredContext> storedContextSupplier = threadPool.getThreadContext().newRestorableContext(true);
  11. ContextRestoreResponseHandler<T> responseHandler = new ContextRestoreResponseHandler<>(storedContextSupplier, handler);
  12. // TODO we can probably fold this entire request ID dance into connection.sendReqeust but it will be a bigger refactoring
  13. final long requestId = responseHandlers.add(new Transport.ResponseContext<>(responseHandler, connection, action));
  14. final TimeoutHandler timeoutHandler;
  15. if (options.timeout() != null) {
  16. timeoutHandler = new TimeoutHandler(requestId, connection.getNode(), action);
  17. responseHandler.setTimeoutHandler(timeoutHandler);
  18. } else {
  19. timeoutHandler = null;
  20. }
  21. try {
  22. if (lifecycle.stoppedOrClosed()) {
  23. /*
  24. * If we are not started the exception handling will remove the request holder again and calls the handler to notify the
  25. * caller. It will only notify if toStop hasn't done the work yet.
  26. *
  27. * Do not edit this exception message, it is currently relied upon in production code!
  28. */
  29. // TODO: make a dedicated exception for a stopped transport service? cf. ExceptionsHelper#isTransportStoppedForAction
  30. throw new TransportException("TransportService is closed stopped can't send request");
  31. }
  32. if (timeoutHandler != null) {
  33. assert options.timeout() != null;
  34. timeoutHandler.scheduleTimeout(options.timeout());
  35. }
  36. connection.sendRequest(requestId, action, request, options); // local node optimization happens upstream
  37. ...
  38. private void sendLocalRequest(long requestId, final String action, final TransportRequest request, TransportRequestOptions options) { // dispatches a local request straight to the handler registered for its action
  39. final DirectResponseChannel channel = new DirectResponseChannel(localNode, action, requestId, this, threadPool);
  40. try {
  41. onRequestSent(localNode, requestId, action, request, options);
  42. onRequestReceived(requestId, action);
  43. final RequestHandlerRegistry reg = getRequestHandler(action); // registry pattern: action -> handler
  44. if (reg == null) {
  45. throw new ActionNotFoundTransportException("Action [" + action + "] not found");
  46. }
  47. final String executor = reg.getExecutor();
  48. if (ThreadPool.Names.SAME.equals(executor)) {
  49. //noinspection unchecked
  50. reg.processMessageReceived(request, channel);
  51. } else {
  52. threadPool.executor(executor).execute(new AbstractRunnable() {
  53. @Override
  54. protected void doRun() throws Exception {
  55. //noinspection unchecked
  56. reg.processMessageReceived(request, channel); // process the request
  57. }
  58. @Override
  59. public boolean isForceExecution() {
  60. return reg.isForceExecution();
  61. }
  62. @Override
  63. public void onFailure(Exception e) {
  64. try {
  65. channel.sendResponse(e);
  66. } catch (Exception inner) {
  67. inner.addSuppressed(e);
  68. logger.warn(() -> new ParameterizedMessage(
  69. "failed to notify channel of error message for action [{}]", action), inner);
  70. }
  71. }
  72. @Override
  73. public String toString() {
  74. return "processing of [" + requestId + "][" + action + "]: " + request;
  75. }
  76. });
  77. }

然后获取在分片上的执行请求许可

  1. // org.elasticsearch.action.support.replication.TransportReplicationAction: verifies the local shard is still the expected primary (allocation id and term), then acquires a primary operation permit
  2. protected void doRun() throws Exception {
  3. final ShardId shardId = primaryRequest.getRequest().shardId();
  4. final IndexShard indexShard = getIndexShard(shardId);
  5. final ShardRouting shardRouting = indexShard.routingEntry();
  6. // we may end up here if the cluster state used to route the primary is so stale that the underlying
  7. // index shard was replaced with a replica. For example - in a two node cluster, if the primary fails
  8. // the replica will take over and a replica will be assigned to the first node.
  9. if (shardRouting.primary() == false) {
  10. throw new ReplicationOperation.RetryOnPrimaryException(shardId, "actual shard is not a primary " + shardRouting);
  11. }
  12. final String actualAllocationId = shardRouting.allocationId().getId();
  13. if (actualAllocationId.equals(primaryRequest.getTargetAllocationID()) == false) {
  14. throw new ShardNotFoundException(shardId, "expected allocation id [{}] but found [{}]",
  15. primaryRequest.getTargetAllocationID(), actualAllocationId);
  16. }
  17. final long actualTerm = indexShard.getPendingPrimaryTerm();
  18. if (actualTerm != primaryRequest.getPrimaryTerm()) {
  19. throw new ShardNotFoundException(shardId, "expected allocation id [{}] with term [{}] but found [{}]",
  20. primaryRequest.getTargetAllocationID(), primaryRequest.getPrimaryTerm(), actualTerm);
  21. }
  22. acquirePrimaryOperationPermit( // acquire a permit to run the operation on the primary shard
  23. indexShard,
  24. primaryRequest.getRequest(),
  25. ActionListener.wrap(
  26. releasable -> runWithPrimaryShardReference(new PrimaryShardReference(indexShard, releasable)),
  27. e -> {
  28. if (e instanceof ShardNotInPrimaryModeException) {
  29. onFailure(new ReplicationOperation.RetryOnPrimaryException(shardId, "shard is not in primary mode", e));
  30. } else {
  31. onFailure(e);
  32. }
  33. }));
  34. }

现在进入primary阶段

  1. // org.elasticsearch.action.support.replication.TransportReplicationAction: primary phase (excerpt from a method body); runs the replication operation and closes the primary shard reference before responding
  2. setPhase(replicationTask, "primary");
  3. final ActionListener<Response> referenceClosingListener = ActionListener.wrap(response -> {
  4. primaryShardReference.close(); // release shard operation lock before responding to caller
  5. setPhase(replicationTask, "finished");
  6. onCompletionListener.onResponse(response);
  7. }, e -> handleException(primaryShardReference, e));
  8. final ActionListener<Response> globalCheckpointSyncingListener = ActionListener.wrap(response -> {
  9. if (syncGlobalCheckpointAfterOperation) {
  10. final IndexShard shard = primaryShardReference.indexShard;
  11. try {
  12. shard.maybeSyncGlobalCheckpoint("post-operation");
  13. } catch (final Exception e) {
  14. // only log non-closed exceptions
  15. if (ExceptionsHelper.unwrap(
  16. e, AlreadyClosedException.class, IndexShardClosedException.class) == null) {
  17. // intentionally swallow, a missed global checkpoint sync should not fail this operation
  18. logger.info(
  19. new ParameterizedMessage(
  20. "{} failed to execute post-operation global checkpoint sync", shard.shardId()), e);
  21. }
  22. }
  23. }
  24. referenceClosingListener.onResponse(response);
  25. }, referenceClosingListener::onFailure);
  26. new ReplicationOperation<>(primaryRequest.getRequest(), primaryShardReference, // listeners chain: replication result -> global checkpoint sync -> close reference and respond
  27. ActionListener.wrap(result -> result.respond(globalCheckpointSyncingListener), referenceClosingListener::onFailure),
  28. newReplicasProxy(), logger, actionName, primaryRequest.getPrimaryTerm()).execute();

中间的调用跳转不赘述,最后TransportShardBulkAction 调用索引引擎

  1. // org.elasticsearch.action.bulk.TransportShardBulkAction -- runs the current bulk item on the primary shard (excerpt: the listing stops before the method ends)
  2. static boolean executeBulkItemRequest(BulkPrimaryExecutionContext context, UpdateHelper updateHelper, LongSupplier nowInMillisSupplier,
  3. MappingUpdatePerformer mappingUpdater, Consumer<ActionListener<Void>> waitForMappingUpdate,
  4. ActionListener<Void> itemDoneListener) throws Exception {
  5. final DocWriteRequest.OpType opType = context.getCurrent().opType();
  6. final UpdateHelper.Result updateResult;
  7. if (opType == DocWriteRequest.OpType.UPDATE) { // updates are first translated into an index, delete or no-op request
  8. final UpdateRequest updateRequest = (UpdateRequest) context.getCurrent();
  9. try {
  10. updateResult = updateHelper.prepare(updateRequest, context.getPrimary(), nowInMillisSupplier);
  11. } catch (Exception failure) {
  12. // we may fail translating a update to index or delete operation
  13. // we use index result to communicate failure while translating update request
  14. final Engine.Result result =
  15. new Engine.IndexResult(failure, updateRequest.version());
  16. context.setRequestToExecute(updateRequest);
  17. context.markOperationAsExecuted(result);
  18. context.markAsCompleted(context.getExecutionResult());
  19. return true;
  20. }
  21. // execute translated update request
  22. switch (updateResult.getResponseResult()) {
  23. case CREATED:
  24. case UPDATED:
  25. IndexRequest indexRequest = updateResult.action();
  26. IndexMetaData metaData = context.getPrimary().indexSettings().getIndexMetaData();
  27. MappingMetaData mappingMd = metaData.mappingOrDefault();
  28. indexRequest.process(metaData.getCreationVersion(), mappingMd, updateRequest.concreteIndex());
  29. context.setRequestToExecute(indexRequest);
  30. break;
  31. case DELETED:
  32. context.setRequestToExecute(updateResult.action());
  33. break;
  34. case NOOP:
  35. context.markOperationAsNoOp(updateResult.action());
  36. context.markAsCompleted(context.getExecutionResult());
  37. return true;
  38. default:
  39. throw new IllegalStateException("Illegal update operation " + updateResult.getResponseResult());
  40. }
  41. } else { // non-update requests are executed as they are
  42. context.setRequestToExecute(context.getCurrent());
  43. updateResult = null;
  44. }
  45. assert context.getRequestToExecute() != null; // also checks that we're in TRANSLATED state
  46. final IndexShard primary = context.getPrimary();
  47. final long version = context.getRequestToExecute().version();
  48. final boolean isDelete = context.getRequestToExecute().opType() == DocWriteRequest.OpType.DELETE;
  49. final Engine.Result result;
  50. if (isDelete) {
  51. final DeleteRequest request = context.getRequestToExecute();
  52. result = primary.applyDeleteOperationOnPrimary(version, request.type(), request.id(), request.versionType(),
  53. request.ifSeqNo(), request.ifPrimaryTerm());
  54. } else {
  55. final IndexRequest request = context.getRequestToExecute();
  56. result = primary.applyIndexOperationOnPrimary(version, request.versionType(), new SourceToParse( // Lucene execution engine
  57. request.index(), request.type(), request.id(), request.source(), request.getContentType(), request.routing()),
  58. request.ifSeqNo(), request.ifPrimaryTerm(), request.getAutoGeneratedTimestamp(), request.isRetry());
  59. }

3.索引流程(replica)

在 ReplicationOperation 的 execute 方法中,主分片(primary)执行完操作后,监听器会向副本分片(replica)发送请求

  1. // org.elasticsearch.action.support.replication.ReplicationOperation -- execute(): entry point that runs the request on the primary
  2. public void execute() throws Exception {
  3. final String activeShardCountFailure = checkActiveShardCount();
  4. final ShardRouting primaryRouting = primary.routingEntry();
  5. final ShardId primaryId = primaryRouting.shardId();
  6. if (activeShardCountFailure != null) { // active-shard check failed: finish as failed without calling primary.perform
  7. finishAsFailed(new UnavailableShardsException(primaryId,
  8. "{} Timeout: [{}], request: [{}]", activeShardCountFailure, request.timeout(), request));
  9. return;
  10. }
  11. totalShards.incrementAndGet();
  12. pendingActions.incrementAndGet(); // increase by 1 until we finish all primary coordination
  13. primary.perform(request, ActionListener.wrap(this::handlePrimaryResult, resultListener::onFailure)); // on success the listener invokes handlePrimaryResult; failures go to resultListener
  14. }
  15. private void handlePrimaryResult(final PrimaryResultT primaryResult) { // stores the primary's result, then replicates it if there is a replica request
  16. this.primaryResult = primaryResult;
  17. primary.updateLocalCheckpointForShard(primary.routingEntry().allocationId().getId(), primary.localCheckpoint());
  18. primary.updateGlobalCheckpointForShard(primary.routingEntry().allocationId().getId(), primary.globalCheckpoint());
  19. final ReplicaRequest replicaRequest = primaryResult.replicaRequest();
  20. if (replicaRequest != null) {
  21. if (logger.isTraceEnabled()) {
  22. logger.trace("[{}] op [{}] completed on primary for request [{}]", primary.routingEntry().shardId(), opType, request);
  23. }
  24. // we have to get the replication group after successfully indexing into the primary in order to honour recovery semantics.
  25. // we have to make sure that every operation indexed into the primary after recovery start will also be replicated
  26. // to the recovery target. If we used an old replication group, we may miss a recovery that has started since then.
  27. // we also have to make sure to get the global checkpoint before the replication group, to ensure that the global checkpoint
  28. // is valid for this replication group. If we would sample in the reverse, the global checkpoint might be based on a subset
  29. // of the sampled replication group, and advanced further than what the given replication group would allow it to.
  30. // This would entail that some shards could learn about a global checkpoint that would be higher than its local checkpoint.
  31. final long globalCheckpoint = primary.computedGlobalCheckpoint();
  32. // we have to capture the max_seq_no_of_updates after this request was completed on the primary to make sure the value of
  33. // max_seq_no_of_updates on replica when this request is executed is at least the value on the primary when it was executed
  34. // on.
  35. final long maxSeqNoOfUpdatesOrDeletes = primary.maxSeqNoOfUpdatesOrDeletes();
  36. assert maxSeqNoOfUpdatesOrDeletes != SequenceNumbers.UNASSIGNED_SEQ_NO : "seqno_of_updates still uninitialized";
  37. final ReplicationGroup replicationGroup = primary.getReplicationGroup();
  38. markUnavailableShardsAsStale(replicaRequest, replicationGroup);
  39. performOnReplicas(replicaRequest, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, replicationGroup); // execute the operation on the replica shards
  40. }
  41. successfulShards.incrementAndGet(); // mark primary as successful
  42. decPendingAndFinishIfNeeded();
  43. }
  44. ...
  45. private void performOnReplicas(final ReplicaRequest replicaRequest, final long globalCheckpoint, // fans the replica request out to every replication target except the primary
  46. final long maxSeqNoOfUpdatesOrDeletes, final ReplicationGroup replicationGroup) {
  47. // for total stats, add number of unassigned shards and
  48. // number of initializing shards that are not ready yet to receive operations (recovery has not opened engine yet on the target)
  49. totalShards.addAndGet(replicationGroup.getSkippedShards().size());
  50. final ShardRouting primaryRouting = primary.routingEntry();
  51. for (final ShardRouting shard : replicationGroup.getReplicationTargets()) { // iterate over each replication target
  52. if (shard.isSameAllocation(primaryRouting) == false) { // skip the primary's own allocation
  53. performOnReplica(shard, replicaRequest, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes);
  54. }
  55. }
  56. }
  57. private void performOnReplica(final ShardRouting shard, final ReplicaRequest replicaRequest, // sends the request to one replica and tracks its outcome in the shard counters
  58. final long globalCheckpoint, final long maxSeqNoOfUpdatesOrDeletes) {
  59. if (logger.isTraceEnabled()) {
  60. logger.trace("[{}] sending op [{}] to replica {} for request [{}]", shard.shardId(), opType, shard, replicaRequest);
  61. }
  62. totalShards.incrementAndGet();
  63. pendingActions.incrementAndGet();
  64. replicasProxy.performOn(shard, replicaRequest, primaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, // delegate to the ReplicasProxy
  65. new ActionListener<ReplicaResponse>() {
  66. @Override
  67. public void onResponse(ReplicaResponse response) {
  68. successfulShards.incrementAndGet();
  69. try {
  70. primary.updateLocalCheckpointForShard(shard.allocationId().getId(), response.localCheckpoint());
  71. primary.updateGlobalCheckpointForShard(shard.allocationId().getId(), response.globalCheckpoint());
  72. } catch (final AlreadyClosedException e) {
  73. // the index was deleted or this shard was never activated after a relocation; fall through and finish normally
  74. } catch (final Exception e) {
  75. // fail the primary but fall through and let the rest of operation processing complete
  76. final String message = String.format(Locale.ROOT, "primary failed updating local checkpoint for replica %s", shard);
  77. primary.failShard(message, e);
  78. }
  79. decPendingAndFinishIfNeeded();
  80. }
  81. @Override
  82. public void onFailure(Exception replicaException) {
  83. logger.trace(() -> new ParameterizedMessage(
  84. "[{}] failure while performing [{}] on replica {}, request [{}]",
  85. shard.shardId(), opType, shard, replicaRequest), replicaException);
  86. // Only report "critical" exceptions - TODO: Reach out to the master node to get the latest shard state then report.
  87. if (TransportActions.isShardNotAvailableException(replicaException) == false) {
  88. RestStatus restStatus = ExceptionsHelper.status(replicaException);
  89. shardReplicaFailures.add(new ReplicationResponse.ShardInfo.Failure(
  90. shard.shardId(), shard.currentNodeId(), replicaException, restStatus, false));
  91. }
  92. String message = String.format(Locale.ROOT, "failed to perform %s on replica %s", opType, shard);
  93. replicasProxy.failShardIfNeeded(shard, primaryTerm, message, replicaException,
  94. ActionListener.wrap(r -> decPendingAndFinishIfNeeded(), ReplicationOperation.this::onNoLongerPrimary));
  95. }
  96. @Override
  97. public String toString() {
  98. return "[" + replicaRequest + "][" + shard + "]";
  99. }
  100. });
  101. }

发送transport请求

  1. // org.elasticsearch.action.support.replication.TransportReplicationAction -- performOn(): wraps the request for one replica and sends it to that replica's node
  2. public void performOn(
  3. final ShardRouting replica,
  4. final ReplicaRequest request,
  5. final long primaryTerm,
  6. final long globalCheckpoint,
  7. final long maxSeqNoOfUpdatesOrDeletes,
  8. final ActionListener<ReplicationOperation.ReplicaResponse> listener) {
  9. String nodeId = replica.currentNodeId();
  10. final DiscoveryNode node = clusterService.state().nodes().get(nodeId);
  11. if (node == null) { // the replica's node is not in the current cluster state: report failure to the listener
  12. listener.onFailure(new NoNodeAvailableException("unknown node [" + nodeId + "]"));
  13. return;
  14. }
  15. final ConcreteReplicaRequest<ReplicaRequest> replicaRequest = new ConcreteReplicaRequest<>(
  16. request, replica.allocationId().getId(), primaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes);
  17. final ActionListenerResponseHandler<ReplicaResponse> handler = new ActionListenerResponseHandler<>(listener,
  18. ReplicaResponse::new);
  19. transportService.sendRequest(node, transportReplicaAction, replicaRequest, transportOptions, handler); // send the transport request
  20. }

副本分片收到请求后的处理流程与主分片类似,最后同样调用 lucene 引擎

  1. // org.elasticsearch.action.bulk.TransportShardBulkAction -- applies one item on a replica using the primary's seq no and version (excerpt: the listing stops before the method ends)
  2. private static Engine.Result performOpOnReplica(DocWriteResponse primaryResponse, DocWriteRequest<?> docWriteRequest,
  3. IndexShard replica) throws Exception {
  4. final Engine.Result result;
  5. switch (docWriteRequest.opType()) {
  6. case CREATE:
  7. case INDEX:
  8. final IndexRequest indexRequest = (IndexRequest) docWriteRequest;
  9. final ShardId shardId = replica.shardId();
  10. final SourceToParse sourceToParse = new SourceToParse(shardId.getIndexName(), indexRequest.type(), indexRequest.id(),
  11. indexRequest.source(), indexRequest.getContentType(), indexRequest.routing());
  12. result = replica.applyIndexOperationOnReplica(primaryResponse.getSeqNo(), primaryResponse.getVersion(), // calls into the Lucene engine
  13. indexRequest.getAutoGeneratedTimestamp(), indexRequest.isRetry(), sourceToParse);
  14. break;
  15. case DELETE:
  16. DeleteRequest deleteRequest = (DeleteRequest) docWriteRequest;
  17. result = replica.applyDeleteOperationOnReplica(primaryResponse.getSeqNo(), primaryResponse.getVersion(),
  18. deleteRequest.type(), deleteRequest.id());
  19. break;
  20. default:
  21. assert false : "Unexpected request operation type on replica: " + docWriteRequest + ";primary result: " + primaryResponse;
  22. throw new IllegalStateException("Unexpected request operation type on replica: " + docWriteRequest.opType().getLowercase());
  23. }

4.总结

本文简单描述了 es 的索引流程,包括 http 请求是如何解析的、分片是如何确定的。但本文仍有许多不足,比如没有讨论远程节点是如何处理请求的,也没有涉及 lucene 执行引擎的细节,后续博客会继续探讨这些课题。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小丑西瓜9/article/detail/651548
推荐阅读
相关标签
  

闽ICP备14008679号