赞
踩
在了解TumblingWindow之前,首先让我们对当涉及到流处理或流计算时的“窗口”有一个基本的了解。在数据流中,有一个持续生成数据的源,这使得计算最终值变得不可行。
“窗口”定义了无界流上的有限元素集,我们可以在其上应用计算。这个集合可以基于时间、元素计数、计数和时间的组合,或者一些自定义逻辑来为窗口分配元素。例如:
流框架厂商实现了一个多种类型的“窗口”定义。Flink有三种类型
本文将重点讨论其中的第一种。
这个窗口很容易理解,也很容易上手。它是一个固定大小的窗口,其中窗口大小可以是时间(30秒,5分钟),也可以是计数(100个元素)。
5分钟的时间窗口将收集窗口中到达的所有元素,并在5分钟后对其进行计算。每五分钟将启动一个新窗口。计数窗口100将收集窗口中的100个元素,并在添加第100个元素时计算窗口的值。最重要的是,窗口之间没有重叠,也没有重复的元素。每个元素只分配给一个窗口。如果指定了一个key,那么Flink将对流进行逻辑分区,并为每个keyed元素运行并行窗口操作。
让我们看一个例子来更好地理解它们。这里我们使用一个简单的“IntegerGeneratorSource”类作为一个数据源,每秒钟生成一个整数(从1开始)。下面的代码行初始化一个本地Flink环境并创建一个DataStream对象。
- // 设置流执行环境
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- // 添加数据源
- DataStream<String> initStream = env.addSource(new IntegerGenerator());
(1)Tumbling Time window
下面是计算滚动时间窗口的代码。
- initStream.timeWindowAll(Time.seconds(5))
- .process(new ProcessAllWindowFunction<Integer, Integer, TimeWindow>() {
- @Override
- public void process(Context context, Iterable<Integer> input, Collector<Integer> output) throws Exception {
- logger.info("Computing sum for {}", input);
- int sum = 0;
- for(int i : input){
- sum += i;
- }
- output.collect(sum);
- }
- })
- .print();
-
- env.execute("flink TumblingWindow");
注意: ProcessAllWindowFunction将允许Flink缓冲内存中一个窗口的所有元素,然后将整个元素传递给计算。这就是为什么有一个Iterable<>对象作为process()的输入参数。
运行的部分日志信息如下图:
process()
方法在所有的元素 [1, 2, 3]上被调用,结果'6'被打印输出到控制台。(2)Tumbling Count window
下面是计算滚动计数窗口的代码。定义一个包含四个元素的滚动窗口(按计数固定大小)。
- initStream.countWindowAll(4)
- .reduce(new ReduceFunction<Integer>() {
- @Override
- public Integer reduce(Integer value1, Integer value2) throws Exception {
- logger.info("Reducing {} and {}", value1, value2);
- return value1 + value2;
- }
- }).print();
-
- env.execute("flink TumblingWindow");
注意: ReduceFunction将让Flink执行增量计算(滚动更新)。与ProcessFunction相比,内存占用非常小。第一个参数是前一个窗口的计算值,第二个参数是分配给这个窗口的当前元素。
执行过程中的日志信息如下所示:
继续在上一小节中“Tumbling Time window”上修改代码如下:
- initStream.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5),Time.seconds(2)))
- .process(new ProcessAllWindowFunction<Integer, Integer, TimeWindow>() {
- @Override
- public void process(Context context, Iterable<Integer> input, Collector<Integer> output) throws Exception {
- logger.info("Computing sum for {}", input);
- int sum = 0;
- for(int i : input){
- sum += i;
- }
- output.collect(sum);
- }
- })
- .print();
可以注意到,这里只修改了第一行,将"timeWindowAll(Time.seconds(5))"替换为更详细的 "windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(2) ))"
timeWindowAll()是一个包装器方法,默认为windowAll(TumblingProcessingTimeWindows.of(size)),也就是一个按时间固定大小的窗口(这个时间是系统运行Flink作业的时间,即处理时间)。
默认情况下,Flink在时钟边界处启动窗口,但是使用windowAll()的第二个参数,我们可以自定义时钟边界。
执行过程中的日志信息如下所示:
Flink启动一个窗口,收集整数。然而,在"20:12:27",这个窗口关闭并触发对[1]的求和计算。然后总和被打印在控制台。
注意,如果没有提供偏移量,那么Flink会在"20:12:25"关闭窗口。但是由于偏移量是2秒,这使得窗口在超出时钟边界2秒处结束,即"20:12:27"。
到目前为止,Flink执行作业所采用的时间为默认系统时间,即事件处理时间(Process Time)。然而,在许多应用场景中,我们希望使用事件的实际时间,即事件在事件源中创建的时间,这称为“事件时间(Event Time)”。
在事件时间中,Flink会根据元素本身的时间戳将元素分组到windows中,而不是任何系统时钟。请看下面这个例子。
首先定义一个名为“Element”的POJO类,用来表示数据流中的事件类型。
- public class Element {
- public Integer value;
- public Long timestamp;
-
- public Element(){}
-
- public Element(Integer value, Long timestamp) {
- this.value = value;
- this.timestamp = timestamp;
- }
-
- @Override
- public String toString() {
- return "Element{" +
- "value=" + value +
- '}';
- }
- }

