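Flink Scheduling Source Code Analysis 1: Creating and Submitting the Topology Graph
Flink Scheduling Source Code Analysis 2: The Scheduling Process
Flink Scheduling Source Code Analysis 3: Slot Allocation Strategy
Flink Scheduling Source Code Analysis 4: The Physical Slot Allocation Process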
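Flink ships two allocators: SimpleExecutionSlotAllocator and SlotSharingExecutionSlotAllocator. Stream processing uses SlotSharingExecutionSlotAllocator, so this article covers that allocator.
It allocates logical slots from physical shared slots. The allocator maintains one shared slot for each ExecutionSlotSharingGroup: it allocates a physical slot for the shared slot and then allocates logical slots from it for the scheduled tasks. The physical slot is assigned to the shared slot lazily, as soon as any hosted subtask requests the shared slot. Every subsequent sharing subtask allocates a logical slot from the existing shared slot. The shared/physical slot can be released only after all logical slots requested from it have been released or canceled.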
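To make this lifecycle concrete, here is a minimal single-threaded sketch. It is a hypothetical model, not Flink code: `SharedSlotSketch` and its methods are invented for illustration, and a physical slot is represented by a plain String produced by a caller-supplied `Supplier`.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Supplier;

// Hypothetical, simplified model of the shared-slot lifecycle; not a Flink API.
// A "physical slot" is just a String created by the supplied allocator.
class SharedSlotSketch {
    private final Supplier<String> physicalSlotAllocator;
    private String physicalSlot; // null until the first logical request
    private final Set<String> logicalSlots = new HashSet<>();
    private boolean released;

    SharedSlotSketch(Supplier<String> physicalSlotAllocator) {
        this.physicalSlotAllocator = physicalSlotAllocator;
    }

    // The first subtask triggers the physical allocation; later subtasks reuse it.
    String allocateLogicalSlot(String subtaskId) {
        if (released) {
            throw new IllegalStateException("shared slot already released");
        }
        if (physicalSlot == null) {
            physicalSlot = physicalSlotAllocator.get();
        }
        logicalSlots.add(subtaskId);
        return physicalSlot + "/" + subtaskId;
    }

    // The shared (and physical) slot is released only when no logical slot remains.
    void releaseLogicalSlot(String subtaskId) {
        logicalSlots.remove(subtaskId);
        if (logicalSlots.isEmpty() && physicalSlot != null) {
            released = true;
        }
    }

    boolean isReleased() {
        return released;
    }

    public static void main(String[] args) {
        int[] physicalRequests = {0};
        SharedSlotSketch slot =
                new SharedSlotSketch(() -> "physical-" + (++physicalRequests[0]));
        System.out.println(slot.allocateLogicalSlot("source-0")); // physical-1/source-0
        System.out.println(slot.allocateLogicalSlot("map-0"));    // physical-1/map-0
        slot.releaseLogicalSlot("source-0");
        System.out.println(slot.isReleased());                    // false
        slot.releaseLogicalSlot("map-0");
        System.out.println(slot.isReleased());                    // true
        System.out.println(physicalRequests[0]);                  // 1
    }
}
```

In the demo the supplier runs only once although two subtasks obtain logical slots, and the shared slot counts as released only after both logical slots have been returned.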
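Slot allocation process:
The code is shown below. For ease of understanding, the steps in this walkthrough are not divided exactly the way the code is, but they cover the same content.
Code: SlotSharingExecutionSlotAllocator.allocateSlotsForVertices():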
private List<SlotExecutionVertexAssignment> allocateSlotsForVertices(
        List<ExecutionVertexID> executionVertexIds) {

    SharedSlotProfileRetriever sharedSlotProfileRetriever =
            sharedSlotProfileRetrieverFactory.createFromBulk(new HashSet<>(executionVertexIds));
    // Look up the sharing group of each ExecutionVertex and group the vertices by it
    Map<ExecutionSlotSharingGroup, List<ExecutionVertexID>> executionsByGroup =
            executionVertexIds.stream()
                    .collect(
                            Collectors.groupingBy(
                                    slotSharingStrategy::getExecutionSlotSharingGroup));

    Map<ExecutionSlotSharingGroup, SharedSlot> slots = new HashMap<>(executionsByGroup.size());
    Set<ExecutionSlotSharingGroup> groupsToAssign = new HashSet<>(executionsByGroup.keySet());

    // Check which sharing groups already have a shared slot
    Map<ExecutionSlotSharingGroup, SharedSlot> assignedSlots =
            tryAssignExistingSharedSlots(groupsToAssign);
    slots.putAll(assignedSlots);
    groupsToAssign.removeAll(assignedSlots.keySet());

    // Allocate shared slots for the groups that do not have one yet
    if (!groupsToAssign.isEmpty()) {
        Map<ExecutionSlotSharingGroup, SharedSlot> allocatedSlots =
                allocateSharedSlots(groupsToAssign, sharedSlotProfileRetriever);
        slots.putAll(allocatedSlots);
        groupsToAssign.removeAll(allocatedSlots.keySet());
        // At this point every sharing group must have a shared slot
        Preconditions.checkState(groupsToAssign.isEmpty());
    }

    // Allocate logical slots from the shared slots
    Map<ExecutionVertexID, SlotExecutionVertexAssignment> assignments =
            allocateLogicalSlotsFromSharedSlots(slots, executionsByGroup);

    // we need to pass the slots map to the createBulk method instead of using the allocator's
    // 'sharedSlots'
    // because if any physical slots have already failed, their shared slots have been removed
    // from the allocator's 'sharedSlots' by failed logical slots.
    // Create the bulk of physical slot requests and schedule a timeout check for it,
    // which cancels the bulk if it cannot be fulfilled within allocationTimeout
    SharingPhysicalSlotRequestBulk bulk = createBulk(slots, executionsByGroup);
    bulkChecker.schedulePendingRequestBulkTimeoutCheck(bulk, allocationTimeout);

    return executionVertexIds.stream().map(assignments::get).collect(Collectors.toList());
}
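The control flow of allocateSlotsForVertices() can be reduced to the following sketch. It is a hypothetical simplification, not Flink code: vertices, sharing groups and slots are plain Strings, the `groupOf` function stands in for `slotSharingStrategy::getExecutionSlotSharingGroup`, and physical allocation, bulk tracking and timeouts are left out. A `TreeSet` is used only to make the slot numbering deterministic.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical simplification of allocateSlotsForVertices(); all names are illustrative.
class AllocateFlowSketch {
    // Long-lived state, playing the role of the allocator's 'sharedSlots' field.
    final Map<String, String> sharedSlots = new HashMap<>();
    private int nextSlot = 0;

    Map<String, String> allocate(List<String> vertices, Function<String, String> groupOf) {
        // Step 1: group the requested vertices by sharing group.
        Map<String, List<String>> byGroup =
                vertices.stream().collect(Collectors.groupingBy(groupOf));

        Map<String, String> slots = new HashMap<>();
        Set<String> toAssign = new TreeSet<>(byGroup.keySet());

        // Step 2: reuse shared slots that already exist for a group.
        for (String group : toAssign) {
            String existing = sharedSlots.get(group);
            if (existing != null) {
                slots.put(group, existing);
            }
        }
        toAssign.removeAll(slots.keySet());

        // Step 3: create a shared slot for every group that still has none.
        for (String group : toAssign) {
            String slot = "slot-" + nextSlot++;
            sharedSlots.put(group, slot);
            slots.put(group, slot);
        }

        // Step 4: each vertex is assigned to the shared slot of its group.
        Map<String, String> assignment = new LinkedHashMap<>();
        for (String vertex : vertices) {
            assignment.put(vertex, slots.get(groupOf.apply(vertex)));
        }
        return assignment;
    }

    public static void main(String[] args) {
        AllocateFlowSketch allocator = new AllocateFlowSketch();
        // Hypothetical strategy: subtask i of every operator goes to group "g<i>".
        Function<String, String> groupOf = v -> "g" + v.substring(v.indexOf('-') + 1);
        System.out.println(allocator.allocate(List.of("source-0", "source-1"), groupOf));
        // {source-0=slot-0, source-1=slot-1}
        System.out.println(allocator.allocate(List.of("map-0", "map-1"), groupOf));
        // {map-0=slot-0, map-1=slot-1}: the existing shared slots are reused
    }
}
```

Keeping steps 2 and 3 separate mirrors the original method: a group whose shared slot already exists never triggers a new physical request.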
(Allocation step 1: use the SlotSharingStrategy to map executions to ExecutionSlotSharingGroups.)
This step relies on a SlotSharingStrategy, which is created as follows:
SlotSharingStrategy slotSharingStrategy =
        slotSharingStrategyFactory.create(
                // The context here is DefaultScheduler.DefaultExecutionSlotAllocationContext.
                context.getSchedulingTopology(),
                context.getLogicalSlotSharingGroups(),
                context.getCoLocationGroups());
Code (the build() method that creates the mapping from ExecutionVertex to ExecutionSlotSharingGroup):
private Map<ExecutionVertexID, ExecutionSlotSharingGroup> build() {
    // In topological order, record the ExecutionVertices of each JobVertex
    final LinkedHashMap<JobVertexID, List<SchedulingExecutionVertex>> allVertices =
            getExecutionVertices();

    // loop on job vertices so that an execution vertex will not be added into a group
    // if that group better fits another execution vertex
    for (List<SchedulingExecutionVertex> executionVertices : allVertices.values()) {
        // Try to find a sharing group based on co-location constraints and upstream vertices
        final List<SchedulingExecutionVertex> remaining =
                tryFindOptimalAvailableExecutionSlotSharingGroupFor(executionVertices);

        // For vertices without a group after the previous step,
        // find an available group or create a new one
        findAvailableOrCreateNewExecutionSlotSharingGroupFor(remaining);

        // Update constraintToExecutionSlotSharingGroupMap so that vertices in the
        // same CoLocationGroup as this vertex can pick the same slot
        updateConstraintToExecutionSlotSharingGroupMap(executionVertices);
    }

    return executionSlotSharingGroupMap;
}
Explanation:
This part is fairly simple, so it is not discussed in detail.
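The fallback in build(), findAvailableOrCreateNewExecutionSlotSharingGroupFor(), can be illustrated with the sketch below. It is a simplified, hypothetical model: it assumes all vertices belong to one slot sharing group, ignores co-location constraints and the preference for the producer's group that tryFindOptimalAvailableExecutionSlotSharingGroupFor() applies, and represents each group as a map from job vertex name to subtask index. A group counts as available for a vertex if it does not yet hold a subtask of the same job vertex.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of "find an available group or create a new one"; not Flink code.
class SlotSharingGroupingSketch {
    // Input: job vertex name -> parallelism, in topological order.
    // Output: groups, each mapping job vertex name -> subtask index placed in that group.
    static List<Map<String, Integer>> build(LinkedHashMap<String, Integer> parallelismInTopoOrder) {
        List<Map<String, Integer>> groups = new ArrayList<>();
        for (Map.Entry<String, Integer> jobVertex : parallelismInTopoOrder.entrySet()) {
            for (int subtask = 0; subtask < jobVertex.getValue(); subtask++) {
                Map<String, Integer> target = null;
                // Two subtasks of the same job vertex must never share a group.
                for (Map<String, Integer> group : groups) {
                    if (!group.containsKey(jobVertex.getKey())) {
                        target = group;
                        break;
                    }
                }
                if (target == null) { // no available group: create a new one
                    target = new LinkedHashMap<>();
                    groups.add(target);
                }
                target.put(jobVertex.getKey(), subtask);
            }
        }
        return groups;
    }

    public static void main(String[] args) {
        LinkedHashMap<String, Integer> parallelism = new LinkedHashMap<>();
        parallelism.put("source", 2);
        parallelism.put("map", 3);
        System.out.println(build(parallelism));
        // [{source=0, map=0}, {source=1, map=1}, {map=2}]
    }
}
```

With a source of parallelism 2 and a map of parallelism 3, three groups result, as many as the highest parallelism in the slot sharing group, which is why such a job needs three slots under default slot sharing.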
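(Allocation step 2: check which ExecutionSlotSharingGroups already have a shared slot.)
Code: SlotSharingExecutionSlotAllocator.tryAssignExistingSharedSlots():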
private Map<ExecutionSlotSharingGroup, SharedSlot> tryAssignExistingSharedSlots(
        Set<ExecutionSlotSharingGroup> executionSlotSharingGroups) {
    Map<ExecutionSlotSharingGroup, SharedSlot> assignedSlots =
            new HashMap<>(executionSlotSharingGroups.size());
    for (ExecutionSlotSharingGroup group : executionSlotSharingGroups) {
        // Check whether the group already has a shared slot
        SharedSlot sharedSlot = sharedSlots.get(group);
        if (sharedSlot != null) {
            assignedSlots.put(group, sharedSlot);
        }
    }
    return assignedSlots;
}
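(Allocation step 3: for each ExecutionSlotSharingGroup without a shared slot, build a SlotProfile with the SharedSlotProfileRetriever, request a physical slot from the PhysicalSlotProvider, and create a SharedSlot from the returned future.)
Code: SlotSharingExecutionSlotAllocator.allocateSharedSlots():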
private Map<ExecutionSlotSharingGroup, SharedSlot> allocateSharedSlots(
        Set<ExecutionSlotSharingGroup> executionSlotSharingGroups,
        SharedSlotProfileRetriever sharedSlotProfileRetriever) {

    List<PhysicalSlotRequest> slotRequests = new ArrayList<>();
    Map<ExecutionSlotSharingGroup, SharedSlot> allocatedSlots = new HashMap<>();

    Map<SlotRequestId, ExecutionSlotSharingGroup> requestToGroup = new HashMap<>();
    Map<SlotRequestId, ResourceProfile> requestToPhysicalResources = new HashMap<>();

    // Create a slot request for each sharing group
    for (ExecutionSlotSharingGroup group : executionSlotSharingGroups) {
        SlotRequestId physicalSlotRequestId = new SlotRequestId();
        // Build the SlotProfile with the SharedSlotProfileRetriever
        ResourceProfile physicalSlotResourceProfile = getPhysicalSlotResourceProfile(group);
        SlotProfile slotProfile =
                sharedSlotProfileRetriever.getSlotProfile(group, physicalSlotResourceProfile);
        // Create the PhysicalSlot request
        PhysicalSlotRequest request =
                new PhysicalSlotRequest(
                        physicalSlotRequestId, slotProfile, slotWillBeOccupiedIndefinitely);
        // Store the request and map it to its sharing group and physical resource profile
        slotRequests.add(request);
        requestToGroup.put(physicalSlotRequestId, group);
        requestToPhysicalResources.put(physicalSlotRequestId, physicalSlotResourceProfile);
    }

    // Allocate a PhysicalSlot from the PhysicalSlotProvider for every request in slotRequests
    Map<SlotRequestId, CompletableFuture<PhysicalSlotRequest.Result>> allocateResult =
            slotProvider.allocatePhysicalSlots(slotRequests);

    // Create a SharedSlot from each returned PhysicalSlot future
    allocateResult.forEach(
            (slotRequestId, resultCompletableFuture) -> {
                ExecutionSlotSharingGroup group = requestToGroup.get(slotRequestId);
                // Future of the newly allocated PhysicalSlot
                CompletableFuture<PhysicalSlot> physicalSlotFuture =
                        resultCompletableFuture.thenApply(
                                PhysicalSlotRequest.Result::getPhysicalSlot);
                // Create the SharedSlot on top of the new PhysicalSlot and record it
                SharedSlot slot =
                        new SharedSlot(
                                slotRequestId,
                                requestToPhysicalResources.get(slotRequestId),
                                group,
                                physicalSlotFuture,
                                slotWillBeOccupiedIndefinitely,
                                this::releaseSharedSlot);
                allocatedSlots.put(group, slot);
                Preconditions.checkState(!sharedSlots.containsKey(group));
                sharedSlots.put(group, slot);
            });
    return allocatedSlots;
}
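The batching pattern in allocateSharedSlots() (one request per group, each tagged with a fresh id, a single bulk call to the provider, and an id-to-group map that routes each returned future back to its group) is sketched below. All names are hypothetical: `UUID` stands in for SlotRequestId, the `provider` function for PhysicalSlotProvider.allocatePhysicalSlots(), and a group's shared slot is reduced to the future of its physical slot.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical sketch of the request/response routing in allocateSharedSlots().
class BatchAllocationSketch {
    // Stand-in for PhysicalSlotRequest; the group is carried only so the demo
    // provider can label its answer (the real request carries a SlotProfile).
    static final class Request {
        final UUID id;
        final String group;

        Request(UUID id, String group) {
            this.id = id;
            this.group = group;
        }
    }

    static Map<String, CompletableFuture<String>> allocateSharedSlots(
            Set<String> groups,
            Function<List<Request>, Map<UUID, CompletableFuture<String>>> provider) {
        List<Request> requests = new ArrayList<>();
        Map<UUID, String> requestToGroup = new HashMap<>();
        for (String group : groups) {
            UUID id = UUID.randomUUID(); // plays the role of SlotRequestId
            requests.add(new Request(id, group));
            requestToGroup.put(id, group);
        }

        // A single bulk call covering every group.
        Map<UUID, CompletableFuture<String>> results = provider.apply(requests);

        // Route each future back to the group that issued the request.
        Map<String, CompletableFuture<String>> sharedSlots = new HashMap<>();
        results.forEach((id, future) -> sharedSlots.put(requestToGroup.get(id), future));
        return sharedSlots;
    }

    public static void main(String[] args) {
        Map<String, CompletableFuture<String>> slots =
                allocateSharedSlots(
                        new TreeSet<>(Set.of("g0", "g1")),
                        requests -> {
                            Map<UUID, CompletableFuture<String>> out = new HashMap<>();
                            for (Request r : requests) {
                                out.put(r.id, CompletableFuture.completedFuture("physical-" + r.group));
                            }
                            return out;
                        });
        System.out.println(slots.get("g1").join()); // physical-g1
    }
}
```

Because only futures are handed back, the shared slots exist immediately, and logical slots can be requested from them before any physical slot has actually been granted.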
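(Allocation step 4: allocate logical slots for the executions from their shared slots.)
Code: SharedSlot.allocateLogicalSlot() and the helper it calls: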
CompletableFuture<LogicalSlot> allocateLogicalSlot(ExecutionVertexID executionVertexId) {
    Preconditions.checkArgument(
            executionSlotSharingGroup.getExecutionVertexIds().contains(executionVertexId),
            "Trying to allocate a logical slot for execution %s which is not in the ExecutionSlotSharingGroup",
            executionVertexId);
    // Check whether a LogicalSlot has already been requested for this execution vertex
    CompletableFuture<SingleLogicalSlot> logicalSlotFuture =
            requestedLogicalSlots.getValueByKeyA(executionVertexId);
    if (logicalSlotFuture != null) {
        LOG.debug("Request for {} already exists", getLogicalSlotString(executionVertexId));
    } else {
        // No LogicalSlot has been requested yet, so allocate one
        logicalSlotFuture = allocateNonExistentLogicalSlot(executionVertexId);
    }
    return logicalSlotFuture.thenApply(Function.identity());
}

// The actual allocation is done by the following method
private CompletableFuture<SingleLogicalSlot> allocateNonExistentLogicalSlot(
        ExecutionVertexID executionVertexId) {
    CompletableFuture<SingleLogicalSlot> logicalSlotFuture;
    SlotRequestId logicalSlotRequestId = new SlotRequestId();
    String logMessageBase = getLogicalSlotString(logicalSlotRequestId, executionVertexId);
    LOG.debug("Request a {}", logMessageBase);

    // Allocate the logical slot: once the PhysicalSlot request completes,
    // create the LogicalSlot and return it
    logicalSlotFuture =
            slotContextFuture.thenApply(
                    physicalSlot -> {
                        LOG.debug("Allocated {}", logMessageBase);
                        return createLogicalSlot(physicalSlot, logicalSlotRequestId);
                    });
    // Record the requested logical slot
    requestedLogicalSlots.put(executionVertexId, logicalSlotRequestId, logicalSlotFuture);

    // If the physical slot request fails (slotContextFuture), it will also fail the logicalSlotFuture.
    // Therefore, the next `exceptionally` callback will call removeLogicalSlotRequest and do
    // the cleanup in requestedLogicalSlots and eventually in sharedSlots
    // Runs when the logical slot allocation fails
    logicalSlotFuture.exceptionally(
            cause -> {
                LOG.debug("Failed {}", logMessageBase, cause);
                removeLogicalSlotRequest(logicalSlotRequestId);
                return null;
            });
    return logicalSlotFuture;
}
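The key properties of allocateLogicalSlot() (at most one request per execution vertex, a logical slot derived from the physical-slot future, and cleanup when that future fails) can be reproduced with plain CompletableFuture. This is a hypothetical sketch: `LogicalSlotSketch` and its fields are invented, slots are Strings, and it assumes all calls and future completions happen on a single thread, since it uses a plain HashMap.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of the logical-slot bookkeeping inside a shared slot; not Flink code.
// Assumes single-threaded use: HashMap is not thread-safe.
class LogicalSlotSketch {
    private final CompletableFuture<String> physicalSlotFuture;
    final Map<String, CompletableFuture<String>> requested = new HashMap<>();

    LogicalSlotSketch(CompletableFuture<String> physicalSlotFuture) {
        this.physicalSlotFuture = physicalSlotFuture;
    }

    CompletableFuture<String> allocateLogicalSlot(String vertexId) {
        CompletableFuture<String> existing = requested.get(vertexId);
        if (existing != null) {
            return existing; // repeated request: reuse the pending or completed future
        }
        // The logical slot is created only once the physical slot is available.
        CompletableFuture<String> logical =
                physicalSlotFuture.thenApply(physical -> physical + "/" + vertexId);
        // Register first, then attach the failure callback (same order as the original).
        requested.put(vertexId, logical);
        // A failed physical request fails 'logical' too; drop its bookkeeping entry.
        logical.exceptionally(
                cause -> {
                    requested.remove(vertexId);
                    return null;
                });
        return logical;
    }

    public static void main(String[] args) {
        CompletableFuture<String> physical = new CompletableFuture<>();
        LogicalSlotSketch shared = new LogicalSlotSketch(physical);
        CompletableFuture<String> first = shared.allocateLogicalSlot("map-0");
        System.out.println(first == shared.allocateLogicalSlot("map-0")); // true
        physical.complete("tm1-slot0");
        System.out.println(first.join()); // tm1-slot0/map-0
    }
}
```

Registering the future before attaching `exceptionally` matters: if the physical future has already failed, the callback runs immediately and still finds the entry to remove.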