elasticsearch源码分析-05分片分配_elasticsearch源码 id 分片号 算法

elasticsearch源码 id 分片号 算法



/**
 * Builds the cluster module: creates the allocation deciders and the shards allocator from the
 * settings, the cluster settings and the cluster plugins, and then constructs the
 * {@code AllocationService} from the deciders, the shards allocator and the cluster info service.
 */
public ClusterModule(Settings settings, ClusterService clusterService, List<ClusterPlugin> clusterPlugins,
                     ClusterInfoService clusterInfoService) {
    this.clusterPlugins = clusterPlugins;
    this.deciderList = createAllocationDeciders(settings, clusterService.getClusterSettings(), clusterPlugins);
    this.allocationDeciders = new AllocationDeciders(deciderList);
    this.shardsAllocator = createShardsAllocator(settings, clusterService.getClusterSettings(), clusterPlugins);
    this.clusterService = clusterService;
    this.indexNameExpressionResolver = new IndexNameExpressionResolver();
    this.allocationService = new AllocationService(allocationDeciders, shardsAllocator, clusterInfoService);
// Registers the built-in allocation deciders. The map is a LinkedHashMap, so it iterates in the
// order in which the deciders are put into it.
// NOTE(review): addAllocationDecider is not shown here; presumably it keys each decider by its class - confirm.
Map<Class, AllocationDecider> deciders = new LinkedHashMap<>();
addAllocationDecider(deciders, new MaxRetryAllocationDecider());
addAllocationDecider(deciders, new ResizeAllocationDecider());
addAllocationDecider(deciders, new ReplicaAfterPrimaryActiveAllocationDecider());
addAllocationDecider(deciders, new RebalanceOnlyWhenActiveAllocationDecider());
addAllocationDecider(deciders, new ClusterRebalanceAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new ConcurrentRebalanceAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new EnableAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new NodeVersionAllocationDecider());
addAllocationDecider(deciders, new SnapshotInProgressAllocationDecider());
addAllocationDecider(deciders, new RestoreInProgressAllocationDecider());
addAllocationDecider(deciders, new FilterAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new SameShardAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new DiskThresholdDecider(settings, clusterSettings));
addAllocationDecider(deciders, new ThrottlingAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new ShardsLimitAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new AwarenessAllocationDecider(settings, clusterSettings));
  • canRebalance:分片是否可以平衡到给定的allocation
  • canAllocate:分片是否可以分配到给定的节点
  • canRemain:分片是否可以保留在给定的节点
  • shouldAutoExpandToNode:分片是否可以自动扩展到给定节点
  • canForceAllocatePrimary:主分片是否可以强制分配在给定节点
  • 负载均衡类
        // Builds the shards allocator from the settings, the cluster settings and the cluster plugins.
        this.shardsAllocator = createShardsAllocator(settings, clusterService.getClusterSettings(), clusterPlugins);
        // Registers, under the BALANCED_ALLOCATOR key, a supplier that creates a BalancedShardsAllocator.
        allocators.put(BALANCED_ALLOCATOR, () -> new BalancedShardsAllocator(settings, clusterSettings));
// Call site that triggers a reroute on the new state, with "state recovered" as the reason.
allocationService.reroute(newState, "state recovered");

/**
 * Reroutes the routing table of the given cluster state.
 *
 * The state is first passed through {@code adaptAutoExpandReplicas}; a {@code RoutingAllocation} is then
 * created over mutable routing nodes of the adapted state.
 * NOTE(review): this is an excerpt - the statements that shuffle the unassigned shards and run the
 * actual reroute on {@code allocation} are not shown, nor are the closing braces.
 *
 * @param clusterState the state to reroute
 * @param reason       passed on to {@code buildResultAndLogHealthChange}
 * @return the given {@code clusterState} instance when neither the auto-expand adaptation nor the
 *         allocation changed anything, otherwise the result of {@code buildResultAndLogHealthChange}
 */
public ClusterState reroute(ClusterState clusterState, String reason) {
    ClusterState fixedClusterState = adaptAutoExpandReplicas(clusterState);

    RoutingNodes routingNodes = getMutableRoutingNodes(fixedClusterState);
    // shuffle the unassigned nodes, just so we won't have things like poison failed shards
    RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, fixedClusterState,
                                                         clusterInfoService.getClusterInfo(), currentNanoTime());
    // nothing changed: return the caller's own instance
    if (fixedClusterState == clusterState && allocation.routingNodesChanged() == false) {
        return clusterState;
    return buildResultAndLogHealthChange(clusterState, allocation, reason);
/**
 * Internal reroute step: allocates existing unassigned shard copies first and then asserts the
 * shard statistics of the routing nodes.
 * NOTE(review): excerpt - any further statements of this method are not shown here.
 */
private void reroute(RoutingAllocation allocation) {
    allocateExistingUnassignedShards(allocation);  // try to allocate existing shard copies first
    assert RoutingNodes.assertShardStats(allocation.routingNodes());
/**
 * Allocates unassigned shards in two passes over the unassigned list, which is sorted by the
 * allocation priority comparator first: the first pass hands every primary to the allocator
 * responsible for it, the second pass does the same for every non-primary (replica) shard.
 * NOTE(review): excerpt - the bodies of the two loops over {@code existingShardsAllocators} and the
 * closing braces are not shown here.
 */
private void allocateExistingUnassignedShards(RoutingAllocation allocation) {
        allocation.routingNodes().unassigned().sort(PriorityComparator.getAllocationComparator(allocation)); // sort for priority ordering

        for (final ExistingShardsAllocator existingShardsAllocator : existingShardsAllocators.values()) {
        // first pass: primaries only
        final RoutingNodes.UnassignedShards.UnassignedIterator primaryIterator = allocation.routingNodes().unassigned().iterator();
        while (primaryIterator.hasNext()) {
            final ShardRouting shardRouting = primaryIterator.next();
            if (shardRouting.primary()) {
                getAllocatorForShard(shardRouting, allocation)
                    .allocateUnassigned(shardRouting, allocation, primaryIterator);

        for (final ExistingShardsAllocator existingShardsAllocator : existingShardsAllocators.values()) {
        // second pass: replicas only
        final RoutingNodes.UnassignedShards.UnassignedIterator replicaIterator = allocation.routingNodes().unassigned().iterator();
        while (replicaIterator.hasNext()) {
            final ShardRouting shardRouting = replicaIterator.next();
            if (shardRouting.primary() == false) {
                getAllocatorForShard(shardRouting, allocation)
                    .allocateUnassigned(shardRouting, allocation, replicaIterator);
/**
 * Entry point for allocating one unassigned shard: asserts that both the primary and the replica
 * shard allocator are set and delegates to {@code innerAllocatedUnassigned}.
 */
public void allocateUnassigned(ShardRouting shardRouting, final RoutingAllocation allocation,UnassignedAllocationHandler unassignedAllocationHandler) {
    assert primaryShardAllocator != null;
    assert replicaShardAllocator != null;
    innerAllocatedUnassigned(allocation, primaryShardAllocator, replicaShardAllocator, shardRouting, unassignedAllocationHandler);

/**
 * Dispatches an unassigned shard to the matching allocator: primaries go to the
 * {@code primaryShardAllocator}, all other shards go to the {@code replicaShardAllocator}.
 * The shard is asserted to be unassigned.
 */
protected static void innerAllocatedUnassigned(RoutingAllocation allocation,
                                               PrimaryShardAllocator primaryShardAllocator,
                                               ReplicaShardAllocator replicaShardAllocator,
                                               ShardRouting shardRouting,
                                               ExistingShardsAllocator.UnassignedAllocationHandler unassignedAllocationHandler) {
    assert shardRouting.unassigned();
    if (shardRouting.primary()) {// allocate the primary shard
        primaryShardAllocator.allocateUnassigned(shardRouting, allocation, unassignedAllocationHandler);
    } else {// allocate the replica shard
        replicaShardAllocator.allocateUnassigned(shardRouting, allocation, unassignedAllocationHandler);
/**
 * Makes an allocation decision for the unassigned shard via {@code makeAllocationDecision} and acts on it.
 * When the decision is not YES, the shard is removed from the unassigned iteration and ignored with
 * the decision's allocation status.
 * NOTE(review): excerpt - the early return for "no decision taken" and the call that initializes the
 * shard on a YES decision are cut off; only the expected-shard-size argument of that call is visible
 * (unavailable for primaries, looked up from the cluster info for replicas).
 */
public void allocateUnassigned(ShardRouting shardRouting, RoutingAllocation allocation,
                                   ExistingShardsAllocator.UnassignedAllocationHandler unassignedAllocationHandler) {
        final AllocateUnassignedDecision allocateUnassignedDecision = makeAllocationDecision(shardRouting, allocation, logger);

        if (allocateUnassignedDecision.isDecisionTaken() == false) {
            // no decision was taken by this allocator
        if (allocateUnassignedDecision.getAllocationDecision() == AllocationDecision.YES) {
                shardRouting.primary() ? ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE :
                                         allocation.clusterInfo().getShardSize(shardRouting, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE),
        } else {
            unassignedAllocationHandler.removeAndIgnore(allocateUnassignedDecision.getAllocationStatus(), allocation.changes());
    /**
     * Decides whether, and on which node, the given unassigned shard can be allocated.
     *
     * Outcomes visible in this excerpt:
     * - {@code NOT_TAKEN} when this allocator is not responsible for the shard;
     * - NO with {@code FETCHING_SHARD_DATA} while the fetched shard state has no data yet;
     * - with no allocation candidate: {@code NOT_TAKEN} for a snapshot restore, otherwise NO with
     *   {@code NO_VALID_SHARD_COPY};
     * - otherwise the deciders are consulted through {@code buildNodesToAllocate}: the first YES node is
     *   chosen; if every node was answered NO (and none throttled) the check is repeated with forced
     *   allocation; in the remaining case the allocation is marked as throttled;
     * - the final result is NO with {@code FETCHING_SHARD_DATA} if an async fetch is still pending, else
     *   YES for the chosen node, else THROTTLE, else NO with {@code DECIDERS_NO}.
     * Per-node explanations are only built when {@code allocation.debugDecision()} is true.
     * NOTE(review): excerpt - closing braces and the end of the first buildNodesToAllocate call are missing.
     *
     * @param unassignedShard the shard to decide on
     * @param allocation      the current routing allocation
     * @param logger          logger used for the debug output
     * @return the allocation decision for the shard
     */
    public AllocateUnassignedDecision makeAllocationDecision(final ShardRouting unassignedShard,
                                                             final RoutingAllocation allocation,
                                                             final Logger logger) {
        if (isResponsibleFor(unassignedShard) == false) {
            // this allocator is not responsible for allocating this shard
            return AllocateUnassignedDecision.NOT_TAKEN;
        final boolean explain = allocation.debugDecision();
        final FetchResult<NodeGatewayStartedShards> shardState = fetchData(unassignedShard, allocation);
        if (shardState.hasData() == false) {
            List<NodeAllocationResult> nodeDecisions = null;
            if (explain) {
                nodeDecisions = buildDecisionsForAllNodes(unassignedShard, allocation);
            return AllocateUnassignedDecision.no(AllocationStatus.FETCHING_SHARD_DATA, nodeDecisions);

        // don't create a new IndexSetting object for every shard as this could cause a lot of garbage
        // on cluster restart if we allocate a boat load of shards
        final IndexMetadata indexMetadata = allocation.metadata().getIndexSafe(unassignedShard.index());
        final Set<String> inSyncAllocationIds = indexMetadata.inSyncAllocationIds(unassignedShard.id());
        final boolean snapshotRestore = unassignedShard.recoverySource().getType() == RecoverySource.Type.SNAPSHOT;

        assert inSyncAllocationIds.isEmpty() == false;
        // use in-sync allocation ids to select nodes
        // (the candidate nodes are selected based on the in-sync allocation ids)
        final NodeShardsResult nodeShardsResult = buildNodeShardsResult(unassignedShard, snapshotRestore,
            allocation.getIgnoreNodes(unassignedShard.shardId()), inSyncAllocationIds, shardState, logger);
        final boolean enoughAllocationsFound = nodeShardsResult.orderedAllocationCandidates.size() > 0;
        logger.debug("[{}][{}]: found {} allocation candidates of {} based on allocation ids: [{}]", unassignedShard.index(),
            unassignedShard.id(), nodeShardsResult.orderedAllocationCandidates.size(), unassignedShard, inSyncAllocationIds);

        if (enoughAllocationsFound == false) {
            if (snapshotRestore) {
                // let BalancedShardsAllocator take care of allocating this shard
                logger.debug("[{}][{}]: missing local data, will restore from [{}]",
                             unassignedShard.index(), unassignedShard.id(), unassignedShard.recoverySource());
                return AllocateUnassignedDecision.NOT_TAKEN;
            } else {
                // We have a shard that was previously allocated, but we could not find a valid shard copy to allocate the primary.
                // We could just be waiting for the node that holds the primary to start back up, in which case the allocation for
                // this shard will be picked up when the node joins and we do another allocation reroute
                logger.debug("[{}][{}]: not allocating, number_of_allocated_shards_found [{}]",
                             unassignedShard.index(), unassignedShard.id(), nodeShardsResult.allocationsFound);
                return AllocateUnassignedDecision.no(AllocationStatus.NO_VALID_SHARD_COPY,
                    explain ? buildNodeDecisions(null, shardState, inSyncAllocationIds) : null);
        NodesToAllocate nodesToAllocate = buildNodesToAllocate(
            allocation, nodeShardsResult.orderedAllocationCandidates, unassignedShard, false
        DiscoveryNode node = null;
        String allocationId = null;
        boolean throttled = false;
        if (nodesToAllocate.yesNodeShards.isEmpty() == false) {
            DecidedNode decidedNode = nodesToAllocate.yesNodeShards.get(0);
            logger.debug("[{}][{}]: allocating [{}] to [{}] on primary allocation",
                         unassignedShard.index(), unassignedShard.id(), unassignedShard, decidedNode.nodeShardState.getNode());
            node = decidedNode.nodeShardState.getNode();
            allocationId = decidedNode.nodeShardState.allocationId();
        } else if (nodesToAllocate.throttleNodeShards.isEmpty() && !nodesToAllocate.noNodeShards.isEmpty()) {
            // The deciders returned a NO decision for all nodes with shard copies, so we check if primary shard
            // can be force-allocated to one of the nodes.
            nodesToAllocate = buildNodesToAllocate(allocation, nodeShardsResult.orderedAllocationCandidates, unassignedShard, true);
            if (nodesToAllocate.yesNodeShards.isEmpty() == false) {
                final DecidedNode decidedNode = nodesToAllocate.yesNodeShards.get(0);
                final NodeGatewayStartedShards nodeShardState = decidedNode.nodeShardState;
                logger.debug("[{}][{}]: allocating [{}] to [{}] on forced primary allocation",
                             unassignedShard.index(), unassignedShard.id(), unassignedShard, nodeShardState.getNode());
                node = nodeShardState.getNode();
                allocationId = nodeShardState.allocationId();
            } else if (nodesToAllocate.throttleNodeShards.isEmpty() == false) {
                logger.debug("[{}][{}]: throttling allocation [{}] to [{}] on forced primary allocation",
                             unassignedShard.index(), unassignedShard.id(), unassignedShard, nodesToAllocate.throttleNodeShards);
                throttled = true;
            } else {
                logger.debug("[{}][{}]: forced primary allocation denied [{}]",
                             unassignedShard.index(), unassignedShard.id(), unassignedShard);
        } else {
            // we are throttling this, since we are allowed to allocate to this node but there are enough allocations
            // taking place on the node currently, ignore it for now
            logger.debug("[{}][{}]: throttling allocation [{}] to [{}] on primary allocation",
                         unassignedShard.index(), unassignedShard.id(), unassignedShard, nodesToAllocate.throttleNodeShards);
            throttled = true;

        List<NodeAllocationResult> nodeResults = null;
        if (explain) {
            nodeResults = buildNodeDecisions(nodesToAllocate, shardState, inSyncAllocationIds);
        if (allocation.hasPendingAsyncFetch()) {
            return AllocateUnassignedDecision.no(AllocationStatus.FETCHING_SHARD_DATA, nodeResults);
        } else if (node != null) {
            return AllocateUnassignedDecision.yes(node, allocationId, nodeResults, false);
        } else if (throttled) {
            return AllocateUnassignedDecision.throttle(nodeResults);
        } else {
            return AllocateUnassignedDecision.no(AllocationStatus.DECIDERS_NO, nodeResults, true);
final FetchResult<NodeGatewayStartedShards> shardState = fetchData(unassignedShard, allocation);

/**
 * Fetches the started-shard state of the given shard through an {@code AsyncShardFetch}, passing it
 * the allocation's nodes and the nodes to ignore for this shard, and returns the fetch result.
 * NOTE(review): excerpt - the expression that obtains the {@code fetch} instance (only the lambda
 * creating an {@code InternalAsyncFetch} of type "shard_started" is visible) and the body of the
 * {@code hasData()} branch are cut off.
 */
protected AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedShards.NodeGatewayStartedShards>
                                                                        fetchData(ShardRouting shard, RoutingAllocation allocation) {
            AsyncShardFetch<TransportNodesListGatewayStartedShards.NodeGatewayStartedShards> fetch =
                    shardId -> new InternalAsyncFetch<>(logger, "shard_started", shardId,
            AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedShards.NodeGatewayStartedShards> shardState =
                    fetch.fetchData(allocation.nodes(), allocation.getIgnoreNodes(shard.shardId()));
            if (shardState.hasData()) {
            return shardState;
/**
 * Requests the shard data from the given nodes through {@code action.list} with a listener.
 * A response is handed to {@code processAsyncFetch} together with its per-node failures; if the
 * whole request fails, one {@code FailedNodeException} per requested node is created and passed to
 * {@code processAsyncFetch} with a {@code null} response list.
 *
 * @param nodes         the nodes to fetch from
 * @param fetchingRound passed through unchanged to {@code processAsyncFetch}
 */
void asyncFetch(final DiscoveryNode[] nodes, long fetchingRound) {
    logger.trace("{} fetching [{}] from {}", shardId, type, nodes);
    action.list(shardId, customDataPath, nodes, new ActionListener<BaseNodesResponse<T>>() {
        public void onResponse(BaseNodesResponse<T> response) {
            processAsyncFetch(response.getNodes(), response.failures(), fetchingRound);

        public void onFailure(Exception e) {
            // total failure: report every requested node as failed
            List<FailedNodeException> failures = new ArrayList<>(nodes.length);
            for (final DiscoveryNode node: nodes) {
                failures.add(new FailedNodeException(node.getId(), "total failure in fetching", e));
            processAsyncFetch(null, failures, fetchingRound);
private final TransportNodesListGatewayStartedShards startedAction;
shardId -> new InternalAsyncFetch<>(logger, "shard_started", shardId,
public static final String ACTION_NAME = "internal:gateway/local/started_shards";
  • 1


/**
 * Executes the nodes request by creating a new {@code AsyncAction} for the task, request and
 * listener and starting it.
 */
protected void doExecute(Task task, NodesRequest request, ActionListener<NodesResponse> listener) {
    new AsyncAction(task, request, listener).start();
/**
 * Node-level handler that loads the latest local shard state metadata for the requested shard and
 * reports it back.
 *
 * - When metadata exists and the shard is not open in {@code indicesService}, the custom data path is
 *   resolved (from the request, or from the index metadata as a BWC fallback) and the on-disk index
 *   is test-opened via {@code Store.tryOpenIndex}; a failure there is traced and answered with a result
 *   for the local node (NOTE(review): the arguments of that result are cut off in this excerpt).
 * - When metadata exists, the result carries the local node, the allocation id (or {@code null} if the
 *   metadata has none) and the primary flag.
 * - When no metadata exists, the result carries a {@code null} allocation id and {@code false}.
 * Any exception escaping the body is wrapped in an {@code ElasticsearchException}.
 * NOTE(review): excerpt - closing braces and several argument lists are missing.
 */
protected NodeGatewayStartedShards nodeOperation(NodeRequest request) {
    try {
        final ShardId shardId = request.getShardId();
        logger.trace("{} loading local shard state info", shardId);
        ShardStateMetadata shardStateMetadata = ShardStateMetadata.FORMAT.loadLatestState(logger, namedXContentRegistry,
        if (shardStateMetadata != null) {
            if (indicesService.getShardOrNull(shardId) == null) {
                final String customDataPath;
                if (request.getCustomDataPath() != null) {
                    customDataPath = request.getCustomDataPath();
                } else {
                    // TODO: Fallback for BWC with older ES versions. Remove once request.getCustomDataPath() always returns non-null
                    final IndexMetadata metadata = clusterService.state().metadata().index(shardId.getIndex());
                    if (metadata != null) {
                        customDataPath = new IndexSettings(metadata, settings).customDataPath();
                    } else {
                        logger.trace("{} node doesn't have meta data for the requests index", shardId);
                        throw new ElasticsearchException("node doesn't have meta data for index " + shardId.getIndex());
                // we don't have an open shard on the store, validate the files on disk are openable
                ShardPath shardPath = null;
                try {
                    shardPath = ShardPath.loadShardPath(logger, nodeEnv, shardId, customDataPath);
                    if (shardPath == null) {
                        throw new IllegalStateException(shardId + " no shard path found");
                    Store.tryOpenIndex(shardPath.resolveIndex(), shardId, nodeEnv::shardLock, logger);
                } catch (Exception exception) {
                    final ShardPath finalShardPath = shardPath;
                    logger.trace(() -> new ParameterizedMessage(
                        "{} can't open index for shard [{}] in path [{}]",
                        (finalShardPath != null) ? finalShardPath.resolveIndex() : ""),
                    String allocationId = shardStateMetadata.allocationId != null ?
                        shardStateMetadata.allocationId.getId() : null;
                    return new NodeGatewayStartedShards(clusterService.localNode(), allocationId, shardStateMetadata.primary,

            logger.debug("{} shard state info found: [{}]", shardId, shardStateMetadata);
            String allocationId = shardStateMetadata.allocationId != null ?
                shardStateMetadata.allocationId.getId() : null;
            return new NodeGatewayStartedShards(clusterService.localNode(), allocationId, shardStateMetadata.primary);
        logger.trace("{} no local shard info found", shardId);
        return new NodeGatewayStartedShards(clusterService.localNode(), null, false);
    } catch (Exception e) {
        throw new ElasticsearchException("failed to load started shards", e);
这里加载本地shard级元数据然后返回。接收到对端返回的数据后,先将数据放入cache中,以便后面执行reroute时不用重复获取,然后执行reroute(shardId, "post_response")方法。

/**
 * Schedules a reroute through the {@code rerouteService} with the fixed reason "async_shard_fetch"
 * and {@code Priority.HIGH}. The given {@code shardId} and {@code reason} are only used for logging:
 * completion is traced, a failure is logged at debug level.
 */
protected void reroute(ShardId shardId, String reason) {
    logger.trace("{} scheduling reroute for {}", shardId, reason);
    assert rerouteService != null;
    rerouteService.reroute("async_shard_fetch", Priority.HIGH, ActionListener.wrap(
        r -> logger.trace("{} scheduled reroute completed for {}", shardId, reason),
        e -> logger.debug(new ParameterizedMessage("{} scheduled reroute failed for {}", shardId, reason), e)));
            // Submits a cluster state update task for the batched reroute. When the task executes it
            // checks, under the mutex, whether the listeners it was created for are still the pending
            // ones: if so it clears the pending reference and applies the reroute function to the
            // current state; otherwise it returns the current state unchanged.
            // NOTE(review): excerpt - closing braces and the task's remaining callbacks are not shown.
            clusterService.submitStateUpdateTask(CLUSTER_UPDATE_TASK_SOURCE + "(" + reason + ")",
                new ClusterStateUpdateTask(priority) {

                    public ClusterState execute(ClusterState currentState) {
                        final boolean currentListenersArePending;
                        synchronized (mutex) {
                            assert currentListeners.isEmpty() == (pendingRerouteListeners != currentListeners)
                                : "currentListeners=" + currentListeners + ", pendingRerouteListeners=" + pendingRerouteListeners;
                            currentListenersArePending = pendingRerouteListeners == currentListeners;
                            if (currentListenersArePending) {
                                pendingRerouteListeners = null;
                        if (currentListenersArePending) {
                            logger.trace("performing batched reroute [{}]", reason);
                            return reroute.apply(currentState, reason);
                        } else {
                            logger.trace("batched reroute [{}] was promoted", reason);
                            return currentState;

execute方法中会执行 return reroute.apply(currentState, reason)。这里的 reroute 是在 Node 构造函数中创建 BatchedRerouteService 时传入的,也就是 AllocationService 的 reroute 方法:

final RerouteService rerouteService
                = new BatchedRerouteService(clusterService, clusterModule.getAllocationService()::reroute);
// Looks up the index metadata of the unassigned shard and the in-sync allocation ids of its shard id.
final IndexMetadata indexMetadata = allocation.metadata().getIndexSafe(unassignedShard.index());
        final Set<String> inSyncAllocationIds = indexMetadata.inSyncAllocationIds(unassignedShard.id());
// use the in-sync allocation ids to select nodes
final NodeShardsResult nodeShardsResult = buildNodeShardsResult(unassignedShard, snapshotRestore,                                             allocation.getIgnoreNodes(unassignedShard.shardId()), inSyncAllocationIds, shardState, logger);

/**
 * Builds the list of allocation candidates for a shard from the fetched per-node shard states.
 *
 * For each node state: a store exception other than {@code ShardLockObtainFailedException} causes
 * the allocation id to be treated as absent; a node with an allocation id is then considered when
 * {@code matchAnyShard} is true or its allocation id is contained in {@code inSyncAllocationIds}.
 * With {@code matchAnyShard}, the comparator prefers states whose allocation id is in-sync and then
 * applies {@code NO_STORE_EXCEPTION_FIRST_COMPARATOR}.
 * NOTE(review): excerpt - the body of the ignored-node check, the statements that add to
 * {@code nodeShardStates} / increment {@code numberOfAllocationsFound}, the rest of the comparator
 * chain, the {@code else} comparator and the sort call are cut off; confirm against the full source.
 *
 * @return a {@code NodeShardsResult} holding the candidate states and the number of allocations found
 */
protected static NodeShardsResult buildNodeShardsResult(ShardRouting shard, boolean matchAnyShard,
                                                            Set<String> ignoreNodes, Set<String> inSyncAllocationIds,
                                                            FetchResult<NodeGatewayStartedShards> shardState,
                                                            Logger logger) {
        List<NodeGatewayStartedShards> nodeShardStates = new ArrayList<>();
        int numberOfAllocationsFound = 0;
        for (NodeGatewayStartedShards nodeShardState : shardState.getData().values()) {
            DiscoveryNode node = nodeShardState.getNode();
            String allocationId = nodeShardState.allocationId();

            if (ignoreNodes.contains(node.getId())) {
            if (nodeShardState.storeException() == null) {
                if (allocationId == null) {
                    logger.trace("[{}] on node [{}] has no shard state information", shard, nodeShardState.getNode());
                } else {
                    logger.trace("[{}] on node [{}] has allocation id [{}]", shard, nodeShardState.getNode(), allocationId);
            } else {
                final String finalAllocationId = allocationId;
                if (nodeShardState.storeException() instanceof ShardLockObtainFailedException) {
                    logger.trace(() -> new ParameterizedMessage("[{}] on node [{}] has allocation id [{}] but the store can not be " +
                        "opened as it's locked, treating as valid shard", shard, nodeShardState.getNode(), finalAllocationId),
                } else {
                    logger.trace(() -> new ParameterizedMessage("[{}] on node [{}] has allocation id [{}] but the store can not be " +
                        "opened, treating as no allocation id", shard, nodeShardState.getNode(), finalAllocationId),
                    allocationId = null;

            if (allocationId != null) {
                assert nodeShardState.storeException() == null ||
                    nodeShardState.storeException() instanceof ShardLockObtainFailedException :
                    "only allow store that can be opened or that throws a ShardLockObtainFailedException while being opened but got a " +
                        "store throwing " + nodeShardState.storeException();
                if (matchAnyShard || inSyncAllocationIds.contains(nodeShardState.allocationId())) {

        final Comparator<NodeGatewayStartedShards> comparator; // allocation preference
        if (matchAnyShard) {
            // prefer shards with matching allocation ids
            Comparator<NodeGatewayStartedShards> matchingAllocationsFirst = Comparator.comparing(
                (NodeGatewayStartedShards state) -> inSyncAllocationIds.contains(state.allocationId())).reversed();
            comparator = matchingAllocationsFirst.thenComparing(NO_STORE_EXCEPTION_FIRST_COMPARATOR)
        } else {


        if (logger.isTraceEnabled()) {
            logger.trace("{} candidates for allocation: {}", shard, nodeShardStates.stream().map(s -> s.getNode().getName())
                .collect(Collectors.joining(", ")));
        return new NodeShardsResult(nodeShardStates, numberOfAllocationsFound);
构造节点列表:如果 matchAnyShard 设置为 false,只会将同步副本(in-sync)中的shard放入列表;否则任何具有分配 ID 的节点都会被添加到列表中,但同步副本的shard排在列表前面。

NodesToAllocate nodesToAllocate = buildNodesToAllocate(
allocation, nodeShardsResult.orderedAllocationCandidates, unassignedShard, false

/**
 * Asks the allocation deciders about every candidate node state and groups the nodes by decision
 * type into YES, THROTTLE and NO lists. With {@code forceAllocate} the deciders are queried through
 * {@code canForceAllocatePrimary}, otherwise through {@code canAllocate}.
 * NOTE(review): excerpt - the handling of a {@code null} routing node, the statements that add each
 * {@code DecidedNode} to its list, and the end of the return statement are cut off.
 */
private static NodesToAllocate buildNodesToAllocate(RoutingAllocation allocation,
                                                        List<NodeGatewayStartedShards> nodeShardStates,
                                                        ShardRouting shardRouting,
                                                        boolean forceAllocate) {
        List<DecidedNode> yesNodeShards = new ArrayList<>();
        List<DecidedNode> throttledNodeShards = new ArrayList<>();
        List<DecidedNode> noNodeShards = new ArrayList<>();
        for (NodeGatewayStartedShards nodeShardState : nodeShardStates) {
            RoutingNode node = allocation.routingNodes().node(nodeShardState.getNode().getId());
            if (node == null) {
            Decision decision = forceAllocate ? allocation.deciders().canForceAllocatePrimary(shardRouting, node, allocation) :
                                                allocation.deciders().canAllocate(shardRouting, node, allocation);
            DecidedNode decidedNode = new DecidedNode(nodeShardState, decision);
            if (decision.type() == Type.THROTTLE) {
            } else if (decision.type() == Type.NO) {
            } else {
        return new NodesToAllocate(Collections.unmodifiableList(yesNodeShards), Collections.unmodifiableList(throttledNodeShards),
        // Node selection from the deciders' answers: take the first YES node; if all nodes were
        // answered NO and none was throttled, retry with forced primary allocation; otherwise treat
        // the allocation as throttled. The final decision is FETCHING_SHARD_DATA while an async fetch
        // is pending, else YES for the chosen node, else THROTTLE, else NO with DECIDERS_NO.
        // NOTE(review): excerpt - closing braces are missing.
        if (nodesToAllocate.yesNodeShards.isEmpty() == false) {
            DecidedNode decidedNode = nodesToAllocate.yesNodeShards.get(0);
            logger.debug("[{}][{}]: allocating [{}] to [{}] on primary allocation",
                         unassignedShard.index(), unassignedShard.id(), unassignedShard, decidedNode.nodeShardState.getNode());
            node = decidedNode.nodeShardState.getNode();
            allocationId = decidedNode.nodeShardState.allocationId();
        } else if (nodesToAllocate.throttleNodeShards.isEmpty() && !nodesToAllocate.noNodeShards.isEmpty()) {
            // The deciders returned a NO decision for all nodes with shard copies, so we check if primary shard
            // can be force-allocated to one of the nodes.
            nodesToAllocate = buildNodesToAllocate(allocation, nodeShardsResult.orderedAllocationCandidates, unassignedShard, true);
            if (nodesToAllocate.yesNodeShards.isEmpty() == false) {
                final DecidedNode decidedNode = nodesToAllocate.yesNodeShards.get(0);
                final NodeGatewayStartedShards nodeShardState = decidedNode.nodeShardState;
                logger.debug("[{}][{}]: allocating [{}] to [{}] on forced primary allocation",
                             unassignedShard.index(), unassignedShard.id(), unassignedShard, nodeShardState.getNode());
                node = nodeShardState.getNode();
                allocationId = nodeShardState.allocationId();
            } else if (nodesToAllocate.throttleNodeShards.isEmpty() == false) {
                logger.debug("[{}][{}]: throttling allocation [{}] to [{}] on forced primary allocation",
                             unassignedShard.index(), unassignedShard.id(), unassignedShard, nodesToAllocate.throttleNodeShards);
                throttled = true;
            } else {
                logger.debug("[{}][{}]: forced primary allocation denied [{}]",
                             unassignedShard.index(), unassignedShard.id(), unassignedShard);
        } else {
            // we are throttling this, since we are allowed to allocate to this node but there are enough allocations
            // taking place on the node currently, ignore it for now
            logger.debug("[{}][{}]: throttling allocation [{}] to [{}] on primary allocation",
                         unassignedShard.index(), unassignedShard.id(), unassignedShard, nodesToAllocate.throttleNodeShards);
            throttled = true;

        List<NodeAllocationResult> nodeResults = null;
        if (explain) {
            nodeResults = buildNodeDecisions(nodesToAllocate, shardState, inSyncAllocationIds);
        if (allocation.hasPendingAsyncFetch()) {
            return AllocateUnassignedDecision.no(AllocationStatus.FETCHING_SHARD_DATA, nodeResults);
        } else if (node != null) {
            return AllocateUnassignedDecision.yes(node, allocationId, nodeResults, false);
        } else if (throttled) {
            return AllocateUnassignedDecision.throttle(nodeResults);
        } else {
            return AllocateUnassignedDecision.no(AllocationStatus.DECIDERS_NO, nodeResults, true);
/**
 * Obtains the allocation decision for the unassigned shard and applies it. A decision other than
 * YES removes the shard from the unassigned iteration and ignores it with the decision's
 * allocation status.
 * NOTE(review): excerpt - the return for the "no decision taken" case and the call that initializes
 * the shard on YES are cut off; only its expected-shard-size argument is visible.
 */
public void allocateUnassigned(ShardRouting shardRouting, RoutingAllocation allocation,
                                   ExistingShardsAllocator.UnassignedAllocationHandler unassignedAllocationHandler) {
        final AllocateUnassignedDecision allocateUnassignedDecision = makeAllocationDecision(shardRouting, allocation, logger);

        if (allocateUnassignedDecision.isDecisionTaken() == false) {
            // no decision was taken by this allocator
        if (allocateUnassignedDecision.getAllocationDecision() == AllocationDecision.YES) {
                shardRouting.primary() ? ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE :
                                         allocation.clusterInfo().getShardSize(shardRouting, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE),
        } else {
            unassignedAllocationHandler.removeAndIgnore(allocateUnassignedDecision.getAllocationStatus(), allocation.changes());
/**
 * Moves an unassigned shard to its initialized routing on the given node: calls
 * {@code initialize} on the shard routing, notifies the {@code routingChangesObserver} and returns
 * the initialized routing. The input shard is asserted to be unassigned.
 * NOTE(review): excerpt - the body of the {@code primary()} branch and the bookkeeping around it
 * are not shown here.
 */
public ShardRouting initializeShard(ShardRouting unassignedShard, String nodeId, @Nullable String existingAllocationId,
                                        long expectedSize, RoutingChangesObserver routingChangesObserver) {
        assert unassignedShard.unassigned() : "expected an unassigned shard " + unassignedShard;
        ShardRouting initializedShard = unassignedShard.initialize(nodeId, existingAllocationId, expectedSize);
        if (initializedShard.primary()) {
        routingChangesObserver.shardInitialized(unassignedShard, initializedShard);
        return initializedShard;
/**
 * Initializes the given unassigned shard on {@code nodeId} by calling {@code initialize} on its
 * routing, reports the change to the {@code routingChangesObserver} and returns the new routing.
 * NOTE(review): excerpt (repeated in the article) - the body of the {@code primary()} branch is cut off.
 */
public ShardRouting initializeShard(ShardRouting unassignedShard, String nodeId, @Nullable String existingAllocationId,
                                        long expectedSize, RoutingChangesObserver routingChangesObserver) {
        assert unassignedShard.unassigned() : "expected an unassigned shard " + unassignedShard;
        ShardRouting initializedShard = unassignedShard.initialize(nodeId, existingAllocationId, expectedSize);
        if (initializedShard.primary()) {
        routingChangesObserver.shardInitialized(unassignedShard, initializedShard);
        return initializedShard;

/**
 * Creates the {@code INITIALIZING} routing of this shard on the given node. The routing must
 * currently be {@code UNASSIGNED} and must not have a relocating node (both asserted).
 *
 * @param nodeId               the node the shard is initialized on
 * @param existingAllocationId when non-null it is passed to {@code AllocationId.newInitializing(String)};
 *                             when null the allocation id comes from {@code AllocationId.newInitializing()}
 * @param expectedShardSize    stored in the new routing
 * @return a new {@code ShardRouting} that keeps this routing's shard id, primary flag, recovery source
 *         and unassigned info
 */
public ShardRouting initialize(String nodeId, @Nullable String existingAllocationId, long expectedShardSize) {
        assert state == ShardRoutingState.UNASSIGNED : this;
        assert relocatingNodeId == null : this;
        final AllocationId allocationId;
        if (existingAllocationId == null) {
            allocationId = AllocationId.newInitializing();
        } else {
            allocationId = AllocationId.newInitializing(existingAllocationId);
        return new ShardRouting(shardId, nodeId, null, primary, ShardRoutingState.INITIALIZING, recoverySource,
            unassignedInfo, allocationId, expectedShardSize);
public void allocate(RoutingAllocation allocation) {
        if (allocation.routingNodes().size() == 0) {
        final Balancer balancer = new Balancer(logger, allocation, weightFunction, threshold);
