当前位置:   article > 正文

Flink 中的 Window (窗口)_tumblingprocessingtimewindows

tumblingprocessingtimewindows

一、什么是Window?

官方对Window的介绍:

Windows are at the heart of processing infinite streams. Windows split the stream into “buckets” of finite size, over which we can apply computations.         

窗口是处理无界流的核心所在。窗口可以将数据流装入大小有限的“桶”中,再对每个“桶”加以计算处理。 

        通俗点说,就是按固定时间或长度将无限数据流切分成不同的窗口,我们就可以对窗口内截取的数据使用一些计算函数进行处理,从而得到一定范围内的统计结果。

二、基本结构

        Flink 窗口在 keyed streams 和 non-keyed streams 上使用的基本结构,只有一点区别:keyed streams调用 keyBy(...)后再调用 window(...) , 而 non-keyed streams 只用直接调用 windowAll(...)。具体如下:

1、Keyed Windows

2、Non-Keyed Windows

分析:

        使用 keyed stream 允许你的窗口计算由多个 task 并行(原始流会被分割为多个逻辑上的流),因为每个逻辑上的 keyed stream 都可以被单独处理。 属于同一个 key 的元素会被发送到同一个 task。

        但是对于 non-keyed stream,原始流不会被分割为多个逻辑上的流, 所以所有的窗口计算会被同一个 task 完成,也就是并行度为 1,会影响性能。

三、Window分类

        Flink提供了多种窗口来满足大部分的使用场景。如:滚动窗口( Tumbling Window)、滑动窗口(Sliding Window)、会话窗口( Session Window)和 全局窗口(Global Windows)。

1、滚动窗口(Tumbling Windows)

        滚动窗口是将数据分配到指定窗口中,滚动窗口的大小是固定的,且各自范围之间不重叠。可以在细分为滚动时间窗口滚动计数窗口

        使用场景:适用于按照指定的周期来统计指标。

        例如:指定滚动窗口的大小为 5 秒钟,那么每 5 秒钟就会有一个窗口被计算,并且创建一个新的窗口。

代码使用:

  1. DataStream<T> input = ...;
  2. // tumbling event-time windows
  3. input
  4. .keyBy(<key selector>)
  5. .window(TumblingEventTimeWindows.of(Time.seconds(5)))
  6. .<windowed transformation>(<window function>);
  7. // tumbling processing-time windows
  8. input
  9. .keyBy(<key selector>)
  10. .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
  11. .<windowed transformation>(<window function>);
  12. // daily tumbling event-time windows offset by -8 hours.
  13. input
  14. .keyBy(<key selector>)
  15. .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
  16. .<windowed transformation>(<window function>);

时间间隔可以用 Time.milliseconds(x)Time.seconds(x)Time.minutes(x) 等来指定。

2、滑动窗口(Sliding Windows)

        滑动窗口将数据分配到大小固定且允许相互重叠的桶中,这意味着每个数据有可能同时属于多个桶。窗口大小可以通过 window size 参数设置。 滑动窗口需要一个额外的滑动距离(window slide)参数来控制生成新窗口的频率。可以在细分为时间滑动窗口计数滑动窗口

        使用场景是:根据指定的统计周期来计算指定窗口时间大小的指标。

        例如:每隔5秒钟,计算一次前10秒的数据(窗口大小为10,滑动距离为5,每5s得到一个新的窗口, 里面包含之前 10s到达的数据)。
        窗口大小 > 滑动距离时,窗口之间有重叠,前2个窗口是下图的window1和window2。
        窗口大小 = 滑动距离时,也就是滚动窗口了,前2个窗口是下图的window1和window3。
        窗口大小 < 滑动距离时,窗口之间不会重叠,前2个窗口是下图的window1和window4,这种设置会遗漏数据。

代码使用:

  1. DataStream<T> input = ...;
  2. // 滑动 event-time 窗口
  3. input
  4. .keyBy(<key selector>)
  5. .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
  6. .<windowed transformation>(<window function>);
  7. // 滑动 processing-time 窗口
  8. input
  9. .keyBy(<key selector>)
  10. .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
  11. .<windowed transformation>(<window function>);
  12. // 滑动 processing-time 窗口,偏移量为 -8 小时
  13. input
  14. .keyBy(<key selector>)
  15. .window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
  16. .<windowed transformation>(<window function>);

