赞
踩
官方对Window的介绍:
Windows are at the heart of processing infinite streams. Windows split the stream into “buckets” of finite size, over which we can apply computations.
窗口是处理无界流的核心所在。窗口可以将数据流装入大小有限的“桶”中,再对每个“桶”加以计算处理。
通俗点说,就是按固定时间或长度将无限数据流切分成不同的窗口,我们就可以对窗口内截取的数据使用一些计算函数进行处理,从而得到一定范围内的统计结果。
Flink 窗口在 keyed streams 和 non-keyed streams 上使用的基本结构,只有一点区别:keyed streams 要调用 keyBy(...)后再调用 window(...) , 而 non-keyed streams 只用直接调用 windowAll(...)。具体如下:
1、Keyed Windows
2、Non-Keyed Windows
分析:
使用 keyed stream 允许你的窗口计算由多个 task 并行(原始流会被分割为多个逻辑上的流),因为每个逻辑上的 keyed stream 都可以被单独处理。 属于同一个 key 的元素会被发送到同一个 task。
但是对于 non-keyed stream,原始流不会被分割为多个逻辑上的流, 所以所有的窗口计算会被同一个 task 完成,也就是并行度为 1,会影响性能。
Flink提供了多种窗口来满足大部分的使用场景。如:滚动窗口( Tumbling Window)、滑动窗口(Sliding Window)、会话窗口( Session Window)和 全局窗口(Global Windows)。
滚动窗口是将数据分配到指定窗口中,滚动窗口的大小是固定的,且各自范围之间不重叠。可以在细分为滚动时间窗口和滚动计数窗口。
使用场景:适用于按照指定的周期来统计指标。
例如:指定滚动窗口的大小为 5 秒钟,那么每 5 秒钟就会有一个窗口被计算,并且创建一个新的窗口。
代码使用:
- DataStream<T> input = ...;
-
- // tumbling event-time windows
- input
- .keyBy(<key selector>)
- .window(TumblingEventTimeWindows.of(Time.seconds(5)))
- .<windowed transformation>(<window function>);
-
- // tumbling processing-time windows
- input
- .keyBy(<key selector>)
- .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
- .<windowed transformation>(<window function>);
-
- // daily tumbling event-time windows offset by -8 hours.
- input
- .keyBy(<key selector>)
- .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
- .<windowed transformation>(<window function>);
时间间隔可以用 Time.milliseconds(x)
、Time.seconds(x)
、Time.minutes(x)
等来指定。
滑动窗口将数据分配到大小固定且允许相互重叠的桶中,这意味着每个数据有可能同时属于多个桶。窗口大小可以通过 window size 参数设置。 滑动窗口需要一个额外的滑动距离(window slide)参数来控制生成新窗口的频率。可以在细分为时间滑动窗口和计数滑动窗口。
使用场景是:根据指定的统计周期来计算指定窗口时间大小的指标。
例如:每隔5秒钟,计算一次前10秒的数据(窗口大小为10,滑动距离为5,每5s得到一个新的窗口, 里面包含之前 10s到达的数据)。
窗口大小 > 滑动距离时,窗口之间有重叠,前2个窗口是下图的window1和window2。
窗口大小 = 滑动距离时,也就是滚动窗口了,前2个窗口是下图的window1和window3。
窗口大小 < 滑动距离时,窗口之间不会重叠,前2个窗口是下图的window1和window4,这种设置会遗漏数据。
代码使用:
- DataStream<T> input = ...;
-
- // 滑动 event-time 窗口
- input
- .keyBy(<key selector>)
- .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
- .<windowed transformation>(<window function>);
-
- // 滑动 processing-time 窗口
- input
- .keyBy(<key selector>)
- .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
- .<windowed transformation>(<window function>);
-
- // 滑动 processing-time 窗口,偏移量为 -8 小时
- input
- .keyBy(<key selector>)
- .window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
- .<windowed transformation>(<window function>);
会话窗口把数据按活跃的会话分组。 与滚动窗口和滑动窗口不同,会话窗口不会相互重叠,且没有固定的开始或结束时间。 会话窗口在一段时间没有收到数据之后会关闭(即在一段不活跃的间隔之后)。 会话窗口可以设置固定的会话间隔(session gap)定义多长时间算作不活跃。 当超出了不活跃的时间段,当前的会话就会关闭,并且将接下来的数据分发到新的会话窗口。
会话窗口在一些常见的真实场景中非常有用,这些场景既不适合用滚动窗口,也不适用滑动窗口。
代码使用:
- DataStream<T> input = ...;
-
- // 设置了固定间隔的 event-time 会话窗口
- input
- .keyBy(<key selector>)
- .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
- .<windowed transformation>(<window function>);
-
- // 设置了动态间隔的 event-time 会话窗口
- input
- .keyBy(<key selector>)
- .window(EventTimeSessionWindows.withDynamicGap((element) -> {
- // 决定并返回会话间隔
- }))
- .<windowed transformation>(<window function>);
-
- // 设置了固定间隔的 processing-time session 窗口
- input
- .keyBy(<key selector>)
- .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
- .<windowed transformation>(<window function>);
-
- // 设置了动态间隔的 processing-time 会话窗口
- input
- .keyBy(<key selector>)
- .window(ProcessingTimeSessionWindows.withDynamicGap((element) -> {
- // 决定并返回会话间隔
- }))
- .<windowed transformation>(<window function>);
全局窗口是将拥有相同 key 的所有数据分发到一个全局窗口。 窗口需要你指定了自定义的 trigger 时才有用。 否则,计算不会发生,因为全局窗口没有天然的终点去触发其中积累的数据。
代码使用:
- DataStream<T> input = ...;
-
- input
- .keyBy(<key selector>)
- .window(GlobalWindows.create())
- .<windowed transformation>(<window function>);
需求:每隔5s统计一次,统计最近5s内单词出现的频次
- import org.apache.flink.api.common.functions.FlatMapFunction;
- import org.apache.flink.api.java.tuple.Tuple2;
- import org.apache.flink.streaming.api.datastream.DataStream;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
- import org.apache.flink.streaming.api.windowing.time.Time;
-
-
- public class TestTumblingTimeWindows{
-
- public static void main(String[] args) throws Exception {
- //1.创建流环境
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- //2、获取数据
- DataStream<String> source = env.socketTextStream("node1", 9000);
-
- DataStream<Tuple2<String, Integer>> windowCounts = source
- .flatMap((FlatMapFunction<String, Tuple2<String, Integer>>) (value, out) -> {
- for (String word : value.split("\\s")) {
- out.collect(Tuple2.of(word, 1));
- }
- })
- //.keyBy(0) //过时
- .keyBy(t -> t.f0)
- //滚动窗口
- .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
- .sum(1);
-
-
- windowCounts.print();
-
- env.execute("TestTumblingTimeWindows");
- }
-
- }
需求:每隔5s统计一次,统计最近10s内单词出现的频次
- import org.apache.flink.api.common.functions.FlatMapFunction;
- import org.apache.flink.api.java.tuple.Tuple2;
- import org.apache.flink.streaming.api.datastream.DataStream;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
- import org.apache.flink.streaming.api.windowing.time.Time;
-
-
- public class TestSlidingTimeWindows{
-
- public static void main(String[] args) throws Exception {
- //1.创建流环境
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- //2、获取数据
- DataStream<String> source = env.socketTextStream("node1", 9000);
-
- DataStream<Tuple2<String, Integer>> windowCounts = source
- .flatMap((FlatMapFunction<String, Tuple2<String, Integer>>) (value, out) -> {
- for (String word : value.split("\\s")) {
- out.collect(Tuple2.of(word, 1));
- }
- })
- //.keyBy(0) //过时
- .keyBy(t -> t.f0)
- //滑动窗口
- .window(SlidingProcessingTimeWindows.of(Time.seconds(10),Time.seconds(5)))
- .sum(1);
-
-
- windowCounts.print();
-
- env.execute("TestSlidingTimeWindows");
- }
-
- }
赞
踩
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。