当前位置:   article > 正文

Elasticsearch-Create Index_elasticsearch8 设置自动创建index

elasticsearch8 设置自动创建index

2.创建索引

  代码入口:TransportCreateIndexAction,继承自TransportMasterNodeAction#doExecute(Task task, final Request request, ActionListener listener)
  在上述bulk流程中,可以看到,开启了自动创建索引后,当有数据写入时,先会拿到所有索引,过滤、校验后获取到可以创建的索引,再去批量创建索引。下面看下索引创建的过程。

2.1 检查是否为master节点

  索引创建的请求会发送到master节点(实际选举的master,非master角色),如果当前节点为master,则继续执行,否则转发请求到master节点。

// 判断当前节点是否为master节点
if (nodes.isLocalNodeElectedMaster() || localExecute(request)) {
    final ClusterBlockException blockException = checkBlock(request, clusterState);
    if (blockException != null) {...}
    else {
        /.../
        threadPool.executor(executor).execute(new ActionRunnable<Response>(delegate) {
            @Override
            protected void doRun() throws Exception {
              // 当前节点为master,且不存在异常,则往下执行,准备创建索引
                masterOperation(task, request, clusterState, delegate);
            }
        });
    }
} else {
  // 当前节点非master,需要校验获取的master节点是否有异常,即为null
    if (nodes.getMasterNode() == null) {
        retry(null, masterChangePredicate);
    } else {
        DiscoveryNode masterNode = nodes.getMasterNode();
        final String actionName = getMasterActionName(masterNode);
        // 获取对应的action name,注册监听器并转发请求到master节点
        transportService.sendRequest(masterNode, actionName, request,
            new ActionListenerResponseHandler<Response>(listener, TransportMasterNodeAction.this::read) {...});
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

2.2 准备创建索引任务

2.2.1 转换Request

在这里插入图片描述
  每个request有对应的cause,即创建索引的原因。例如在bulk写入时自动创建索引,会将cause写为“auto(bulk api)”;客户端通过API创建索引,会将cause写为“api”。将CreateIndexRequest转为CreateIndexClusterStateUpdateRequest,传入alias、如wait_for_active_shards、mapping等。

2.2.2 提交创建索引任务

  在转换成CreateIndexClusterStateUpdateRequest后,进入到MetaDataCreateIndexService#createIndex准备开始创建索引,调用onlyCreateIndex(final CreateIndexClusterStateUpdateRequest request,
final ActionListener listener)执行,注册监听器监听结果,等待wait_for_active_shards指定的分片数量(默认为1)。这里有两个获取结果的方式,即isAcknowledged()和shardsAcknowledged(),如果cluster state创建成功,isAcknowledged()会返回true(然后等待shardsAcknowledged,如果超时,shardsAcknowledged返回false),否则返回false(不会等待已经started的分片,isShardsAcknowledged也会返回false)。
  进入到onlyCreateIndex(…)后:1. 创建Settings,将request中的setting放进去;2. 对参数进行处理(如果指定的参数不是“index.”开头,且不是“*”结尾,则在参数前面加“index.”,例如 number_of_replicas 会变成 index.number_of_replicas);3. 参数校验(如果参数类型为这两种,则跳过校验:1.内部私有变量,甚至不支持外部指定的参数,且允许忽略校验;2.参数是“archived.”开头,且允许忽略校验),校验参数值是否为空,或者由ES自身维护,或参数不正确等,4.提交StateUpdateTasks。

2.2.3 创建索引

  提交任务时起线程调execute()执行,走到MetaDataCreateIndexService.IndexCreationTask#execute(ClusterState currentState)。

2.2.3.1 参数校验

  1. 首先校验索引名是否合法,包括:索引名大写、(元数据表、路由表中)已经存在该索引、别名已经存在;2. 校验settings是否正常,包括:index.data_path及path.shared_data的设置是否合理(前者不为空但后者为空等,5.2以后已经废弃了)、参数值(非ES自身参数,例如index.merge.enabled、index.uuid)为空、检查是否超过分片数限制;3. 校验别名,包括:指定别名有效(别名非空,长度大于0,且包含至少一个非空字符)、不包含特殊字符(冒号、井号、问号、引号、下划线开头等)、别名或索引名长度超过255字节、索引名或别名为“.”或者“…”、有多个索引关联到该别名、存在索引名与该别名一致的索引。

/**
 * 检查分片数是否已经超过限制
 */
public static Optional<String> checkShardLimit(int newShards, ClusterState state) {
    Settings theseSettings = state.metaData().settings();
    int nodeCount = state.getNodes().getDataNodes().size();
    // 如果没有数据节点,或者没有新增的分片,则正常,返回empty
    if (nodeCount == 0 || newShards < 0) {
        return Optional.empty();
    }
    // 每个节点最大的分片数
    int maxShardsPerNode = MetaData.SETTING_CLUSTER_MAX_SHARDS_PER_NODE.get(theseSettings);
    // 总共允许的最大分片数
    int maxShardsInCluster = maxShardsPerNode * nodeCount;
    // 当前所有的分片数
    int currentOpenShards = state.getMetaData().getTotalOpenIndexShards();
    // 分片数超过限制,不能正常建索引
    if ((currentOpenShards + newShards) > maxShardsInCluster) {
        String errorMessage = "this action would add [" + newShards + "] total shards, but this cluster currently has [" +
            currentOpenShards + "]/[" + maxShardsInCluster + "] maximum shards open";
        return Optional.of(errorMessage);
    }
    return Optional.empty();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
2.2.3.2 整合创建索引的参数项

  查找该索引可以匹配到的模板(按优先级降序排),解析mapping,根据指定mapping与模板设定值进行合并。遍历每个模板,如果:1. 指定的mapping在模板中存在,需要合并;2. 模板指定了type,但是mapping没有;3. mapping指定了type,但是模板没有。合并完mapping后,需要将所有的settings整合,放入indexSettingsBuilder,indexSettingsBuilder是为了生成创建索引的Settings;indexSettingsBuilder检查:1. 是否有创建索引对应的ES版本,如果没有,则取一个版本号(如果当前集群中,存在多个版本的ES节点,获取oldest version,即最老版本的非客户端节点版本;如果都相同,则取当前集群版本);2. 是否设置了分片数,如果没有,7.0版本之前分片数置为5,否则置为1; 3. 副本数如果为空,则置为1;4. 如果模板、API都没有指定auto_expand_replicas参数(根据集群中节点数量,自动拓展副本数,范围 0~5),但settings中存在,则根据settings中值来设置;5. 创建日期,没有设置则取按UTC获取;6. 设置provided_name;7. 设置uuid;8. 判断是否指定了routing shards,即需要路由的分片数,如果指定了,直接使用,否则需要计算(7.0之前直接使用分片数;7.0及以后版本重新计算,计算方式直接附代码);9. 从settings中移除routing shards参数(这个参数当前只与创建索引有关,使用完后就无用了,因此从settings中移除,放到tmpBuilder中)。

// 计算routing shards
public static int calculateNumRoutingShards(int numShards, Version indexVersionCreated) {
     if (indexVersionCreated.onOrAfter(Version.V_7_0_0)) {
         int log2MaxNumShards = 10; // logBase2(1024)
         int log2NumShards = 32 - Integer.numberOfLeadingZeros(numShards - 1); // ceil(logBase2(numShards))
         int numSplits = log2MaxNumShards - log2NumShards;
         numSplits = Math.max(1, numSplits); // Ensure the index can be split at least once
         return numShards * 1 << numSplits;
     } else {
         return numShards;
     }
 }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
2.2.3.3 创建索引相关服务

  1. 先初始化一个IndexService。IndexService主要包含创建、删除、关闭分片等操作,即维护一个索引的基本操作。
   代码入口及流程:IndicesService#createIndex -> IndicesService#createIndexService -> IndexModule#newIndexService
   如果当前集群是7.0及以后版本,同时有index.optimize_auto_generated_id参数,则失败(即自动生成doc id以提高写入效率,该参数在7.x已经移除);创建对应的indexModule,进入到IndexModule;检查是否开启了query cache,对应生成IndexQueryCache 或 DisabledQueryChache(索引级别,默认开启);初始化一个IndexService;更新索引信息,即使用一个map,保存所有索引和与其相对应的IndexService。
  2. 获取mapperService,合并新老mapping(mapping合并策略只有update和recovery,即创建或更新mapping,和由于实例重启分片迁移等原因恢复时的合并);
  3. 构建IndexMetaData,并生成新的ClusterState
  4. 如果index状态open,执行allocationService.reroute将分片分配到其他节点
  5. 最后删除索引服务(indicesService.removeIndex,分片已经全部路由到其他节点,该节点需要清除)

2.2.4 同步集群状态

  1. 如果是master节点同步集群状态(如果是master)
  2. 通知集群状态监听器listener,其他节点接收到集群状态变化,启动indicesService服务
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小惠珠哦/article/detail/1009123
推荐阅读
相关标签
  

闽ICP备14008679号