1.概念
- 用Java和Scala编写的流处理框架和分布式处理引擎
- 对于无界和有界数据流进行有状态计算(无界,实时、有界,离线 批处理数据)
- 在所有常见集群环境运行,以内存速度和任何规模执行计算
- 达到实时流处理引擎全部标准要求(低延迟、高吞吐量、容错性、窗口时间语义化)
- 实时和批处理数据过程,抽象成三个过程 Source->Transform->Sink
2.架构设计
物理部署层-deploy层
- 支持多种部署模式:本地部署、集群部署(Standalone/Yarn/Mesos)、云(GCE/EC2)以及kubernetes。
Runtime核心层
- 是Flink分布式计算框架的核心实现层,负责对上层不同接口提供基础服务。
- 支持分布式Stream作业的执行、JobGraph到ExecutionGraph的映射转换以及任务调度等。
- 将DataStream和DataSet转成统一的可执行的Task Operator,达到在流式计算引擎下同时处理批量计算和流式计算的目的。
API & Libraries层
负责更好的开发用户体验,包括易用性、开发效率、执行效率、状态管理等方面。
- Flink同时提供了支撑流计算和批处理的接口,同时在这基础上抽象出不同的应用类型的组件库,如:
- 基于流处理的CEP(Complex Event Process,复杂事件处理库)
- Table & Sql库
- 基于批处理的FlinkML(机器学习库)
- 图处理库(Gelly,凝胶-凝冻,取其意是指图是各个事务的统一整合体抽象)
API层包括两部分
- 流计算应用的DataStream API
- 批处理应用的DataSet API
统一的API,方便用于直接操作状态和时间等底层数据
提供了丰富的数据处理高级API,例如Map、FllatMap操作等,
并提供了比较低级的Process Function API
2.1.运行模式
区分点:main()方法是在客户端还是在集群上执行
运行模式分类:
- 本地运行模式-local(一机器一进程的多线程模拟分布式计算)
- standalone模式-独立Flink集群(各模式由Flink自己搞)
- 集群运行模式
- Flink Session集群(会话模式)
- 生命周期(预先存在,长期运行的集群,接收作业提交,完成后仍运行直到手动结束)
- 资源隔离(TaskManager slot 由 ResourceManager 在提交作业时分配,并在作业完成时释放。)
- 长预备短执行 场景(端到端用户体验有较大好处)
- 工作模式(附加模式:客户端与Flink集群相互同步 分离模式:相互异步,客户端提交完成后即退出)
- Flink Job集群(per-job模式)
- 生命周期(为每个提交的作业启动一个集群,该集群仅可用于该作业)
- 资源隔离(仅影响Flink Job集群中运行的一个作业)
- 时计算性没有session模式强、因此更适合长期运行。(高稳定性,对启动作业不敏感的大型作业)
- Flink Application集群(应用模式)
- 生命周期(与Flink作业执行直接相关的运行模式,main()方法在集群上不在客户端)
- 提交作业是一个单步骤过程:无需启动Flink集群,逻辑和依赖打包成一个可执行的作业JAR中,由入口机客户端提交jar包和相关资源到hdfs中。因此,集群寿命与应用程序的寿命有关。
- 资源隔离(ResourceManager 和 Dispatcher 作用于单个的 Flink 应用程序,相比于 Flink Session 集群,它提供了更好的隔离。)
- 该模式为yarn session和yarn per-job模式的折中选择。
- 生命周期(与Flink作业执行直接相关的运行模式,main()方法在集群上不在客户端)
- Flink Session集群(会话模式)
总结
- 本地:demo、代码测试场景
- Session:频繁任务提交、小作业居多、实时性要求高的场景。少
- Per-Job:作业少、大作业、实时性要求低的场景。
- Application:实时性要求不太高、安全性有一定要求均可以使用,普遍适用性最强。
3.运行流程图
- 核心角色
两种类型的进程组成,一个JobManager和一个或多个TaskManager。 - actor system
- 各个角色组件互相通信的消息传递系统中间件
- 所有线程(进程)通过消息传递的方式进行合作(通信),这些线程称为Actor
- 缺点:不能实现真正意义上的并行,而是通过并发实现并行。纯消息,实时性和粒度控制上略低于共享内存的方式
- 核心组件
Job Manager:①何时调度下一个task ②task完成和失败做出反应 ③协调checkpoint
- ResourceManager
给Flink集群中的资源提供回收、分配它管理的task slots(Flink集群中资源调度最小单位)。只能分配可用TaskManaer的slots不能自行启动新的TaskManager。
- Dispatch:提供REST接口,用来提交Flink应用程序执行,为每个提交作业启动新的JobMaster。运行Flink WebUI提供作业执行信息。
- JobMaster:负责管理单个JobGraph的执行,集群可以同时运行多个作业,每个作业有自己的JobMaster。始终至少有一个JobManager。
TaskManager(worker):执行作业流的task,且缓存和交换数据流。必须始终有一个TaskManager。
4.执行方式
- Flink实现离线数据DataSet版本的WordCount经典案例
- 创建Flink代码执行离线数据流上下文环境变量(ExecutionEnvironment)
- 定义本地文件系统当中文件路径
- 获取输入文件对应的DataSet对象(DataSet)
- 对数据集进行多个算子处理,按空白符号分词展开,并转换成(word, 1)二元组进行统计(DataSet<Tuple2<String, Integer>>)
- 打印
-
- 优点:简单、易操作,和MapReduce一样
- 缺点:仅支持local模式,将Flink依赖去掉provided,将依赖包全部打入,会很大。
- 命令格式:yarn jar jarPath Main_class_args
-
Flink建议执行方式
- 优点:支持与flink交互的所有方式,灵活强大。不用讲flin包打入,依赖包减小。
- 缺点:复杂。下载编译flink源码包,执行下载flink发布包使用
- https://flink.apache.org/zh/downloads.html。解压即可使用 tar -xzvf flink-1.13.1-bin-scala_2.11.tgz
- 注意:是hadoop环境变量生效(1.设置到linux profile中 cat /etc/profile 2.先执行export命令,再执行flink代码)
- fink的三种运行模式实践
- yarn application运行方式
- 首先,进入Flink安装路径$Flink_HOME。
- run-application:即运行作业类型,专指flink application类型的作业类型。-t:指定运行模式,此处是指用yarn方式来运行flink application作业。-c: 指定入口主类。app.jar : 后边的上传的jar包。参数:最后的入参为传给入口主类的参数
./bin/flink run-application -t yarn-application
-c com.tl.bigdata.flink.demo.FlinkWordCount4DataSet
../FirstFlink-0.0.1-SNAPSHOT.jar hdfs:///user/zel/input.txt
- 1
- 2
- 3
- yarn per-job运行方式
#设置不需要进行classloader leaked check,在配置文件路径设置./conf/flink-conf.yaml,有则修改,无则新加即可
#注意yaml参数文件修改,请在value前加上一个空格
classloader.check-leaked-classloader: false
./bin/flink run -t yarn-per-job -c com.tl.bigdata.flink.demo.FlinkWordCount4DataSet ../FirstFlink-0.0.1-SNAPSHOT.jar hdfs:///user/zel/input.txt
- 1
- 2
- 3
- 4
- yarn session运行方式
先在yarn上提前启动flink session会话任务,并得到session task任务的yarn app-id
./bin/yarn-session.sh
- 1
#第1种提交: 多加入-t yarn-session参数,此时必须指定app-id参数,即提前启动的session作业任务id
./bin/flink run -t yarn-session -Dyarn.application.id=application_1627998129686_0475 -c com.tl.bigdata.flink.demo.FlinkWordCount4DataSet ../FirstFlink-0.0.1-SNAPSHOT.jar hdfs:///user/zel/input.txt
#第2种提交: 不加入-t yarn-session参数,则不需要手动指定app-id,其是自行寻找提前启动的session作业任务id
./bin/flink run -c com.tl.bigdata.flink.demo.FlinkWordCount4DataSet ../FirstFlink-0.0.1-SNAPSHOT.jar hdfs:///user/zel/input.txt
- 1
- 2
- 3
- 4
- 分离模式-detached模式
./bin/yarn-session.sh --detached
- 1
#第1种提交: 多加入-t yarn-session参数,此时必须指定app-id参数,即提前启动的session作业任务id
./bin/flink run -t yarn-session -Dyarn.application.id=application_1627998129686_0475
-c com.tl.bigdata.flink.demo.FlinkWordCount4DataSet ../FirstFlink-0.0.1-SNAPSHOT.jar
hdfs:///user/zel/input.txt
#第2种提交: 不加入-t yarn-session参数,则不需要手动指定app-id,其是自行寻找提前启动的session作业任务id
./bin/flink run -c com.tl.bigdata.flink.demo.FlinkWordCount4DataSet
../FirstFlink-0.0.1-SNAPSHOT.jar hdfs:///user/zel/input.txt
- 1
- 2
- 3
- 4
- 5
- 6
- 7