当前位置:   article > 正文

24.flink windows 窗口,彻底站起来。

flink windows

摘要

本文的主要目的是学会flink window窗口编程,本文章不会对:滑动窗口,滚动窗口,sessoin窗口作深入的讲解,那不是本文的目的。本文在假设你对窗口有一定了解的基础的前提下,带你看看flink窗口编程。 在读本篇文章的基础上,你需要先阅读:flink 水位线彻底站起来
内容较多,无法一一道来, 欢迎留言讨论。

1.理解窗口

1.1 通俗的理解

回忆下批处理吧, 我们以日志数据来举例。一般来说我们的日志都是按照日期进行分类的, 比如:/log/2022/03/02/, 如果我想分析2022年三月二号的日志的话,我只需要加载上面的目录就行了, 而实际上这不就是一个窗口嘛? 这个窗口就是我们定义好的文件夹中的数据。 所以窗口不是流处理独有的概念,他并没有那么高深。 我们来给窗口下个定义: 窗口就是对数据精确划分的一个集合。
至于你怎么划分那是你的事了, 比如常见的根据时间划分, 也有的是根据数量划分, 甚至根据某些特性划分,划分的方式多了去了。 所有这些划分最终的目的,就是把聚集的数据 按照业务层需要的方式 去分割成一个个的小集合, 而这每个小集合都是一个窗口。
我为什么会将理解窗口作为一个小节,那是因为我也是小白过来的,明白小白的心态。理解了窗口的本质,会对后面的学习事半功倍。

2.flink 窗口对象

flink的窗口对象有两个实现类,见下图:
在这里插入图片描述

2.1 GlabalWindow

官网这么说的:A global windows assigner assigns all elements with the same key to the same single global window. This windowing scheme is only useful if you also specify a custom trigger. Otherwise, no computation will be performed, as the global window does not have a natural end at which we could process the aggregated elements.
flink有一个全局窗口分配器(后面会讲分配器),该分配器将相同key(指的是keyedStream)流数据全部分配到一个全局窗口GlabalWindow中,GlabalWindow窗口必须指定触发器trigger,因为全局窗口分配器没有默认的触发器,所以必须指定触发器trigger。
看个例子,keyBy数据流有个方法。countWindow()大家知道countWindow不属于时间窗口,那么其只可能是GlobalWindow结合触发器trigger实现的,看源码:

    public WindowedStream<T, KEY, GlobalWindow> countWindow(long size) {
        return window(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(size)));
    }
    //发现其底层就是调用了window方法,
    GlobalWindows.create()创建了全局窗口分配器
    然后指定trigger
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

这个代码看不懂没关系,看了后面的分配器就懂了,此时只需要知道GlobalWindow没有触发器即可。要用GlobalWindow的时候开发者必须指定一个触发器,否则的话GlobalWindow永远不会被触发。

2.2 TimeWindow

时间窗口,可以说这是做常用的了吧,不管是滑动窗口,滚动窗口又或是sessoin窗口内部都是TimeWindow对象。
TimeWindow 对象有三个属性

  1. start
    窗口的开始时间
  2. end
    第一个刚好不属于该窗口的时间,其实指的是第二个窗口的开始时间。
    源码英文说的最贴切:first timestamp that does not belong to this window any more.
  3. maxtimestamp
    maxtimestamp = end-1, 我们知道end表示的时间其实已经越过当前窗口,而且是第一个越过当前窗口的时间,因此end-1则一定属于当前窗口。 所以maxtimestamp才是TimeWindow实际意义上的结束时间,而水位线trigger定时器也是基于maxtimestamp的

2.3 flink窗口怎么实现的

在看下面的介绍之前,其实flink的窗口可能和大部分的人理解的不一样,他并不是先构建一个携带时间范围的窗口对象,然后流数据过来的时候根据流数据的时间去判断其属于哪个窗口。 你是不是这么理解的,此处请对号入座。 网上很多人也是这么说的,为什么大家都这样说,因为这样说好理解,但是实际上flink内部不是这样实现的。 请往下看…

当流数据到来的时候flink通过窗口分配器WindowAssigner每个数据建立一个TimeWindow对象,同时每条数据再注册一个触发器trigger
flink会根据数据中的event time 和用户子自定义的窗口宽度size 生成TimeWindow对象的窗口范围[startTime, endTime] .
假如现在时间是15:01, 窗口宽度size=5, 然后连续过来三个流数据:
e1(event time=15:02),e2(event time=15:02),e3(event time=15:02)。然后这三个流数据会生成三个一一对应的TimeWindow对象。
注意了,flink根据e1,e2,e3的event time和 窗口宽度size=5 计算出这三个TimeWindow对象的窗口范围都为[15:00, 15:05], 这种计算方法是flink计算的,这个计算才是核心,有兴趣的建议看看源码。
这三个窗口对象的窗口时间范围都是[15:00, 15:05], 在此基础上flink会为这三个流数据都注册一个trigger定时触发器。 因为这三个元素的窗口时间范围一样, 因此这三个trigger触发器的时间也是一样的,然后等时间一到, 这三个对象就会同时参与计算, 这样的话这三个对象逻辑上就属于同一个窗口。下图是滑动窗口在event time下的trigger定时触发器源码:
这块比较拗口,我们再来理一理:

1.流中的每个元素到来, 都会分配一个TimeWindow对象和一个触发器Trigger
2.TimeWindow对象根据当前元素的event time以及窗口的宽度 来确定 窗口范围[starttime,endtime]
3.随着流元素的逐个到来, 紧挨着到来的流元素生成的TimeWindow对象的[starttime,endtime]可能一样
4.[starttime,endtime]一样的TimeWindow对象逻辑上属于一个窗口,这个逻辑上的窗口才是
大众理解的窗口
5.属于同一个逻辑窗口的流数据,对应的trigger也是一样的。为什么呢?因为这些元素的TimeWindow对象的窗口范围[starttime,endtime]是一样的, 而trigger对象需要的时间就是从maxtimestamp 其实(maxtimestamp=endtime-1)

在这里插入图片描述

2.4.flink 的TimeWindow窗口怎么触发的

触发逻辑

如果你2.3小节没看懂建议不要往下看了。到这里我假设2.3小节你已经明白。 每个流数据都有一个
TimeWindow对象和一个Trigger触发器,
当不同流元素TimeWindow的窗口时间范围[start,end]一样的时候,其触发器Trigger也是一样的,
既然Trigger一样那么时间一到就会同时开始处理。
此时他们属于同一个逻辑窗口, 不严谨的说法就是他们是属于一个窗口的。

