当前位置:   article > 正文

flink DataStream API(三)事件时间-生成水印_flink水印时间戳案例

flink水印时间戳案例

生成水印

在本节中,您将了解Flink为处理事件时间戳和水印提供的API。有关事件时间、处理时间和摄取时间的介绍,请参阅introduction to event time的相关介绍。

水印策略介绍

为了处理事件时间,flink需要知道每个事件的时间戳,这就意味着,流中的没个元素都需要有它自己的事件时间戳。通常是通过使用TimestampAssigner从元素的某些字段中提取时间戳来实现的。

时间戳分配与生成水印密切相关,水印告诉系统事件时间的进度。您可以通过指定水印生成器来配置它。

Flink API需要一个同时包含TimestampAssignerWatermarkGeneratorWatermarkStrategy。许多常见的策略都是现成的,作为水印策略的静态方法,但用户也可以在需要时构建自己的策略。许多常用策略作为 WatermarkStrategy 中的静态方法开箱即用,但用户也可以在需要时构建自己的策略。

为了完整性起见,下面是接口:

public interface WatermarkStrategy<T> 
    extends TimestampAssignerSupplier<T>,
            WatermarkGeneratorSupplier<T>{

    /**
     * Instantiates a {@link TimestampAssigner} for assigning timestamps according to this
     * strategy.
     */
    @Override
    TimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context);

    /**
     * Instantiates a WatermarkGenerator that generates watermarks according to this strategy.
     */
    @Override
    WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

如前所述,您通常不会自己实现这个接口,而是使用 WatermarkStrategy 上的静态帮助器方法来实现常见的水印策略。例如,通过静态方法forBoundedOutOfOrderness提供,入参接收一个Duration类型的时间间隔,也就是我们可以接受的最大的延迟时间.使用这种延迟策略的时候需要我们对数据的延迟时间有一个大概的预估判断。

WatermarkStrategy
        .<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
        .withTimestampAssigner((event, timestamp) -> event.f0);
  • 1
  • 2
  • 3

指定一个TimestampAssigner是可选的,在大多数情况下,实际上您并不希望指定一个TimestampAssigner。例如,当使用Kafka或Kinesis时,你会直接从Kafka/Kinesis记录中获得时间戳。

注意:时间戳和水印都指定为自 Java 纪元 1970-01-01T00:00:00Z 以来的毫秒数。

使用水印策略

在 Flink 应用中有两个地方可以使用 WatermarkStrategy

  • 直接在源上使用
  • 在非源操作之后使用。

第一个选项更可取,因为它允许源利用水印中的分片/分区/分割的逻辑。然后源可以生成更加准确的时间水印。

直接在源上指定 WatermarkStrategy 通常意味着您必须使用特定源。请参阅Watermark Strategies and the Kafka Connector 以了解其在 Kafka 连接器上的工作方式以及有关每个分区水印如何在那里工作的更多详细信息。

第二个选项(在任意操作后设置 WatermarkStrategy)仅在您无法直接在源上设置策略时使用:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<MyEvent> stream = env.readFile(
        myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
        FilePathFilter.createDefaultFilter(), typeInfo);

DataStream<MyEvent> withTimestampsAndWatermarks = stream
        .filter( event -> event.severity() == WARNING )
        .assignTimestampsAndWatermarks(<watermark strategy>);

withTimestampsAndWatermarks
        .keyBy( (event) -> event.getGroup() )
        .window(TumblingEventTimeWindows.of(Time.seconds(10)))
        .reduce( (a, b) -> a.add(b) )
        .addSink(...);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

以这种方式使用 WatermarkStrategy 会获取一个流并生成一个带有时间戳元素和水印的新流。如果原始流已经有时间戳或水印,时间戳分配器会覆盖它们。

处理空闲源

如果其中一个输入的分割/分区/分片在一段时间内没有任何事件,这意味着水印生成器不会获取任何新的水印信息。我们称之为空闲输入或空闲源。这是一个问题,因为您的其他分区可能仍然存在事件。在这种情况下,水印将被阻止向下游继续传播,因为向下游传播的水印是所有并行水印中的最小值,由于上游水印被阻止了,所以当前阶段永远就不会获取所有水印,这样最小水印就不会被计算出来,所以flink程序就不会继续向下执行。

要处理这个问题,您可以使用WatermarkStrategy来检测空闲状态并将输入标记为空闲状态。WatermarkStrategy为此提供了一个方便的解决策略:

WatermarkStrategy
        .<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
        .withIdleness(Duration.ofMinutes(1));
  • 1
  • 2
  • 3

编写 WatermarkGenerators

TimestampAssigner 是一个从事件中提取字段值的简单函数,因此我们不需要详细查看它们。另一方面,WatermarkGenerator 的编写有点复杂,我们将在接下来的两节中介绍如何做到这一点。以下是 WatermarkGenerator 接口:

/**
 * The {@code WatermarkGenerator} generates watermarks either based on events or
 * periodically (in a fixed interval).
 *
 * <p><b>Note:</b> This WatermarkGenerator subsumes the previous distinction between the
 * {@code AssignerWithPunctuatedWatermarks} and the {@code AssignerWithPeriodicWatermarks}.
 */
@Public
public interface WatermarkGenerator<T> {

