当前位置:   article > 正文

Elasticsearch-7.8.0 创建索引底层源码分析(1)_elasticsearch 源码解析

elasticsearch 源码解析

1、前言

本专栏系列基于  elasticsearch-7.8.0  版本分析
陆续打算推出的博文列表如下:

1、创建索引底层源码分析
2、更新索引底层源码分析
3、删除索引底层源码分析
4、elasticsearch中的线程池实现
5、elasticsearch启动过程源码流程分析
6、high level rest client请求流程分析
7、elastic search数据副本模型、读写模型
8、lucene学习总结系列

  • 全文检索的基本原理
  • lucene的总体结构
  • lucene的索引文件格式
  • lucene的打分机制
  • lucene的搜索过程解析

2、Start

本文记录ElasticSearch-7.8.0创建索引源码执行流程,从源码角度看一下创建索引的底层实现和涉及到的服务,比如AllocationService、MasterService、IndexService等,以及这些服务的相关操作,若有不合理的地方,希望各位看官能够不吝赐教。

  • 场景1:创建索引,没有写入文档。
  1. curl -X PUT "localhost:9200/cool3" -d '{
  2.   "settings": {
  3.       "number_of_shards": 3,
  4.       "number_of_replicas": 2
  5.     }
  6. }'

总体流程图

总述:客户端提交创建索引的基本信息(索引名称、分区数、副本数等),提交到服务端,服务端将CreateIndexRequest封装成CreateIndexClusterStateUpdateRequest。根据actionName获得具体响应的action,具体入口是

org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction

拿到响应action后调用实例属性MetadataCreateIndexService#createIndex()进入到onlyCreateIndex(),该实例属性的作用是:负责提交创建索引请求的服务;所以创建索引大致可以分为以下几个主要流程:

1)master节点发起集群状态更新任务

        创建索引会改变 当前集群状态——ClusterState,集群状态只能在主节点上更新,所以onlyCreateIndex方法进来后,就会由ClusterService调起MasterService,并在master节点上发起:提交一批集群状态更新任务——MasterService#submitStateUpdateTasks;经过一系列调用,在master节点运行创建索引任务MasterService#runTasks,再进入到具体的创建索引逻辑,具体调用栈如下

1、MetadataCreateIndexService#onlyCreateIndex
    clusterService.submitStateUpdateTask({...}) // 发起提交集群状态更新任务
2、ClusterService#submitStateUpdateTask
    submitStateUpdateTask(source, updateTask, ...);
3、ClusterService#submitStateUpdateTask
    submitStateUpdateTasks(source, ...);
4、ClusterService#submitStateUpdateTask
    masterService.submitStateUpdateTasks(source, tasks, config, executor);
5、MasterService#submitStateUpdateTasks
    taskBatcher.submitTasks(safeTasks, config.timeout());
6、TaskBatcher#submitTasks
    threadExecutor.execute(firstTask, timeout, () -> onTimeoutInternal(tasks, timeout));
7、PrioritizedEsThreadPoolExecutor#execute
    execute(command); // 提交创建索引任务
8、EsThreadPoolExecutor#execute
    super.execute(command);
9、java.util.concurrent.ThreadPoolExecutor#execute
    java.util.concurrent.ThreadPoolExecutor.Worker#run
    runWorker(Worker w)
10、java.util.concurrent.ThreadPoolExecutor#runWorker
    task.run();
10、TaskBatcher.BatchedTask#run
    // 创建索引、更新集群的状态信息是在Runnable#run()中,也就是TaskBatcher.BatchedTask#run方法中,这个方法继承了java类的Runnable接口
    runIfNotProcessed(this);
11、TaskBatcher#runIfNotProcessed
    run(updateTask.batchingKey, toExecute, tasksSummary);  // 最终回到run方法
    // MasterService.Batcher#run实现了上面TaskBatcher#run
12、MasterService.Batcher#run
    runTasks(new TaskInputs(taskExecutor, updateTasks, tasksSummary));
13、MasterService#runTasks
    final TaskOutputs taskOutputs = calculateTaskOutputs(taskInputs, previousClusterState);