flink时间窗口根据时间的特性分为:event time和process time,不管哪种窗口上面的触发逻辑都是一摸一样。不同的是 event time类型的Trigger触发器多了一个判断水位线的逻辑,process time的触发其实就是我上面说的触发逻辑,接下来聊聊event time的触发.
在这里插入图片描述

event time 模式下:Trigger对象解析数据的时候,除了注册了触发器之外,还加了了watermar>=TimeWindow.maxtimestamp 的判断。 步骤1:就是Trigger对象根据TimeWindow的 maxtimestamp注册的定时器,即便2不发生,窗口最终也会被触发。此步骤就是本小节开头说的触发逻辑。
步骤2:仔细看红色圈圈的代码,我们可以看到和水位线条进行比较的是窗口的maxtimestamp时间,而触发器的用的时间也是maxtimestamp, 如果watermar>=TimeWindow.maxtimestamp 成立的话,也会触发窗口计算。
有些读者迷茫了,会什么会计算两次。 那是因为2 是为了处理迟到数据的时候触发的计算, 而1则是正常的窗口计算.
默认情况下:迟到数据会被丢弃,当配置迟到时间:

window(TumblingEventTimeWindows.of(Time.seconds(timeWindowSize)))
.allowedLateness(Time.seconds(timeWindowAllowLater))//允许迟到时间
  • 1
  • 2

的时候,1 才会生效。 此时的逻辑是:触发器 触发一次窗口计算, 水位线时间越过 window.maxTimestamp+allowedLateness 的时候再重新计算一次。 这意味着计算了两次, 因此下游需要 将window starttime 和window endtime设置在主键中 以修正之前的窗口结果。
此时窗口的状态数据才会被真的清空,如果往后再来迟到数据,则 默认会丢弃。 为了处理再迟到的数据,此时可以采用侧流收集然后留着后续处理。

// UserSlotGame 是我自定义的类,用于接收kafka数据。
OutputTag<UserSlotGame> outputTag = new OutputTag<>("late", TypeInformation.of(UserSlotGame.class));
.window(TumblingEventTimeWindows.of(Time.seconds(timeWindowSize)))
                .allowedLateness(Time.seconds(timeWindowAllowLater))//允许迟到时间
                .sideOutputLateData(outputTag) //侧流收集
                .process(省略)
                
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

3.窗口程序的代码结构

  1. Keyed wondow(键控流窗口)
stream
   .keyBy(...)              <-  keyed versus non-keyed windows
   .window(WindowAssigner)  <- 必须:窗口分配器,决定了流数据被分配到哪个窗口中
   .trigger(Trigger)        <-  可选: "触发器" (若不设置会选择默认值)
  .evictor(Evictor)         <-  可选: "清除器" (else no evictor)
  .allowedLateness(Time)    <-  可选: 允许元素迟到多久,默认为0意思是不允许迟到
 .sideOutputLateData(OutputTag)<-  可选: 迟到的数据输出到OutputTag管道中
 .reduce/aggregate/apply()  <-  必须: 落到窗口中的数据怎么处理
 .getSideOutput(OutputTag)  <-  可选: 获取迟到的数据
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  1. No-Keyed window(非键控流窗口)
stream
       .windowAll(WindowAssigner)     ->必须
      .trigger(Trigger)       	      ->可选
      .evictor(Evictor)    			  ->可选
      .allowedLateness(Time)          ->可选
      .sideOutputLateData(OutputTag)  ->可选
       .reduce/aggregate/apply()     ->必须 
      .getSideOutput(OutputTag)       ->可选
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

说明:可以看到键控流和非键控流的最大的区别就是是否对流数据 调用了keyBy算子。 经过keyBy处理的数据流,会根据key分成多条task并行任务, 这些task任务可以并行执行。 而不经过keyBy算子的话,那所有的数据都将被一个窗口处理,也就是只能调用windowAll, 这样的话所有的数据都会被一个task处理, 也就是说所有的数据都将发送到下游算子的单个task中–>并行度为1.

4.窗口分配器 WindowAssigner接口及其实现类

WindowAssigner是最顶层接口,其余的都是它的实现类,其实现类都称为窗口分配器,可以拿来直接用。 其实现类我不会都说一遍,下文会说一些常用的,至于其他的感兴趣可以自己去看。

4.1窗口分配器的作用

  1. 窗口分配器顾名思义就是当一个流元素到来的时候为其分配所属于的窗口,也就是创建了一个Window对象(TimeWindow或者是GlobalWindow)
  2. 除了分配窗口,还未当前流元素分配了一个Trigger触发器.

4.2 WindowAssigner窗口分配器-顶级接口

public abstract class WindowAssigner<T, W extends Window> implements Serializable {
    private static final long serialVersionUID = 1L;

    /**
    *每个流数据 element都会经过这个方法,其作用就是根据timestamp
    生成TimeWindow或者GlobalWindow,实际开发中大部分都是TimeWindow
    TimeWindow包含starttime,endtime,maxtimestamp
    */
    public abstract Collection<W> assignWindows(
            T element, long timestamp, WindowAssignerContext context);

    /** 该方法为到来的流元素element定义了一个Trigger对象,该Trigger对象决定了窗口以何种方式触发
     */
    public abstract Trigger<T, W> getDefaultTrigger(StreamExecutionEnvironment env);
    ...
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

4.3滑动窗口分配器-实现类

对应的WindowAssigner实现类有两个:

  1. TumblingEventTimeWindows
  2. TumblingProcessingTimeWindows

常用场景:每隔五分钟统计一次
下图是一个每隔五秒划分一个窗口的图解。滑动窗口是不会重叠的。

仔细看上图,Tumbling在英文中是滑动的意思,因此上图中:
TumblingEventTimeWindows和TumblingProcessingTimeWindows都是滑动窗口。

  1. TumblingEventTimeWindows 基于event time (不需要水位线)
  2. TumblingProcessingTimeWindows 基于process time(不需要水位线)

TumblingEventTimeWindows.of(Time.minutes(5),Time.seconds(10))
第一个参数指的是窗口的大小,第二个参数指的是窗口的起始偏移量。 窗口是按照自然时间划分的,比如Time.minutes(5)
划分的窗口是:[00:00:00,00:05:00],[00:05:00,00:10:00]。TumblingEventTimeWindows.of(Time.minutes(5),Time.seconds(10))意思是在自然划分的窗口的基础上起始偏移量为十秒,此时划分的窗口就是:
[00:00:10, 00:05:10], [00:05:10,00:10:10]
TumblingProcessingTimeWindows 参数也是一样的就不多说了

4.31 模板代码
 stream1
 .keyBy(0)
 .window(TumblingEventTimeWindows.of(Time.minutes(5),Time.seconds(10)))
 .<windowFunction>
windowFunction指的是具体的窗口执行函数比如:reduce/aggregate/apply() 
  • 1
  • 2
  • 3
  • 4
  • 5

4.4滚动窗口分配器-实现类

对应的WindowAssigner实现类有两个:

  1. SlidingEventTimeWindows
  2. SlidingProcessingTimeWindows
    常用场景:每隔一分钟统计过去五分钟的数据量
    滚动窗口有两个必要属性:1.步长slide 2.窗口宽度size
    意思是每隔slide 滑动一次, 一般slide比size小, 你仔细想想,当slide=size的时候滚动窗口就变成了滑动窗口。 滚动窗口会有重叠,意思是一个流元素可能会属于多个滑动窗口中, slide越小窗口分配的就越多。
  1. SlidingEventTimeWindows 基于event time(需要水位线)
  2. SlidingProcessingTimeWindows 基于process time(不需要水位线)

SlidingEventTimeWindows.of(Time.minutes(10),Time.seconds(5),Time.se conds(1))第一个参数是窗口大小size,第二个参数是步长,第三个参数是偏移量。窗口划分也是基于自然时间划分的。

4.41. 代码模板
 stream1
 .keyBy(0)
 .window(SlidingEventTimeWindows.of(Time.minutes(10),Time.seconds(5),Time.seconds(1))).<windowFunction>
windowFunction指的是具体的窗口执行函数比如:reduce/aggregate/apply() 
  • 1
  • 2
  • 3
  • 4

4.5 sessoin窗口分配器(会话窗口)

对应的WindowAssigner实现类有两个:

  1. EventTimeSessionWindows //静态事件时间
  2. DynamicEventTimeSessionWindows //动态事件时间
  3. ProcessingTimeSessionWindows //静态处理时间
  4. DynamicProcessingTimeSessionWindows //动态处理时间

常用于:十分钟内没有新的数据到来则开启新的窗口。
会话窗口其实逻辑也很简单, 它有有一个时间间隔在这里为了下文方便我们把这个时间间隔记作sessionTimeout。在sessionTimeout内如果没有新的数据流到窗口则该窗口关闭, 然后开启新的窗口。 那如果一直有数据过来则该窗口永远不会关闭。 这也意味着会话窗口的endTime是不确定的,话句话说窗口的大小是不确定的。
sessoin窗口按照类型分为静态和动态窗口

  1. 静态sessoin窗口
    sessionTimeout是我们传递的一个不会改变的值
    EventTimeSessionWindows
    ProcessingTimeSessionWindows
  2. 动态sessoin窗口
    sessionTimeout是动态变化的,sessionTimeout的赋值是每次获取到流数据的时候动态返 回一个long类型的值,该返回逻辑是我们自定义的。
    DynamicEventTimeSessionWindows
    DynamicProcessingTimeSessionWindows

上面关于动态和静态的sessoin很难理解,因为sessoin窗口和普通的窗口不一样,sessoin窗口涉及到窗口的合并,如果不了解窗口分配很难理解,下面我将尽量讲清楚窗口的分配逻辑。

4.51. sessoin窗口的分配

窗口对象:TimeWindow(timestamp, timestamp + sessionTimeout)
timestamp是流数据的时间,可能是event time 也可能是process time,和你用的窗口分配器类型有关(如果你用EventTimeSessionWindows则timestamp表示event time )

sessoin窗口没办法很简单的举例子来说明窗口的分配,我们必须结合源码来看,我将尽量讲解清除。

  1. 静态
    在这里插入图片描述
    注意图中箭头部分,sessionTimeout是我们传递过来的。
  2. 动态
    在这里插入图片描述
    注意图中箭头部分,sessionTimeout不是我们传递的,而是通过extract(element)从每个元素中提取的,我门点开extract源码如下:
    在这里插入图片描述
    这只是一个最简单的接口,接收一个流数据返回一个long类型的毫秒,这个返回值作为新的sessionTimeout

上面你可能知道了sessoin窗口根据sessionTimeout是自动生成还是动态提取被分为:静态sessoin窗口和动态sessoin窗口。
sessoin窗口和滑动/滚动窗口最大的不同就在于sessoin窗口有一个合并操作。不管是动态还是静态,通过源码可以看到sessoin每到来一个元素都会创建一个时间窗口对象:TimeWindow(timestamp, timestamp + sessionTimeout)

基于时间timestamp 和sessoinTimeout确定一个窗口范围,我们说过sessoin窗口范围是在不断变化的,如果在[timestamp, timestamp + sessionTimeout]范围内到达了新的流数据,该流数据对应的窗口为[timestamp2, timestamp2 + sessionTimeout], 则合并后二者实际上属于一个窗口[timestamp,timestamp2 + sessionTimeout],看下图。
在这里插入图片描述

4.52. 静态sessoin窗口模板代码
 stream1
 .keyBy(0)
 .window(EventTimeSessionWindows.withGap(Time.minutes(5)))
 .<windowFunction>
windowFunction指的是具体的窗口执行函数比如:reduce/aggregate/apply() 
  • 1
  • 2
  • 3
  • 4
  • 5
4.53 动态sessoin窗口模板代码
 stream1.keyBy(0).window( DynamicEventTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor<Tuple2<String, Long>>(){
            @Override
            public long extract(Tuple2<String, Long> element) {
                return element.f1;
            }
        }));
        SessionWindowTimeGapExtractor用于返回当前流数据 
        构建窗口的sessoinTimeout参数。  
        每个流数据都会调用一次,因为
        每个流数据都会构建一个TimeWindow(timestamp, timestamp + sessionTimeout)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

关于sessoin只能尽量说成这样了。 另外值得注意的就是,上面我讲解的WindowAssigner都是flink已经实现好的,其内部包含了Trigger所以不需要再次指定,内置的WindowAssigner都有一个默认的Trigger. 你可以点开源码看下。

5.实战部分(基于reduce处理函数)

滚动窗口和滑动窗口用法基本一样,我们挑滑动窗口来做两个例子,例子分为event time 和process time两种模式。 记住process time不需要水位线。

5.1 order订单对象代码


public class Order {
    private String name;//订单名称
    private int price;//订单价格
    private long timestamp; //订单产生时间,用于event time 

