赞
踩
目录
Flink是一个有状态的流式计算引擎,所以会将中间计算结果(状态)进行保存,默认保存到TaskManager的堆内存中。
但是当Task挂掉,那么这个Task所对应的状态都会被清空,造成了数据丢失,无法保证结果的正确性,哪怕想要得到正确结果,所有数据都要重新计算一遍,效率很低。
想要保证 At -least-once 和 Exactly-once,则需要把数据状态持久化到更安全的存储介质中,Flink提供了堆内内存、堆外内存、HDFS、RocksDB等存储介质。
先来看下Flink提供的状态有哪些,Flink中状态可以分为两种类型:
Keyed State
基于KeyedStream上的状态,这个状态是跟特定的Key绑定,KeyedStream流上的每一个Key都对应一个State,每一个Operator可以启动多个Thread处理,但是相同Key的数据只能由同一个Thread处理,因此一个Keyed状态只能存在于某一个Thread中,一个Thread会有多个Keyed State。
Non-Keyed State(Operator State)
Operator State与Key无关,而是与Operator绑定,整个Operator只对应一个State。比如:Flink中的Kafka Connector就使用了Operator State,它会在每个Connector实例中,保存该实例消费Topic的所有(partition, offset)映射。
Flink针对Keyed State提供了以下可以保存State的数据结构:
ValueState:类型为T的单值状态,这个状态与对应的Key绑定,最简单的状态,通过update更新值,通过value获取状态值。
ListState:Key上的状态值为一个列表,这个列表可以通过add()
方法往列表中添加值,也可以通过get()
方法返回一个Iterable来遍历状态值。
ReducingState:每次调用add()
方法添加值的时候,会调用用户传入的reduceFunction
,最后合并到一个单一的状态值。
MapState:状态值为一个Map,用户通过put()
或putAll()
方法添加元素,get(key)通过指定的key获取value,使用entries()
、keys()
、values()
检索。
AggregatingState:保留一个单值,表示添加到状态的所有值的聚合。和 ReducingState
相反的是, 聚合类型可能与添加到状态的元素的类型不同。使用 add(IN)
添加的元素会调用用户指定的 AggregateFunction
进行聚合。
FoldingState:已过时,建议使用AggregatingState 保留一个单值,表示添加到状态的所有值的聚合。 与 ReducingState
相反,聚合类型可能与添加到状态的元素类型不同。 使用add(T)
添加的元素会调用用户指定的 FoldFunction
折叠成聚合值。
案例1:使用ValueState统计每个键的当前计数
- public static void main(String[] args) throws Exception {
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.fromElements(Tuple2.of("user1", 1), Tuple2.of("user2", 1), Tuple2.of("user1", 1), Tuple2.of("user2", 1))
- .keyBy(0)
- .flatMap(new CountWithKeyedState())
- .print();
- env.execute("Flink ValueState example");
- }
-
- public static class CountWithKeyedState extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
- private transient ValueState<Integer> countState;
- @Override
- public void open(Configuration parameters) throws Exception {
- ValueStateDescriptor<Integer> descriptor =
- new ValueStateDescriptor<>("countState", Integer.class, 0);
- countState = getRuntimeContext().getState(descriptor);
- }
-
- @Override
- public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
- Integer currentCount = countState.value();
- currentCount += value.f1;
- countState.update(currentCount);
- out.collect(Tuple2.of(value.f0, currentCount));
- }
- }
在这段代码中,我们首先创建了一个 StreamExecutionEnvironment
,然后产生一些元素,每个元素都是指定用户的一个事件。keyBy(0)
表示我们以元组的第一个字段(即用户ID)为键进行分组。
然后,我们使用 flatMap
算子应用了 CountWithKeyedState
函数。这个函数使用了 Flink 的 ValueState 来存储和更新每个键的当前计数。
在 open
方法中,我们定义了一个名为 "countState" 的 ValueState,并把它初始化为 0。在 flatMap
方法中,我们从 ValueState 中获取当前计数,增加输入元素的值,然后更新 ValueState,并发出带有当前总数的元组。
注意:在真实的生产环境中,你可能需要从数据源(如 Kafka, HDFS等)读取数据,而不是使用 fromElements
方法
案例2:使用MapState 统计单词出现次数
- public static void main(String[] args) throws Exception {
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.socketTextStream("localhost", 9999)
- .flatMap(new Tokenizer())
- .keyBy(value -> value.f0)
- .flatMap(new RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
- private transient MapState<String, Integer> wordState;
- @Override
- public void open(Configuration parameters){
- MapStateDescriptor<String, Integer> descriptor =
- new MapStateDescriptor<>("wordCount", String.class, Integer.class);
- wordState = getRuntimeContext().getMapState(descriptor);
- }
- @Override
- public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
- Integer count = wordState.get(value.f0);
- if (count == null) {
- count = 0;
- }
- count += value.f1;
- wordState.put(value.f0, count);
- out.collect(Tuple2.of(value.f0, count));
- }
- })
- .print();
- env.execute("Word Count with MapState");
- }
-
- public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {
- @Override
- public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
- String[] words = value.toLowerCase().split("\\W+");
- for (String word : words) {
- if (word.length() > 0) {
- out.collect(new Tuple2<>(word, 1));
- }
- }
- }
- }
在这个例子中,我们首先通过 socketTextStream
方法从本地的 socket 获取输入数据流。然后我们用 flatMap
操作将每行输入分解为单个单词,并且为每个单词赋予基础计数值(基数)1。
我们创建一个使用 RichFlatMapFunction
的 operator,它可以访问 MapState
。在 open()
方法中,我们定义了 MapStateDescriptor
,然后用这个 descriptor
创建 MapState
。
在 flatMap()
函数中,我们获取当前单词的计数值,如果不存在则设置为0。然后我们增加计数值,更新 MapState,并且输出当前单词和它的出现次数。
案例3:使用ReducingState统计输入流中每个键的最大值
- public static void main(String[] args) throws Exception {
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- DataStream<Tuple2<String, Integer>> dataStream = env.fromElements(
- Tuple2.of("A", 6),
- Tuple2.of("B", 5),
- Tuple2.of("C", 4),
- Tuple2.of("A", 3),
- Tuple2.of("B", 2),
- Tuple2.of("C", 1)
- );
- dataStream.keyBy(0).flatMap(new MaxValueReducer()).print();
- env.execute("ReducingState Example");
- }
- public static class MaxValueReducer extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
- private transient ReducingState<Integer> maxState;
- @Override
- public void open(Configuration config) {
- ReducingStateDescriptor<Integer> descriptor = new ReducingStateDescriptor<>(
- "maxValue", // state的名字
- Math::max, // ReduceFunction,这里取两者的最大值
- TypeInformation.of(Integer.class)); // 类型信息
- maxState = getRuntimeContext().getReducingState(descriptor);
- }
- @Override
- public void flatMap(Tuple2<String, Integer> input, Collector<Tuple2<String, Integer>> out) throws Exception {
- maxState.add(input.f1); // 更新state的值
- out.collect(Tuple2.of(input.f0, maxState.get())); // 输出当前key的最大值
- }
- }
在上述代码中,我们首先创建了一个新的MaxValueReducer
类,该类扩展了RichFlatMapFunction
。然后定义了一个ReducingState
变量,用于在每个key上维护最大值。在open()
方法中,我们初始化了这个状态变量。在flatMap()
方法中,我们简单地将新的值添加到状态中,并输出当前key的最大值。
输出如下:
- 7> (A,6)
- 7> (A,6)
- 2> (B,5)
- 2> (C,4)
- 2> (B,5)
- 2> (C,4)
案例4:使用AggregatingState统计输入流中每个键的平均值
- public static void main(String[] args) throws Exception {
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- DataStream<Tuple2<String, Integer>> input = env.fromElements(
- Tuple2.of("A", 6),
- Tuple2.of("B", 5),
- Tuple2.of("C", 4),
- Tuple2.of("A", 3),
- Tuple2.of("B", 2),
- Tuple2.of("C", 1)
- );
- input.keyBy(x -> x.f0)
- .process(new AggregatingProcessFunction())
- .print();
- env.execute();
- }
- public static class AverageAggregate implements AggregateFunction<Integer, Tuple2<Integer, Integer>, Double> {
- @Override
- public Tuple2<Integer, Integer> createAccumulator() {
- return new Tuple2<>(0, 0);
- }
- @Override
- public Tuple2<Integer, Integer> add(Integer value, Tuple2<Integer, Integer> accumulator) {
- return new Tuple2<>(accumulator.f0 + value, accumulator.f1 + 1);
- }
- @Override
- public Double getResult(Tuple2<Integer, Integer> accumulator) {
- return ((double) accumulator.f0) / accumulator.f1;
- }
- @Override
- public Tuple2<Integer, Integer> merge(Tuple2<Integer, Integer> a, Tuple2<Integer, Integer> b) {
- return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
- }
- }
-
- public static class AggregatingProcessFunction extends KeyedProcessFunction<String, Tuple2<String, Integer>, Tuple2<String, Double>> {
- private AggregatingState<Integer, Double> avgState;
- @Override
- public void open(Configuration parameters) {
- AggregatingStateDescriptor<Integer, Tuple2<Integer, Integer>, Double> descriptor =
- new AggregatingStateDescriptor<>("average", new AverageAggregate(), TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>() {
- }));
- avgState = getRuntimeContext().getAggregatingState(descriptor);
- }
- @Override
- public void processElement(Tuple2<String, Integer> value, Context ctx,
- Collector<Tuple2<String, Double>> out) throws Exception {
- avgState.add(value.f1);
- out.collect(new Tuple2<>(value.f0, avgState.get()));
- }
- }
输入如下:
- 7> (A,6.0)
- 2> (B,5.0)
- 2> (C,4.0)
- 7> (A,4.5)
- 2> (B,3.5)
- 2> (C,2.5)
这段代码主要是计算每个键对应的值的平均数。代码中定义了:AverageAggregate
和AggregatingProcessFunction
。
AverageAggregate
类实现了AggregateFunction
接口,用于计算平均值:
createAccumulator
方法返回一个新的累加器,这里是一个包含两个整数的元组,表示当前的总数和元素的数量。
add
方法向累加器添加一个元素的值,将其添加到总数中,并增加元素数量。
getResult
方法根据累加器计算平均值。
merge
方法合并两个累加器,将他们的总数和元素数量相加。
AggregatingProcessFunction
类扩展了KeyedProcessFunction
,在接收到一个元素时添加到状态中的平均值,并输出当前的平均值:
在open
方法中,创建了一个AggregatingStateDescriptor
,描述要保存的状态,这里保存的是平均值。
processElement
方法在接收到一个新元素时,将其值添加到状态中的平均值,然后输出包含键和当前平均值的元组。
以上案例代码都经过本地运行和测试,建议大家自行运行以便更深入地理解。
有状态流应用中的检查点(CheckPoint),其实就是所有任务的状态在某个时间点的一个快照(一份拷贝)
简单来讲,就是一次「存盘」,让我们之前处理数据的进度不要丢掉。在一个流应用程序运行时,Flink 会定期保存检查点,在检查点中会记录每个算子的 id 和状态。
如果发生故障,Flink 就会用最近一次成功保存的检查点来恢复应用的状态,重新启动处理流程,就如同「读档」一样。
默认情况下,检查点是被禁用的,需要在代码中手动开启。直接调用执行环境的enableCheckpointing()
方法就可以开启检查点。
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.enableCheckpointing(1000);
这里传入的参数是检查点的间隔时间,单位为毫秒。
除了检查点之外,Flink 还提供了「保存点(SavePoint)」的功能。
保存点在原理和形式上跟检查点完全一样,也是状态持久化保存的一个快照。
保存点与检查点最大的区别,就是触发的时机。检查点是由 Flink 自动管理的,定期创建,发生故障之后自动读取进行恢复,这是一个「自动存盘」的功能。而保存点不会自动创建,必须由用户明确地手动触发保存操作,所以就是「手动存盘」。
因此两者尽管原理一致,但用途就有所差别了。
检查点主要用来做故障恢复,是容错机制的核心;保存点则更加灵活,可以用来做有计划的手动备份和恢复
检查点具体的持久化存储位置,取决于「检查点存储(CheckPointStorage)」的设置。
默认情况下,检查点存储在 JobManager 的堆(heap)内存中。而对于大状态的持久化保存,Flink也提供了在其他存储位置进行保存的接口,这就是「 CheckPointStorage」。
具体可以通过调用检查点配置的 setCheckpointStorage()
来配置,需要传入一个CheckPointStorage 的实现类。例如:
- public static void main(String[] args) throws Exception {
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- // 设置检查点时间间隔为1000ms
- env.enableCheckpointing(1000);
- // 设置checkpoint存储路径, 注意路径需要是可访问且有写权限的HDFS或本地路径
- URI checkpointPath = URI.create("hdfs://localhost:9000/flink-checkpoints");
- FileSystemCheckpointStorage storage = new FileSystemCheckpointStorage(checkpointPath, 10000);
- // 应用配置
- env.getCheckpointConfig().setCheckpointStorage(storage);
- // 设置重启策略,这里我们设置为固定延时无限重启
- //Flink的重启策略是用来决定如何处理作业执行过程中出现的失败情况的。如果Flink作业在运行时出错,比如由于代码错误、硬件故障或 网络问题等,那么重启策略就会决定是否和如何重启作业。
- env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
- // 尝试重启次数
- 3,
- //每次尝试重启的固定延迟时间为 10 秒
- org.apache.flink.api.common.time.Time.of(10, java.util.concurrent.TimeUnit.SECONDS)
- ));
- env.execute("Flink Checkpoint Example");
- }
Flink 主要提供了两种 CheckPointStorage:
作业管理器的堆内存(JobManagerCheckpointStorage)
文件系统(FileSystemCheckpointStorage)
对于实际生产应用,我们一般会将 CheckPointStorage 配置为高可用的分布式文件系统(HDFS,S3 等)。
Flink会在输入的数据集上间隔性地生成CheckPoint Barrier,通过栅栏(Barrier)将间隔时间段内的数据划分到相应的CheckPoint中。
当程序出现异常时,Operator就能够从上一次快照中恢复所有算子之前的状态,从而保证数据的一致性。
例如在Kafka Consumer算子中维护offset状态,当系统出现问题无法从Kafka中消费数据时,可以将offset记录在状态中,当任务重新恢复时就能够从指定的偏移量开始消费数据。
默认情况Flink不开启检查点,用户需要在程序中通过调用方法配置来开启检查点,另外还可以调整其他相关参数
CheckPoint 开启和时间间隔指定
开启检查点并且指定检查点时间间隔为1000ms,根据实际情况自行选择,如果状态比较大,则建议适当增加该值
env.enableCheckpointing(1000)
Exactly-once 和 At-least-once语义选择
选择Exactly-once语义保证整个应用内端到端的数据一致性,这种情况比较适合于数据要求比较高,不允许出现丢数据或者数据重复,与此同时,Flink的性能也相对较弱。
而At-least-once语义更适合于时廷和吞吐量要求非常高但对数据的一致性要求不高的场景。如下通过setCheckpointingMode()
方法来设定语义模式,默认情况下使用的是Exactly-once模式。
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
CheckPoint 超时时间
超时时间指定了每次CheckPoint执行过程中的上限时间范围,一旦CheckPoint执行时间超过该阈值,Flink将会中断CheckPoint过程,并按照超时处理。该指标可以通过setCheckpointTimeout()
方法设定,默认为10分钟
env.getCheckpointConfig().setCheckpointTimeout(5 * 60 * 1000);
CheckPoint 最小时间间隔
该参数主要目的是设定两个CheckPoint之间的最小时间间隔,防止Flink应用密集地触发CheckPoint操作,会占用了大量计算资源而影响到整个应用的性能
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(600)
CheckPoint 最大并行执行数量
在默认情况下只有一个检查点可以运行,根据用户指定的数量可以同时触发多个CheckPoint,进而提升CheckPoint整体的效率
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1)
任务取消后,是否删除 CheckPoint 中保存的数据
RETAIN_ON_CANCELLATION
:表示一旦Flink处理程序被cancel后,会保留CheckPoint数据,以便根据实际需要恢复到指定的CheckPoint。
DELETE_ON_CANCELLATION
:表示一旦Flink处理程序被cancel后,会删除CheckPoint数据,只有Job执行失败的时候才会保存CheckPoint。
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
容忍的检查的失败数
设置可以容忍的检查的失败数,超过这个数量则系统自动关闭和停止任务。
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(1)
SavePoint 底层实现其实也是使用CheckPoint的机制。
SavePoint是用户以手工命令的方式触发Checkpoint,并将结果持久化到指定的存储路径中,其主要目的是帮助用户在升级和维护集群过程中保存系统中的状态数据,避免因为停机运维或者升级应用等正常终止应用的操作而导致系统无法恢复到原有的计算状态的情况,从而无法实现从端到端的 Excatly-Once 语义保证。
要使用SavePoint,需要按照以下步骤进行:
配置状态后端: 在Flink中,状态可以保存在不同的后端存储中,例如内存、文件系统或分布式存储系统(如HDFS)。要启用SavePoint,需要在Flink配置文件中配置合适的状态后端。
通常,使用分布式存储系统作为状态后端是比较常见的做法,因为它可以提供更好的可靠性和容错性。
生成SavePoint: 在Flink应用程序运行时,可以通过以下方式手动触发生成SavePoint:
bin/flink savepoint <jobID> [targetDirectory]
其中,<jobID>
是要保存状态的Flink作业的Job ID,[targetDirectory]
是可选的目标目录,用于保存SavePoint数据。如果没有提供targetDirectory
,SavePoint将会保存到Flink配置中所配置的状态后端中。
恢复SavePoint: 要恢复到SavePoint状态,可以通过以下方式提交作业:
bin/flink run -s :savepointPath [:runArgs]
其中,savepointPath
是之前生成的SavePoint的路径,runArgs
是提交作业时的其他参数。
确保应用程序状态的兼容性: 在使用SavePoint时,应用程序的状态结构和代码必须与生成SavePoint的版本保持兼容。这意味着在更新应用程序代码后,可能需要做一些额外的工作来保证状态的向后兼容性,以便能够成功恢复到旧的SavePoint。
在Flink中提供了StateBackend来存储和管理状态数据。
Flink一共实现了三种类型的状态管理器:MemoryStateBackend
、FsStateBackend
、RocksDBStateBackend
。
基于内存的状态管理器,将状态数据全部存储在JVM堆内存中。
基于内存的状态管理具有非常快速和高效的特点,但也具有非常多的限制,最主要的就是内存的容量限制,一旦存储的状态数据过多就会导致系统内存溢出等问题,从而影响整个应用的正常运行。
同时如果机器出现问题,整个主机内存中的状态数据都会丢失,进而无法恢复任务中的状态数据。因此从数据安全的角度建议用户尽可能地避免在生产环境中使用MemoryStateBackend。
MemoryStateBackend是Flink的默认状态后端管理器
env.setStateBackend(new MemoryStateBackend(100*1024*1024));
注意:聚合类算子的状态会同步到 JobManager 内存中,因此对于聚合类算子比较多的应用会对 JobManager 的内存造成一定的压力,进而影响集群。
和MemoryStateBackend有所不同的是,FsStateBackend是基于文件系统的一种状态管理器,这里的文件系统可以是本地文件系统,也可以是HDFS分布式文件系统。
env.setStateBackend(new FsStateBackend("path",true));
如果path是本地文件路径,格式为:file:///
;如果path是HDFS文件路径,格式为:hdfs://
。
第二个参数代表是否异步保存状态数据到HDFS,异步方式能够尽可能避免ChecPoint的过程中影响流式计算任务。
FsStateBackend更适合任务量比较大的应用,例如:包含了时间范围非常长的窗口计算,或者状态比较大的场景。
RocksDBStateBackend是Flink中内置的第三方状态管理器,和前面的状态管理器不同,RocksDBStateBackend需要单独引入相关的依赖包到工程中。
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-statebackend-rocksdb_2.12</artifactId>
- <version>1.14.4</version>
- <scope>test</scope>
- </dependency>
env.setStateBackend(new RocksDBStateBackend("file:///tmp/flink-backend"));
RocksDBStateBackend采用异步的方式进行状态数据的Snapshot,任务中的状态数据首先被写入本地RockDB中,这样在RockDB仅会存储正在进行计算的热数据,而需要进行CheckPoint的时候,会把本地的数据直接复制到远端的FileSystem中。
与FsStateBackend相比,RocksDBStateBackend在性能上要比FsStateBackend高一些,主要是因为借助于RocksDB在本地存储了最新热数据,然后通过异步的方式再同步到文件系统中,但RocksDBStateBackend和MemoryStateBackend相比性能就会较弱一些。
RocksDB克服了State受内存限制的缺点,同时又能够持久化到远端文件系统中,推荐在生产中使用。
全局配置需要修改集群中的配置文件flink-conf.yaml
。
配置FsStateBackend
- state.backend: filesystem
- state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints
配置MemoryStateBackend
state.backend: jobmanager
配置RocksDBStateBackend
- #同时操作RocksDB的线程数
- state.backend.rocksdb.checkpoint.transfer.thread.num: 1
- #RocksDB存储状态数据的本地文件路径
- state.backend.rocksdb.localdir: 本地path
在流处理中,我们往往需要面对的是连续不断、无休无止的无界流,不可能等到所有数据都到齐了才开始处理。
所以聚合计算其实在实际应用中,我们往往更关心一段时间内数据的统计结果,比如在过去的 1 分钟内有多少用户点击了网页。在这种情况下,我们就可以定义一个窗口,收集最近一分钟内的所有用户点击数据,然后进行聚合统计,最终输出一个结果就可以了。
窗口实质上是将无界流切割为一系列有界流,采用左开右闭的原则
Flink中的窗口分为两类:基于时间的窗口(Time-based Window)和基于数量的窗口(Count-based Window)
时间窗口(Time Window):按照时间段去截取数据,这在实际应用中最常见。
计数窗口(Count Window):由数据驱动,也就是说按照固定的个数,来截取一段数据集。
时间窗口中又包含了:滚动时间窗口、滑动时间窗口、会话窗口
计数窗口包含了:滚动计数窗口、滑动计数窗口
时间窗口、计数窗口只是对窗口的一个大致划分。在具体应用时,还需要定义更加精细的规则,来控制数据应该划分到哪个窗口中去。不同的分配数据的方式,就可以有不同的功能应用。
根据分配数据的规则,窗口的具体实现可以分为 4 类:滚动窗口(Tumbling Window)、滑动窗口(Sliding Window)、会话窗口(Session Window)、全局窗口(Global Window)
滚动窗口每个窗口的大小固定,且相邻两个窗口之间没有重叠
滚动窗口可以基于时间定义,也可以基于数据个数定义,需要的参数只有窗口大小。
我们可以定义一个大小为1小时的滚动时间窗口,那么每个小时就会进行一次统计;或者定义一个大小为10的滚动计数窗口,就会每10个数进行一次统计。
基于时间的滚动窗口:
- public static void main(String[] args) throws Exception {
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- DataStream<Tuple2<Integer, Integer>> randomKeyedStream = env
- .fromSequence(1, Long.MAX_VALUE)
- // 将每个数映射为一个二元组,第一个元素是随机键,第二个元素是数本身
- .map(new MapFunction<Long, Tuple2<Integer, Integer>>() {
- private final Random rnd = new Random();
- @Override
- public Tuple2<Integer, Integer> map(Long value) {
- return new Tuple2<>(rnd.nextInt(10), value.intValue());
- }
- });
- // 对流进行滚动窗口操作,窗口大小为5秒
- // 应用窗口函数,求每个窗口的和
- DataStream<Integer> sum = randomKeyedStream
- .assignTimestampsAndWatermarks(WatermarkStrategy
- .<Tuple2<Integer, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
- .withTimestampAssigner((event, timestamp) -> event.f1))
- .keyBy(0)
- .timeWindow(Time.seconds(5))
- .apply(new WindowFunction<Tuple2<Integer, Integer>, Integer, Tuple, TimeWindow>() {
- @Override
- public void apply(Tuple key,
- TimeWindow window,
- Iterable<Tuple2<Integer, Integer>> values,
- Collector<Integer> out){
- int sum1 = 0;
- for (Tuple2<Integer, Integer> val: values) {
- sum1 += val.f1;
- }
- out.collect(sum1);
- }
- });
- sum.print();
- env.execute("Tumbling Window Example");
- }
这个程序的主要功能是从1到Long.MAX_VALUE
产生一个序列,并为每个生成的数字创建一个二元组(Tuple2),然后在5秒大小的窗口上对二元组进行操作并输出每个窗口中所有值的总和。
详细解释如下:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
: 获取Flink的运行环境。
产生一个无限长的序列(从1开始到最大的Long型数),每个数字都映射成一个二元组,第一个元素(f0)是一个0-9的随机整数(作为键用于之后的keyBy操作),第二个元素(f1)是数字本身。
使用assignTimestampsAndWatermarks
来定义事件时间和水位线。这里设定了最大延迟时间为5秒(forBoundedOutOfOrderness
),并将二元组的第二个元素作为时间戳。
使用keyBy(0)
按照二元组的第一个元素进行分区,这样保证了相同键的元素会被发送到同一个任务中。
定义了一个5秒的滚动窗口timeWindow(Time.seconds(5))
。
使用apply
函数应用在每个窗口上,计算每个窗口中所有二元组的第二个元素(f1)的总和,并收集结果。最终,每个窗口计算的总和都会被输出。
sum.print();
: 命令将处理后的数据打印出来。
env.execute("Tumbling Window Example");
: 启动Flink任务。
基于计数的滚动窗口:
- public static void main(String[] args) throws Exception {
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- DataStream<String> text = env.socketTextStream("localhost", 9999);
- DataStream<Tuple2<String, Integer>> counts = text
- .flatMap(new Tokenizer())
- .keyBy(0)
- .countWindow(5) // Count window of 5 elements
- .sum(1);
- counts.print().setParallelism(1);
- env.execute("Window WordCount");
- }
-
- public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
- @Override
- public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
- String[] words = value.toLowerCase().split("\\W+");
- for (String word : words) {
- if (word.length() > 0) {
- out.collect(new Tuple2<>(word, 1));
- }
- }
- }
- }
这段程序从本地9999端口读取数据流,对每一行的单词进行小写处理和分割,然后在滑动窗口中(大小为5个元素)计算出各个单词的出现次数。
滑动窗口的大小固定,但窗口之间不是首尾相接,会有部分重合。同样,滑动窗口也可以基于时间和计数定义。
滑动窗口的参数有两个:窗口大小和滑动步长
基于时间的滑动窗口:
- public static void main(String[] args) throws Exception {
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- DataStream<String> input = env.socketTextStream("localhost", 9999);
- DataStream<Tuple2<String, Integer>> processedInput = input.map(new MapFunction<String, Tuple2<String, Integer>>() {
- @Override
- public Tuple2<String, Integer> map(String value){
- String[] words = value.split(",");
- return new Tuple2<>(words[0], Integer.parseInt(words[1]));
- }
- });
- // 指定窗口类型为滑动窗口,窗口大小为10分钟,滑动步长为5分钟
- DataStream<Tuple2<String, Integer>> windowCounts = processedInput
- .keyBy(0)
- .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
- .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
- @Override
- public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2){
- return new Tuple2<>(value1.f0, value1.f1 + value2.f1);
- }
- });
- windowCounts.print().setParallelism(1);
- env.execute("Time Window Example");
- }
这段程序从一个套接字端口读取输入数据,将每行输入按照“,”切分并映射为tuple(字符串,整数)。然后,它按照第一个元素(即字符串)进行分组,并使用滑动窗口(窗口大小为10秒,滑动步长为5秒)进行聚合 - 在每个窗口内,所有具有相同键的值的整数部分被相加。最终结果会在控制台上打印。
基于计数的滑动窗口:
- public static void main(String[] args) throws Exception {
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- DataStream<String> text = env.socketTextStream("localhost", 9999);
- DataStream<Tuple2<String, Integer>> counts = text
- .flatMap(new Tokenizer())
- .keyBy(0)
- .countWindow(5, 1)
- .sum(1);
- counts.print().setParallelism(1);
- env.execute("Sliding Window WordCount");
- }
-
- public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
- @Override
- public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
- String[] words = value.toLowerCase().split("\\W+");
- for (String word : words) {
- if (word.length() > 0) {
- out.collect(new Tuple2<>(word, 1));
- }
- }
- }
- }
这段代码是实时滑动窗口词频统计程序。它从本地9999端口读取数据流,将接收到的每行文本拆分为单词然后输出为(单词,1)的形式,接着按照单词分组,使用大小为5,步长为1的滑动窗口,并对每个窗口中的同一单词出现次数进行求和,最后打印结果。
会话窗口是Flink中一种基于时间的窗口类型,每个窗口的大小不固定,且相邻两个窗口之间没有重叠,“会话”终止的标志就是隔一段时间没有数据进来
- public static void main(String[] args) throws Exception {
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- DataStream<Tuple2<String, Long>> inputStream = env.fromElements(
- new Tuple2<>("user1", 1617229200000L),
- new Tuple2<>("user1", 1617229205000L),
- new Tuple2<>("user2", 1617229210000L),
- new Tuple2<>("user1", 1617229215000L),
- new Tuple2<>("user2", 1617229220000L)
- );
- SingleOutputStreamOperator<Tuple2<String, Long>> resultStream = inputStream
- .keyBy(value -> value.f0)
- .window(EventTimeSessionWindows.withGap(Time.minutes(5)))
- .sum(1);
- resultStream.print();
- env.execute("Session Window Example");
- }
这段代码从一个数据流中读取用户活动数据(包含用户ID和Unix时间戳),然后根据用户ID将数据进行分组,并应用了一个会话窗口(当用户五分钟内无活动则关闭该用户的窗口)。
然后,它对每个用户在各自窗口内的活动时间戳求和,并打印出结果。最后执行的名为"Session Window Example"的任务即完成了这一流式计算过程。
在Flink中,数据流可以按键分区(keyed)和非按键分区(non-keyed)。
按键分区是指将数据流根据特定的键值进行分区,使得相同键值的元素被分配到同一个分区中。这样可以保证相同键值的元素由同一个worker实例处理。只有按键分区的数据流才能使用键分区状态和计时器。
非按键分区是指数据流没有根据特定的键值进行分区。这种情况下,数据流中的元素可以被任意分配到不同的分区中。
在定义窗口操作之前,首先需要确定,到底是基于按键分区(Keyed)来开窗,还是直接在没有按键分区的DataStream上开窗。也就是在调用窗口算子之前是否有keyBy操作。
按键分区窗口:
- public static void main(String[] args) throws Exception {
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- DataStream<String> text = env.socketTextStream("localhost", 9999);
- DataStream<Tuple2<String, Integer>> counts =
- // 将输入字符串拆分为tuple类型,包含word和数量
- text.map(new MapFunction<String, Tuple2<String, Integer>>() {
- @Override
- public Tuple2<String, Integer> map(String value) {
- return new Tuple2<>(value, 1);
- }
- })
- // 根据元组的第一字段(word)进行分区键
- .keyBy(0)
- // 定义一个滚动窗口,时间间隔为5秒
- .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
- // 应用reduce函数,累加各个窗口中同一单词的数量
- .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
- @Override
- public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) {
- return new Tuple2<>(value1.f0, value1.f1 + value2.f1);
- }
- });
- counts.print();
- env.execute("Window WordCount");
这段代码从 localhost 的 9999 端口接收数据流,将输入的每个字符串作为一个单词和数字 1 的 tuple 对象,然后根据单词进行分区,创建一个滚动窗口(间隔为5秒),并在每个窗口中对同一单词的数量进行累加统计,最后打印出结果。
非按键分区窗口:
- public static void main(String[] args) throws Exception {
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- DataStream<String> text = env.socketTextStream("localhost", 9999);
- DataStream<Integer> parsed = text.map(new MapFunction<String, Integer>() {
- @Override
- public Integer map(String value) {
- return Integer.parseInt(value);
- }
- });
- DataStream<Integer> windowCounts = parsed
- .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))
- .reduce(new ReduceFunction<Integer>() {
- @Override
- public Integer reduce(Integer value1, Integer value2) {
- return value1 + value2;
- }
- });
- windowCounts.print().setParallelism(1);
- env.execute("Non keyed Window example");
- }
这段程序从localhost的9999端口读取数据流,把每条数据转化为整数,然后在5秒的滚动窗口内将所有的整数值进行累加,并打印出结果。
所谓的“窗口函数”(window functions),就是定义窗口如何进行计算操作的函数
窗口函数根据处理的方式可以分为两类:「增量窗口聚合函数」和「全窗口聚合函数」。
增量窗口聚合函数每来一条数据就立即进行计算,中间保持着聚合状态,但是不立即输出结果,等到窗口到了结束时间需要输出计算结果的时候,取出之前聚合的状态直接输出。
常见的增量聚合函数有:reduce()
、aggregate()
、sum()
、min()
、max()
。
下面是一个使用增量聚合函数的代码示例:
- public static void main(String[] args) throws Exception {
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- DataStream<Long> data = env.fromSequence(1,Long.MAX_VALUE);
- DataStream<Long> result = data.keyBy(value -> value % 2)
- .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
- .aggregate(new SumAggregator());
- result.print();
- env.execute("Incremental Aggregation Job");
- }
-
- public static class SumAggregator implements AggregateFunction<Long, Long, Long> {
- @Override
- public Long createAccumulator() {
- return 0L;
- }
- @Override
- public Long add(Long value, Long accumulator) {
- return value + accumulator;
- }
- @Override
- public Long getResult(Long accumulator) {
- return accumulator;
- }
- @Override
- public Long merge(Long a, Long b) {
- return a + b;
- }
- }
这段代码从1到Long.MAX_VALUE
产生一个连续的数据流。接着,它将数据按照奇偶性进行分类,并在每个5秒的时间窗口内对相同类别的数值进行累加操作。最后打印出累加结果。
全窗口函数是指在整个窗口中的所有数据都准备好后才进行计算。
Flink中的全窗口函数有两种: WindowFunction
和ProcessWindowFunction
。
与增量窗口函数不同,全窗口函数可以访问窗口中的所有数据,因此可以执行更复杂的计算。例如,可以计算窗口中数据的中位数,或者对窗口中的数据进行排序。
WindowFunction接收一个Iterable类型的输入,其中包含了窗口中所有的数据。ProcessWindowFunction则更加强大,它不仅可以访问窗口中的所有数据, 还可以获取到一个“上下文对象”(Context)。
这个上下文对象非常强大,不仅能够获取窗口信息,还可以访问当前的时间和状态信息。这里的时间就包括了处理时间(Processing Time)和事件时间水位线(Event Time Watermark)。
这就使得 ProcessWindowFunction 更加灵活、功能更加丰富,WindowFunction作用可以被 ProcessWindowFunction 全覆盖。
不过这种额外的功能可能会带来一些性能上的损失,因此只有当你确实需要这些额外功能时,才应该使用ProcessWindowFunction,如果你不需要这些功能,“简单”的WindowFunction可能会更有效率。
下面是使用 WindowFunction 计算窗口内数据总和的代码示例:
- public static void main(String[] args) throws Exception {
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- DataStream<String> text = env.fromElements("a", "b", "c", "a", "b", "b");
- DataStream<String> withTimestampsAndWatermarks = text.assignTimestampsAndWatermarks(
- WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofMillis(100))
- .withTimestampAssigner((event, timestamp) -> System.currentTimeMillis())
- );
- DataStream<Tuple2<String, Integer>> mapped = withTimestampsAndWatermarks.map(
- new MapFunction<String, Tuple2<String, Integer>>() {
- @Override
- public Tuple2<String, Integer> map(String value) {
- return new Tuple2<>(value, 1);
- }
- });
- mapped.keyBy(0)
- .timeWindow(Time.seconds(5))
- .apply(new SumWindowFunction())
- .print();
- env.execute("Window Sum");
- }
下面是一个使用ProcessWindowFunction统计网站1天UV的代码示例:
- public static void main(String[] args) throws Exception {
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- DataStream<Tuple2<String, Integer>> data = env.fromElements(
- new Tuple2<>("user1", 1),
- new Tuple2<>("user2", 1),
- new Tuple2<>("user1", 1));
- data = data.assignTimestampsAndWatermarks(WatermarkStrategy
- .<Tuple2<String,Integer>>forMonotonousTimestamps()
- .withTimestampAssigner((event, timestamp) -> System.currentTimeMillis())
- );
- data.keyBy(0)
- .window(TumblingEventTimeWindows.of(Time.days(1)))
- .process(new UVProcessWindowFunction())
- .print();
- env.execute("Daily User View Count");
- }
-
- public static class UVProcessWindowFunction extends ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Long>, Tuple, TimeWindow> {
- @Override
- public void process(Tuple key, Context context, Iterable<Tuple2<String, Integer>> input, Collector<Tuple2<String, Long>> out){
- long count = 0;
- BloomFilter<CharSequence> bloomFilter = BloomFilter.create(Funnels.stringFunnel(), 100000, 0.01);
- for (Tuple2<String, Integer> in: input) {
- if (!bloomFilter.mightContain(in.f0)) {
- count += 1;
- bloomFilter.put(in.f0);
- }
- }
- out.collect(new Tuple2<>(key.getField(0), count));
- }
- }
这段代码从数据流中读取用户视图数据(数据为("user", view_count)),然后对每个用户的观看次数实现了基于时间窗口(一天)的统计。利用布隆过滤器并在窗口内去重,可以避免重复计数。最后,每个窗口结束时,它会输出每个用户的id和相应的不重复观看次数。
全窗口函数为处理提供了更多的背景信息,因为它需要等到收集完所有窗口内的数据才进行计算,但是全窗口函数可能会增加系统的复杂性和运行时间。
另一方面,增量窗口函数可以在数据进入窗口时进行部分聚合计算,从而提高效率,但是它可能不适用于所有类型的计算,例如中位数或者标准差这种需要全部数据的计算就无法使用增量聚合。
在实际应用中,我们往往希望兼具这两者的优点,把它们结合在一起使用。Flink 的Window API 就给我们实现了这样的用法。
之前在调用 WindowedStream 的reduce()
和aggregate()
方法时,只是简单地直接传入了一个 ReduceFunction 或 AggregateFunction 进行增量聚合。除此之外,其实还可以传入第二个参数:一个全窗口函数,可以是 WindowFunction
或者ProcessWindowFunction
。
- // ReduceFunction 与 WindowFunction 结合
- public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction, WindowFunction<T, R, K, W> function)
-
- // ReduceFunction 与 ProcessWindowFunction 结合
- public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction, ProcessWindowFunction<T, R, K, W> function)
-
- // AggregateFunction 与 WindowFunction 结合
- public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(AggregateFunction<T, ACC, V> aggFunction, WindowFunction<V, R, K, W> windowFunction)
-
- // AggregateFunction 与 ProcessWindowFunction 结合
- public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(AggregateFunction<T, ACC, V> aggFunction, ProcessWindowFunction<V, R, K, W> windowFunction)
这样调用的处理机制是:基于第一个参数(增量聚合函数)来处理窗口数据,每来一个数据就做一次聚合;等到窗口需要触发计算时,则调用第二个参数(全窗口函数)的处理逻辑输出结果。
需要注意的是,这里的全窗口函数就不再缓存所有数据了,而是直接将增量聚合函数的结果拿来当作了 Iterable 类型的输入。一般情况下,这时的可迭代集合中就只有一个元素了
下面我们举一个具体的实例来说明:
在网站的各种统计指标中,一个很重要的统计指标就是热门的链接,想要得到热门的 url,前提是得到每个链接的“热门度”。一般情况下,可以用url 的浏览量(点击量)表示热门度。我们这里统计 10 秒钟的 url 浏览量,每 5 秒钟更新一次。
我们可以定义滑动窗口,并结合增量聚合函数和全窗口函数来得到统计结果,代码示例如下:
- public static void main(String[] args) throws Exception {
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- DataStream<String> text = env.socketTextStream("localhost", 9999);
- DataStream<Tuple2<String, Long>> urlCounts = text
- .flatMap(new Tokenizer())
- .keyBy(value -> value.f0)
- .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
- .aggregate(new CountAgg(), new WindowResultFunction());
- urlCounts.print();
- env.execute("UrlCount Job");
- }
-
- public static class CountAgg implements AggregateFunction<Tuple2<String, Integer>, Long, Long> {
- @Override
- public Long createAccumulator() {
- return 0L;
- }
-
- @Override
- public Long add(Tuple2<String, Integer> value, Long accumulator) {
- return accumulator + value.f1;
- }
-
- @Override
- public Long getResult(Long accumulator) {
- return accumulator;
- }
-
- @Override
- public Long merge(Long a, Long b) {
- return a + b;
- }
- }
-
- public static class WindowResultFunction implements WindowFunction<Long, Tuple2<String, Long>, String, TimeWindow> {
- @Override
- public void apply(String key, TimeWindow window, Iterable<Long> input, Collector<Tuple2<String, Long>> out) {
- Long count = input.iterator().next();
- out.collect(new Tuple2<>(key, count));
- }
- }
-
- public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
- @Override
- public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
- String[] words = value.toLowerCase().split("\\W+");
- for (String word : words) {
- if (word.length() > 0) {
- out.collect(new Tuple2<>(word, 1));
- }
- }
- }
- }
在这个示例中,我们首先把数据根据 URL 进行了分组 (keyBy),然后定义了一个滑动窗口,窗口长度是10秒,每5秒滑动一次。接着我们使用增量聚合函数 CountAgg
对每个窗口内的元素进行聚合,最后用全窗口函数 WindowResultFunction
输出结果。
窗口重叠是指在使用滑动窗口时,多个窗口之间存在重叠部分。这意味着同一批数据可能会被多个窗口同时处理。
例如,假设我们有一个数据流,它包含了0到9的整数。我们定义了一个大小为5的滑动窗口,滑动距离为2。那么,我们将会得到以下三个窗口:
窗口1:包含0, 1, 2, 3, 4
窗口2:包含2, 3, 4, 5, 6
窗口3:包含4, 5, 6, 7, 8
在这个例子中,窗口1和窗口2之间存在重叠部分,即2, 3, 4。同样,窗口2和窗口3之间也存在重叠部分,即4, 5, 6。
enableOptimizeWindowOverlap()
方法是用来启用Flink的窗口重叠优化功能的。它可以减少计算重叠窗口时的计算量。
在我之前给出的代码示例中,我没有使用enableOptimizeWindowOverlap()
方法来启用窗口重叠优化功能。这意味着Flink不会尝试优化计算重叠窗口时的计算量。
如果你想使用窗口重叠优化功能,你可以在你的代码中添加以下行:
env.getConfig().enableOptimizeWindowOverlap();
这将启用窗口重叠优化功能,Flink将尝试优化计算重叠窗口时的计算量。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。