当前位置:   article > 正文

深入理解Flink中的TumblingWindow_tumblingprocessingtimewindows

tumblingprocessingtimewindows

在了解TumblingWindow之前,首先让我们对当涉及到流处理或流计算时的“窗口”有一个基本的了解。在数据流中,有一个持续生成数据的源,这使得计算最终值变得不可行。

“窗口”定义了无界流上的有限元素集,我们可以在其上应用计算。这个集合可以基于时间、元素计数、计数和时间的组合,或者一些自定义逻辑来为窗口分配元素。例如:

  • 每分钟收到的订单数量(固定时间)
  • 完成最后100个订单的平均时间(固定元素)

框架厂商实现了一个多种类型的“窗口”定义。Flink有三种类型

  1. 翻滚窗口(又称为”滚动窗口“)
  2. 滑动窗口
  3. 会话窗口,

本文将重点讨论其中的第一种。

TumblingWindow

这个窗口很容易理解,也很容易上手。它是一个固定大小的窗口,其中窗口大小可以是时间(30秒,5分钟),也可以是计数(100个元素)。

preview

5分钟的时间窗口将收集窗口中到达的所有元素,并在5分钟后对其进行计算。每五分钟将启动一个新窗口。计数窗口100将收集窗口中的100个元素,并在添加第100个元素时计算窗口的值。最重要的是,窗口之间没有重叠,也没有重复的元素。每个元素只分配给一个窗口。如果指定了一个key,那么Flink将对流进行逻辑分区,并为每个keyed元素运行并行窗口操作。

让我们看一个例子来更好地理解它们。这里我们使用一个简单的“IntegerGeneratorSource”类作为一个数据源,每秒钟生成一个整数(从1开始)。下面的代码行初始化一个本地Flink环境并创建一个DataStream对象。

  1. // 设置流执行环境
  2. final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  3. // 添加数据源
  4. DataStream<String> initStream = env.addSource(new IntegerGenerator());

(1)Tumbling Time window

下面是计算滚动时间窗口的代码。

  1. initStream.timeWindowAll(Time.seconds(5))
  2. .process(new ProcessAllWindowFunction<Integer, Integer, TimeWindow>() {
  3. @Override
  4. public void process(Context context, Iterable<Integer> input, Collector<Integer> output) throws Exception {
  5. logger.info("Computing sum for {}", input);
  6. int sum = 0;
  7. for(int i : input){
  8. sum += i;
  9. }
  10. output.collect(sum);
  11. }
  12. })
  13. .print();
  14. env.execute("flink TumblingWindow");

注意: ProcessAllWindowFunction将允许Flink缓冲内存中一个窗口的所有元素,然后将整个元素传递给计算。这就是为什么有一个Iterable<>对象作为process()的输入参数。

运行的部分日志信息如下图:

 

  • 在第一个窗口关闭之前产生了三个整数。注意,虽然我们说的是窗口大小为5秒,但第一个窗口并没有运行5秒。原因是,在默认情况下,Flink将四舍五入到最近的时钟边界,在我们的例子中是在“19:47:35”。这触发了Flink TriggerWindow关闭当前窗口并将其传递到下一个步骤。
  • 代码中的 process()方法在所有的元素 [1, 2, 3]上被调用,结果'6'被打印输出到控制台。
  • 新窗口启动并收集下一组整数。在5秒后的“19:47:40”时,窗口关闭。所有收集到的数据都被发送给进程,进程打印接收到的整数并计算该窗口中数字的和= '30'。然后当前窗口这整数和被打印到控制台。

(2)Tumbling Count window

下面是计算滚动计数窗口的代码。定义一个包含四个元素的滚动窗口(按计数固定大小)。

  1. initStream.countWindowAll(4)
  2. .reduce(new ReduceFunction<Integer>() {
  3. @Override
  4. public Integer reduce(Integer value1, Integer value2) throws Exception {
  5. logger.info("Reducing {} and {}", value1, value2);
  6. return value1 + value2;
  7. }
  8. }).print();
  9. env.execute("flink TumblingWindow");

注意: ReduceFunction将让Flink执行增量计算(滚动更新)。与ProcessFunction相比,内存占用非常小。第一个参数是前一个窗口的计算值,第二个参数是分配给这个窗口的当前元素。

