赞
踩
第五章就来从源码层面学习一下Flink的控制任务调度——作业链与处理槽共享组。
问题整理:
1. 什么是任务链?作业链怎么操作?
2. 什么是槽共享组?slot共享机制是怎么实现的?
3. 如何通过调整默认行为以及控制作业链与作业分配(处理槽共享组)来提高应用的性能?
为了实现并行执行,Flink应用会将算子划分为不同任务,然后将这些任务分配到集群中的不同进程上去执行。和很多其他分布式系统一样,Flink应用的性能很大程度上取决于任务的调度方式。任务被分配到的工作进程、任务间的共存情况以及工作进程中的任务数都会对应用的性能产生显著影响。
本博客我们就讨论一下如何通过 调整默认行为以及控制作业链与作业分配(处理槽共享组)来提高应用的性能。
其实这两个概念我们可以看作:资源共享链与资源共享组。
当我们编写完一个Flink程序,从Client开始执行——>JobManager——>TaskManager——>Slot启动并执行Task的过程中,会对我们提交的执行计划进行优化,其中有两个比较重要的优化过程是:任务链与处理槽共享组,前者是对执行效率的优化,后者是对内存资源的优化。
这里可以参考Flink内核源码(三)任务调度机制了解一下Flink作业提交过程中图的转换过程。
可见,StreamGraph转换为JobGraph过程中,实际上是逐条审查每一个StreamEdge和该SteamEdge两头连接的两个StreamNode的特性,来决定该StreamEdge两头的StreamNode是不是可以合并在一起形成算子链。
这个判断过程flink给出了明确的规则,我们看一下StreamingJobGraphGenerator
中的isChainable()
方法:
该方法返回true时两个端点才可以合并到一起,根据源码我们可以得出形成作业链的规则如下:
用户能够通过禁用全局作业链的操作来关闭整个Flink的作业链,但是这个操作会影响到这个作业的执行情况,除非我们非常清楚作业的执行过程,否则不建议这么做:StreamExecutionEnvironment.disableOperatorChaining()
。
全局作业链关闭之后,如果想创建对应Operator的作业链,可以使用startNewChain()
方法:someStream.filter(...).map(...).startNewChain().map(...)
。
注意该方法只对当前操作符及之后的操作符有效,所以上述代码只对两个map进行链条绑定。
如果我们只想对某个算子执行禁用作业链,只需调用disableChaining()
方法:someSteam.map().disableChaining().filter()
,该方法只会禁用当前算子的链条(上述代码中就是map),对其他算子操作不产生影响。
槽共享组:出于某中目的将多个Task放到同一个slot中执行
TaskManager 是一个 JVM 进程,并会以独立的线程来执行一个task。为了控制一个 TaskManager 能接受多少个 task,Flink 提出了 Task Slot 的概念,通过 Task Slot 来定义Flink 中的计算资源。solt 对TaskManager内存进行平均分配,每个solt内存都相同,加起来和等于TaskManager可用内存,但是仅仅对内存做了隔离,并没有对cpu进行隔离。将资源 slot 化意味着来自不同job的task不会为了内存而竞争,而是每个task都拥有一定数量的内存储备。
通过调整 task slot 的数量,用户可以定义task之间是如何相互隔离的。每个 TaskManager 有一个slot,也就意味着每个task运行在独立的 JVM 中。每个 TaskManager 有多个slot的话,也就是说多个task运行在同一个JVM中。而在同一个JVM进程中的task,可以共享TCP连接(基于多路复用)和心跳消息,可以减少数据的网络传输。也能共享一些数据结构,一定程度上减少了每个task的消耗。
问题:
解决方案:
默认情况下,Flink 允许subtasks共享slot,条件是它们都来自同一个Job的不同task的subtask。结果可能一个slot持有该job的整个pipeline。允许槽共享,会有以下两个方面的好处:
Flink决定哪些任务需要共享slot 以及哪些任务必须放入特定slot。虽然task共享Slot提升资源利用率,但是如果一个Slot中容纳过多task反而会造成资源低下(比如极端情况下所有task都分布在一个Slot内)。所以在Flink中task需要按照一定规则共享Slot ,主要通过SlotSharingGroup和CoLocationGroup定义:
CoLocationGroup
:强制将subTasksk放到同一个slot中,是一种硬约束:
SlotSharingGroup
:它是Flink中用来实现slot共享的类,尽可能的允许不同的JobVertices部署在相同的Slot中,但这是一种宽约束,只是尽量做到不能完全保证。
Flink在调度任务分配Slot的时候遵循两个重要原则:
假设有两个TM:TM1、TM2,每个TM有3个Slot:S1,S2,S3。假设source/map的并行度为2,keyBy/window/sink的并行度为4,那么调度的顺序依次为source/map[1] ->source/map[2] ->keyBy/window/sink[1]->keyBy/window/sink[2]->keyBy/window/sink[3]->keyBy/window/sink[4]。那么Flink调度任务时(使用默认共享分组):
首先调度子任务source/map[1]到TM1.S1;
然后调度子任务source/map[2] ,根据Flink的调度原则:source/map[1] 和source/map[2] 属于同一个Task下的两个SubTask,所以他们不能放到同一个Slot中,所以source/map[2]被调度到TM1.S2;
然后调度keyBy/window/sink,keyBy/window/sink的子任务会被依次调度到TM1.S1、TM1.S2、TM2.S1、TM2.S2。但是如果source/map与keyBy/window/sink属于不同分组,那么keyBy/window/sink会被调度到TM1.S3、TM2.S1、TM2.S2、TM2.S3。
参考:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。