3、会话窗口(Session Windows)

        会话窗口把数据按活跃的会话分组。 与滚动窗口滑动窗口不同,会话窗口不会相互重叠,且没有固定的开始或结束时间。 会话窗口在一段时间没有收到数据之后会关闭(即在一段不活跃的间隔之后)。 会话窗口可以设置固定的会话间隔(session gap)定义多长时间算作不活跃。 当超出了不活跃的时间段,当前的会话就会关闭,并且将接下来的数据分发到新的会话窗口。

        会话窗口在一些常见的真实场景中非常有用,这些场景既不适合用滚动窗口,也不适用滑动窗口。

代码使用:

  1. DataStream<T> input = ...;
  2. // 设置了固定间隔的 event-time 会话窗口
  3. input
  4. .keyBy(<key selector>)
  5. .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
  6. .<windowed transformation>(<window function>);
  7. // 设置了动态间隔的 event-time 会话窗口
  8. input
  9. .keyBy(<key selector>)
  10. .window(EventTimeSessionWindows.withDynamicGap((element) -> {
  11. // 决定并返回会话间隔
  12. }))
  13. .<windowed transformation>(<window function>);
  14. // 设置了固定间隔的 processing-time session 窗口
  15. input
  16. .keyBy(<key selector>)
  17. .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
  18. .<windowed transformation>(<window function>);
  19. // 设置了动态间隔的 processing-time 会话窗口
  20. input
  21. .keyBy(<key selector>)
  22. .window(ProcessingTimeSessionWindows.withDynamicGap((element) -> {
  23. // 决定并返回会话间隔
  24. }))
  25. .<windowed transformation>(<window function>);

4、全局窗口(Global Windows)

        全局窗口是将拥有相同 key 的所有数据分发到一个全局窗口。 窗口需要你指定了自定义的 trigger 时才有用。 否则,计算不会发生,因为全局窗口没有天然的终点去触发其中积累的数据。

代码使用:

  1. DataStream<T> input = ...;
  2. input
  3. .keyBy(<key selector>)
  4. .window(GlobalWindows.create())
  5. .<windowed transformation>(<window function>);

四、例子

1、滚动窗口例子

需求:每隔5s统计一次,统计最近5s内单词出现的频次

  1. import org.apache.flink.api.common.functions.FlatMapFunction;
  2. import org.apache.flink.api.java.tuple.Tuple2;
  3. import org.apache.flink.streaming.api.datastream.DataStream;
  4. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  5. import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
  6. import org.apache.flink.streaming.api.windowing.time.Time;
  7. public class TestTumblingTimeWindows{
  8. public static void main(String[] args) throws Exception {
  9. //1.创建流环境
  10. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  11. //2、获取数据
  12. DataStream<String> source = env.socketTextStream("node1", 9000);
  13. DataStream<Tuple2<String, Integer>> windowCounts = source
  14. .flatMap((FlatMapFunction<String, Tuple2<String, Integer>>) (value, out) -> {
  15. for (String word : value.split("\\s")) {
  16. out.collect(Tuple2.of(word, 1));
  17. }
  18. })
  19. //.keyBy(0) //过时
  20. .keyBy(t -> t.f0)
  21. //滚动窗口
  22. .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
  23. .sum(1);
  24. windowCounts.print();
  25. env.execute("TestTumblingTimeWindows");
  26. }
  27. }

 2、滑动窗口例子

需求:每隔5s统计一次,统计最近10s内单词出现的频次

  1. import org.apache.flink.api.common.functions.FlatMapFunction;
  2. import org.apache.flink.api.java.tuple.Tuple2;
  3. import org.apache.flink.streaming.api.datastream.DataStream;
  4. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  5. import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
  6. import org.apache.flink.streaming.api.windowing.time.Time;
  7. public class TestSlidingTimeWindows{
  8. public static void main(String[] args) throws Exception {
  9. //1.创建流环境
  10. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  11. //2、获取数据
  12. DataStream<String> source = env.socketTextStream("node1", 9000);
  13. DataStream<Tuple2<String, Integer>> windowCounts = source
  14. .flatMap((FlatMapFunction<String, Tuple2<String, Integer>>) (value, out) -> {
  15. for (String word : value.split("\\s")) {
  16. out.collect(Tuple2.of(word, 1));
  17. }
  18. })
  19. //.keyBy(0) //过时
  20. .keyBy(t -> t.f0)
  21. //滑动窗口
  22. .window(SlidingProcessingTimeWindows.of(Time.seconds(10),Time.seconds(5)))
  23. .sum(1);
  24. windowCounts.print();
  25. env.execute("TestSlidingTimeWindows");
  26. }
  27. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/秋刀鱼在做梦/article/detail/839788
推荐阅读
相关标签
  

闽ICP备14008679号