
【Elasticsearch】Shard and Replica Allocation

Elasticsearch allocation

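What is allocation:

Lucene is a single-machine index library and storage engine. ES builds a cluster on top of Lucene, i.e. it wraps Lucene in a distributed layer: data is spread across nodes as shards, and replicas provide backup copies of each shard. Allocation is ES's shard allocator. It decides how shards are placed: which node in the cluster holds each shard, and which copy acts as the primary and which as a replica.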

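When allocation is triggered:

(1) An index is created or deleted
(2) A node joins or leaves the cluster
(3) A manual reroute moves a shard to a specific node
(4) The replica count of an index is changed
(5) The cluster restarts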

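Structure of the allocation module:

(1) Allocators. An allocator holds multiple deciders; its job is to find the best node.
(2) Deciders. A decider determines whether a particular allocation may go ahead.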
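A minimal sketch of the allocator/decider split, under simplifying assumptions: a decision is one of YES, THROTTLE or NO, and the per-decider answers are combined roughly the way ES's AllocationDeciders does (any NO vetoes, otherwise any THROTTLE wins, otherwise YES). `Decider`, `Node`, `Allocator` and `pickNode` are hypothetical stand-ins, not the real ES classes, and "best node" is simplified to "fewest shards".

```java
import java.util.List;
import java.util.Optional;

class DeciderDemo {
    public static void main(String[] args) {
        // Hypothetical decider that vetoes node "a"
        Decider rejectA = (shard, node) -> node.id().equals("a") ? Decision.NO : Decision.YES;
        Allocator allocator = new Allocator(List.of(rejectA));
        List<Node> nodes = List.of(new Node("a", 0), new Node("b", 3), new Node("c", 1));
        // "a" is vetoed, so the node with the fewest shards among the rest ("c") is chosen
        System.out.println(allocator.pickNode("idx[0]", nodes).map(Node::id).orElse("none"));
    }
}

// Simplified decision type; the real ES Decision also carries labels and explanations.
enum Decision { YES, THROTTLE, NO }

// Hypothetical stand-in for a candidate node.
record Node(String id, int shardCount) {}

// Hypothetical decider: may this shard be placed on this node?
interface Decider {
    Decision canAllocate(String shardId, Node node);
}

class Allocator {
    private final List<Decider> deciders;

    Allocator(List<Decider> deciders) {
        this.deciders = deciders;
    }

    // Any NO vetoes; otherwise any THROTTLE wins over YES.
    Decision combined(String shardId, Node node) {
        Decision result = Decision.YES;
        for (Decider decider : deciders) {
            Decision d = decider.canAllocate(shardId, node);
            if (d == Decision.NO) {
                return Decision.NO;
            }
            if (d == Decision.THROTTLE) {
                result = Decision.THROTTLE;
            }
        }
        return result;
    }

    // Among nodes where the combined decision is YES, pick the one holding the fewest shards.
    Optional<Node> pickNode(String shardId, List<Node> nodes) {
        Node best = null;
        for (Node n : nodes) {
            if (combined(shardId, n) != Decision.YES) {
                continue;
            }
            if (best == null || n.shardCount() < best.shardCount()) {
                best = n;
            }
        }
        return Optional.ofNullable(best);
    }
}
```

The split keeps each rule small and independent: the allocator only ranks candidates, while every policy (disk, filters, throttling) lives in its own decider.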

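Allocator types:

There are three allocators:
BalancedShardsAllocator    implements the ShardsAllocator interface. Finds the node with the fewest shards (more precisely, the lowest weight).
ReplicaShardAllocator    extends BaseGatewayShardAllocator. Finds the nodes that already hold a copy of a given shard. Its concrete subclass InternalReplicaShardAllocator is an inner class of GatewayAllocator and is created by it.
PrimaryShardAllocator   extends BaseGatewayShardAllocator. Finds the node holding the most up-to-date copy of a given shard. Its concrete subclass InternalPrimaryShardAllocator is an inner class of GatewayAllocator and is created by it.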

BalancedShardsAllocator initialization:

Responsible for choosing a node based on load.
ClusterModule.createShardsAllocator()
new BalancedShardsAllocator()
new AllocationService()    creates the AllocationService object, the entry-point service for shard allocation.

GatewayAllocator initialization:

Responsible for finding a node based on the existing copies of a shard.
new Node()    inside the Node constructor, once the Guice injector has been created.
clusterModule.setExistingShardsAllocators(injector.getInstance(GatewayAllocator.class));    obtains the instance through Guice and hands it to ClusterModule.
ClusterModule.setExistingShardsAllocators()    passes GatewayAllocator on to AllocationService
AllocationService.setExistingShardsAllocators()    stores it in AllocationService

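Decider initialization:

ClusterModule.createAllocationDeciders() calls ClusterModule.addAllocationDecider() for every decider. The resulting collection is used to construct AllocationDeciders, which keeps all deciders in a Collection<AllocationDecider>. The flow, as logged: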
    【log】ClusterModule.createAllocationDeciders() adds all deciders
    【log】ClusterModule.addAllocationDecider() clazz=class org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider
    【log】ClusterModule.addAllocationDecider() clazz=class org.elasticsearch.cluster.routing.allocation.decider.ResizeAllocationDecider
    【log】ClusterModule.addAllocationDecider() clazz=class org.elasticsearch.cluster.routing.allocation.decider.ReplicaAfterPrimaryActiveAllocationDecider
    【log】ClusterModule.addAllocationDecider() clazz=class org.elasticsearch.cluster.routing.allocation.decider.RebalanceOnlyWhenActiveAllocationDecider
    【log】ClusterModule.addAllocationDecider() clazz=class org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider
    【log】ClusterModule.addAllocationDecider() clazz=class org.elasticsearch.cluster.routing.allocation.decider.ConcurrentRebalanceAllocationDecider
    【log】ClusterModule.addAllocationDecider() clazz=class org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider
    【log】ClusterModule.addAllocationDecider() clazz=class org.elasticsearch.cluster.routing.allocation.decider.NodeVersionAllocationDecider
    【log】ClusterModule.addAllocationDecider() clazz=class org.elasticsearch.cluster.routing.allocation.decider.SnapshotInProgressAllocationDecider
    【log】ClusterModule.addAllocationDecider() clazz=class org.elasticsearch.cluster.routing.allocation.decider.RestoreInProgressAllocationDecider
    【log】ClusterModule.addAllocationDecider() clazz=class org.elasticsearch.cluster.routing.allocation.decider.NodeShutdownAllocationDecider
    【log】ClusterModule.addAllocationDecider() clazz=class org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider
    【log】ClusterModule.addAllocationDecider() clazz=class org.elasticsearch.cluster.routing.allocation.decider.SameShardAllocationDecider
    【log】ClusterModule.addAllocationDecider() clazz=class org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider
    【log】ClusterModule.addAllocationDecider() clazz=class org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider
    【log】ClusterModule.addAllocationDecider() clazz=class org.elasticsearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider
    【log】ClusterModule.addAllocationDecider() clazz=class org.elasticsearch.cluster.routing.allocation.decider.AwarenessAllocationDecider
    【log】ClusterModule.addAllocationDecider() clazz=class org.elasticsearch.xpack.cluster.routing.allocation.DataTierAllocationDecider
    【log】ClusterModule.addAllocationDecider() clazz=class org.elasticsearch.xpack.ccr.allocation.CcrPrimaryFollowerAllocationDecider
    【log】ClusterModule.addAllocationDecider() clazz=class org.elasticsearch.xpack.searchablesnapshots.allocation.decider.SearchableSnapshotAllocationDecider
    【log】ClusterModule.addAllocationDecider() clazz=class org.elasticsearch.xpack.searchablesnapshots.allocation.decider.SearchableSnapshotRepositoryExistsAllocationDecider
    【log】ClusterModule.addAllocationDecider() clazz=class org.elasticsearch.xpack.searchablesnapshots.allocation.decider.SearchableSnapshotEnableAllocationDecider
    【log】ClusterModule.addAllocationDecider() clazz=class org.elasticsearch.xpack.searchablesnapshots.allocation.decider.HasFrozenCacheAllocationDecider
    【log】ClusterModule.addAllocationDecider() clazz=class org.elasticsearch.xpack.searchablesnapshots.allocation.decider.DedicatedFrozenNodeAllocationDecider
    【log】new AllocationDeciders() total AllocationDeciders passed in: 24
In total, 24 deciders are registered. What each one does:
  1. MaxRetryAllocationDecider
  2. ResizeAllocationDecider
  3. ReplicaAfterPrimaryActiveAllocationDecider    ensures a replica is allocated only after its primary is active.
  4. RebalanceOnlyWhenActiveAllocationDecider    allows rebalancing only when all copies of the shard are active.
  5. ClusterRebalanceAllocationDecider    decides whether rebalancing may run based on the active state of shards across the cluster.
  6. ConcurrentRebalanceAllocationDecider
  7. EnableAllocationDecider
  8. NodeVersionAllocationDecider
  9. SnapshotInProgressAllocationDecider
  10. RestoreInProgressAllocationDecider
  11. NodeShutdownAllocationDecider
  12. FilterAllocationDecider    filters nodes according to allocation filters (require, include, exclude) that can be set dynamically through the API.
  13. SameShardAllocationDecider    prevents a primary and its replica from being allocated to the same node.
  14. DiskThresholdDecider    decides whether a shard may be allocated to a node based on the node's disk usage.
  15. ThrottlingAllocationDecider    concurrency control: throttles the number of concurrent recoveries.
  16. ShardsLimitAllocationDecider    limits the number of shards of the same index on a single node.
  17. AwarenessAllocationDecider
  18. DataTierAllocationDecider
  19. CcrPrimaryFollowerAllocationDecider
  20. SearchableSnapshotAllocationDecider
  21. SearchableSnapshotRepositoryExistsAllocationDecider
  22. SearchableSnapshotEnableAllocationDecider
  23. HasFrozenCacheAllocationDecider
  24. DedicatedFrozenNodeAllocationDecider
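To make two of the descriptions above concrete, here is a simplified sketch of the checks behind SameShardAllocationDecider and ShardsLimitAllocationDecider. The `ShardCopy` record and the list-of-shards node model are hypothetical; the real deciders work on RoutingNode/RoutingAllocation, and the per-node limit comes from the `index.routing.allocation.total_shards_per_node` index setting, where -1 (the default) means unlimited.

```java
import java.util.List;

class SimpleDecidersDemo {
    public static void main(String[] args) {
        List<ShardCopy> onNode = List.of(new ShardCopy("logs", 0, true), new ShardCopy("logs", 1, false));
        System.out.println(SimpleDeciders.sameShardAllows(new ShardCopy("logs", 0, false), onNode)); // false
        System.out.println(SimpleDeciders.shardsLimitAllows("logs", onNode, 2));                     // false
        System.out.println(SimpleDeciders.shardsLimitAllows("logs", onNode, -1));                    // true
    }
}

// Hypothetical: one copy (primary or replica) of shard `shardNum` of `index`.
record ShardCopy(String index, int shardNum, boolean primary) {}

class SimpleDeciders {
    // SameShard idea: never place two copies of the same shard on one node.
    static boolean sameShardAllows(ShardCopy candidate, List<ShardCopy> shardsOnNode) {
        for (ShardCopy s : shardsOnNode) {
            if (s.index().equals(candidate.index()) && s.shardNum() == candidate.shardNum()) {
                return false;
            }
        }
        return true;
    }

    // ShardsLimit idea: cap how many shards of one index a node may hold; a negative limit means no cap.
    static boolean shardsLimitAllows(String index, List<ShardCopy> shardsOnNode, int totalShardsPerNode) {
        if (totalShardsPerNode < 0) {
            return true;
        }
        long count = shardsOnNode.stream().filter(s -> s.index().equals(index)).count();
        return count < totalShardsPerNode;
    }
}
```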
All of the calls above ultimately serve to build the AllocationService object:
ClusterModule constructor:
    public ClusterModule(Settings settings, ClusterService clusterService, List<ClusterPlugin> clusterPlugins,
                         ClusterInfoService clusterInfoService, SnapshotsInfoService snapshotsInfoService, ThreadContext threadContext,
                         SystemIndices systemIndices) {
        this.clusterPlugins = clusterPlugins;
        this.deciderList = createAllocationDeciders(settings, clusterService.getClusterSettings(), clusterPlugins);
        // Create AllocationDeciders, which holds all deciders
        this.allocationDeciders = new AllocationDeciders(deciderList);
        // Create BalancedShardsAllocator
        this.shardsAllocator = createShardsAllocator(settings, clusterService.getClusterSettings(), clusterPlugins);
        this.clusterService = clusterService;
        this.indexNameExpressionResolver = new IndexNameExpressionResolver(threadContext, systemIndices);
        // Create AllocationService and hand it AllocationDeciders and BalancedShardsAllocator.
        // The GatewayAllocator instance is handed to AllocationService later, while the Node object is initialized.
        // In the end AllocationService references AllocationDeciders, BalancedShardsAllocator and GatewayAllocator.
        this.allocationService = new AllocationService(allocationDeciders, shardsAllocator, clusterInfoService, snapshotsInfoService);
        this.metadataDeleteIndexService = new MetadataDeleteIndexService(settings, clusterService, allocationService);
    }

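AllocationService, the allocation service:

All of the deciders and allocators created above are ultimately handed to AllocationService and kept in a few fields. A custom AllocationService.showAllocatorsInfo() method (added for debugging, not part of ES) prints the details of the allocation module; it is called at the end of the Node constructor:
AllocationDeciders allocationDeciders;    holds all deciders
Map<String, ExistingShardsAllocator> existingShardsAllocators;    holds the existing-shards allocators such as GatewayAllocator
ShardsAllocator shardsAllocator;    holds the ShardsAllocator (BalancedShardsAllocator)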
    【log】AllocationService.showAllocatorsInfo() prints allocator info
    【log】AllocationService.allocationDeciders clazz=class org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider
    【log】AllocationService.allocationDeciders clazz=class org.elasticsearch.cluster.routing.allocation.decider.ResizeAllocationDecider
    【log】AllocationService.allocationDeciders clazz=class org.elasticsearch.cluster.routing.allocation.decider.ReplicaAfterPrimaryActiveAllocationDecider
    【log】AllocationService.allocationDeciders clazz=class org.elasticsearch.cluster.routing.allocation.decider.RebalanceOnlyWhenActiveAllocationDecider
    【log】AllocationService.allocationDeciders clazz=class org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider
    【log】AllocationService.allocationDeciders clazz=class org.elasticsearch.cluster.routing.allocation.decider.ConcurrentRebalanceAllocationDecider
    【log】AllocationService.allocationDeciders clazz=class org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider
    【log】AllocationService.allocationDeciders clazz=class org.elasticsearch.cluster.routing.allocation.decider.NodeVersionAllocationDecider
    【log】AllocationService.allocationDeciders clazz=class org.elasticsearch.cluster.routing.allocation.decider.SnapshotInProgressAllocationDecider
    【log】AllocationService.allocationDeciders clazz=class org.elasticsearch.cluster.routing.allocation.decider.RestoreInProgressAllocationDecider
    【log】AllocationService.allocationDeciders clazz=class org.elasticsearch.cluster.routing.allocation.decider.NodeShutdownAllocationDecider
    【log】AllocationService.allocationDeciders clazz=class org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider
    【log】AllocationService.allocationDeciders clazz=class org.elasticsearch.cluster.routing.allocation.decider.SameShardAllocationDecider
    【log】AllocationService.allocationDeciders clazz=class org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider
    【log】AllocationService.allocationDeciders clazz=class org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider
    【log】AllocationService.allocationDeciders clazz=class org.elasticsearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider
    【log】AllocationService.allocationDeciders clazz=class org.elasticsearch.cluster.routing.allocation.decider.AwarenessAllocationDecider
    【log】AllocationService.allocationDeciders clazz=class org.elasticsearch.xpack.cluster.routing.allocation.DataTierAllocationDecider
    【log】AllocationService.allocationDeciders clazz=class org.elasticsearch.xpack.ccr.allocation.CcrPrimaryFollowerAllocationDecider
    【log】AllocationService.allocationDeciders clazz=class org.elasticsearch.xpack.searchablesnapshots.allocation.decider.SearchableSnapshotAllocationDecider
    【log】AllocationService.allocationDeciders clazz=class org.elasticsearch.xpack.searchablesnapshots.allocation.decider.SearchableSnapshotRepositoryExistsAllocationDecider
    【log】AllocationService.allocationDeciders clazz=class org.elasticsearch.xpack.searchablesnapshots.allocation.decider.SearchableSnapshotEnableAllocationDecider
    【log】AllocationService.allocationDeciders clazz=class org.elasticsearch.xpack.searchablesnapshots.allocation.decider.HasFrozenCacheAllocationDecider
    【log】AllocationService.allocationDeciders clazz=class org.elasticsearch.xpack.searchablesnapshots.allocation.decider.DedicatedFrozenNodeAllocationDecider
    【log】AllocationService.shardsAllocator clazz=class org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator
    【log】AllocationService.existingShardsAllocators key=gateway_allocator,clazz=class org.elasticsearch.gateway.GatewayAllocator
    【log】AllocationService.existingShardsAllocators key=searchable_snapshot_allocator,clazz=class org.elasticsearch.xpack.searchablesnapshots.allocation.SearchableSnapshotAllocator
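The log shows that existingShardsAllocators is a map keyed by allocator name. A hedged sketch of how a per-shard lookup such as getAllocatorForShard() can use it: assuming ES 7.x behaviour, the name comes from the index setting `index.allocation.existing_shards_allocator`, which defaults to `gateway_allocator` (searchable-snapshot indices use `searchable_snapshot_allocator`). `NamedAllocator` and `AllocatorRegistry` are hypothetical; for an unknown name this sketch returns an empty Optional, whereas ES 7.x substitutes a placeholder allocator that leaves the shard unassigned.

```java
import java.util.Map;
import java.util.Optional;

class AllocatorLookupDemo {
    public static void main(String[] args) {
        AllocatorRegistry registry = new AllocatorRegistry(Map.of(
            "gateway_allocator", new NamedAllocator("GatewayAllocator"),
            "searchable_snapshot_allocator", new NamedAllocator("SearchableSnapshotAllocator")));
        // No setting on the index: falls back to gateway_allocator
        System.out.println(registry.forIndex(Map.of()).map(NamedAllocator::label).orElse("none"));
        // A searchable-snapshot index names its own allocator
        Map<String, String> mounted = Map.of(AllocatorRegistry.SETTING, "searchable_snapshot_allocator");
        System.out.println(registry.forIndex(mounted).map(NamedAllocator::label).orElse("none"));
    }
}

// Hypothetical stand-in for an ExistingShardsAllocator implementation.
record NamedAllocator(String label) {}

class AllocatorRegistry {
    static final String SETTING = "index.allocation.existing_shards_allocator";
    static final String DEFAULT_ALLOCATOR = "gateway_allocator";

    private final Map<String, NamedAllocator> allocators;

    AllocatorRegistry(Map<String, NamedAllocator> allocators) {
        this.allocators = allocators;
    }

    // Pick the allocator named in the index settings, defaulting to gateway_allocator.
    // Empty if the name is unknown (this sketch does not model ES's placeholder allocator).
    Optional<NamedAllocator> forIndex(Map<String, String> indexSettings) {
        String name = indexSettings.getOrDefault(SETTING, DEFAULT_ALLOCATOR);
        return Optional.ofNullable(allocators.get(name));
    }
}
```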
The entry point, AllocationService.reroute(), is shown below.
It runs the reroute flow and obtains the result; if the cluster state changes, the MasterService that invoked it publishes the new state. Because reroute is triggered by a task submitted to the cluster (a cluster-state update task), it only runs on the master node.
When another node receives the reroute result, that is, when it receives the new cluster state and finds that a shard has been assigned to it, that node automatically starts the recovery process for the shard.
    public CommandsResult reroute(final ClusterState clusterState, AllocationCommands commands, boolean explain, boolean retryFailed) {
        RoutingNodes routingNodes = getMutableRoutingNodes(clusterState);
        RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, clusterState, clusterInfoService.getClusterInfo(), snapshotsInfoService.snapshotShardSizes(), currentNanoTime());
        allocation.debugDecision(true);
        allocation.ignoreDisable(true);
        if (retryFailed) {
            resetFailedAllocationCounter(allocation);
        }
        RoutingExplanations explanations = commands.execute(allocation, explain);
        // we revert the ignore disable flag, since when rerouting, we want the original setting to take place
        allocation.ignoreDisable(false);
        // Run the actual reroute
        reroute(allocation);
        // Return the new cluster state to TransportClusterRerouteAction.execute()
        return new CommandsResult(explanations, buildResultAndLogHealthChange(clusterState, allocation, "reroute commands"));
    }
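A hedged sketch of the receiving side described above: a node scans the new routing table for shards assigned to it in the INITIALIZING state that it does not host yet, and those are the shards it starts recovering. In ES this work happens when the new cluster state is applied (IndicesClusterStateService); `RoutingEntry`, `ShardState` and `shardsToRecover` below are hypothetical simplifications, not that class's API.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

class RecoveryCheckDemo {
    public static void main(String[] args) {
        List<RoutingEntry> routing = List.of(
            new RoutingEntry("logs[0]", "node-1", ShardState.STARTED),
            new RoutingEntry("logs[1]", "node-2", ShardState.INITIALIZING),
            new RoutingEntry("logs[2]", "node-2", ShardState.STARTED));
        // node-2 must start recovering logs[1]
        System.out.println(RecoveryCheck.shardsToRecover("node-2", routing, Set.of()));
    }
}

enum ShardState { UNASSIGNED, INITIALIZING, STARTED, RELOCATING }

// Hypothetical: one row of the routing table in the published cluster state.
record RoutingEntry(String shard, String nodeId, ShardState state) {}

class RecoveryCheck {
    // Shards the new state assigns to localNodeId in INITIALIZING state that this node does not host yet.
    static List<String> shardsToRecover(String localNodeId, List<RoutingEntry> routing, Set<String> alreadyHosted) {
        return routing.stream()
            .filter(e -> localNodeId.equals(e.nodeId()))
            .filter(e -> e.state() == ShardState.INITIALIZING)
            .map(RoutingEntry::shard)
            .filter(s -> !alreadyHosted.contains(s))
            .collect(Collectors.toList());
    }
}
```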
This calls the overloaded reroute() method, which hands the work to GatewayAllocator and BalancedShardsAllocator.
The core logic decides where each shard goes based on the decider rules above and on shard weights (per index and across the cluster), then moves the data, initializes and starts the shards once the moves finish, and finally updates the cluster state to complete the allocation.
    private void reroute(RoutingAllocation allocation) {
        // The parameter is a RoutingAllocation. It holds the cluster's current shard allocation state,
        // decision information, node information and so on, and is the main object operated on during allocation.
        assert hasDeadNodes(allocation) == false : "dead nodes should be explicitly cleaned up. See disassociateDeadNodes";
        assert AutoExpandReplicas.getAutoExpandReplicaChanges(allocation.metadata(), allocation).isEmpty() :
            "auto-expand replicas out of sync with number of nodes in the cluster";
        assert assertInitialized();
        removeDelayMarkers(allocation);
        // First, the existing-shards allocators (GatewayAllocator) handle the unassigned shards
        allocateExistingUnassignedShards(allocation);
        // Then BalancedShardsAllocator picks a node for each remaining shard based on node load
        shardsAllocator.allocate(allocation);
        assert RoutingNodes.assertShardStats(allocation.routingNodes());
    }
RoutingAllocation, the allocation parameter, carries the shard allocation state, decision information and node information used throughout allocation. It is created as follows:
    RoutingAllocation allocation = new RoutingAllocation(
        allocationDeciders,                          // the deciders held by AllocationService
        routingNodes,                                // mutable per-node view of the routing table
        fixedClusterState,
        clusterInfoService.getClusterInfo(),         // cluster info such as per-node disk usage and shard sizes
        snapshotsInfoService.snapshotShardSizes(),
        currentNanoTime()
    );

GatewayAllocator processing:

AllocationService.reroute()    ->
AllocationService.allocateExistingUnassignedShards()    ->
    private void allocateExistingUnassignedShards(RoutingAllocation allocation) {
        // Sort all unassigned shards in the cluster by priority
        allocation.routingNodes().unassigned().sort(PriorityComparator.getAllocationComparator(allocation)); // sort for priority ordering
        for (final ExistingShardsAllocator existingShardsAllocator : existingShardsAllocators.values()) {
            existingShardsAllocator.beforeAllocation(allocation);
        }
        final RoutingNodes.UnassignedShards.UnassignedIterator primaryIterator = allocation.routingNodes().unassigned().iterator();
        // Handle unassigned primaries
        while (primaryIterator.hasNext()) {
            final ShardRouting shardRouting = primaryIterator.next();
            if (shardRouting.primary()) {
                // Look up this shard's ExistingShardsAllocator (GatewayAllocator by default) and call allocateUnassigned()
                ExistingShardsAllocator allocator = getAllocatorForShard(shardRouting, allocation);
                allocator.allocateUnassigned(shardRouting, allocation, primaryIterator);
            }
        }
        for (final ExistingShardsAllocator existingShardsAllocator : existingShardsAllocators.values()) {
            existingShardsAllocator.afterPrimariesBeforeReplicas(allocation);
        }
        // Handle unassigned replicas
        final RoutingNodes.UnassignedShards.UnassignedIterator replicaIterator = allocation.routingNodes().unassigned().iterator();
        while (replicaIterator.hasNext()) {
            final ShardRouting shardRouting = replicaIterator.next();
            if (shardRouting.primary() == false) {
                ExistingShardsAllocator allocator = getAllocatorForShard(shardRouting, allocation);
                allocator.allocateUnassigned(shardRouting, allocation, replicaIterator);
            }
        }
    }
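The priority sort followed by the two passes can be sketched as below. The sort keys follow the ES documentation on index recovery prioritization: the optional `index.priority` setting (higher first), then index creation date (newer first), then index name (higher first). The real PriorityComparator may consider further keys depending on the version, and `Unassigned` and `PriorityOrder` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class PriorityOrderDemo {
    public static void main(String[] args) {
        List<Unassigned> shards = List.of(
            new Unassigned("a-logs", 0, 1_000L, 0, false),
            new Unassigned("b-logs", 0, 1_000L, 0, true),
            new Unassigned("hot", 10, 500L, 0, true),
            new Unassigned("a-logs", 0, 1_000L, 0, true));
        System.out.println(PriorityOrder.allocationOrder(shards));
    }
}

// Hypothetical unassigned shard with the index metadata the comparator needs.
record Unassigned(String index, int priority, long creationDate, int shardNum, boolean primary) {
    @Override
    public String toString() {
        return index + "[" + shardNum + "]" + (primary ? "p" : "r");
    }
}

class PriorityOrder {
    // Higher index.priority first, then newer creation date, then index name in reverse order.
    static final Comparator<Unassigned> BY_PRIORITY =
        Comparator.comparingInt(Unassigned::priority).reversed()
            .thenComparing(Comparator.comparingLong(Unassigned::creationDate).reversed())
            .thenComparing(Comparator.comparing(Unassigned::index).reversed());

    // Sort once (stable), then visit primaries in a first pass and replicas in a second pass.
    static List<Unassigned> allocationOrder(List<Unassigned> shards) {
        List<Unassigned> sorted = new ArrayList<>(shards);
        sorted.sort(BY_PRIORITY);
        List<Unassigned> order = new ArrayList<>();
        for (Unassigned s : sorted) {
            if (s.primary()) order.add(s);
        }
        for (Unassigned s : sorted) {
            if (!s.primary()) order.add(s);
        }
        return order;
    }
}
```

Running it prints `[hot[0]p, b-logs[0]p, a-logs[0]p, a-logs[0]r]`: the high-priority index comes first, and every primary is visited before any replica, matching the two iterator passes in the code above.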
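Then allocateUnassigned() is called for the unassigned primaries and replicas in turn:
GatewayAllocator.allocateUnassigned()    ->
GatewayAllocator.innerAllocatedUnassigned()    ->    checks whether the shard is a primary or a replica and calls allocateUnassigned() on the matching allocator, which is defined in their common parent class.
BaseGatewayShardAllocator.allocateUnassigned()    ->    both primaries and replicas end up here; only makeAllocationDecision() differs, because each subclass (PrimaryShardAllocator/ReplicaShardAllocator) applies its own decision strategy.
ExistingShardsAllocator.UnassignedAllocationHandler.initialize()    ->    if the decision for the unassigned primary or replica is YES, the shard's state changes from UNASSIGNED to INITIALIZING; otherwise the shard is ignored for this round.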
BaseGatewayShardAllocator.allocateUnassigned() code:
    public void allocateUnassigned(ShardRouting shardRouting, RoutingAllocation allocation, ExistingShardsAllocator.UnassignedAllocationHandler unassignedAllocationHandler) {
        // Both unassigned primaries and replicas come here. makeAllocationDecision() is an abstract method
        // implemented by the subclasses:
        // PrimaryShardAllocator.makeAllocationDecision()
        // ReplicaShardAllocator.makeAllocationDecision()
        final AllocateUnassignedDecision allocateUnassignedDecision = makeAllocationDecision(shardRouting, allocation, logger);
        if (allocateUnassignedDecision.isDecisionTaken() == false) {
            return;
        }
        // The deciders returned YES
        if (allocateUnassignedDecision.getAllocationDecision() == AllocationDecision.YES) {
            // Change the shard's state from UNASSIGNED to INITIALIZING
            unassignedAllocationHandler.initialize(
                allocateUnassignedDecision.getTargetNode().getId(),
                allocateUnassignedDecision.getAllocationId(),
                getExpectedShardSize(shardRouting, allocation),
                allocation.changes());
        } else {
            // Otherwise leave the shard unassigned and ignore it for this round
            unassignedAllocationHandler.removeAndIgnore(allocateUnassignedDecision.getAllocationStatus(), allocation.changes());
        }
    }
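When deciding whether a primary or replica can be initialized, the master calls fetchData() to ask the other nodes about this shard and find out which nodes hold a copy of it.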
PrimaryShardAllocator.makeAllocationDecision()    =>
ReplicaShardAllocator.makeAllocationDecision()    =>
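For primaries, the RPC sent is:
internal:gateway/local/started_shards
The other nodes run the following method to look up the shard locally and respond:
gateway.TransportNodesListGatewayStartedShards#nodeOperation
(For replicas, ReplicaShardAllocator instead fetches shard store metadata via internal:cluster/nodes/indices/shard/store.)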
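Once the node information is known, a series of methods decide whether the shard can be allocated; if it can, an AllocateUnassignedDecision.yes(...) decision is returned.
AllocationDeciders.canAllocate() is called to decide whether the shard can be allocated on a particular node    =>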
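A heavily simplified sketch of the primary-side choice: the master collects, per node, the allocation ID of any on-disk copy of the shard (roughly what the `started_shards` fetch returns), keeps only copies whose allocation ID is in the index metadata's in-sync set (the mechanism ES has used since 5.x to identify up-to-date copies), and takes the first node the deciders accept. `NodeCopy`, `PrimaryPick` and the `canAllocate` predicate are hypothetical; the real PrimaryShardAllocator also handles in-flight fetches, throttling, forced allocation and additional ordering rules.

```java
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.Predicate;

class PrimaryPickDemo {
    public static void main(String[] args) {
        List<NodeCopy> fetched = List.of(
            new NodeCopy("node-1", "alloc-old"),  // stale copy, not in sync
            new NodeCopy("node-2", "alloc-A"),    // in-sync copy
            new NodeCopy("node-3", null));        // no copy on disk
        Set<String> inSync = Set.of("alloc-A", "alloc-B");
        System.out.println(PrimaryPick.choose(fetched, inSync, nodeId -> true));
    }
}

// Hypothetical: what one node reported for the shard (allocation ID of its copy, or null if none).
record NodeCopy(String nodeId, String allocationId) {}

class PrimaryPick {
    // Keep in-sync copies only, then return the first node the (stubbed) deciders accept.
    static Optional<String> choose(List<NodeCopy> fetched, Set<String> inSyncIds, Predicate<String> canAllocate) {
        return fetched.stream()
            .filter(c -> c.allocationId() != null)
            .filter(c -> inSyncIds.contains(c.allocationId()))
            .map(NodeCopy::nodeId)
            .filter(canAllocate)
            .findFirst();
    }
}
```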
If the decision is YES, the shard's state is changed and it is initialized:
ExistingShardsAllocator.UnassignedAllocationHandler.initialize()     =>
RoutingNodes.UnassignedIterator.initialize()    =>    返回一个  ShardRouting 对象。
The ShardRouting object records the shard's details:
    private final ShardId shardId;             // shard ID
    private final String currentNodeId;        // ID of the node currently holding the shard
    private final String relocatingNodeId;     // ID of the node the shard is relocating to (or from)
    private final boolean primary;             // whether this copy is the primary
    private final ShardRoutingState state;     // shard state
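ShardRouting is immutable: initialize() returns a new instance instead of modifying the existing one, which is why RoutingNodes.initializeShard() works with the returned object. The cut-down model below keeps only the five fields listed above and two transitions; `MiniShardRouting` and `RoutingState` are hypothetical (the real enum is ShardRoutingState, with the same four values), and shardId is reduced to an index name plus a shard number.

```java
class ShardRoutingDemo {
    public static void main(String[] args) {
        MiniShardRouting unassigned = MiniShardRouting.unassigned("logs", 0, true);
        MiniShardRouting initializing = unassigned.initialize("node-2");
        // The original object is untouched; the transition produced a new one
        System.out.println(unassigned.state() + " -> " + initializing.state() + " on " + initializing.currentNodeId());
    }
}

enum RoutingState { UNASSIGNED, INITIALIZING, STARTED, RELOCATING }

// Hypothetical, immutable model of ShardRouting.
record MiniShardRouting(String index, int shardNum, String currentNodeId,
                        String relocatingNodeId, boolean primary, RoutingState state) {

    static MiniShardRouting unassigned(String index, int shardNum, boolean primary) {
        return new MiniShardRouting(index, shardNum, null, null, primary, RoutingState.UNASSIGNED);
    }

    // Like ShardRouting.initialize(...): only valid from UNASSIGNED, returns a new object.
    MiniShardRouting initialize(String nodeId) {
        if (state != RoutingState.UNASSIGNED) {
            throw new IllegalStateException("expected UNASSIGNED but was " + state);
        }
        return new MiniShardRouting(index, shardNum, nodeId, null, primary, RoutingState.INITIALIZING);
    }

    // STARTED once recovery on the target node completes.
    MiniShardRouting moveToStarted() {
        if (state != RoutingState.INITIALIZING) {
            throw new IllegalStateException("expected INITIALIZING but was " + state);
        }
        return new MiniShardRouting(index, shardNum, currentNodeId, null, primary, RoutingState.STARTED);
    }
}
```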

BalancedShardsAllocator processing.

It creates a Balancer, which takes over the rest of the process.
Balancer.allocateUnassigned()    =>    decides where unassigned shards go; like the gateway path, it calls RoutingNodes.initializeShard() to move them to the initializing state
BalancedShardsAllocator.decideAllocateUnassigned()    =>    decides, based on load, whether some node can take this shard; again the result is Decision.YES when it can
BalancedShardsAllocator.allocate() code:
    public void allocate(RoutingAllocation allocation) {
        if (allocation.routingNodes().size() == 0) {
            failAllocationOfNewPrimaries(allocation);
            return;
        }
        final Balancer balancer = new Balancer(logger, allocation, weightFunction, threshold);
        // Use the weight function and the deciders to choose a node for each unassigned shard.
        // The resulting assignments go into the new cluster state, which the master then publishes.
        balancer.allocateUnassigned();
        // For STARTED shards, use the deciders to decide whether they need to "move"
        balancer.moveShards();
        // Balance the nodes of the cluster model according to the weight function
        balancer.balance();
    }
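A sketch of the weight function the Balancer uses, assuming the ES 7.x WeightFunction: with the index balance factor `cluster.routing.allocation.balance.index` (default 0.55) and the shard balance factor `cluster.routing.allocation.balance.shard` (default 0.45), both normalized by their sum, a node's weight for index i is θ_shard · (shards on node - average shards per node) + θ_index · (shards of i on node - average shards of i per node). An unassigned shard goes to the lowest-weight node that the deciders accept. Deciders and the rebalance threshold are left out, and `NodeLoad` and the sample numbers are made up for illustration.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;

class WeightDemo {
    public static void main(String[] args) {
        List<NodeLoad> nodes = List.of(
            new NodeLoad("node-1", Map.of("logs", 3, "metrics", 1)),
            new NodeLoad("node-2", Map.of("logs", 1, "metrics", 1)),
            new NodeLoad("node-3", Map.of("metrics", 2)));
        WeightFunction wf = new WeightFunction(0.55f, 0.45f); // index balance, shard balance
        for (NodeLoad n : nodes) {
            System.out.printf("%s weight(logs)=%.3f%n", n.id(), wf.weight(n, "logs", nodes));
        }
        System.out.println("target for a new logs shard: " + wf.lowestWeightNode("logs", nodes));
    }
}

// Hypothetical per-node view: number of shards held per index.
record NodeLoad(String id, Map<String, Integer> shardsPerIndex) {
    int total() {
        return shardsPerIndex.values().stream().mapToInt(Integer::intValue).sum();
    }

    int forIndex(String index) {
        return shardsPerIndex.getOrDefault(index, 0);
    }
}

class WeightFunction {
    private final float theta0; // weight of the "all shards on the node" term
    private final float theta1; // weight of the "shards of this index on the node" term

    WeightFunction(float indexBalance, float shardBalance) {
        float sum = indexBalance + shardBalance;
        this.theta0 = shardBalance / sum;
        this.theta1 = indexBalance / sum;
    }

    float weight(NodeLoad node, String index, List<NodeLoad> all) {
        float avgShards = (float) all.stream().mapToInt(NodeLoad::total).sum() / all.size();
        float avgIndexShards = (float) all.stream().mapToInt(n -> n.forIndex(index)).sum() / all.size();
        return theta0 * (node.total() - avgShards) + theta1 * (node.forIndex(index) - avgIndexShards);
    }

    // Deciders are ignored here; ES only considers nodes whose canAllocate decision allows it.
    String lowestWeightNode(String index, List<NodeLoad> all) {
        return all.stream()
            .min(Comparator.comparingDouble(n -> weight(n, index, all)))
            .map(NodeLoad::id)
            .orElseThrow();
    }
}
```

With these numbers node-1 has weight of about 1.517 and node-3 about -1.033, so node-3 (fewest shards overall and no logs shards) is chosen. Raising the index factor makes spreading each index evenly matter more than the total shard count.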
How does the master eventually publish the result of the reroute to the rest of the cluster?
RoutingNodes.initializeShard()
    public ShardRouting initializeShard(ShardRouting unassignedShard, String nodeId, @Nullable String existingAllocationId, long expectedSize, RoutingChangesObserver routingChangesObserver) {
        ensureMutable();
        assert unassignedShard.unassigned() : "expected an unassigned shard " + unassignedShard;
        // Initialize the ShardRouting (returns a new instance)
        ShardRouting initializedShard = unassignedShard.initialize(nodeId, existingAllocationId, expectedSize);
        // Add it to the target node. This runs on the master and produces a new cluster state that is published afterwards
        node(nodeId).add(initializedShard);
        inactiveShardCount++;
        if (initializedShard.primary()) {
            inactivePrimaryCount++;
        }
        addRecovery(initializedShard);
        assignedShardsAdd(initializedShard);
        // Notify the routing-changes observer
        routingChangesObserver.shardInitialized(unassignedShard, initializedShard);
        return initializedShard;
    }

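It then calls the observer's notification method:

RoutingChangesObserver.shardInitialized()    =>    observer notification
RoutingNodesChangedObserver.setChanged()    =>    sets changed = true
The place where changed == true is finally checked:
MasterService.runTasks()    ->  finally executes the TaskInputs
MasterService.calculateTaskOutputs()    -> calls the execute() method defined when the task was submitted
MasterService.executeTasks()    -> calls the execute() method defined when the task was submitted and returns a ClusterTasksResult object, which wraps whether execution succeeded and the resulting ClusterState
ClusterStateUpdateTask.execute    =>    executes the cluster task
AllocationService.reroute()    =>    starts the reroute flow: checks whether any shard of any index in the cluster state is unassigned and whether it can be allocated to some node
RoutingAllocation.routingNodesChanged()    =>    checks whether the routing information has changed
RoutingNodesChangedObserver.isChanged()    =>    checks whether the routing information has changed; if it has, a new cluster state is returned
MasterService.publish()     ->     if the task result contains a cluster state change that must be pushed out, sends it through the clusterStatePublisher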
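The changed-flag chain above is an observer pattern: the routing code notifies an observer, the observer only records that something changed, and the task runner publishes a new cluster state only when the flag is set. A minimal sketch with hypothetical names (`ChangeObserver`, `ChangedFlagObserver`, `MiniMaster`), not the real RoutingChangesObserver or MasterService signatures:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class PublishDemo {
    public static void main(String[] args) {
        List<String> published = new ArrayList<>();
        MiniMaster master = new MiniMaster(published::add);
        master.runRerouteTask(List.of());          // nothing initialized: nothing published
        master.runRerouteTask(List.of("logs[0]")); // one shard initialized: new state published
        System.out.println(published);             // prints [cluster state v1]
    }
}

// Hypothetical observer, modelled on RoutingChangesObserver.shardInitialized().
interface ChangeObserver {
    void shardInitialized(String shard);
}

// Only records that something changed, like RoutingNodesChangedObserver.
class ChangedFlagObserver implements ChangeObserver {
    private boolean changed;

    @Override
    public void shardInitialized(String shard) {
        changed = true;
    }

    boolean isChanged() {
        return changed;
    }
}

class MiniMaster {
    private final Consumer<String> publisher;
    private int version = 0;

    MiniMaster(Consumer<String> publisher) {
        this.publisher = publisher;
    }

    // Runs a reroute-like task and publishes a new cluster state version only if routing changed.
    void runRerouteTask(List<String> shardsToInitialize) {
        ChangedFlagObserver observer = new ChangedFlagObserver();
        for (String shard : shardsToInitialize) {
            observer.shardInitialized(shard); // real routing code calls this after initializing the shard
        }
        if (observer.isChanged()) {
            version++;
            publisher.accept("cluster state v" + version);
        }
    }
}
```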

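【Cluster restart】
The allocation process triggered by cluster startup.

【Node joins the cluster】
A node joining the cluster also triggers allocation.

【Index creation】
Creating an index also triggers allocation.
Create-index request:

【Replica change】
Changing the number of replicas of an index also triggers allocation.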

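Disclaimer: This article was contributed by a user and does not represent the position of the 【wpsshop博客】 blog; copyright belongs to the original author, and this site assumes no legal responsibility. If you find infringing content, please contact us. When reposting, please cite the source: https://www.wpsshop.cn/w/繁依Fanyi0/article/detail/685625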