执行过程中的日志信息如下所示:

  • 每个窗口中,先收集前两个整数,然后Flink触发TriggerWindow用前两个元素调用reduce()方法。计算结果'3'被缓存在Flink中。
  • 数据源生成下一个整数'3'。
  • reduce()方法被调用,请注意,这里的第一个参数'3'是来自上一次计算的结果,而第二个参数'3'是数据源当前生成的整数。计算结果'6'被缓存在Flink中。
  • 数据源生成下一个整数'4'。
  • reduce()方法被调用,第一个参数'6'是来自上一次计算的结果,而第二个参数'4'是数据源当前生成的整数。计算结果'6'被缓存在Flink中。现在计算的值是10。此时,Flink已经从源中收集了4个整数,因此这个窗口的count条件已经满足。
  • 由于当前窗口计数大小已达到,Flink打印此窗口的值10(1+2+3+4)。
  • 一个新窗口启动,它等待来自源的下两个整数。
  • 使用一组新的数字(现在是5和6)调用reduce()。
  • 对后面的两个数字应用类似的逻辑,并相应地调用reduce()来执行增量更新。
  • 当Flink为当前窗口获取4个数字时,它调用print()并输出26 (5+6+7+8)。

自定义滚动窗口的时间偏移量

继续在上一小节中“Tumbling Time window”上修改代码如下:

  1. initStream.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5),Time.seconds(2)))
  2. .process(new ProcessAllWindowFunction<Integer, Integer, TimeWindow>() {
  3. @Override
  4. public void process(Context context, Iterable<Integer> input, Collector<Integer> output) throws Exception {
  5. logger.info("Computing sum for {}", input);
  6. int sum = 0;
  7. for(int i : input){
  8. sum += i;
  9. }
  10. output.collect(sum);
  11. }
  12. })
  13. .print();

可以注意到,这里只修改了第一行,将"timeWindowAll(Time.seconds(5))"替换为更详细的 "windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(2) ))"

timeWindowAll()是一个包装器方法,默认为windowAll(TumblingProcessingTimeWindows.of(size)),也就是一个按时间固定大小的窗口(这个时间是系统运行Flink作业的时间,即处理时间)。

默认情况下,Flink在时钟边界处启动窗口,但是使用windowAll()的第二个参数,我们可以自定义时钟边界。

执行过程中的日志信息如下所示:

Flink启动一个窗口,收集整数。然而,在"20:12:27",这个窗口关闭并触发对[1]的求和计算。然后总和被打印在控制台。

注意,如果没有提供偏移量,那么Flink会在"20:12:25"关闭窗口。但是由于偏移量是2秒,这使得窗口在超出时钟边界2秒处结束,即"20:12:27"。

TumblingWindow使用事件时间

到目前为止,Flink执行作业所采用的时间为默认系统时间,即事件处理时间(Process Time)。然而,在许多应用场景中,我们希望使用事件的实际时间,即事件在事件源中创建的时间,这称为“事件时间(Event Time)”。

在事件时间中,Flink会根据元素本身的时间戳将元素分组到windows中,而不是任何系统时钟。请看下面这个例子。

首先定义一个名为“Element”的POJO类,用来表示数据流中的事件类型。

  1. public class Element {
  2. public Integer value;
  3. public Long timestamp;
  4. public Element(){}
  5. public Element(Integer value, Long timestamp) {
  6. this.value = value;
  7. this.timestamp = timestamp;
  8. }
  9. @Override
  10. public String toString() {
  11. return "Element{" +
  12. "value=" + value +
  13. '}';
  14. }
  15. }

接下来,定义一个简单的数据源类“ElementGeneratorSource”,它将创建Element类型的事件对象并分配随机递增时间戳(这是为了确保不产生具有匹配系统时间的元素)。在生产环境下,时间戳是事件本身的一部分。

  1. package com.xueai8.ch03.source;
  2. import com.xueai8.ch03.entity.Element;
  3. import org.apache.flink.streaming.api.functions.source.SourceFunction;
  4. import org.slf4j.Logger;
  5. import org.slf4j.LoggerFactory;
  6. import java.time.Instant;
  7. import java.time.LocalDateTime;
  8. import java.time.ZoneId;
  9. import java.util.concurrent.ThreadLocalRandom;
  10. /**
  11. * Created by www.xueai8.com
  12. * 自定义数据源
  13. */
  14. public class ElementGeneratorSource implements SourceFunction<Element> {
  15. volatile boolean isRunning = true;
  16. final Logger logger = LoggerFactory.getLogger(ElementGeneratorSource.class);
  17. @Override
  18. public void run( SourceContext<Element> ctx ) throws Exception{
  19. int counter = 1;
  20. // flink程序启动20秒后
  21. long eventStartTime = System.currentTimeMillis() - 20000;
  22. // 使用上面的时间戳创建第一个事件
  23. Element element = new Element(counter++, eventStartTime);
  24. while( isRunning ){
  25. logger.info("Produced Element with value {} and timestamp {}", element.value, printTime(element.timestamp));
  26. ctx.collect( element );
  27. // 创建元素并分配具有随机性的时间戳,以便它们不与当前系统时钟时间相同
  28. element = new Element(counter++, element.timestamp + ThreadLocalRandom.current().nextLong( 1000, 6000 ));
  29. Thread.sleep(1000);
  30. }
  31. }
  32. @Override
  33. public void cancel(){
  34. isRunning = false;
  35. }
  36. // 辅助函数以可读格式打印 epoch 时间
  37. String printTime(long longValue){
  38. return LocalDateTime.ofInstant(Instant.ofEpochMilli(longValue), ZoneId.systemDefault()).toString();
  39. }
  40. }