14、MasterService#calculateTaskOutputs // 输入创建索引任务,输出集群状态变化结果
    ClusterTasksResult<Object> clusterTasksResult = executeTasks(taskInputs, previousClusterState);
15、MasterService#executeTasks
    clusterTasksResult = taskInputs.executor.execute(previousClusterState, inputs);
16、ClusterStateTaskExecutor#execute
    ClusterState result = execute(currentState); // 抽象类ClusterStateUpdateTask实现了ClusterStateTaskExecutor接口
17、ClusterStateUpdateTask#execute // 该方法:根据当前状态更新群集状态。如果不应更改任何状态,则返回*相同实例*。
18、clusterService.submitStateUpdateTask({
        new AckedClusterStateUpdateTask<ClusterStateUpdateResponse>(...) {
                @Override
                public ClusterState execute(ClusterState currentState) throws Exception {
                        return applyCreateIndexRequest(currentState, request, false);
                }
        }
})
// AckedClusterStateUpdateTask继承ClusterStateUpdateTask类且实现AckedClusterStateTaskListener接口
// 回到步骤1、形成闭环;进入创建索引具体流程:applyCreateIndexRequest,详见调用栈-2
......

调用栈-1

调用栈-1——类继承关系

2)匹配索引模板启动IndexService

        创建索引需要通过IndexModule初始化一个IndexService,IndexService主要包含创建、删除、关闭分片等操作,即维护一个索引的基本操作。下面是调用栈

1、MetadataCreateIndexService#onlyCreateIndex
    return applyCreateIndexRequest(currentState, request, false); // 进入创建索引逻辑
2、MetadataCreateIndexService#applyCreateIndexRequest
    return applyCreateIndexRequest(currentState, request, silent, null);
3、MetadataCreateIndexService#applyCreateIndexRequest
    validate(request, currentState); // 索引相关检验
    return applyCreateIndexRequestWithV1Templates(currentState, request, silent, v1Templates, metadataTransformer);
4、MetadataCreateIndexService#applyCreateIndexRequestWithV1Templates
    return applyCreateIndexWithTemporaryService(...)
5、MetadataCreateIndexService#applyCreateIndexWithTemporaryService
    return indicesService.<ClusterState, Exception>withTempIndexService(...)
6、IndicesService#withTempIndexService // 
    final IndexService indexService = createIndexService(...) // 创建IndexService入口
7、IndicesService#createIndexService
    final IndexModule indexModule = new IndexModule(...) // 创建IndexModule
    ... // 为创建索引添加各种监听器
    return indexModule.newIndexService(...) // 新建IndexService
8、IndexModule#newIndexService
    final IndexService indexService = new IndexService(...)
9、IndicesService#withTempIndexService // 回到步骤6、创建IndexService入口,形成闭环
        return indexServiceConsumer.apply(indexService);
10、MetadataCreateIndexService#applyCreateIndexWithTemporaryService
    return clusterStateCreateIndex(currentState, request.blocks(), indexMetadata, allocationService::reroute, metadataTransformer); // 步骤6,进入具体创建索引操作
11、AllocationService#reroute
        reroute(allocation); --> shardsAllocator.allocate(allocation);
12、BalancedShardsAllocator#allocate
        balancer.allocateUnassigned(); // 分配未分配的分片
        balancer.moveShards(); // 重新定位无法再留在节点上的分片
        balancer.balance(); // 重新分配分片,以便分片在集群中平衡
13、回到调用栈-1的步骤13、,形成闭环

调用栈-2

3)创建索引

        将索引创建到集群状态中即生成新的cluster state,下面是创建成功后的日志

[es-source-node] [cool3] creating index, cause [api], templates [], shards [3]/[2], mappings []