    public Order() {
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getPrice() {
        return price;
    }

    public void setPrice(int price) {
        this.price = price;
    }

    public long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(long timestamp) {
        this.timestamp = timestamp;
    }

    @Override
    public String toString() {
        return "Order{" +
                "name='" + name + '\'' +
                ", price=" + price +
                ", timestamp=" + timestamp +
                '}';
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43

5.2. 简单的数据源,模拟Order订单对象的生产,一秒产生两个订单


import org.apache.flink.streaming.api.functions.source.SourceFunction;



//模拟订单产生的时间
public class SourceOrder implements SourceFunction<Order> {
    private volatile boolean flag=true;

    @Override
    public void run(SourceContext<Order> ctx) throws Exception {
        long timestamp = System.currentTimeMillis();
        while (flag) {

            Order order_0 = new Order();
            Order order_1 = new Order();
            order_0.setName("order_0");
            order_0.setPrice(1);
            order_0.setTimestamp(timestamp);

            order_1.setName("order_1");
            order_1.setPrice(2);
            order_1.setTimestamp(timestamp);
            ctx.collect(order_0);
            ctx.collect(order_1);
            Thread.sleep(1000);
            timestamp += 1000;//订单产生间隔一秒
     
        }

    }

    @Override
    public void cancel() {
        flag = false;
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38

5.3. 基于event time 的滑动窗口


import org.apache.flink.api.common.eventtime.TimestampAssigner;
import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.*;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;
import java.text.SimpleDateFormat;

public class WindowsEventTimeMain {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //定义两条流
        DataStreamSource<Order>  ds = env.addSource(new SourceOrder());
//        ds.print("源数据:");
//        ds.print();
        env.getConfig().setAutoWatermarkInterval(100);//水位线生成周期
        KeyedStream<Order, String> keyedStream  = ds.assignTimestampsAndWatermarks(WatermarkStrategy.<Order>forMonotonousTimestamps().withTimestampAssigner(new TimestampAssignerSupplier<Order>(){
            @Override
            public TimestampAssigner<Order> createTimestampAssigner(Context context) {
                return (e,timestamp) -> e.getTimestamp();
            }
        })).keyBy((KeySelector<Order, String>) value -> value.getName());

        OutputTag<Order> outputTag = new OutputTag<Order>("sideout"){};
        SingleOutputStreamOperator<Order> single=  keyedStream
                .window(TumblingEventTimeWindows.of(Time.seconds(2)))
                .sideOutputLateData(outputTag)
                .reduce(new ReduceFunction<Order>() {
                    @Override
                    public Order reduce(Order value1, Order value2) throws Exception {
                        SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                        String str_time01 = sdf.format(value1.getTimestamp());
                        String str_time02 = sdf.format(value2.getTimestamp());
                        Order order = new Order();
                        order.setName(value1.getName()+">>"+str_time01+":"+str_time02);
                        order.setPrice(value1.getPrice()+value2.getPrice());
                        order.setTimestamp(9999999);
                        return order;
                    }
                });
        single.getSideOutput(outputTag).print("迟到数据:");
        single.print("真实窗口输出:");
        env.execute("event time demo.");
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53

在这里插入图片描述

5.4.基于 process time 的滑动窗口

基于process time就很简单了,不需要水位线条,代码很简单那。


import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.text.SimpleDateFormat;

public class WindowsProcessTimeMain {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //定义两条流
        DataStreamSource<Order>  ds = env.addSource(new SourceOrder());
//        ds.print();
        SingleOutputStreamOperator<Order> outStream = ds
                .keyBy("name")
                .window(TumblingProcessingTimeWindows.of(Time.seconds(2)))
                .reduce(new ReduceFunction<Order>() {
                    @Override
                    public Order reduce(Order value1, Order value2) throws Exception {
                        SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                        String str_time01 = sdf.format(value1.getTimestamp());
                        String str_time02 = sdf.format(value2.getTimestamp());
                        Order order = new Order();
                        order.setName(value1.getName()+">>"+str_time01+":"+str_time02);
                        order.setPrice(value1.getPrice()+value2.getPrice());
                        return order;
                    }
                });
        outStream.print();

        env.execute("prcocess time demo");
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37

结果:
在这里插入图片描述
对了,你可以把时间TumblingProcessingTimeWindows.of(Time.seconds(2))改成:
TumblingProcessingTimeWindows.of(Time.seconds(1)) 结果如下:
在这里插入图片描述
你会发现,这不是reduce的结果,reduce根本没有被调用。 因为设置一秒的话,由于数据源是一秒生成一条数据, 那么也就意味着每个窗口大概率只有一条数据, 那么reduce在只有一条数据的时候不会生效。 咳咳,感觉有点坑。 希望对此有研究的小伙伴给我留言。

6.window 窗口处理函数介绍

6.1 再谈reduce函数

reduce:对窗口中的每条数据做逐个处理,最后整个窗口输出一条数据。

@Public
@FunctionalInterface
public interface ReduceFunction<T> extends Function, Serializable {

    /**
     * @param value1 The first value to combine.
     * @param value2 The second value to combine.
     * @return The combined value of both input values.
     * @throws Exception This method may throw exceptions. Throwing an exception will cause the
     *     operation to fail and may trigger recovery.
     */
    T reduce(T value1, T value2) throws Exception;
}
通过源码可以看到:reduce最后的返回值类型和流数据的类型必须保持一致,
而这也是开发中需要主义的地方。
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

上一节的实战部分展示了reduce函数,我们想一下,reduce函数发出的结果有一个小的问题,就是这个结果是属于哪个窗口的我们并不知道。 如果我可以获取到结果所属的窗口的时间那就很有用了,这样的话结果更加清晰。 你想每条reduce的结果我都知道其计算的是哪个窗口,把窗口时间附加到结果中很明确,方便后续我们做一些基于时间的过滤。 但是有些人可能会说了,我不需要知道窗口的时间啊, 我只需要用System.currentTimeMillis()获取当前系统时间,然后把这个时间赋予到结果中不也行嘛? 问题已经出现,请大家好好思考一下,对event time来说其窗口时间 范围i是根据event time来计算的,如果我今天去kafka拿到昨天产生的基于event time的数据,明显窗口时间是昨天的,而你用System.currentTimeMillis()获取的时间就是今天的啊,这样肯定有问题。 有人就说了既然如此我随便取一个 流元素的event time不也行嘛? 答案是可以的,但是不严谨。 flink的reduce 有一个重载函数方法reduce(ReduceFunction, WindowFunction)
windowFunction用于处理reduce发出的结果。
下面代码实现了,对reduce结果添加窗口时间范围的功能。


import org.apache.flink.api.common.eventtime.TimestampAssigner;
import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.text.SimpleDateFormat;
import java.util.function.Consumer;

public class WindowsEventTimeReducetionMain {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //定义两条流
        DataStreamSource<Order>  ds = env.addSource(new SourceOrder());
//        ds.print("源数据:");
//        ds.print();
        env.getConfig().setAutoWatermarkInterval(100);//水位线生成周期
        KeyedStream<Order, String> keyedStream  = ds.assignTimestampsAndWatermarks(WatermarkStrategy.<Order>forMonotonousTimestamps().withTimestampAssigner(new TimestampAssignerSupplier<Order>(){
            @Override
            public TimestampAssigner<Order> createTimestampAssigner(Context context) {
                return (e,timestamp) -> e.getTimestamp();
            }
        })).keyBy((KeySelector<Order, String>) value -> value.getName());

        OutputTag<Order> outputTag = new OutputTag<Order>("sideout"){};
        SingleOutputStreamOperator<Order> single=  keyedStream
                .window(TumblingEventTimeWindows.of(Time.seconds(2)))
                .sideOutputLateData(outputTag)
                .reduce(new ReduceFunction<Order>() {
                    @Override
                    public Order reduce(Order value1, Order value2) throws Exception {
                        Order order = new Order();
                        order.setName(value1.getName());
                        order.setPrice(value1.getPrice() + value2.getPrice());//订单价格求和
                        return order;
                        
                    }
                }, new WindowFunction<Order, Order, String, TimeWindow>() {
                    //由于窗口的数据已经被reduceFunction处理并且做了累加,数据到WindowFunction
                    //的时候就是那个reduceFunction处理后的数据,处理后的数据都在Iterable
                    //中,直接遍历出来加上 窗口时间即可
                    @Override
                    public void apply(String s, TimeWindow window, Iterable<Order> input, Collector<Order> out) throws Exception {
                        input.forEach(new Consumer<Order>() {
                            @Override
                            public void accept(Order order) {
                                SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                                String window_start_time =sdf.format( window.getStart());//当前窗口的起始时间
                                String window_end_time =sdf.format( window.getEnd());//endtime是下一个窗口的初始时间
                                order.setTimestamp(window.maxTimestamp());//maxTimestamp=endtime-1
                                order.setName(order.getName()+">>>>window_time_range[ "+window_start_time+" , "+window_end_time+" ]");
                                out.collect(order);
                            }
                        });
                    }
                });
        single.getSideOutput(outputTag).print("迟到数据:");
        single.print("真实输出:");
        env.execute("sdaaf");
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73

结果:
真实输出::8> Order{name=‘order_0>>>>window_time_range[ 2022-08-24 15:35:32 , 2022-08-24 15:35:34 ]’, price=2, timestamp=1661326533999}
真实输出::7> Order{name=‘order_1>>>>window_time_range[ 2022-08-24 15:35:32 , 2022-08-24 15:35:34 ]’, price=4, timestamp=1661326533999}
真实输出::7> Order{name=‘order_1>>>>window_time_range[ 2022-08-24 15:35:34 , 2022-08-24 15:35:36 ]’, price=4, timestamp=1661326535999}
真实输出::8> Order{name=‘order_0>>>>window_time_range[ 2022-08-24 15:35:34 , 2022-08-24 15:35:36 ]’, price=2, timestamp=1661326535999}
真实输出::7> Order{name=‘order_1>>>>window_time_range[ 2022-08-24 15:35:36 , 2022-08-24 15:35:38 ]’, price=4, timestamp=1661326537999}
真实输出::8> Order{name=‘order_0>>>>window_time_range[ 2022-08-24 15:35:36 , 2022-08-24 15:35:38 ]’, price=2, timestamp=1661326537999}
真实输出::8> Order{name=‘order_0>>>>window_time_range[ 2022-08-24 15:35:38 , 2022-08-24 15:35:40 ]’, price=2, timestamp=1661326539999}
真实输出::7> Order{name=‘order_1>>>>window_time_range[ 2022-08-24 15:35:38 , 2022-08-24 15:35:40 ]’, price=4, timestamp=1661326539999}

6.2 aggregate函数

reduce是一个的局限性有点大,输入类型和输出类型必须保持一致,对于window窗口内的聚合行为有一个比较高级的聚合方法aggregate,aggregate方法接收一个聚合器接口参数,以及洽谈的一些参数,所有的重载方法都在下图。
在这里插入图片描述
除了红色箭头是AggregationFunction ,其他的都是 AggregateFunction<T, ACC, V>

6.21 aggregate 中内置的sum, max,min,minBy,maxBy是怎么实现的

先看红色箭头AggregationFunction的接口源码:

    private SingleOutputStreamOperator<T> aggregate(AggregationFunction<T> aggregator) {
        return reduce(aggregator);
    }
public abstract class AggregationFunction<T> implements ReduceFunction<T> {
    private static final long serialVersionUID = 1L;

    /** Aggregation types that can be used on a windowed stream or keyed stream. */
    public enum AggregationType {
        SUM,
        MIN,
        MAX,
        MINBY,
        MAXBY,
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

请仔细看上面的代码,你会发现
aggregate(AggregationFunction aggregator) { return reduce(aggregator);
这个聚合器竟然调用了reduce,当你点开AggregationFunction的时候发现了这家伙继承了Reducetion,且里面有一个枚举对象,枚举对象中的值为: SUM, MIN,MAX, MINBY, MAXBY。
。AggregationFunction有两个实现类:
在这里插入图片描述
具体我就不展开了,这个聚合器你基本不会重新的,flink基本上已经实现好了:
可以直接用,用法就是: window后面直接调用".sum" “.max” “.min” “.minBy” “maxBy”

6.22 aggregate(AggregateFunction<IN, ACC, OUT>)

这也是这一小节的重点,它和reduce最大的不同就是输入类型和输出类型可以不一致

public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable {

    /**
     * 创建一个累加器,可以是任意flink支持的可序列化的类型
     * 用于缓存数据
     * 最终的结果是从这个缓存中拿出来的。
     */
    ACC createAccumulator();

    /**
     * 每来一条数据都要和缓存中的数据做同样的逻辑计算
     * 然后返回一个新的缓存对象 (缓存指的是累加器)
     */
    ACC add(IN value, ACC accumulator);

    /**
     * 当窗口出发的时候会调用这个方法,从累加器中获取结果
     * 即发出窗口的计算结果。
     */
    OUT getResult(ACC accumulator);

    /**
     * 我之前说过,flink的窗口只有sessoin窗口的大小是不固定的
     * 这个不固定是通过窗口合并实现的, 所以
     * 当你的窗口是sessoin窗口的时候,可能会有窗口合并
     * 而累加器是属于窗口的属性,当窗口合并的时候必须在这里
     * 手动合并两个窗口的累加器缓存。
     * 换言之如果我们不用sessoin窗口,此方法不需要实现就行。
     * 无论是滑动窗口还是滚动窗口他们的窗口大小时不变的,也不需要合并。
     */
    ACC merge(ACC a, ACC b);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

下面我们还是原来的reduce代码稍加改动实现和reduce一样的功能,我自定义了HashMap累加器存储订单号和 价格,方会该订单的所有价格之和,最终结果定义成String类型。


package com.pg.flink.dataStreame.window;

import org.apache.flink.api.common.eventtime.TimestampAssigner;
import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;
import java.util.HashMap;
public class WindowsEventTimeAggregateMain {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //定义两条流
        DataStreamSource<Order>  ds = env.addSource(new SourceOrder());
//        ds.print("源数据:");
//        ds.print();
        env.getConfig().setAutoWatermarkInterval(100);//水位线生成周期
        KeyedStream<Order, String> keyedStream  = ds.assignTimestampsAndWatermarks(WatermarkStrategy.<Order>forMonotonousTimestamps().withTimestampAssigner(new TimestampAssignerSupplier<Order>(){
            @Override
            public TimestampAssigner<Order> createTimestampAssigner(Context context) {
                return (e,timestamp) -> e.getTimestamp();
            }
        })).keyBy((KeySelector<Order, String>) value -> value.getName());

        OutputTag<Order> outputTag = new OutputTag<Order>("sideout"){};
        SingleOutputStreamOperator<String>  single=  keyedStream
                .window(TumblingEventTimeWindows.of(Time.seconds(2)))
                .sideOutputLateData(outputTag)
               .aggregate(new AggregateFunction<Order, HashMap<String,Integer>, String>() {
                              @Override
                              //初始化累加器(其实就是一个缓存)
                              public HashMap<String, Integer> createAccumulator() {
                                  return new HashMap<>();//key 存储order的name,value 存储其价格
                              }

                              @Override
                              public HashMap<String, Integer> add(Order value, HashMap<String, Integer> accumulator) {

                                  String name = value.getName();
                                  int price = value.getPrice();
                                  HashMap<String, Integer> new_accumulator = new HashMap<>();
                                  //当name不存在的时候表示是第一个数据此时累加器没有数据,我们给个默认值0即可
                                  new_accumulator.put(name, price + accumulator.getOrDefault(name,0));
                                  return new_accumulator;
                              }

                              @Override
                              public String getResult(HashMap<String, Integer> accumulator) {
                                  return accumulator.toString();
                              }

                              @Override
                              public HashMap<String, Integer> merge(HashMap<String, Integer> a, HashMap<String, Integer> b) {
                                  //因为不是sessoin窗口,返回一个空的map即可
                                  return new HashMap<>();
                              }
                          });

        single.getSideOutput(outputTag).print("迟到数据:");
        single.print("真实输出:");
        env.execute("sdaaf");
    }
}


  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72

结果如下:
真实输出::7> {order_1=4}
真实输出::8> {order_0=2}
真实输出::7> {order_1=4}
真实输出::8> {order_0=2}
真实输出::7> {order_1=4}
真实输出::8> {order_0=2}
多说一句:我自定义的sourec每隔一秒中产生一个order_0,和一个order_1
窗口长度两秒, 所以每一次的结果都是两条数据相加,所以结果是准确的哦。

6.23 aggregate(AggregateFunction<IN, ACC, OUT>,WindowFunction<IN, OUT, KEY, W extends Window>)

aggregate(AggregateFunction<IN, ACC, OUT>,WindowFunction<IN, OUT, KEY, W extends Window>)
window中的数据结果就是AggregateFunction处理后的结果, 然后此结果会再次流入WindowFunction再次处理,最后返回最终的处理结果,WindowFunction最主要的功能就是提供的访问窗口的时间的方法,能让我们知道 当前结果的计算时间信息。

import com.pg.flink.dataStreame.window.Order;
import com.pg.flink.dataStreame.window.SourceOrder;
import org.apache.flink.api.common.eventtime.TimestampAssigner;
import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.text.SimpleDateFormat;
import java.util.HashMap;
import java.util.function.Consumer;

public class WindowsEventTimeAggregateWindowFunctionMain {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //定义两条流
        DataStreamSource<Order>  ds = env.addSource(new SourceOrder());
//        ds.print("源数据:");
//        ds.print();
        env.getConfig().setAutoWatermarkInterval(100);//水位线生成周期
        KeyedStream<Order, String> keyedStream  = ds.assignTimestampsAndWatermarks(WatermarkStrategy.<Order>forMonotonousTimestamps().withTimestampAssigner(new TimestampAssignerSupplier<Order>(){
            @Override
            public TimestampAssigner<Order> createTimestampAssigner(Context context) {
                return (e,timestamp) -> e.getTimestamp();
            }
        })).keyBy((KeySelector<Order, String>) value -> value.getName());

        OutputTag<Order> outputTag = new OutputTag<Order>("sideout"){};
        SingleOutputStreamOperator<String>  single=  keyedStream
                .window(TumblingEventTimeWindows.of(Time.seconds(2)))
                .sideOutputLateData(outputTag)
               .aggregate(new AggregateFunction<Order, HashMap<String, Integer>, String>() {
                   @Override
                   //初始化累加器(其实就是一个缓存)
                   public HashMap<String, Integer> createAccumulator() {
                       return new HashMap<>();//key 存储order的name,value 存储其价格
                   }

                   @Override
                   public HashMap<String, Integer> add(Order value, HashMap<String, Integer> accumulator) {

                       String name = value.getName();
                       int price = value.getPrice();
                       HashMap<String, Integer> new_accumulator = new HashMap<>();
                       //当name不存在的时候表示是第一个数据此时累加器没有数据,我们给个默认值0即可
                       new_accumulator.put(name, price + accumulator.getOrDefault(name, 0));
                       return new_accumulator;
                   }

                   @Override
                   public String getResult(HashMap<String, Integer> accumulator) {
                       return accumulator.toString();
                   }

                   @Override
                   public HashMap<String, Integer> merge(HashMap<String, Integer> a, HashMap<String, Integer> b) {
                       //因为不是sessoin窗口,返回一个空的map即可
                       return new HashMap<>();
                   }
               }, new WindowFunction<String, String, String, TimeWindow>() {
                   @Override
//第一个参数是key的类型
// 第二个参数是当前数据是被TimeWindow对象包装的
//第三个获取的窗口中的所有数据,因为窗口数据已经被AggregateTion处理过来,所以迭代器此时只有一条数据,此数据就是AggregateTion中getResult返回的结果
//第四个参数用来发出最终结果
                   public void apply(String s, TimeWindow window, Iterable<String> input, Collector<String> out) throws Exception {

                       //
                       input.forEach(new Consumer<String>() {
                           @Override
                           public void accept(String order) {
                               SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                               String window_start_time =sdf.format( window.getStart());//当前窗口的起始时间
                               String window_end_time =sdf.format( window.getEnd());//endtime是下一个窗口的初始时间
                               out.collect(order+">>window_time_range["+window_start_time+" , "+window_end_time+" ]");
                           }
                       });
                   }

               });

        single.getSideOutput(outputTag).print("迟到数据:");
        single.print("真实输出:");
        env.execute("sdaaf");
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97

结果:
真实输出::8> {order_0=2}>>window_time_range[2022-08-25 19:10:36 , 2022-08-25 19:10:38 ]
真实输出::7> {order_1=4}>>window_time_range[2022-08-25 19:10:36 , 2022-08-25 19:10:38 ]
真实输出::8> {order_0=2}>>window_time_range[2022-08-25 19:10:38 , 2022-08-25 19:10:40 ]
真实输出::7> {order_1=4}>>window_time_range[2022-08-25 19:10:38 , 2022-08-25 19:10:40 ]
真实输出::7> {order_1=4}>>window_time_range[2022-08-25 19:10:40 , 2022-08-25 19:10:42 ]

6.24 aggregate(AggregateFunction<IN, ACC, OUT>,ProcessWindowFunction<IN, OUT, KEY, W extends Window>

aggregate(AggregateFunction<IN, ACC, OUT>,ProcessWindowFunction<IN,OUT, KEY,W>
和6.23比较类似,也可以获取到窗口的时间,但是ProcessWindowFunction有自己的特点:

  1. 可以访问算子的processTime
  2. 可以访问算子的水位线时钟
  3. 可以使用定时器(尤其注意只有keyedStream可以使用此功能)
  4. 可以访问更新以及初始化状态,此状态是你自定义的状态,我们可以基于这些状态去发出数据,这样甚至可以直接更改算子的行为。
    更多有关processFunction的介绍参考:flink processFunction

代码实例暂时先不展示状态相关的操作,后续会讲解,就展示一下简单的获取窗口时间吧。


import com.pg.flink.dataStreame.window.Order;
import com.pg.flink.dataStreame.window.SourceOrder;
import org.apache.flink.api.common.eventtime.TimestampAssigner;
import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.text.SimpleDateFormat;
import java.util.HashMap;
import java.util.function.Consumer;

public class WindowsEventTimeAggregateProcessFunctionMain {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //定义两条流
        DataStreamSource<Order>  ds = env.addSource(new SourceOrder());
//        ds.print("源数据:");
//        ds.print();
        env.getConfig().setAutoWatermarkInterval(100);//水位线生成周期
        KeyedStream<Order, String> keyedStream  = ds.assignTimestampsAndWatermarks(WatermarkStrategy.<Order>forMonotonousTimestamps().withTimestampAssigner(new TimestampAssignerSupplier<Order>(){
            @Override
            public TimestampAssigner<Order> createTimestampAssigner(Context context) {
                return (e,timestamp) -> e.getTimestamp();
            }
        })).keyBy((KeySelector<Order, String>) value -> value.getName());

        OutputTag<Order> outputTag = new OutputTag<Order>("sideout"){};
        SingleOutputStreamOperator<String>  single=  keyedStream
                .window(TumblingEventTimeWindows.of(Time.seconds(2)))
                .sideOutputLateData(outputTag)
               .aggregate(new AggregateFunction<Order, HashMap<String, Integer>, String>() {
                   @Override
                   //初始化累加器(其实就是一个缓存)
                   public HashMap<String, Integer> createAccumulator() {
                       return new HashMap<>();//key 存储order的name,value 存储其价格
                   }

                   @Override
                   public HashMap<String, Integer> add(Order value, HashMap<String, Integer> accumulator) {

                       String name = value.getName();
                       int price = value.getPrice();
                       HashMap<String, Integer> new_accumulator = new HashMap<>();
                       //当name不存在的时候表示是第一个数据此时累加器没有数据,我们给个默认值0即可
                       new_accumulator.put(name, price + accumulator.getOrDefault(name, 0));
                       return new_accumulator;
                   }

                   @Override
                   public String getResult(HashMap<String, Integer> accumulator) {
                       return accumulator.toString();
                   }

                   @Override
                   public HashMap<String, Integer> merge(HashMap<String, Integer> a, HashMap<String, Integer> b) {
                       //因为不是sessoin窗口,返回一个空的map即可
                       return new HashMap<>();
                   }
               }, new ProcessWindowFunction<String, String, String, TimeWindow>() {
                   @Override
                   public void process(String s, Context context, Iterable<String> elements, Collector<String> out) throws Exception {
                     elements.forEach(new Consumer<String>() {
                         @Override
                         public void accept(String s) {
                             SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                             String window_start_time =sdf.format( context.window().getStart());//当前窗口的起始时间
                             String window_end_time =sdf.format(  context.window().getEnd());//endtime是下一个窗口的初始时间
                             out.collect(s+">>window_time_range["+window_start_time+" , "+window_end_time+" ]");
                         }
                     });
                   }
               });

        single.getSideOutput(outputTag).print("迟到数据:");
        single.print("真实输出:");
        env.execute("sdaaf");
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91

真实输出::7> {order_1=4}>>window_time_range[2022-08-26 00:35:08 , 2022-08-26 00:35:10 ]
真实输出::8> {order_0=2}>>window_time_range[2022-08-26 00:35:08 , 2022-08-26 00:35:10 ]
真实输出::7> {order_1=4}>>window_time_range[2022-08-26 00:35:10 , 2022-08-26 00:35:12 ]

6.3 process函数

process(ProcessWindowFunction<T, R, K, W> function) {…}
此函数接收一个ProcessWindowFunction函数,process函数是最强大也是最难用的函数,可以这么说无论是reduce还是aggregate都定义了窗口的个性化操作, reduce是前后数据的迭代处理,aggregate定义通过累加器自由可以实现很多的功能,但是不管是reduce韩式aggregate中处理数据的时候数据元素都是一个个过来的, 是逐个过来的。 最容易看到的就是aggregate中的累加器,每次数据到来都会更新累加器,这明显就是逐个的行为。 但是process就不一样的,process是直接拿到当前窗口所有数据,其内部有一个迭代器Iterable, 包含了当前窗口的所有数据。
除此之外,ProcessWindowFunction还有个内部类, 可以访问水位线,processTime, window对象,和状态。 可以说, reduce和aggregate能实现的功能, process都能实现。 往深了说reduce和aggregate不过是process的特殊形式, 它们不过是把常用的功能抽取出来了而已。 而比较复杂的功能reduce和aggregate是无法实现的, reduce限制最严格,要求输入类型和输出类型一致,且reduce无法访问窗口的结束和开始时间, aggregate好一点,其内部有一个累加器相对而言也更灵活。 重要的来了,无论是reduce还是aggregate其参数都是继承了Function,而process函数的参数继承的是RichFunction,这也就意味着 processFunction是可以自定义以及访问状态的, 这也是proces最大的特性。 关于状态我之后会单独出一篇文章,状态是flink的核心,也是很难理解的地方。
我们已经说了,process可以实现reduce和aggregate的所有功能,以及可以实现一些更高级的功能,为了方便理解,我们下面用process实现reduce功能,并为每条输出添加所属的窗口的时间。


import com.pg.flink.dataStreame.window.Order;
import com.pg.flink.dataStreame.window.SourceOrder;
import org.apache.flink.api.common.eventtime.TimestampAssigner;
import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.text.SimpleDateFormat;

public class WindowsEventTimeProcessFunctionMain {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //定义两条流
        DataStreamSource<Order> ds = env.addSource(new SourceOrder());
//        ds.print("源数据:");
//        ds.print();
        env.getConfig().setAutoWatermarkInterval(100);//水位线生成周期
        KeyedStream<Order, String> keyedStream = ds.assignTimestampsAndWatermarks(WatermarkStrategy.<Order>forMonotonousTimestamps().withTimestampAssigner(new TimestampAssignerSupplier<Order>() {
            @Override
            public TimestampAssigner<Order> createTimestampAssigner(Context context) {
                return (e, timestamp) -> e.getTimestamp();
            }
        })).keyBy((KeySelector<Order, String>) value -> value.getName());

        OutputTag<Order> outputTag = new OutputTag<Order>("sideout") {
        };
        SingleOutputStreamOperator<String> single = keyedStream
                .window(TumblingEventTimeWindows.of(Time.seconds(2)))
                .sideOutputLateData(outputTag)
                .process(new ProcessWindowFunction<Order, String, String, TimeWindow>() {
                    @Override
                    public void process(String s, Context context, Iterable<Order> elements, Collector<String> out) throws Exception {

                        int sum = 0;
                        for (Order element : elements) {
                            sum += element.getPrice();
                        }
                        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                        String window_start_time = sdf.format(context.window().getStart());//当前窗口的起始时间
                        String window_end_time = sdf.format(context.window().getEnd());//endtime是下一个窗口的初始时间
                        out.collect("key=" + s + ", sum=" + sum + ">>>" + "window_time_range[" + window_start_time + "," + window_end_time + "]");
                    }
                });
        single.getSideOutput(outputTag).print("迟到数据:");
        single.print("真实输出:");
        env.execute("sdaaf");
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60

结果:
真实输出::7> key=order_1, sum=4>>>window_time_range[2022-08-26 01:34:12,2022-08-26 01:34:14]
真实输出::8> key=order_0, sum=2>>>window_time_range[2022-08-26 01:34:12,2022-08-26 01:34:14]
真实输出::7> key=order_1, sum=4>>>window_time_range[2022-08-26 01:34:14,2022-08-26 01:34:16]
真实输出::8> key=order_0, sum=2>>>window_time_range[2022-08-26 01:34:14,2022-08-26 01:34:16]

总结:process是万能的,窗口的一切你想要的操作在这里都能实现。

7.工作中常用的触发器

我们再来看看窗口的代码结构:

stream
   .keyBy(...)              <-  keyed versus non-keyed windows
   .window(WindowAssigner)  <- 必须:窗口分配器,决定了流数据被分配到哪个窗口中
   .trigger(Trigger)        <-  可选: "触发器" (若不设置会选择默认值)
  .evictor(Evictor)         <-  可选: "清除器" (else no evictor)
  .allowedLateness(Time)    <-  可选: 允许元素迟到多久,默认为0意思是不允许迟到
 .sideOutputLateData(OutputTag)<-  可选: 迟到的数据输出到OutputTag管道中
 .reduce/aggregate/apply()  <-  必须: 落到窗口中的数据怎么处理
 .getSideOutput(OutputTag)  <-  可选: 获取迟到的数据
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

读到这里请回头看看我们的代码中用到的部分:

	...省略。。。
    .window(TumblingEventTimeWindows.of(Time.seconds(2)))
                .sideOutputLateData(outputTag)
                .process(new ProcessWindowFunction<Order, String, String, TimeWindow>() {...省略...};
  • 1
  • 2
  • 3
  • 4

请仔细阅读上面两处的代码,你会发现:

 .trigger(Trigger)        <-  可选: "触发器" (若不设置会选择默认值)
  .evictor(Evictor)         <-  可选: "清除器" (else no evictor)
  • 1
  • 2

这两个地方我们根本没用到,我们用的是TumblingEventTimeWindows,这是flink内置的窗口分配器,其内部源码自己点开你会发现里面有个trigger,也就是说flink内置的窗口分配器其内部有一个默认的触发器trigger. 还拿TumblingEventTimeWindows举例子,其内部的触发器是:EventTimeTrigger,这个触发器有时候并不能满足我们的需要。 具体请看下一节。

8.ContinuousEventTimeTrigger

短窗口的计算由于其窗口期较短,那么很快就能获取到结果,但是对于长窗口来说窗口时间比较长,如果等窗口期结束才能看到结果,那么这份数据就不具备实时性,大多数情况我们希望能够看到一个长窗口的结果不断变动的情况。比如TumblingEventTimeWindows.of(Time.days(1)), 意思是按天划分窗口,也就是说必须是以天的维度触发一次计算,这种统计历史数据没问题,但是不适合实时看板统计。 窗口虽然是按照天划分,但是我想每分钟都看一下当前天的实时结果,那么此时TumblingEventTimeWindows内置的默认的trigger就不适合了,此时我们需要一个新的trigger取代其默认的trigger,
对此Flink提供了ContinuousEventTimeTrigger连续事件时间触发器与ContinuousProcessingTimeTrigger连续处理时间触发器,指定一个固定时间间隔interval,不需要等到窗口结束才能获取结果,能够在固定的interval获取到窗口的中间结果。

场景:求每个区域的每小时的商品销售额, 要求每隔1min能能够看到销售额变动情况。核心代码实现如下:

	.keyBy(...)
      .timeWindow(Time.hours(1))
      .trigger(ContinuousEventTimeTrigger.of(Time.minutes(1)))
  • 1
  • 2
  • 3
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/正经夜光杯/article/detail/1019196
推荐阅读
相关标签
  

闽ICP备14008679号