赞
踩
Flink运行时架构中,最重要的就是两大组件:
JobManager
:管理者TaskManager
:工作者Flink 的作业提交和任务处理时的系统图如下:
注意:
- 一般情况下(会话模式或单节点模式下),客户端并不是处理系统的一部分,它只负责作业的提交。具体来说,就是调用程序的 main 方法,将代码转换成“数据流图”(Stream Graph),然后经过优化得到 作业图(JobGraph),并发送给 JobManager。提交之后,任务的执行其实就跟客户端没有关系了;我们可以在客户端选择断开与 JobManager 的连接, 也可以继续保持连接。之前我们在命令提交作业时,加上的-d 参数,就是表示分离模式(detached mode),也就是断开连接。
- TaskManager 启动之后,JobManager 会与它建立连接,并将作业图(JobGraph)转换成“执行图”(ExecutionGraph)分发给可用的 TaskManager,然后就由 TaskManager 具体执行任务。
一般情况下,一个Flink应用程序都应该被唯一的 JobManager 所控制执行。当然,在高可用(HA)的场景下,一个Flink应用程序含有多个job,因此会出现一个应用程序对应多个 JobManager的情况,此时只有一个是正在运行的领导节点(leader),其他都是备用节点(standby)。
TaskManager:负责数据计算,真正处理作业的地方。
注意:每一个算子对应一个任务(注意多个算子合并算一个算子,对应一个任务),而每一个任务 (task) 拆分成多个并行的子任务(subtask)执行。
注意:TaskManager的内存等资源大小可通过
flink-conf.yaml
文件中设置。
步骤如下:
注意:如果集群环境不同(例如 Standalone、YARN、K8S 等),其中一些步骤可能会不同。所以接下来分别介绍不同部署下的流程。
注意:
- 由于还没有提交作业,所以只有 JobManager中JobMaster没有运行
- TaskManager没有启动,后面是根据需要动态地启动
- 因为yarn集群是动态分配TaskManager,只要还有TaskManager能够启动且资源足够就能执行job
- 因为yarn集群是动态回收TaskManager,因此,不会出现任务槽空闲的情况
注意:可见,单作业模式与会话模式只是JobManager 的启动时间点不一样,会话模式是预先启动,应用模式则是在作业提交时启动。
应用模式与单作业模式的提交流程一样,只是初始提交给 YARN ResourceManager的不再是具体的作业,而是整个应用。所以 程序–>DataFlowGraph–>JobGraph发生在jobMaster中而不是客户端中。
注意: 一个应用中可能包含了多个job,这些job都将在 Flink 集群中启动各自对应的 JobMaster。
在Standalone集群下,只有会话模式和应用模式两种部署方式。
- 因为预先固定TaskManager个数,不足就会失败
- 因为预先固定TaskManager个数,不可动态分配也不可回收,所以可能出现任务槽空闲的情况
Flink 程序由算子组成,而算子分为3类:
注意:
- 每一条数据流(dataflow)以一个或多个source算子开始,经过一个或多个transformation算子转换,最后以一个或多个sink算子结束
- 可以根据返回值类型来判断是否是算子:若返回值类型为
SingleOutputStreamOperator
或其子类
。- 问:dataflow 中的算子和程序中基于 DataStream API 的方法是一一对应的关系吗?
答:大多数情况下一一对应的关系,但是也存在多个方法对应一个算子的情况,比如wordCount程序中的.keyBy()
返回的数据类型是KeyedStream
,而算子返回的数据类型是SingleOutputStreamOperator
,也就是说.keyBy().sum()
一起组成了一个算子。
在IDEA中设置并行度
为每个算子单独设置并行度:算子.setParallelism(并行度)
设置全局并行度(每个算子都是这个):执行环境..setParallelism(并行度)
提交应用时设置:只能设置全局并行度。【如果要设置全局并行度,一般用此方法,因为在代码中数字就写死了】
语法:flink run –p 并行度 –c 入口类全类名 jar包路径
eg:bin/flink run –p 2 –c com.atguigu.wc.StreamWordCount ./FlinkTutorial-1.0-SNAPSHOT.jar
配置文件中设置:只能设置全局并行度。在flink/conf/flink-conf.yaml文件中直接更改默认并行度
parallelism.default: 2
注意:这三种设置并行度的方式,如果组合设置,则从1, 2, 3优先级逐步降低。
算子在关系上分为两类:
一对一(One-to-one)算子:有map、filter、flatMap 等算子。这种算子如果相连,则最好将他们的并行度设置为相同,这样Flink在底层会做合并算子链的优化。
- 合并算子链:在 Flink 中,前后链接的并行度相同的一对一(one to one)算子操作,对应的子任务直接合并为一个“大”的子任务(subtask)
- 为什么合并?可以减少线程之间的切换和基于缓存区的数据交换,在减少时延的同时提升吞吐量
- Flink底层自动合并算子链,如果禁止合并算子链?
- 方式一:
前面的算子.disableChaining()
,比如env.readTextFile().disableChaining()
- 方式二:
后面的算子.startNewChain()
,比如env.readTextFile().map()..startNewChain()
重分区(Redistributing)算子:几个方法组成的算子会重写分区。比如.keyBy().sum()
算子,keyBy()是分组操作会基于标签重新分区。
- 重分区会怎样?涉及到subtask的交互,前一个算子的task会与当前重分区算子的所有tsak进行交互,而一对一算子不会。
数据流图(StreamGraph):在运行时,Flink 程序按照 算子逻辑 生成一个有向无环图DAG),这个图就是数据流图(StreamGraph)
- 提示:提交作业之后,打开 Flink 自带的 Web UI,点击作业就能看到对应的 dataflow。比如:
作业图(JobGraph):StreamGraph 经过优化后生成的就是JobGraph。主要的优化为: 合并算子链
执行图(ExecutionGraph):在JobGraph的基础上结合各算子的并行度生成的就是ExecutionGraph 【ExecutionGraph与 JobGraph 最大的区别就是按照并行度将每个算子的任务拆分成了多个并行的子任务,并明确了任务间数据传输的方式。】
物理图(Physical Graph):TaskManager收到ExecutionGraph后将其转化为可执行的物理图(二进制代码看不懂)
例子:
一个算子任务被拆分成了多个并行的“子任务”(即多个线程执行)。而子任务个数就是该算子的并行度。
什么是任务槽?
注意:建议将任务槽数目设置为主机的CPU核数,因为这样能避免对CPU资源的竞争
如何设置任务槽数值?
taskmanager.numberOfTaskSlots: 8
使得任务槽间就负载均衡,提高计算效率。
比如:
- 现有一执行图如下:
- 假设是相同任务的子任务放在同一个任务槽中,则会造成任务槽间负载不均衡:
- 假设是不同任务的子任务放在同一个任务槽中,这样任务槽间就负载均衡:
每一个任务槽都是完整的作业管道,这样一来,即使某个 TaskManage出现故障宕机,其他节点也可以完全不受影响,作业的任务可以继续执行。
比如:
.map(word -> Tuple2.of(word, 1L)).slotSharingGroup(“1”); // 共享组数值相同的算子任务间可以共享
如果job的并行度,即max{所有算子的并行度},<= 所有TaskManager的可用任务槽总数,则该job可以执行;否则,不可以执行。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。