当前位置:   article > 正文

Flink从入门到放弃之入门篇(四)-剖析窗口生命周期_flink state 生命周期

flink state 生命周期

一、应用场景

Apache Flink可以说是目前大数据实时流处理最流行的技术,功能非常强大,支持开发和运行多种不同类型的应用程序。主要特性包括:批流一体化、状态管理、事件时间支持以及精准一次的状态一致性保障等。目前Flink的应用场景整体概括下来包含以下几点:

  1. 事件驱动型应用

  2. 数据分析(OLAP)型应用

  3. 数据管道/ETL类型应用

接下来将针对这三类应用做一个简单的概述,希望读者能有一个大概的了解。

1.1 事件驱动型应用

概念:事件驱动是在计算存储分离的传统应用基础上进化而来的,它是一类具有状态的应用,从一个或多个事件中提取数据,并根据事件来触发计算、状态更新或者其他的动作。事件驱动型应用在设计上,将数据和计算进行分离,应用只需要访问本地(内存/磁盘)来获取数据,容错性的实现依赖于定期持久化存储写入checkpoint,关于传统型应用和事件驱动型应用的区别可见下图:

优势:事件驱动型应用通过本地访问数据来实现更高的吞吐和更低的延迟,并异步增量来完成远程持久化存储的checkpoint。而传统型的应用需要共享同一个数据库,因此任何对数据库自身的修改都需要谨慎协调。

实际案例:典型的事件驱动应用如:基于规则的监控告警;反欺诈;异常检测等业务监控场景

1.2 OLAP型应用

概念:其实这块的应用场景应该是比较常见的,对于传统的分析方式大多是以批查询处理的,如通过hive/spark等离线技术进行处理的,而Flink是可以支持批流一体的分析应用

优势:相对于批查询,flink流处理分析可以使得结果产出延迟更低。flink流分析简化了应用抽象,通常实现批查询需要由多个独立的组件组成,需要定时调度完成ETL,一旦某个环节出现问题将会影响后续的步骤。而flink流分析涵盖了从数据接入到数据结果产出的所有步骤,同时可以依赖底层引擎提供的故障恢复机制。

实际案例:实时数据即席分析,图分析等实时分析型场景

1.3 数据管道/ETL应用

概念:同上面的OLAP类型应用,一般用于构建实时数仓中的步骤,将一系列ETL步骤组成一个pipeline形式

优势:优势同OLAP应用

实际案例:实时数仓,实时查询索引构建

二、窗口概念

在引入窗口概念之前,我们需要知道Flink中的数据主要分为两类:有界数据流无界数据流

无界数据流:指的是一旦开始生成后就会持续不断的产生新的数据,即数据没有时间边界,这种类型的数据一般适用于做ETL

有界数据流:指的是输入的数据有始有终,一般这种类型的数据用于批处理,如统计过去一分钟的pv或者uv等类似聚合类操作。

而flink又是实时流技术,那么如何支持有界数据流的聚合操作呢?这个时候就有了窗口的概念。

窗口的作用就是为了周期性的获取数据,即把传入的无界数据流在逻辑上划分多个buckets,所以可以把窗口看作是从流到批的一个桥梁。

如上图所示,在一个无界的数据流上,我们通过指定窗口各种属性来实现有界流的处理。因为有了窗口,使得flink成为流批一体的潮流大数据技术。

三、窗口生命周期

通过以上的内容,我们应该知道了窗口的作用(主要是为了解决什么样的问题)。那么这个时候需要思考四个问题

  1. 数据元素是如何分配到对应窗口中的(也就是窗口的分配器)?

  2. 元素分配到对应窗口之后什么时候会触发计算(也就是窗口的触发器)?

  3. 在窗口内我们能够进行什么样的操作(也就是窗口内的操作)?

  4. 当窗口过期后是如何处理的(也就是窗口的销毁关闭)?

其实这四个问题从大体上可以理解为窗口的整个生命周期过程。接下来我们对每个环节进行讲解

