赞
踩
注意:
Flink的自带调度器统一实现了一个叫SchedulerNG的接口,继承关系如下:
再来看看这个接口里面有什么功能
在Job提交过程中,JobMaster会调用内部的startScheduling()方法开启调度过程,本文章就围绕着这一方法展开。
startScheduling()这个方法写在抽象类SchedulerBase中,我们看看他干了什么
我们再看看DefaultScheduler.startSchedulingInternal()干了什么吧
这里转到了SchedulingStrategy,我们再看看这个类
这是一个接口,有以上实现类,我们以EagerSchedulingStrategy为例继续看看他的startScheduling干了什么,这个类会对Job立即进行部署。
上面两段代码显示了这一段的调用关系,然后出现了几个新的类和名词,我在下面的引用框中解释一下,如果你知道,可以跳过这一段,直接看allocateSlotsAndDeploy干了什么。
其中要注意,这里生成的ExecutionVertexDeploymentOption的列表是拓扑排序的顺序。
1、什么是Vertex?
ExecutionGraph中节点就叫Vertex。
2、ExecutionVertexDeploymentOption是什么?
是存储需要被调度的task的组件,里面只包装了VertexId和DeploymentOption,而DeploymentOption中只包含一个信息,就是“这个task是否需要发送scheduleOrUpdateConsumer给master”3、什么是SchedulerOperations?
顾名思义就是定义了具体的调度和部署操作(本文章只讲调度部分),里面只有一个方法。
allocateSlotsAndDeploy()上面的绿字翻译过来大概是
* 分配slots和当slots返回的时候进行部署 * 会根据List的顺序进行分配(优先分配靠前的) * 只有Vertex状态为CREATED的才会被接受,如果不是CREATED状态会报错
好的,打完岔我们继续看ScheduleStrategy的默认实现类DefaultScheduler中的allocateSlotsAndDeploy干了啥(可以放大看
可以看到,调度的最后一步被框起来了,我们继续进去看看
先通过executionVertexDeploymentOptions中的VertexId生成ExecutionVertexSchedulingRequirements(包含调度一个Vertex的需求),再通过ExecutionSlotAllocator.allocator继续套皮
再看看这个ExecutionSlotAllocator是什么
官方说它是把slots分配到execution的组件(也就是调度),里面有一个最重要的方法就是allocator,其中定义了调度的逻辑
我们快进到这个类的默认实现中看一看
- @Override
- public List<SlotExecutionVertexAssignment> allocateSlotsFor(
- List<ExecutionVertexSchedulingRequirements> executionVertexSchedulingRequirements) {
-
- //查看是否有vertex已经分配了(检查有效性
- validateSchedulingRequirements(executionVertexSchedulingRequirements);
-
- //生成初始的SlotExecutionVertexAssignment的List
- List<SlotExecutionVertexAssignment> slotExecutionVertexAssignments =
- new ArrayList<>(executionVertexSchedulingRequirements.size());
-
- //计算并返回一个集合,集合包含所有调度的execution vertex的上一次的分配id
- Set<AllocationID> allPreviousAllocationIds = computeAllPriorAllocationIds(executionVertexSchedulingRequirements);
-
- //对每一个Requirement单独处理
- for (ExecutionVertexSchedulingRequirements schedulingRequirements : executionVertexSchedulingRequirements) {
- final ExecutionVertexID executionVertexId = schedulingRequirements.getExecutionVertexId();
- final SlotRequestId slotRequestId = new SlotRequestId();
- final SlotSharingGroupId slotSharingGroupId = schedulingRequirements.getSlotSharingGroupId();
-
- LOG.debug("Allocate slot with id {} for execution {}", slotRequestId, executionVertexId);
-
- //计算一个execution的合适分配地点
- //先根据state算,如果没有就根据input算
- CompletableFuture<LogicalSlot> slotFuture = calculatePreferredLocations(
- executionVertexId,
- schedulingRequirements.getPreferredLocations(),
- inputsLocationsRetriever).thenCompose(
- (Collection<TaskManagerLocation> preferredLocations) ->
- slotProviderStrategy.allocateSlot( //⭐根据SlotRequestId、ScheduledUnit、SlotProfile进行分配
- slotRequestId, //ScheduledUnit指需要分配slot的task,SlotProfile描述了task想要的slot
- new ScheduledUnit(
- executionVertexId.getJobVertexId(),
- slotSharingGroupId,
- schedulingRequirements.getCoLocationConstraint()),
- SlotProfile.priorAllocation(
- schedulingRequirements.getTaskResourceProfile(),
- schedulingRequirements.getPhysicalSlotResourceProfile(),
- preferredLocations,
- Collections.singletonList(schedulingRequirements.getPreviousAllocationId()),
- allPreviousAllocationIds)));
-
- SlotExecutionVertexAssignment slotExecutionVertexAssignment =
- new SlotExecutionVertexAssignment(executionVertexId, slotFuture);
- // add to map first to avoid the future completed before added.
- pendingSlotAssignments.put(executionVertexId, slotExecutionVertexAssignment);
-
- slotFuture.whenComplete(
- (ignored, throwable) -> {
- pendingSlotAssignments.remove(executionVertexId);
- if (throwable != null) {
- slotProviderStrategy.cancelSlotRequest(slotRequestId, slotSharingGroupId, throwable);
- }
- });
-
- slotExecutionVertexAssignments.add(slotExecutionVertexAssignment);
- }
-
- return slotExecutionVertexAssignments;
- }
其中带⭐的那一行标注了进行分配的逻辑,我们再快进到slotProviderStrategy的默认实现中看一看
原来是又调用了SlotProvider的方法,我们再进去看一看SlotProvider
slotprovider有这么多实现类,到底系统默认用的哪个呢?
经过我的不屑翻找,终于找到了
默认使用的实现类是SchedulerImpl,所以我们要去这个类的allocateSlot方法里看一看,具体的跳转我就不一个个看了,直接看最后的
然后又涉及到SlotSelectionStrategy类,又是个没见过的类,我们再去看看
下面直接写答案,代码就不复制了,怕弄乱
以上,schedule过程(为task分配合适的slot)就完毕了,后面可能还会写一篇补充一下TaskManager
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。