接下来,定义一个简单的数据源类“ElementGeneratorSource”,它将创建Element类型的事件对象并分配随机递增时间戳(这是为了确保不产生具有匹配系统时间的元素)。在生产环境下,时间戳是事件本身的一部分。
- package com.xueai8.ch03.source;
-
- import com.xueai8.ch03.entity.Element;
- import org.apache.flink.streaming.api.functions.source.SourceFunction;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
-
- import java.time.Instant;
- import java.time.LocalDateTime;
- import java.time.ZoneId;
- import java.util.concurrent.ThreadLocalRandom;
-
- /**
- * Created by www.xueai8.com
- * 自定义数据源
- */
- public class ElementGeneratorSource implements SourceFunction<Element> {
-
- volatile boolean isRunning = true;
- final Logger logger = LoggerFactory.getLogger(ElementGeneratorSource.class);
-
- @Override
- public void run( SourceContext<Element> ctx ) throws Exception{
- int counter = 1;
-
- // flink程序启动20秒后
- long eventStartTime = System.currentTimeMillis() - 20000;
-
- // 使用上面的时间戳创建第一个事件
- Element element = new Element(counter++, eventStartTime);
-
- while( isRunning ){
- logger.info("Produced Element with value {} and timestamp {}", element.value, printTime(element.timestamp));
- ctx.collect( element );
-
- // 创建元素并分配具有随机性的时间戳,以便它们不与当前系统时钟时间相同
- element = new Element(counter++, element.timestamp + ThreadLocalRandom.current().nextLong( 1000, 6000 ));
-
- Thread.sleep(1000);
- }
- }
-
- @Override
- public void cancel(){
- isRunning = false;
- }
-
- // 辅助函数以可读格式打印 epoch 时间
- String printTime(long longValue){
- return LocalDateTime.ofInstant(Instant.ofEpochMilli(longValue), ZoneId.systemDefault()).toString();
- }
-
- }

现在,我们定义一个管道,使用TumblingEventTime窗口处理这些元素。
- package com.xueai8.ch03;
-
- import com.xueai8.ch03.entity.Element;
- import com.xueai8.ch03.source.ElementGeneratorSource;
- import org.apache.flink.streaming.api.TimeCharacteristic;
- import org.apache.flink.streaming.api.datastream.DataStreamSource;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
- import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
- import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
- import org.apache.flink.streaming.api.windowing.time.Time;
- import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
- import org.apache.flink.util.Collector;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
-
- /**
- * Created by www.xueai8.com
- *
- * TumblingWindow:翻滚窗口(或者叫滚动窗口)
- * 使用事件时间(Event Time)
- */
- public class TumblingWindowDemo2 {
-
- public static void main(String[] args) throws Exception {
- Logger logger = LoggerFactory.getLogger(TumblingWindowDemo2.class);
-
- // 设置流执行环境
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- // 设置为 EventTime,否则默认为 ProcessTime
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-
- DataStreamSource<Element> elementStream = env.addSource( new ElementGeneratorSource() );
-
- elementStream
- // 在定义窗口之前,需要告诉Flink如何获取它接收到的每个元素的时间戳和水印
- .assignTimestampsAndWatermarks( new AscendingTimestampExtractor<Element>(){
- @Override
- public long extractAscendingTimestamp( Element element ){
- return element.timestamp;
- }
-
- })
- // 定义一个TumblingEventTimeWindows类型的窗口,大小为10秒
- .windowAll(TumblingEventTimeWindows.of(Time.seconds(10)))
- .process(new ProcessAllWindowFunction<Element, Integer ,TimeWindow>(){
- @Override
- public void process( Context arg0, Iterable<Element> input, Collector<Integer> output )
- throws Exception {
- logger.info( "Computing sum for {}", input );
- int sum = 0;
- for(Element e : input) {
- sum += e.value;
- }
- output.collect( sum );
-
- }
- })
- .print();
-
- env.execute();
- }
- }

"AscendingTimestampExtractor"是一种时间戳分配程序和水印生成器,用于时间戳单调递增的流。使用这种由flink提供的API的另一个好处是,它将为我们生成水印。水印是一种让Flink知道何时关闭当前窗口的方法(属于窗口的最后一个元素已经到达)。
简而言之,assignTimestampsAndWatermarks()方法将允许Flink知道如何从事件/元素读取到Flink的时间戳,最重要的是,如何计算水印。
上面示例代码执行时输出的日志信息如下所示:
从图中可以看出,在第一个窗口中,生成的元素的时间戳与系统时钟是不同的(每一行日志先打印系统时钟,最右侧是事件时间)
当在"2020-05-14T20:20:03.400"生成第三个元素时,它触发当前窗口关闭,因为水印已经到达了。在一个10秒钟的TimeWindow中,结束时间将会是"2020-05-14T20:20:00.000"。因此当前窗口只收集前两个值。
在一下轮窗口计算中,第二个窗口将在"2020-05-14T20:20:10.000"关闭,这意味着在第二个窗口中值3、4、5将会被收集,因为值6这个元素"timestamp >= current watermark"。
在本文中,我们观察了两种类型的tumblr窗口(时间vs计数)及其默认行为。我们还看到了两个窗口函数,ProcessAllFunction和ReduceFunction用于累加和增量计算。
另外,我们还讨论了重写默认时间时钟边界以及如何使用TumblingEventTimeWindow。我们还看到了一个为元素分配时间戳的例子。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。