下面是创建索引的具体实现

  1. static ClusterState clusterStateCreateIndex(ClusterState currentState, Set<ClusterBlock> clusterBlocks, IndexMetadata indexMetadata,
  2. BiFunction<ClusterState, String, ClusterState> rerouteRoutingTable,
  3. BiConsumer<Metadata.Builder, IndexMetadata> metadataTransformer) {
  4. Metadata.Builder builder = Metadata.builder(currentState.metadata())
  5. .put(indexMetadata, false);
  6. if (metadataTransformer != null) {
  7. metadataTransformer.accept(builder, indexMetadata);
  8. }
  9. Metadata newMetadata = builder.build();
  10. String indexName = indexMetadata.getIndex().getName();
  11. ClusterBlocks.Builder blocks = createClusterBlocksBuilder(currentState, indexName, clusterBlocks);
  12. blocks.updateBlocks(indexMetadata);
  13. ClusterState updatedState = ClusterState.builder(currentState).blocks(blocks).metadata(newMetadata).build();
  14. RoutingTable.Builder routingTableBuilder = RoutingTable.builder(updatedState.routingTable())
  15. .addAsNew(updatedState.metadata().index(indexName));
  16. updatedState = ClusterState.builder(updatedState).routingTable(routingTableBuilder.build()).build();
  17. // 如果index状态open,执行allocationService.reroute 将分片分配到其他节点
  18. return rerouteRoutingTable.apply(updatedState, "index [" + indexName + "] created");
  19. }

这里详细说下cluster state:cluster state是全局性信息,包含了整个群集中所有分片的元信息(规则, 位置, 大小等信息),并保持每个每节的信息同步;下面的说明4、中,需要关注如何实现发布整体还是发布差异到具体的节点上,后续再分析

1、表示集群的当前状态。
2、除了{@link RoutingNodes}结构之外,cluster state对象是不可变的,该结构是根据{@link RoutingTable}的需求构建的。
3、集群状态只能在主节点上更新。所有更新都在单个线程上执行,并由{@link ClusterService}控制。在每次更新之后,{@link Discovery#publish}方法将集群状态的新版本发布给集群中的所有其他节点。实际的发布机制被委托给{@link Discovery#publish}方法,并取决于发现的类型。
4、集群状态实现{@link Diffable}接口,以支持发布集群状态差异的部分,而不是每次更改时的整个状态。如果某个节点处于集群状态的早期版本中,则发布机制只应将差异发送给该节点。如果某个节点在集群状态的早期版本中不存在,比如新加入的节点,则此节点不太可能具有早期集群状态版本,这时候就应发送完整版本。为了确保差异应用于集群状态的正确版本,每个集群状态版本更新都会生成{@link#stateUUID},唯一地标识该版本的状态。该uuid由{@link ClusterStateDiff#apply}方法验证,以确保应用了正确的差异。如果UUID不匹配,{@link ClusterStateDiff#apply}方法抛出{@link CompatibleClusterStateVersionException},这会导致发布机制将集群状态的完整版本发送到引发此异常的节点。

上面创建完索引后,也就会生成一个新的cluster state,集群状态 cluster state包含更新的路由。接着就会回调到AllocationService#reroute,执行分片分配。

4)AllocationService负责分片路由,将创建成功后的索引分配到分片上

        分片分配的核心逻辑是根据分片规则&分片权重(index、cluster)进行位置判断,然后进行数据移动、移动结束初始化启动、最后调整clusterstate完成分配。

        AllocationService这个服务负责管理集群的节点分配,包括选择节点用于shard allocation——分片分配,管理新加入集群的新节点和分片的重新路由,他的触发条件有:

  • 新增或删除 index 索引
  • node 节点的新增或删除
  • 执行 reroute 命令
  • 修改 replica 副本数量
  • 集群重启

此时我们的场景就是 新建索引 ;分片路由的具体实现逻辑入口如下:AllocationService#reroute

  1. /**
  2. * 在存活的节点中重新路由 路由表
  3. * 如果返回了相同的ClusterState实例,则不会进行任何更改。
  4. */
  5. public ClusterState reroute(ClusterState clusterState, String reason) {
  6. // 1、检查复制副本是否具有需要调整的自动扩展功能。如果需要更改,则返回更新的群集状态;如果不需要更改,则返回相同的群集状态。
  7. ClusterState fixedClusterState = adaptAutoExpandReplicas(clusterState);
  8. // 2、创建一个{@link RoutingNodes}。这是一个开销很大的操作,因此只能调用一次!
  9. RoutingNodes routingNodes = getMutableRoutingNodes(fixedClusterState);
  10. // 3、洗牌未分配的节点
  11. routingNodes.unassigned().shuffle();
  12. /**
  13. * Elasticsearch 主要通过两个基础组件来完成分片分配这个过程的: allocator 和 deciders;
  14. * allocator 寻找最优的节点来分配分片;
  15. * allocator 负责找出拥有分片数量最少的节点列表, 按分片数量递增排序, 分片数量较少的会被优先选择; 对于新建索引, allocator 的目标是以更为均衡的方式把新索引的分片分配到集群的节点中;
  16. * deciders 负责判断并决定是否要进行分配;
  17. * deciders 依次遍历 allocator 给出的节点列表, 判断是否要把分片分配给该节点, 比如是否满足分配过滤规则, 分片是否将超出节点磁盘容量阈值等等;
  18. */
  19. RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, fixedClusterState,
  20. clusterInfoService.getClusterInfo(), currentNanoTime());
  21. // 关注这个方法
  22. reroute(allocation);
  23. if (fixedClusterState == clusterState && allocation.routingNodesChanged() == false) {
  24. return clusterState;
  25. }
  26. return buildResultAndLogHealthChange(clusterState, allocation, reason);
  27. }

