赞
踩
Keyed Windows stream .keyBy(...) <- keyed versus non-keyed windows .window(...) <- required: "assigner" [.trigger(...)] <- optional: "trigger" (else default trigger) [.evictor(...)] <- optional: "evictor" (else no evictor) [.allowedLateness(...)] <- optional: "lateness" (else zero) [.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data) .reduce/aggregate/fold/apply() <- required: "function" [.getSideOutput(...)] <- optional: "output tag" ----------------------------------------------------------------------------------------------------------------------- Non-Keyed Windows stream .windowAll(...) <- required: "assigner" [.trigger(...)] <- optional: "trigger" (else default trigger) [.evictor(...)] <- optional: "evictor" (else no evictor) [.allowedLateness(...)] <- optional: "lateness" (else zero) [.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data) .reduce/aggregate/fold/apply() <- required: "function" [.getSideOutput(...)] <- optional: "output tag"
WindowAssigner 负责将每条输入的数据分发到正确的 window 中。这是通过 在window(…)(针对KeyedStream)或windowAll()(针对DataStream)传入不同的WindowAssigner来完成的。window() 方法接收的输入参数是一个 WindowAssigner。WindowAssigner负责将每个传入元素分配给一个或多个窗口(滑动窗口,有些元素可能需要复制到多个窗口中)。
Flink中根据比较常见的场景提供了一些WindowAssigner:tumbling windows, sliding windows, session windows and global windows 。也可以通过实现WindowAssigner class来自定义一些窗口分配器。
所有flink定义的窗口分配器(全局窗口除外)都是基于时间将元素分配给窗口。这个时间的定义可以是 processing time,也可以是event time。在生产需求中,大部分使用event time 。关于时间的定义以及水位的定义,会在后面的文章涉及到。
下面简单介绍下flink提供的常用的WindowAssigner
下面用的图显示了每种WindowAssigner的工作情况。紫色圆圈表示流的元素,这些元素由某个键(在这种情况下为用户1,用户2和用户3)划分。x轴显示时间进度。
tumbling windows, sliding windows, session windows and global windows
val input: DataStream[T] = ... // tumbling event-time windows input .keyBy(<key selector>) .window(TumblingEventTimeWindows.of(Time.seconds(5))) .<windowed transformation>(<window function>) // tumbling processing-time windows input .keyBy(<key selector>) .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) .<windowed transformation>(<window function>) // daily tumbling event-time windows offset by -8 hours. input .keyBy(<key selector>) .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8))) .<windowed transformation>(<window function>)
通过静态方法TumblingEventTimeWindows.of来实例化TumblingEventTimeWindows类,可以通过源码看到TumblingEventTimeWindows构造是有两个参数
private TumblingProcessingTimeWindows(long size, long offset) {
...
}
时间间隔可以通过指定Time.milliseconds(x),Time.seconds(x), Time.minutes(x)
至于offset 参数,该参数可用于更改窗口的对齐方式。例如,如果没有偏移,则每小时滚动窗口与epoch对齐,即你将获得诸如的窗口 1:00:00.000 - 1:59:59.999,2:00:00.000 - 2:59:59.999等等。如果要更改,可以提供一个偏移量。随着15分钟的偏移量,你会,例如,拿 1:15:00.000 - 2:14:59.999,2:15:00.000 - 3:14:59.999等
一个重要的用例的偏移是窗口调整到比UTC-0时区等。例如,在中国,必须指定的偏移量Time.hours(-8)。
import java.sql.DriverManager
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.functi
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。