赞
踩
本文主要分析allocation 模块的结构和原理,然后以集群启动过程为例分析 allocation 模块的工作过程
分片分配就是把一个分片指派到集群中某个节点的过程。分配决策由主节点完成,分配决策包含两方面:
对于新建索引和已有索引, 分片分配过程也不尽相同,不过不管哪种场景,ElasticSearch 都通过两个基础组件完成工作:allocators 分配者
和deciders 决定者
Allocators
尝试寻找最优的节点来分配分片,
Deciders
则负责判断并决定是否要进行这次分配.
对于新建索引和已有索引, 分片分配过程也不尽相同
1.index 增删
2.node 增删
3.replica数量改变
4.手工 reroute
5.集群重启
这个复杂的分配过程在一个叫 reroute 的函数中实现:AllocationService.reroute
此函数对外有两种重载,一种是通过接口调用的手工reroute,另一种是内部模块调用的reroute。本章以内部模块调用的reroute为例,手工reroute过程与此类似。
AllocationService.reroute
对一个或多个主分片或副分片执行分配,分配以后产生新的集群状态。Master 节点将新的集群状态广播下去,触发后续的流程。 对于内部模块调用,返回值为新产生的集群状态,对于手工执行的reroute 命令,返回命令执行结果。
Es 中有以下几个类型的 allocator:
Allocator 负责为某个特定的 shard 分配目的节点。每个Allocator的主要工作是根据某种逻辑得到一个节点列表,然后调用 deciders 去决策,根据决策结果选择一个目的 node。
Allocators 分为 gatewayAllocator 和 shardsAllocator 两种。gatewayAllocator 是为了找到现有分片,shardsAllocator 是根据权重策略在集群的各节点间均衡分片分布。其中 gatewayAllocator 又分主分片和副分片的 allocator。 下面概述每个allocator的作用。
对于这两类 Allocator,我的理解是 gatewayAllocator 是为了找到现有shard,shardsAllocator 是为了分配全新 shard。
目前有下列类型的决策器:
public static Collection<AllocationDecider> createAllocationDeciders(...) { Map<Class, AllocationDecider> deciders = new LinkedHashMap<>(); addAllocationDecider (deciders, new MaxRetryAllocationDecider(settings)); addAllocationDecider (deciders, new ResizeAllocationDecider(settings)); addAllocationDecider (deciders, new ReplicaAfterPrimaryActiveAllocation-Decider(settings)); addAllocationDecider (deciders, new RebalanceOnlyWhenActiveAllocation-Decider(settings)); addAllocationDecider (deciders, new ClusterRebalanceAllocationDecider(settings, clusterSettings)); addAllocationDecider (deciders, new ConcurrentRebalanceAllocationDecider(settings, clusterSettings)); addAllocationDecider (deciders, new EnableAllocationDecider(settings, clusterSettings)); addAllocationDecider (deciders, new NodeVersionAllocationDecider(settings)); addAllocationDecider (deciders, new SnapshotInProgressAllocationDecider(settings)); addAllocationDecider (deciders, new RestoreInProgressAllocationDecider(settings)); addAllocationDecider (deciders, new FilterAllocationDecider(settings, clusterSettings)); addAllocationDecider (deciders, new SameShardAllocationDecider(settings, clusterSettings)); addAllocationDecider (deciders, new DiskThresholdDecider(settings, clusterSettings)); addAllocationDecider (deciders, new ThrottlingAllocationDecider(settings, clusterSettings)); addAllocationDecider (deciders, new ShardsLimitAllocationDecider(settings, clusterSettings)); addAllocationDecider (deciders, new AwarenessAllocationDecider(settings, clusterSettings)); return deciders.values(); }
它们继承自AllocationDecider,需要实现的接口有:
这些 deciders 在ClusterModule#createAllocationDeciders
中全部添加进去,decider 运行之后可能产生的结果有以下几种:
ALWAYS、YES、NO、THROTTLE
这些 deciders 大致可以分为以下几类。
SameShardAllocationDecider
避免主副分片分配到同一个节点
AwarenessAllocationDecider
感知分配器,感知服务器、机架等,尽量分散存储shard
有两种参数用于调整:
cluster.routing.allocation.awareness.attributes: rack_id
cluster.routing.allocation.awareness.attributes: zone
ShardsLimitAllocationDecider
同一个节点上允许存在的同一个index的shard数目
ThrottlingAllocationDecider
recovery阶段的限速配置,包括:
cluster.routing.allocation.node_concurrent_recoveries
cluster.routing.allocation.node_initial_primaries_recoveries
cluster.routing.allocation.node_concurrent_incoming_recoveries
cluster.routing.allocation.node_concurrent_outgoing_recoveries
ConcurrentRebalanceAllocationDecider
rebalance并发控制,可以通过下面的参数配置:
cluster.routing.allocation.cluster_concurrent_rebalance
DiskThresholdDecider
根据磁盘空间进行决策的分配器
RebalanceOnlyWhenActiveAllocationDecider 所有shard都处在active状态下,才可以执行rebalance操作 FilterAllocationDecider 可以调整的参数如下,可以通过接口动态设置: index.routing.allocation.require.* [ 必须 ] index.routing.allocation.include.* [ 允许 ] index.routing.allocation.exclude.* [ 排除 ] cluster.routing.allocation.require.* cluster.routing.allocation.include.* cluster.routing.allocation.exclude.* 配置的目标为节点IP或节点名等。cluster 级别设置会覆盖index级别设置。 ReplicaAfterPrimaryActiveAllocationDecider 保证只会在主分片分配完毕后才开始分配分片副本。 ClusterRebalanceAllocationDecider 通过集群中active的shard状态来决定是否可以执行rebalance,通过下面的配置控制,可以动态生效: cluster.routing.allocation.allow_rebalance
可配置的值如下表所示:
取值 | 含义 |
---|---|
indices_ all_active | 当集群所有的节点分配完毕,才认定集群rebalance完成(默认) |
indices_primaries_active | 只要所有主分片分配完毕,就可以认定集群rebalance完成 |
always | 即使当主分片和分片副本都没有分配,也允许rebalance操作 |
reroute 中主要实现两种 allocator:
private void reroute (RoutingAllocation allocation) {
if(allocation.routingNodes().unassigned().size() > 0) {
removeDelayMarkers (allocation) ;
//gateway分配器
gatewayAllocator.allocateUnassigned (allocation);
}
//分片均衡分配器
shardsAllocator.allocate(allocation);
}
reroute 流程全部运行于 masterService#updateTask
线程。
gateway结束前: 调用submitStateUpdateTask提交任务,任务被clusterService 放入队列,在Master节点顺序执行。
执行到任务中的:allocationService.reroute
收集各个节点的shard元数据,待某个 shard 的 Response 从所有节点全部返回后,执行 finishHim(),然后对收集到的数据进行处理:AsyncShardFetch#processAsyncFetch
allocationService.reroute
执行完毕返回新的集群状态。下面以集群启动时 gateway 之后的 reroute 为背景分析流程。
gateway 阶段恢复的集群状态中,我们已经知道集群一共有多少个索引,每个索引的主副分片各有多少个,但是不知道它们位于哪个节点,现在需要找到它们都位于哪个节点。集群完全重启的初始状态,所有分片都被标记为未分配状态,此处也被称作分片分配过程。因此分片分配的概念不仅仅是分配一个全新分片。对于索引某个特定分片的分配过程中,先分配其主分片,后分配其副分片。
gatewayAllocator分为主分片和副分片分配器:
primaryShardAllocator.allocateUnassigned (allocation) ;
replicaShardAllocator.processExistingRecoveries (allocation) ;
replicaShardAllocator.allocateUnassigned (allocation) ;
它们都继承自 BaseGatewayShardAllocator ,如下图所示。
primaryShardAllocator#allocateUnassigned
函数实现整个分配过程。
主分片分配器与副分片分配器都继承自BaseGatewayShardAllocator
, 执行相同的allocateUnassigned
函数,只是执行makeAllocationDecision
时,主副分片分配器各自执行自己的策略。
allocateUnassigned
的流程是:遍历所有unassigned shard
, 依次处理,通过decider
决策分配,期间可能需要fetchData
获取这个shard对应的元数据。如果决策结果为YES,则将其初始化。
public void allocateUnassigned (RoutingAllocation allocation) {
//遍历未分配的分片
while (unassignedIterator.hasNext() ) {
final ShardRouting shard = unassignedIterator.next() ;
//调用主分片分配器的makeAllocationDecision进行决策
final AllocateUnassignedDecision allocateUnassignedDecision = makeAllocationDecision (shard, allocation,logger);
//根据决策器结果决定是否初始化分片
if (allocateUnas signedDecision.getAllocationDecision() == AllocationDecision.YES) {
unassignedIterator. initialize(...);
} else {
unassignedIterator . removeAndIgnore(. ..) ;
}
}
}
主副分片执行同的 unassigned!terator.initialize
函数,将分片的 unassigned
状态改为 initialize
状态:
public ShardRouting initializeShard(. ..) {
//初始化一个ShardRouting
ShardRouting initializedShard = unassignedShard.initialize(nodeId, existingAllocationId, expectedSize);
//添加到目的节点的shard列表
node (nodeId).add(initializedShard);
addRecovery (initializedShard);
//添加到assignedshards列表
assignedShardsAdd (initial i zedShard);
//设置状态已更新
routingChangesObserver.shardInitialized (unassignedShard, initializedShard);
return initializedShard;
}
在routingChangesObserver.shardInitialized
中设置RoutingNodes
已更新。更新的内容大约就是某个shard被分配到了某个节点,这个shard是主还是副,副的话会设置recoverySource为PEER,但只是一个类型,并没有告诉节点recovery 的时候从哪个节点恢复,节点恢复时自己从集群状态中的路由表中查找。
reroute完成后构建新的集群状态:
protected ClusterState reroute (final ClusterState clusterState, String reason, boolean debug) {
reroute (allocation);
if (allocation.routingNodesChanged() == false) {
return clusterState;
}
return buildResultAndLogHealthChange(clusterState, allocation, reason);
}
然后,Master 把新的集群状态广播下去,当数据节点发现某个分片分配给自己,开始执行分片的recovery。
PrimaryShardAllocator#makeAllocationDecision
主分片分配器的makeAllocationDecision
过程返回指定的分片是否可以被分配,如果还没有这个分片的信息,则向集群的其他节点去请求该信息;如果已经有了,则根据decider进行决策。
首次进入函数时,还没有任何分片的元信息,发起向集群所有数据节点获取某个shard元信息的fetchData请求。之所以把请求发到所有节点,是因为它不知道哪个节点有这个shard 的数据。集群启动的时候,遍历所有shard,再对每个shard向所有数据节点发fetchData请求。如果集群有100个节点、1000个分片,则总计需要请求100X 1000= 100000次。虽然是异步的,但仍然存在效率问题。当ES集群规模比较大、分片数非常多的时候,这个请求的总量就会很大。
从所有数据节点异步获取某个特定分片的信息,没有超时设置。
void asyncFetch (final DiscoveryNode\[\] nodes, long fetchingRound) { //nodes为节点列表 action.list(shardId, nodes, new ActionListener<BaseNodesResponse<T>>() { //收到成功的回复 public void onResponse (BaseNodesResponse<T> response) { processAsyncFetch (response .getNodes(),response. failures() , fetchingRound); } //收到失败的回复 public void onFailure (Exception e) { List<FailedNodeException> failures = new ArrayList<>(nodes.length); for (final Di scoveryNode node: nodes) { failures.add(. ..); } processAsyncFetch(null, failures, fetchingRound); } }); }
请求节点shard元信息的action为:
internal:gateway/local/started_ shards
遍历发送的过程主要实现位于:
TransportNodesAction.AsyncAction#start
对端对此响应的模块为:
gateway.TransportNodesListGatewayStartedShards#nodeOperation
其中读取本地shard元数据返回请求方。
对于一个特定shard,当Response达到期望数量(发出请求时的节点数)时执行finishHim,调用处理模块:
AsyncShardFetch#processAsyncFetch
在这里实现收到各节点返回的shard级别元数据的对应处理,将Response信息放到this.cache中,下次“reroute” 的时候从cache里取,然后再次执行reroute。
reroute(shardId, “post_ response”);
//接着调用–>
routingService.reroute (“async_ shard_ fetch”);
主要实现是对当前ClusterState 提交一个任务,再次执行allocationService.reroute
,此时this.cache中已经有了shard元数据,进入主分片分配过程,依据这些元信息选择主分片。
ES5之后的主分片选举与之前的版本机制是不一样的。ES5之前的版本依据分片元数据的版本号对比实现,选择分片元信息中版本号高的分片来选举主分片,ES5及之后的版本依据allocation id从inSyncAllocationIds列表中选择一个作为主分片。 这种改变的主要原因是依据版本号无法保证版本号最高的分片一定被选为主分片。 例如,当前只有一个活跃分片,那它一定被选为主分片,而拥有最新数据的分片尚未启动。具体请参考“数据模型”章节。
在makeAllocationDecision函数中,从inSyncAllocationIds集合中找到活跃的shard, 得到一个列表,把这个列表中的节点依次执行decider,决策结果:
//根据shard id从集群状态获取同步ID列表
final Set<String> inSyncAllocationIds = indexMetaData.inSyncAllocationIds(unassignedShard.id());
//构建匹配同步ID的目标节点列表
final NodeShardsResult nodeShardsResult = buildNodeShardsResult(unassignedShard, snapshotRestore, allocation.getIgnoreNodes(unassignedShard.shardId()), inSyncAllocationIds, shardState, logger);
//对上一步得到的列表中的节点依次执行decider,决策结果
NodesToAllocate nodesToAllocate = buildNodesToAllocate(allocation, nodeShardsResult.orderedAllocationCandidates, unassignedShard, false);
具体的决策过程会依次遍历全部 decider:
public Decision canAllocate (ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
//遍历全部decider
for(AllocationDecider allocationDecider : allocations) {
Decision decision = allocationDecider.canAllocate(shardRouting, node, allocation);
if (decision == Decision.NO) {
if (!allocation. debugDecision()) {
return decision;
} else {
ret.add (decision);
} else if (decision != Decision.ALWAYS && (allocation.getDebugMode() != EXCLUDE_ YES_ DECISIONS || decision.type() != Decision.Type.YES)) {
ret.add (decision);
}
return ret;
}
只要有一个decider拒绝,就拒绝执行本次分配。决策之后的结果可能会有多个节点,取第一个。至此,主分片选取完成。
cluster.routing.allocation.enable
对主分片分配的影响,在集群完全重启的操作流程中,要求把这个选项先设置为none,然后重启集群。分片分配时,在EnableAllocationDecider中对这个选项进行判断和实施,主分片的分配会被拦截吗?答案是肯定的,主分片被这个decider拦截。但是在主分片的分配过程中有另外一层逻辑:如果被decider拦截,返回NO,则尝试强制分配。给buildNodesToAllocate
的最后一个参数传入true,接下来尝试强制分配,逻辑如下:
public Decision canForceAllocatePrimary (ShardRouting shardRouting, RoutingNode node, RoutingAl location allocation) {
Decision decision = canAllocate (shardRouting, node, allocation);
if (decision.type() == Type.NO) {
// On a NO decision, by default, we allow force allocating the primary.
return allocation. decision (Decision.YES, decision.label(),
"primary shard \[%s\] allowed to force allocate on node \[%s\]",
shardRouting. shardId() , node.nodeId());
} else {
// On a THROTTLE/ YES decision, we use the same decision instead of forcing allocation
return decision;
}
}
如果decider 返回NO,则直接设置成YES,这种情况只在分配一个磁盘上已经存在的unassigned primary shards时出现。
与主分片分配器同理,replicaShardAllocator.allocateUnassigned 函数实现了副分片的分配过程。其allocateUnasigned过程与primaryShardAllocator.allocateUnassigned执行的是基类的同一个函数。
ReplicaShardAllocator#makeAllocationDecision
副分片决策过程中也需要“fetchData”,只不过主分片分配节点已经“fetch”过,可以直接从结果中获取。但是在“fetchData”之前先运行一遍 allocation.deciders().canAllocate,来判断是否至少可以在一个node上分配(canBeAllocatedToAtLeastOneNode),如果分配不了就省略后面的逻辑了,例如,其主分片尚未就绪等。
然后根据“fetchData”到的shard元信息,分配到已经拥有这个shard副本的节点,如果没有相关节点,则判断是否需要“delay",否则返回NOT_TAKEN。分配成功之后一样进入INIT过程。
是否“delay”由下面的配置项控制,可以动态调整:
index.unassigned.node_left.delayed_timeout
shardsAllocator由BalancedShardsAllocator类完成,其中有三个配置项用于控制权重:
cluster.routing.allocation.balance.index
cluster.routing.allocation.balance.shard
cluster.routing.allocation.balance.threshold
这些权重参数的具体作用建议参考MasteringElasticsearch或官方手册。
具体实现如下:
public void allocate (RoutingAllocation allocation) {
final Balancer balancer = new Balancer (logger, allocation, weightFunction, threshold);
balancer.allocateUnassigned();
balancer.moveShards();
balancer.balance();
}
allocateUnassigned
:根据权重算法和decider决定把shard分配到哪个节点。同样将决策后的分配信息更新到集群状态,由Master广播下去。moveShards
:对状态为started的分片根据decider来判断是否需要“move",move过程中此shard的状态被设置为RELOCATING,在目标上创建这个shard时状态为INITIALIZING,同时版本号会加1。balance
:根据权重函数平衡集群模型上的节点。两者之间没有明显的界限,gateway的最后一步执行reroute,等待这个函数返回,然后打印gateway选举结果的日志,集群完全重启时,reroute向各节点发起的询问shard级元数据的操作基本还没执行完,因此一般只有少数主分片被选举完了,gateway流程的结束只是集群级和索引级的元数据已选举完毕,主分片的选举正在进行中。
makeAllocationDecision
成功后,unassignedIterator.initialize
初始化这个shard,创建一个新的ShardRouting对象,把相关信息添加到集群状态,设置routingChangesObserver
为已经发生变化,后面的流程就会把新的集群状态广播出去。到此,reroute 函数执行完毕。
节点收到广播下来的集群状态,进入IndicesClusterStateService#applyClusterState
处理所有相关操作,其中,createOrUpdateShards
执行到createShard时,准备recovery相关信息:
private void createShard (DiscoveryNodes nodes, RoutingTable routingTable, ShardRouting shardRouting, ClusterState state) {
if (shardRouting.recoverySource().getType() == Type.PEER) {
sourceNode = findSourceNodeForPeerRecovery (logger, routingTable, nodes, shardRouting);
try {
RecoveryState recoveryState = new RecoveryState(shardRouting, nodes.getLocalNode(), sourceNode);
indicesService. createShard(...);
} catch (Exception e) {
failAndRemoveShard ( shardRouting, true, "failed to create shard", e, state);
}
}
}
接下来IndicesService.createShard
开始执行recovery
:
public IndexShard createShard(. . .) throws IOException { IndexService indexService = indexService(shardRouting. index()); IndexShard indexShard = indexService.createShard (shardRouting, globalCheckpointSyncer); indexShard.addShardFailureCallback(onShardFailure); indexShard.startRecover (recoveryState, recoveryTargetService, recoveryListener, repositoriesService, (type, mapping) -> { try ( //concrete index - no name clash, it uses uuid client.admin().indices().preparePutMapping().setConcreteIndex(shardRouting.index()) .setType (type) .setSource(mapping.source().string(), XContentType.JSON) .get () ; } catch ( IOException ex) { throw new ElasticsearchException("failed to stringify mapping source", exx); ), this); return indexShard; }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。