shard allocation, 是一个将分片分配到节点的过程;可能发生该操作的过程包括:

  • 初始恢复(initial recovery)
  • 副本分配(replica allocation)
  • 重新平衡(rebalance)
  • 节点的新增和删除

分片的分配操作, 是由 master 节点来决定什么时候移动分片, 以及移动到哪个节点上, 以达到集群的均衡,详见——官方文档

哪些分片应该分配到哪些节点上?哪个分片作为主分片, 哪个作为副本分片?
Elasticsearch 主要通过两个基础组件来完成分片分配这个过程的: allocator 和 deciders;

  • allocator 寻找最优的节点来分配分片
  • deciders 负责判断并决定是否要进行分配

① 对于新建的索引:

  • allocator的目标是以更为均衡的方式把新索引的分片分配到集群的节点中;deciders依次遍历allocator给出的节点列表, 判断是否要把分片分配给该节点, 比如是否满足分配过滤规则, 分片是否将超出节点磁盘容量阈值等等;

② 已有的索引

  • allocator 对于主分片, 只允许把主分片指定在已经拥有该分片完整数据的节点上; 对于副本分片, 则是先判断其他节点上是否已有该分片的数据的拷贝, 如果有这样的节点, allocator 则优先把分片分配到这其中一个节点上;

执行真正reroute逻辑,如果有节点没有分配shard,则执行gatewayAllocator.allocateUnassigned。关于gatewayAllocator的分配主要分为primaryShardAllocator和replicaShardAllocator:

  1. primaryShardAllocator.allocateUnassigned(allocation);
  2. replicaShardAllocator.processExistingRecoveries(allocation);
  3. replicaShardAllocator.allocateUnassigned(allocation);

执行数据分片分配BalancedShardsAllocator.allocate(allocation)。该类基于WeightFunction重新分配集群节点node持有shard的分配关系。allocate方法主要分三步:

  1. final Balancer balancer = new Balancer(logger, allocation, weightFunction, threshold);
  2. balancer.allocateUnassigned();
  3. balancer.moveShards();
  4. balancer.balance();

①:allocateUnassigned,根据WeightFunction算法和所有AllocationDecider把所有给定的shard分配一个最小化匹配的node
②:moveShards,根据第一步的结果对需要移动的节点进行移动,移动过程中为RELOCATING,移动过去初始化INITIALIZING
③:负载均衡,rebalance其实是从负载高的node向负载低的做转移。

