赞
踩
注:本文主要介绍了Flink流式处理框架的定义内容、架构及作业提交流程等。
创作不易,希望大家一键三连支持!!!♥♥♥
创作不易,希望大家一键三连支持!!!♥♥♥
创作不易,希望大家一键三连支持!!!♥♥♥
Apache Flink是一个框架
和分布式处理引擎
,用于对无界
和有界
数据流进行有状态计算
。
有界流与无界流的区别:
(1)有界流
有定义流的开始
,也有定义流的结束
;无界流
只定义
了流的开始
,没有定义流的结束
。
(2)有界流
可以在获取所有数据后再计算
,无界流
必须在获取数据后立刻处理计算
。
(3)有界流
也叫做批处理
,无界流
也叫做流处理
。
有状态流处理:把流处理
需要的额外数据
保存成一个“状态”
,然后针对这条数据进行处理,并且更新状态
。
(1) 高吞吐和低延迟。每秒
处理数百万个事件
,毫秒级延迟
。
(2)结果的准确性。Flink提供了事件时间(event-time)
和处理时间(processing-time)
语义。对于乱序事件流
,事件时间语义
仍然能提供一致且准确
的结果。
(3) 精确一次(exactly-once)的状态一致性保证。
(4)可以连接到最常用的外部系统,如Kafka、Hive、JDBC、HDFS、Redis等。
(5) 高可用。本身高可用的设置,加上与K8s,YARN和Mesos的紧密集成,再加上从故障中快速恢复和动态扩展任务的能力
,Flink能做到以极少的停机时间7×24全天候运行
。
从数据模型上:Flink
采用数据流
以及事件(Event)序列
;而Spark
采用RDD模型
,Spark Streaming的DStream
就是一组组小批数据RDD的集合
。
从运行架构上:Flink
是标准的流执行
模式,一个事件
在一个节点处理完后
可以直接发往下一个节点进行处理
;而Spark
是批计算
,将 DAG
划分为不同的 stage
,一个完成后才可以计算下一个
。
有状态流处理:通过底层API(处理函数),对最原始数据加工处理。底层API与DataStream API相集成,可以处理复杂的计算。
DataStream API(流处理)和DataSet API(批处理)封装了底层处理函数,提供了通用的模块,比如转换
(transformations,包括map、flatmap等),连接
(joins),聚合
(aggregations),窗口
(windows)操作等。注:Flink1.12以后,DataStream API已经实现真正的流批一体,所以DataSet API已经过时。
Table API 是以表为中心的声明式编程,其中表可能会动态变化。Table API
遵循关系模型
:表
有二维数据结构
,类似于关系数据库中的表;同时API
提供可比较的操作
,例如select、project、join、group-by、aggregate
等。我们可以在表与 DataStream/DataSet 之间无缝切换,以允许程序将 Table API 与 DataStream 以及 DataSet 混合使用。
SQL这一层在语法与表达能力上与 Table API类似,但是是以SQL查询表达式的形式表现
程序。SQL抽象与Table API交互密切,同时SQL查询可以直接在Table API定义的表上执行。
Flink 为各种场景提供了不同的部署模式,主要有以下三种:会话模式(Session Mode)、单作业模式(Per-Job Mode)、应用模式(Application Mode)。
区别主要在于:(1)集群的生命周期
以及资源的分配方式
;
(2)应用的 main 方法
到底在哪里执行
—客户端
还是JobManager
。
需要先启动一个集群,保持一个会话
,在这个会话中通过客户端提交作业
。集群启动时
所有资源
就都已经确定
,所有提交的作业
会竞争集群中的资源
。
其适合于单个规模小、执行时间短的大量作业。
会话模式因为资源共享会导致很多问题,为了更好地隔离资源
,我们可以考虑为每个提交的作业启动一个集群
,这就是所谓的单作业(Per-Job)模式。
作业完成
后,集群就会关闭
,所有资源也会释放
。
这些特性使得单作业模式
在生产环境运行更加稳定
,所以是实际应用的首选模式。
需要注意的是,Flink本身无法直接这样运行,所以单作业模式
一般需要借助一些资源管理框架
来启动集群,比如YARN、Kubernetes(K8S)。
之前
提到的两种模式下
,应用代码都是在客户端上执行
,然后由客户端提交给JobManager的。但是这种方式客户端需要占用大量网络带宽,去下载依赖和把二进制数据发送给JobManager;加上很多情况下我们提交作业用的是同一个客户端,就会加重客户端所在节点的资源消耗。
所以解决办法就是直接把应用提交到JobManger上运行。而这也就代表着,我们需要为每一个提交的应用单独启动一个JobManager,也就是创建一个集群。这个JobManager只为执行这一个应用而存在
,
执行结束
之后JobManager
也就关闭
了,这就是所谓的应用模式。
应用模式与单作业模式,都是提交作业之后才创建集群;单作业模式是通过客户端来提交的,客户端解析出的每一个作业对应一个集群;而应用模式下,是直接由JobManager执行应用程序的。
独立模式
是独立运行
的,不依赖任何外部的资源管理平台
;当然独立也是有代价的:若资源不足,或者出现故障,没有自动扩展或重分配资源的保证,必须手动处理。所以独立模式一般只用在开发测试或作业非常少的场景下
。
客户端把 Flink 应用提交给 Yarn 的 ResourceManager
,Yarn 的ResourceManager 会向 Yarn 的 NodeManager 申请容器。在这些容器上,Flink 会部署JobManager 和 TaskManager 的实例,从而启动集群。Flink
会根据运行在 JobManger 上的作业
所需要的 Slot 数量动态分配 TaskManager 资源
。
容器化部署是如今业界流行的一项技术,基于 Docker 镜像运行能够让用户更加方便地对应用进行管理和运维,基本原理与 YARN 是类似的。
我们以Yarn运行模式
为例,首先介绍flink架构下
主要核心组件
1)作业管理器(JobManager)
JobManager
是一个 Flink集群
中任务管理和调度的核心
,是控制应用执行的主进程
。也就是说,每个应用
都应该被唯一的 JobManager
所控制执行
。
JobManger 又包含 3 个不同的组件:
(1)JobMaster
负责处理单独的作业(Job)
。所以 JobMaster
和具体的Job
是一一对应
的,多个 Job
可以同时运行在一个 Flink 集群
中, 每个 Job
都有一个自己的 JobMaster
。
在作业提交时,JobMaster 会先接收到要执行的应用。JobMaster 会把 JobGraph 转换成一个物理层面的数据流图,这个图被叫作“执行图”(ExecutionGraph),它包含了所有可以并发执行的任务。JobMaster 会向资源管理器(ResourceManager)发出请求,申请执行任务必要的资源。一旦它获取到了足够的资源,就会将执行图分发到真正运行它们的 TaskManager上。
而在运行过程中
,JobMaster
会负责
所有需要中央协调的操作
,比如说检查点 (checkpoints)的协调
。
(2)资源管理器(ResourceManager)
ResourceManager
主要负责资源的分配和管理
,在 Flink 集群中只有一个
。所谓“资源”,主要是指 TaskManager 的任务槽(task slots)
。任务槽
就是 Flink 集群中的资源调配单元
,包含了机器用来执行计算的一组 CPU 和内存资源
。每一个任务(Task)都需要分配到一个 slot 上执行。
注:应把 Flink 内置的 ResourceManager 和其他资源管理平台(比如 YARN)的
ResourceManager 区分开!!!
(3)分发器(Dispatcher)
Dispatcher
主要负责提供一个 REST 接口
,用来提交应用
,并且负责为每一个新提交的作业启动一个新的 JobMaster 组件
。Dispatcher 也会启动
一个Web UI
,用来方便地展示和监控作业执行的信息
。Dispatcher 在架构中并不是必需的
,在不同的部署模式下可能会被忽略掉。
2)任务管理器(TaskManager)
TaskManager
是 Flink
中的工作进程
,数据流的具体计算
就是它来做的。Flink 集群
中必须至少有一个 TaskManager
;每一个 TaskManager
都包含
了一定数量的任务槽(task slots)
。slot
是资源调度的最小单位
,slot 的数量限制了 TaskManager 能够并行处理的任务数量。
启动之后,TaskManager 会向资源管理器注册它的 slots;收到资源管理器的指令后,TaskManager就会将一个或者多个槽位提供给 JobMaster调用,JobMaster就可以分配任务来执行了。
在执行过程中,TaskManager
可以缓冲数据
,还可以跟其他运行同一应用的 TaskManager交换数据
。
而后,整个yarn模式下的作业提交流程如下:
(1)Job任务提交至Yarn的ResourceManager(RM)
(2)RM选择一个NodeManager(NM)并开启一个Container,容器内开启一个Application Master(AM,相当于flink的JobManager)
(3)启动Dispatcher(分发器),启动flink内部的资源管理器RM,然后Dispatcher启动JobMaster,在JobMaster中完成解析参数->生成Streamgraph->生成Jobgraph->生成Executiongraph的流程
(4)JobMaster向flink内部的RM申请slot(槽),RM向Yarn的RM申请资源,之后Yarn开启多个NM,并在NM上启动TaskManager™,启动后TMX主动通过Actor通信系统向flink的RM注册slot
(5)flink的RM与TM通信,告知其分配slot,然后TM向JobManager提供slot,JobMaster收到slot后会根据之前生成的执行流图将任务进行分配分发,分发完毕后最终生成物理流图
1)并行子任务和并行度
当要处理的数据量非常大时,我们可以把一个算子操作,“复制”多份到多个节点,数据来了之后就可以到其中任意一个执行。这样一来,一个算子任务
就被拆分
成了多个并行的“子任务”(subtasks)
,再将它们分发到不同节点
,就真正实现了并行计算
。
在 Flink 执行过程中,每一个算子(operator)可以包含一个或多个子任务(operator
subtask),这些子任务在不同的线程、不同的物理机或不同的容器中完全独立地执行。
一个特定算子的子任务(subtask)的个数被称之为其并行度(parallelism)。
这样,包含并行子任务的数据流,就是并行数据流,它需要多个分区(stream partition)来分配并行任务。一般情况下,一个流程序的并行度
,可以认为就是其所有算子中最大的并行度
。一个程序中,不同的算子
可能具有不同的并行度
。
如上图所示,当前数据流中有 source、map、window、sink四个算子
,其中sink算子
的并行度为 1
,其他算子
的并行度都为 2
。所以这段流处理程序的并行度就是2
。
2)并行度的设置
Flink 中,可以用不同的方法来设置并行度,它们的有效范围和优先级别也是不同的。
(1)代码中设置
在算子后
跟着调用 setParallelism()方法
,来设置当前算子的并行度
:
stream.map(word -> Tuple2.of(word, 1L)).setParallelism(2);
这种方式设置的并行度,只针对当前算子有效。
另外,我们也可以直接调用执行环境的 setParallelism()方法
,全局设定并行度
:
env.setParallelism(2);
我们一般不会
在程序中设置全局并行度
,因为如果在程序中对全局并行度进行硬编码,会导致无法动态扩容。
这里要注意的是,由于 keyBy 不是算子,所以无法对 keyBy 设置并行度。
(2)提交应用时设置
在使用 flink run 命令
提交应用时,可以增加-p 参数
来指定当前应用程序执行的并行度,它的作用类似于执行环境的全局设置:
bin/flink run –p 2 –c com.root.wc.SocketStreamWordCount ./FlinkTutorial-1.0-SNAPSHOT.jar
注:此处通过-p参数设置了全局并行度为2
(3)配置文件中设置
还可以直接在集群
的配置文件 flink-conf.yaml
中直接更改默认并行度:
parallelism.default: 2
设置对于整个集群上提交的所有作业有效,初始值为 1。无论在代码中设置、还是
提交时的-p 参数,都不是必须的;所以在没有指定并行度的时候
,就会采用配置文件中的集群默认并行度。在开发环境中
,没有配置文件,默认并行度
就是当前机器的CPU 核心数`
注意以上几类调整并行度的方式中,存在有优先等级的排序如下:
代码:特定算子>代码:env全局>提交时指定>配置conf文件
1)算子间的数据传输
一个数据流
在算子之间传输数据
的形式可以是一对一(one-to-one)
的直通(forwarding)模式
,也可以是打乱的重分区(redistributing)模式
,具体是哪一种形式,取决于算子的种类
。
(1)一对一(One-to-one,forwarding)
这种模式下,数据流维护着分区以及元素的顺序.。比如图中的source和map算子,source算子读取数据之后,可以直接发送给 map 算子做处理,它们之间不需要重新分区,也不需要调整数据的顺序
。这就意味着 map 算子的子任务,看到的元素个数和顺序跟 source 算子的子任务产生的完全一样,保证着“一对一”的关系。map、filter、flatMap 等算子都是这种 one-to-one 的对应关系。
这种关系类似于 Spark 中的窄依赖。
(2)重分区(Redistributing)
在这种模式下,数据流的分区会发生改变。比如图中的 map和后面的 keyBy/window算子之间
,以及 keyBy/window 算子和 Sink 算子之间
,都是这样的关系。
每一个算子的子任务,会根据数据传输的策略,把数据发送到不同的下游目标任务。这些传输方式都会引起重分区的过程,这一过程类似于 Spark 中的 shuffle。
2)合并算子链
Flink中,并行度相同的一对一(one to one)算子操作
,可以直接链接在一起形成一个“大”的任务(task)
,这样原来的算子就成为了真正任务里的一部分,如下图所示。每个task 会被一个线程执行。这样的技术被称为“算子链”(Operator Chain)
上图中 Source 和 map 之间满足了算子链的要求
,所以可以直接合并在一起
,形成了一个任务;因为并行度为 2,所以合并后的任务也有两个并行子任务。这样,这个数据流图所表示的作业最终会有 5 个任务,由 5 个线程并行执行。
将算子链接成 task 是非常有效的优化:可以减少线程之间的切换和基于缓存区的数据交换,在减少时延的同时提升吞吐量。
Flink 默认
会按照算子链的原则进行链接合并
,如果我们想要禁止合并或者自行定义
,也可以在代码中对算子做一些特定的设置
:
// 禁用算子链
.map(word -> Tuple2.of(word, 1L)).disableChaining();
// 从当前算子开始新链
.map(word -> Tuple2.of(word, 1L)).startNewChain()
1)任务槽(Task Slots)
Flink
中每一个 TaskManager
都是一个 JVM进程
,它可以启动多个独立的线程
,来并行执行多个子任务(subtask)
。
很显然,TaskManager 的计算资源是有限的,并行的任务越多,每个线程的资源就会越少。那一个 TaskManager 到底能并行处理多少个任务呢?为了控制并发量,我们需要在TaskManager 上对每个任务运行所占用的资源做出明确的划分,这就是所谓的任务槽(task slots)。
每个任务槽(task slot)
其实表示了 TaskManager 拥有计算资源的一个固定大小的子集
。这些资源就是用来独立执行一个子任务的。
假如一个TaskManager有三个slot,那么它会将管理的内存平均分成三份
,每个slot独自占据一份。这样一来,我们在slot上执行一个子任务时,相当于划定了一块内存“专款专用”,就不需要跟来自其他作业的任务去竞争内存资源了。
所以现在我们只要2个TaskManager,就可以并行处理分配好的5个任务了。
2)任务槽数量的设置
在Flink的/opt/module/flink-1.17.0/conf/flink-conf.yaml配置文件
中,可以设置TaskManager的 slot 数量,默认是 1 个 slot
。
taskmanager.numberOfTaskSlots: 9
slot 目前仅仅用来隔离内存,不会涉及 CPU 的隔离。在具体应用时,可以将 slot数量配置为机器的 CPU核心数,尽量避免不同任务之间对 CPU的竞争
。这也是开发环境默认并行度设为机器 CPU 数量的原因
3)任务对任务槽的共享
默认情况下,Flink 是允许子任务共享 slot
。如果我们保持 sink 任务并行度为 1 不变,而作业提交时设置全局并行度为 6,那么前两个任务节点就会各自有 6个并行子任务,整个流处理程序则有 13 个子任务。如上图所示,只要属于同一个作业,那么对于不同任务节点(算子)的并行子任务,就可以放到同一个 slot 上执行。所以对于第一个任务节点 source→map,它的 6 个并行子任务必须分到不同的 slot 上,而第二个任务节点 keyBy/window/apply 的并行子任务却可以和第一个任务节点共享 slot。
Flink 默认是允许 slot 共享的,如果希望某个算子对应的任务完全独占一个 slot
,
或者只有某一部分算子共享 slot
,我们也可以通过设置“slot 共享组”手动指定:
.map(word -> Tuple2.of(word, 1L)).slotSharingGroup("1");
这样,只有属于同一个 slot 共享组的子任务,才会开启 slot 共享;不同组之间的任务
完全隔离的,必须分配到不同的 slot 上。在这种场景下,总共需要的 slot 数量,就是各个 slot共享组最大并行度的总和。
slot 共享的优势:
(1)将资源密集型和非密集型的任务同时放到一个 slot 中,它们就可以自行分配对资
源占用的比例,从而保证最重的活平均分配给所有的 TaskManager。
(2)允许我们保存完整的作业管道。这样一来,即使某个TaskManager 出现故障宕机,其他节点也可以完全不受影响,作业的任务可以继续执行.slot的特点:
(1)均分隔离内存,不隔离CPU
(2)可以共享:同一Job的不同算子的子任务才可以共享;同一个slot同时在运行的前提是:属于同一个slot共享组(默认是default组)
任务槽
是静态
的概念,是 指 TaskManager 具有的并发执行能力
,可以通过参数taskmanager.numberOfTaskSlots
进行配置;
并行度
是动态
概念,也就是 TaskManager 运行程序时实际使用的并发能力
,可以通过参数 parallelism.default
进行配置。
举例:假设一共有3个TaskManager,每一个TaskManager中的slot数量设置为3个,那么一共有 9 个 task slot,表示集群最多能并行执行 9 个同一算子的子任务。
而我们定义 word count 程序的处理操作是四个转换算子:source→ flatmap→ reduce→ sink
当所有算子并行度相同时,容易看出 source 和 flatmap 可以合并算子链,于是最终有三个任务节点。
通过设置flink的conf文件,调整每个TM容纳的最大slot数量:
之后对比不同并行度下的任务槽数量关系:
通过这个例子也可以明确地看到,整个流处理程序的并行度
,就应该是所有算子并行度中最大的那个
,这代表了运行程序需要的 slot 数量
,当并行度为9,子任务数为27时,这是当前集群资源下能执行的最大并行度,计算资源得到了充分的利用。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。