当前位置:   article > 正文

Flink源码漫游指南<陆>Flink的默认资源调度过程_flink 计算资源的调度是如何实现的

flink 计算资源的调度是如何实现的

注意:

  • 本章源码基于1.10版本,由于这部分重构频繁,太早(1.8)和太近(1.14)的版本都会和本版本有较大出入。
  • 本章所指的调度过程指的是从生成ExecutionGraph之后,为其分配slot的过程,前面transform的过程和后面deply的过程不包含在内。

一、Flink的默认调度器

Flink的自带调度器统一实现了一个叫SchedulerNG的接口,继承关系如下:

再来看看这个接口里面有什么功能

 

 在Job提交过程中,JobMaster会调用内部的startScheduling()方法开启调度过程,本文章就围绕着这一方法展开。

startScheduling()这个方法写在抽象类SchedulerBase中,我们看看他干了什么

 我们再看看DefaultScheduler.startSchedulingInternal()干了什么吧

这里转到了SchedulingStrategy,我们再看看这个类 

  这是一个接口,有以上实现类,我们以EagerSchedulingStrategy为例继续看看他的startScheduling干了什么,这个类会对Job立即进行部署。

 上面两段代码显示了这一段的调用关系,然后出现了几个新的类和名词,我在下面的引用框中解释一下,如果你知道,可以跳过这一段,直接看allocateSlotsAndDeploy干了什么。

其中要注意,这里生成的ExecutionVertexDeploymentOption的列表是拓扑排序的顺序。

 

1、什么是Vertex?

ExecutionGraph中节点就叫Vertex。

2、ExecutionVertexDeploymentOption是什么?

是存储需要被调度的task的组件,里面只包装了VertexId和DeploymentOption,而DeploymentOption中只包含一个信息,就是“这个task是否需要发送scheduleOrUpdateConsumer给master”

3、什么是SchedulerOperations?

 顾名思义就是定义了具体的调度和部署操作(本文章只讲调度部分),里面只有一个方法。

allocateSlotsAndDeploy()上面的绿字翻译过来大概是

