赞
踩
学习文档:Flink 官方文档 - DataStream API - 事件时间 - 内置 Watermark 生成器
学习笔记如下:
如果数据源中数据的时间戳升序排序,那么当前时间戳就可以充当 watermark,因为后续到达的数据的时间戳不会比当前的小。例如,每个并行数据源实例都只读取一个 Kafka 分区,并使用 Kafka 的消息时间戳,则可以保证每个数据源中数据的时间戳单调递增。
当满足使用并行数据源,且病性能数据源中的每个单分区数据源时间戳递增时,可以使用 Flink 的内置时间戳分配器 AscendingTimestampsWatermarks
:
例如:
WatermarkStrategy.forMonotonousTimestamps();
- 1
// flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategy.java
static <T> WatermarkStrategy<T> forMonotonousTimestamps() {
return (ctx) -> new AscendingTimestampsWatermarks<>();
}
如果数据源中数据的事件戳不严格升序排序,但是 watermark 滞后于数据流中最大时间戳不超过一个固定的时间长度,则可以使用 Flink 的内置时间戳分配器 BoundedOutOfOrdernessWatermarks
;该生成器的参数 maxOutOfOrderness
代表在计算窗口时,允许元素被忽略的最小延迟时间:
例如:
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10));
- 1
// flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategy.java
static <T> WatermarkStrategy<T> forBoundedOutOfOrderness(Duration maxOutOfOrderness) {
return (ctx) -> new BoundedOutOfOrdernessWatermarks<>(maxOutOfOrderness);
}
时间戳分配器 AscendingTimestampsWatermarks
继承自 BoundedOutOfOrdernessWatermarks
,是一种特殊的、固定延迟时间为 0 毫秒的最大固定延迟的时间戳分配器(即不存在延迟情况):
// flink-core/src/main/java/org/apache/flink/api/common/eventtime/AscendingTimestampsWatermarks.java
@Public
public class AscendingTimestampsWatermarks<T> extends BoundedOutOfOrdernessWatermarks<T> {
/** Creates a new watermark generator with for ascending timestamps. */
public AscendingTimestampsWatermarks() {
super(Duration.ofMillis(0));
}
}
时间戳分配器 BoundedOutOfOrdernessWatermarks
如下:
@Public
public class BoundedOutOfOrdernessWatermarks<T> implements WatermarkGenerator<T> {
/** The maximum timestamp encountered so far. */
private long maxTimestamp;
/** The maximum out-of-orderness that this watermark generator assumes. */
private final long outOfOrdernessMillis;
/**
* Creates a new watermark generator with the given out-of-orderness bound.
*
* @param maxOutOfOrderness The bound for the out-of-orderness of the event timestamps.
*/
public BoundedOutOfOrdernessWatermarks(Duration maxOutOfOrderness) {
checkNotNull(maxOutOfOrderness, "maxOutOfOrderness");
checkArgument(!maxOutOfOrderness.isNegative(), "maxOutOfOrderness cannot be negative");
this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();
// start so that our lowest watermark would be Long.MIN_VALUE.
this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
}
// ------------------------------------------------------------------------
@Override
public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
output.emitWatermark(new Watermark(maxTimestamp - outOfOrdernessMillis - 1));
}
}
该类使用 outOfOrdernessMillis
属性存储固定的延迟时间,使用 maxTimestamp
属性存储当前消息的最大时间戳。
当每个消息到达时,onEvent
会被调用,并更新当前消息的最大时间戳。
当每次 Flink 周期性地调用 onPeriodicEmit
时,会根据当前消息的最大时间戳 maxTimestamp
减去固定的延迟时间 outOfOrdernessMillis
并发出 watermark。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。