赞
踩
Flink 窗口触发器(Trigger)(一)
Flink 窗口触发器(Trigger)(二)
Flink的窗口触发器(Trigger)是流处理中一个非常关键的概念,它定义了窗口何时被触发并决定触发后的行为(如进行窗口数据的计算或清理)。
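Flink 提供了多种内置的触发器，常用的有以下几种：
- EventTimeTrigger：当 watermark 到达窗口结束时间（window.maxTimestamp()）时触发。
- ProcessingTimeTrigger：当处理时间到达窗口结束时间时触发。
- CountTrigger：窗口内元素个数达到指定阈值时触发，触发后计数清零。
- ContinuousEventTimeTrigger / ContinuousProcessingTimeTrigger：在窗口结束之前按固定时间间隔周期性触发。
- DeltaTrigger：当新元素与已保存的参考元素之间的差值（由 DeltaFunction 计算）超过阈值时触发。
- PurgingTrigger：包装另一个触发器，把其触发结果转换为 FIRE_AND_PURGE。
- ProcessingTimeoutTrigger：包装另一个触发器，并额外设置一个处理时间超时，超时后强制触发。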
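触发器通常包含以下几个关键方法：
- onElement()：每个元素进入窗口时调用。
- onEventTime()：已注册的事件时间定时器到期时调用。
- onProcessingTime()：已注册的处理时间定时器到期时调用。
- canMerge() / onMerge()：声明是否支持窗口合并，并在窗口合并时合并触发器的状态和定时器。
- clear()：窗口被清除时调用，用于删除触发器自身保存的状态和定时器。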
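触发器的上述回调方法都会返回一个 TriggerResult 枚举值，用以决定窗口的后续行为。常见的 TriggerResult 值包括：
- CONTINUE：不做任何操作，继续等待。
- FIRE：触发窗口计算并输出结果，但保留窗口中的数据。
- PURGE：清除窗口中的数据，但不触发计算。
- FIRE_AND_PURGE：先触发窗口计算，再清除窗口中的数据。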
Flink的窗口触发器是流处理中非常灵活且强大的工具,它允许开发者根据实际需求定义窗口的触发条件和触发后的行为。通过选择合适的触发器和配置相应的参数,可以实现高效、准确的流数据处理。
// // Source code recreated from a .class file by IntelliJ IDEA // (powered by FernFlower decompiler) // package org.apache.flink.streaming.api.windowing.triggers; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; @PublicEvolving public class EventTimeTrigger extends Trigger<Object, TimeWindow> { private static final long serialVersionUID = 1L; private EventTimeTrigger() { } public TriggerResult onElement(Object element, long timestamp, TimeWindow window, Trigger.TriggerContext ctx) throws Exception { if (window.maxTimestamp() <= ctx.getCurrentWatermark()) { return TriggerResult.FIRE; } else { ctx.registerEventTimeTimer(window.maxTimestamp()); return TriggerResult.CONTINUE; } } public TriggerResult onEventTime(long time, TimeWindow window, Trigger.TriggerContext ctx) { return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE; } public TriggerResult onProcessingTime(long time, TimeWindow window, Trigger.TriggerContext ctx) throws Exception { return TriggerResult.CONTINUE; } public void clear(TimeWindow window, Trigger.TriggerContext ctx) throws Exception { ctx.deleteEventTimeTimer(window.maxTimestamp()); } public boolean canMerge() { return true; } public void onMerge(TimeWindow window, Trigger.OnMergeContext ctx) { long windowMaxTimestamp = window.maxTimestamp(); if (windowMaxTimestamp > ctx.getCurrentWatermark()) { ctx.registerEventTimeTimer(windowMaxTimestamp); } } public String toString() { return "EventTimeTrigger()"; } public static EventTimeTrigger create() { return new EventTimeTrigger(); } }
// // Source code recreated from a .class file by IntelliJ IDEA // (powered by FernFlower decompiler) // package org.apache.flink.streaming.api.windowing.triggers; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; @PublicEvolving public class ProcessingTimeTrigger extends Trigger<Object, TimeWindow> { private static final long serialVersionUID = 1L; private ProcessingTimeTrigger() { } public TriggerResult onElement(Object element, long timestamp, TimeWindow window, Trigger.TriggerContext ctx) { ctx.registerProcessingTimeTimer(window.maxTimestamp()); return TriggerResult.CONTINUE; } public TriggerResult onEventTime(long time, TimeWindow window, Trigger.TriggerContext ctx) throws Exception { return TriggerResult.CONTINUE; } public TriggerResult onProcessingTime(long time, TimeWindow window, Trigger.TriggerContext ctx) { return TriggerResult.FIRE; } public void clear(TimeWindow window, Trigger.TriggerContext ctx) throws Exception { ctx.deleteProcessingTimeTimer(window.maxTimestamp()); } public boolean canMerge() { return true; } public void onMerge(TimeWindow window, Trigger.OnMergeContext ctx) { long windowMaxTimestamp = window.maxTimestamp(); if (windowMaxTimestamp > ctx.getCurrentProcessingTime()) { ctx.registerProcessingTimeTimer(windowMaxTimestamp); } } public String toString() { return "ProcessingTimeTrigger()"; } public static ProcessingTimeTrigger create() { return new ProcessingTimeTrigger(); } }
// // Source code recreated from a .class file by IntelliJ IDEA // (powered by FernFlower decompiler) // package org.apache.flink.streaming.api.windowing.triggers; import java.time.Duration; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeutils.base.LongSerializer; import org.apache.flink.streaming.api.windowing.windows.Window; @PublicEvolving public class ProcessingTimeoutTrigger<T, W extends Window> extends Trigger<T, W> { private static final long serialVersionUID = 1L; private final Trigger<T, W> nestedTrigger; private final long interval; private final boolean resetTimerOnNewRecord; private final boolean shouldClearOnTimeout; private final ValueStateDescriptor<Long> timeoutStateDesc; private ProcessingTimeoutTrigger(Trigger<T, W> nestedTrigger, long interval, boolean resetTimerOnNewRecord, boolean shouldClearOnTimeout) { this.nestedTrigger = nestedTrigger; this.interval = interval; this.resetTimerOnNewRecord = resetTimerOnNewRecord; this.shouldClearOnTimeout = shouldClearOnTimeout; this.timeoutStateDesc = new ValueStateDescriptor("timeout", LongSerializer.INSTANCE); } public TriggerResult onElement(T element, long timestamp, W window, Trigger.TriggerContext ctx) throws Exception { TriggerResult triggerResult = this.nestedTrigger.onElement(element, timestamp, window, ctx); if (triggerResult.isFire()) { this.clear(window, ctx); return triggerResult; } else { ValueState<Long> timeoutState = (ValueState)ctx.getPartitionedState(this.timeoutStateDesc); long nextFireTimestamp = ctx.getCurrentProcessingTime() + this.interval; Long timeoutTimestamp = (Long)timeoutState.value(); if (timeoutTimestamp != null && this.resetTimerOnNewRecord) { ctx.deleteProcessingTimeTimer(timeoutTimestamp); timeoutState.clear(); timeoutTimestamp = null; } if (timeoutTimestamp == null) { timeoutState.update(nextFireTimestamp); 
ctx.registerProcessingTimeTimer(nextFireTimestamp); } return triggerResult; } } public TriggerResult onProcessingTime(long timestamp, W window, Trigger.TriggerContext ctx) throws Exception { TriggerResult triggerResult = this.nestedTrigger.onProcessingTime(timestamp, window, ctx); if (this.shouldClearOnTimeout) { this.clear(window, ctx); } return triggerResult.isPurge() ? TriggerResult.FIRE_AND_PURGE : TriggerResult.FIRE; } public TriggerResult onEventTime(long timestamp, W window, Trigger.TriggerContext ctx) throws Exception { TriggerResult triggerResult = this.nestedTrigger.onEventTime(timestamp, window, ctx); if (this.shouldClearOnTimeout) { this.clear(window, ctx); } return triggerResult.isPurge() ? TriggerResult.FIRE_AND_PURGE : TriggerResult.FIRE; } public void clear(W window, Trigger.TriggerContext ctx) throws Exception { ValueState<Long> timeoutTimestampState = (ValueState)ctx.getPartitionedState(this.timeoutStateDesc); Long timeoutTimestamp = (Long)timeoutTimestampState.value(); if (timeoutTimestamp != null) { ctx.deleteProcessingTimeTimer(timeoutTimestamp); timeoutTimestampState.clear(); } this.nestedTrigger.clear(window, ctx); } public String toString() { return "TimeoutTrigger(" + this.nestedTrigger.toString() + ")"; } public static <T, W extends Window> ProcessingTimeoutTrigger<T, W> of(Trigger<T, W> nestedTrigger, Duration timeout) { return new ProcessingTimeoutTrigger(nestedTrigger, timeout.toMillis(), false, true); } public static <T, W extends Window> ProcessingTimeoutTrigger<T, W> of(Trigger<T, W> nestedTrigger, Duration timeout, boolean resetTimerOnNewRecord, boolean shouldClearOnTimeout) { return new ProcessingTimeoutTrigger(nestedTrigger, timeout.toMillis(), resetTimerOnNewRecord, shouldClearOnTimeout); } }
// // Source code recreated from a .class file by IntelliJ IDEA // (powered by FernFlower decompiler) // package org.apache.flink.streaming.api.windowing.triggers; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.state.ReducingState; import org.apache.flink.api.common.state.ReducingStateDescriptor; import org.apache.flink.api.common.typeutils.base.LongSerializer; import org.apache.flink.streaming.api.windowing.windows.Window; @PublicEvolving public class CountTrigger<W extends Window> extends Trigger<Object, W> { private static final long serialVersionUID = 1L; private final long maxCount; private final ReducingStateDescriptor<Long> stateDesc; private CountTrigger(long maxCount) { this.stateDesc = new ReducingStateDescriptor("count", new Sum(), LongSerializer.INSTANCE); this.maxCount = maxCount; } public TriggerResult onElement(Object element, long timestamp, W window, Trigger.TriggerContext ctx) throws Exception { ReducingState<Long> count = (ReducingState)ctx.getPartitionedState(this.stateDesc); count.add(1L); if ((Long)count.get() >= this.maxCount) { count.clear(); return TriggerResult.FIRE; } else { return TriggerResult.CONTINUE; } } public TriggerResult onEventTime(long time, W window, Trigger.TriggerContext ctx) { return TriggerResult.CONTINUE; } public TriggerResult onProcessingTime(long time, W window, Trigger.TriggerContext ctx) throws Exception { return TriggerResult.CONTINUE; } public void clear(W window, Trigger.TriggerContext ctx) throws Exception { ((ReducingState)ctx.getPartitionedState(this.stateDesc)).clear(); } public boolean canMerge() { return true; } public void onMerge(W window, Trigger.OnMergeContext ctx) throws Exception { ctx.mergePartitionedState(this.stateDesc); } public String toString() { return "CountTrigger(" + this.maxCount + ")"; } public static <W extends Window> CountTrigger<W> of(long maxCount) { return new CountTrigger(maxCount); } 
private static class Sum implements ReduceFunction<Long> { private static final long serialVersionUID = 1L; private Sum() { } public Long reduce(Long value1, Long value2) throws Exception { return value1 + value2; } } }
// // Source code recreated from a .class file by IntelliJ IDEA // (powered by FernFlower decompiler) // package org.apache.flink.streaming.api.windowing.triggers; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.state.ReducingState; import org.apache.flink.api.common.state.ReducingStateDescriptor; import org.apache.flink.api.common.typeutils.base.LongSerializer; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.Window; @PublicEvolving public class ContinuousEventTimeTrigger<W extends Window> extends Trigger<Object, W> { private static final long serialVersionUID = 1L; private final long interval; private final ReducingStateDescriptor<Long> stateDesc; private ContinuousEventTimeTrigger(long interval) { this.stateDesc = new ReducingStateDescriptor("fire-time", new Min(), LongSerializer.INSTANCE); this.interval = interval; } public TriggerResult onElement(Object element, long timestamp, W window, Trigger.TriggerContext ctx) throws Exception { if (window.maxTimestamp() <= ctx.getCurrentWatermark()) { return TriggerResult.FIRE; } else { ctx.registerEventTimeTimer(window.maxTimestamp()); ReducingState<Long> fireTimestampState = (ReducingState)ctx.getPartitionedState(this.stateDesc); if (fireTimestampState.get() == null) { this.registerNextFireTimestamp(timestamp - timestamp % this.interval, window, ctx, fireTimestampState); } return TriggerResult.CONTINUE; } } public TriggerResult onEventTime(long time, W window, Trigger.TriggerContext ctx) throws Exception { if (time == window.maxTimestamp()) { return TriggerResult.FIRE; } else { ReducingState<Long> fireTimestampState = (ReducingState)ctx.getPartitionedState(this.stateDesc); Long fireTimestamp = (Long)fireTimestampState.get(); if (fireTimestamp != null && fireTimestamp == time) { 
fireTimestampState.clear(); this.registerNextFireTimestamp(time, window, ctx, fireTimestampState); return TriggerResult.FIRE; } else { return TriggerResult.CONTINUE; } } } public TriggerResult onProcessingTime(long time, W window, Trigger.TriggerContext ctx) throws Exception { return TriggerResult.CONTINUE; } public void clear(W window, Trigger.TriggerContext ctx) throws Exception { ReducingState<Long> fireTimestamp = (ReducingState)ctx.getPartitionedState(this.stateDesc); Long timestamp = (Long)fireTimestamp.get(); if (timestamp != null) { ctx.deleteEventTimeTimer(timestamp); fireTimestamp.clear(); } } public boolean canMerge() { return true; } public void onMerge(W window, Trigger.OnMergeContext ctx) throws Exception { ctx.mergePartitionedState(this.stateDesc); Long nextFireTimestamp = (Long)((ReducingState)ctx.getPartitionedState(this.stateDesc)).get(); if (nextFireTimestamp != null) { ctx.registerEventTimeTimer(nextFireTimestamp); } } public String toString() { return "ContinuousEventTimeTrigger(" + this.interval + ")"; } @VisibleForTesting public long getInterval() { return this.interval; } public static <W extends Window> ContinuousEventTimeTrigger<W> of(Time interval) { return new ContinuousEventTimeTrigger(interval.toMilliseconds()); } private void registerNextFireTimestamp(long time, W window, Trigger.TriggerContext ctx, ReducingState<Long> fireTimestampState) throws Exception { long nextFireTimestamp = Math.min(time + this.interval, window.maxTimestamp()); fireTimestampState.add(nextFireTimestamp); ctx.registerEventTimeTimer(nextFireTimestamp); } private static class Min implements ReduceFunction<Long> { private static final long serialVersionUID = 1L; private Min() { } public Long reduce(Long value1, Long value2) throws Exception { return Math.min(value1, value2); } } }
// // Source code recreated from a .class file by IntelliJ IDEA // (powered by FernFlower decompiler) // package org.apache.flink.streaming.api.windowing.triggers; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.state.ReducingState; import org.apache.flink.api.common.state.ReducingStateDescriptor; import org.apache.flink.api.common.typeutils.base.LongSerializer; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.Window; @PublicEvolving public class ContinuousProcessingTimeTrigger<W extends Window> extends Trigger<Object, W> { private static final long serialVersionUID = 1L; private final long interval; private final ReducingStateDescriptor<Long> stateDesc; private ContinuousProcessingTimeTrigger(long interval) { this.stateDesc = new ReducingStateDescriptor("fire-time", new Min(), LongSerializer.INSTANCE); this.interval = interval; } public TriggerResult onElement(Object element, long timestamp, W window, Trigger.TriggerContext ctx) throws Exception { ReducingState<Long> fireTimestampState = (ReducingState)ctx.getPartitionedState(this.stateDesc); timestamp = ctx.getCurrentProcessingTime(); if (fireTimestampState.get() == null) { this.registerNextFireTimestamp(timestamp - timestamp % this.interval, window, ctx, fireTimestampState); } return TriggerResult.CONTINUE; } public TriggerResult onEventTime(long time, W window, Trigger.TriggerContext ctx) throws Exception { return TriggerResult.CONTINUE; } public TriggerResult onProcessingTime(long time, W window, Trigger.TriggerContext ctx) throws Exception { ReducingState<Long> fireTimestampState = (ReducingState)ctx.getPartitionedState(this.stateDesc); if (((Long)fireTimestampState.get()).equals(time)) { fireTimestampState.clear(); this.registerNextFireTimestamp(time, window, ctx, fireTimestampState); return 
TriggerResult.FIRE; } else { return TriggerResult.CONTINUE; } } public void clear(W window, Trigger.TriggerContext ctx) throws Exception { ReducingState<Long> fireTimestamp = (ReducingState)ctx.getPartitionedState(this.stateDesc); Long timestamp = (Long)fireTimestamp.get(); if (timestamp != null) { ctx.deleteProcessingTimeTimer(timestamp); fireTimestamp.clear(); } } public boolean canMerge() { return true; } public void onMerge(W window, Trigger.OnMergeContext ctx) throws Exception { ctx.mergePartitionedState(this.stateDesc); Long nextFireTimestamp = (Long)((ReducingState)ctx.getPartitionedState(this.stateDesc)).get(); if (nextFireTimestamp != null) { ctx.registerProcessingTimeTimer(nextFireTimestamp); } } @VisibleForTesting public long getInterval() { return this.interval; } public String toString() { return "ContinuousProcessingTimeTrigger(" + this.interval + ")"; } public static <W extends Window> ContinuousProcessingTimeTrigger<W> of(Time interval) { return new ContinuousProcessingTimeTrigger(interval.toMilliseconds()); } private void registerNextFireTimestamp(long time, W window, Trigger.TriggerContext ctx, ReducingState<Long> fireTimestampState) throws Exception { long nextFireTimestamp = Math.min(time + this.interval, window.maxTimestamp()); fireTimestampState.add(nextFireTimestamp); ctx.registerProcessingTimeTimer(nextFireTimestamp); } private static class Min implements ReduceFunction<Long> { private static final long serialVersionUID = 1L; private Min() { } public Long reduce(Long value1, Long value2) throws Exception { return Math.min(value1, value2); } } }
// // Source code recreated from a .class file by IntelliJ IDEA // (powered by FernFlower decompiler) // package org.apache.flink.streaming.api.windowing.triggers; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction; import org.apache.flink.streaming.api.windowing.windows.Window; @PublicEvolving public class DeltaTrigger<T, W extends Window> extends Trigger<T, W> { private static final long serialVersionUID = 1L; private final DeltaFunction<T> deltaFunction; private final double threshold; private final ValueStateDescriptor<T> stateDesc; private DeltaTrigger(double threshold, DeltaFunction<T> deltaFunction, TypeSerializer<T> stateSerializer) { this.deltaFunction = deltaFunction; this.threshold = threshold; this.stateDesc = new ValueStateDescriptor("last-element", stateSerializer); } public TriggerResult onElement(T element, long timestamp, W window, Trigger.TriggerContext ctx) throws Exception { ValueState<T> lastElementState = (ValueState)ctx.getPartitionedState(this.stateDesc); if (lastElementState.value() == null) { lastElementState.update(element); return TriggerResult.CONTINUE; } else if (this.deltaFunction.getDelta(lastElementState.value(), element) > this.threshold) { lastElementState.update(element); return TriggerResult.FIRE; } else { return TriggerResult.CONTINUE; } } public TriggerResult onEventTime(long time, W window, Trigger.TriggerContext ctx) { return TriggerResult.CONTINUE; } public TriggerResult onProcessingTime(long time, W window, Trigger.TriggerContext ctx) throws Exception { return TriggerResult.CONTINUE; } public void clear(W window, Trigger.TriggerContext ctx) throws Exception { ((ValueState)ctx.getPartitionedState(this.stateDesc)).clear(); } public String toString() { return "DeltaTrigger(" 
+ this.deltaFunction + ", " + this.threshold + ")"; } public static <T, W extends Window> DeltaTrigger<T, W> of(double threshold, DeltaFunction<T> deltaFunction, TypeSerializer<T> stateSerializer) { return new DeltaTrigger(threshold, deltaFunction, stateSerializer); } }
// // Source code recreated from a .class file by IntelliJ IDEA // (powered by FernFlower decompiler) // package org.apache.flink.streaming.api.windowing.triggers; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.streaming.api.windowing.windows.Window; @PublicEvolving public class PurgingTrigger<T, W extends Window> extends Trigger<T, W> { private static final long serialVersionUID = 1L; private Trigger<T, W> nestedTrigger; private PurgingTrigger(Trigger<T, W> nestedTrigger) { this.nestedTrigger = nestedTrigger; } public TriggerResult onElement(T element, long timestamp, W window, Trigger.TriggerContext ctx) throws Exception { TriggerResult triggerResult = this.nestedTrigger.onElement(element, timestamp, window, ctx); return triggerResult.isFire() ? TriggerResult.FIRE_AND_PURGE : triggerResult; } public TriggerResult onEventTime(long time, W window, Trigger.TriggerContext ctx) throws Exception { TriggerResult triggerResult = this.nestedTrigger.onEventTime(time, window, ctx); return triggerResult.isFire() ? TriggerResult.FIRE_AND_PURGE : triggerResult; } public TriggerResult onProcessingTime(long time, W window, Trigger.TriggerContext ctx) throws Exception { TriggerResult triggerResult = this.nestedTrigger.onProcessingTime(time, window, ctx); return triggerResult.isFire() ? 
TriggerResult.FIRE_AND_PURGE : triggerResult; } public void clear(W window, Trigger.TriggerContext ctx) throws Exception { this.nestedTrigger.clear(window, ctx); } public boolean canMerge() { return this.nestedTrigger.canMerge(); } public void onMerge(W window, Trigger.OnMergeContext ctx) throws Exception { this.nestedTrigger.onMerge(window, ctx); } public String toString() { return "PurgingTrigger(" + this.nestedTrigger.toString() + ")"; } public static <T, W extends Window> PurgingTrigger<T, W> of(Trigger<T, W> nestedTrigger) { return new PurgingTrigger(nestedTrigger); } @VisibleForTesting public Trigger<T, W> getNestedTrigger() { return this.nestedTrigger; } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。