四、窗口分配器WindowAssigner

在开始梳理窗口分配过程之前,我们应该先知道Flink中的窗口从大体上划分有2种类型:

  1. 根据时间划分窗口,也就是TimeWindow,按照时间来生成窗口。每个时间窗口都有一个开始时间和结束时间,表示一个左闭右开的时间段。根据时间窗口再进一步进行划分,有以下几种窗口分配类型:

    1. 滚动窗口(Tumbling Window)

    2. 滑动窗口(Sliding Window)

    3. 会话窗口(Session Window)

  2. 根据数据划分窗口,也就是GlobalWindow(CountWindow),根据数据条数来生成一个窗口,和时间无关。

由于基于数据条数来划分窗口是比较简单的,这里不再细说。接下来将针对时间窗口(实际生产中也是常用的)来进行讲述。

在讲述时间窗口之前,需要先了解一下在Flink中,关于时间又分为三种:

  1. Event Time:即事件产生的时间

  2. IngestionTime:即进入系统的时间,也就是数据进行flink的时间

  3. Processing Time:即数据被Operator算子处理的时间

滚动窗口Tumbling Window

滚动窗口分配器会把每个元素分配到一个指定窗口大小的窗口中,且每个窗口之间没有重叠。例如当指定大小为5分钟的窗口,那么就会每5分钟启动一个新的窗口,如下图所示:

该类窗口的特点:

  1. 时间对齐,默认情况下时间窗口会做一个对齐,比如设置一个一小时的窗口,那么窗口的起止时间是[0:00:00.000 - 0:59:59.999)

  2. 窗口长度固定

  3. 窗口没有重叠

适用场景:适合做每个时间段的聚合计算,BI分析。例如统计某页面每分钟点击的pv。

滑动窗口 Sliding Window

滑动窗口分配器将元素分配到固定长度的窗口中,与滚动窗口类似,窗口的大小由窗口大小参数(size)来配置,另一个窗口滑动参数(slide)控制滑动窗口开始的频率。滑动窗口如果滑动参数小于窗口大小的话,窗口是可以重叠的,在这种情况下,元素会被分配到多个窗口下。例如,可以设置一个大小为10分钟的窗口,每5分钟滑动一次,那么每隔5分钟就会得到一个窗口,其中包含过去10分钟内到达的事件,如下图所示。

该类窗口的特点:

  1. 时间可以对齐

  2. 窗口长度固定

  3. 有重叠

适用场景:对最近一段时间段内进行统计(如某接口近几分钟的失败调用率)

会话窗口Session Window

会话窗口由一系列事件组合一个指定时间长度的timeout间隙组成,类似于web应用的session,也就是一段时间没有接收到新数据就会生成新的窗口。

session窗口分配器通过session活动来对元素进行分组,session窗口和滑动窗口和滚动窗口相比,不会有重叠和固定的开始时间和结束时间的情况。当它在一个固定的时间周期内不再接收元素,即非活动间隔产生,那个窗口就会关闭。

一个session窗口通过一个session间隔来配置,这个session间隔定义了非活跃周期的长度,当这个非活跃周期产生,那么当前的session关闭并且后续的元素将被分配到新的session窗口中去

会话窗口就是根据上图中的session gap来切分不同的窗口,当一个窗口在大于session gap时间内没有接收到数据,窗口就会关闭,所以在这种模式下,窗口的长度是可变的,开始和结束时间也是不确定的,唯独可以设置定长的session gap.

该类窗口的特点:

  1. 时间无对齐

  2. 当前系统时间-分组内最后一次的时间如果超时,则进行触发计算

五、窗口触发器WindowTrigger

触发器(Trigger)决定了何时启动Window Function来处理窗口中的数据以及何时将窗口内的数据清理。

增量计算窗口函数对每个新流入的数据直接进行聚合,Trigger决定了在窗口结束时将聚合结果发送出去;全量计算窗口函数需要将窗口内的元素缓存,Trigger决定了在窗口结束时对所有元素进行计算然后将结果发送出去。

