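The previous post covered Flink's operators (windows). You may now be wondering: once a window is defined, when does it actually get evaluated?
Remember that a window has an optional [.trigger(…)] setting?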
[.trigger(...)] // trigger: specifies the Trigger (optional)
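Every WindowAssigner comes with a default trigger, and it is the trigger that decides when a window is evaluated.
Flink defines the abstract class Trigger, and every trigger must extend it: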
@PublicEvolving
public abstract class Trigger<T, W extends Window> implements Serializable {
    private static final long serialVersionUID = -4104633972991191369L;

    public Trigger() {
    }

    public abstract TriggerResult onElement(T var1, long var2, W var4, Trigger.TriggerContext var5) throws Exception;

    public abstract TriggerResult onProcessingTime(long var1, W var3, Trigger.TriggerContext var4) throws Exception;

    public abstract TriggerResult onEventTime(long var1, W var3, Trigger.TriggerContext var4) throws Exception;

    public boolean canMerge() {
        return false;
    }

    public void onMerge(W window, Trigger.OnMergeContext ctx) throws Exception {
        throw new UnsupportedOperationException("This trigger does not support merging.");
    }

    public abstract void clear(W var1, Trigger.TriggerContext var2) throws Exception;

    public interface OnMergeContext extends Trigger.TriggerContext {
        <S extends MergingState<?, ?>> void mergePartitionedState(StateDescriptor<S, ?> var1);
    }

    public interface TriggerContext {
        long getCurrentProcessingTime();
        MetricGroup getMetricGroup();
        long getCurrentWatermark();
        void registerProcessingTimeTimer(long var1);
        void registerEventTimeTimer(long var1);
        void deleteProcessingTimeTimer(long var1);
        void deleteEventTimeTimer(long var1);
        <S extends State> S getPartitionedState(StateDescriptor<S, ?> var1);

        /** @deprecated */
        @Deprecated
        <S extends Serializable> ValueState<S> getKeyValueState(String var1, Class<S> var2, S var3);

        /** @deprecated */
        @Deprecated
        <S extends Serializable> ValueState<S> getKeyValueState(String var1, TypeInformation<S> var2, S var3);
    }
}
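onElement(): called once for every element that enters the window.
onProcessingTime(): called when a registered processing-time timer fires.
onEventTime(): called when a registered event-time timer fires.
clear(): called when the window is removed.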
Flink ships with several commonly used trigger implementations, and you can also write your own trigger when needed.
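onElement(), onProcessingTime(), and onEventTime() all return a TriggerResult.
TriggerResult has four enum values:
CONTINUE: do nothing with the window.
FIRE: evaluate the window function over the data in the window and emit the result. Note that the data is not cleared after the computation; it stays in the window.
PURGE: clear the data in the window and discard the window, without evaluating it.
FIRE_AND_PURGE: first evaluate the window and emit the result, then clear the data in the window and discard the window.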
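To make the four values concrete, here is a minimal plain-Java sketch with no Flink dependency. It mimics what happens to a window's contents for each result. `Result` and `WindowBuffer` are hypothetical stand-ins. Flink's real `TriggerResult` exposes the same two flags through `isFire()` and `isPurge()`. The "window function" here is just a sum.

```java
import java.util.ArrayList;
import java.util.List;

final class TriggerResultDemo {
    // Hypothetical stand-in for Flink's TriggerResult: each value combines two flags,
    // "fire" (evaluate the window function) and "purge" (drop the window contents).
    enum Result {
        CONTINUE(false, false),
        FIRE(true, false),
        PURGE(false, true),
        FIRE_AND_PURGE(true, true);

        final boolean fire;
        final boolean purge;

        Result(boolean fire, boolean purge) {
            this.fire = fire;
            this.purge = purge;
        }
    }

    // Hypothetical window that applies a Result; the "window function" is a sum.
    static final class WindowBuffer {
        private final List<Integer> contents = new ArrayList<>();
        private final List<Integer> emitted = new ArrayList<>();

        void add(int value) {
            contents.add(value);
        }

        void apply(Result r) {
            if (r.fire) {                 // FIRE / FIRE_AND_PURGE: compute and emit
                int sum = 0;
                for (int v : contents) sum += v;
                emitted.add(sum);
            }
            if (r.purge) {                // PURGE / FIRE_AND_PURGE: drop the contents
                contents.clear();
            }
        }

        List<Integer> emitted() { return emitted; }
        int size() { return contents.size(); }
    }

    static WindowBuffer runScenario() {
        WindowBuffer w = new WindowBuffer();
        w.add(1);
        w.add(2);
        w.apply(Result.FIRE);           // emits 3; contents [1, 2] are kept
        w.add(3);
        w.apply(Result.FIRE_AND_PURGE); // emits 6 (1 + 2 + 3), then empties the window
        w.add(4);
        w.apply(Result.PURGE);          // emits nothing, drops [4]
        w.apply(Result.CONTINUE);       // no-op
        return w;
    }

    public static void main(String[] args) {
        WindowBuffer w = runScenario();
        System.out.println(w.emitted() + " remaining=" + w.size()); // [3, 6] remaining=0
    }
}
```

The second emission is 6 rather than 3 because FIRE kept the earlier elements in the window.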
Let's first look at the triggers Flink provides out of the box.
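CountTrigger: fires once the number of elements in the window reaches a given count.
ContinuousEventTimeTrigger: fires repeatedly at a given event-time interval.
ContinuousProcessingTimeTrigger: fires repeatedly at a given processing-time interval.
ProcessingTimeTrigger: default trigger for processing-time windows; fires when the window ends.
EventTimeTrigger: default trigger for event-time windows; fires when the window ends, i.e. when the watermark passes the end of the window.
NeverTrigger: trigger for global windows; never fires.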
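As an illustration of the first entry, here is a plain-Java sketch of CountTrigger's decision logic. It keeps a per-window counter, returns FIRE once the counter reaches maxCount, and resets the counter. `CountTriggerSketch` is a hypothetical class. The real CountTrigger keeps its counter in Flink keyed state rather than in a field. Because the result is FIRE and not FIRE_AND_PURGE, the window contents are kept, so each firing sees every element seen so far.

```java
import java.util.ArrayList;
import java.util.List;

final class CountTriggerSketch {
    enum Result { CONTINUE, FIRE }

    private final long maxCount;
    private long count; // in Flink this lives in partitioned (keyed) state

    CountTriggerSketch(long maxCount) {
        this.maxCount = maxCount;
    }

    // Called once per element, like Trigger.onElement.
    Result onElement() {
        count++;
        if (count >= maxCount) {
            count = 0;            // reset the counter, but NOT the window contents
            return Result.FIRE;
        }
        return Result.CONTINUE;
    }

    // Feeds values into one window and returns the window-function output (a sum)
    // for every FIRE. Contents are never purged, so the sums are cumulative.
    static List<Integer> run(int maxCount, int... values) {
        CountTriggerSketch trigger = new CountTriggerSketch(maxCount);
        List<Integer> window = new ArrayList<>();
        List<Integer> out = new ArrayList<>();
        for (int v : values) {
            window.add(v);
            if (trigger.onElement() == Result.FIRE) {
                int sum = 0;
                for (int x : window) sum += x;
                out.add(sum);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run(2, 1, 2, 3, 4, 5)); // [3, 10]
    }
}
```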
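Let's take EventTimeTrigger as an example; you can also read the source code directly.
The previous post implemented search popularity. Suppose we now want to count searches every 30 seconds. The window uses TumblingEventTimeWindows, so let's look at how TumblingEventTimeWindows is implemented.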
DataStream<Object> accumulatorStream = dataStream
.keyBy(...)
.window(TumblingEventTimeWindows.of(Time.seconds(30)))
.aggregate(...);
@PublicEvolving
public class TumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;
    private final long size;
    private final long offset;

    protected TumblingEventTimeWindows(long size, long offset) {
        if (offset >= 0L && offset < size) {
            this.size = size;
            this.offset = offset;
        } else {
            throw new IllegalArgumentException("TumblingEventTimeWindows parameters must satisfy 0 <= offset < size");
        }
    }

    public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
        if (timestamp > Long.MIN_VALUE) {
            // Each element belongs to exactly one window: [start, start + size).
            long start = TimeWindow.getWindowStartWithOffset(timestamp, this.offset, this.size);
            return Collections.singletonList(new TimeWindow(start, start + this.size));
        } else {
            throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). Is the time characteristic set to 'ProcessingTime', or did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?");
        }
    }

    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return EventTimeTrigger.create();
    }

    public String toString() {
        return "TumblingEventTimeWindows(" + this.size + ")";
    }

    public static TumblingEventTimeWindows of(Time size) {
        return new TumblingEventTimeWindows(size.toMilliseconds(), 0L);
    }

    public static TumblingEventTimeWindows of(Time size, Time offset) {
        return new TumblingEventTimeWindows(size.toMilliseconds(), offset.toMilliseconds());
    }

    public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
        return new Serializer();
    }

    public boolean isEventTime() {
        return true;
    }
}
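In assignWindows, the call to `TimeWindow.getWindowStartWithOffset(timestamp, offset, size)` decides which 30-second window an element falls into. Below is a plain-Java sketch of that arithmetic. `TumblingWindowMath`, `windowStart`, and `maxTimestamp` are hypothetical names. The sketch uses `Math.floorMod`, which as far as I can tell gives the same result as the remainder-and-adjust code in recent Flink versions. `maxTimestamp` mirrors `TimeWindow.maxTimestamp()`, which is `end - 1`. All values are in milliseconds.

```java
final class TumblingWindowMath {
    // Start of the tumbling window containing `timestamp`.
    // Math.floorMod keeps the remainder non-negative, so timestamps before
    // the epoch (or before the offset) still land in the correct window.
    static long windowStart(long timestamp, long offset, long size) {
        return timestamp - Math.floorMod(timestamp - offset, size);
    }

    // A window covers [start, start + size); its last timestamp is end - 1.
    static long maxTimestamp(long start, long size) {
        return start + size - 1;
    }

    public static void main(String[] args) {
        long size = 30_000L; // corresponds to TumblingEventTimeWindows.of(Time.seconds(30))
        for (long ts : new long[] {0L, 29_999L, 30_000L, 45_000L}) {
            long start = windowStart(ts, 0L, size);
            System.out.println(ts + " -> [" + start + ", " + (start + size)
                    + "), maxTimestamp=" + maxTimestamp(start, size));
        }
    }
}
```

An element stamped 45 000 ms therefore lands in [30000, 60000). EventTimeTrigger registers its timer at that window's maxTimestamp, 59 999 ms.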
We can see that the default-trigger method, getDefaultTrigger, simply calls EventTimeTrigger.create():
public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
return EventTimeTrigger.create();
}
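EventTimeTrigger is implemented as follows.
Because it is driven by event time, onEventTime returns TriggerResult.FIRE once its condition holds (the firing timer's time equals window.maxTimestamp()), while onProcessingTime always returns TriggerResult.CONTINUE.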
@PublicEvolving
public class EventTimeTrigger extends Trigger<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;

    private EventTimeTrigger() {
    }

    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            // Watermark already past the end of the window: fire immediately.
            return TriggerResult.FIRE;
        } else {
            // Otherwise schedule an event-time timer at the end of the window.
            ctx.registerEventTimeTimer(window.maxTimestamp());
            return TriggerResult.CONTINUE;
        }
    }

    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        // Fire only for the timer registered at the window's last timestamp.
        return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        // Processing time is ignored by this trigger.
        return TriggerResult.CONTINUE;
    }

    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        // Remove the pending timer when the window is cleaned up.
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }

    public boolean canMerge() {
        return true;
    }

    public void onMerge(TimeWindow window, OnMergeContext ctx) {
        // Merged windows: re-register the timer for the new end if it is still in the future.
        long windowMaxTimestamp = window.maxTimestamp();
        if (windowMaxTimestamp > ctx.getCurrentWatermark()) {
            ctx.registerEventTimeTimer(windowMaxTimestamp);
        }
    }

    public String toString() {
        return "EventTimeTrigger()";
    }

    public static EventTimeTrigger create() {
        return new EventTimeTrigger();
    }
}
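The interplay between onElement, the registered timer, and onEventTime can be simulated without Flink. In this hypothetical sketch, `EventTimeTriggerSim` keeps a sorted set of timers and a current watermark. Advancing the watermark fires every timer at or below it, which is how the runtime ends up calling onEventTime. Only the two trigger decisions are copied from the source above; the timer service is a deliberate simplification. With the default allowed lateness of 0, Flink drops a late element before the trigger ever sees it, so the final FIRE below only happens in the real runtime when allowedLateness is greater than 0.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

final class EventTimeTriggerSim {
    enum Result { CONTINUE, FIRE }

    private final long windowMaxTimestamp;                 // window.maxTimestamp(), i.e. end - 1
    private final TreeSet<Long> timers = new TreeSet<>();  // simplified event-time timer service
    private long watermark = Long.MIN_VALUE;
    private final List<String> log = new ArrayList<>();

    EventTimeTriggerSim(long windowStart, long windowSize) {
        this.windowMaxTimestamp = windowStart + windowSize - 1;
    }

    // Same decision as EventTimeTrigger.onElement.
    Result onElement(long timestamp) {
        if (windowMaxTimestamp <= watermark) {
            return Result.FIRE;          // in Flink: only reachable when allowedLateness > 0
        }
        timers.add(windowMaxTimestamp);  // registerEventTimeTimer; duplicates collapse
        return Result.CONTINUE;
    }

    // Same decision as EventTimeTrigger.onEventTime.
    Result onEventTime(long time) {
        return time == windowMaxTimestamp ? Result.FIRE : Result.CONTINUE;
    }

    // Advancing the watermark fires every timer whose time is <= the watermark.
    void advanceWatermark(long newWatermark) {
        watermark = newWatermark;
        while (!timers.isEmpty() && timers.first() <= watermark) {
            long t = timers.pollFirst();
            log.add("onEventTime(" + t + ") -> " + onEventTime(t));
        }
    }

    static List<String> demo() {
        EventTimeTriggerSim sim = new EventTimeTriggerSim(30_000L, 30_000L); // window [30000, 60000)
        sim.log.add("onElement(31000) -> " + sim.onElement(31_000L));
        sim.log.add("onElement(45000) -> " + sim.onElement(45_000L));
        sim.advanceWatermark(50_000L); // still below 59999: nothing fires
        sim.advanceWatermark(60_000L); // passes the window end: the timer fires
        sim.log.add("onElement(58000) -> " + sim.onElement(58_000L)); // late element
        return sim.log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

The two elements register the same timer (59 999), so the window fires exactly once when the watermark reaches 60 000.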
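Every window assigner comes with a default trigger. If the default does not fit your needs, you can supply your own with trigger(…); note that this replaces the default trigger rather than adding a second condition to it.
The default triggers cover most use cases. For example, all event-time window assigners use EventTimeTrigger as their default trigger, which fires once the watermark passes the end of the window.
Note: the default trigger of GlobalWindows is NeverTrigger, which never fires, so when using GlobalWindows you must specify a custom trigger.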
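The consequence of NeverTrigger is easy to see in a small plain-Java simulation with hypothetical names. It feeds the same elements into a single global window twice. The first run uses a trigger that always returns CONTINUE, like NeverTrigger. The second uses a count trigger whose FIRE is turned into FIRE_AND_PURGE, which is what wrapping a trigger in Flink's PurgingTrigger does. As far as I know, `KeyedStream.countWindow(size)` is built on this combination: GlobalWindows with `PurgingTrigger.of(CountTrigger.of(size))`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntFunction;

final class GlobalWindowSim {
    enum Result { CONTINUE, FIRE_AND_PURGE }

    // Behaves like GlobalWindows' default NeverTrigger: never fires.
    static IntFunction<Result> never() {
        return size -> Result.CONTINUE;
    }

    // Hypothetical stand-in for PurgingTrigger.of(CountTrigger.of(n)):
    // fire and clear the window once it holds n elements.
    static IntFunction<Result> purgingCount(int n) {
        return size -> size >= n ? Result.FIRE_AND_PURGE : Result.CONTINUE;
    }

    // Feeds values into a single global window; the trigger sees the current
    // window size after each element. Returns the sum emitted at each firing.
    static List<Integer> run(IntFunction<Result> trigger, int... values) {
        List<Integer> window = new ArrayList<>();
        List<Integer> out = new ArrayList<>();
        for (int v : values) {
            window.add(v);
            if (trigger.apply(window.size()) == Result.FIRE_AND_PURGE) {
                int sum = 0;
                for (int x : window) sum += x;
                out.add(sum);
                window.clear();
            }
        }
        return out;
    }

    public static void main(String[] args) {
        int[] data = {1, 2, 3, 4, 5, 6, 7};
        System.out.println(run(never(), data));          // [] : nothing is ever emitted
        System.out.println(run(purgingCount(3), data));  // [6, 15]; element 7 is still waiting
    }
}
```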
Copyright © 2003-2013 www.wpsshop.cn. All rights reserved.