当前位置:   article > 正文

Flink WindowOperator 源码分析_flink operator类图

flink operator类图

0x1 摘要

WindowOperator可以说是Flink窗口功能非常核心核心的类,是窗口功能源码的一条主线,延着这条主线去慢慢看源码会轻松很多。注:此文基于Flink 1.4.2 版本源码。

0x2 WindowOperator 类结构分析

先来看一下类结构图,可以使用idea来生成类图,下图经过稍微加工,去掉一些不重要类的结构图:
ca2a6732_bc3a_444f_b6d0_7aa927d16def
我们核心重点关注以下一个接口:

  • OneInputStreamOperator
  1. public interface OneInputStreamOperator<IN, OUT> extends StreamOperator<OUT> {
  2. /**
  3. * Processes one element that arrived at this operator.
  4. * This method is guaranteed to not be called concurrently with other methods of the operator.
  5. */
  6. void processElement(StreamRecord<IN> element) throws Exception;
  7. /**
  8. * Processes a {@link Watermark}.
  9. * This method is guaranteed to not be called concurrently with other methods of the operator.
  10. *
  11. * @see org.apache.flink.streaming.api.watermark.Watermark
  12. */
  13. void processWatermark(Watermark mark) throws Exception;
  14. void processLatencyMarker(LatencyMarker latencyMarker) throws Exception;
  15. }

0x3 OneInputStreamOperator 具体实现分析

此接口三个方法WindowOperator类只实现了processElement方法,其余两个方法实现全部在AbstractStreamOperator抽象类中,此文不去讲解,此文重点介绍processElement方法,这个方法也是最重要的方法。

从方法注释可以看出,每一条消息过来都会调用此方法,此方法主体很清晰,看下面条件判断语句:

  1. final Collection<W> elementWindows = windowAssigner.assignWindows(
  2. element.getValue(), element.getTimestamp(), windowAssignerContext);
  3. //if element is handled by none of assigned elementWindows
  4. boolean isSkippedElement = true;
  5. final K key = this.<K>getKeyedStateBackend().getCurrentKey();
  6. if (windowAssigner instanceof MergingWindowAssigner) {
  7. ...
  8. } else {
  9. ...
  10. }
  11. // side output input event if
  12. // element not handled by any window
  13. // late arriving tag has been set
  14. // windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp
  15. if (isSkippedElement && isElementLate(element)) {
  16. if (lateDataOutputTag != null){
  17. sideOutput(element);
  18. } else {
  19. this.numLateRecordsDropped.inc();
  20. }
  21. }

分为合并窗口分配器和非合并窗口分配器,我们平时使用的TumblingProcessingTimeWindows都属于非合并窗口,今天就介绍非合并窗口,即代码中else逻辑。
原代码如下:

  1. for (W window: elementWindows) {
  2. // drop if the window is already late
  3. if (isWindowLate(window)) {
  4. continue;
  5. }
  6. isSkippedElement = false;
  7. windowState.setCurrentNamespace(window);
  8. windowState.add(element.getValue());
  9. triggerContext.key = key;
  10. triggerContext.window = window;
  11. TriggerResult triggerResult = triggerContext.onElement(element);
  12. if (triggerResult.isFire()) {
  13. ACC contents = windowState.get();
  14. if (contents == null) {
  15. continue;
  16. }
  17. emitWindowContents(window, contents);
  18. }
  19. if (triggerResult.isPurge()) {
  20. windowState.clear();
  21. }
  22. registerCleanupTimer(window);
  23. }

第一步:判断窗口是否延迟,如果延迟直接踩过,判断延迟的逻辑相对简单可自行查看源码
第二步:设置isSkippedElement标志位,此标志位等于false说明,当前元素可以匹配到窗口,true说明匹配不到窗口,后面会有处理逻辑
第三步:下面四行代码是一些状态设置
第四步:根据当前元素返回一个触发器结果
第五步:判断触发器结果是否需要执行,如果需要执行,则调用emitWindowContents方法执行
第六步:判断是否需要清理窗口状态信息
第七步:注册清除定时器

  1. protected void registerCleanupTimer(W window) {
  2. long cleanupTime = cleanupTime(window);
  3. if (cleanupTime == Long.MAX_VALUE) {
  4. // don't set a GC timer for "end of time"
  5. return;
  6. }
  7. if (windowAssigner.isEventTime()) {
  8. triggerContext.registerEventTimeTimer(cleanupTime);
  9. } else {
  10. triggerContext.registerProcessingTimeTimer(cleanupTime);
  11. }
  12. }

首先计算清除时间:

  1. private long cleanupTime(W window) {
  2. if (windowAssigner.isEventTime()) {
  3. long cleanupTime = window.maxTimestamp() + allowedLateness;
  4. return cleanupTime >= window.maxTimestamp() ? cleanupTime : Long.MAX_VALUE;
  5. } else {
  6. return window.maxTimestamp();
  7. }
  8. }

如果是事件时间则需要算上允许延迟时间,调用triggerContext注册Time

注:processElement方法开头代码

  1. final Collection<W> elementWindows = windowAssigner.assignWindows(
  2. element.getValue(), element.getTimestamp(), windowAssignerContext);

这段代码是窗口的分配,后面单独文章来分析窗口分配实现原理。

0x4 结束语

整个WindowOperator核心流程代码不多,但代码量还是比较大,里面涉及到窗口分配、时间触发器,每个点都涉及比较多的源码,不能一次性去讲完,需要慢慢去挖。

声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号