每一个窗口都有一个默认的Trigger,当到达窗口的结束时间时,Trigger和对应的计算就会被触发。目前flink内置的触发器有以下几种:

  • EventTimeTrigger:基于event time来触发计算

  • ProcessingTimeTrigger:基于processing time来触发计算,通过对比ProcessingTime和窗口EndTime来确定是否触发窗口

  • ContinuousProcessingTimeTrigger:根据间隔时间周期性触发窗口或者Window的结束时间小于当前ProcessTime触发窗口计算

  • ContinuousEventTimeTrigger:根据间隔时间周期性触发窗口或者Window的结束时间小于当前EndTime触发窗口计算。

  • CountTrigger:基于窗口内元素数量来触发计算,当超过设定的阈值来触发窗口计算

  • PurgingTrigger:对其他触发器做一个转换,即支持清理窗口数据功能

  • DeltaTrigger:根据接入数据计算出来的Delta指标是否超过指定的Threshold去判断是否触发窗口计算

这里需要注意的是GlobalWindow的默认触发器是NeverTrigger,它从不触发。因此,在使用GlobalWindow时,必须定义一个自定义触发器。如果我们有一些个性化的触发条件,比如窗口内遇到某个特定的元素、元素总数达到一定数量或窗口中的元素到达时满足某种特定的模式时,我们可以自定义一个Trigger。那么接下来需要了解一下Trigger接口的五种方法:

  • onElement()方法:当某个窗口增加一个元素时,会调用该方法,返回一个TriggerResult

  • onEventTime()方法:当一个基于Event Time的Timer触发了FIRE时调用onEventTime方法

  • onProcessingTime()方法:当一个基于Processing Time的Timer触发了FIRE时调用onProcessTime方法

  • onMerge()方法:和有状态的触发器有关,当多个窗口被合并时调用onMerge,并会合并触发器的状态,例如使用会话窗口时。

  • clear()方法:当窗口数据被清理时,调用clear方法来清理所有的Trigger状态数据,否则随着窗口越来越多,状态数据也会越来越多

当满足某个条件,Trigger会返回一个TriggerResult封装的结果,根据返回结果进行下一步的操作:

  • CONTINUE:什么都不做

  • FIRE:启动计算并将结果发送给下游,不清理窗口数据

  • PURGE:清理窗口数据但是不执行计算

  • FIRE_AND_PURGE:启动计算,发送结果然后清理窗口数据

示例1:按照event_time来划分窗口,窗口长度为4s,同时每2s触发一次计算,并将中间状态结果清除掉

如上图所示,输入1000,flink,1 具体含义如下:

  • 1000是event_time可以理解成为1s

  • flink作为分组的key

  • 1代表key对应的value

具体代码见simple.window.trigger.PurgingTriggerDemo

示例2:自定义Trigger

实现功能:在EventTimeTrigger基础上增加一个元素个数统计,当窗口内的元素个数达到阈值后则触发计算,并清除状态;当达到窗口结束时间时再次触发计算,同时也清除状态。具体代码见simple.window.trigger.CustomCountEventTimeTrigger

  1. private static int count = 1;
  2. @Override
  3. public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
  4.   long nextEndTime = window.getEnd() - 1;
  5.   System.out.println("当前事件时间:" + timestamp + " 窗口截止时间:" + nextEndTime + " 当前窗口内元素个数为:" + count);
  6.   if (nextEndTime <= ctx.getCurrentWatermark()) {
  7.     return TriggerResult.FIRE_AND_PURGE;
  8.   } else {
  9.     ctx.registerEventTimeTimer(nextEndTime);
  10.   }
  11.   //判断元素个数是否达到阈值
  12.   if (count < MAX_VALUE) {
  13.     count++;
  14.   } else {
  15.     count = 1;
  16.     System.out.println("元素个数达到阈值,开始触发计算:" + element);
  17.     return TriggerResult.FIRE_AND_PURGE;
  18.   }
  19.   return TriggerResult.CONTINUE;
  20. }
  21. //这里设置窗口长度为3s,元素个数阈值为2
  22. //输入:
  23. //1000,flink,1
  24. //2000,flink,2
  25. //3000,flink,3
  26. //4000,flink,4