    /**
     * Called for every event, allows the watermark generator to examine 
     * and remember the event timestamps, or to emit a watermark based on
     * the event itself.
     */
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);

    /**
     * Called periodically, and might emit a new watermark, or not.
     *
     * <p>The interval in which this method is called and Watermarks 
     * are generated depends on {@link ExecutionConfig#getAutoWatermarkInterval()}.
     */
    void onPeriodicEmit(WatermarkOutput output);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

有两种不同风格的水印生成方式:周期生成和标点生成。

周期生成器通常通过onEvent()监视传入事件,然后在框架调用onPeriodicEmit()时发出水印。

标点生成器通常通过onEvent()监视传入事件,并等待流中带有特殊标记或者标记符号的事件,当监视到这些事件之一,它会立即生成水印,通常,标点生成器不会从 onPeriodicEmit() 发出水印。

接下来,我们将看看如何为每种风格实现生成器。

编写周期 WatermarkGenerator

周期生成器观察流事件并周期性的生成时间水印。

生成水印的间隔(每 n 毫秒)通过 ExecutionConfig.setAutoWatermarkInterval(...) 定义。每次都会调用生成器的 onPeriodicEmit() 方法,如果返回的水印为非空且大于前一个水印,则会发出新的水印。

在这里,我们展示了两个使用周期性水印生成器生成水印的简单示例。注意,Flink附带了BoundedOutOfOrdernessWatermarks,这是一个与下面所示的BoundedOutOfOrdernessGenerator工作类似的水印生成器。你可以在这里读到如何使用它。

/**
 * This generator generates watermarks assuming that elements arrive out of order,
 * but only to a certain degree. The latest elements for a certain timestamp t will arrive
 * at most n milliseconds after the earliest elements for timestamp t.
 */
public class BoundedOutOfOrdernessGenerator implements WatermarkGenerator<MyEvent> {

    private final long maxOutOfOrderness = 3500; // 3.5 seconds

    private long currentMaxTimestamp;

    @Override
    public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // emit the watermark as current highest timestamp minus the out-of-orderness bound
        output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness - 1));
    }

}

/**
 * This generator generates watermarks that are lagging behind processing time 
 * by a fixed amount. It assumes that elements arrive in Flink after a bounded delay.
 */
public class TimeLagWatermarkGenerator implements WatermarkGenerator<MyEvent> {

    private final long maxTimeLag = 5000; // 5 seconds

    @Override
    public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
        // don't need to do anything because we work on processing time
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        output.emitWatermark(new Watermark(System.currentTimeMillis() - maxTimeLag));
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42

编写标点WatermarkGenerator

标点水印生成器将观察事件流并在看到带有水印信息的特殊元素时发出水印

这就是如何实现一个加标点的生成器,每当事件表明它带有某个标记时,它就会发出水印:

public class PunctuatedAssigner implements WatermarkGenerator<MyEvent> {

    @Override
    public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
        if (event.hasWatermarkMarker()) {
            output.emitWatermark(new Watermark(event.getWatermarkTimestamp()));
        }
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // don't need to do anything because we emit in reaction to events above
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

注意:可以在每个单独的事件上生成水印。但是,由于每个水印都会引起下游的一些计算,过多的水印会降低性能。

水印策略和 Kafka 连接

当使用Apache Kafka作为数据源时,每个Kafka分区可能有一个简单的事件时间模式(升序时间戳或有界无序)。然而,当从 Kafka 消费流时,多个分区通常会被并行消费,从而将来自分区的事件交织在一起,并破坏了每个分区的模式(这是Kafka的客户端工作方式中固有的)

在这种情况下,您可以使用 Flink 的 Kafka 分区感知水印生成。使用该功能,每个消费 Kafka 分区的消费者在其内部生成水印,并且每个分区水印的合并方式与水印在shuffle时的合并方式相同。

例如,如果每个 Kafka 分区的事件时间戳严格递增,则使用递增时间戳水印生成器生成每个分区的水印将产生完美的整体水印。

下面的插图展示了如何使用每个kafka分区的水印生成,以及在这种情况下水印如何通过流数据传播。

FlinkKafkaConsumer<MyType> kafkaSource = new FlinkKafkaConsumer<>("myTopic", schema, props);
kafkaSource.assignTimestampsAndWatermarks(
        WatermarkStrategy.
                .forBoundedOutOfOrderness(Duration.ofSeconds(20)));

DataStream<MyType> stream = env.addSource(kafkaSource);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

在这里插入图片描述

操作者如何处理水印

一般来说,操作员在将给定的水印转发给下游之前,需要对其进行处理。例如,WindowOperator 将首先评估所有应该被触发的窗口,由水印触发所产生的所有事件全部被输出后,水印本身才会被发送到下游。换句话说,由于水印的出现而产生的所有元素都将在水印之前发送。

同样的规则也适用于TwoInputStreamOperator,但是,在这种情况下,操作符的当前水印被定义为其两个输入的最小值。

已弃用的 AssignerWithPeriodicWatermarks 和 ssignerWithPunctuatedWatermarks

在介绍 WatermarkStrategyTimestampAssignerWatermarkGenerator 的抽象之前,Flink 使用了 AssignerWithPeriodicWatermarksAssignerWithPunctuatedWatermarks。您仍然会在 API 中看到它们,但建议使用新接口,因为它们提供了更清晰的关注点分离,并且统一了水印生成的周期性和标点样式。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/酷酷是懒虫/article/detail/928074
推荐阅读
相关标签
  

闽ICP备14008679号