当前位置:   article > 正文

Flink|《Flink 官方文档 - DataStream API - 事件时间 - 内置 Watermark 生成器》学习笔记 + 源码分析_java flink registerdatastream 定义事件时间

java flink registerdatastream 定义事件时间

学习文档:Flink 官方文档 - DataStream API - 事件时间 - 内置 Watermark 生成器

学习笔记如下:


单调递增时间戳分配器(Monotonously Increasing Timestamps)

如果数据源中数据的时间戳升序排序,那么当前时间戳就可以充当 watermark,因为后续到达的数据的时间戳不会比当前的小。例如,每个并行数据源实例都只读取一个 Kafka 分区,并使用 Kafka 的消息时间戳,则可以保证每个数据源中数据的时间戳单调递增。

当满足使用并行数据源,且病性能数据源中的每个单分区数据源时间戳递增时,可以使用 Flink 的内置时间戳分配器 AscendingTimestampsWatermarks

例如:

WatermarkStrategy.forMonotonousTimestamps();
  • 1
// flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategy.java

static <T> WatermarkStrategy<T> forMonotonousTimestamps() {
    return (ctx) -> new AscendingTimestampsWatermarks<>();
}
  • 1
  • 2
  • 3
  • 4
  • 5

数据之间存在最大固定延迟的时间戳分配器

如果数据源中数据的事件戳不严格升序排序,但是 watermark 滞后于数据流中最大时间戳不超过一个固定的时间长度,则可以使用 Flink 的内置时间戳分配器 BoundedOutOfOrdernessWatermarks;该生成器的参数 maxOutOfOrderness 代表在计算窗口时,允许元素被忽略的最小延迟时间:

例如:

WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10));
  • 1
// flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategy.java

static <T> WatermarkStrategy<T> forBoundedOutOfOrderness(Duration maxOutOfOrderness) {
    return (ctx) -> new BoundedOutOfOrdernessWatermarks<>(maxOutOfOrderness);
}
  • 1
  • 2
  • 3
  • 4
  • 5

源码解析

时间戳分配器 AscendingTimestampsWatermarks 继承自 BoundedOutOfOrdernessWatermarks,是一种特殊的、固定延迟时间为 0 毫秒的最大固定延迟的时间戳分配器(即不存在延迟情况):

// flink-core/src/main/java/org/apache/flink/api/common/eventtime/AscendingTimestampsWatermarks.java

@Public
public class AscendingTimestampsWatermarks<T> extends BoundedOutOfOrdernessWatermarks<T> {

    /** Creates a new watermark generator with for ascending timestamps. */
    public AscendingTimestampsWatermarks() {
        super(Duration.ofMillis(0));
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

时间戳分配器 BoundedOutOfOrdernessWatermarks 如下:

@Public
public class BoundedOutOfOrdernessWatermarks<T> implements WatermarkGenerator<T> {

    /** The maximum timestamp encountered so far. */
    private long maxTimestamp;

    /** The maximum out-of-orderness that this watermark generator assumes. */
    private final long outOfOrdernessMillis;

    /**
     * Creates a new watermark generator with the given out-of-orderness bound.
     *
     * @param maxOutOfOrderness The bound for the out-of-orderness of the event timestamps.
     */
    public BoundedOutOfOrdernessWatermarks(Duration maxOutOfOrderness) {
        checkNotNull(maxOutOfOrderness, "maxOutOfOrderness");
        checkArgument(!maxOutOfOrderness.isNegative(), "maxOutOfOrderness cannot be negative");

        this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();

        // start so that our lowest watermark would be Long.MIN_VALUE.
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    // ------------------------------------------------------------------------

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        output.emitWatermark(new Watermark(maxTimestamp - outOfOrdernessMillis - 1));
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

该类使用 outOfOrdernessMillis 属性存储固定的延迟时间,使用 maxTimestamp 属性存储当前消息的最大时间戳。

当每个消息到达时,onEvent 会被调用,并更新当前消息的最大时间戳。

当每次 Flink 周期性地调用 onPeriodicEmit 时,会根据当前消息的最大时间戳 maxTimestamp 减去固定的延迟时间 outOfOrdernessMillis 并发出 watermark。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/爱喝兽奶帝天荒/article/detail/928071
推荐阅读
相关标签
  

闽ICP备14008679号