由于代码篇幅过长不便于读者阅读,这里不在详细贴出,具体代码示例关注公众号后台回复即可查看

六、窗口函数windowFunction

当我们对无限流完成了窗口划分,并在一定时间下触发了窗口,那么这个时候就需要对窗口内的元素进行一定的操作,也就是所谓的窗口函数。窗口函数主要分为两种:一种是增量计算,如reduce和aggregate,在处理时会保存一个中间状态结果,新进来的元素会和这个状态中间数据进行一些操作;一种是全量计算,如process,指的是先缓存窗口内所有的元素,等触发之后对窗口内所有元素执行计算。

在讲述窗口函数具体实现之前,先来了解一下Flink窗口的大致骨架结构:

对于Keyed Windows:按照key分组的骨架结构如下

  1. stream
  2.   .keyBy(...)               <-  keyed versus non-keyed windows
  3.   .window(...)              <-  required: "assigner"
  4.   [.trigger(...)]            <-  optional"trigger" (else default trigger)
  5.   [.evictor(...)]            <-  optional"evictor" (else no evictor)
  6.   [.allowedLateness(...)]    <-  optional"lateness" (else zero)
  7.   [.sideOutputLateData(...)] <-  optional"output tag" (else no side output for late data)
  8.   .reduce/aggregate/fold/apply()      <-  required: "function"
  9.   [.getSideOutput(...)]      <-  optional"output tag"

对于Non-Keyed Windows:即不分组模式对应的骨架结构如下

  1. stream
  2.   .windowAll(...)           <-  required: "assigner"
  3.   [.trigger(...)]            <-  optional"trigger" (else default trigger)
  4.   [.evictor(...)]            <-  optional"evictor" (else no evictor)
  5.   [.allowedLateness(...)]    <-  optional"lateness" (else zero)
  6.   [.sideOutputLateData(...)] <-  optional"output tag" (else no side output for late data)
  7.   .reduce/aggregate/fold/apply()      <-  required: "function"
  8.   [.getSideOutput(...)]      <-  optional"output tag"

关于trigger触发器,第五小节已经大致介绍了。evictor销毁将在下一小节讲述,allowedLateness和sideOutputLateData这块涉及到WaterMark,读者可以先略过后面会有单独的文章进行讲解。

通过窗口函数可以实现对数据类型的转换并对窗口内的数据进行一些必要的操作,如下图一个DataStream通过调用keyBy转换成KedyedStream,再经过window转换成WindowedStream,然后再基于WindowedStream进行reduce、aggregate或者process等窗口函数进行操作

窗口函数大致可以分为ReduceFunction、AggregateFunction、FoldFunction和ProcessWindowFunction。接下来分别进行介绍其具体作用。

ReduceFunction

ReduceFunction指定如何将输入中的两个元素组合在一起以产生相同类型的输出元素。Flink使用ReduceFunction来逐步聚合窗口的元素。如下面的示例汇总了窗口中所有元素的元组的第二个字段

  1. DataStream<Tuple2<String, Long>> input = ...;
  2. input
  3.     .keyBy(<key selector>)
  4.     .window(<window assigner>)
  5.     .reduce(new ReduceFunction<Tuple2<String, Long>> {
  6.       public Tuple2<String, Long> reduce(Tuple2<String, Long> v1, Tuple2<String, Long> v2) {
  7.         return new Tuple2<>(v1.f0, v1.f1 + v2.f1);
  8.       }
  9.     });

AggregateFunction

