赞
踩
- 【log】ClusterModule.createAllocationDeciders() 添加所有 Decider
- 【log】ClusterModule.addAllocationDecider() clazz=class org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider
- 【log】ClusterModule.addAllocationDecider() clazz=class org.elasticsearch.cluster.routing.allocation.decider.ResizeAllocationDecider
- 【log】ClusterModule.addAllocationDecider() clazz=class org.elasticsearch.cluster.routing.allocation.decider.ReplicaAfterPrimaryActiveAllocationDecider
- 【log】ClusterModule.addAllocationDecider() clazz=class org.elasticsearch.cluster.routing.allocation.decider.RebalanceOnlyWhenActiveAllocationDecider
- 【log】ClusterModule.addAllocationDecider() clazz=class org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider
- 【log】ClusterModule.addAllocationDecider() clazz=class org.elasticsearch.cluster.routing.allocation.decider.ConcurrentRebalanceAllocationDecider
- 【log】ClusterModule.addAllocationDecider() clazz=class org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider
- 【log】ClusterModule.addAllocationDecider() clazz=class org.elasticsearch.cluster.routing.allocation.decider.NodeVersionAllocationDecider
- 【log】ClusterModule.addAllocationDecider() clazz=class org.elasticsearch.cluster.routing.allocation.decider.SnapshotInProgressAllocationDecider
- 【log】ClusterModule.addAllocationDecider() clazz=class org.elasticsearch.cluster.routing.allocation.decider.RestoreInProgressAllocationDecider
- 【log】ClusterModule.addAllocationDecider() clazz=class org.elasticsearch.cluster.routing.allocation.decider.NodeShutdownAllocationDecider
- 【log】ClusterModule.addAllocationDecider() clazz=class org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider
- 【log】ClusterModule.addAllocationDecider() clazz=class org.elasticsearch.cluster.routing.allocation.decider.SameShardAllocationDecider
- 【log】ClusterModule.addAllocationDecider() clazz=class org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider
- 【log】ClusterModule.addAllocationDecider() clazz=class org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider
- 【log】ClusterModule.addAllocationDecider() clazz=class org.elasticsearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider
- 【log】ClusterModule.addAllocationDecider() clazz=class org.elasticsearch.cluster.routing.allocation.decider.AwarenessAllocationDecider
- 【log】ClusterModule.addAllocationDecider() clazz=class org.elasticsearch.xpack.cluster.routing.allocation.DataTierAllocationDecider
- 【log】ClusterModule.addAllocationDecider() clazz=class org.elasticsearch.xpack.ccr.allocation.CcrPrimaryFollowerAllocationDecider
- 【log】ClusterModule.addAllocationDecider() clazz=class org.elasticsearch.xpack.searchablesnapshots.allocation.decider.SearchableSnapshotAllocationDecider
- 【log】ClusterModule.addAllocationDecider() clazz=class org.elasticsearch.xpack.searchablesnapshots.allocation.decider.SearchableSnapshotRepositoryExistsAllocationDecider
- 【log】ClusterModule.addAllocationDecider() clazz=class org.elasticsearch.xpack.searchablesnapshots.allocation.decider.SearchableSnapshotEnableAllocationDecider
- 【log】ClusterModule.addAllocationDecider() clazz=class org.elasticsearch.xpack.searchablesnapshots.allocation.decider.HasFrozenCacheAllocationDecider
- 【log】ClusterModule.addAllocationDecider() clazz=class org.elasticsearch.xpack.searchablesnapshots.allocation.decider.DedicatedFrozenNodeAllocationDecider
- 【日志】new AllocationDeciders() 传入的AllocationDecider总数:24
- MaxRetryAllocationDecider
- ResizeAllocationDecider
- ReplicaAfterPrimaryActiveAllocationDecider 保证只会在主shard分配完毕后再对副shard进行分配。
- RebalanceOnlyWhenActiveAllocationDecider 当所有shard都处在active状态时,才能执行 rebalance。
- ClusterRebalanceAllocationDecider 通过集群汇总active状态的shard来决定是否可以执行rebalance。
- ConcurrentRebalanceAllocationDecider
- EnableAllocationDecider
- NodeVersionAllocationDecider
- SnapshotInProgressAllocationDecider
- RestoreInProgressAllocationDecider
- NodeShutdownAllocationDecider
- FilterAllocationDecider 通过接口,动态设置某个node分配策略时(必须、允许、排除),对这个node进行过滤。
- SameShardAllocationDecider 避免主副shard都分配到一个节点。
- DiskThresholdDecider 根据磁盘空间决策shard应该分配到某个node。
- ThrottlingAllocationDecider 并发控制,在 recovery 限速控制。
- ShardsLimitAllocationDecider 同一个节点限值同一个index的shard数量。
- AwarenessAllocationDecider
- DataTierAllocationDecider
- CcrPrimaryFollowerAllocationDecider
- SearchableSnapshotAllocationDecider
- SearchableSnapshotRepositoryExistsAllocationDecider
- SearchableSnapshotEnableAllocationDecider
- HasFrozenCacheAllocationDecider
- DedicatedFrozenNodeAllocationDecider
- public ClusterModule(Settings settings, ClusterService clusterService, List<ClusterPlugin> clusterPlugins,
- ClusterInfoService clusterInfoService, SnapshotsInfoService snapshotsInfoService, ThreadContext threadContext,
- SystemIndices systemIndices) {
- this.clusterPlugins = clusterPlugins;
- this.deciderList = createAllocationDeciders(settings, clusterService.getClusterSettings(), clusterPlugins);
- // 创建 AllocationDeciders 里面是所有决策器
- this.allocationDeciders = new AllocationDeciders(deciderList);
- // 创建 BalancedShardsAllocator
- this.shardsAllocator = createShardsAllocator(settings, clusterService.getClusterSettings(), clusterPlugins);
- this.clusterService = clusterService;
- this.indexNameExpressionResolver = new IndexNameExpressionResolver(threadContext, systemIndices);
- // 创建 AllocationService
- // 提交 AllocationDeciders、BalancedShardsAllocator
- // 在初始化 Node() 对象时会给 AllocationService 提交 GatewayAllocator 实例
- // 最终 AllocationService 引用了 AllocationDeciders、BalancedShardsAllocator、GatewayAllocator
- this.allocationService = new AllocationService(allocationDeciders, shardsAllocator, clusterInfoService, snapshotsInfoService);
- this.metadataDeleteIndexService = new MetadataDeleteIndexService(settings, clusterService, allocationService);
- }
- 【log】AllocationService.showAllocatorsInfo() 输出 allocators 信息
- 【log】AllocationService.allocationDeciders clazz=class org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider
- 【log】AllocationService.allocationDeciders clazz=class org.elasticsearch.cluster.routing.allocation.decider.ResizeAllocationDecider
- 【log】AllocationService.allocationDeciders clazz=class org.elasticsearch.cluster.routing.allocation.decider.ReplicaAfterPrimaryActiveAllocationDecider
- 【log】AllocationService.allocationDeciders clazz=class org.elasticsearch.cluster.routing.allocation.decider.RebalanceOnlyWhenActiveAllocationDecider
- 【log】AllocationService.allocationDeciders clazz=class org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider
- 【log】AllocationService.allocationDeciders clazz=class org.elasticsearch.cluster.routing.allocation.decider.ConcurrentRebalanceAllocationDecider
- 【log】AllocationService.allocationDeciders clazz=class org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider
- 【log】AllocationService.allocationDeciders clazz=class org.elasticsearch.cluster.routing.allocation.decider.NodeVersionAllocationDecider
- 【log】AllocationService.allocationDeciders clazz=class org.elasticsearch.cluster.routing.allocation.decider.SnapshotInProgressAllocationDecider
- 【log】AllocationService.allocationDeciders clazz=class org.elasticsearch.cluster.routing.allocation.decider.RestoreInProgressAllocationDecider
- 【log】AllocationService.allocationDeciders clazz=class org.elasticsearch.cluster.routing.allocation.decider.NodeShutdownAllocationDecider
- 【log】AllocationService.allocationDeciders clazz=class org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider
- 【log】AllocationService.allocationDeciders clazz=class org.elasticsearch.cluster.routing.allocation.decider.SameShardAllocationDecider
- 【log】AllocationService.allocationDeciders clazz=class org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider
- 【log】AllocationService.allocationDeciders clazz=class org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider
- 【log】AllocationService.allocationDeciders clazz=class org.elasticsearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider
- 【log】AllocationService.allocationDeciders clazz=class org.elasticsearch.cluster.routing.allocation.decider.AwarenessAllocationDecider
- 【log】AllocationService.allocationDeciders clazz=class org.elasticsearch.xpack.cluster.routing.allocation.DataTierAllocationDecider
- 【log】AllocationService.allocationDeciders clazz=class org.elasticsearch.xpack.ccr.allocation.CcrPrimaryFollowerAllocationDecider
- 【log】AllocationService.allocationDeciders clazz=class org.elasticsearch.xpack.searchablesnapshots.allocation.decider.SearchableSnapshotAllocationDecider
- 【log】AllocationService.allocationDeciders clazz=class org.elasticsearch.xpack.searchablesnapshots.allocation.decider.SearchableSnapshotRepositoryExistsAllocationDecider
- 【log】AllocationService.allocationDeciders clazz=class org.elasticsearch.xpack.searchablesnapshots.allocation.decider.SearchableSnapshotEnableAllocationDecider
- 【log】AllocationService.allocationDeciders clazz=class org.elasticsearch.xpack.searchablesnapshots.allocation.decider.HasFrozenCacheAllocationDecider
- 【log】AllocationService.allocationDeciders clazz=class org.elasticsearch.xpack.searchablesnapshots.allocation.decider.DedicatedFrozenNodeAllocationDecider
- 【log】AllocationService.shardsAllocator clazz=class org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator
- 【log】AllocationService.existingShardsAllocators key=gateway_allocator,clazz=class org.elasticsearch.gateway.GatewayAllocator
- 【log】AllocationService.existingShardsAllocators key=searchable_snapshot_allocator,clazz=class org.elasticsearch.xpack.searchablesnapshots.allocation.SearchableSnapshotAllocator
- public CommandsResult reroute(final ClusterState clusterState, AllocationCommands commands, boolean explain, boolean retryFailed) {
- RoutingNodes routingNodes = getMutableRoutingNodes(clusterState);
- RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, clusterState,clusterInfoService.getClusterInfo(), snapshotsInfoService.snapshotShardSizes(), currentNanoTime());
- allocation.debugDecision(true);
- allocation.ignoreDisable(true);
- if (retryFailed) {
- resetFailedAllocationCounter(allocation);
- }
- RoutingExplanations explanations = commands.execute(allocation, explain);
- // we revert the ignore disable flag, since when rerouting, we want the original setting to take place
- allocation.ignoreDisable(false);
- // 执行 reroute
- reroute(allocation);
- // 返回新的集群状态到 TransportClusterRerouteAction.execute() 中
- return new CommandsResult(explanations, buildResultAndLogHealthChange(clusterState, allocation, "reroute commands"));
- }
- private void reroute(RoutingAllocation allocation) {
- // 参数时 RoutingAllocation
- // 该类持有当前集群shard分配的状态信息、决策信息、节点信息等,在后面的分配过程的主要操作类。
- assert hasDeadNodes(allocation) == false : "dead nodes should be explicitly cleaned up. See disassociateDeadNodes";
- assert AutoExpandReplicas.getAutoExpandReplicaChanges(allocation.metadata(), allocation).isEmpty() :
- "auto-expand replicas out of sync with number of nodes in the cluster";
- assert assertInitialized();
- removeDelayMarkers(allocation);
- // 先调用 GatewayAllocator 查询现有未分配的shard
- allocateExistingUnassignedShards(allocation);
- // 再调用 BalancedShardsAllocator 根据节点负载找一个node进行分配
- shardsAllocator.allocate(allocation);
- assert RoutingNodes.assertShardStats(allocation.routingNodes());
- }
- private void allocateExistingUnassignedShards(RoutingAllocation allocation) {
- // 将集群信息中的所有未分配 shard 优先级排序
- allocation.routingNodes().unassigned().sort(PriorityComparator.getAllocationComparator(allocation)); // sort for priority ordering
- for (final ExistingShardsAllocator existingShardsAllocator : existingShardsAllocators.values()) {
- existingShardsAllocator.beforeAllocation(allocation);
- }
- final RoutingNodes.UnassignedShards.UnassignedIterator primaryIterator = allocation.routingNodes().unassigned().iterator();
- // 处理未分配的主shard
- while (primaryIterator.hasNext()) {
- final ShardRouting shardRouting = primaryIterator.next();
- if (shardRouting.primary()) {
- // 从集合中查找 GatewayAllocator 并调用 allocateUnassigned()
- ExistingShardsAllocator allocator = getAllocatorForShard(shardRouting, allocation);
- allocator.allocateUnassigned(shardRouting, allocation, primaryIterator);
- }
- }
- for (final ExistingShardsAllocator existingShardsAllocator : existingShardsAllocators.values()) {
- existingShardsAllocator.afterPrimariesBeforeReplicas(allocation);
- }
- // 处理未分配的副shard
- final RoutingNodes.UnassignedShards.UnassignedIterator replicaIterator = allocation.routingNodes().unassigned().iterator();
- while (replicaIterator.hasNext()) {
- final ShardRouting shardRouting = replicaIterator.next();
- if (shardRouting.primary() == false) {
- ExistingShardsAllocator allocator = getAllocatorForShard(shardRouting, allocation);
- allocator.allocateUnassigned(shardRouting, allocation, replicaIterator);
- }
- }
- }
- public void allocateUnassigned(ShardRouting shardRouting, RoutingAllocation allocation, ExistingShardsAllocator.UnassignedAllocationHandler unassignedAllocationHandler) {
- // 主副 未分配shard都会进入这里, makeAllocationDecision 是静态函数,调用子类的实现
- // PrimaryShardAllocator.makeAllocationDecision()
- // ReplicaShardAllocator.makeAllocationDecision()
- final AllocateUnassignedDecision allocateUnassignedDecision = makeAllocationDecision(shardRouting, allocation, logger);
- if (allocateUnassignedDecision.isDecisionTaken() == false) {
- return;
- }
- // 如果前面通过决策器后返回 YES
- if (allocateUnassignedDecision.getAllocationDecision() == AllocationDecision.YES) {
- // 修改 shard 的状态,由 unassigend(未分配) 改为 initalizer(初始化)
- unassignedAllocationHandler.initialize(
- allocateUnassignedDecision.getTargetNode().getId(),
- allocateUnassignedDecision.getAllocationId(),
- getExpectedShardSize(shardRouting, allocation),
- allocation.changes());
- }
- // 忽略
- else {
- unassignedAllocationHandler.removeAndIgnore(allocateUnassignedDecision.getAllocationStatus(), allocation.changes());
- }
- }
- private final ShardId shardId; //分片id
- private final String currentNodeId; //当前所在节点id
- private final String relocatingNodeId; //重新分配的节点id
- private final boolean primary; //是否主
- private final ShardRoutingState state; //分片状态
- public void allocate(RoutingAllocation allocation) {
- if (allocation.routingNodes().size() == 0) {
- failAllocationOfNewPrimaries(allocation);
- return;
- }
- final Balancer balancer = new Balancer(logger, allocation, weightFunction, threshold);
- // 根据权重算法和decider决定把shard分配到哪个节点。同样将决策后的分配信息更新到集群状态,由Master广播下去。
- balancer.allocateUnassigned();
- // 对状态为started的分片根据decider来判断是否需要“move”,
- balancer.moveShards();
- // 根据权重函数平衡集群模型上的节点。
- balancer.balance();
- }
- public ShardRouting initializeShard(ShardRouting unassignedShard, String nodeId, @Nullable String existingAllocationId, long expectedSize, RoutingChangesObserver routingChangesObserver) {
- ensureMutable();
- assert unassignedShard.unassigned() : "expected an unassigned shard " + unassignedShard;
- // 初始化 ShardRouting
- ShardRouting initializedShard = unassignedShard.initialize(nodeId, existingAllocationId, expectedSize);
- // 添加到目的节点的节点信息,这个操作在master执行,生成了新的集群转台后续再广播出去
- node(nodeId).add(initializedShard);
- inactiveShardCount++;
- if (initializedShard.primary()) {
- inactivePrimaryCount++;
- }
- addRecovery(initializedShard);
- assignedShardsAdd(initializedShard);
- // 通知shard路由改变观察者,
- routingChangesObserver.shardInitialized(unassignedShard, initializedShard);
- return initializedShard;
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。