赞
踩
代码入口:TransportCreateIndexAction,继承自TransportMasterNodeAction#doExecute(Task task, final Request request, ActionListener listener)
在上述bulk流程中,可以看到,开启了自动创建索引后,当有数据写入时,先会拿到所有索引,过滤、校验后获取到可以创建的索引,再去批量创建索引。下面看下索引创建的过程。
索引创建的请求会发送到master节点(实际选举的master,非master角色),如果当前节点为master,则继续执行,否则转发请求到master节点。
// 判断当前节点是否为master节点 if (nodes.isLocalNodeElectedMaster() || localExecute(request)) { final ClusterBlockException blockException = checkBlock(request, clusterState); if (blockException != null) {...} else { /.../ threadPool.executor(executor).execute(new ActionRunnable<Response>(delegate) { @Override protected void doRun() throws Exception { // 当前节点为master,且不存在异常,则往下执行,准备创建索引 masterOperation(task, request, clusterState, delegate); } }); } } else { // 当前节点非master,需要校验获取的master节点是否有异常,即为null if (nodes.getMasterNode() == null) { retry(null, masterChangePredicate); } else { DiscoveryNode masterNode = nodes.getMasterNode(); final String actionName = getMasterActionName(masterNode); // 获取对应的action name,注册监听器并转发请求到master节点 transportService.sendRequest(masterNode, actionName, request, new ActionListenerResponseHandler<Response>(listener, TransportMasterNodeAction.this::read) {...}); } }
每个request有对应的cause,即创建索引的原因。例如在bulk写入时自动创建索引,会将cause写为“auto(bulk api)”;客户端通过API创建索引,会将cause写为“api”。将CreateIndexRequest转为CreateIndexClusterStateUpdateRequest,传入alias、如wait_for_active_shards、mapping等。
在转换成CreateIndexClusterStateUpdateRequest后,进入到MetaDataCreateIndexService#createIndex准备开始创建索引,调用onlyCreateIndex(final CreateIndexClusterStateUpdateRequest request,
final ActionListener listener)执行,注册监听器监听结果,等待wait_for_active_shards指定的分片数量(默认为1)。这里有两个获取结果的方式,即isAcknowledged()和shardsAcknowledged(),如果cluster state创建成功,isAcknowledged()会返回true(然后等待shardsAcknowledged,如果超时,shardsAcknowledged返回false),否则返回false(不会等待已经started的分片,isShardsAcknowledged也会返回false)。
进入到onlyCreateIndex(…)后:1. 创建Settings,将request中的setting放进去;2. 对参数进行处理(如果指定的参数不是“index.”开头,且不是“*”结尾,则在参数前面加“index.”,例如 number_of_replicas 会变成 index.number_of_replicas);3. 参数校验(如果参数类型为这两种,则跳过校验:1.内部私有变量,甚至不支持外部指定的参数,且允许忽略校验;2.参数是“archived.”开头,且允许忽略校验),校验参数值是否为空,或者由ES自身维护,或参数不正确等,4.提交StateUpdateTasks。
提交任务时起线程调execute()执行,走到MetaDataCreateIndexService.IndexCreationTask#execute(ClusterState currentState)。
1. 首先校验索引名是否合法,包括:索引名大写、(元数据表、路由表中)已经存在该索引、别名已经存在;2. 校验settings是否正常,包括:index.data_path及path.shared_data的设置是否合理(前者不为空但后者为空等,5.2以后已经废弃了)、参数值(非ES自身参数,例如index.merge.enabled、index.uuid)为空、检查是否超过分片数限制;3. 校验别名,包括:指定别名有效(别名非空,长度大于0,且包含至少一个非空字符)、不包含特殊字符(冒号、井号、问号、引号、下划线开头等)、别名或索引名长度超过255字节、索引名或别名为“.”或者“…”、有多个索引关联到该别名、存在索引名与该别名一致的索引。
/** * 检查分片数是否已经超过限制 */ public static Optional<String> checkShardLimit(int newShards, ClusterState state) { Settings theseSettings = state.metaData().settings(); int nodeCount = state.getNodes().getDataNodes().size(); // 如果没有数据节点,或者没有新增的分片,则正常,返回empty if (nodeCount == 0 || newShards < 0) { return Optional.empty(); } // 每个节点最大的分片数 int maxShardsPerNode = MetaData.SETTING_CLUSTER_MAX_SHARDS_PER_NODE.get(theseSettings); // 总共允许的最大分片数 int maxShardsInCluster = maxShardsPerNode * nodeCount; // 当前所有的分片数 int currentOpenShards = state.getMetaData().getTotalOpenIndexShards(); // 分片数超过限制,不能正常建索引 if ((currentOpenShards + newShards) > maxShardsInCluster) { String errorMessage = "this action would add [" + newShards + "] total shards, but this cluster currently has [" + currentOpenShards + "]/[" + maxShardsInCluster + "] maximum shards open"; return Optional.of(errorMessage); } return Optional.empty(); }
查找该索引可以匹配到的模板(按优先级降序排),解析mapping,根据指定mapping与模板设定值进行合并。遍历每个模板,如果:1. 指定的mapping在模板中存在,需要合并;2. 模板指定了type,但是mapping没有;3. mapping指定了type,但是模板没有。合并完mapping后,需要将所有的settings整合,放入indexSettingsBuilder,indexSettingsBuilder是为了生成创建索引的Settings;indexSettingsBuilder检查:1. 是否有创建索引对应的ES版本,如果没有,则取一个版本号(如果当前集群中,存在多个版本的ES节点,获取oldest version,即最老版本的非客户端节点版本;如果都相同,则取当前集群版本);2. 是否设置了分片数,如果没有,7.0版本之前分片数置为5,否则置为1; 3. 副本数如果为空,则置为1;4. 如果模板、API都没有指定auto_expand_replicas参数(根据集群中节点数量,自动拓展副本数,范围 0~5),但settings中存在,则根据settings中值来设置;5. 创建日期,没有设置则取按UTC获取;6. 设置provided_name;7. 设置uuid;8. 判断是否指定了routing shards,即需要路由的分片数,如果指定了,直接使用,否则需要计算(7.0之前直接使用分片数;7.0及以后版本重新计算,计算方式直接附代码);9. 从settings中移除routing shards参数(这个参数当前只与创建索引有关,使用完后就无用了,因此从settings中移除,放到tmpBuilder中)。
// 计算routing shards
public static int calculateNumRoutingShards(int numShards, Version indexVersionCreated) {
if (indexVersionCreated.onOrAfter(Version.V_7_0_0)) {
int log2MaxNumShards = 10; // logBase2(1024)
int log2NumShards = 32 - Integer.numberOfLeadingZeros(numShards - 1); // ceil(logBase2(numShards))
int numSplits = log2MaxNumShards - log2NumShards;
numSplits = Math.max(1, numSplits); // Ensure the index can be split at least once
return numShards * 1 << numSplits;
} else {
return numShards;
}
}
1. 先初始化一个IndexService。IndexService主要包含创建、删除、关闭分片等操作,即维护一个索引的基本操作。
代码入口及流程:IndicesService#createIndex -> IndicesService#createIndexService -> IndexModule#newIndexService
如果当前集群是7.0及以后版本,同时有index.optimize_auto_generated_id参数,则失败(即自动生成doc id以提高写入效率,该参数在7.x已经移除);创建对应的indexModule,进入到IndexModule;检查是否开启了query cache,对应生成IndexQueryCache 或 DisabledQueryChache(索引级别,默认开启);初始化一个IndexService;更新索引信息,即使用一个map,保存所有索引和与其相对应的IndexService。
2. 获取mapperService,合并新老mapping(mapping合并策略只有update和recovery,即创建或更新mapping,和由于实例重启分片迁移等原因恢复时的合并);
3. 构建IndexMetaData,并生成新的ClusterState
4. 如果index状态open,执行allocationService.reroute将分片分配到其他节点
5. 最后删除索引服务(indicesService.removeIndex,分片已经全部路由到其他节点,该节点需要清除)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。