AggregateFunctio是ReduceFunction的通用版本,也是一种增量计算窗口函数,保存了一个中间状态数据,但是使用比较复杂。先来看一下源码

  1. @PublicEvolving
  2. public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable {
  3.   //创建一个新的Accumulator,也就是中间状态数据,在发起一次aggregate时会调用
  4.  ACC createAccumulator();
  5.   
  6.   //当窗口内进入一个新元素时会把该元素和ACC进行合并,然后返回新的状态数据ACC
  7.  ACC add(IN value, ACC accumulator);
  8.   
  9.   //将中间结果转换为结果数据
  10.  OUT getResult(ACC accumulator);
  11.   
  12.   //合并两个ACC
  13.  ACC merge(ACC a, ACC b);
  14. }

源码中定义了三个类型,IN,ACC,OUT。输入类型是IN,输出类型是OUT,中间状态数据是ACC。这种复杂的设计是为了解决输入类型、中间状态和输出类型不一致的问题。下面将通过一个例子来讲解一下这几个函数的工作流程

示例:计算一个窗口的平均值,那么ACC就要保存总和以及个数

  1. class MyAggregateFunction implements AggregateFunction<Tuple3<StringString, Integer>, Tuple2<Integer, Integer>, Double> {
  2.     @Override
  3.     public Tuple2<Integer, Integer> createAccumulator() {
  4.         return Tuple2.of(00);
  5.     }
  6.     @Override
  7.     public Tuple2<Integer, Integer> add(Tuple3<StringString, Integer> value, Tuple2<Integer, Integer> accumulator) {
  8.         accumulator.f0 = accumulator.f0 + 1;
  9.         accumulator.f1 = accumulator.f1 + value.f2;
  10.         System.out.println("新增元素,当前中间状态结果:" + accumulator + " 当前事件元素:" + value);
  11.         return accumulator;
  12.     }
  13.     @Override
  14.     public Double getResult(Tuple2<Integer, Integer> accumulator) {
  15.         System.out.println("调用结果值..分子为:" + accumulator.f1 + " 分母为:" + accumulator.f0);
  16.         return Double.valueOf(accumulator.f1 / accumulator.f0);
  17.     }
  18.     @Override
  19.     public Tuple2<Integer, Integer> merge(Tuple2<Integer, Integer> a, Tuple2<Integer, Integer> b) {
  20.         System.out.println("合并值:" + a);
  21.         return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);
  22.     }
  23. }

如上图所示,当程序刚启动时,还没有数据进入,这个时候会创建一个新的ACC。当有数据流入后,会调用add函数更新ACC,如果有跨节点的ACC的话,flink会调用merge进行合并直到窗口结束后会调用getResult生成结果。

注意:如果你未设置并行度,则默认按照机器的核数,那么这个时候就会出现应该窗口触发的时候但未触发的情况。笔者开发的demo见simple.window.function.AggregateFunctionDemo。

在笔者的例子中,如果未设置并行度为1,那么只有当窗口3的数据到来时才会触发窗口0。如果读者的机器核数比较多,那么有可能会调试多次不能触发窗口0的计算。具体原因见源码:

  1. private void findAndOutputNewMinWatermarkAcrossAlignedChannels() throws Exception {
  2.   long newMinWatermark = Long.MAX_VALUE;
  3.   boolean hasAlignedChannels = false;
  4.   // determine new overall watermark by considering only watermark-aligned channels across all channels
  5.   for (InputChannelStatus channelStatus : channelStatuses) {
  6.     if (channelStatus.isWatermarkAligned) {
  7.       hasAlignedChannels = true;
  8.       newMinWatermark = Math.min(channelStatus.watermark, newMinWatermark);
  9.     }
  10.   }
  11.   // we acknowledge and output the new overall watermark if it really is aggregated
  12.   // from some remaining aligned channel, and is also larger than the last output watermark
  13.   if (hasAlignedChannels && newMinWatermark > lastOutputWatermark) {
  14.     lastOutputWatermark = newMinWatermark;
  15.     output.emitWatermark(new Watermark(lastOutputWatermark));
  16.   }
  17. }

