赞
踩
在流式数据处理的过程中,有两个非常重要的时间点:一个是数据产生的时间,我们把它叫作“事件时间”(Event Time);另一个是数据真正被处理的时刻,叫作“处理时间”(Processing Time)。我们所定义的窗口操作,到底是以那种时间作为衡量标准,就是所谓的“时间语义”(Notions of Time)。由于分布式系统中网络传输的延迟和时钟漂移,处理时间相对事件发生的时间会有所滞后。
用来衡量事件时间(Event Time)进展的标记,就被称作“水位线”(Watermark)。水位线可以看作一条特殊的数据记录,它是插入到数据流中的一个标记点,主要内容就是一个时间戳,用来指示当前的事件时间。
在实际应用中,一般会采用事件时间语义。而水位线,就是基于事件时间提出的概念。每个事件产生的数据,都包含了一个时间戳,我们直接用一个整数表示。当产生于2 秒的数据到来之后,当前的事件时间就是 2 秒;在后面插入一个时间戳也为 2 秒的水位线,随着数据一起向下游流动。而当 5 秒产生的数据到来之后,同样在后面插入一个水位线,时间戳也为 5,当前的时钟就推进到了 5 秒。这样,如果出现下游有多个并行子任务的情形,我们只要将水位线广播出去,就可以通知到所有下游任务当前的时间进度了。
水位线就像它的名字所表达的,是数据流中的一部分,随着数据一起流动,在不同任务之
间传输。
注意:只有数据的时间戳比当前时钟大,才能推动时钟前进,这时才插入水位线。
设置延长时间
在乱序的情况下,我们无法正确处理“迟到”的数据。为了让窗口能正确收集到迟到的数据,我们可以等上几秒,也就是用当前已有数据的最大时间戳减去几秒,就是要插入的水位线的时间戳。
如何设置水位线
- .assignTimestampsAndWatermarks(
- WatermarkStrategy
- .forBoundedOutOfOrderness[(String, Long)](Duration.ofSeconds(5)) //设置5秒延迟
- .withTimestampAssigner(new SerializableTimestampAssigner[(String, Long)] {
- override def extractTimestamp(element: (String, Long), recordTimestamp: Long): Long = element._2
- })
- )
水位线的传递
在实际应用中往往上下游都有多个并行子任务,为了统一推进事件时间的进展,我们要求上游任务处理完水位线、时钟改变之后,要把当前的水位线广播给所有的下游任务。这样,后续任务就不需要依赖原始数据中的时间戳,也可以知道当前事件时间了。
上游并行子任务发来不同的水位线,当前任务会为每一个分区设置一个“分区水位线” (Partition Watermark),这是一个分区时钟;而当前任务自己的时钟,就是所有分区时钟里最小的那个。
水位线的总结
然而如果我们采用事件时间语义,就会有些费解了。由于有乱序数据,我们需要设置一个
延迟时间来等所有数据到齐。我们可以设置延迟时间为 2 秒。
但是这样一来,0~10 秒的窗口不光包含了迟到的 9 秒数据,连 11 秒和 12 秒的数据也包含进去了。我们为了正确处理迟到数据,结果把早到的数据划分到了错误的窗口——最终结果都是错误的。
在 Flink 中,窗口其实并不是一个“框”,流进来的数据被框住了就只能进这一个窗口。相比之下,我们应该把窗口理解成一个“桶”。在 Flink 中,窗口可以把流切割成有限大小的多个“存储桶”(bucket);每个数据都会分发到对应的桶中,当到达窗口结束时间时,就对每个桶中收集的数据进行计算处理。
我们最容易想到的就是按照时间段去截取数据,这种窗口就叫作“时间窗口”(Time Window)。这在实际应用中最常见。除了由时间驱动之外,窗口其实也可以由数据驱动,也就是说按照固定的个数,来截取一段数据集,这种窗口叫作“计数窗口”(Count Window)。
(1)滚动窗口(Tumbling Windows)
滚动窗口有固定的大小,是一种对数据进行“均匀切片”的划分方式。窗口之间没有重叠,也不会有间隔,是“首尾相接”的状态。如果我们把多个窗口的创建,看作一个窗口的运动,那就好像它在不停地向前“翻滚”一样。这是最简单的窗口形式,我们之前所举的例子都是滚动窗口。也正是因为滚动窗口是“无缝衔接”,所以每个数据都会被分配到一个窗口,而且只会属于一个窗口。
滚动窗口可以基于时间定义,也可以基于数据个数定义;需要的参数只有一个,就是窗口的大小(window size)。比如我们可以定义一个长度为 1 小时的滚动时间窗口,那么每个小时就会进行一次统计;或者定义一个长度为 10 的滚动计数窗口,就会每 10 个数进行一次统计。
(2)滑动窗口(Sliding Windows)
滑动窗口的大小也是固定的。区别在于,窗口之间并不是首尾相接的,而是可以“错开”一定的位置。如果看作一个窗口的运动,那么就像是向前小步“滑动”一样。既然是向前滑动,那么每一步滑多远,就也是可以控制的。所以定义滑动窗口的参数有两个:除去窗口大小(window size)之外,还有一个“滑动步长”(window slide),它其实就代表了窗口计算的频率。滑动的距离代表了下个窗口开始的时间间隔,而窗口大小是固定的,所以也就是两个窗口结束时间的间隔;窗口在结束时间触发计算输出结果,那么滑动步长就代表了计算频率。例如,我们定义一个长度为 1 小时、滑动步长为 5 分钟的滑动窗口,那么就会统计 1 小时内的数据,每 5 分钟统计一次。同样,滑动窗口可以基于时间定义,也可以基于数据个数定义。
(3)会话窗口(Session Windows)
数据来了之后就开启一个会话窗口,如果接下来还有数据陆续到来,那么就一直保持会话;如果一段时间一直没收到数据,那就认为会话超时失效,窗口自动关闭。会话窗口只能基于时间来定义。在 Flink 底层,对会话窗口的处理会比较特殊:每来一个新的数据,都会创建一个新的会话窗口;然后判断已有窗口之间的距离,如果小于给定的 size,就对它们进行合并(merge)操作。在 Window 算子中,对会话窗口会有单独的处理逻辑。
(4)全局窗口(Global Windows)
“全局窗口”,这种窗口全局有效,会把相同 key 的所有数据都分配到同一个窗口中;说直白一点,就跟没分窗口一样。无界流的数据永无止尽,所以这种窗口也没有结束的时候,默认是不会做触发计算的。如果希望它能对数据进行计算处理,还需要自定义“触发器”(Trigger)。
在定义窗口操作之前,首先需要确定,到底是基于按键分区(Keyed)的数据流 KeyedStream
来开窗,还是直接在没有按键分区的 DataStream 上开窗。也就是说,在调用窗口算子之前,
是否有 keyBy 操作。
(1)按键分区窗口(Keyed Windows)
经过按键分区 keyBy 操作后,数据流会按照 key 被分为多条逻辑流(logical streams),这就是 KeyedStream。基于 KeyedStream 进行窗口操作时, 窗口计算会在多个并行子任务上同时执行。相同 key 的数据会被发送到同一个并行子任务,而窗口操作会基于每个 key 进行单独的处理。所以可以认为,每个 key 上都定义了一组窗口,各自独立地进行统计计算。在代码实现上,我们需要先对 DataStream 调用.keyBy()进行按键分区,然后再调用.window()定义窗口。
- stream.keyBy(...)
- .window(...)
(2)非按键分区(Non-Keyed Windows)
如果没有进行 keyBy,那么原始的 DataStream 就不会分成多条逻辑流。这时窗口逻辑只能在一个任务(task)上执行,就相当于并行度变成了 1。所以在实际应用中一般不推荐使用这种方式。
stream.windowAll(...)
有了前置的基础,接下来我们就可以真正在代码中实现一个窗口操作了。简单来说,窗口
操作主要有两个部分:窗口分配器(Window Assigners)和窗口函数(Window Functions)。
- stream.keyBy(<key selector>)
- .window(<window assigner>)
- .aggregate(<window function>)
定义窗口分配器(Window Assigners)是构建窗口算子的第一步,它的作用就是定义数据应该被“分配”到哪个窗口。窗口分配器其实就是在指定窗口的类型。窗口分配器最通用的定义方式,就是调用.window()方法。这个方法需要传入一个WindowAssigner 作为参数,返 WindowedStream。如果是非按键分区窗口,那么直接调用.windowAll()方法,同样传入一个 WindowAssigner,返回的是 AllWindowedStream。窗口按照驱动类型可以分成时间窗口和计数窗口,而按照具体的分配规则,又有滚动窗口、滑动窗口、会话窗口、全局窗口四种。
(1)滚动处理时间窗口
窗口分配器由类 TumblingProcessingTimeWindows 提供,需要调用它的静态方法.of()。
- stream.keyBy(...)
- .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
- .aggregate(...)
(2)滑动处理时间窗口
窗口分配器由类 SlidingProcessingTimeWindows 提供,同样需要调用它的静态方法.of()。
- stream.keyBy(...)
- .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
- .aggregate(...)
(3)处理时间会话窗口
窗口分配器由类 ProcessingTimeSessionWindows 提供,需要调用它的静态方法.withGap()或者.withDynamicGap()。
- stream.keyBy(...)
- .window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
- .aggregate(...)
(4)滚动事件时间窗口
窗口分配器由类 TumblingEventTimeWindows 提供,用法与滚动处理事件窗口完全一致。
- stream.keyBy(...)
- .window(TumblingEventTimeWindows.of(Time.seconds(5)))
- .aggregate(...)
(5)滑动事件时间窗口
窗口分配器由类 SlidingEventTimeWindows 提供,用法与滑动处理事件窗口完全一致。
- stream.keyBy(...)
- .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
- .aggregate(...)
(6)事件时间会话窗口
窗口分配器由类 EventTimeSessionWindows 提供,用法与处理事件会话窗口完全一致。
- stream.keyBy(...)
- .window(EventTimeSessionWindows.withGap(Time.seconds(10)))
- .aggregate(...)
(1)滚动计数窗口
滚动计数窗口只需要传入一个长整型的参数 size,表示窗口的大小。
- stream.keyBy(...)
- .countWindow(10)
(2)滑动计数窗口
与滚动计数窗口类似,不过需要在.countWindow()调用时传入两个参数:size 和 slide,前者表示窗口大小,后者表示滑动步长。
- stream.keyBy(...)
- .countWindow(10,3)
- env.addSource(consumer)
- .map(f => {
- println(f)
- User(f.split(",")(0), f.split(",")(1).toInt, f.split(",")(2).toLong)
- })
- .assignTimestampsAndWatermarks(new AscendingTimestampExtractor[User] {
- override def extractAscendingTimestamp(element: User): Long = element.timestamp
- })
- .keyBy(_.userId)
- .window(TumblingEventTimeWindows.of(Time.seconds(10)))
- // reduce 返回的类型,应该和输入的类型一样
- // 这里统计的是每个窗口,每个userId 出现的次数,timestamp 是没用的,给了0值
- .reduce { (v1, v2) => User(v1.userId, v1.count + v2.count, 0) }
- .print()
AggregateFunction 比 ReduceFunction 更加的通用,它有三个参数,一个输入类型(IN),一个累加器(ACC),一个输出类型(OUT)。输入类型,就是输入流的类型。接口中有一个方法,可以把输入的元素和累加器累加。并且可以初始化一个累加器,然后把两个累加器合并成一个累加器,获得输出结果。
- env.addSource(consumer)
- .map(f => {
- println(f)
- User(f.split(",")(0), f.split(",")(1).toInt, f.split(",")(2).toLong)
- })
- .assignTimestampsAndWatermarks(new AscendingTimestampExtractor[User] {
- override def extractAscendingTimestamp(element: User): Long = element.timestamp
- })
- .keyBy(_.userId)
- .window(TumblingEventTimeWindows.of(Time.seconds(10)))
- // 使用 aggregate 来计算
- .aggregate(new MyAggregateFunction)
- .print()
-
-
-
- class MyAggregateFunction extends AggregateFunction[User, User, (String, Int)] {
- override def createAccumulator(): User = User("", 0, 0)
-
- override def add(value: User, accumulator: User): User = User(value.userId, value.count + accumulator.count, 0)
-
- override def getResult(accumulator: User): (String, Int) = (accumulator.userId, accumulator.count)
-
- override def merge(a: User, b: User): User = User(a.userId, a.count + b.count, 0)
- }
ProcessWindowFunction 有一个 Iterable 迭代器,用来获得窗口中所有的元素。有一个上下文对象用来获得时间和状态信息,比其他的窗口函数有更大的灵活性。
但是这样做损耗了一部分性能和资源,因为元素不能增量聚合,相反 ,在触发窗口计算时,Flink 需要在内部缓存窗口的所有元素。
- env.addSource(consumer)
- .map(f => {
- println(f)
- User(f.split(",")(0), f.split(",")(1).toInt, f.split(",")(2).toLong)
- })
- .assignTimestampsAndWatermarks(new AscendingTimestampExtractor[User] {
- override def extractAscendingTimestamp(element: User): Long = element.timestamp
- })
- .keyBy(_.userId)
- .window(TumblingEventTimeWindows.of(Time.seconds(10)))
- // 使用 ProcessFunction 来处理整个窗口数据
- .process(new MyProcessFunction())
- .print()
-
- class MyProcessFunction extends ProcessWindowFunction[User, String, String, TimeWindow] {
- override def process(key: String, context: Context, elements: Iterable[User], out: Collector[String]): Unit = {
- var count = 0
- // 遍历,获得窗口所有数据
- for (user <- elements) {
- println(user)
- count += 1
- }
- out.collect(s"Window ${context.window} , count : ${count}")
- }
- }
7.4、ProcessWindowFunction 结合 其他 函数一起计算
使用 ReduceFunction 和 AggregateFunction 进行增量计算,计算的结果输出给 ProcessWindowFunction,然后可以使用 context 附加输出一些元数据信息,比如当前窗口信息、当前水印、当前的processTime等等。
如下:我们使用 ReduceFunction 来计算 每个窗口的 count 最小值,然后输出最小值和这个窗口的开始时间:
- env.addSource(consumer)
- .map(f => {
- println(f)
- User(f.split(",")(0), f.split(",")(1).toInt, f.split(",")(2).toLong)
- })
- .assignTimestampsAndWatermarks(new AscendingTimestampExtractor[User] {
- override def extractAscendingTimestamp(element: User): Long = element.timestamp
- })
- .keyBy(_.userId)
- .window(TumblingEventTimeWindows.of(Time.seconds(10)))
- // 使用 reduce 和 processWindowFunction
- .reduce(new MyReduceFunction, new MyProcessFunction)
- .print()
-
-
- class MyReduceFunction extends ReduceFunction[User] {
- override def reduce(value1: User, value2: User): User = {
- if (value1.count > value2.count) value2
- else value1
- }
- }
-
- class MyProcessFunction extends ProcessWindowFunction[User, (Long, User), String, TimeWindow] {
- override def process(key: String, context: Context, elements: Iterable[User], out: Collector[(Long, User)]): Unit = {
- val min = elements.iterator.next
- out.collect((context.window.getStart, min))
- }
- }
触发器主要是用来控制窗口什么时候触发计算。所谓的“触发计算”,本质上就是执行窗
口函数,所以可以认为是计算得到结果并输出的过程。基于 WindowedStream 调用.trigger()方法,就可以传入一个自定义的窗口触发器(Trigger)。
- stream.keyBy(...)
- .window(...)
- .trigger(new MyTrigger())
- stream.keyBy(...)
- .window(...)
- .evictor(new MyEvictor())
Evictor 接口定义了两个方法:
evictBefore():定义执行窗口函数之前的移除数据操作
evictAfter():定义执行窗口函数之后的以处数据操作
默认情况下,预实现的移除器都是在执行窗口函数(window fucntions)之前移除数据的。
在事件时间语义下,窗口中可能会出现数据迟到的情况。这是因为在乱序流中,水位线
(watermark)并不一定能保证时间戳更早的所有数据不会再来。当水位线已经到达窗口结束时间时,窗口会触发计算并输出结果,这时一般也就要销毁窗口了;如果窗口关闭之后,又有本属于窗口内的数据姗姗来迟,默认情况下就会被丢弃。为了解决迟到数据的问题,Flink 提供了一个特殊的接口,可以为窗口算子设置一个“允许的最大延迟”(Allowed Lateness)。也就是说,我们可以设定允许延迟一段时间,在这段时间内,窗口不会销毁,继续到来的数据依然可以进入窗口中并触发计算。直到水位线推进到了 窗口结束时间 + 延迟时间,才真正将窗口的内容清空,正式关闭窗口。
- stream.keyBy(...)
- .window(TumblingEventTimeWindows.of(Time.hours(1)))
- .allowedLateness(Time.minutes(1))
我们自然会想到,即使可以设置窗口的延迟时间,终归还是有限的,后续的数据还是会被丢弃。Flink 还提供了另外一种方式处理迟到数据。我们可以将未收入窗口的迟到数据,放入“侧输出流”(side output)进行另外的处理。所谓的侧输出流,相当于是数据流的一个“分支”,这个流中单独放置那些错过了该上的车、本该被丢弃的数据。基于 WindowedStream 用.sideOutputLateData() 方法,就可以实现这个功能。方法需要传入一个“输出标签”(OutputTag),用来标记分支的迟到数据流。因为保存的就是流中的原始数据,所以 OutputTag 的类型与流中数据类型相同。
-
- SingleOutputStreamOperator<AggResult> winAggStream = stream.keyBy(...)
- .window(TumblingEventTimeWindows.of(Time.hours(1))) .sideOutputLateData(outputTag)
- .aggregate(new MyAggregateFunction())
- DataStream<Event> lateStream = winAggStream.getSideOutput(outputTag);
1. 窗口的创建
2. 窗口计算的触发
3. 窗口的销毁
设置水位线延迟时间
允许窗口处理迟到数据
将迟到数据放入窗口侧输出流
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。