5)分发集群状态:publish()

        分片分配结束后,再由主节点调用相关服务将集群状态cluster state分发到集群的其他节点上,完成集群状态同步;主节点将集群状态分发到集群中的其他节点上,具体调用栈详见该入口方法,在调用栈-1的步骤13、后的代码中

  1. try {
  2. ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent(summary, newClusterState, previousClusterState);
  3. // new cluster state, notify all listeners
  4. final DiscoveryNodes.Delta nodesDelta = clusterChangedEvent.nodesDelta();
  5. if (nodesDelta.hasChanges() && logger.isInfoEnabled()) {
  6. String nodesDeltaSummary = nodesDelta.shortSummary();
  7. if (nodesDeltaSummary.length() > 0) {
  8. logger.info("{}, term: {}, version: {}, delta: {}",
  9. summary, newClusterState.term(), newClusterState.version(), nodesDeltaSummary);
  10. }
  11. }
  12. logger.debug("publishing cluster state version [{}]", newClusterState.version());
  13. publish(clusterChangedEvent, taskOutputs, publicationStartTime);
  14. } catch (Exception e) {
  15. handleException(summary, publicationStartTime, newClusterState, e);
  16. }

6)监听器监听创建结果

        等待集群中Active shards恢复到指定数目或者超时返回,将结果返回客户端