也就是说当同一个窗口内的元素所在task都满足窗口触发条件时,那么该窗口才会真正被触发。

FoldFunction

FoldFunction指定如何将窗口的输入元素与输出类型的元素组合。每当窗口内有数据流入就会和当前输出值进行一些合并操作。该函数作用其实跟ReduceFunction一样,唯一不同的是该函数可以设置一个初始值。具体使用可以参考下面官网的示例:即将所有输入的long类型值追加到初始值为空上。

  1. DataStream<Tuple2<String, Long>> input = ...;
  2. input
  3.     .keyBy(<key selector>)
  4.     .window(<window assigner>)
  5.     .fold("", new FoldFunction<Tuple2<String, Long>String>> {
  6.        public String fold(String acc, Tuple2<String, Long> value) {
  7.          return acc + value.f1;
  8.        }
  9.     });

这里需要注意的是该函数不能用于会话窗口或者其他可合并的窗口

ProcessWindowFunction

ProcessWindowFunction是属于全量计算的函数,需要把窗口内的全量数据进行缓存,因此是非常耗费性能和资源的。该函数会返回一个包含全部数据的可迭代对象Iterable,提供了可访问时间和一些状态信息,可以直接操作状态,所以该函数比其他窗口函数更加丰富灵活。源码定义如下:

  1. /**
  2. * IN :输入类型
  3. * OUT:输出类型
  4. * KEY: keyBy算子中按照key分组,Key的类型
  5. * W:窗口类型
  6. *
  7. **/
  8. public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> extends AbstractRichFunction {
  9.  private static final long serialVersionUID = 1L;
  10.   
  11.   //对一个窗口内的元素进行处理,元素会缓存在Iterable中,处理后输出到Collector中
  12.  public abstract void process(KEY key, Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception;
  13.   //窗口执行完毕后会进行清理,删除各类状态数据
  14.  public void clear(Context context) throws Exception {}
  15.   //窗口上下文状态,包括窗口元数据、状态数据、WaterMark等
  16.  public abstract class Context implements java.io.Serializable {
  17.  
  18.     //返回当前正在处理的Window
  19.   public abstract W window();
  20.     //返回当前ProcessingTime
  21.   public abstract long currentProcessingTime();
  22.     //返回当前EventTime对应的Watermark
  23.   public abstract long currentWatermark();
  24.     //返回某个key下的某个window状态,单窗口下的状态,当使用单个窗口状态时,需要在clear函数中清理状态
  25.   public abstract KeyedStateStore windowState();
  26.     //返回某个key下的全局状态,跨多个窗口,也就是说多个窗口都能访问
  27.   public abstract KeyedStateStore globalState();
  28.     //迟到的数据发送到其他位置
  29.   public abstract <X> void output(OutputTag<X> outputTag, X value);
  30.  }
  31. }

示例:统计窗口下的元素个数

  1. DataStream<Tuple2<String, Long>> input = ...;
  2. input
  3.   .keyBy(t -> t.f0)
  4.   .timeWindow(Time.minutes(5))
  5.   .process(new MyProcessWindowFunction());
  6. public class MyProcessWindowFunction 
  7.     extends ProcessWindowFunction<Tuple2<String, Long>StringString, TimeWindow> {
  8.   @Override
  9.   public void process(String key, Context context, Iterable<Tuple2<String, Long>> input, Collector<String> out) {
  10.     long count = 0;
  11.     for (Tuple2<String, Long> ininput) {
  12.       count++;
  13.     }
  14.     out.collect("Window: " + context.window() + "count: " + count);
  15.   }
  16. }

注意:ProcessWindowFunction需要把一个窗口内的所有元素都缓存起来,这种操作将占用大量的存储资源,虽然应用场景很广,能够解决比较复杂的场景问题,但是稍有使用不慎就会造成严重后果

ProcessWindowFunction With Incremental Aggreation:即ProcessWindowFunction和增量计算结合

为了解决ProcessWindowFunction将整个窗口元素缓存起来占用大量资源的情况,flink提供了可以将ProcessWindowFunction和reduce和aggregate组合的操作。即当元素到达窗口时进行增量计算,当窗口结束的时候,ProcessWindowFunction将会出增量结果进行处理输出结果。该组合操作即可以增量计算窗口,同时也可以访问窗口的一些元数据、状态信息等。

示例:ProcessWindowFunction和ReduceFunction结合使用来获取窗口中最小的元素和窗口的开始时间

  1. DataStream<SensorReading> input = ...;
  2. input
  3.   .keyBy(<key selector>)
  4.   .timeWindow(<duration>)
  5.   .reduce(new MyReduceFunction(), new MyProcessWindowFunction());
  6. // Function definitions
  7. private static class MyReduceFunction implements ReduceFunction<SensorReading> {
  8.   public SensorReading reduce(SensorReading r1, SensorReading r2) {
  9.       return r1.value() > r2.value() ? r2 : r1;
  10.   }
  11. }
  12. private static class MyProcessWindowFunction
  13.     extends ProcessWindowFunction<SensorReading, Tuple2<Long, SensorReading>String, TimeWindow> {
  14.   public void process(String key,
  15.                     Context context,
  16.                     Iterable<SensorReading> minReadings,
  17.                     Collector<Tuple2<Long, SensorReading>> out) {
  18.       SensorReading min = minReadings.iterator().next();
  19.       out.collect(new Tuple2<Long, SensorReading>(context.window().getStart(), min));
  20.   }
  21. }

七、窗口销毁WindowEvictor

Flink的窗口模型允许除了WindowAssigner和Trigger之外还指定一个可选的Evictor,可以在Window Function执行前或者执行后调用Evictor.具体源码定义如下:

  1. /**
  2. *
  3. * T为元素类型
  4. * W为窗口
  5. */
  6. public interface Evictor<T, W extends Window> extends Serializable {
  7.   
  8.   //在Window Function之前调用,即可以在窗口处理之前剔除数据
  9.     void evictBefore(Iterable<TimestampedValue<T>> var1, int var2, W var3, Evictor.EvictorContext var4);
  10.   //在Window Function之后调用
  11.     void evictAfter(Iterable<TimestampedValue<T>> var1, int var2, W var3, Evictor.EvictorContext var4);
  12.   //Evictor上下文
  13.     public interface EvictorContext {
  14.         long getCurrentProcessingTime();
  15.         MetricGroup getMetricGroup();
  16.         long getCurrentWatermark();
  17.     }
  18. }

窗口中所有的元素被放在Iterable<TimestampedValue<T>>中,我们可以实现自己的清除逻辑。对于增量计算如ReduceFunction和AggregateFunction,没必要使用Evictor。

Flink提供了三种已实现的Evictor:

  • CountEvictor:保存指定数量的元素,多余的元素按照从前往后的顺序剔除

  • DeltaEvictor:计算Window中最后一个元素与其余每个元素之间的增量,丢弃增量大于或等于阈值的元素

  • TimeEvictor:对于给定的窗口,提供一个以毫秒为单位间隔的参数interval,找到最大的时间戳max_ts,然后删除所有时间戳小于max_ts-interval。

这里有以下几个注意点:

  1. 如果指定Evictor,那么可以防止预聚合操作,因为在计算之前会把所有的元素先传递给Evictor

  2. 由于Flink不保证元素顺序,因此Evictor当从窗口的开头删除元素时,那么该元素不一定是第一个或者最后一个到达的元素。

  3. 默认情况下,在窗口函数调用之前执行Evictor逻辑

八、总结

九、关于代码

由于文章涉及内容较多,篇幅较长可能会对读者造成困扰,因此文章中涉及到代码大部分都是源码或者官网示例以方便读者理解。

相关实战代码可关注公众号后台私信(暗号:flink代码)领取(后面flink专题连载相关实战代码均不在文章中给出了)

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/677272
推荐阅读
相关标签
  

闽ICP备14008679号