现在,我们定义一个管道,使用TumblingEventTime窗口处理这些元素。

  1. package com.xueai8.ch03;
  2. import com.xueai8.ch03.entity.Element;
  3. import com.xueai8.ch03.source.ElementGeneratorSource;
  4. import org.apache.flink.streaming.api.TimeCharacteristic;
  5. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  6. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  7. import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
  8. import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
  9. import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
  10. import org.apache.flink.streaming.api.windowing.time.Time;
  11. import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
  12. import org.apache.flink.util.Collector;
  13. import org.slf4j.Logger;
  14. import org.slf4j.LoggerFactory;
  15. /**
  16. * Created by www.xueai8.com
  17. *
  18. * TumblingWindow:翻滚窗口(或者叫滚动窗口)
  19. * 使用事件时间(Event Time)
  20. */
  21. public class TumblingWindowDemo2 {
  22. public static void main(String[] args) throws Exception {
  23. Logger logger = LoggerFactory.getLogger(TumblingWindowDemo2.class);
  24. // 设置流执行环境
  25. final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  26. // 设置为 EventTime,否则默认为 ProcessTime
  27. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
  28. DataStreamSource<Element> elementStream = env.addSource( new ElementGeneratorSource() );
  29. elementStream
  30. // 在定义窗口之前,需要告诉Flink如何获取它接收到的每个元素的时间戳和水印
  31. .assignTimestampsAndWatermarks( new AscendingTimestampExtractor<Element>(){
  32. @Override
  33. public long extractAscendingTimestamp( Element element ){
  34. return element.timestamp;
  35. }
  36. })
  37. // 定义一个TumblingEventTimeWindows类型的窗口,大小为10
  38. .windowAll(TumblingEventTimeWindows.of(Time.seconds(10)))
  39. .process(new ProcessAllWindowFunction<Element, Integer ,TimeWindow>(){
  40. @Override
  41. public void process( Context arg0, Iterable<Element> input, Collector<Integer> output )
  42. throws Exception {
  43. logger.info( "Computing sum for {}", input );
  44. int sum = 0;
  45. for(Element e : input) {
  46. sum += e.value;
  47. }
  48. output.collect( sum );
  49. }
  50. })
  51. .print();
  52. env.execute();
  53. }
  54. }

"AscendingTimestampExtractor"是一种时间戳分配程序和水印生成器,用于时间戳单调递增的流。使用这种由flink提供的API的另一个好处是,它将为我们生成水印。水印是一种让Flink知道何时关闭当前窗口的方法(属于窗口的最后一个元素已经到达)。

简而言之,assignTimestampsAndWatermarks()方法将允许Flink知道如何从事件/元素读取到Flink的时间戳,最重要的是,如何计算水印。

上面示例代码执行时输出的日志信息如下所示:

从图中可以看出,在第一个窗口中,生成的元素的时间戳与系统时钟是不同的(每一行日志先打印系统时钟,最右侧是事件时间)

 

当在"2020-05-14T20:20:03.400"生成第三个元素时,它触发当前窗口关闭,因为水印已经到达了。在一个10秒钟的TimeWindow中,结束时间将会是"2020-05-14T20:20:00.000"。因此当前窗口只收集前两个值。

在一下轮窗口计算中,第二个窗口将在"2020-05-14T20:20:10.000"关闭,这意味着在第二个窗口中值3、4、5将会被收集,因为值6这个元素"timestamp >= current watermark"。

小结

在本文中,我们观察了两种类型的tumblr窗口(时间vs计数)及其默认行为。我们还看到了两个窗口函数,ProcessAllFunction和ReduceFunction用于累加和增量计算。

另外,我们还讨论了重写默认时间时钟边界以及如何使用TumblingEventTimeWindow。我们还看到了一个为元素分配时间戳的例子。

本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号