* Creates an index in the cluster state and waits for the specified number of shard copies to
* become active (as specified in {@link CreateIndexClusterStateUpdateRequest#waitForActiveShards()})
* before sending the response on the listener.

默认情况下:只要Primary Shard是Active的,也就是wait_for_active_shards指定的分片数量(默认为1),就可以创建索引。这里有两个获取结果的方式,即isAcknowledged()和shardsAcknowledged(),如果cluster state创建成功,isAcknowledged()会返回true(然后等待shardsAcknowledged,如果超时,shardsAcknowledged返回false),否则返回false(不会等待已经started的分片,isShardsAcknowledged也会返回false)。如果Active shards未达到指定的数目,则创建索引请求会阻塞,直到集群中Active shards恢复到指定数目或者超时返回。可参考:ActiveShardsObserver#waitForActiveShards(...)方法

  1. clusterService.submitStateUpdateTask(
  2. "create-index [" + request.index() + "], cause [" + request.cause() + "]",
  3. new AckedClusterStateUpdateTask<ClusterStateUpdateResponse>(Priority.URGENT, request, listener) {
  4. @Override
  5. protected ClusterStateUpdateResponse newResponse(boolean acknowledged) {
  6. // 2、clusterState更新返回
  7. return new ClusterStateUpdateResponse(acknowledged);
  8. }
  9. @Override
  10. public ClusterState execute(ClusterState currentState) throws Exception {
  11. // 1、适配创建索引请求
  12. return applyCreateIndexRequest(currentState, request, false);
  13. }
  14. @Override
  15. public void onFailure(String source, Exception e) {
  16. if (e instanceof ResourceAlreadyExistsException) {
  17. logger.trace(() -> new ParameterizedMessage("[{}] failed to create", request.index()), e);
  18. } else {
  19. logger.debug(() -> new ParameterizedMessage("[{}] failed to create", request.index()), e);
  20. }
  21. super.onFailure(source, e);
  22. }
  23. });
  24. onlyCreateIndex(request, ActionListener.wrap(response -> {
  25. // 检查isAcknowledged
  26. if (response.isAcknowledged()) {
  27. activeShardsObserver.waitForActiveShards(new String[]{request.index()}, request.waitForActiveShards(), request.ackTimeout(),
  28. shardsAcknowledged -> {
  29. // 检查shardsAcknowledged;
  30. if (shardsAcknowledged == false) {
  31. logger.debug("[{}] index created, but the operation timed out while waiting for " +
  32. "enough shards to be started.", request.index());
  33. }
  34. listener.onResponse(new CreateIndexClusterStateUpdateResponse(response.isAcknowledged(), shardsAcknowledged));
  35. }, listener::onFailure);
  36. } else {
  37. listener.onResponse(new CreateIndexClusterStateUpdateResponse(false, false));
  38. }
  39. }, listener::onFailure));

索引创建成功后,客户端响应结果如下,至此,索引创建流程结束。

{
    "acknowledged": true,
    "shards_acknowledged": true,
    "index": "cool3"
}

7)针对创建索引调用栈中的实现细节做一些解释说明

创建索引过程中,如果索引名不规范、或者创建配置不合理,都会在创建过程中进行校验,并且返回对应的报错信息

比如 调用栈-2 中的 索引相关校验

  1. /**
  2. * Validate the name for an index or alias against some static rules.
  3. */
  4. public static void validateIndexOrAliasName(String index, BiFunction<String, String, ? extends RuntimeException> exceptionCtor) {
  5. if (!Strings.validFileName(index)) {
  6. // ('\\', '/', '*', '?', '"', '<', '>', '|', ' ', ','))
  7. throw exceptionCtor.apply(index, "must not contain the following characters " + Strings.INVALID_FILENAME_CHARS);
  8. }
  9. if (index.contains("#")) {
  10. throw exceptionCtor.apply(index, "must not contain '#'");
  11. }
  12. if (index.contains(":")) {
  13. throw exceptionCtor.apply(index, "must not contain ':'");
  14. }
  15. if (index.charAt(0) == '_' || index.charAt(0) == '-' || index.charAt(0) == '+') {
  16. throw exceptionCtor.apply(index, "must not start with '_', '-', or '+'");
  17. }
  18. int byteCount = 0;
  19. try {
  20. byteCount = index.getBytes("UTF-8").length;
  21. } catch (UnsupportedEncodingException e) {
  22. // UTF-8 should always be supported, but rethrow this if it is not for some reason
  23. throw new ElasticsearchException("Unable to determine length of index name", e);
  24. }
  25. if (byteCount > MAX_INDEX_NAME_BYTES) {
  26. throw exceptionCtor.apply(index, "index name is too long, (" + byteCount + " > " + MAX_INDEX_NAME_BYTES + ")");
  27. }
  28. if (index.equals(".") || index.equals("..")) {
  29. throw exceptionCtor.apply(index, "must not be '.' or '..'");
  30. }
  31. }
  1. 校验索引名--validateIndexName(),
    1. )索引名必须小写 -- must be lowercase
    2. )路由表是否已经存在该索引;!抛出索引已经存在异常 -- index {} already exists
    3. )元数据表是否已经存在该索引;同上
    4. )别名是否存在;!抛出别名已经存在异常 -- already exists as alias
       
      1. /**
      2. * Validate the name for an index against some static rules and a cluster state.
      3. */
      4. public void validateIndexName(String index, ClusterState state) {
      5. validateIndexOrAliasName(index, InvalidIndexNameException::new);
      6. if (!index.toLowerCase(Locale.ROOT).equals(index)) {
      7. throw new InvalidIndexNameException(index, "must be lowercase");
      8. }
      9. // NOTE: dot-prefixed index names are validated after template application, not here
      10. if (state.routingTable().hasIndex(index)) {
      11. throw new ResourceAlreadyExistsException(state.routingTable().index(index).getIndex());
      12. }
      13. if (state.metadata().hasIndex(index)) {
      14. throw new ResourceAlreadyExistsException(state.metadata().index(index).getIndex());
      15. }
      16. if (state.metadata().hasAlias(index)) {
      17. throw new InvalidIndexNameException(index, "already exists as alias");
      18. }
      19. }
  2. 校验settings是否正常--validateIndexSettings()
    1. )index.data_path及path.shared_data的设置是否合理(验证配置的索引数据路径(如果有)是否是配置的共享数据路径(如果有)的子路径)
    2. )参数值(非ES自身参数,例如index.merge.enabled、index.uuid)是否有key、value
  1. public void validateIndexSettings(String indexName, final Settings settings, final boolean forbidPrivateIndexSettings)
  2. throws IndexCreationException {
  3. List<String> validationErrors = getIndexSettingsValidationErrors(settings, forbidPrivateIndexSettings);
  4. if (validationErrors.isEmpty() == false) {
  5. ValidationException validationException = new ValidationException();
  6. validationException.addValidationErrors(validationErrors);
  7. throw new IndexCreationException(indexName, validationException);
  8. }
  9. }

3、Ending

        至此,整个索引创建逻辑大体上分析结束,可以通过流程图,对照调用栈做分析,相关部分,比如重新路由和状态同步,后续再详细补充,包括创建索引后,具体的数据路径生成原理。

详见Elasticsearch-7.8.0集群方式运行源码,并集成IK分词器 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/weixin_40725706/article/detail/651565
推荐阅读
相关标签
  

闽ICP备14008679号