赞
踩
最近在学习了尚硅谷的Flink内核源码解析,内容很多,因此想要整理学习一下。Flink的版本是1.12.0
。
第三章就来从源码层面学习一下Flink的任务调度机制。主要分为两部分,一部分是图的详细转换过程,另一部分是任务调度执行。
问题整理:
1. Flink的任务是怎么调度的?
2. Flink内部的执行图是怎么转换的?
3. Flink的任务调度策略都有哪些?
首先看一下Task调度中执行图的转换:(根据图片能够讲解图的转换过程)
然后还是从一个Flink Task任务调度执行图开始学习:
再根据这个调度图分析一下图的转换。
接下来就是具体的源码分析了。
Flink 中的执行图可以分成四层:StreamGraph -> JobGraph -> ExecutionGraph -> 物理执行图
。
StreamGraph
:是根据用户通过 Stream API 编写的代码生成的最初的图。用来表示程序的拓扑结构。JobGraph
:StreamGraph 经过优化后生成了 JobGraph,提交给 JobManager的数据结构。主要的优化为,将多个符合条件的节点 chain 在一起作为一个节点,这样可以减少数据在节点之间流动所需要的序列化/反序列化/传输消耗。ExecutionGraph
:JobManager 根据 JobGraph 生成 ExecutionGraph。ExecutionGraph 是JobGraph 的并行化版本,是调度层最核心的数据结构。物理执行图
: JobManager 根 据 ExecutionGraph 对 Job 进 行 调 度 后 , 在 各个TaskManager 上部署 Task 后形成的“图”,并不是一个具体的数据结构。例如 example 里的 SocketTextStreamWordCount 并发度为 2(Source 为 1 个并发度)的四层执行图的演变过程如下图所示:
public static void main(String[] args) throws Exception {
// 检查输入
final ParameterTool params = ParameterTool.fromArgs(args);
...
// set up the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// get input data
DataStream<String> text =
env.socketTextStream(params.get("hostname"), params.getInt("port"), '\n', 0);
DataStream<Tuple2<String, Integer>> counts =
// split up the lines in pairs (2-tuples) containing: (word,1)
text.flatMap(new Tokenizer())
// group by the tuple field "0" and sum up tuple field "1"
.keyBy(0)
.sum(1);
counts.print();
// execute program
env.execute("WordCount from SocketTextStream Example");
}
名词解释:
1)StreamGraph:根据用户通过 Stream API 编写的代码生成的最初的图。
(1)StreamNode
:用来代表 operator 的类,并具有所有相关的属性,如并发度、入边和出边等。
(2)StreamEdge
:表示连接两个 StreamNode 的边。
2)JobGraph:StreamGraph 经过优化后生成了 JobGraph,提交给 JobManager 的数据结构。
(1)JobVertex
:经过优化后符合条件的多个 StreamNode 可能会 chain 在一起生成一个 JobVertex,即一个 JobVertex 包含一个或多个 operator,JobVertex 的输入是 JobEdge,输出是IntermediateDataSet。
(2)IntermediateDataSet
:表示 JobVertex 的输出,即经过 operator 处理产生的数据集。producer 是 JobVertex,consumer 是 JobEdge。
(3)JobEdge
:代表了 job graph 中的一条数据传输通道。source 是 IntermediateDataSet,target 是 JobVertex。即数据通过 JobEdge 由 IntermediateDataSet 传递给目标 JobVertex。
3)ExecutionGraph:JobManager 根据 JobGraph 生成 ExecutionGraph。ExecutionGraph是 JobGraph 的并行化版本,是调度层最核心的数据结构。
(1) ExecutionJobVertex
: 和 JobGraph中的 JobVertex一一对 应 。 每一个ExecutionJobVertex 都有和并发度一样多的 ExecutionVertex。
(2)ExecutionVertex
:表示 ExecutionJobVertex 的其中一个并发子任务,输入是ExecutionEdge,输出是 IntermediateResultPartition。
(3)IntermediateResult
:和 JobGraph 中的 IntermediateDataSet 一一对应。一个 IntermediateResult 包含多个 IntermediateResultPartition,其个数等于该 operator 的并发度。
(4)IntermediateResultPartition
:表示 ExecutionVertex 的一个输出分区,producer 是ExecutionVertex,consumer 是若干个 ExecutionEdge。
(5)ExecutionEdge
:表示 ExecutionVertex 的输入,source 是 IntermediateResultPartition,target 是 ExecutionVertex。source 和 target 都只能是一个。
(6)Execution
:是执行一个 ExecutionVertex 的一次尝试。当发生故障或者数据需要重算的情况下 ExecutionVertex 可能会有多个 ExecutionAttemptID。一个 Execution 通过ExecutionAttemptID 来唯一标识。JM 和 TM 之间关于 task 的部署和 task status 的更新都是通过 ExecutionAttemptID 来确定消息接受者。
从这些基本概念中,也可以看出以下几点:
4)物理执行图:JobManager 根据 ExecutionGraph 对 Job 进行调度后,在各个TaskManager 上部署 Task 后形成的“图”,并不是一个具体的数据结构。
(1)Task
:Execution 被调度后在分配的 TaskManager 中启动对应的 Task。Task 包裹了具有用户执行逻辑的 operator。
(2)ResultPartition
:代表由一个 Task 的生成的数据,和 ExecutionGraph 中的IntermediateResultPartition 一一对应。
(3)ResultSubpartition
:是 ResultPartition 的一个子分区。每个 ResultPartition 包含多个ResultSubpartition,其数目要由下游消费 Task 数和 DistributionPattern 来决定。
(4)InputGate
:代表 Task 的输入封装,和 JobGraph 中 JobEdge 一一对应。每个 InputGate消费了一个或多个的 ResultPartition。
(5)InputChannel
:每个 InputGate 会包含一个以上的 InputChannel,和 ExecutionGraph中的 ExecutionEdge 一一对应,也和 ResultSubpartition 一对一地相连,即一个 InputChannel接收一个 ResultSubpartition 的输出。
对于每一种图应该关注:
调用用户代码中的 StreamExecutionEnvironment.execute()
-> execute(getJobName())
-> execute(getStreamGraph(jobName))
-> getStreamGraph(jobName, true)
StreamExecutionEnvironment.java
首先进入 generate()
方法
可以看一下这些跳转到的源码。
主要内容就是:
map 转换将用户自定义的函数 MapFunction 包装到 StreamMap 这个 Operator 中,再将 StreamMap 包装到 OneInputTransformation,最后该 transformation 存 到 env 中,当调用 env.execute 时,遍历其中的 transformation 集合构造出 StreamGraph。其分层实现如下图所示:
另外,并不是每一个 StreamTransformation 都会转换成 runtime 层中物理操作。有一些只是逻辑概念,比如 union、split/select、partition 等。如下图所示的转换树,在运行时会优化成下方的操作图。
接着分析StreamGraph 生成的源码:
StreamExecutionEnvironment.java -> generator() -> transform()
该函数首先会对该 transform 的上游 transform 进行递归转换,确保上游的都已经完成了转化。然后通过 transform 构造出 StreamNode,最后与上游的 transform 进行连接,构造出 StreamNode。
实例分析:
看一个实例:如下程序,是一个从 Source 中按行切分成单词并过滤输出的简单流程序,其中包含了逻辑转换:随机分区 shuffle。分析该程序是如何生成 StreamGraph 的。
DataStream<String> text = env.socketTextStream(hostName, port);
text.flatMap(new LineSplitter()).shuffle().filter(new HelloFilter()).print();
首先会在 env 中生成一棵 transformation 树,用 List<Transformation<?>>保存。其结构图如下:
其中符号*为 input 指针,指向上游的 transformation,从而形成了一棵 transformation树。然后,通过调用StreamGraphGenerator.generate(env, transformations)来生成StreamGraph。自底向上递归调用每一个 transformation,也就是说处理顺序是Source->FlatMap->Shuffle->Filter->Sink
。
1)首先处理的 Source,生成了 Source 的 StreamNode。
2)然后处理的 FlatMap,生成了 FlatMap 的 StreamNode,并生成 StreamEdge 连接上游 Source 和 FlatMap。由于上下游的并发度不一样(1:4),所以此处是 Rebalance 分区。
3)然后处理的 Shuffle,由于是逻辑转换,并不会生成实际的节点。将 partitioner 信息暂存在 virtuaPartitionNodes 中。
4)在处理 Filter 时,生成了 Filter 的 StreamNode。发现上游是 shuffle,找到 shuffle 的上游FlatMap,创建 StreamEdge 与 Filter 相连。并把 ShufflePartitioner 的信息写到 StreamEdge中。
5)最后处理 Sink,创建 Sink 的 StreamNode,并生成 StreamEdge 与上游 Filter 相连。由于上下游并发度一样(4:4),所以此处选择 Forward 分区。
StreamGraph 转变成 JobGraph 也是在 Client 完成,主要作了三件事:
核心逻辑
:根据 StreamGraph,生成 JobGraph:
private JobGraph createJobGraph() {
preValidate();
// make sure that all vertices start immediately
// streaming 模式下,调度模式是所有节点(vertices)一起启动
jobGraph.setScheduleMode(streamGraph.getScheduleMode());
jobGraph.enableApproximateLocalRecovery(streamGraph.getCheckpointConfig().isApproximateLocalRecoveryEnabled());
// Generate deterministic hashes for the nodes in order to identify them across
// submission iff they didn't change.
// 广度优先遍历 StreamGraph 并且为每个 SteamNode 生成 hash id,
// 保证如果提交的拓扑没有改变,则每次生成的 hash 都是一样的
Map<Integer, byte[]> hashes = defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph);
// Generate legacy version hashes for backwards compatibility
List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size());
for (StreamGraphHasher hasher : legacyStreamGraphHashers) {
legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));
}
// 最重要的函数,生成 JobVertex,JobEdge 等,并尽可能地将多个节点 chain 在一起
setChaining(hashes, legacyHashes);
// 将每个 JobVertex 的入边集合也序列化到该 JobVertex 的 StreamConfig 中
// (出边集合已经在 setChaining 的时候写入了)
setPhysicalEdges();
// 根据 group name,为每个 JobVertex 指定所属的 SlotSharingGroup
// 以及针对 Iteration 的头尾设置 CoLocationGroup
setSlotSharingAndCoLocation();
setManagedMemoryFraction(
Collections.unmodifiableMap(jobVertices),
Collections.unmodifiableMap(vertexConfigs),
Collections.unmodifiableMap(chainedConfigs),
id -> streamGraph.getStreamNode(id).getManagedMemoryOperatorScopeUseCaseWeights(),
id -> streamGraph.getStreamNode(id).getManagedMemorySlotScopeUseCases());
// 配置 checkpoint
configureCheckpointing();
jobGraph.setSavepointRestoreSettings(streamGraph.getSavepointRestoreSettings());
JobGraphUtils.addUserArtifactEntries(streamGraph.getUserArtifacts(), jobGraph);
// set the ExecutionConfig last when it has been finalized
try {
// 将 StreamGraph 的 ExecutionConfig 序列化到 JobGraph 的配置中
jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());
}
catch (IOException e) {
throw new IllegalConfigurationException("Could not serialize the ExecutionConfig." + "This indicates that non-serializable types (like custom serializers) were registered");
}
return jobGraph;
}
StreamingJobGraphGenerator 的成员变量都是为了辅助生成最终的 JobGraph。
为所有节点生成一个唯一的 hash id,如果节点在多次提交中没有改变(包括并发度、上下游等),那么这个 id 就不会改变,这主要用于故障恢复。
这里不能用 StreamNode.id 来代替,因为这是一个从 1 开始的静态计数变量,同样的 Job可能会得到不一样的 id,如下代码示例的两个 job 是完全一样的,但是 source 的 id 却不一样了。
client 生成 JobGraph 之后,就通过 submitJob 提交给 JobManager,JobManager 会根据JobGraph 生成对应的ExecutionGraph。
ExecutionGraph 是 Flink 作业调度时使用到的核⼀数据结构,它包含每一个并行的 task、每⼀个 intermediate stream 以及它们之间的关系。
以 per-job 模式为例,分析 ExecutionGraph 的生成逻辑:
在 Dispacher 创建 JobManagerRunner 时,调用 createJobManagerRunner: => createJobManagerRunner()
=> new JobManagerRunnerImpl()
=> createJobMasterService()
=> new JobMaster()
在创建 JobMaster 的时候,创建了 Scheduler 调度器
=> createScheduler()
=> createInstance()
=> new DefaultScheduler() #调度器
=> createAndRestoreExecutionGraph()
=> createExecutionGraph()
=> ExecutionGraphBuilder.buildGraph()
调度的源码分析:从JobMaster.java开始
调度器是 Flink 作业执行的核心组件,管理作业执行的所有相关过程,包括 JobGraph 到 ExecutionGraph 的转换、作业生命周期管理(作业的发布、取消、停止)、作业的 Task 生命周期管理(Task 的发布、取消、停止)、资源申请与释放、作业和 Task 的 Failover 等。
调度有几个重要的组件:
调度器作用:
1)作业的生命周期管理,如作业的发布、挂起、取消
2)作业执行资源的申请、分配、释放
3)作业的状态管理,作业发布过程中的状态变化和作业异常时的 FailOver 等
4)作业的信息提供,对外提供作业的详细信息
ScheduleMode 决定如何启动 ExecutionGraph 中的 Task。Flink 提供 3 中调度模式:
1)Eager 调度
适用于流计算。一次性申请需要的所有资源,如果资源不足,则作业启动失败。
2)分阶段调度
LAZY_FROM_SOURCES 适用于批处理。从 SourceTask 开始分阶段调度,申请资源的时候,一次性申请本阶段所需要的所有资源。上游 Task 执行完毕后开始调度执行下游的 Task,读取上游的数据,执行本阶段的计算任务,执行完毕之后,调度后一个阶段的 Task,依次进行调度,直到作业完成。
3)分阶段 Slot 重用调度
LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST 适用于批处理。与分阶段调度基本一样,区别在于该模式下使用批处理资源申请模式,可以在资源不足的情况下执行作业,但是需要确保在本阶段的作业执行中没有 Shuffle 行为。
目前视线中的 Eager 模式和 LAZY_FROM_SOURCES 模式的资源申请逻辑一样,
LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST 是单独的资源申请逻辑。
调度策略有三种实现:
PipelinedRegionSchedulingStrategy
:以流水线的局部为粒度进行调度PipelinedRegionSchedulingStrategy 是 1.11 加入的,从 1.12 开始,将以 pipelined region 为单位进行调度。pipelined region 是一组流水线连接的任务。这意味着,对于包含多个 region 的流作业,在开始部署任务之前,它不再等待所有任务获取 slot。取而代之的是,一旦任何region 获得了足够的任务 slot 就可以部署它。对于批处理作业,将不会为任务分配 slot,也不会单独部署任务。取而代之的是,一旦某个 region 获得了足够的 slot,则该任务将与所有其他任务一起部署在同一区域中。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。