当前位置:   article > 正文

7、Flink 自定义 WaterMarkGenerator 案例

7、Flink 自定义 WaterMarkGenerator 案例

1、MyWaterMarkWatermarkGeneratorPeriodic
watermark 生成器场景:数据源在一定程度上乱序,即某个最新到达的时间戳为 t 的元素将在最早到达的时间戳为 t 的元素之后最多 n 毫秒到达。

class MyBoundedOutOfOrdernessGenerator implements WatermarkGenerator<_01_MyEvent> {
    // 3 秒
    private final long maxOutOfOrderness = 3000;

    private long currentMaxTimestamp;

    // 每到达一条数据执行一次,获取当前的最大时间戳
    @Override
    public void onEvent(_01_MyEvent event, long eventTimestamp, WatermarkOutput output) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
        System.out.println("当前的数据ID为=>" + event.getId() + ",currentMaxTimestamp=>" + currentMaxTimestamp);
    }

    // 每到达一个周期触发一次,下发 Watermark
    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // 发出的 watermark = 当前最大时间戳 - 最大乱序时间
        Watermark watermark = new Watermark(currentMaxTimestamp - maxOutOfOrderness - 1);
        output.emitWatermark(watermark);
        System.out.println("下发的 Watermark 为=>" + watermark.getTimestamp());
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

数据输入与输出案例

  输入输出demo数据演示如下
 
  下发的 Watermark 为=>-3001
 
  1,a,1714028400000
 
  res=>(1,a,1714028400000,-3001)
  (1,a,1714028400000,-3001)
  当前的数据ID为=>1,currentMaxTimestamp=>1714028400000
  下发的 Watermark 为=>1714028396999
 
  1,b,1714028410000
 
  res=>(1,b,1714028410000,1714028396999)
  (1,b,1714028410000,1714028396999)
  当前的数据ID为=>1,currentMaxTimestamp=>1714028410000
  下发的 Watermark 为=>1714028406999
 
  1,c,1714028410001
 
  res=>(1,c,1714028410001,1714028406999)
  (1,c,1714028410001,1714028406999)
  当前的数据ID为=>1,currentMaxTimestamp=>1714028410001
  下发的 Watermark 为=>1714028407000
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

2、MyTimeLagWatermarkGenerator
生成器生成的 watermark 滞后于处理时间固定量,它假定元素会在有限延迟后到达 Flink。

class MyTimeLagWatermarkGenerator implements WatermarkGenerator<_01_MyEvent> {
    // 3 秒
    private final long maxTimeLag = 3000;

    @Override
    public void onEvent(_01_MyEvent event, long eventTimestamp, WatermarkOutput output) {
        // 处理时间场景下,不需要实现 onEvent
        System.out.println("处理时间场景下,不需要实现 onEvent");
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        Watermark watermark = new Watermark(System.currentTimeMillis() - maxTimeLag);
        output.emitWatermark(watermark);
        System.out.println("下发的 Watermark 为=>" + watermark.getTimestamp());
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

数据输入与输出案例

  下发的 Watermark 为=>1714284213511
 
  1,a,1714028400000
 
  res=>(1,a,1714028400000,1714284230336)
  (1,a,1714028400000,1714284230336)
  处理时间场景下,不需要实现 onEvent
  下发的 Watermark 为=>1714284230540
 
  1,b,1714028410000
 
  res=>(1,b,1714028410000,1714284254960)
  (1,b,1714028410000,1714284254960)
  处理时间场景下,不需要实现 onEvent
  下发的 Watermark 为=>1714284255166
 
  1,c,1714028410001
 
  res=>(1,c,1714028410001,1714284266639)
  (1,c,1714028410001,1714284266639)
  处理时间场景下,不需要实现 onEvent
  下发的 Watermark 为=>1714284266846
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

3、完整测试用例

import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class _01_MyWaterMarkWatermarkGeneratorPeriodic {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 设置水位线生成的时间间隔
        env.getConfig().setAutoWatermarkInterval(2000L);

        DataStreamSource<String> source = env.socketTextStream("localhost", 8888);
        source.map(new MapFunction<String, _01_MyEvent>() {
                    @Override
                    public _01_MyEvent map(String value) throws Exception {
                        String[] fields = value.split(",");
                        return new _01_MyEvent(Integer.parseInt(fields[0])
                                , fields[1]
                                , Long.parseLong(fields[2]));
                    }
                    // 分配水位线策略
                }).assignTimestampsAndWatermarks(new WatermarkStrategy<_01_MyEvent>() {
                    @Override
                    public WatermarkGenerator<_01_MyEvent> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
//                        return new MyBoundedOutOfOrdernessGenerator();
                        return new MyTimeLagWatermarkGenerator();
                    }
                    // 分配时间戳字段
                }.withTimestampAssigner(new SerializableTimestampAssigner<_01_MyEvent>() {
                    @Override
                    public long extractTimestamp(_01_MyEvent element, long recordTimestamp) {
                        return element.getEventTime();
                    }
                }))
                .process(new ProcessFunction<_01_MyEvent, Tuple4<Integer,String,Long,Long>>() {
                    @Override
                    public void processElement(_01_MyEvent value, ProcessFunction<_01_MyEvent, Tuple4<Integer, String, Long, Long>>.Context ctx, Collector<Tuple4<Integer, String, Long, Long>> out) throws Exception {
                        Tuple4<Integer, String, Long, Long> res = new Tuple4<>(value.getId(), value.getName(), value.getEventTime(), ctx.timerService().currentWatermark());
                        System.out.println("res=>"+res);
                        out.collect(res);
                    }
                })
                .print();

        env.execute();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51

用到的 pojo 类

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class _01_MyEvent implements Serializable {
    private Integer id;
    private String name;
    private Long eventTime;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/weixin_40725706/article/detail/541899
推荐阅读
相关标签
  

闽ICP备14008679号