赞
踩
官网:https:flink.apache.org
Flink核心目标,是“数据流上的有状态计算”(Stateful Computati ons over Data Streams)具体说明:Apache Flink是一个框架和分布式处理引擎,用于对无界和有界数据流进行有状态计算。
无界数据流
有界数据流
有状态流处理
把流处理需要的额外数据保存成一个“状态”,然后针对这条数据进行处理,并且更新状态。这就是所谓的“有状态的流处理”
我们处理数据的目标是:低延迟、高吞吐、结果的准确性和良好的容错性。Flink主要特点如下:
F****link | Streaming | |
---|---|---|
计算模型 | 流计算 | 微批处理 |
时间语义 | 事件时间、处理时间 | 处理时间 |
窗口 | 多、灵活 | 少、不灵活(窗口必须是批次的整数倍) |
状态 | 有 | 没有 |
流式SQL | 有 | 没有 |
创建maven工程,引入依赖
<properties> <java.version>1.8</java.version> <flink.version>1.17.0</flink.version> </properties> <dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients</artifactId> <version>${flink.version}</version> </dependency> </dependencies>
批处理基本思路:先逐行读入文件数据,然后将每一行文字拆分成单词;接着按照单词分组,统计每组数据的个数,就是对应单词的频次。首先在工程根目录下新建一个input文件夹,并在下面创建文本文件words.txt,然后进行代码编写
public class BatchWordCount { public static void main(String[] args) throws Exception { // 1. 创建执行环境 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); // 2. 从文件读取数据 按行读取(存储的元素就是每行的文本) DataSource<String> lineDS = env.readTextFile("input/words.txt"); // 3. 转换数据格式 FlatMapOperator<String, Tuple2<String, Long>> wordAndOne = lineDS.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() { @Override public void flatMap(String line, Collector<Tuple2<String, Long>> out) throws Exception { String[] words = line.split(" "); for (String word : words) { out.collect(Tuple2.of(word,1L)); } } }); // 4. 按照 word 进行分组 UnsortedGrouping<Tuple2<String, Long>> wordAndOneUG = wordAndOne.groupBy(0); // 5. 分组内聚合统计 AggregateOperator<Tuple2<String, Long>> sum = wordAndOneUG.sum(1); // 6. 打印结果 sum.print(); } } // 输出 // (flink,1) // (world,1) // (hello,3) // (java,1)
需要注意的是,这种代码的实现方式,是基于DataSet API的,也就是我们对数据的处理转换,是看作数据集来进行操作的。事实上Flink本身是流批统一的处理架构,批量的数据集本质上也是流,没有必要用两套不同的API来实现。所以从Flink 1.12开始,官方推荐的做法是直接使用DataStream API,在提交任务时通过将执行模式设为BATCH来进行批处理:bin/flink run -Dexecution.runtime-mode=BATCH BatchWordCount.jar
对于Flink而言,流才是整个处理逻辑的底层核心,所以流批统一之后的DataStream API更加强大,可以直接处理批处理和流处理的所有场景
public class StreamWordCount { public static void main(String[] args) throws Exception { // 1. 创建流式执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 2. 读取文件 DataStreamSource<String> lineStream = env.readTextFile("input/words.txt"); // 3. 转换、分组、求和,得到统计结果 SingleOutputStreamOperator<Tuple2<String, Long>> sum = lineStream.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() { @Override public void flatMap(String line, Collector<Tuple2<String, Long>> out) throws Exception { String[] words = line.split(" "); for (String word : words) { out.collect(Tuple2.of(word, 1L)); } } }).keyBy(data -> data.f0) .sum(1); // 4. 打印 sum.print(); // 5. 执行 env.execute(); } } // 3> (java,1) // 5> (hello,1) // 5> (hello,2) // 5> (hello,3) // 13> (flink,1) // 9> (world,1)
主要观察与批处理程序BatchWordCount的不同:
在实际的生产环境中,真正的数据流其实是无界的,有开始却没有结束,这就要求我们需要持续地处理捕获的数据。为了模拟这种场景,可以监听socket端口,然后向该端口不断的发送数据
public class SocketStreamWordCount { public static void main(String[] args) throws Exception { // 1. 创建流式执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 2. 读取文本流:hadoop102表示发送端主机名、7777表示端口号 DataStreamSource<String> lineStream = env.socketTextStream("hadoop102", 7777); // 3. 转换、分组、求和,得到统计结果 SingleOutputStreamOperator<Tuple2<String, Long>> sum = lineStream.flatMap((String line, Collector<Tuple2<String, Long>> out) -> { String[] words = line.split(" "); for (String word : words) { out.collect(Tuple2.of(word, 1L)); } }).returns(Types.TUPLE(Types.STRING, Types.LONG)) .keyBy(data -> data.f0) .sum(1); // 4. 打印 sum.print(); // 5. 执行 env.execute(); } }
在Linux环境的主机hadoop102上,执行下列命令,发送数据进行测试:nc -lk 7777
,注意:要先启动端口,后启动StreamWordCount程序,否则会报超时连接异常。
启动StreamWordCount程序,我们会发现程序启动之后没有任何输出、也不会退出。这是正常的,因为Flink的流处理是事件驱动的,当前程序会一直处于监听状态,只有接收到数据才会执行任务、输出统计结果。
注意:Flink还具有一个类型提取系统,可以分析函数的输入和返回类型,自动获取类型信息,从而获得对应的序列化器和反序列化器。但是,由于Java中泛型擦除的存在,在某些特殊情况下(比如Lambda表达式中),自动提取的信息是不够精细的——只告诉Flink当前的元素由“船头、船身、船尾”构成,根本无法重建出“大船”的模样;这时就需要显式地提供类型信息,才能使应用程序正常工作或提高其性能。因为对于flatMap里传入的Lambda表达式,系统只能推断出返回的是Tuple2类型,而无法得到Tuple2<String, Long>。只有显式地告诉系统当前的返回类型,才能正确地解析出完整数据。
节点服务器 | hadoop****102 | hadoop****103 | hadoop****104 |
---|---|---|---|
角色 | JobManager TaskManager | TaskManager | TaskManager |
# 下载安装包 wget https://dlcdn.apache.org/flink/flink-1.17.1/flink-1.17.1-bin-scala_2.12.tgz tar -zxvf flink-1.17.1-bin-scala_2.12.tgz -C /opt/module/ mv /opt/module/flink-1.17.1/ /opt/module/flink cd /opt/module/flink # 修改集群配置 # 进入conf路径,修改flink-conf.yaml文件,指定hadoop102节点服务器为JobManager vim conf/flink-conf.yaml # 修改如下 # ===================================== # JobManager节点地址. jobmanager.rpc.address: hadoop102 jobmanager.bind-host: 0.0.0.0 rest.address: hadoop102 rest.bind-address: 0.0.0.0 # TaskManager节点地址.需要配置为当前机器名 taskmanager.bind-host: 0.0.0.0 taskmanager.host: hadoop102 # ===================================== # 修改workers文件,指定hadoop102、hadoop103和hadoop104为TaskManager vim conf/workers # 修改如下 # ===================================== hadoop102 hadoop103 hadoop104 # ===================================== # 修改masters文件 vim masters # 修改如下 hadoop102:8081 # =====================================
在flink-conf.yaml
文件中还可以对集群中的JobManager
和TaskManager
组件进行优化配置,主要配置项如下:
jobmanager.memory.process.size
:对JobManager进程可使用到的全部内存进行配置,包括JVM元空间和其他开销,默认为1600M,可以根据集群规模进行适当调整taskmanager.memory.process.size
:对TaskManager进程可使用到的全部内存进行配置,包括JVM元空间和其他开销,默认为1728M,可以根据集群规模进行适当调整taskmanager.numberOfTaskSlots
:对每个TaskManager能够分配的Slot数量进行配置,默认为1,可根据TaskManager所在的机器能够提供给Flink的CPU数量决定。所谓Slot就是TaskManager中具体运行一个任务所分配的计算资源parallelism.default
:Flink任务执行的并行度,默认为1。优先级低于代码中进行的并行度配置和任务提交时使用参数指定的并行度数量。# 分发安装目录 # 配置修改完毕后,将Flink安装目录发给另外两个节点服务器 xsync flink/ # 修改hadoop103的 taskmanager.host vim flink-conf.yaml # TaskManager节点地址.需要配置为当前机器名 taskmanager.host: hadoop103 # 修改hadoop104的 taskmanager.host # TaskManager节点地址.需要配置为当前机器名 taskmanager.host: hadoop104 # 启动集群 # 在hadoop102节点服务器上执行start-cluster.sh启动Flink集群 bin/start-cluster.sh # 查看进程情况: jpsall # StandaloneSessionClusterEntrypoint # TaskManagerRunner # 最后访问web UI # 启动成功后,同样可以访问http://hadoop102:8081对flink集群和任务进行监控管理 # 这里可以明显看到,当前集群的TaskManager数量为3;由于默认每个TaskManager的Slot数量为1,所以总Slot数和可用Slot数都为3
# 在hadoop102中执行以下命令启动netcat
nc -lk 7777
然后进行程序打包,在我们编写的Flink入门程序的pom.xml文件中添加打包插件的配置,然后进行打包
<build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>3.2.4</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <artifactSet> <excludes> <exclude>com.google.code.findbugs:jsr305</exclude> <exclude>org.slf4j:*</exclude> <exclude>log4j:*</exclude> </excludes> </artifactSet> <filters> <filter> <!-- Do not copy the signatures in the META-INF folder. Otherwise, this might cause SecurityExceptions when using the JAR. --> <artifact>*:*</artifact> <excludes> <exclude>META-INF/*.SF</exclude> <exclude>META-INF/*.DSA</exclude> <exclude>META-INF/*.RSA</exclude> </excludes> </filter> </filters> <transformers combine.children="append"> <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"> </transformer> </transformers> </configuration> </execution> </executions> </plugin> </plugins> </build>
下一步是在Web UI上提交作业,任务打包完成后,我们打开Flink的WEB UI页面,在右侧导航栏点击“Submit New Job”,然后点击按钮“+ Add New”,选择要上传运行的JAR包;点击该JAR包,出现任务配置页面,进行相应配置:主要配置程序入口主类的全类名,任务运行的并行度,任务运行所需的配置参数和保存点路径等,如下图所示,配置完成后,即可点击按钮“Submit”,将任务提交到集群运行
# 命令行提交作业
bin/flink run -m hadoop102:8081 -c com.atguigu.wc.SocketStreamWordCount ./FlinkTutorial-1.0-SNAPSHOT.jar
# 在浏览器中打开Web UI,http://hadoop102:8081查看应用执行情况
在一些应用场景中,对于集群资源分配和占用的方式,可能会有特定的需求。Flink为各种场景提供了不同的部署模式,主要有以下三种:会话模式(Session Mode)、单作业模式(Per-Job Mode)、应用模式(Application Mode)。
它们的区别主要在于:集群的生命周期以及资源的分配方式;以及应用的main方法到底在哪里执行——客户端(Client)还是JobManager。
独立模式是独立运行的,不依赖任何外部的资源管理平台;当然独立也是有代价的:如果资源不足,或者出现故障,没有自动扩展或重分配资源的保证,必须手动处理。所以独立模式一般只用在开发测试或作业非常少的场景下
上面讲的就是该模式,提前启动集群,并通过Web页面客户端提交任务(可以多个任务,但是集群资源固定)
Flink的Standalone集群并不支持单作业模式部署。因为单作业模式需要借助一些资源管理平台
应用模式下不会提前创建集群,所以不能调用start-cluster.sh脚本。我们可以使用同样在bin目录下的standalone-job.sh来创建一个JobManager
# 环境准备。在hadoop102中执行以下命令启动netcat
nc -lk 7777
# 进入到Flink的安装路径下,将应用程序的jar包放到lib/目录下
mv FlinkTutorial-1.0-SNAPSHOT.jar lib/
# 执行
bin/standalone-job.sh start --job-classname com.atguigu.wc.SocketStreamWordCount
# 同样是使用bin目录下的脚本,启动TaskManager。
bin/taskmanager.sh start
# 在hadoop102上模拟发送单词数据
# 在hadoop102:8081地址中观察输出数据
# 如果希望停掉集群,同样可以使用脚本,命令如下。
bin/taskmanager.sh stop
bin/standalone-job.sh stop
YARN上部署的过程是:客户端把Flink应用提交给Yarn的ResourceManager,Yarn的ResourceManager会向Yarn的NodeManager申请容器。在这些容器上,Flink会部署JobManager和TaskManager的实例,从而启动集群。Flink会根据运行在JobManger上的作业所需要的Slot数量动态分配TaskManager资源
# 在将Flink任务部署至YARN集群之前,需要确认集群是否安装有Hadoop,保证Hadoop版本至少在2.2以上,并且集群中安装有HDFS服务 # 配置环境变量,增加环境变量配置如下: sudo vim /etc/profile.d/my_env.sh # 修改 HADOOP_HOME=/opt/module/hadoop-3.3.4 export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop export HADOOP_CLASSPATH=`hadoop classpath` # 启动Hadoop集群,包括HDFS和YARN,102 dfs,103 yarn start-dfs.sh start-yarn.sh # hadoop102中执行以下命令启动netcat nc -lk 7777
YARN的会话模式与独立集群略有不同,需要首先申请一个YARN会话(YARN Session)来启动Flink集群
# 启动Hadoop集群(HDFS、YARN) # 执行脚本命令向YARN集群申请资源,开启一个YARN会话,启动Flink集群 bin/yarn-session.sh -nm test # -d:分离模式,如果你不想让Flink YARN客户端一直前台运行,可以使用这个参数,即使关掉当前对话窗口,YARN session也可以后台运行 # -jm(--jobManagerMemory):配置JobManager所需内存,默认单位MB # -nm(--name):配置在YARN UI界面上显示的任务名 # -qu(--queue):指定YARN队列名 # -tm(--taskManager):配置每个TaskManager所使用内存 # 注意:Flink1.11.0版本不再使用-n参数和-s参数分别指定TaskManager数量和slot数量, # YARN会按照需求动态分配TaskManager和slot。所以从这个意义上讲,YARN的会话模式也不会把集群资源固定,同样是动态分配的 # YARN Session启动之后会给出一个Web UI地址以及一个YARN application ID,如下所示,用户可以通过Web UI或者命令行两种方式提交作业 # =================提交作业======== # 通过Web UI提交作业,和standalone相同 # 过命令行提交作业 # 将FlinkTutorial-1.0-SNAPSHOT.jar任务上传至集群 # 执行以下命令将该任务提交到已经开启的Yarn-Session中运行 # 客户端可以自行确定JobManager的地址,也可以通过-m或者-jobmanager参数指定JobManager的地址,JobManager的地址在YARN Session的启动页面中可以找到 bin/flink run -c com.atguigu.wc.SocketStreamWordCount FlinkTutorial-1.0-SNAPSHOT.jar # 任务提交成功后,可在YARN的Web UI界面查看运行情况。hadoop103:8088 # yarn任务优雅停止,也可以yarn可视化面板直接kill echo "stop" | ./bin/yarn-session.sh -id application_1680702304497_0003
在YARN环境中,由于有了外部平台做资源调度,所以我们也可以直接向YARN提交一个单独的作业,从而启动一个Flink集群
bin/flink run -d -t yarn-per-job -c com.atguigu.wc.SocketStreamWordCount FlinkTutorial-1.0-SNAPSHOT.jar
# 注意:如果启动过程中报如下异常(本身bug可忽略)
# Exception in thread “Thread-5” java.lang.IllegalStateException: Trying to access closed classloader....
# 解决办法:在flink的/opt/module/flink-1.17.0/conf/flink-conf.yaml配置文件中设置
vim flink-conf.yaml
classloader.check-leaked-classloader: false
# 在命令行中查看或取消作业
# 这里的application_XXXX_YY是当前应用的ID,<jobId>是作业的ID。注意如果取消作业,整个Flink集群也会停掉
bin/flink list -t yarn-per-job -Dyarn.application.id=application_XXXX_YY
bin/flink cancel -t yarn-per-job -Dyarn.application.id=application_XXXX_YY <jobId>
应用模式非常简单,与单作业模式类似,直接执行flink run-application命令即可
# 执行命令提交作业 bin/flink run-application -t yarn-application -c com.atguigu.wc.SocketStreamWordCount FlinkTutorial-1.0-SNAPSHOT.jar # 在命令行中查看或取消作业 bin/flink list -t yarn-application -Dyarn.application.id=application_XXXX_YY bin/flink cancel -t yarn-application -Dyarn.application.id=application_XXXX_YY <jobId> # =========上传HDFS提交========= # 可以通过yarn.provided.lib.dirs配置选项指定位置,将flink的依赖上传到远程 # 上传flink的lib和plugins到HDFS上 hadoop fs -mkdir /flink-dist hadoop fs -put lib/ /flink-dist hadoop fs -put plugins/ /flink-dist # 上传自己的jar包到HDFS hadoop fs -mkdir /flink-jars hadoop fs -put FlinkTutorial-1.0-SNAPSHOT.jar /flink-jars # 提交作业 bin/flink run-application -t yarn-application -Dyarn.provided.lib.dirs="hdfs://hadoop102:8020/flink-dist" -c com.atguigu.wc.SocketStreamWordCount hdfs://hadoop102:8020/flink-jars/FlinkTutorial-1.0-SNAPSHOT.jar # flink本身的依赖和用户jar可以预先上传到HDFS,而不需要单独发送到集群,这就使得作业提交更加轻量
容器化部署是如今业界流行的一项技术,基于Docker镜像运行能够让用户更加方便地对应用进行管理和运维。容器管理工具中最为流行的就是Kubernetes(k8s),而Flink也在最近的版本中支持了k8s部署模式。基本原理与YARN是类似的,具体配置可以参见官网说明
运行 Flink job 的集群一旦停止,只能去 yarn 或本地磁盘上查看日志,不再可以查看作业挂掉之前的运行的 Web UI,很难清楚知道作业在挂的那一刻到底发生了什么。如果我们还没有 Metrics 监控的话,那么完全就只能通过日志去分析和定位问题了,所以如果能还原之前的 Web UI,我们可以通过 UI 发现和定位一些问题。
Flink提供了历史服务器,用来在相应的 Flink 集群关闭后查询已完成作业的统计信息。我们都知道只有当作业处于运行中的状态,才能够查看到相关的WebUI统计信息。通过 History Server 我们才能查询这些已完成作业的统计信息,无论是正常退出还是异常退出。
此外,它对外提供了 REST API,它接受 HTTP 请求并使用 JSON 数据进行响应。Flink 任务停止后,JobManager 会将已经完成任务的统计信息进行存档,History Server 进程则在任务停止后可以对任务统计信息进行查询。比如:最后一次的 Checkpoint、任务运行时的相关配置。
# 创建存储目录
hadoop fs -mkdir -p /logs/flink-job
# 在 flink-conf.yaml中添加如下配置
jobmanager.archive.fs.dir: hdfs://hadoop102:8020/logs/flink-job
historyserver.web.address: hadoop102
historyserver.web.port: 8082
historyserver.archive.fs.dir: hdfs://hadoop102:8020/logs/flink-job
historyserver.archive.fs.refresh-interval: 5000
# 启动历史服务器
bin/historyserver.sh start
# 停止历史服务器
bin/historyserver.sh stop
# 在浏览器地址栏输入:http://hadoop102:8082 查看已经停止的 job 的统计信息
JobManager是一个Flink集群中任务管理和调度的核心,是控制应用执行的主进程。也就是说,每个应用都应该被唯一的JobManager所控制执行。JobManger又包含3个不同的组件
JobMaster是JobManager中最核心的组件,负责处理单独的作业(Job)。所以JobMaster和具体的Job是一一对应的,多个Job可以同时运行在一个Flink集群中, 每个Job都有一个自己的JobMaster。需要注意在早期版本的Flink中,没有JobMaster的概念;而JobManager的概念范围较小,实际指的就是现在所说的JobMaster。
在作业提交时,JobMaster会先接收到要执行的应用。JobMaster会把JobGraph转换成一个物理层面的数据流图,这个图被叫作“执行图”(ExecutionGraph),它包含了所有可以并发执行的任务。JobMaster会向资源管理器(ResourceManager)发出请求,申请执行任务必要的资源。一旦它获取到了足够的资源,就会将执行图分发到真正运行它们的TaskManager上。而在运行过程中,JobMaster会负责所有需要中央协调的操作,比如说检查点(checkpoints)的协调。
ResourceManager主要负责资源的分配和管理,在Flink 集群中只有一个。所谓“资源”,主要是指TaskManager的任务槽(task slots)。任务槽就是Flink集群中的资源调配单元,包含了机器用来执行计算的一组CPU和内存资源。每一个任务(Task)都需要分配到一个slot上执行。
这里注意要把Flink内置的ResourceManager和其他资源管理平台(比如YARN)的ResourceManager区分开。
Dispatcher主要负责提供一个REST接口,用来提交应用,并且负责为每一个新提交的作业启动一个新的JobMaster 组件。Dispatcher也会启动一个Web UI,用来方便地展示和监控作业执行的信息。Dispatcher在架构中并不是必需的,在不同的部署模式下可能会被忽略掉。
TaskManager是Flink中的工作进程,数据流的具体计算就是它来做的。Flink集群中必须至少有一个TaskManager;每一个TaskManager都包含了一定数量的任务槽(task slots)。Slot是资源调度的最小单位,slot的数量限制了TaskManager能够并行处理的任务数量。
启动之后,TaskManager会向资源管理器注册它的slots;收到资源管理器的指令后,TaskManager就会将一个或者多个槽位提供给JobMaster调用,JobMaster就可以分配任务来执行了。在执行过程中,TaskManager可以缓冲数据,还可以跟其他运行同一应用的TaskManager交换数据。
当要处理的数据量非常大时,我们可以把一个算子操作,“复制”多份到多个节点,数据来了之后就可以到其中任意一个执行。这样一来,一个算子任务就被拆分成了多个并行的“子任务”(subtasks),再将它们分发到不同节点,就真正实现了并行计算。在Flink执行过程中,每一个算子(operator)可以包含一个或多个子任务(operator subtask),这些子任务在不同的线程、不同的物理机或不同的容器中完全独立地执行
一个特定算子的子任务(subtask)的个数被称之为其并行度(parallelism)。这样,包含并行子任务的数据流,就是并行数据流,它需要多个分区(stream partition)来分配并行任务。一般情况下,一个流程序的并行度,可以认为就是其所有算子中最大的并行度。一个程序中,不同的算子可能具有不同的并行度。
例如:如上图所示,当前数据流中有source、map、window、sink四个算子,其中sink算子的并行度为1,其他算子的并行度都为2。所以这段流处理程序的并行度就是2。
# 设置并行度 # ===================代码中设置============= # 我们在代码中,可以很简单地在算子后跟着调用setParallelism()方法,来设置当前算子的并行度 stream.map(word -> Tuple2.of(word, 1L)).setParallelism(2); # 这种方式设置的并行度,只针对当前算子有效。 # 直接调用执行环境的setParallelism()方法,全局设定并行度 # 如果在程序中对全局并行度进行硬编码,会导致无法动态扩容 env.setParallelism(2); # 注意,由于keyBy不是算子,所以无法对keyBy设置并行度 # =================提交应用时设置============= # 在使用flink run命令提交应用时,可以增加-p参数来指定当前应用程序执行的并行度,它的作用类似于执行环境的全局设置 bin/flink run –p 2 –c com.atguigu.wc.SocketStreamWordCount ./FlinkTutorial-1.0-SNAPSHOT.jar # 如果我们直接在Web UI上提交作业,也可以在对应输入框中直接添加并行度 # =================配置文件中设置============== # 我们还可以直接在集群的配置文件flink-conf.yaml中直接更改默认并行度 parallelism.default: 2 # 这个设置对于整个集群上提交的所有作业有效,初始值为1。无论在代码中设置、还是提交时的-p参数,都不是必须的; # 所以在没有指定并行度的时候,就会采用配置文件中的集群默认并行度。在开发环境中,没有配置文件,默认并行度就是当前机器的CPU核心数
一个数据流在算子之间传输数据的形式可以是一对一(one-to-one)的直通(forwarding)模式,也可以是打乱的重分区(redistributing)模式,具体是哪一种形式,取决于算子的种类。
这种模式下,数据流维护着分区以及元素的顺序。比如图中的source和map算子,source算子读取数据之后,可以直接发送给map算子做处理,它们之间不需要重新分区,也不需要调整数据的顺序。这就意味着map 算子的子任务,看到的元素个数和顺序跟source 算子的子任务产生的完全一样,保证着“一对一”的关系。map、filter、flatMap等算子都是这种one-to-one的对应关系。这种关系类似于Spark中的窄依赖。
在这种模式下,数据流的分区会发生改变。比如图中的map和后面的keyBy/window算子之间,以及keyBy/window算子和Sink算子之间,都是这样的关系。每一个算子的子任务,会根据数据传输的策略,把数据发送到不同的下游目标任务。这些传输方式都会引起重分区的过程,这一过程类似于Spark中的shuffle。
在Flink中,并行度相同的一对一(one to one)算子操作,可以直接链接在一起形成一个“大”的任务(task),这样原来的算子就成为了真正任务里的一部分。将算子链接成task是非常有效的优化:可以减少线程之间的切换和基于缓存区的数据交换,在减少时延的同时提升吞吐量。Flink默认会按照算子链的原则进行链接合并,如果我们想要禁止合并或者自行定义,也可以在代码中对算子做一些特定的设置
// 禁用算子链
.map(word -> Tuple2.of(word, 1L)).disableChaining();
// 从当前算子开始新链
.map(word -> Tuple2.of(word, 1L)).startNewChain()
Flink中每一个TaskManager都是一个JVM进程,它可以启动多个独立的线程,来并行执行多个子任务(subtask)。很显然,TaskManager的计算资源是有限的,并行的任务越多,每个线程的资源就会越少。那一个TaskManager到底能并行处理多少个任务呢?为了控制并发量,我们需要在TaskManager上对每个任务运行所占用的资源做出明确的划分,这就是所谓的任务槽(task slots)。每个任务槽(task slot)其实表示了TaskManager拥有计算资源的一个固定大小的子集。这些资源就是用来独立执行一个子任务的。
在Flink的/opt/module/flink/conf/flink-conf.yaml
配置文件中,可以设置TaskManager的slot数量,默认是1个slot
taskmanager.numberOfTaskSlots: 8
# 需要注意的是,slot目前仅仅用来隔离内存,不会涉及CPU的隔离。
# 在具体应用时,可以将slot数量配置为机器的CPU核心数,尽量避免不同任务之间对CPU的竞争。
# 这也是开发环境默认并行度设为机器CPU数量的原因。
默认情况下,Flink是允许子任务共享slot的。如果我们保持sink任务并行度为1不变,而作业提交时设置全局并行度为6,那么前两个任务节点就会各自有6个并行子任务,整个流处理程序则有13个子任务。如上图所示,只要属于同一个作业,那么对于不同任务节点(算子)的并行子任务,就可以放到同一个slot上执行。所以对于第一个任务节点source→map,它的6个并行子任务必须分到不同的slot上,而第二个任务节点keyBy/window/apply的并行子任务却可以和第一个任务节点共享slot。当我们将资源密集型和非密集型的任务同时放到一个slot中,它们就可以自行分配对资源占用的比例,从而保证最重的活平均分配给所有的TaskManager。
slot共享另一个好处就是允许我们保存完整的作业管道。这样一来,即使某个TaskManager出现故障宕机,其他节点也可以完全不受影响,作业的任务可以继续执行。当然,Flink默认是允许slot共享的,如果希望某个算子对应的任务完全独占一个slot,或者只有某一部分算子共享slot,我们也可以通过设置“slot共享组”手动指定:.map(word -> Tuple2.of(word, 1L)).slotSharingGroup("1");
这样,只有属于同一个slot共享组的子任务,才会开启slot共享;不同组之间的任务是完全隔离的,必须分配到不同的slot上。在这种场景下,总共需要的slot数量,就是各个slot共享组最大并行度的总和。
任务槽和并行度都跟程序的并行执行有关,但两者是完全不同的概念。简单来说任务槽是静态的概念,是指TaskManager具有的并发执行能力,可以通过参数taskmanager.numberOfTaskSlots进行配置;而并行度是动态概念,也就是TaskManager运行程序时实际使用的并发能力,可以通过参数parallelism.default
进行配置。
逻辑流图(StreamGraph)→ 作业图(JobGraph)→ 执行图(ExecutionGraph)→ 物理图(Physical Graph)
这是根据用户通过 DataStream API编写的代码生成的最初的DAG图,用来表示程序的拓扑结构。这一步一般在客户端完成
StreamGraph经过优化后生成的就是作业图(JobGraph),这是提交给 JobManager 的数据结构,确定了当前作业中所有任务的划分。主要的优化为:将多个符合条件的节点链接在一起合并成一个任务节点,形成算子链,这样可以减少数据交换的消耗。JobGraph一般也是在客户端生成的,在作业提交时传递给JobMaster。我们提交作业之后,打开Flink自带的Web UI,点击作业就能看到对应的作业图
JobMaster收到JobGraph后,会根据它来生成执行图(ExecutionGraph)。ExecutionGraph是JobGraph的并行化版本,是调度层最核心的数据结构。与JobGraph最大的区别就是按照并行度对并行子任务进行了拆分,并明确了任务间数据传输的方式
JobMaster生成执行图后,会将它分发给TaskManager;各个TaskManager会根据执行图部署任务,最终的物理执行过程也会形成一张“图”,一般就叫作物理图(Physical Graph)。这只是具体执行层面的图,并不是一个具体的数据结构。物理图主要就是在执行图的基础上,进一步确定数据存放的位置和收发的具体方式。有了物理图,TaskManager就可以对传递来的数据进行处理计算了
DataStream API是Flink的核心层API。一个Flink程序,其实就是对DataStream的各种转换:Enviroment(获取执行环境)→Source(读取数据源)→Transformation(转换操作)→Sink(输出)
Flink程序可以在各种上下文环境中运行:我们可以在本地JVM中执行程序,也可以提交到远程集群上运行。不同的环境,代码的提交运行的过程会有所不同。这就要求我们在提交作业执行计算时,首先必须获取当前Flink的运行环境,从而建立起与Flink框架之间的联系
// 直接调用getExecutionEnvironment方法 // 根据当前运行的方式,自行决定该返回什么样的运行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // createLocalEnvironment // 这个方法返回一个本地执行环境。可以在调用时传入一个参数,指定默认的并行度;如果不传入,则默认并行度就是本地的CPU核心数 StreamExecutionEnvironment localEnv = StreamExecutionEnvironment.createLocalEnvironment(); // createRemoteEnvironment // 这个方法返回集群执行环境。需要在调用时指定JobManager的主机名和端口号,并指定要在集群中运行的Jar包 StreamExecutionEnvironment remoteEnv = StreamExecutionEnvironment .createRemoteEnvironment( "host", // JobManager主机名 1234, // JobManager进程端口号 "path/to/jarFile.jar" // 提交给JobManager的JAR包 ); // 在获取到程序执行环境后,我们还可以对执行环境进行灵活的设置。比如可以全局设置程序的并行度、禁用算子链,还可以定义程序的时间语义、配置容错机制
从Flink 1.12开始,官方推荐的做法是直接使用DataStream API,在提交任务时通过将执行模式设为BATCH来进行批处理。不建议使用DataSet API
// 流处理环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// DataStream API执行模式包括:流执行模式、批执行模式和自动模式
// 流执行模式(Streaming):这是DataStream API最经典的模式,一般用于需要持续实时处理的无界数据流。默认情况下,程序使用的就是Streaming执行模式。
// 批执行模式(Batch):专门用于批处理的执行模式
// 自动模式(AutoMatic):在这种模式下,将由程序根据输入数据源是否有界,来自动选择执行模式。
// 批执行模式的使用。主要有两种方式:
// 通过命令行配置,在提交作业时,增加execution.runtime-mode参数,指定值为BATCH。
bin/flink run -Dexecution.runtime-mode=BATCH ...
// 通过代码配置,在代码中,直接基于执行环境调用setRuntimeMode方法,传入BATCH模式。实际应用中一般不会在代码中配置,而是使用命令行,这样更加灵活
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
需要注意的是,写完输出(sink)操作并不代表程序已经结束。因为当main()方法被调用时,其实只是定义了作业的每个执行操作,然后添加到数据流图中;这时并没有真正处理数据——因为数据可能还没来。Flink是由事件驱动的,只有等到数据到来,才会触发真正的计算,这也被称为“延迟执行”或“懒执行”。所以我们需要显式地调用执行环境的execute()方法,来触发程序执行。execute()方法将一直等待作业完成,然后返回一个执行结果(JobExecutionResult)
env.execute();
# 上面方法是同步执行的,等待任务来,当然也有异步执行executeAsync,这个方法可以在一个main函数提交多个job,不过不推荐
// 在Flink1.12以前,旧的添加source的方式,是调用执行环境的addSource()方法
DataStream<String> stream = env.addSource(...);
// 方法传入的参数是一个“源函数”(source function),需要实现SourceFunction接口
// 从Flink1.12开始,主要使用流批统一的新Source架构
DataStreamSource<String> stream = env.fromSource(…)
最简单的读取数据的方式,就是在代码中直接创建一个Java集合,然后调用执行环境的fromCollection方法进行读取。这相当于将数据临时存储到内存中,形成特殊的数据结构后,作为数据源使用,一般用于测试
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
List<Integer> data = Arrays.asList(1, 22, 3);
DataStreamSource<Integer> ds = env.fromCollection(data);
stream.print();
env.execute();
}
真正的实际应用中,自然不会直接将数据写在代码中。通常情况下,我们会从存储介质中获取数据,一个比较常见的方式就是读取日志文件。这也是批处理中最常见的读取方式。读取文件,需要添加文件连接器依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-files</artifactId>
<version>${flink.version}</version>
</dependency>
示例
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
FileSource<String> fileSource = FileSource.forRecordStreamFormat(new TextLineInputFormat(), new Path("input/word.txt")).build();
env.fromSource(fileSource,WatermarkStrategy.noWatermarks(),"file").print();
env.execute();
}
论从集合还是文件,我们读取的其实都是有界数据。在流处理的场景中,数据往往是无界的。我们之前用到的读取socket文本流,就是流处理场景。但是这种方式由于吞吐量小、稳定性较差,一般也是用于测试。
DataStream<String> stream = env.socketTextStream("localhost", 7777);
link官方提供了连接工具flink-connector-kafka
,直接帮我们实现了一个消费者FlinkKafkaConsumer,它就是用来读取Kafka数据的SourceFunction。
所以想要以Kafka作为数据源获取数据,我们只需要引入Kafka连接器的依赖。Flink官方提供的是一个通用的Kafka连接器,它会自动跟踪最新版本的Kafka客户端。目前最新版本只支持0.10.0版本以上的Kafka。这里我们需要导入的依赖如下
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>${flink.version}</version>
</dependency>
代码如下
public class SourceKafka {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
.setBootstrapServers("hadoop102:9092")
.setTopics("topic_1")
.setGroupId("atguigu")
.setStartingOffsets(OffsetsInitializer.latest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
DataStreamSource<String> stream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka-source");
stream.print("Kafka");
env.execute();
}
}
Flink从1.11开始提供了一个内置的DataGen 连接器,主要是用于生成一些随机数,用于在没有数据源的时候,进行流任务的测试以及性能测试等。1.17提供了新的Source写法,需要导入依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-datagen</artifactId>
<version>${flink.version}</version>
</dependency>
代码
public class DataGeneratorDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 如果有n个并行度, 最大值设为a // 将数值 均分成 n份, a/n ,比如,最大100,并行度2,每个并行度生成50个 // 其中一个是 0-49,另一个50-99 env.setParallelism(2); /** * 数据生成器Source,四个参数: * 第一个: GeneratorFunction接口,需要实现, 重写map方法, 输入类型固定是Long * 第二个: long类型, 自动生成的数字序列(从0自增)的最大值(小于),达到这个值就停止了 * 第三个: 限速策略, 比如 每秒生成几条数据 * 第四个: 返回的类型 */ DataGeneratorSource<String> dataGeneratorSource = new DataGeneratorSource<>( new GeneratorFunction<Long, String>() { @Override public String map(Long value) throws Exception { return "Number:" + value; } }, 100, RateLimiterStrategy.perSecond(1), Types.STRING ); env .fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "data-generator") .print(); env.execute(); } }
Flink支持的数据类型
Flink使用“类型信息”(TypeInformation)来统一表示数据类型。TypeInformation类是Flink中所有类型描述符的基类。它涵盖了类型的一些基本属性,并为每个数据类型生成特定的序列化器、反序列化器和比较器
Flink支持的数据类型
对于常见的Java和Scala数据类型,Flink都是支持的。Flink在内部,Flink对支持不同的类型进行了划分,这些类型可以在Types工具类中找到:
(1)基本类型
所有Java基本类型及其包装类,再加上Void、String、Date、BigDecimal和BigInteger。
(2)数组类型
包括基本类型数组(PRIMITIVE_ARRAY)和对象数组(OBJECT_ARRAY)。
(3)复合数据类型
(4)辅助类型
Option、Either、List、Map等。
(5)泛型类型(GENERIC)
Flink支持所有的Java类和Scala类。不过如果没有按照上面POJO类型的要求来定义,就会被Flink当作泛型类来处理。Flink会把泛型类型当作黑盒,无法获取它们内部的属性;它们也不是由Flink本身序列化的,而是由Kryo序列化的。
在这些类型中,元组类型和POJO类型最为灵活,因为它们支持创建复杂类型。而相比之下,POJO还支持在键(key)的定义中直接使用字段名,这会让我们的代码可读性大大增加。所以,在项目实践中,往往会将流处理程序中的元素类型定为Flink的POJO类型。Flink对POJO类型的要求如下:
类型提示(Type Hints)
Flink还具有一个类型提取系统,可以分析函数的输入和返回类型,自动获取类型信息,从而获得对应的序列化器和反序列化器。但是,由于Java中泛型擦除的存在,在某些特殊情况下(比如Lambda表达式中),自动提取的信息是不够精细的——只告诉Flink当前的元素由“船头、船身、船尾”构成,根本无法重建出“大船”的模样;这时就需要显式地提供类型信息,才能使应用程序正常工作或提高其性能。
为了解决这类问题,Java API提供了专门的“类型提示”(type hints)。之前的word count流处理程序,我们在将String类型的每个词转换成(word, count)二元组后,就明确地用returns指定了返回的类型。因为对于map里传入的Lambda表达式,系统只能推断出返回的是Tuple2类型,而无法得到Tuple2<String, Long>。只有显式地告诉系统当前的返回类型,才能正确地解析出完整数据。
.map(word -> Tuple2.of(word, 1L))
.returns(Types.TUPLE(Types.STRING, Types.LONG));
Flink还专门提供了TypeHint类,它可以捕获泛型的类型信息,并且一直记录下来,为运行时提供足够的信息。我们同样可以通过.returns()方法,明确地指定转换之后的DataStream里元素的类型。
returns(new TypeHint<Tuple2<Integer, SomeType>>(){})
映射(map)
map是大家非常熟悉的大数据操作算子,主要用于将数据流中的数据进行转换,形成新的数据流。简单来说,就是一个“一一映射”,消费一个元素就产出一个元素。我们只需要基于DataStream调用map()方法就可以进行转换处理。方法需要传入的参数是接口MapFunction的实现;返回值类型还是DataStream,不过泛型(流中的元素类型)可能改变。
public class WaterSensor { public String id; public Long ts; public Integer vc; // 一定要提供一个 空参 的构造器 public WaterSensor() { } public WaterSensor(String id, Long ts, Integer vc) { this.id = id; this.ts = ts; this.vc = vc; } // ... // getter//setter } public class TransMap { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<WaterSensor> stream = env.fromElements( new WaterSensor("sensor_1", 1, 1), new WaterSensor("sensor_2", 2, 2) ); // 方式一:传入匿名类,实现MapFunction stream.map(new MapFunction<WaterSensor, String>() { @Override public String map(WaterSensor e) throws Exception { return e.id; } }).print(); // 方式二:传入MapFunction的实现类 // stream.map(new UserMap()).print(); env.execute(); } public static class UserMap implements MapFunction<WaterSensor, String> { @Override public String map(WaterSensor e) throws Exception { return e.id; } } }
过滤(filter)
filter转换操作,顾名思义是对数据流执行一个过滤,通过一个布尔条件表达式设置过滤条件,对于每一个流内元素进行判断,若为true则元素正常输出,若为false则元素被过滤掉
public class TransFilter { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<WaterSensor> stream = env.fromElements( new WaterSensor("sensor_1", 1, 1), new WaterSensor("sensor_1", 2, 2), new WaterSensor("sensor_2", 2, 2), new WaterSensor("sensor_3", 3, 3) ); // 方式一:传入匿名类实现FilterFunction stream.filter(new FilterFunction<WaterSensor>() { @Override public boolean filter(WaterSensor e) throws Exception { return e.id.equals("sensor_1"); } }).print(); // 方式二:传入FilterFunction实现类 // stream.filter(new UserFilter()).print(); env.execute(); } public static class UserFilter implements FilterFunction<WaterSensor> { @Override public boolean filter(WaterSensor e) throws Exception { return e.id.equals("sensor_1"); } } }
扁平映射(flatMap)
latMap操作又称为扁平映射,主要是将数据流中的整体(一般是集合类型)拆分成一个一个的个体使用。消费一个元素,可以产生0到多个元素。flatMap可以认为是“扁平化”(flatten)和“映射”(map)两步操作的结合,也就是先按照某种规则对数据进行打散拆分,再对拆分后的元素做转换处理
public class TransFlatmap { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<WaterSensor> stream = env.fromElements( new WaterSensor("sensor_1", 1, 1), new WaterSensor("sensor_1", 2, 2), new WaterSensor("sensor_2", 2, 2), new WaterSensor("sensor_3", 3, 3) ); stream.flatMap(new MyFlatMap()).print(); env.execute(); } public static class MyFlatMap implements FlatMapFunction<WaterSensor, String> { @Override public void flatMap(WaterSensor value, Collector<String> out) throws Exception { if (value.id.equals("sensor_1")) { out.collect(String.valueOf(value.vc)); } else if (value.id.equals("sensor_2")) { out.collect(String.valueOf(value.ts)); out.collect(String.valueOf(value.vc)); } } } }
按键分区(keyBy)
keyBy是聚合前必须要用到的一个算子。keyBy通过指定键(key),可以将一条流从逻辑上划分成不同的分区(partitions)。这里所说的分区,其实就是并行处理的子任务。基于不同的key,流中的数据将被分配到不同的分区中去;这样一来,所有具有相同的key的数据,都将被发往同一个分区
在内部,是通过计算key的哈希值(hash code),对分区数进行取模运算来实现的。所以这里key如果是POJO的话,必须要重写hashCode()方法
public class TransKeyBy { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<WaterSensor> stream = env.fromElements( new WaterSensor("sensor_1", 1, 1), new WaterSensor("sensor_1", 2, 2), new WaterSensor("sensor_2", 2, 2), new WaterSensor("sensor_3", 3, 3) ); // 方式一:使用Lambda表达式 KeyedStream<WaterSensor, String> keyedStream = stream.keyBy(e -> e.id); // 方式二:使用匿名类实现KeySelector KeyedStream<WaterSensor, String> keyedStream1 = stream.keyBy(new KeySelector<WaterSensor, String>() { @Override public String getKey(WaterSensor e) throws Exception { return e.id; } }); env.execute(); } }
需要注意的是,keyBy得到的结果将不再是DataStream,而是会将DataStream转换为KeyedStream。KeyedStream可以认为是“分区流”或者“键控流”,它是对DataStream按照key的一个逻辑分区,所以泛型有两个类型:除去当前流中的元素类型外,还需要指定key的类型。
KeyedStream也继承自DataStream,所以基于它的操作也都归属于DataStream API。但它跟之前的转换操作得到的SingleOutputStreamOperator不同,只是一个流的分区操作,并不是一个转换算子。KeyedStream是一个非常重要的数据结构,只有基于它才可以做后续的聚合操作(比如sum,reduce)
简单聚合(sum/min/max/minBy/maxBy)
有了按键分区的数据流KeyedStream,我们就可以基于它进行聚合操作了。Flink为我们内置实现了一些最基本、最简单的聚合API,主要有以下几种:
// 简单聚合算子使用非常方便,语义也非常明确。这些聚合方法调用时,也需要传入参数;但并不像基本转换算子那样需要实现自定义函数,只要说明聚合指定的字段就可以了。指定字段的方式有两种:指定位置,和指定名称 // 对于元组类型的数据,可以使用这两种方式来指定字段。需要注意的是,元组中字段的名称,是以f0、f1、f2、…来命名的 // 如果数据流的类型是POJO类,那么就只能通过字段名称来指定,不能通过位置来指定了 public class TransAggregation { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<WaterSensor> stream = env.fromElements( new WaterSensor("sensor_1", 1, 1), new WaterSensor("sensor_1", 2, 2), new WaterSensor("sensor_2", 2, 2), new WaterSensor("sensor_3", 3, 3) ); stream.keyBy(e -> e.id).max("vc"); // 指定字段名称 env.execute(); } }
归约聚合(reduce)
reduce可以对已有的数据进行归约处理,把每一个新输入的数据和当前已经归约出来的值,再做一个聚合计算,reduce操作也会将KeyedStream转换为DataStream。它不会改变流的元素数据类型,所以输出类型和输入类型是一样的
// 调用KeyedStream的reduce方法时,需要传入一个参数,实现ReduceFunction接口 public interface ReduceFunction<T> extends Function, Serializable { T reduce(T value1, T value2) throws Exception; } public class WaterSensorMapFunction implements MapFunction<String,WaterSensor> { @Override public WaterSensor map(String value) throws Exception { String[] datas = value.split(","); return new WaterSensor(datas[0],Long.valueOf(datas[1]) ,Integer.valueOf(datas[2]) ); } } // 案例演示 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env .socketTextStream("hadoop102", 7777) .map(new WaterSensorMapFunction()) .keyBy(WaterSensor::getId) .reduce(new ReduceFunction<WaterSensor>() { @Override public WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception { System.out.println("Demo7_Reduce.reduce"); int maxVc = Math.max(value1.getVc(), value2.getVc()); //实现max(vc)的效果 取最大值,其他字段以当前组的第一个为主 //value1.setVc(maxVc); //实现maxBy(vc)的效果 取当前最大值的所有字段 if (value1.getVc() > value2.getVc()){ value1.setVc(maxVc); return value1; }else { value2.setVc(maxVc); return value2; } } }) .print(); env.execute(); // reduce同简单聚合算子一样,也要针对每一个key保存状态。因为状态不会清空,所以我们需要将reduce算子作用在一个有限key的流上
函数类(Function Classes)
Flink暴露了所有UDF函数的接口,具体实现方式为接口或者抽象类,例如MapFunction、FilterFunction、ReduceFunction等。所以用户可以自定义一个函数类,实现对应的接口
富函数类(Rich Function Classes)
富函数类”也是DataStream API提供的一个函数类的接口,所有的Flink函数类都有其Rich版本。富函数类一般是以抽象类的形式出现的。例如:RichMapFunction、RichFilterFunction、RichReduceFunction等。与常规函数类的不同主要在于,富函数类可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能
// 需要注意的是,这里的生命周期方法,对于一个并行子任务来说只会调用一次;而对应的,实际工作方法,例如RichMapFunction中的map(),在每条数据到来后都会触发一次调用 public class RichFunctionExample { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(2); env .fromElements(1,2,3,4) .map(new RichMapFunction<Integer, Integer>() { @Override public void open(Configuration parameters) throws Exception { super.open(parameters); System.out.println("索引是:" + getRuntimeContext().getIndexOfThisSubtask() + " 的任务的生命周期开始"); } @Override public Integer map(Integer integer) throws Exception { return integer + 1; } @Override public void close() throws Exception { super.close(); System.out.println("索引是:" + getRuntimeContext().getIndexOfThisSubtask() + " 的任务的生命周期结束"); } }) .print(); env.execute(); } }
常见的物理分区策略有:随机分配(Random)、轮询分配(Round-Robin)、重缩放(Rescale)和广播(Broadcast)
随机分区(shuffle)
最简单的重分区方式就是直接“洗牌”。通过调用DataStream的.shuffle()方法,将数据随机地分配到下游算子的并行任务中去。随机分区服从均匀分布(uniform distribution),所以可以把流中的数据随机打乱,均匀地传递到下游任务分区。因为是完全随机的,所以对于同样的输入数据, 每次执行得到的结果也不会相同
public class ShuffleExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
DataStreamSource<Integer> stream = env.socketTextStream("hadoop102", 7777);;
stream.shuffle().print()
env.execute();
}
}
轮询分区(Round-Robin)
轮询,简单来说就是“发牌”,按照先后顺序将数据做依次分发。通过调用DataStream的.rebalance()方法,就可以实现轮询重分区。rebalance使用的是Round-Robin负载均衡算法,可以将输入流数据平均分配到下游的并行任务中去
stream.rebalance()
重缩放分区(rescale)
重缩放分区和轮询分区非常相似。当调用rescale()方法时,其实底层也是使用Round-Robin算法进行轮询,但是只会将数据轮询发送到下游并行任务的一部分中。rescale的做法是分成小团体,发牌人只给自己团体内的所有人轮流发牌
stream.rescale()
广播(broadcast)
这种方式其实不应该叫做“重分区”,因为经过广播之后,数据会在不同的分区都保留一份,可能进行重复处理。可以通过调用DataStream的broadcast()方法,将输入数据复制并发送到下游算子的所有并行任务中去
stream.broadcast()
全局分区(global)
全局分区也是一种特殊的分区方式。这种做法非常极端,通过调用.global()方法,会将所有的输入流数据都发送到下游算子的第一个并行子任务中去。这就相当于强行让下游任务并行度变成了1,所以使用这个操作需要非常谨慎,可能对程序造成很大的压力
stream.global()
自定义分区(Custom)
// 自定义分区器 public class MyPartitioner implements Partitioner<String> { @Override public int partition(String key, int numPartitions) { return Integer.parseInt(key) % numPartitions; } } // 使用自定义分区 public class PartitionCustomDemo { public static void main(String[] args) throws Exception { // StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); env.setParallelism(2); DataStreamSource<String> socketDS = env.socketTextStream("hadoop102", 7777); DataStream<String> myDS = socketDS .partitionCustom( new MyPartitioner(), value -> value); myDS.print(); env.execute(); } }
所谓“分流”,就是将一条数据流拆分成完全独立的两条、甚至多条流。也就是基于一个DataStream,定义一些筛选条件,将符合条件的数据拣选出来放到对应的流里
简单实现
public class SplitStreamByFilter { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); SingleOutputStreamOperator<Integer> ds = env.socketTextStream("hadoop102", 7777) .map(Integer::valueOf); //将ds 分为两个流 ,一个是奇数流,一个是偶数流 //使用filter 过滤两次 SingleOutputStreamOperator<Integer> ds1 = ds.filter(x -> x % 2 == 0); SingleOutputStreamOperator<Integer> ds2 = ds.filter(x -> x % 2 == 1); ds1.print("偶数"); ds2.print("奇数"); env.execute(); } } // ,是将原始数据流stream复制三份,然后对每一份分别做筛选
使用侧输出流
public class SplitStreamByOutputTag { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); SingleOutputStreamOperator<WaterSensor> ds = env.socketTextStream("hadoop102", 7777) .map(new WaterSensorMapFunction()); OutputTag<WaterSensor> s1 = new OutputTag<>("s1", Types.POJO(WaterSensor.class)){}; OutputTag<WaterSensor> s2 = new OutputTag<>("s2", Types.POJO(WaterSensor.class)){}; //返回的都是主流 SingleOutputStreamOperator<WaterSensor> ds1 = ds.process(new ProcessFunction<WaterSensor, WaterSensor>() { @Override public void processElement(WaterSensor value, Context ctx, Collector<WaterSensor> out) throws Exception { if ("s1".equals(value.getId())) { ctx.output(s1, value); } else if ("s2".equals(value.getId())) { ctx.output(s2, value); } else { //主流 out.collect(value); } } }); ds1.print("主流,非s1,s2的传感器"); SideOutputDataStream<WaterSensor> s1DS = ds1.getSideOutput(s1); SideOutputDataStream<WaterSensor> s2DS = ds1.getSideOutput(s2); s1DS.printToErr("s1"); s2DS.printToErr("s2"); env.execute(); } }
联合(Union)
最简单的合流操作,就是直接将多条流合在一起,叫作流的“联合”(union)。联合操作要求必须流中的数据类型必须相同,合并之后的新流会包括所有流中的元素,数据类型不变
public class UnionExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<Integer> ds1 = env.fromElements(1, 2, 3);
DataStreamSource<Integer> ds2 = env.fromElements(2, 2, 3);
DataStreamSource<String> ds3 = env.fromElements("2", "2", "3");
ds1.union(ds2,ds3.map(Integer::valueOf))
.print();
env.execute();
}
}
连接(Connect)
流的联合虽然简单,不过受限于数据类型不能改变,灵活性大打折扣,所以实际应用较少出现。除了联合(union),Flink还提供了另外一种方便的合流操作——连接(connect)
public class ConnectDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // DataStreamSource<Integer> source1 = env.fromElements(1, 2, 3); // DataStreamSource<String> source2 = env.fromElements("a", "b", "c"); SingleOutputStreamOperator<Integer> source1 = env .socketTextStream("hadoop102", 7777) .map(i -> Integer.parseInt(i)); DataStreamSource<String> source2 = env.socketTextStream("hadoop102", 8888); /** * TODO 使用 connect 合流 * 1、一次只能连接 2条流 * 2、流的数据类型可以不一样 * 3、 连接后可以调用 map、flatmap、process来处理,但是各处理各的 */ ConnectedStreams<Integer, String> connect = source1.connect(source2); SingleOutputStreamOperator<String> result = connect.map(new CoMapFunction<Integer, String, String>() { @Override public String map1(Integer value) throws Exception { return "来源于数字流:" + value.toString(); } @Override public String map2(String value) throws Exception { return "来源于字母流:" + value; } }); result.print(); env.execute(); } }
当然还有一个函数是CoProcessFunction,与CoMapFunction类似,如果是调用.map()就需要传入一个CoMapFunction,需要实现map1()、map2()两个方法;而调用.process()时,传入的则是一个CoProcessFunction
Flink的DataStream API专门提供了向外部写入数据的方法:addSink。与addSource类似,addSink方法对应着一个“Sink”算子,主要就是用来实现与外部系统连接、并将数据提交写入的;Flink程序中所有对外的输出操作,一般都是利用Sink算子完成的
// link1.12以前,Sink算子的创建是通过调用DataStream的.addSink()方法实现的
// addSink方法同样需要传入一个参数,实现的是SinkFunction接口。在这个接口中只需要重写一个方法invoke(),用来将指定的值写入到外部系统中。这个方法在每条数据记录到来时都会调用
stream.addSink(new SinkFunction(…));
// Flink1.12开始,同样重构了Sink架构
// 之前我们一直在使用的print方法其实就是一种Sink,它表示将数据流写入标准控制台打印输出。Flink官方为我们提供了一部分的框架的Sink连接器
stream.sinkTo(…)
具体的输出连接器可以参考官网
Flink专门提供了一个流式文件系统的连接器:FileSink,为批处理和流处理提供了一个统一的Sink,它可以将分区文件写入Flink支持的文件系统。FileSink支持行编码(Row-encoded)和批量编码(Bulk-encoded)格式。这两种不同的方式都有各自的构建器(builder),可以直接调用FileSink的静态方法:
public class SinkFile { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 每个目录中,都有 并行度个数的 文件在写入 env.setParallelism(2); // 必须开启checkpoint,否则一直都是 .inprogress env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE); DataGeneratorSource<String> dataGeneratorSource = new DataGeneratorSource<>( new GeneratorFunction<Long, String>() { @Override public String map(Long value) throws Exception { return "Number:" + value; } }, Long.MAX_VALUE, RateLimiterStrategy.perSecond(1000), Types.STRING ); DataStreamSource<String> dataGen = env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "data-generator"); // 输出到文件系统 FileSink<String> fieSink = FileSink // 输出行式存储的文件,指定路径、指定编码 .<String>forRowFormat(new Path("f:/tmp"), new SimpleStringEncoder<>("UTF-8")) // 输出文件的一些配置: 文件名的前缀、后缀 .withOutputFileConfig( OutputFileConfig.builder() .withPartPrefix("atguigu-") .withPartSuffix(".log") .build() ) // 按照目录分桶:如下,就是每个小时一个目录 .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd HH", ZoneId.systemDefault())) // 文件滚动策略: 1分钟 或 1m .withRollingPolicy( DefaultRollingPolicy.builder() .withRolloverInterval(Duration.ofMinutes(1)) .withMaxPartSize(new MemorySize(1024*1024)) .build() ) .build(); dataGen.sinkTo(fieSink); env.execute(); } }
添加号连接器依赖后,输出无key的record
public class SinkKafka { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // 如果是精准一次,必须开启checkpoint(后续章节介绍) env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE); SingleOutputStreamOperator<String> sensorDS = env .socketTextStream("hadoop102", 7777); /** * Kafka Sink: * TODO 注意:如果要使用 精准一次 写入Kafka,需要满足以下条件,缺一不可 * 1、开启checkpoint(后续介绍) * 2、设置事务前缀 * 3、设置事务超时时间: checkpoint间隔 < 事务超时时间 < max的15分钟 */ KafkaSink<String> kafkaSink = KafkaSink.<String>builder() // 指定 kafka 的地址和端口 .setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092") // 指定序列化器:指定Topic名称、具体的序列化 .setRecordSerializer( KafkaRecordSerializationSchema.<String>builder() .setTopic("ws") .setValueSerializationSchema(new SimpleStringSchema()) .build() ) // 写到kafka的一致性级别: 精准一次、至少一次 .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE) // 如果是精准一次,必须设置 事务的前缀 .setTransactionalIdPrefix("atguigu-") // 如果是精准一次,必须设置 事务超时时间: 大于checkpoint间隔,小于 max 15分钟 .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10*60*1000+"") .build(); sensorDS.sinkTo(kafkaSink); env.execute(); } }
自定义序列化器,实现带key的record
public class SinkKafkaWithKey { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE); env.setRestartStrategy(RestartStrategies.noRestart()); SingleOutputStreamOperator<String> sensorDS = env .socketTextStream("hadoop102", 7777); /** * 如果要指定写入kafka的key,可以自定义序列化器: * 1、实现 一个接口,重写 序列化 方法 * 2、指定key,转成 字节数组 * 3、指定value,转成 字节数组 * 4、返回一个 ProducerRecord对象,把key、value放进去 */ KafkaSink<String> kafkaSink = KafkaSink.<String>builder() .setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092") .setRecordSerializer( new KafkaRecordSerializationSchema<String>() { @Nullable @Override public ProducerRecord<byte[], byte[]> serialize(String element, KafkaSinkContext context, Long timestamp) { String[] datas = element.split(","); byte[] key = datas[0].getBytes(StandardCharsets.UTF_8); byte[] value = element.getBytes(StandardCharsets.UTF_8); return new ProducerRecord<>("ws", key, value); } } ) .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE) .setTransactionalIdPrefix("atguigu-") .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10 * 60 * 1000 + "") .build(); sensorDS.sinkTo(kafkaSink); env.execute(); } }
运行代码,在Linux主机启动一个消费者,查看是否收到数据
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic ws
添加MySQL驱动和flink驱动
<dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>8.0.27</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-jdbc</artifactId> <version>1.17-SNAPSHOT</version> </dependency> <!--官方还未提供flink-connector-jdbc的1.17.0的正式依赖,暂时从apache snapshot仓库下载,pom文件中指定仓库路径--> <repositories> <repository> <id>apache-snapshots</id> <name>apache snapshots</name> <url>https://repository.apache.org/content/repositories/snapshots/</url> </repository> </repositories> <!--如果不生效,还需要修改本地maven的配置文件--> <mirror> <id>aliyunmaven</id> <mirrorOf>*,!apache-snapshots</mirrorOf> <name>阿里云公共仓库</name> <url>https://maven.aliyun.com/repository/public</url> </mirror>
建表
CREATE TABLE `ws` (
`id` varchar(100) NOT NULL,
`ts` bigint(20) DEFAULT NULL,
`vc` int(11) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
编写输出到MySQL的示例代码
public class SinkMySQL { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); SingleOutputStreamOperator<WaterSensor> sensorDS = env .socketTextStream("hadoop102", 7777) .map(new WaterSensorMapFunction()); /** * TODO 写入mysql * 1、只能用老的sink写法: addsink * 2、JDBCSink的4个参数: * 第一个参数: 执行的sql,一般就是 insert into * 第二个参数: 预编译sql, 对占位符填充值 * 第三个参数: 执行选项 ---》 攒批、重试 * 第四个参数: 连接选项 ---》 url、用户名、密码 */ SinkFunction<WaterSensor> jdbcSink = JdbcSink.sink( "insert into ws values(?,?,?)", new JdbcStatementBuilder<WaterSensor>() { @Override public void accept(PreparedStatement preparedStatement, WaterSensor waterSensor) throws SQLException { //每收到一条WaterSensor,如何去填充占位符 preparedStatement.setString(1, waterSensor.getId()); preparedStatement.setLong(2, waterSensor.getTs()); preparedStatement.setInt(3, waterSensor.getVc()); } }, JdbcExecutionOptions.builder() .withMaxRetries(3) // 重试次数 .withBatchSize(100) // 批次的大小:条数 .withBatchIntervalMs(3000) // 批次的时间 .build(), new JdbcConnectionOptions.JdbcConnectionOptionsBuilder() .withUrl("jdbc:mysql://hadoop102:3306/test?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8") .withUsername("root") .withPassword("000000") .withConnectionCheckTimeoutSeconds(60) // 重试的超时时间 .build() ); sensorDS.addSink(jdbcSink); env.execute(); } }
如果我们想将数据存储到我们自己的存储设备中,而Flink并没有提供可以直接使用的连接器,就只能自定义Sink进行输出了。与Source类似,Flink为我们提供了通用的SinkFunction接口和对应的RichSinkDunction抽象类,只要实现它,通过简单地调用DataStream的.addSink()方法就可以自定义写入任何外部存储。
stream.addSink(new MySinkFunction<String>());
在实现SinkFunction的时候,需要重写的一个关键方法invoke(),在这个方法中我们就可以实现将流里的数据发送出去的逻辑。这种方式比较通用,对于任何外部存储系统都有效;不过自定义Sink想要实现状态一致性并不容易,所以一般只在没有其它选择时使用
在批处理统计中,我们可以等待一批数据都到齐后,统一处理。但是在实时处理统计中,我们是来一条就得处理一条,那么我们怎么统计最近一段时间内的数据呢?引入“窗口”。所谓的“窗口”,一般就是划定的一段时间范围,也就是“时间窗”;对在这范围内的数据进行处理,就是所谓的窗口计算。
Flink是一种流式计算引擎,主要是来处理无界数据流的,数据源源不断、无穷无尽。想要更加方便高效地处理无界流,一种方式就是将无限数据切割成有限的“数据块”进行处理,这就是所谓的“窗口”(Window)
**注意:**Flink中窗口并不是静态准备好的,而是动态创建——当有落在这个窗口区间范围的数据达到时,才创建对应的窗口。另外,这里我们认为到达窗口结束时间时,窗口就触发计算并关闭,事实上“触发计算”和“窗口关闭”两个行为也可以分开
按照驱动类型分
按照窗口分配数据的规则分类
按键分区(Keyed)和非按键分区(Non-Keyed)
// 相同key的数据会被发送到同一个并行子任务,而窗口操作会基于每个key进行单独的处理
stream.keyBy(...)
.window(...)
// 如果没有进行keyBy,那么原始的DataStream就不会分成多条逻辑流。这时窗口逻辑只能在一个任务(task)上执行,就相当于并行度变成了1
stream.windowAll(...)
// 注意:对于非按键分区的窗口操作,手动调大窗口算子的并行度也是无效的,windowAll本身就是一个非并行的操作
代码中窗口API的调用
窗口操作主要有两个部分:窗口分配器(Window Assigners)和窗口函数(Window Functions)
stream.keyBy(<key selector>)
.window(<window assigner>)
.aggregate(<window function>)
时间窗口
时间窗口是最常用的窗口类型,又可以细分为滚动、滑动和会话三种
// 滚动处理时间窗口 // 窗口分配器由类TumblingProcessingTimeWindows提供,需要调用它的静态方法.of() // 一个长度为5秒的滚动窗口 stream.keyBy(...) .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) .aggregate(...) // 另外,.of()还有一个重载方法,可以传入两个Time类型的参数:size和offset。第一个参数当然还是窗口大小,第二个参数则表示窗口起始点的偏移量 // 滑动处理时间窗口 // 窗口分配器由类SlidingProcessingTimeWindows提供,同样需要调用它的静态方法.of() stream.keyBy(...) .window(SlidingProcessingTimeWindows.of(Time.seconds(10),Time.seconds(5))) .aggregate(...) // 这里.of()方法需要传入两个Time类型的参数:size和slide,前者表示滑动窗口的大小,后者表示滑动窗口的滑动步长。我们这里创建了一个长度为10秒、滑动步长为5秒的滑动窗口 // 处理时间会话窗口 // 窗口分配器由类ProcessingTimeSessionWindows提供,需要调用它的静态方法.withGap()或者.withDynamicGap() // 这里.withGap()方法需要传入一个Time类型的参数size,表示会话的超时时间,也就是最小间隔session gap。我们这里创建了静态会话超时时间为10秒的会话窗口 stream.keyBy(...) .window(ProcessingTimeSessionWindows.withGap(Time.seconds(10))) .aggregate(...) // 另外,还可以调用withDynamicGap()方法定义session gap的动态提取逻辑 // 滚动事件时间窗口 // 窗口分配器由类TumblingEventTimeWindows提供,用法与滚动处理事件窗口完全一致 stream.keyBy(...) .window(TumblingEventTimeWindows.of(Time.seconds(5))) .aggregate(...) // 滑动事件时间窗口 stream.keyBy(...) .window(SlidingEventTimeWindows.of(Time.seconds(10),Time.seconds(5))) .aggregate(...) // 事件时间会话窗口 stream.keyBy(...) .window(EventTimeSessionWindows.withGap(Time.seconds(10))) .aggregate(...)
TumblingProcessingTimeWindows
和TumblingEventTimeWindows
是Apache Flink中两种不同类型的窗口,用于基于处理时间和事件时间进行窗口操作的区别如下:
TumblingProcessingTimeWindows
是根据处理时间对数据流进行窗口划分的方式。TumblingEventTimeWindows
是根据事件时间对数据流进行窗口划分的方式。总结: TumblingProcessingTimeWindows
适用于实时数据处理,基于处理时间划分窗口;而TumblingEventTimeWindows
适用于具有事件时间特性的数据处理,基于事件时间划分窗口,并根据水位线触发窗口计算。选择哪种窗口类型取决于数据流的特点和需求
计数窗口
// 滚动计数窗口
// 滚动计数窗口只需要传入一个长整型的参数size,表示窗口的大小
// 我们定义了一个长度为10的滚动计数窗口,当窗口中元素数量达到10的时候,就会触发计算执行并关闭窗口
stream.keyBy(...)
.countWindow(10)
// 滑动计数窗口
// 们定义了一个长度为10、滑动步长为3的滑动计数窗口。每个窗口统计10个数据,每隔3个数据就统计输出一次结果
stream.keyBy(...)
.countWindow(10,3)
全局窗口
// 全局窗口是计数窗口的底层实现,一般在需要自定义窗口时使用。它的定义同样是直接调用.window(),分配器由GlobalWindows类提供
// 需要注意使用全局窗口,必须自行定义触发器才能实现窗口计算,否则起不到任何作用
stream.keyBy(...)
.window(GlobalWindows.create());
窗口函数定义了要对窗口中收集的数据做的计算操作,根据处理的方式可以分为两类:增量聚合函数和全窗口函数。
增量聚合函数(ReduceFunction / AggregateFunction)
// 归约函数(ReduceFunction) public class WindowReduceDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); env .socketTextStream("hadoop102", 7777) .map(new WaterSensorMapFunction()) .keyBy(r -> r.getId()) // 设置滚动事件时间窗口 .window(TumblingProcessingTimeWindows.of(Time.seconds(10))) .reduce(new ReduceFunction<WaterSensor>() { @Override public WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception { System.out.println("调用reduce方法,之前的结果:"+value1 + ",现在来的数据:"+value2); return new WaterSensor(value1.getId(), System.currentTimeMillis(),value1.getVc()+value2.getVc()); } }) .print(); env.execute(); } } // 聚合函数(AggregateFunction) // AggregateFunction的工作原理是:首先调用createAccumulator()为任务初始化一个状态(累加器); // 而后每来一个数据就调用一次add()方法,对数据进行聚合,得到的结果保存在状态中;等到了窗口需要输出时,再调用getResult()方法得到计算结果。 // 很明显,与ReduceFunction相同,AggregateFunction也是增量式的聚合;而由于输入、中间状态、输出的类型可以不同,使得应用更加灵活方便 public class WindowAggregateDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); SingleOutputStreamOperator<WaterSensor> sensorDS = env .socketTextStream("hadoop102", 7777) .map(new WaterSensorMapFunction()); KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(sensor -> sensor.getId()); // 1. 窗口分配器 WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10))); SingleOutputStreamOperator<String> aggregate = sensorWS .aggregate( new AggregateFunction<WaterSensor, Integer, String>() { @Override public Integer createAccumulator() { System.out.println("创建累加器"); return 0; } @Override public Integer add(WaterSensor value, Integer accumulator) { System.out.println("调用add方法,value="+value); return accumulator + value.getVc(); } @Override public String getResult(Integer accumulator) { System.out.println("调用getResult方法"); return accumulator.toString(); } @Override public Integer merge(Integer a, Integer b) { System.out.println("调用merge方法"); return null; } } ); aggregate.print(); env.execute(); } } // Flink也为窗口的聚合提供了一系列预定义的简单聚合方法,可以直接基于WindowedStream调用。主要包括.sum()/max()/maxBy()/min()/minBy(),与KeyedStream的简单聚合非常相似。它们的底层,其实都是通过AggregateFunction来实现的
全窗口函数(full window functions)
在Flink中,全窗口函数也有两种:WindowFunction和ProcessWindowFunction
// 窗口函数(WindowFunction) // 不过WindowFunction能提供的上下文信息较少,也没有更高级的功能。事实上,它的作用可以被ProcessWindowFunction全覆盖,所以之后可能会逐渐弃用 stream .keyBy(<key selector>) .window(<window assigner>) .apply(new MyWindowFunction()); // 处理窗口函数(ProcessWindowFunction) // 可以拿到窗口中的所有数据之外,ProcessWindowFunction还可以获取到一个“上下文对象”(Context) public class WindowProcessDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); SingleOutputStreamOperator<WaterSensor> sensorDS = env .socketTextStream("hadoop102", 7777) .map(new WaterSensorMapFunction()); KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(sensor -> sensor.getId()); // 1. 窗口分配器 WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10))); SingleOutputStreamOperator<String> process = sensorWS .process( new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() { @Override public void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception { long count = elements.spliterator().estimateSize(); long windowStartTs = context.window().getStart(); long windowEndTs = context.window().getEnd(); String windowStart = DateFormatUtils.format(windowStartTs, "yyyy-MM-dd HH:mm:ss.SSS"); String windowEnd = DateFormatUtils.format(windowEndTs, "yyyy-MM-dd HH:mm:ss.SSS"); out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString()); } } ); process.print(); env.execute(); } }
增量聚合和全窗口函数的结合使用
我们之前在调用WindowedStream的.reduce()和.aggregate()方法时,只是简单地直接传入了一个ReduceFunction或AggregateFunction进行增量聚合。除此之外,其实还可以传入第二个参数:一个全窗口函数,可以是WindowFunction或者ProcessWindowFunction
// ReduceFunction与WindowFunction结合 public <R> SingleOutputStreamOperator<R> reduce( ReduceFunction<T> reduceFunction,WindowFunction<T,R,K,W> function) // ReduceFunction与ProcessWindowFunction结合 public <R> SingleOutputStreamOperator<R> reduce( ReduceFunction<T> reduceFunction,ProcessWindowFunction<T,R,K,W> function) // AggregateFunction与WindowFunction结合 public <ACC,V,R> SingleOutputStreamOperator<R> aggregate( AggregateFunction<T,ACC,V> aggFunction,WindowFunction<V,R,K,W> windowFunction) // AggregateFunction与ProcessWindowFunction结合 public <ACC,V,R> SingleOutputStreamOperator<R> aggregate( AggregateFunction<T,ACC,V> aggFunction, ProcessWindowFunction<V,R,K,W> windowFunction) // 这样调用的处理机制是:基于第一个参数(增量聚合函数)来处理窗口数据,每来一个数据就做一次聚合;等到窗口需要触发计算时,则调用第二个参数(全窗口函数)的处理逻辑输出结果。 // 需要注意的是,这里的全窗口函数就不再缓存所有数据了,而是直接将增量聚合函数的结果拿来当作了Iterable类型的输入 public class WindowAggregateAndProcessDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); SingleOutputStreamOperator<WaterSensor> sensorDS = env .socketTextStream("hadoop102", 7777) .map(new WaterSensorMapFunction()); KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(sensor -> sensor.getId()); // 1. 窗口分配器 WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10))); // 2. 窗口函数: /** * 增量聚合 Aggregate + 全窗口 process * 1、增量聚合函数处理数据: 来一条计算一条 * 2、窗口触发时, 增量聚合的结果(只有一条) 传递给 全窗口函数 * 3、经过全窗口函数的处理包装后,输出 * * 结合两者的优点: * 1、增量聚合: 来一条计算一条,存储中间的计算结果,占用的空间少 * 2、全窗口函数: 可以通过 上下文 实现灵活的功能 */ // sensorWS.reduce() //也可以传两个 SingleOutputStreamOperator<String> result = sensorWS.aggregate( new MyAgg(), new MyProcess() ); result.print(); env.execute(); } public static class MyAgg implements AggregateFunction<WaterSensor, Integer, String>{ @Override public Integer createAccumulator() { System.out.println("创建累加器"); return 0; } @Override public Integer add(WaterSensor value, Integer accumulator) { System.out.println("调用add方法,value="+value); return accumulator + value.getVc(); } @Override public String getResult(Integer accumulator) { System.out.println("调用getResult方法"); return accumulator.toString(); } @Override public Integer merge(Integer a, Integer b) { System.out.println("调用merge方法"); return null; } } // 全窗口函数的输入类型 = 增量聚合函数的输出类型 public static class MyProcess extends ProcessWindowFunction<String,String,String,TimeWindow>{ @Override public void process(String s, Context context, Iterable<String> elements, Collector<String> out) throws Exception { long startTs = context.window().getStart(); long endTs = context.window().getEnd(); String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS"); String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS"); long count = elements.spliterator().estimateSize(); out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString()); } } }
// 触发器(Trigger)
// 触发器主要是用来控制窗口什么时候触发计算。所谓的“触发计算”,本质上就是执行窗口函数,所以可以认为是计算得到结果并输出的过程
stream.keyBy(...)
.window(...)
.trigger(new MyTrigger())
// 移除器(Evictor)
// 移除器主要用来定义移除某些数据的逻辑。基于WindowedStream调用.evictor()方法,就可以传入一个自定义的移除器(Evictor)。Evictor是一个接口,不同的窗口类型都有各自预实现的移除器
stream.keyBy(...)
.window(...)
.evictor(new MyEvictor())
在实际应用中,事件时间语义会更为常见。一般情况下,业务日志数据中都会记录数据生成的时间戳(timestamp),它就可以作为事件时间的判断基础。
在Flink中,由于处理时间比较简单,早期版本默认的时间语义是处理时间;而考虑到事件时间在实际应用中更为广泛,从Flink1.12版本开始,Flink已经将事件时间作为默认的时间语义了
在Flink中,用来衡量事件时间进展的标记,就被称作“水位线”(Watermark)。
水位线特征
水位线是Flink流处理中保证结果正确性的核心机制,它往往会跟窗口一起配合,完成对乱序数据的正确处理。
**注意:**Flink中窗口并不是静态准备好的,而是动态创建——当有落在这个窗口区间范围的数据达到时,才创建对应的窗口。另外,这里我们认为到达窗口结束时间时,窗口就触发计算并关闭,事实上“触发计算”和“窗口关闭”两个行为也可以分开
所以Flink中的水位线,其实是流处理中对低延迟和结果正确性的一个权衡机制,而且把控制的权力交给了程序员,我们可以在代码中定义水位线的生成策略
水位线生成策略
// 用来为流中的数据分配时间戳 DataStream<Event> stream = env.addSource(new ClickSource()); DataStream<Event> withTimestampsAndWatermarks = stream.assignTimestampsAndWatermarks(<watermark strategy>); // 说明:WatermarkStrategy作为参数,这就是所谓的“水位线生成策略”。WatermarkStrategy是一个接口,该接口中包含了一个“时间戳分配器”TimestampAssigner和一个“水位线生成器”WatermarkGenerator public interface WatermarkStrategy<T> extends TimestampAssignerSupplier<T>, WatermarkGeneratorSupplier<T>{ // 负责从流中数据元素的某个字段中提取时间戳,并分配给元素。时间戳的分配是生成水位线的基础。 @Override TimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context); // 主要负责按照既定的方式,基于时间戳生成水位线 @Override WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context); }
Flink内置水位线
// 有序流中内置水位线设置 // 对于有序流,主要特点就是时间戳单调增长,所以永远不会出现迟到数据的问题。这是周期性生成水位线的最简单的场景 public class WatermarkMonoDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); SingleOutputStreamOperator<WaterSensor> sensorDS = env .socketTextStream("hadoop102", 7777) .map(new WaterSensorMapFunction()); // TODO 1.定义Watermark策略 WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy // 1.1 指定watermark生成:升序的watermark,没有等待时间 .<WaterSensor>forMonotonousTimestamps() // 1.2 指定 时间戳分配器,从数据中提取 .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() { @Override public long extractTimestamp(WaterSensor element, long recordTimestamp) { // 返回的时间戳,要 毫秒 System.out.println("数据=" + element + ",recordTs=" + recordTimestamp); return element.getTs() * 1000L; } }); // TODO 2. 指定 watermark策略 SingleOutputStreamOperator<WaterSensor> sensorDSwithWatermark = sensorDS.assignTimestampsAndWatermarks(watermarkStrategy); sensorDSwithWatermark.keyBy(sensor -> sensor.getId()) // TODO 3.使用 事件时间语义 的窗口 .window(TumblingEventTimeWindows.of(Time.seconds(10))) .process( new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() { @Override public void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception { long startTs = context.window().getStart(); long endTs = context.window().getEnd(); String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS"); String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS"); long count = elements.spliterator().estimateSize(); out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString()); } } ) .print(); env.execute(); } } // 乱序流中内置水位线设置 // 由于乱序流中需要等待迟到数据到齐,所以必须设置一个固定量的延迟时间 // 这时生成水位线的时间戳,就是当前数据流中最大的时间戳减去延迟的结果,相当于把表调慢,当前时钟会滞后于数据的最大时间戳 public class WatermarkOutOfOrdernessDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); SingleOutputStreamOperator<WaterSensor> sensorDS = env .socketTextStream("hadoop102", 7777) .map(new WaterSensorMapFunction()); // TODO 1.定义Watermark策略 WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy // 1.1 指定watermark生成:乱序的,等待3s .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)) // 1.2 指定 时间戳分配器,从数据中提取 .withTimestampAssigner( (element, recordTimestamp) -> { // 返回的时间戳,要 毫秒 System.out.println("数据=" + element + ",recordTs=" + recordTimestamp); return element.getTs() * 1000L; }); // TODO 2. 指定 watermark策略 SingleOutputStreamOperator<WaterSensor> sensorDSwithWatermark = sensorDS.assignTimestampsAndWatermarks(watermarkStrategy); sensorDSwithWatermark.keyBy(sensor -> sensor.getId()) // TODO 3.使用 事件时间语义 的窗口 .window(TumblingEventTimeWindows.of(Time.seconds(10))) .process( new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() { @Override public void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception { long startTs = context.window().getStart(); long endTs = context.window().getEnd(); String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS"); String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS"); long count = elements.spliterator().estimateSize(); out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString()); } } ) .print(); env.execute(); } }
自定义水位线生成器
// 周期性水位线生成器(Periodic Generator) // 周期性生成器一般是通过onEvent()观察判断输入的事件,而在onPeriodicEmit()里发出水位线 // 自定义水位线的产生 public class CustomPeriodicWatermarkExample { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env .addSource(new ClickSource()) .assignTimestampsAndWatermarks(new CustomWatermarkStrategy()) .print(); env.execute(); } public static class CustomWatermarkStrategy implements WatermarkStrategy<Event> { @Override public TimestampAssigner<Event> createTimestampAssigner(TimestampAssignerSupplier.Context context) { return new SerializableTimestampAssigner<Event>() { @Override public long extractTimestamp(Event element,long recordTimestamp) { return element.timestamp; // 告诉程序数据源里的时间戳是哪一个字段 } }; } @Override public WatermarkGenerator<Event> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) { return new CustomBoundedOutOfOrdernessGenerator(); } } public static class CustomBoundedOutOfOrdernessGenerator implements WatermarkGenerator<Event> { private Long delayTime = 5000L; // 延迟时间 private Long maxTs = -Long.MAX_VALUE + delayTime + 1L; // 观察到的最大时间戳 @Override public void onEvent(Event event,long eventTimestamp,WatermarkOutput output) { // 每来一条数据就调用一次 maxTs = Math.max(event.timestamp,maxTs); // 更新最大时间戳 } @Override public void onPeriodicEmit(WatermarkOutput output) { // 发射水位线,默认200ms调用一次 output.emitWatermark(new Watermark(maxTs - delayTime - 1L)); } } } // 我们在onPeriodicEmit()里调用output.emitWatermark(),就可以发出水位线了;这个方法由系统框架周期性地调用,默认200ms一次 // 如果想修改默认周期时间,可以通过下面方法修改。例如:修改为400ms // env.getConfig().setAutoWatermarkInterval(400L); // 断点式水位线生成器(Punctuated Generator) // 断点式生成器会不停地检测onEvent()中的事件,当发现带有水位线信息的事件时,就立即发出水位线。我们把发射水位线的逻辑写在onEvent方法当中即可 // 在数据源中发送水位线 // 我们也可以在自定义的数据源中抽取事件时间,然后发送水位线 env.fromSource( kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), "kafkasource" )
多并行度下以最小的那个作为当前任务的事件时钟
推迟水印推进
在水印产生时,设置一个乱序容忍度,推迟系统时间的推进,保证窗口计算被延迟执行,为乱序的数据争取更多的时间进入窗口
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10));
设置窗口延迟关闭
Flink的窗口,也允许迟到数据。当触发了窗口计算后,会先计算当前的结果,但是此时并不会关闭窗口。以后每来一条迟到数据,就触发一次这条数据所在窗口计算(增量计算)。直到wartermark 超过了窗口结束时间+推迟时间,此时窗口会真正关闭
// 允许迟到只能运用在event time上
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.allowedLateness(Time.seconds(3))
使用侧流接收迟到的数据
.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))) .allowedLateness(Time.seconds(3)) .sideOutputLateData(lateWS) public class WatermarkLateDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); SingleOutputStreamOperator<WaterSensor> sensorDS = env .socketTextStream("hadoop102", 7777) .map(new WaterSensorMapFunction()); WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)) .withTimestampAssigner((element, recordTimestamp) -> element.getTs() * 1000L); SingleOutputStreamOperator<WaterSensor> sensorDSwithWatermark = sensorDS.assignTimestampsAndWatermarks(watermarkStrategy); OutputTag<WaterSensor> lateTag = new OutputTag<>("late-data", Types.POJO(WaterSensor.class)); SingleOutputStreamOperator<String> process = sensorDSwithWatermark.keyBy(sensor -> sensor.getId()) .window(TumblingEventTimeWindows.of(Time.seconds(10))) .allowedLateness(Time.seconds(2)) // 推迟2s关窗 .sideOutputLateData(lateTag) // 关窗后的迟到数据,放入侧输出流 .process( new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() { @Override public void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception { long startTs = context.window().getStart(); long endTs = context.window().getEnd(); String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS"); String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS"); long count = elements.spliterator().estimateSize(); out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString()); } } ); process.print(); // 从主流获取侧输出流,打印 process.getSideOutput(lateTag).printToErr("关窗后的迟到数据"); env.execute(); } }
Flink为基于一段时间的双流合并专门提供了一个窗口联结算子,可以定义时间窗口,并将两条流中共享一个公共键(key)的数据放在窗口中进行配对处理
// where()的参数是键选择器(KeySelector),用来指定第一条流中的key;而.equalTo()传入的KeySelector则指定了第二条流中的key。两者相同的元素,如果在同一窗口中,就可以匹配起来,并通过一个“联结函数”(JoinFunction)进行处理了
stream1.join(stream2)
.where(<KeySelector>)
.equalTo(<KeySelector>)
.window(<WindowAssigner>)
.apply(<JoinFunction>)
// 窗口join的调用语法和我们熟悉的SQL中表的join非常相似
SELECT * FROM table1 t1, table2 t2 WHERE t1.id = t2.id;
举个例子
public class WindowJoinDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); SingleOutputStreamOperator<Tuple2<String, Integer>> ds1 = env .fromElements( Tuple2.of("a", 1), Tuple2.of("a", 2), Tuple2.of("b", 3), Tuple2.of("c", 4) ) .assignTimestampsAndWatermarks( WatermarkStrategy .<Tuple2<String, Integer>>forMonotonousTimestamps() .withTimestampAssigner((value, ts) -> value.f1 * 1000L) ); SingleOutputStreamOperator<Tuple3<String, Integer,Integer>> ds2 = env .fromElements( Tuple3.of("a", 1,1), Tuple3.of("a", 11,1), Tuple3.of("b", 2,1), Tuple3.of("b", 12,1), Tuple3.of("c", 14,1), Tuple3.of("d", 15,1) ) .assignTimestampsAndWatermarks( WatermarkStrategy .<Tuple3<String, Integer,Integer>>forMonotonousTimestamps() .withTimestampAssigner((value, ts) -> value.f1 * 1000L) ); // TODO window join // 1. 落在同一个时间窗口范围内才能匹配 // 2. 根据keyby的key,来进行匹配关联 // 3. 只能拿到匹配上的数据,类似有固定时间范围的inner join DataStream<String> join = ds1.join(ds2) .where(r1 -> r1.f0) // ds1的keyby .equalTo(r2 -> r2.f0) // ds2的keyby .window(TumblingEventTimeWindows.of(Time.seconds(10))) .apply(new JoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>() { /** * 关联上的数据,调用join方法 * @param first ds1的数据 * @param second ds2的数据 * @return * @throws Exception */ @Override public String join(Tuple2<String, Integer> first, Tuple3<String, Integer, Integer> second) throws Exception { return first + "<----->" + second; } }); join.print(); env.execute(); } }
间隔联结具体的定义方式是,我们给定两个时间点,分别叫作间隔的“上界”(upperBound)和“下界”(lowerBound);于是对于一条流(不妨叫作A)中的任意一个数据元素a,就可以开辟一段时间间隔:[a.timestamp + lowerBound, a.timestamp + upperBound],即以a的时间戳为中心,下至下界点、上至上界点的一个闭区间:我们就把这段时间作为可以匹配另一条流数据的“窗口”范围
// 间隔联结在代码中,是基于KeyedStream的联结(join)操作
// 通用调用
stream1
.keyBy(<KeySelector>)
.intervalJoin(stream2.keyBy(<KeySelector>))
.between(Time.milliseconds(-2), Time.milliseconds(1))
.process (new ProcessJoinFunction<Integer, Integer, String(){
@Override
public void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {
out.collect(left + "," + right);
}
});
实例演示
// 正常使用 public class IntervalJoinDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); SingleOutputStreamOperator<Tuple2<String, Integer>> ds1 = env .fromElements( Tuple2.of("a", 1), Tuple2.of("a", 2), Tuple2.of("b", 3), Tuple2.of("c", 4) ) .assignTimestampsAndWatermarks( WatermarkStrategy .<Tuple2<String, Integer>>forMonotonousTimestamps() .withTimestampAssigner((value, ts) -> value.f1 * 1000L) ); SingleOutputStreamOperator<Tuple3<String, Integer, Integer>> ds2 = env .fromElements( Tuple3.of("a", 1, 1), Tuple3.of("a", 11, 1), Tuple3.of("b", 2, 1), Tuple3.of("b", 12, 1), Tuple3.of("c", 14, 1), Tuple3.of("d", 15, 1) ) .assignTimestampsAndWatermarks( WatermarkStrategy .<Tuple3<String, Integer, Integer>>forMonotonousTimestamps() .withTimestampAssigner((value, ts) -> value.f1 * 1000L) ); // TODO interval join //1. 分别做keyby,key其实就是关联条件 KeyedStream<Tuple2<String, Integer>, String> ks1 = ds1.keyBy(r1 -> r1.f0); KeyedStream<Tuple3<String, Integer, Integer>, String> ks2 = ds2.keyBy(r2 -> r2.f0); //2. 调用 interval join ks1.intervalJoin(ks2) .between(Time.seconds(-2), Time.seconds(2)) .process( new ProcessJoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>() { /** * 两条流的数据匹配上,才会调用这个方法 * @param left ks1的数据 * @param right ks2的数据 * @param ctx 上下文 * @param out 采集器 * @throws Exception */ @Override public void processElement(Tuple2<String, Integer> left, Tuple3<String, Integer, Integer> right, Context ctx, Collector<String> out) throws Exception { // 进入这个方法,是关联上的数据 out.collect(left + "<------>" + right); } }) .print(); env.execute(); } } // 处理迟到数据 public class IntervalJoinWithLateDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); SingleOutputStreamOperator<Tuple2<String, Integer>> ds1 = env .socketTextStream("hadoop102", 7777) .map(new MapFunction<String, Tuple2<String, Integer>>() { @Override public Tuple2<String, Integer> map(String value) throws Exception { String[] datas = value.split(","); return Tuple2.of(datas[0], Integer.valueOf(datas[1])); } }) .assignTimestampsAndWatermarks( WatermarkStrategy .<Tuple2<String, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(3)) .withTimestampAssigner((value, ts) -> value.f1 * 1000L) ); SingleOutputStreamOperator<Tuple3<String, Integer, Integer>> ds2 = env .socketTextStream("hadoop102", 8888) .map(new MapFunction<String, Tuple3<String, Integer, Integer>>() { @Override public Tuple3<String, Integer, Integer> map(String value) throws Exception { String[] datas = value.split(","); return Tuple3.of(datas[0], Integer.valueOf(datas[1]), Integer.valueOf(datas[2])); } }) .assignTimestampsAndWatermarks( WatermarkStrategy .<Tuple3<String, Integer, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(3)) .withTimestampAssigner((value, ts) -> value.f1 * 1000L) ); /** * TODO Interval join * 1、只支持事件时间 * 2、指定上界、下界的偏移,负号代表时间往前,正号代表时间往后 * 3、process中,只能处理 join上的数据 * 4、两条流关联后的watermark,以两条流中最小的为准 * 5、如果 当前数据的事件时间 < 当前的watermark,就是迟到数据, 主流的process不处理 * => between后,可以指定将 左流 或 右流 的迟到数据 放入侧输出流 */ //1. 分别做keyby,key其实就是关联条件 KeyedStream<Tuple2<String, Integer>, String> ks1 = ds1.keyBy(r1 -> r1.f0); KeyedStream<Tuple3<String, Integer, Integer>, String> ks2 = ds2.keyBy(r2 -> r2.f0); //2. 调用 interval join OutputTag<Tuple2<String, Integer>> ks1LateTag = new OutputTag<>("ks1-late", Types.TUPLE(Types.STRING, Types.INT)); OutputTag<Tuple3<String, Integer, Integer>> ks2LateTag = new OutputTag<>("ks2-late", Types.TUPLE(Types.STRING, Types.INT, Types.INT)); SingleOutputStreamOperator<String> process = ks1.intervalJoin(ks2) .between(Time.seconds(-2), Time.seconds(2)) .sideOutputLeftLateData(ks1LateTag) // 将 ks1的迟到数据,放入侧输出流 .sideOutputRightLateData(ks2LateTag) // 将 ks2的迟到数据,放入侧输出流 .process( new ProcessJoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>() { /** * 两条流的数据匹配上,才会调用这个方法 * @param left ks1的数据 * @param right ks2的数据 * @param ctx 上下文 * @param out 采集器 * @throws Exception */ @Override public void processElement(Tuple2<String, Integer> left, Tuple3<String, Integer, Integer> right, Context ctx, Collector<String> out) throws Exception { // 进入这个方法,是关联上的数据 out.collect(left + "<------>" + right); } }); process.print("主流"); process.getSideOutput(ks1LateTag).printToErr("ks1迟到数据"); process.getSideOutput(ks2LateTag).printToErr("ks2迟到数据"); env.execute(); } }
之前所介绍的流处理API,无论是基本的转换、聚合,还是更为复杂的窗口操作,其实都是基于DataStream进行转换的,所以可以统称为DataStream API。
在Flink更底层,我们可以不定义任何具体的算子(比如map,filter,或者window),而只是提炼出一个统一的“处理”(process)操作——它是所有转换算子的一个概括性的表达,可以自定义处理逻辑,所以这一层接口就被叫作“处理函数”(process function)
处理函数提供了一个“定时服务”(TimerService),我们可以通过它访问流中的事件(event)、时间戳(timestamp)、水位线(watermark),甚至可以注册“定时事件”。而且处理函数继承了AbstractRichFunction抽象类,所以拥有富函数类的所有特性,同样可以访问状态(state)和其他运行时信息。此外,处理函数还可以直接将数据输出到侧输出流(side output)中。所以,处理函数是最为灵活的处理方法,可以实现各种自定义的业务逻辑。
处理函数的使用与基本的转换操作类似,只需要直接基于DataStream调用.process()方法就可以了。方法需要传入一个ProcessFunction作为参数,用来定义处理逻辑
stream.process(new MyProcessFunction())
public abstract class ProcessFunction<I, O> extends AbstractRichFunction {
...
public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;
public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}
...
}
抽象方法.processElement()
用于“处理元素”,定义了处理的核心逻辑。这个方法对于流中的每个元素都会调用一次,参数包括三个:输入数据值value,上下文ctx,以及“收集器”(Collector)out。方法没有返回值,处理之后的输出数据是通过收集器out来定义的。
非抽象方法.onTimer()
定时方法.onTimer()也有三个参数:时间戳(timestamp),上下文(ctx),以及收集器(out)。这里的timestamp是指设定好的触发时间,事件时间语义下当然就是水位线了。另外这里同样有上下文和收集器,所以也可以调用定时服务(TimerService),以及任意输出处理之后的数据。既然有.onTimer()方法做定时触发,我们用ProcessFunction也可以自定义数据按照时间分组、定时触发计算输出结果;这其实就实现了窗口(window)的功能。所以说ProcessFunction其实可以实现一切功能
注意:在Flink中,只有“按键分区流”KeyedStream才支持设置定时器的操作。
Flink提供了8个不同的处理函数
只有在KeyedStream中才支持使用TimerService设置定时器的操作。所以一般情况下,我们都是先做了keyBy分区之后,再去定义处理操作;代码中更加常见的处理函数是KeyedProcessFunction
在.onTimer()方法中可以实现定时处理的逻辑,而它能触发的前提,就是之前曾经注册过定时器、并且现在已经到了触发时间。注册定时器的功能,是通过上下文中提供的“定时服务”来实现的。定时服务与当前运行的环境有关。前面已经介绍过,ProcessFunction的上下文(Context)中提供了.timerService()方法,可以直接返回一个TimerService对象。TimerService是Flink关于时间和定时器的基础服务接口
// 获取当前的处理时间 long currentProcessingTime(); // 获取当前的水位线(事件时间) long currentWatermark(); // 注册处理时间定时器,当处理时间超过time时触发 void registerProcessingTimeTimer(long time); // 注册事件时间定时器,当水位线超过time时触发 void registerEventTimeTimer(long time); // 删除触发时间为time的处理时间定时器 void deleteProcessingTimeTimer(long time); // 删除触发时间为time的处理时间定时器 void deleteEventTimeTimer(long time);
六个方法可以分成两大类:基于处理时间和基于事件时间。而对应的操作主要有三个:获取当前时间,注册定时器,以及删除定时器。需要注意,尽管处理函数中都可以直接访问TimerService,不过只有基于KeyedStream的处理函数,才能去调用注册和删除定时器的方法;未作按键分区的DataStream不支持定时器操作,只能获取当前时间。
TimerService会以键(key)和时间戳为标准,对定时器进行去重;也就是说对于每个key和时间戳,最多只有一个定时器,如果注册了多次,onTimer()方法也将只被调用一次。
public class KeyedProcessTimerDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); SingleOutputStreamOperator<WaterSensor> sensorDS = env .socketTextStream("hadoop102", 7777) .map(new WaterSensorMapFunction()) .assignTimestampsAndWatermarks( WatermarkStrategy .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)) .withTimestampAssigner((element, ts) -> element.getTs() * 1000L) ); KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(sensor -> sensor.getId()); // TODO Process:keyed SingleOutputStreamOperator<String> process = sensorKS.process( new KeyedProcessFunction<String, WaterSensor, String>() { /** * 来一条数据调用一次 * @param value * @param ctx * @param out * @throws Exception */ @Override public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception { //获取当前数据的key String currentKey = ctx.getCurrentKey(); // TODO 1.定时器注册 TimerService timerService = ctx.timerService(); // 1、事件时间的案例 Long currentEventTime = ctx.timestamp(); // 数据中提取出来的事件时间 timerService.registerEventTimeTimer(5000L); System.out.println("当前key=" + currentKey + ",当前时间=" + currentEventTime + ",注册了一个5s的定时器"); // 2、处理时间的案例 // long currentTs = timerService.currentProcessingTime(); // timerService.registerProcessingTimeTimer(currentTs + 5000L); // System.out.println("当前key=" + currentKey + ",当前时间=" + currentTs + ",注册了一个5s后的定时器"); // 3、获取 process的 当前watermark // long currentWatermark = timerService.currentWatermark(); // System.out.println("当前数据=" + value + ",当前watermark=" + currentWatermark); // 注册定时器: 处理时间、事件时间 // timerService.registerProcessingTimeTimer(); // timerService.registerEventTimeTimer(); // 删除定时器: 处理时间、事件时间 // timerService.deleteEventTimeTimer(); // timerService.deleteProcessingTimeTimer(); // 获取当前时间进展: 处理时间-当前系统时间, 事件时间-当前watermark // long currentTs = timerService.currentProcessingTime(); // long wm = timerService.currentWatermark(); } /** * TODO 2.时间进展到定时器注册的时间,调用该方法 * @param timestamp 当前时间进展,就是定时器被触发时的时间 * @param ctx 上下文 * @param out 采集器 * @throws Exception */ @Override public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception { super.onTimer(timestamp, ctx, out); String currentKey = ctx.getCurrentKey(); System.out.println("key=" + currentKey + "现在时间是" + timestamp + "定时器触发"); } } ); process.print(); env.execute(); } }
进行窗口计算,我们可以直接调用现成的简单聚合方法(sum/max/min),也可以通过调用.reduce()或.aggregate()来自定义一般的增量聚合函数(ReduceFunction/AggregateFucntion);而对于更加复杂、需要窗口信息和额外状态的一些场景,我们还可以直接使用全窗口函数、把数据全部收集保存在窗口内,等到触发窗口计算时再统一处理。窗口处理函数就是一种典型的全窗口函数
窗口处理函数ProcessWindowFunction的使用与其他窗口函数类似,也是基于WindowedStream直接调用方法就可以,只不过这时调用的是.process()
stream.keyBy( t -> t.f0 )
.window( TumblingEventTimeWindows.of(Time.seconds(10)) )
.process(new MyProcessWindowFunction())
**案例需求:**实时统计一段时间内的出现次数最多的水位。例如,统计最近10秒钟内出现次数最多的两个水位,并且每5秒钟更新一次。我们知道,这可以用一个滑动窗口来实现。于是就需要开滑动窗口收集传感器的数据,按照不同的水位进行统计,而后汇总排序并最终输出前两名。这其实就是著名的“Top N”问题
不区分不同水位,而是将所有访问数据都收集起来,统一进行统计计算。所以可以不做keyBy,直接基于DataStream开窗,然后使用全窗口函数ProcessAllWindowFunction来进行处理
在窗口中可以用一个HashMap来保存每个水位的出现次数,只要遍历窗口中的所有数据,自然就能得到所有水位的出现次数。最后把HashMap转成一个列表ArrayList,然后进行排序、取出前两名输出就可以了
public class ProcessAllWindowTopNDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); SingleOutputStreamOperator<WaterSensor> sensorDS = env .socketTextStream("hadoop102", 7777) .map(new WaterSensorMapFunction()) .assignTimestampsAndWatermarks( WatermarkStrategy .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)) .withTimestampAssigner((element, ts) -> element.getTs() * 1000L) ); // 最近10秒= 窗口长度, 每5秒输出 = 滑动步长 // TODO 思路一: 所有数据到一起, 用hashmap存, key=vc,value=count值 sensorDS.windowAll(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))) .process(new MyTopNPAWF()) .print(); env.execute(); } public static class MyTopNPAWF extends ProcessAllWindowFunction<WaterSensor, String, TimeWindow> { @Override public void process(Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception { // 定义一个hashmap用来存,key=vc,value=count值 Map<Integer, Integer> vcCountMap = new HashMap<>(); // 1.遍历数据, 统计 各个vc出现的次数 for (WaterSensor element : elements) { Integer vc = element.getVc(); if (vcCountMap.containsKey(vc)) { // 1.1 key存在,不是这个key的第一条数据,直接累加 vcCountMap.put(vc, vcCountMap.get(vc) + 1); } else { // 1.2 key不存在,初始化 vcCountMap.put(vc, 1); } } // 2.对 count值进行排序: 利用List来实现排序 List<Tuple2<Integer, Integer>> datas = new ArrayList<>(); for (Integer vc : vcCountMap.keySet()) { datas.add(Tuple2.of(vc, vcCountMap.get(vc))); } // 对List进行排序,根据count值 降序 datas.sort(new Comparator<Tuple2<Integer, Integer>>() { @Override public int compare(Tuple2<Integer, Integer> o1, Tuple2<Integer, Integer> o2) { // 降序, 后 减 前 return o2.f1 - o1.f1; } }); // 3.取出 count最大的2个 vc StringBuilder outStr = new StringBuilder(); outStr.append("================================\n"); // 遍历 排序后的 List,取出前2个, 考虑可能List不够2个的情况 ==》 List中元素的个数 和 2 取最小值 for (int i = 0; i < Math.min(2, datas.size()); i++) { Tuple2<Integer, Integer> vcCount = datas.get(i); outStr.append("Top" + (i + 1) + "\n"); outStr.append("vc=" + vcCount.f0 + "\n"); outStr.append("count=" + vcCount.f1 + "\n"); outStr.append("窗口结束时间=" + DateFormatUtils.format(context.window().getEnd(), "yyyy-MM-dd HH:mm:ss.SSS") + "\n"); outStr.append("================================\n"); } out.collect(outStr.toString()); } } }
我们可以从两个方面去做优化:一是对数据进行按键分区,分别统计vc的出现次数;二是进行增量聚合,得到结果最后再做排序输出。所以,我们可以使用增量聚合函数AggregateFunction进行浏览量的统计,然后结合ProcessWindowFunction排序输出来实现Top N的需求
具体实现可以分成两步:先对每个vc统计出现次数,然后再将统计结果收集起来,排序输出最终结果。由于最后的排序还是基于每个时间窗口的,输出的统计结果中要包含窗口信息,我们可以输出包含了vc、出现次数(count)以及窗口结束时间的Tuple3。之后先按窗口结束时间分区,然后用KeyedProcessFunction来实现。
用KeyedProcessFunction来收集数据做排序,这时面对的是窗口聚合之后的数据流,而窗口已经不存在了;我们需要确保能够收集齐所有数据,所以应该在窗口结束时间基础上再“多等一会儿”。具体实现上,可以采用一个延迟触发的事件时间定时器。基于窗口的结束时间来设定延迟,其实并不需要等太久——因为我们是靠水位线的推进来触发定时器,而水位线的含义就是“之前的数据都到齐了”。所以我们只需要设置1毫秒的延迟,就一定可以保证这一点。
而在等待过程中,之前已经到达的数据应该缓存起来,我们这里用一个自定义的HashMap来进行存储,key为窗口的标记,value为List。之后每来一条数据,就把它添加到当前的HashMap中,并注册一个触发时间为窗口结束时间加1毫秒(windowEnd + 1)的定时器。待到水位线到达这个时间,定时器触发,我们可以保证当前窗口所有vc的统计结果Tuple3都到齐了;于是从HashMap中取出进行排序输出。
public class KeyedProcessFunctionTopNDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); SingleOutputStreamOperator<WaterSensor> sensorDS = env .socketTextStream("hadoop102", 7777) .map(new WaterSensorMapFunction()) .assignTimestampsAndWatermarks( WatermarkStrategy .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)) .withTimestampAssigner((element, ts) -> element.getTs() * 1000L) ); // 最近10秒= 窗口长度, 每5秒输出 = 滑动步长 /** * TODO 思路二: 使用 KeyedProcessFunction实现 * 1、按照vc做keyby,开窗,分别count * ==》 增量聚合,计算 count * ==》 全窗口,对计算结果 count值封装 , 带上 窗口结束时间的 标签 * ==》 为了让同一个窗口时间范围的计算结果到一起去 * * 2、对同一个窗口范围的count值进行处理: 排序、取前N个 * =》 按照 windowEnd做keyby * =》 使用process, 来一条调用一次,需要先存,分开存,用HashMap,key=windowEnd,value=List * =》 使用定时器,对 存起来的结果 进行 排序、取前N个 */ // 1. 按照 vc 分组、开窗、聚合(增量计算+全量打标签) // 开窗聚合后,就是普通的流,没有了窗口信息,需要自己打上窗口的标记 windowEnd SingleOutputStreamOperator<Tuple3<Integer, Integer, Long>> windowAgg = sensorDS.keyBy(sensor -> sensor.getVc()) .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))) .aggregate( new VcCountAgg(), new WindowResult() ); // 2. 按照窗口标签(窗口结束时间)keyby,保证同一个窗口时间范围的结果,到一起去。排序、取TopN windowAgg.keyBy(r -> r.f2) .process(new TopN(2)) .print(); env.execute(); } public static class VcCountAgg implements AggregateFunction<WaterSensor, Integer, Integer> { @Override public Integer createAccumulator() { return 0; } @Override public Integer add(WaterSensor value, Integer accumulator) { return accumulator + 1; } @Override public Integer getResult(Integer accumulator) { return accumulator; } @Override public Integer merge(Integer a, Integer b) { return null; } } /** * 泛型如下: * 第一个:输入类型 = 增量函数的输出 count值,Integer * 第二个:输出类型 = Tuple3(vc,count,windowEnd) ,带上 窗口结束时间 的标签 * 第三个:key类型 , vc,Integer * 第四个:窗口类型 */ public static class WindowResult extends ProcessWindowFunction<Integer, Tuple3<Integer, Integer, Long>, Integer, TimeWindow> { @Override public void process(Integer key, Context context, Iterable<Integer> elements, Collector<Tuple3<Integer, Integer, Long>> out) throws Exception { // 迭代器里面只有一条数据,next一次即可 Integer count = elements.iterator().next(); long windowEnd = context.window().getEnd(); out.collect(Tuple3.of(key, count, windowEnd)); } } public static class TopN extends KeyedProcessFunction<Long, Tuple3<Integer, Integer, Long>, String> { // 存不同窗口的 统计结果,key=windowEnd,value=list数据 private Map<Long, List<Tuple3<Integer, Integer, Long>>> dataListMap; // 要取的Top数量 private int threshold; public TopN(int threshold) { this.threshold = threshold; dataListMap = new HashMap<>(); } @Override public void processElement(Tuple3<Integer, Integer, Long> value, Context ctx, Collector<String> out) throws Exception { // 进入这个方法,只是一条数据,要排序,得到齐才行 ===》 存起来,不同窗口分开存 // 1. 存到HashMap中 Long windowEnd = value.f2; if (dataListMap.containsKey(windowEnd)) { // 1.1 包含vc,不是该vc的第一条,直接添加到List中 List<Tuple3<Integer, Integer, Long>> dataList = dataListMap.get(windowEnd); dataList.add(value); } else { // 1.1 不包含vc,是该vc的第一条,需要初始化list List<Tuple3<Integer, Integer, Long>> dataList = new ArrayList<>(); dataList.add(value); dataListMap.put(windowEnd, dataList); } // 2. 注册一个定时器, windowEnd+1ms即可( // 同一个窗口范围,应该同时输出,只不过是一条一条调用processElement方法,只需要延迟1ms即可 ctx.timerService().registerEventTimeTimer(windowEnd + 1); } @Override public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception { super.onTimer(timestamp, ctx, out); // 定时器触发,同一个窗口范围的计算结果攒齐了,开始 排序、取TopN Long windowEnd = ctx.getCurrentKey(); // 1. 排序 List<Tuple3<Integer, Integer, Long>> dataList = dataListMap.get(windowEnd); dataList.sort(new Comparator<Tuple3<Integer, Integer, Long>>() { @Override public int compare(Tuple3<Integer, Integer, Long> o1, Tuple3<Integer, Integer, Long> o2) { // 降序, 后 减 前 return o2.f1 - o1.f1; } }); // 2. 取TopN StringBuilder outStr = new StringBuilder(); outStr.append("================================\n"); // 遍历 排序后的 List,取出前 threshold 个, 考虑可能List不够2个的情况 ==》 List中元素的个数 和 2 取最小值 for (int i = 0; i < Math.min(threshold, dataList.size()); i++) { Tuple3<Integer, Integer, Long> vcCount = dataList.get(i); outStr.append("Top" + (i + 1) + "\n"); outStr.append("vc=" + vcCount.f0 + "\n"); outStr.append("count=" + vcCount.f1 + "\n"); outStr.append("窗口结束时间=" + vcCount.f2 + "\n"); outStr.append("================================\n"); } // 用完的List,及时清理,节省资源 dataList.clear(); out.collect(outStr.toString()); } } }
侧输出流可以认为是“主流”上分叉出的“支流”,所以可以由一条流产生出多条流,而且这些流中的数据类型还可以不一样。利用这个功能可以很容易地实现“分流”操作。具体应用时,只要在处理函数的.processElement()或者.onTimer()方法中,调用上下文的.output()方法就可以了
DataStream<Integer> stream = env.fromSource(...); OutputTag<String> outputTag = new OutputTag<String>("side-output") {}; SingleOutputStreamOperator<Long> longStream = stream.process(new ProcessFunction<Integer, Long>() { @Override public void processElement( Integer value, Context ctx, Collector<Integer> out) throws Exception { // 转换成Long,输出到主流中 out.collect(Long.valueOf(value)); // 转换成String,输出到侧输出流中 ctx.output(outputTag, "side-output: " + String.valueOf(value)); } }); // 这里output()方法需要传入两个参数,第一个是一个“输出标签”OutputTag,用来标识侧输出流,一般会在外部统一声明;第二个就是要输出的数据。
举例对每个传感器,水位超过10的输出告警信息
public class SideOutputDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); SingleOutputStreamOperator<WaterSensor> sensorDS = env .socketTextStream("hadoop102", 7777) .map(new WaterSensorMapFunction()) .assignTimestampsAndWatermarks( WatermarkStrategy .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)) .withTimestampAssigner((element, ts) -> element.getTs() * 1000L) ); OutputTag<String> warnTag = new OutputTag<>("warn", Types.STRING); SingleOutputStreamOperator<WaterSensor> process = sensorDS.keyBy(sensor -> sensor.getId()) .process( new KeyedProcessFunction<String, WaterSensor, WaterSensor>() { @Override public void processElement(WaterSensor value, Context ctx, Collector<WaterSensor> out) throws Exception { // 使用侧输出流告警 if (value.getVc() > 10) { ctx.output(warnTag, "当前水位=" + value.getVc() + ",大于阈值10!!!"); } // 主流正常 发送数据 out.collect(value); } } ); process.print("主流"); process.getSideOutput(warnTag).printToErr("warn"); env.execute(); } }
Flink的状态有两种:托管状态(Managed State)和原始状态(Raw State)。托管状态就是由Flink统一管理的,状态的存储访问、故障恢复和重组等一系列问题都由Flink实现,我们只要调接口就可以;而原始状态则是自定义的,相当于就是开辟了一块内存,需要我们自己管理,实现状态的序列化和故障恢复。通常我们采用Flink托管状态来实现需求
托管状态分为两类:算子状态和按键分区状态
另外,也可以通过**富函数类(Rich Function)**来自定义Keyed State,所以只要提供了富函数类接口的算子,也都可以使用Keyed State。所以即使是map、filter这样无状态的基本转换算子,我们也可以通过富函数类给它们“追加”Keyed State。比如RichMapFunction、RichFilterFunction。在富函数中,我们可以调用.getRuntimeContext()获取当前的运行时上下文(RuntimeContext),进而获取到访问状态的句柄;这种富函数中自定义的状态也是Keyed State。从这个角度讲,Flink中所有的算子都可以是有状态的。
无论是Keyed State还是Operator State,它们都是在本地实例上维护的,也就是说每个并行子任务维护着对应的状态,算子的子任务之间状态不共享
使用Keyed State必须基于KeyedStream。没有进行keyBy分区的DataStream,即使转换算子实现了对应的富函数类,也不能通过运行时上下文访问Keyed State
public interface ValueState<T> extends State {
T value() throws IOException;
void update(T value) throws IOException;
}
// 这里的T是泛型,表示状态的数据内容可以是任何具体的数据类型。如果想要保存一个长整型值作为状态,那么类型就是ValueState<Long>
// T value():获取当前状态的值;
// update(T value):对状态进行更新,传入的参数value就是要覆写的状态值
**案例需求:**检测每种传感器的水位值,如果连续的两个水位值超过10,就输出报警
public class KeyedValueStateDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); SingleOutputStreamOperator<WaterSensor> sensorDS = env .socketTextStream("hadoop102", 7777) .map(new WaterSensorMapFunction()) .assignTimestampsAndWatermarks( WatermarkStrategy .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)) .withTimestampAssigner((element, ts) -> element.getTs() * 1000L) ); sensorDS.keyBy(r -> r.getId()) .process( new KeyedProcessFunction<String, WaterSensor, String>() { // TODO 1.定义状态 ValueState<Integer> lastVcState; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); // TODO 2.在open方法中,初始化状态 // 状态描述器两个参数:第一个参数,起个名字,不重复;第二个参数,存储的类型 lastVcState = getRuntimeContext().getState(new ValueStateDescriptor<Integer>("lastVcState", Types.INT)); } @Override public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception { // lastVcState.value(); // 取出 本组 值状态 的数据 // lastVcState.update(); // 更新 本组 值状态 的数据 // lastVcState.clear(); // 清除 本组 值状态 的数据 // 1. 取出上一条数据的水位值(Integer默认值是null,判断) int lastVc = lastVcState.value() == null ? 0 : lastVcState.value(); // 2. 求差值的绝对值,判断是否超过10 Integer vc = value.getVc(); if (Math.abs(vc - lastVc) > 10) { out.collect("传感器=" + value.getId() + "==>当前水位值=" + vc + ",与上一条水位值=" + lastVc + ",相差超过10!!!!"); } // 3. 更新状态里的水位值 lastVcState.update(vc); } } ) .print(); env.execute(); } }
将需要保存的数据,以列表(List)的形式组织起来。在ListState<T>接口中同样有一个类型参数T,表示列表中数据的类型。ListState也提供了一系列的方法来操作状态,使用方式与一般的List非常相似
类似地,ListState的状态描述器就叫作ListStateDescriptor,用法跟ValueStateDescriptor完全一致
案例:针对每种传感器输出最高的3个水位值
public class KeyedListStateDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); SingleOutputStreamOperator<WaterSensor> sensorDS = env .socketTextStream("hadoop102", 7777) .map(new WaterSensorMapFunction()) .assignTimestampsAndWatermarks( WatermarkStrategy .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)) .withTimestampAssigner((element, ts) -> element.getTs() * 1000L) ); sensorDS.keyBy(r -> r.getId()) .process( new KeyedProcessFunction<String, WaterSensor, String>() { ListState<Integer> vcListState; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); vcListState = getRuntimeContext().getListState(new ListStateDescriptor<Integer>("vcListState", Types.INT)); } @Override public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception { // 1.来一条,存到list状态里 vcListState.add(value.getVc()); // 2.从list状态拿出来(Iterable), 拷贝到一个List中,排序, 只留3个最大的 Iterable<Integer> vcListIt = vcListState.get(); // 2.1 拷贝到List中 List<Integer> vcList = new ArrayList<>(); for (Integer vc : vcListIt) { vcList.add(vc); } // 2.2 对List进行降序排序 vcList.sort((o1, o2) -> o2 - o1); // 2.3 只保留最大的3个(list中的个数一定是连续变大,一超过3就立即清理即可) if (vcList.size() > 3) { // 将最后一个元素清除(第4个) vcList.remove(3); } out.collect("传感器id为" + value.getId() + ",最大的3个水位值=" + vcList.toString()); // 3.更新list状态 vcListState.update(vcList); // vcListState.get(); //取出 list状态 本组的数据,是一个Iterable // vcListState.add(); // 向 list状态 本组 添加一个元素 // vcListState.addAll(); // 向 list状态 本组 添加多个元素 // vcListState.update(); // 更新 list状态 本组数据(覆盖) // vcListState.clear(); // 清空List状态 本组数据 } } ) .print(); env.execute(); } }
把一些键值对(key-value)作为状态整体保存起来,可以认为就是一组key-value映射的列表。对应的MapState<UK, UV>接口中,就会有UK、UV两个泛型,分别表示保存的key和value的类型。同样,MapState提供了操作映射状态的方法,与Map的使用非常类似
另外,MapState也提供了获取整个映射相关信息的方法;
**案例需求:**统计每种传感器每种水位值出现的次数
public class KeyedMapStateDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); SingleOutputStreamOperator<WaterSensor> sensorDS = env .socketTextStream("hadoop102", 7777) .map(new WaterSensorMapFunction()) .assignTimestampsAndWatermarks( WatermarkStrategy .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)) .withTimestampAssigner((element, ts) -> element.getTs() * 1000L) ); sensorDS.keyBy(r -> r.getId()) .process( new KeyedProcessFunction<String, WaterSensor, String>() { MapState<Integer, Integer> vcCountMapState; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); vcCountMapState = getRuntimeContext().getMapState(new MapStateDescriptor<Integer, Integer>("vcCountMapState", Types.INT, Types.INT)); } @Override public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception { // 1.判断是否存在vc对应的key Integer vc = value.getVc(); if (vcCountMapState.contains(vc)) { // 1.1 如果包含这个vc的key,直接对value+1 Integer count = vcCountMapState.get(vc); vcCountMapState.put(vc, ++count); } else { // 1.2 如果不包含这个vc的key,初始化put进去 vcCountMapState.put(vc, 1); } // 2.遍历Map状态,输出每个k-v的值 StringBuilder outStr = new StringBuilder(); outStr.append("======================================\n"); outStr.append("传感器id为" + value.getId() + "\n"); for (Map.Entry<Integer, Integer> vcCount : vcCountMapState.entries()) { outStr.append(vcCount.toString() + "\n"); } outStr.append("======================================\n"); out.collect(outStr.toString()); // vcCountMapState.get(); // 对本组的Map状态,根据key,获取value // vcCountMapState.contains(); // 对本组的Map状态,判断key是否存在 // vcCountMapState.put(, ); // 对本组的Map状态,添加一个 键值对 // vcCountMapState.putAll(); // 对本组的Map状态,添加多个 键值对 // vcCountMapState.entries(); // 对本组的Map状态,获取所有键值对 // vcCountMapState.keys(); // 对本组的Map状态,获取所有键 // vcCountMapState.values(); // 对本组的Map状态,获取所有值 // vcCountMapState.remove(); // 对本组的Map状态,根据指定key,移除键值对 // vcCountMapState.isEmpty(); // 对本组的Map状态,判断是否为空 // vcCountMapState.iterator(); // 对本组的Map状态,获取迭代器 // vcCountMapState.clear(); // 对本组的Map状态,清空 } } ) .print(); env.execute(); } }
**案例需求:**计算每种传感器的平均水位
public class KeyedAggregatingStateDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); SingleOutputStreamOperator<WaterSensor> sensorDS = env .socketTextStream("hadoop102", 7777) .map(new WaterSensorMapFunction()) .assignTimestampsAndWatermarks( WatermarkStrategy .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)) .withTimestampAssigner((element, ts) -> element.getTs() * 1000L) ); sensorDS.keyBy(r -> r.getId()) .process( new KeyedProcessFunction<String, WaterSensor, String>() { AggregatingState<Integer, Double> vcAvgAggregatingState; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); vcAvgAggregatingState = getRuntimeContext() .getAggregatingState( new AggregatingStateDescriptor<Integer, Tuple2<Integer, Integer>, Double>( "vcAvgAggregatingState", new AggregateFunction<Integer, Tuple2<Integer, Integer>, Double>() { @Override public Tuple2<Integer, Integer> createAccumulator() { return Tuple2.of(0, 0); } @Override public Tuple2<Integer, Integer> add(Integer value, Tuple2<Integer, Integer> accumulator) { return Tuple2.of(accumulator.f0 + value, accumulator.f1 + 1); } @Override public Double getResult(Tuple2<Integer, Integer> accumulator) { return accumulator.f0 * 1D / accumulator.f1; } @Override public Tuple2<Integer, Integer> merge(Tuple2<Integer, Integer> a, Tuple2<Integer, Integer> b) { // return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1); return null; } }, Types.TUPLE(Types.INT, Types.INT)) ); } @Override public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception { // 将 水位值 添加到 聚合状态中 vcAvgAggregatingState.add(value.getVc()); // 从 聚合状态中 获取结果 Double vcAvg = vcAvgAggregatingState.get(); out.collect("传感器id为" + value.getId() + ",平均水位值=" + vcAvg); // vcAvgAggregatingState.get(); // 对 本组的聚合状态 获取结果 // vcAvgAggregatingState.add(); // 对 本组的聚合状态 添加数据,会自动进行聚合 // vcAvgAggregatingState.clear(); // 对 本组的聚合状态 清空数据 } } ) .print(); env.execute(); } }
**案例需求:**计算每种传感器的平均水位
public class KeyedAggregatingStateDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); SingleOutputStreamOperator<WaterSensor> sensorDS = env .socketTextStream("hadoop102", 7777) .map(new WaterSensorMapFunction()) .assignTimestampsAndWatermarks( WatermarkStrategy .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)) .withTimestampAssigner((element, ts) -> element.getTs() * 1000L) ); sensorDS.keyBy(r -> r.getId()) .process( new KeyedProcessFunction<String, WaterSensor, String>() { AggregatingState<Integer, Double> vcAvgAggregatingState; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); vcAvgAggregatingState = getRuntimeContext() .getAggregatingState( new AggregatingStateDescriptor<Integer, Tuple2<Integer, Integer>, Double>( "vcAvgAggregatingState", new AggregateFunction<Integer, Tuple2<Integer, Integer>, Double>() { @Override public Tuple2<Integer, Integer> createAccumulator() { return Tuple2.of(0, 0); } @Override public Tuple2<Integer, Integer> add(Integer value, Tuple2<Integer, Integer> accumulator) { return Tuple2.of(accumulator.f0 + value, accumulator.f1 + 1); } @Override public Double getResult(Tuple2<Integer, Integer> accumulator) { return accumulator.f0 * 1D / accumulator.f1; } @Override public Tuple2<Integer, Integer> merge(Tuple2<Integer, Integer> a, Tuple2<Integer, Integer> b) { // return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1); return null; } }, Types.TUPLE(Types.INT, Types.INT)) ); } @Override public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception { // 将 水位值 添加到 聚合状态中 vcAvgAggregatingState.add(value.getVc()); // 从 聚合状态中 获取结果 Double vcAvg = vcAvgAggregatingState.get(); out.collect("传感器id为" + value.getId() + ",平均水位值=" + vcAvg); // vcAvgAggregatingState.get(); // 对 本组的聚合状态 获取结果 // vcAvgAggregatingState.add(); // 对 本组的聚合状态 添加数据,会自动进行聚合 // vcAvgAggregatingState.clear(); // 对 本组的聚合状态 清空数据 } } ) .print(); env.execute(); } }
在实际应用中,很多状态会随着时间的推移逐渐增长,如果不加以限制,最终就会导致存储空间的耗尽。一个优化的思路是直接在代码中调用.clear()方法去清除状态,但是有时候我们的逻辑要求不能直接清除。这时就需要配置一个状态的“生存时间”(time-to-live,TTL),当状态在内存中存在的时间超出这个值时,就将它清除。
具体实现上,如果用一个进程不停地扫描所有状态看是否过期,显然会占用大量资源做无用功。状态的失效其实不需要立即删除,所以我们可以给状态附加一个属性,也就是状态的“失效时间”。状态创建的时候,设置 失效时间 = 当前时间 + TTL;之后如果有对状态的访问和修改,我们可以再对失效时间进行更新;当设置的清除条件被触发时(比如,状态被访问的时候,或者每隔一段时间扫描一次失效状态),就可以判断状态是否失效、从而进行清除了。
配置状态的TTL时,需要创建一个StateTtlConfig配置对象,然后调用状态描述器的.enableTimeToLive()方法启动TTL功能
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(10))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("my state", String.class);
stateDescriptor.enableTimeToLive(ttlConfig);
状态TTL配置的构造器方法,必须调用,返回一个Builder之后再调用.build()方法就可以得到StateTtlConfig了。方法需要传入一个Time作为参数,这就是设定的状态生存时间。
设置更新类型。更新类型指定了什么时候更新状态失效时间,这里的OnCreateAndWrite表示只有创建状态和更改状态(写操作)时更新失效时间。另一种类型OnReadAndWrite则表示无论读写操作都会更新失效时间,也就是只要对状态进行了访问,就表明它是活跃的,从而延长生存时间。这个配置默认为OnCreateAndWrite。
设置状态的可见性。所谓的“状态可见性”,是指因为清除操作并不是实时的,所以当状态过期之后还有可能继续存在,这时如果对它进行访问,能否正常读取到就是一个问题了。这里设置的NeverReturnExpired是默认行为,表示从不返回过期值,也就是只要过期就认为它已经被清除了,应用不能继续读取;这在处理会话或者隐私数据时比较重要。对应的另一种配置是ReturnExpireDefNotCleanedUp,就是如果过期状态还存在,就返回它的值。
除此之外,TTL配置还可以设置在保存检查点(checkpoint)时触发清除操作,或者配置增量的清理(incremental cleanup),还可以针对RocksDB状态后端使用压缩过滤器(compaction filter)进行后台清理。这里需要注意,目前的TTL设置只支持处理时间。
public class StateTTLDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); SingleOutputStreamOperator<WaterSensor> sensorDS = env .socketTextStream("hadoop102", 7777) .map(new WaterSensorMapFunction()) .assignTimestampsAndWatermarks( WatermarkStrategy .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)) .withTimestampAssigner((element, ts) -> element.getTs() * 1000L) ); sensorDS.keyBy(r -> r.getId()) .process( new KeyedProcessFunction<String, WaterSensor, String>() { ValueState<Integer> lastVcState; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); // TODO 1.创建 StateTtlConfig StateTtlConfig stateTtlConfig = StateTtlConfig .newBuilder(Time.seconds(5)) // 过期时间5s // .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) // 状态 创建和写入(更新) 更新 过期时间 .setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite) // 状态 读取、创建和写入(更新) 更新 过期时间 .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) // 不返回过期的状态值 .build(); // TODO 2.状态描述器 启用 TTL ValueStateDescriptor<Integer> stateDescriptor = new ValueStateDescriptor<>("lastVcState", Types.INT); stateDescriptor.enableTimeToLive(stateTtlConfig); this.lastVcState = getRuntimeContext().getState(stateDescriptor); } @Override public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception { // 先获取状态值,打印 ==》 读取状态 Integer lastVc = lastVcState.value(); out.collect("key=" + value.getId() + ",状态值=" + lastVc); // 如果水位大于10,更新状态值 ===》 写入状态 if (value.getVc() > 10) { lastVcState.update(value.getVc()); } } } ) .print(); env.execute(); } }
算子状态(Operator State)就是一个算子并行实例上定义的状态,作用范围被限定为当前算子任务。算子状态跟数据的key无关,所以不同key的数据只要被分发到同一个并行子任务,就会访问到同一个Operator State。算子状态的实际应用场景不如Keyed State多,一般用在Source或Sink等与外部系统连接的算子上,或者完全没有key定义的场景。比如Flink的Kafka连接器中,就用到了算子状态。算子状态也支持不同的结构类型,主要有三种:ListState、UnionListState和BroadcastState
与Keyed State中的ListState一样,将状态表示为一组数据的列表。与Keyed State中的列表状态的区别是:在算子状态的上下文中,不会按键(key)分别处理状态,所以每一个并行子任务上只会保留一个“列表”(list),也就是当前并行子任务上所有状态项的集合。列表中的状态项就是可以重新分配的最细粒度,彼此之间完全独立。
当算子并行度进行缩放调整时,算子的列表状态中的所有元素项会被统一收集起来,相当于把多个分区的列表合并成了一个“大列表”,然后再均匀地分配给所有并行任务。这种“均匀分配”的具体方法就是“轮询”(round-robin),与之前介绍的rebanlance数据传输方式类似,是通过逐一“发牌”的方式将状态项平均分配的。这种方式也叫作“平均分割重组”(even-split redistribution)。
算子状态中不会存在“键组”(key group)这样的结构,所以为了方便重组分配,就把它直接定义成了“列表”(list)。这也就解释了,为什么算子状态中没有最简单的值状态(ValueState)。
**案例实操:**在map算子中计算数据的个数。
public class OperatorListStateDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(2); env .socketTextStream("hadoop102", 7777) .map(new MyCountMapFunction()) .print(); env.execute(); } // TODO 1.实现 CheckpointedFunction 接口 public static class MyCountMapFunction implements MapFunction<String, Long>, CheckpointedFunction { private Long count = 0L; private ListState<Long> state; @Override public Long map(String value) throws Exception { return ++count; } /** * TODO 2.本地变量持久化:将 本地变量 拷贝到 算子状态中,开启checkpoint时才会调用 * * @param context * @throws Exception */ @Override public void snapshotState(FunctionSnapshotContext context) throws Exception { System.out.println("snapshotState..."); // 2.1 清空算子状态 state.clear(); // 2.2 将 本地变量 添加到 算子状态 中 state.add(count); } /** * TODO 3.初始化本地变量:程序启动和恢复时, 从状态中 把数据添加到 本地变量,每个子任务调用一次 * * @param context * @throws Exception */ @Override public void initializeState(FunctionInitializationContext context) throws Exception { System.out.println("initializeState..."); // 3.1 从 上下文 初始化 算子状态 state = context .getOperatorStateStore() .getListState(new ListStateDescriptor<Long>("state", Types.LONG)); // 3.2 从 算子状态中 把数据 拷贝到 本地变量 if (context.isRestored()) { for (Long c : state.get()) { count += c; } } } } }
与ListState类似,联合列表状态也会将状态表示为一个列表。它与常规列表状态的区别在于,算子并行度进行缩放调整时对于状态的分配方式不同。
UnionListState的重点就在于“联合”(union)。在并行度调整时,常规列表状态是轮询分配状态项,而联合列表状态的算子则会直接广播状态的完整列表。这样,并行度缩放之后的并行子任务就获取到了联合后完整的“大列表”,可以自行选择要使用的状态项和要丢弃的状态项。这种分配也叫作“联合重组”(union redistribution)。如果列表中状态项数量太多,为资源和效率考虑一般不建议使用联合重组的方式。
state = context
.getOperatorStateStore()
.getUnionListState(new ListStateDescriptor<Long>("union-state", Types.LONG));
有时我们希望算子并行子任务都保持同一份“全局”状态,用来做统一的配置和规则设定。这时所有分区的所有数据都会访问到同一个状态,状态就像被“广播”到所有分区一样,这种特殊的算子状态,就叫作广播状态(BroadcastState)。
因为广播状态在每个并行子任务上的实例都一样,所以在并行度调整的时候就比较简单,只要复制一份到新的并行任务就可以实现扩展;而对于并行度缩小的情况,可以将多余的并行子任务连同状态直接砍掉——因为状态都是复制出来的,并不会丢失。
**案例实操:**水位超过指定的阈值发送告警,阈值可以动态修改
public class OperatorBroadcastStateDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(2); // 数据流 SingleOutputStreamOperator<WaterSensor> sensorDS = env .socketTextStream("hadoop102", 7777) .map(new WaterSensorMapFunction()); // 配置流(用来广播配置) DataStreamSource<String> configDS = env.socketTextStream("hadoop102", 8888); // TODO 1. 将 配置流 广播 MapStateDescriptor<String, Integer> broadcastMapState = new MapStateDescriptor<>("broadcast-state", Types.STRING, Types.INT); BroadcastStream<String> configBS = configDS.broadcast(broadcastMapState); // TODO 2.把 数据流 和 广播后的配置流 connect BroadcastConnectedStream<WaterSensor, String> sensorBCS = sensorDS.connect(configBS); // TODO 3.调用 process sensorBCS .process( new BroadcastProcessFunction<WaterSensor, String, String>() { /** * 数据流的处理方法: 数据流 只能 读取 广播状态,不能修改 * @param value * @param ctx * @param out * @throws Exception */ @Override public void processElement(WaterSensor value, ReadOnlyContext ctx, Collector<String> out) throws Exception { // TODO 5.通过上下文获取广播状态,取出里面的值(只读,不能修改) ReadOnlyBroadcastState<String, Integer> broadcastState = ctx.getBroadcastState(broadcastMapState); Integer threshold = broadcastState.get("threshold"); // 判断广播状态里是否有数据,因为刚启动时,可能是数据流的第一条数据先来 threshold = (threshold == null ? 0 : threshold); if (value.getVc() > threshold) { out.collect(value + ",水位超过指定的阈值:" + threshold + "!!!"); } } /** * 广播后的配置流的处理方法: 只有广播流才能修改 广播状态 * @param value * @param ctx * @param out * @throws Exception */ @Override public void processBroadcastElement(String value, Context ctx, Collector<String> out) throws Exception { // TODO 4. 通过上下文获取广播状态,往里面写数据 BroadcastState<String, Integer> broadcastState = ctx.getBroadcastState(broadcastMapState); broadcastState.put("threshold", Integer.valueOf(value)); } } ) .print(); env.execute(); } }
在Flink中,状态的存储、访问以及维护,都是由一个可插拔的组件决定的,这个组件就叫作状态后端(state backend)。状态后端主要负责管理本地状态的存储方式和位置
状态后端是一个“开箱即用”的组件,可以在不改变应用程序逻辑的情况下独立配置。Flink中提供了两类不同的状态后端,一种是“哈希表状态后端”(HashMapStateBackend),另一种是“内嵌RocksDB状态后端”(EmbeddedRocksDBStateBackend)。如果没有特别配置,系统默认的状态后端是HashMapStateBackend
HashMapStateBackend是把状态存放在内存里。具体实现上,哈希表状态后端在内部会直接把状态当作对象(objects),保存在Taskmanager的JVM堆上。普通的状态,以及窗口中收集的数据和触发器,都会以键值对的形式存储起来,所以底层是一个哈希表(HashMap),这种状态后端也因此得名。
RocksDB是一种内嵌的key-value存储介质,可以把数据持久化到本地硬盘。配置EmbeddedRocksDBStateBackend后,会将处理中的数据全部放入RocksDB数据库中,RocksDB默认存储在TaskManager的本地数据目录里。
RocksDB的状态数据被存储为序列化的字节数组,读写操作需要序列化/反序列化,因此状态的访问性能要差一些。另外,因为做了序列化,key的比较也会按照字节进行,而不是直接调用.hashCode()和.equals()方法。
EmbeddedRocksDBStateBackend始终执行的是异步快照,所以不会因为保存检查点而阻塞数据的处理;而且它还提供了增量式保存检查点的机制,这在很多情况下可以大大提升保存效率
HashMap和RocksDB两种状态后端最大的区别,就在于本地状态存放在哪里。
HashMapStateBackend是内存计算,读写速度非常快;但是,状态的大小会受到集群可用内存的限制,如果应用的状态随着时间不停地增长,就会耗尽内存资源。
而RocksDB是硬盘存储,所以可以根据可用的磁盘空间进行扩展,所以它非常适合于超级海量状态的存储。不过由于每个状态的读写都需要做序列化/反序列化,而且可能需要直接从磁盘读取数据,这就会导致性能的降低,平均读写性能要比HashMapStateBackend慢一个数量级
在不做配置的时候,应用程序使用的默认状态后端是由集群配置文件flink-conf.yaml
中指定的,配置的键名称为state.backend
。这个默认配置对集群上运行的所有作业都有效,我们可以通过更改配置值来改变默认的状态后端。另外,我们还可以在代码中为当前作业单独配置状态后端,这个配置会覆盖掉集群配置文件的默认值
在flink-conf.yaml
中,可以使用state.backend来配置默认状态后端。配置项的可能值为hashmap,这样配置的就是HashMapStateBackend;如果配置项的值是rocksdb,这样配置的就是EmbeddedRocksDBStateBackend
# 默认状态后端
state.backend: hashmap
# 存放检查点的文件路径
state.checkpoints.dir: hdfs://hadoop102:8020/flink/checkpoints
# 这里的state.checkpoints.dir配置项,定义了检查点和元数据写入的目录
// 通过执行环境设置,HashMapStateBackend StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStateBackend(new HashMapStateBackend()); // 通过执行环境设置,EmbeddedRocksDBStateBackend StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStateBackend(new EmbeddedRocksDBStateBackend()); // 需要注意,如果想在IDE中使用EmbeddedRocksDBStateBackend,需要为Flink项目添加依赖 // 而由于Flink发行版中默认就包含了RocksDB(服务器上解压的Flink),所以只要我们的代码中没有使用RocksDB的相关内容,就不需要引入这个依赖 <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-statebackend-rocksdb</artifactId> <version>${flink.version}</version> </dependency
我们应该在所有任务(算子)都恰好处理完一个相同的输入数据的时候,将它们的状态保存下来。这样做可以实现一个数据被所有任务(算子)完整地处理完,状态得到了保存。
如果出现故障,我们恢复到之前保存的状态,故障时正在处理的所有数据都需要重新处理;我们只需要让源(source)任务向数据源重新提交偏移量、请求重放数据就可以了。
检查点的保存,最关键的就是要等所有任务将“同一个数据”处理完毕。
简单来说,就是当flink重启时,会重新定位到最近的检查点,并从该检查点开始重新计算,实现精准一次
借鉴水位线的设计,在数据流中插入一个特殊的数据结构,专门用来表示触发检查点保存的时间点。收到保存检查点的指令后,Source任务可以在当前数据流中插入这个结构;之后的所有任务只要遇到它就开始对状态做持久化快照保存。由于数据流是保持顺序依次处理的,因此遇到这个标识就代表之前的数据都处理完了,可以保存一个检查点;而在它之后的数据,引起的状态改变就不会体现在这个检查点中,而需要保存到下一个检查点。
这种特殊的数据形式,把一条流上的数据按照不同的检查点分隔开,所以就叫做检查点的“分界线”(Checkpoint Barrier)。
watermark指示的是“之前的数据全部到齐了”,而barrier指示的是“之前所有数据的状态更改保存入当前检查点”:它们都是一个“截止时间”的标志。所以在处理多个分区的传递时,也要以是否还会有数据到来作为一个判断标准。
下面是执行顺序:
(1)触发检查点:JobManager向Source发送Barrier;
(2)Barrier发送:向下游广播发送;
(3)Barrier对齐:下游需要收到上游所有并行度传递过来的Barrier才做自身状态的保存;
(4)状态保存:有状态的算子将状态保存至持久化。
(5)先处理缓存数据,然后正常继续处理
和精准一次类似,但是保存的数据在宕机重启时会重复计算
默认情况下,Flink程序是禁用检查点的。如果想要为Flink应用开启自动保存快照的功能,需要在代码中显式地调用执行环境的.enableCheckpointing()方法
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
// 这里需要传入一个长整型的毫秒数,表示周期性保存检查点的间隔时间。如果不传参数直接启用检查点,默认的间隔周期为500毫秒,这种方式已经被弃用
// 每隔1秒启动一次检查点保存
env.enableCheckpointing(1000);
检查点具体的持久化存储位置,取决于“检查点存储”的设置。默认情况下,检查点存储在JobManager的堆内存中。而对于大状态的持久化保存,Flink也提供了在其他存储位置进行保存的接口
// 配置存储检查点到JobManager堆内存
env.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage());
// 配置存储检查点到文件系统
env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("hdfs://namenode:40010/flink/checkpoints"));
// 对于实际生产应用,我们一般会将CheckpointStorage配置为高可用的分布式文件系统(HDFS,S3等)。
// 检查点还有很多可以配置的选项,可以通过获取检查点配置(CheckpointConfig)来进行设置
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
检查点模式(CheckpointingMode)
设置检查点一致性的保证级别,有“精确一次”(exactly-once)和“至少一次”(at-least-once)两个选项。默认级别为exactly-once,而对于大多数低延迟的流处理程序,at-least-once就够用了,而且处理效率会更高。
超时时间(checkpointTimeout)
用于指定检查点保存的超时时间,超时没完成就会被丢弃掉。传入一个长整型毫秒数作为参数,表示超时时间。
最小间隔时间(minPauseBetweenCheckpoints)
用于指定在上一个检查点完成之后,检查点协调器最快等多久可以出发保存下一个检查点的指令。这就意味着即使已经达到了周期触发的时间点,只要距离上一个检查点完成的间隔不够,就依然不能开启下一次检查点的保存。这就为正常处理数据留下了充足的间隙。当指定这个参数时,实际并发为1。
最大并发检查点数量(maxConcurrentCheckpoints)
用于指定运行中的检查点最多可以有多少个。由于每个任务的处理进度不同,完全可能出现后面的任务还没完成前一个检查点的保存、前面任务已经开始保存下一个检查点了。这个参数就是限制同时进行的最大数量。
开启外部持久化存储(enableExternalizedCheckpoints)
用于开启检查点的外部持久化,而且默认在作业失败的时候不会自动清理,如果想释放空间需要自己手工清理。里面传入的参数ExternalizedCheckpointCleanup指定了当作业取消的时候外部的检查点该如何清理。
检查点连续失败次数(tolerableCheckpointFailureNumber)
用于指定检查点连续失败的次数,当达到这个次数,作业就失败退出。默认为0,这意味着不能容忍检查点失败,并且作业将在第一次报告检查点失败时失败。
开启非对齐检查点
非对齐检查点(enableUnalignedCheckpoints)
不再执行检查点的分界线对齐操作,启用之后可以大大减少产生背压时的检查点保存时间。这个设置要求检查点模式(CheckpointingMode)必须为exctly-once,并且最大并发的检查点个数为1。
对齐检查点超时时间(alignedCheckpointTimeout)
该参数只有在启用非对齐检查点的时候有效。参数默认是0,表示一开始就直接用非对齐检查点。如果设置大于0,一开始会使用对齐的检查点,当对齐时间超过该参数设定的时间,则会自动切换成非对齐检查点
public class CheckpointConfigDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); env.setParallelism(1); // 代码中用到hdfs,需要导入hadoop依赖、指定访问hdfs的用户名 System.setProperty("HADOOP_USER_NAME", "atguigu"); // TODO 检查点配置 // 1、启用检查点: 默认是barrier对齐的,周期为5s, 精准一次 env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE); CheckpointConfig checkpointConfig = env.getCheckpointConfig(); // 2、指定检查点的存储位置 checkpointConfig.setCheckpointStorage("hdfs://hadoop102:8020/chk"); // 3、checkpoint的超时时间: 默认10分钟 checkpointConfig.setCheckpointTimeout(60000); // 4、同时运行中的checkpoint的最大数量 checkpointConfig.setMaxConcurrentCheckpoints(1); // 5、最小等待间隔: 上一轮checkpoint结束 到 下一轮checkpoint开始 之间的间隔,设置了>0,并发就会变成1 checkpointConfig.setMinPauseBetweenCheckpoints(1000); // 6、取消作业时,checkpoint的数据 是否保留在外部系统 // DELETE_ON_CANCELLATION:主动cancel时,删除存在外部系统的chk-xx目录 (如果是程序突然挂掉,不会删) // RETAIN_ON_CANCELLATION:主动cancel时,外部系统的chk-xx目录会保存下来 checkpointConfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); // 7、允许 checkpoint 连续失败的次数,默认0--》表示checkpoint一失败,job就挂掉 checkpointConfig.setTolerableCheckpointFailureNumber(10); // TODO 开启 非对齐检查点(barrier非对齐) // 开启的要求: Checkpoint模式必须是精准一次,最大并发必须设为1 checkpointConfig.enableUnalignedCheckpoints(); // 开启非对齐检查点才生效: 默认0,表示一开始就直接用 非对齐的检查点 // 如果大于0, 一开始用 对齐的检查点(barrier对齐), 对齐的时间超过这个参数,自动切换成 非对齐检查点(barrier非对齐) checkpointConfig.setAlignedCheckpointTimeout(Duration.ofSeconds(1)); env .socketTextStream("hadoop102", 7777) .flatMap( (String value, Collector<Tuple2<String, Integer>> out) -> { String[] words = value.split(" "); for (String word : words) { out.collect(Tuple2.of(word, 1)); } } ) .returns(Types.TUPLE(Types.STRING, Types.INT)) .keyBy(value -> value.f0) .sum(1) .print(); env.execute(); } }
在 1.15 之前,只有RocksDB 支持增量快照。不同于产生一个包含所有数据的全量备份,增量快照中只包含自上一次快照完成之后被修改的记录,因此可以显著减少快照完成的耗时。
// Rocksdb状态后端启用增量checkpoint:
EmbeddedRocksDBStateBackend backend = newEmbeddedRocksDBStateBackend(true);
从 1.15 开始,不管hashmap还是rocksdb 状态后端都可以通过开启changelog实现通用的增量checkpoint(实验室功能)
如果数据源是有界的,就可能出现部分Task已经处理完所有数据,变成finished状态,不继续工作。从 Flink 1.14 开始,这些finished状态的Task,也可以继续执行检查点。自 1.15 起默认启用此功能,并且可以通过功能标志禁用它
Configuration config = new Configuration();
config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, false);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
除了检查点外,Flink还提供了另一个非常独特的镜像保存功能——保存点(savepoint)。从名称就可以看出,这也是一个存盘的备份,它的原理和算法与检查点完全相同,只是多了一些额外的元数据
保存点与检查点最大的区别,就是触发的时机。检查点是由Flink自动管理的,定期创建,发生故障之后自动读取进行恢复,这是一个“自动存盘”的功能;而保存点不会自动创建,必须由用户明确地手动触发保存操作,所以就是“手动存盘”。
保存点可以当作一个强大的运维工具来使用。我们可以在需要的时候创建一个保存点,然后停止应用,做一些处理调整之后再从保存点重启。它适用的具体场景有:
需要注意的是,保存点能够在程序更改的时候依然兼容,前提是状态的拓扑结构和数据类型不变。我们知道保存点中状态都是以算子ID-状态名称这样的key-value组织起来的,算子ID可以在代码中直接调用SingleOutputStreamOperator的.uid()方法来进行指定
DataStream<String> stream = env
.addSource(new StatefulSource()).uid("source-id")
.map(new StatefulMapper()).uid("mapper-id")
.print();
对于没有设置ID的算子,Flink默认会自动进行设置,所以在重新启动应用后可能会导致ID不同而无法兼容以前的状态。所以为了方便后续的维护,强烈建议在程序中为每一个算子手动指定ID
# =============创建保存点=============== # 要在命令行中为运行的作业创建一个保存点镜像,只需要执行 # 这里jobId需要填充要做镜像保存的作业ID,目标路径targetDirectory可选,表示保存点存储的路径 bin/flink savepoint :jobId [:targetDirectory] # 于保存点的默认路径,可以通过配置文件flink-conf.yaml中的state.savepoints.dir项来设定 # 当然对于单独的作业,我们也可以在程序代码中通过执行环境来设置 env.setDefaultSavepointDir("hdfs:///flink/savepoints"); # 由于创建保存点一般都是希望更改环境之后重启,所以创建之后往往紧接着就是停掉作业的操作。除了对运行的作业创建保存点,我们也可以在停掉一个作业时直接创建保存点 bin/flink stop --savepointPath [:targetDirectory] :jobId # ============从保存点重启应用======== # 从保存点重启一个应用 bin/flink run -s :savepointPath [:runArgs] # 这里只要增加一个-s参数,指定保存点的路径就可以了,其它启动时的参数还是完全一样的,如果是基于yarn的运行模式还需要加上 -yid application-id # Flink的Web UI,这里只要增加一个-s参数,指定保存点的路径就可以了,其它启动时的参数还是完全一样的,如果是基于yarn的运行模式还需要加上 -yid application-id
使用savepoint恢复状态的时候,也可以更换状态后端。但是有一点需要注意的是,不要在代码中指定状态后端了, 通过配置文件来配置或者-D 参数配置
# 打包时,服务器上有的就provided,可能遇到依赖问题,报错:javax.annotation.Nullable找不到,可以导入如下依赖 <dependency> <groupId>com.google.code.findbugs</groupId> <artifactId>jsr305</artifactId> <version>1.3.9</version> </dependency> # 提交flink作业 bin/flink run-application -d -t yarn-application -Dstate.backend=hashmap -c com.atguigu.checkpoint.SavepointDemo FlinkTutorial-1.0-SNAPSHOT.jar # 停止flink作业时,触发保存点 # 方式一:stop优雅停止并触发保存点,要求source实现StoppableFunction接口 bin/flink stop -p savepoint路径 job-id -yid application-id # 方式二:cancel立即停止并触发保存点 bin/flink cancel -s savepoint路径 job-id -yid application-id # 案例中source是socket,不能用stop bin/flink cancel -s hdfs://hadoop102:8020/sp cffca338509ea04f38f03b4b77c8075c -yid application_1681871196375_0001 # 从savepoint恢复作业,同时修改状态后端 bin/flink run-application -d -t yarn-application -s hdfs://hadoop102:8020/sp/savepoint-267cc0-47a214b019d5 -Dstate.backend=rocksdb -c com.atguigu.checkpoint.SavepointDemo FlinkTutorial-1.0-SNAPSHOT.jar # 从保存下来的checkpoint恢复作业 bin/flink run-application -d -t yarn-application -Dstate.backend=rocksdb -s hdfs://hadoop102:8020/chk/532f87ef4146b2a2968a1c137d33d4a6/chk-175 -c com.atguigu.checkpoint.SavepointDemo ./FlinkTutorial-1.0-SNAPSHOT.jar # 如果停止作业时,忘了触发保存点也不用担心,现在版本的flink支持从保留在外部系统的checkpoint恢复作业,但是恢复时不支持切换状态后端。
一致性其实就是结果的正确性,一般从数据丢失、数据重复来评估。流式计算本身就是一个一个来的,所以正常处理的过程中结果肯定是正确的;但在发生故障、需要恢复状态进行回滚时就需要更多的保障机制了。我们通过检查点的保存来保证状态恢复后结果的正确,所以主要讨论的就是“状态的一致性”。
一般说来,状态一致性有三种级别:
我们已经知道检查点可以保证Flink内部状态的一致性,而且可以做到精确一次。那是不是说,只要开启了检查点,发生故障进行恢复,结果就不会有任何问题呢?
在实际应用中,一般要保证从用户的角度看来,最终消费的数据是正确的。而用户或者外部应用不会直接从Flink内部的状态读取数据,往往需要我们将处理结果写入外部存储中。这就要求我们不仅要考虑Flink内部数据的处理转换,还涉及到从外部数据源读取,以及写入外部持久化系统,整个应用处理流程从头到尾都应该是正确的。
所以完整的流处理应用,应该包括了数据源、流处理器和外部存储系统三个部分。这个完整应用的一致性,就叫做“端到端(end-to-end)的状态一致性”,它取决于三个组件中最弱的那一环。一般来说,能否达到at-least-once一致性级别,主要看数据源能够重放数据;而能否达到exactly-once级别,流处理器内部、数据源、外部存储都要有相应的保证机制
[外链图片转存中…(img-Ecz48jRp-1713257353325)]
输入端主要指的就是Flink读取的外部数据源。对于一些数据源来说,并不提供数据的缓冲或是持久化保存,数据被消费之后就彻底不存在了,例如socket文本流。对于这样的数据源,故障后我们即使通过检查点恢复之前的状态,可保存检查点之后到发生故障期间的数据已经不能重发了,这就会导致数据丢失。所以就只能保证at-most-once的一致性语义,相当于没有保证
想要在故障恢复后不丢数据,外部数据源就必须拥有重放数据的能力。常见的做法就是对数据进行持久化保存,并且可以重设数据的读取位置。一个最经典的应用就是Kafka。在Flink的Source任务中将数据读取的偏移量保存为状态,这样就可以在故障恢复时从检查点中读取出来,对数据源重置偏移量,重新获取数据
数据源可重放数据,或者说可重置读取数据偏移量,加上Flink的Source算子将偏移量作为状态保存进检查点,就可以保证数据不丢。这是达到at-least-once一致性语义的基本要求,当然也是实现端到端exactly-once的基本要求
为了实现端到端exactly-once,我们还需要对外部存储系统、以及Sink连接器有额外的要求。能够保证exactly-once一致性的写入方式有两种:幂等写入和事务写入
所谓“幂等”操作,就是说一个操作可以重复执行很多次,但只导致一次结果更改
具体来说,又有两种实现方式:预写日志(WAL)和两阶段提交(2PC)
[外链图片转存中…(img-3ANCMsjN-1713257353326)]
Flink内部可以通过检查点机制保证状态和处理结果的exactly-once语义。
输入数据源端的Kafka可以对数据进行持久化保存,并可以重置偏移量(offset)。所以我们可以在Source任务(FlinkKafkaConsumer)中将当前读取的偏移量保存为算子状态,写入到检查点中;当发生故障时,从检查点中读取恢复状态,并由连接器FlinkKafkaConsumer向Kafka重新提交偏移量,就可以重新消费数据、保证结果的一致性了。
输出端保证exactly-once的最佳实现,当然就是两阶段提交(2PC)。作为与Flink天生一对的Kafka,自然需要用最强有力的一致性保证来证明自己。也就是说,我们写入Kafka的过程实际上是一个两段式的提交:处理完毕得到结果,写入Kafka时是基于事务的“预提交”;等到检查点保存完毕,才会提交事务进行“正式提交”。如果中间出现故障,事务进行回滚,预提交就会被放弃;恢复状态之后,也只能恢复所有已经确认提交的操作。
需要的配置
(1)必须启用检查点
(2)指定KafkaSink的发送级别为DeliveryGuarantee.EXACTLY_ONCE
(3)配置Kafka读取数据的消费者的隔离级别
这里所说的Kafka,是写入的外部系统。预提交阶段数据已经写入,只是被标记为“未提交”(uncommitted),而Kafka中默认的隔离级别isolation.level是read_uncommitted,也就是可以读取未提交的数据。这样一来,外部应用就可以直接消费未提交的数据,对于事务性的保证就失效了。所以应该将隔离级别配置为read_committed,表示消费者遇到未提交的消息时,会停止从分区中消费数据,直到消息被标记为已提交才会再次恢复消费。当然,这样做的话,外部应用消费数据就会有显著的延迟。
(4)事务超时配置
Flink的Kafka连接器中配置的事务超时时间transaction.timeout.ms
默认是1小时,而Kafka集群配置的事务最大超时时间transaction.max.timeout.ms
默认是15分钟。所以在检查点保存时间很长时,有可能出现Kafka已经认为事务超时了,丢弃了预提交的数据;而Sink任务认为还可以继续等待。如果接下来检查点保存成功,发生故障后回滚到这个检查点的状态,这部分数据就被真正丢掉了。所以这两个超时时间,前者应该小于等于后者
public class KafkaEOSDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 代码中用到hdfs,需要导入hadoop依赖、指定访问hdfs的用户名 System.setProperty("HADOOP_USER_NAME", "atguigu"); // TODO 1、启用检查点,设置为精准一次 env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE); CheckpointConfig checkpointConfig = env.getCheckpointConfig(); checkpointConfig.setCheckpointStorage("hdfs://hadoop102:8020/chk"); checkpointConfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); // TODO 2.读取kafka KafkaSource<String> kafkaSource = KafkaSource.<String>builder() .setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092") .setGroupId("atguigu") .setTopics("topic_1") .setValueOnlyDeserializer(new SimpleStringSchema()) .setStartingOffsets(OffsetsInitializer.latest()) .build(); DataStreamSource<String> kafkasource = env .fromSource(kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), "kafkasource"); /** * TODO 3.写出到Kafka * 精准一次 写入Kafka,需要满足以下条件,缺一不可 * 1、开启checkpoint * 2、sink设置保证级别为 精准一次 * 3、sink设置事务前缀 * 4、sink设置事务超时时间: checkpoint间隔 < 事务超时时间 < max的15分钟 */ KafkaSink<String> kafkaSink = KafkaSink.<String>builder() // 指定 kafka 的地址和端口 .setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092") // 指定序列化器:指定Topic名称、具体的序列化 .setRecordSerializer( KafkaRecordSerializationSchema.<String>builder() .setTopic("ws") .setValueSerializationSchema(new SimpleStringSchema()) .build() ) // TODO 3.1 精准一次,开启 2pc .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE) // TODO 3.2 精准一次,必须设置 事务的前缀 .setTransactionalIdPrefix("atguigu-") // TODO 3.3 精准一次,必须设置 事务超时时间: 大于checkpoint间隔,小于 max 15分钟 .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10*60*1000+"") .build(); kafkasource.sinkTo(kafkaSink); env.execute(); } }
后续读取“ws”这个topic的消费者,要设置事务的隔离级别为“读已提交”,如下
public class KafkaEOSDemo2 { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 消费 在前面使用两阶段提交写入的Topic KafkaSource<String> kafkaSource = KafkaSource.<String>builder() .setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092") .setGroupId("atguigu") .setTopics("ws") .setValueOnlyDeserializer(new SimpleStringSchema()) .setStartingOffsets(OffsetsInitializer.latest()) // TODO 作为 下游的消费者,要设置 事务的隔离级别 = 读已提交 .setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed") .build(); env .fromSource(kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), "kafkasource") .print(); env.execute(); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。