赞
踩
Flink的官网主页地址:https://flink.apache.org/
Flink核心目标,是“数据流上的有状态计算”(Stateful Computations over Data Streams)。
具体说明:Apache Flink是一个框架和分布式处理引擎,用于对无界和有界数据流进行有状态计算。
有界流和无界流
无界数据流:
有定义流的开始,但没有定义流的结束;
它们会无休止的产生数据;
无界流的数据必须持续处理,即数据被摄取后需要立刻处理。我们不能等到所有数据都到达再处理,因为输入是无限的。
有界数据流:
有状态流处理
把流处理需要的额外数据保存成一个“状态”,然后针对这条数据进行处理,并且更新状态。这就是所谓的“有状态的流处理”。
Flink的发展历史
Flink起源于一个叫作Stratosphere的项目,它是由3所地处柏林的大学和欧洲其他一些大学在2010~2014年共同进行的研究项目,由柏林理工大学的教授沃克尔·马尔科(Volker Markl)领衔开发。2014年4月,Stratosphere的代码被复制并捐赠给了Apache软件基金会,Flink就是在此基础上被重新设计出来的。
在德语中,“flink”一词表示“快速、灵巧”。项目的logo是一只彩色的松鼠。
我们处理数据的目标是:低延迟、高吞吐、结果的准确性和良好的容错性。
Flink主要特点如下:
Spark以批处理为根本。
Flink以流处理为根本。
表 Flink 和 Streaming对比
Flink | Streaming | |
---|---|---|
计算模型 | 流计算 | 微批处理 |
时间语义 | 事件时间、处理时间 | 处理时间 |
窗口 | 多、灵活 | 少、不灵活(窗口必须是批次的整数倍) |
状态 | 有 | 没有 |
流式SQL | 有 | 没有 |
Flink在国内各个企业中大量使用。一些行业中的典型应用有:
1)电商和市场营销
举例:实时数据报表、广告投放、实时推荐
2)物联网(IOT)
举例:传感器实时数据采集和显示、实时报警,交通运输业
3)物流配送和服务业
举例:订单状态实时更新、通知信息推送
4)银行和金融业
举例:实时结算和通知推送,实时检测异常行为
有状态流处理:通过底层API(处理函数),对最原始数据加工处理。底层API与DataStream API相集成,可以处理复杂的计算。
DataStream API(流处理)和DataSet API(批处理)封装了底层处理函数,提供了通用的模块,比如转换(transformations,包括map、flatmap等),连接(joins),聚合(aggregations),窗口(windows)操作等。注意:Flink1.12以后,DataStream API已经实现真正的流批一体,所以DataSet API已经过时。
Table API 是以表为中心的声明式编程,其中表可能会动态变化。Table API遵循关系模型:表有二维数据结构,类似于关系数据库中的表;同时API提供可比较的操作,例如select、project、join、group-by、aggregate等。我们可以在表与 DataStream/DataSet 之间无缝切换,以允许程序将 Table API 与 DataStream 以及 DataSet 混合使用。
SQL这一层在语法与表达能力上与 Table API 类似,但是是以SQL查询表达式的形式表现程序。SQL抽象与Table API交互密切,同时SQL查询可以直接在Table API定义的表上执行。
Flink提交作业和执行任务,需要几个关键组件:
注意:Flink是一个非常灵活的处理框架,它支持多种不同的部署场景,还可以和不同的资源管理平台方便地集成。所以接下来我们会先做一个简单的介绍,让大家有一个初步的认识,之后再展开讲述不同情形下的Flink部署。
bin/start-cluster.sh
在flink-conf.yaml文件中还可以对集群中的JobManager和TaskManager组件进行优化配置,主要配置项如下:
jobmanager.memory.process.size:对JobManager进程可使用到的全部内存进行配置,包括JVM元空间和其他开销,默认为1600M,可以根据集群规模进行适当调整。
taskmanager.memory.process.size:对TaskManager进程可使用到的全部内存进行配置,包括JVM元空间和其他开销,默认为1728M,可以根据集群规模进行适当调整。
taskmanager.numberOfTaskSlots:对每个TaskManager能够分配的Slot数量进行配置,默认为1,可根据TaskManager所在的机器能够提供给Flink的CPU数量决定。所谓Slot就是TaskManager中具体运行一个任务所分配的计算资源。
parallelism.default:Flink任务执行的并行度,默认为1。优先级低于代码中进行的并行度配置和任务提交时使用参数指定的并行度数量。
访问WebUI http://hadoop102:8081
环境准备
nc -lk 7777
作业提交
bin/flink run -m hadoop102:8081 -c com.gavin.wc.SocketStreamWordCount ./FlinkTutorial-1.0-SNAPSHOT.jar
在一些应用场景中,对于集群资源分配和占用的方式,可能会有特定的需求。Flink为各种场景提供了不同的部署模式,主要有以下三种:会话模式(Session Mode)、单作业模式(Per-Job Mode)、应用模式(Application Mode)。
它们的区别主要在于:集群的生命周期以及资源的分配方式;以及应用的main方法到底在哪里执行——客户端(Client)还是JobManager。
会话模式其实最符合常规思维。我们需要先启动一个集群,保持一个会话,在这个会话中通过客户端提交作业。集群启动时所有资源就都已经确定,所以所有提交的作业会竞争集群中的资源。
会话模式比较适合于单个规模小、执行时间短的大量作业。
会话模式因为资源共享会导致很多问题,所以为了更好地隔离资源,我们可以考虑为每个提交的作业启动一个集群,这就是所谓的单作业(Per-Job)模式。
作业完成后,集群就会关闭,所有资源也会释放。
这些特性使得单作业模式在生产环境运行更加稳定,所以是实际应用的首选模式。
需要注意的是,Flink本身无法直接这样运行,所以单作业模式一般需要借助一些资源管理框架来启动集群,比如YARN、Kubernetes(K8S)。
前面提到的两种模式下,应用代码都是在客户端上执行,然后由客户端提交给JobManager的。但是这种方式客户端需要占用大量网络带宽,去下载依赖和把二进制数据发送给JobManager;加上很多情况下我们提交作业用的是同一个客户端,就会加重客户端所在节点的资源消耗。
所以解决办法就是,我们不要客户端了,直接把应用提交到JobManger上运行。而这也就代表着,我们需要为每一个提交的应用单独启动一个JobManager,也就是创建一个集群。这个JobManager只为执行这一个应用而存在,执行结束之后JobManager也就关闭了,这就是所谓的应用模式。
应用模式与单作业模式,都是提交作业之后才创建集群;单作业模式是通过客户端来提交的,客户端解析出的每一个作业对应一个集群;而应用模式下,是直接由JobManager执行应用程序的。
实际应用时,一般需要和资源管理平台结合起来,选择特定的模式来分配资源、部署应用。
独立模式是独立运行的,不依赖任何外部的资源管理平台;当然独立也是有代价的:如果资源不足,或者出现故障,没有自动扩展或重分配资源的保证,必须手动处理。所以独立模式一般只用在开发测试或作业非常少的场景下。
提前启动集群,并通过Web页面客户端提交任务(可以多个任务,但是集群资源固定)。
Flink的Standalone集群并不支持单作业模式部署。因为单作业模式需要借助一些资源管理平台。
应用模式下不会提前创建集群,所以不能调用start-cluster.sh脚本。我们可以使用同样在bin目录下的standalone-job.sh来创建一个JobManager。
具体步骤如下:
- 执行以下命令启动 netcat
nc -lk 7777
- 1
将应用程序的jar包放到lib/目录下
启动 JobManager
bin/standalone-job.sh start --job-classname com.gavin.wc.SocketStreamWordCount
- 1
- 启动 TaskManager
bin/taskmanager.sh start
- 1
- 模拟发送数据
- 在 WebUI 查看输出数据
YARN上部署的过程是:客户端把 Flink 应用提交给 Yarn 的 ResourceManager,Yarn 的 ResourceManager 会向 Yarn 的 NodeManager 申请容器。在这些容器上,Flink 会部署 JobManager 和 TaskManager 的实例,从而启动集群。Flink 会根据运行在 JobManger 上的作业所需要的 Slot 数量动态分配 TaskManager 资源。
在将Flink任务部署至YARN集群之前,需要确认集群是否安装有Hadoop,保证Hadoop版本至少在2.2以上,并且集群中安装有HDFS服务。
具体配置步骤如下:
- 配置环境变量
sudo vim /etc/profile.d/my_env.sh HADOOP_HOME=/opt/module/hadoop-3.3.4 export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin # 增加如下配置 export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop export HADOOP_CLASSPATH=`hadoop classpath`
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 启动 hadoop 集群
start-dfs.sh start-yarn.sh
- 1
- 2
- 启动 netcat
nc -lk 7777
- 1
YARN的会话模式与独立集群略有不同,需要首先申请一个YARN会话(YARN Session)来启动Flink集群。具体步骤如下:
1)启动集群
(1)启动Hadoop集群(HDFS、YARN)。
(2)执行脚本命令向YARN集群申请资源,开启一个YARN会话,启动Flink集群。
bin/yarn-session.sh -nm test
- 1
可用参数解读:
-d:分离模式,如果你不想让Flink YARN客户端一直前台运行,可以使用这个参数,即使关掉当前对话窗口,YARN session也可以后台运行。
-jm(–jobManagerMemory):配置JobManager所需内存,默认单位MB。
-nm(–name):配置在YARN UI界面上显示的任务名。
-qu(–queue):指定YARN队列名。
-tm(–taskManager):配置每个TaskManager所使用内存。
注意:Flink1.11.0版本不再使用-n参数和-s参数分别指定TaskManager数量和slot数量,YARN会按照需求动态分配TaskManager和slot。所以从这个意义上讲,YARN的会话模式也不会把集群资源固定,同样是动态分配的。
YARN Session启动之后会给出一个Web UI地址以及一个YARN application ID,用户可以通过Web UI或者命令行两种方式提交作业。
2)提交作业
(1)通过Web UI提交作业
这种方式比较简单,与上文所述Standalone部署模式基本相同。
(2)通过命令行提交作业
① 将FlinkTutorial-1.0-SNAPSHOT.jar任务上传至集群。
② 执行以下命令将该任务提交到已经开启的Yarn-Session中运行。
bin/flink run -c com.gavin.wc.SocketStreamWordCount FlinkTutorial-1.0-SNAPSHOT.jar
- 1
客户端可以自行确定JobManager的地址,也可以通过-m或者-jobmanager参数指定JobManager的地址,JobManager的地址在YARN Session的启动页面中可以找到。
③ 任务提交成功后,可在YARN的Web UI界面查看运行情况。hadoop103:8088。
从上图中可以看到我们创建的Yarn-Session实际上是一个Yarn的Application,并且有唯一的Application ID。
④也可以通过Flink的Web UI页面查看提交任务的运行情况。
在YARN环境中,由于有了外部平台做资源调度,所以我们也可以直接向YARN提交一个单独的作业,从而启动一个Flink集群。
(1)执行命令提交作业。
bin/flink run -d -t yarn-per-job -c com.gavin.wc.SocketStreamWordCount FlinkTutorial-1.0-SNAPSHOT.jar
- 1
注意:如果启动过程中报如下异常。
Exception in thread “Thread-5” java.lang.IllegalStateException: Trying to access closed classloader. Please check if you store classloaders directly or indirectly in static fields. If the stacktrace suggests that the leak occurs in a third party library and cannot be fixed immediately, you can disable this check with the configuration ‘classloader.check-leaked-classloader’. at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders
- 1
- 2
解决办法:在flink的/opt/module/flink-1.17.0/conf/flink-conf.yaml配置文件中设置
vim flink-conf.yaml classloader.check-leaked-classloader: false
- 1
- 2
- 3
(2)在YARN的ResourceManager界面查看执行情况。
点击可以打开Flink Web UI页面进行监控。
(3)可以使用命令行查看或取消作业,命令如下。
bin/flink list -t yarn-per-job -Dyarn.application.id=application_XXXX_YY
- 1
bin/flink cancel -t yarn-per-job -Dyarn.application.id=application_XXXX_YY <jobId>
- 1
这里的application_XXXX_YY是当前应用的ID,是作业的ID。注意如果取消作业,整个Flink集群也会停掉。
应用模式同样非常简单,与单作业模式类似,直接执行flink run-application命令即可。
1)命令行提交
(1)执行命令提交作业。
bin/flink run-application -t yarn-application -c com.gavin.wc.SocketStreamWordCount FlinkTutorial-1.0-SNAPSHOT.jar
- 1
(2)在命令行中查看或取消作业。
bin/flink list -t yarn-application -Dyarn.application.id=application_XXXX_YY
- 1
bin/flink cancel -t yarn-application -Dyarn.application.id=application_XXXX_YY <jobId>
- 1
2)上传HDFS 提交
可以通过yarn.provided.lib.dirs配置选项指定位置,将flink的依赖上传到远程。
(1)上传flink的lib和plugins到HDFS上
hadoop fs -mkdir /flink-dist hadoop fs -put lib/ /flink-dist hadoop fs -put plugins/ /flink-dist
- 1
- 2
- 3
(2)上传自己的jar包到HDFS
hadoop fs -mkdir /flink-jars hadoop fs -put FlinkTutorial-1.0-SNAPSHOT.jar /flink-jars
- 1
- 2
(3)提交作业
bin/flink run-application -t yarn-application -Dyarn.provided.lib.dirs="hdfs://hadoop102:8020/flink-dist" -c com.gavin.wc.SocketStreamWordCount hdfs://hadoop102:8020/flink-jars/FlinkTutorial-1.0-SNAPSHOT.jar
- 1
这种方式下,flink本身的依赖和用户jar可以预先上传到HDFS,而不需要单独发送到集群,这就使得作业提交更加轻量了。
运行 Flink job 的集群一旦停止,只能去 yarn 或本地磁盘上查看日志,不再可以查看作业挂掉之前的运行的 Web UI,很难清楚知道作业在挂的那一刻到底发生了什么。如果我们还没有 Metrics 监控的话,那么完全就只能通过日志去分析和定位问题了,所以如果能还原之前的 Web UI,我们可以通过 UI 发现和定位一些问题。
Flink提供了历史服务器,用来在相应的 Flink 集群关闭后查询已完成作业的统计信息。我们都知道只有当作业处于运行中的状态,才能够查看到相关的WebUI统计信息。通过 History Server 我们才能查询这些已完成作业的统计信息,无论是正常退出还是异常退出。
此外,它对外提供了 REST API,它接受 HTTP 请求并使用 JSON 数据进行响应。Flink 任务停止后,JobManager 会将已经完成任务的统计信息进行存档,History Server 进程则在任务停止后可以对任务统计信息进行查询。比如:最后一次的 Checkpoint、任务运行时的相关配置。
1)创建存储目录
hadoop fs -mkdir -p /logs/flink-job
- 1
2)在 flink-config.yaml 中添加如下配置
jobmanager.archive.fs.dir: hdfs://hadoop102:8020/logs/flink-job historyserver.web.address: hadoop102 historyserver.web.port: 8082 historyserver.archive.fs.dir: hdfs://hadoop102:8020/logs/flink-job historyserver.archive.fs.refresh-interval: 5000
- 1
- 2
- 3
- 4
- 5
3)启动历史服务器
bin/historyserver.sh start
- 1
4)停止历史服务器
bin/historyserver.sh stop
- 1
5)在 WebUI 查看已经停止的 job 的统计信息
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。