赞
踩
Flink window 源码分析1:窗口整体执行流程
Flink window 源码分析2:Window 的主要组件
Flink window 源码分析3:WindowOperator
Flink window 源码分析4:WindowState
本文分析的源码为flink 1.18.0_scala2.12版本。
代码位置:org.apache.flink.streaming.api.windowing.assigners.WindowAssigner
功能:选择将数据划分到哪个窗口中。一条数据可能同时分发到多个 Window 中。
在窗口操作中,元素按其键(如果是keyBy后的元素)和窗口范围进行分组。具有相同键和窗口的元素的集合称为一个 pane。当触发器决定触发某个 pane 时,WindowFunction 就会被调用,以生成该 pane 的输出元素。
Flink 提供了几种通用的 WindowAssigner:
并且提供了基于时间(time)划分窗口和基于数据个数(count)划分窗口两种方式,大多数应用中会基于时间划分窗口。
上述数据分发器都会继承 WindowAssigner 抽象类。如果需要自己定制数据分发策略,则可以实现一个 class,继承 WindowAssigner。WindowAssigner 源码如下,对其中方法的作用做了注释。
@PublicEvolving public abstract class WindowAssigner<T, W extends Window> implements Serializable { private static final long serialVersionUID = 1L; /** * 将某个带有时间戳`timestamp`的元素`element`分配给一个或多个窗口,并返回窗口集合 */ public abstract Collection<W> assignWindows( T element, long timestamp, WindowAssignerContext context); /** * 返回WindowAssigner默认的 trigger */ public abstract Trigger<T, W> getDefaultTrigger(StreamExecutionEnvironment env); /** * 返回一个类型序列化器用来序列化窗口 */ public abstract TypeSerializer<W> getWindowSerializer(ExecutionConfig executionConfig); /** * 是否是 event time */ public abstract boolean isEventTime(); /** * 提供给 WindowAssigner 的上下文,允许其查询当前处理时间。 */ public abstract static class WindowAssignerContext { /** 返回当前处理时间 */ public abstract long getCurrentProcessingTime(); } }
WindowAssigner会在 WindowOperator 中的 processElement()、onEventTime()、onProcessingTime() 方法使用,用于查看当前处理的元素属于哪些窗口,或者说应属于哪些 pane 中。可在笔记 Window 源码分析3 中详细查看。
代码位置:org.apache.flink.streaming.api.windowing.triggers.Trigger
功能:触发器决定何时对窗口的某一 pane 中的数据进行处理,以提交该 pane 的结果。
pane 是具有相同键和窗口的元素的集合。每个 pane 都有自己的 Trigger。
触发器不得在内部维护状态,因为它们可以被重新创建或用于不同的键。所有必要的状态都应使用 Trigger.TriggerContext 上的状态抽象来持久化。
当与 org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner 一起使用时,触发器必须从 canMerge() 返回 true,并且必须正确实现onMerge(OnMergeContext)方法。
常见的一些窗口对应的 Trigger 类型如下:
窗口类型 | Trigger | 触发时机 |
---|---|---|
EventTime | EventTimeTrigger | 当 Watermarker 超过 pane 对应的 EndTime |
ProcessingTime | ProcessingTimeTrigger | 当计算节点系统时钟超过 pane 对应的 EndTime |
GlobalWindow | NeverTrigger | 永不触发 |
上述触发器都会继承 Trigger 抽象类。如果需要自己定制触发器,则可以实现一个 class,继承 Trigger。
Trigger 部分源码如下,主要列举的重要的方法。注释中提到了 TriggerResult 和 TriggerContext,后面有对其的讲解。
@PublicEvolving public abstract class Trigger<T, W extends Window> implements Serializable { /** * 对 pane 中的每个元素调用该方法。 */ public abstract TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception; /** * 当使用TriggerContext设置的处理时间计时器触发时调用。 */ public abstract TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception; /** * 当使用TriggerContext设置的事件时间计时器触发时调用。 */ public abstract TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception; /** * 当多个窗口被 WindowAssigner 合并为一个窗口时调用。这种情况会在 sessionWindow 中出现。 */ public void onMerge(W window, OnMergeContext ctx) throws Exception { throw new UnsupportedOperationException("This trigger does not support merging."); } ...... }
上面注释中提到了 TriggerContext。这是一个在 Trigger 中定义的一个接口。提供给触发器方法使用,允许注册定时器回调并处理状态。以下展示了其定义的方法(不包括已经弃用的方法),这里不做重点讲解。
public interface TriggerContext {
long getCurrentProcessingTime();
MetricGroup getMetricGroup();
long getCurrentWatermark();
void registerProcessingTimeTimer(long time);
void registerEventTimeTimer(long time);
void deleteProcessingTimeTimer(long time);
void deleteEventTimeTimer(long time);
<S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor);
}
注释中还多次使用到 TriggerResult 类,这是一个枚举类,标识是否触发 pane (或者可以说窗口,前提是理解了 pane 的含义)的计算或其他行为。其代码如下。代码位置在:org.apache.flink.streaming.api.windowing.triggers.TriggerResult。
public enum TriggerResult { /** 不对采取任何行动 */ CONTINUE(false, false), /** 调用用户定义的窗口处理函数 windowFunction,并提交结果。 */ /** 这里需要注意一下,若窗口的操作为reduce、aggregate,WindowFunction是不操作数据的。这句话的详情在 Window源码分析4 中有讲解,有需要可自行查看*/ FIRE_AND_PURGE(true, true), /** * 调用用户定义的windowFunction,并提交结果。窗口不会被清除,所有元素都会保留。 */ FIRE(true, false), /** * pane中的所有元素都会被清除,窗口也会被丢弃。不调用用户定义的windowFunction,不提交结果。。 */ PURGE(false, true); // 后面的方法可自行看代码,一眼就知道功能 }
Trigger 会在 WindowOperator 中的 processElement()、onEventTime()、onProcessingTime() 方法使用,用于判断当前时刻是否触发窗口计算或其他行为。可在笔记 Window 源码分析3 中详细查看。
代码位置:org.apache.flink.streaming.api.windowing.evictors.Evictors
功能:Evictor 可以在调用 WindowFunction 之前/之后,以及在 Trigger 触发之后,从 pane 中移除元素。
pane 是具有相同键和窗口的元素的集合。
Evictor 不是必需的。根据应用功能要求选择是否实现并使用 Evictor。
用户若想实现 Evictor,可自行编写一个 class,继承 Evictor 接口。
Evictor 接口的代码很简单,主要是两个方法:evictBefore 和 evictAfter。
@PublicEvolving public interface Evictor<T, W extends Window> extends Serializable { /** * 在 WindowFunction(用户定义的窗口处理函数) 之前调用。 可选择删除当前处理的窗口的中的某些元素元素。 * 这里需要注意一下,若窗口的操作为reduce、aggregate,WindowFunction是不操作数据的。这句话的详情在 Window源码分析4 中有讲解,有需要可自行查看 */ void evictBefore( Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext); /** * 在 WindowFunction 之后调用。 可选择删除当前处理的窗口的中的某些元素元素。 */ void evictAfter( Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext); /** 为 Evictor 方法提供的上下文对象。包含一些辅助功能,不重要,这里不做讲解。 */ interface EvictorContext { long getCurrentProcessingTime(); MetricGroup getMetricGroup(); long getCurrentWatermark(); } }
Evictors 在哪里调用?在 EvictingWindowOperator 中的 processElement()、onEventTime()、onProcessingTime() 方法中,判断到当前时刻是否触发窗口计算时,会调用 emitWindowContents() 调用代码如下:
if (triggerResult.isFire()) {
Iterable<StreamRecord<IN>> contents = evictingWindowState.get();
if (contents == null) {
// if we have no state, there is nothing to do
continue;
}
// 就是这一句
emitWindowContents(actualWindow, contents, evictingWindowState);
}
在 emitWindowContents 中,会有一句 userFunction.process(…),即调用用户定义的窗口处理函数 WindowFunction(前面代码注释中也多次说了,若窗口的操作为reduce、aggregate,WindowFunction 是不操作数据的。)。看一下 WindowFunction 代码,如下。可以看到 evictBefore 和 evictAfter 分别在调用 userFunction.process(…) 前后进行调用。
private void emitWindowContents( W window, Iterable<StreamRecord<IN>> contents, ListState<StreamRecord<IN>> windowState) throws Exception { timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp()); /* 省略这里不关心的代码 */ evictorContext.evictBefore(recordsWithTimestamp, Iterables.size(recordsWithTimestamp)); /* 省略这里不关心的代码 */ processContext.window = triggerContext.window; userFunction.process( triggerContext.key, triggerContext.window, processContext, projectedContents, timestampedCollector); evictorContext.evictAfter(recordsWithTimestamp, Iterables.size(recordsWithTimestamp)); /* 省略这里不关心的代码 */ } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。