* 分配slots和当slots返回的时候进行部署
* 会根据List的顺序进行分配(优先分配靠前的)
* 只有Vertex状态为CREATED的才会被接受,如果不是CREATED状态会报错

 好的,打完岔我们继续看ScheduleStrategy的默认实现类DefaultScheduler中的allocateSlotsAndDeploy干了啥(可以放大看

 可以看到,调度的最后一步被框起来了,我们继续进去看看

先通过executionVertexDeploymentOptions中的VertexId生成ExecutionVertexSchedulingRequirements(包含调度一个Vertex的需求),再通过ExecutionSlotAllocator.allocator继续套皮

再看看这个ExecutionSlotAllocator是什么

 

 官方说它是把slots分配到execution的组件(也就是调度),里面有一个最重要的方法就是allocator,其中定义了调度的逻辑

我们快进到这个类的默认实现中看一看

  1. @Override
  2. public List<SlotExecutionVertexAssignment> allocateSlotsFor(
  3. List<ExecutionVertexSchedulingRequirements> executionVertexSchedulingRequirements) {
  4. //查看是否有vertex已经分配了(检查有效性
  5. validateSchedulingRequirements(executionVertexSchedulingRequirements);
  6. //生成初始的SlotExecutionVertexAssignment的List
  7. List<SlotExecutionVertexAssignment> slotExecutionVertexAssignments =
  8. new ArrayList<>(executionVertexSchedulingRequirements.size());
  9. //计算并返回一个集合,集合包含所有调度的execution vertex的上一次的分配id
  10. Set<AllocationID> allPreviousAllocationIds = computeAllPriorAllocationIds(executionVertexSchedulingRequirements);
  11. //对每一个Requirement单独处理
  12. for (ExecutionVertexSchedulingRequirements schedulingRequirements : executionVertexSchedulingRequirements) {
  13. final ExecutionVertexID executionVertexId = schedulingRequirements.getExecutionVertexId();
  14. final SlotRequestId slotRequestId = new SlotRequestId();
  15. final SlotSharingGroupId slotSharingGroupId = schedulingRequirements.getSlotSharingGroupId();
  16. LOG.debug("Allocate slot with id {} for execution {}", slotRequestId, executionVertexId);
  17. //计算一个execution的合适分配地点
  18. //先根据state算,如果没有就根据input算
  19. CompletableFuture<LogicalSlot> slotFuture = calculatePreferredLocations(
  20. executionVertexId,
  21. schedulingRequirements.getPreferredLocations(),
  22. inputsLocationsRetriever).thenCompose(
  23. (Collection<TaskManagerLocation> preferredLocations) ->
  24. slotProviderStrategy.allocateSlot( //⭐根据SlotRequestId、ScheduledUnit、SlotProfile进行分配
  25. slotRequestId, //ScheduledUnit指需要分配slot的task,SlotProfile描述了task想要的slot
  26. new ScheduledUnit(
  27. executionVertexId.getJobVertexId(),
  28. slotSharingGroupId,
  29. schedulingRequirements.getCoLocationConstraint()),
  30. SlotProfile.priorAllocation(
  31. schedulingRequirements.getTaskResourceProfile(),
  32. schedulingRequirements.getPhysicalSlotResourceProfile(),
  33. preferredLocations,
  34. Collections.singletonList(schedulingRequirements.getPreviousAllocationId()),
  35. allPreviousAllocationIds)));
  36. SlotExecutionVertexAssignment slotExecutionVertexAssignment =
  37. new SlotExecutionVertexAssignment(executionVertexId, slotFuture);
  38. // add to map first to avoid the future completed before added.
  39. pendingSlotAssignments.put(executionVertexId, slotExecutionVertexAssignment);
  40. slotFuture.whenComplete(
  41. (ignored, throwable) -> {
  42. pendingSlotAssignments.remove(executionVertexId);
  43. if (throwable != null) {
  44. slotProviderStrategy.cancelSlotRequest(slotRequestId, slotSharingGroupId, throwable);
  45. }
  46. });
  47. slotExecutionVertexAssignments.add(slotExecutionVertexAssignment);
  48. }
  49. return slotExecutionVertexAssignments;
  50. }

 其中带⭐的那一行标注了进行分配的逻辑,我们再快进到slotProviderStrategy的默认实现中看一看

原来是又调用了SlotProvider的方法,我们再进去看一看SlotProvider

slotprovider有这么多实现类,到底系统默认用的哪个呢?

经过我的不屑翻找,终于找到了

 默认使用的实现类是SchedulerImpl,所以我们要去这个类的allocateSlot方法里看一看,具体的跳转我就不一个个看了,直接看最后的

然后又涉及到SlotSelectionStrategy类,又是个没见过的类,我们再去看看

 

下面直接写答案,代码就不复制了,怕弄乱

  •  SlotSelectionStrategy
    • LocationPreferenceSlotSelectionStrategy  基于位置偏好进行选择,根据locality进行打分,打完了选候选中分最高的,打分方法写在子类中了
      • EvenlySpreadOutLocationPreferenceSlotSelectionStrategy  当没有位置偏好的时候,从符合task要求的slot中选择executor利用率最小的
      • DefaultLocationPreferenceSlotSelectionStrategy  当没有位置偏好的时候,从候选中选择第一个匹配的
    • PreviousAllocationSlotSelectionStrategy  基于之前的位置进行选择,如果没有这个参数,就按位置偏好进行选择

 

以上,schedule过程(为task分配合适的slot)就完毕了,后面可能还会写一篇补充一下TaskManager

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/繁依Fanyi0/article/detail/659858
推荐阅读
相关标签
  

闽ICP备14008679号