赞
踩
一步一个脚印,一天一道面试题。
感觉我现在很难把水印描述的很好,但,完成比完美更重要。后续我再补充。各位如果有什么建议或补充也欢迎留言。(已更新1)
在实时处理任务时,由于网络延迟,人工异常,各种问题,数据往往会出现乱序,不按照我们的预期到达处理框架。
WaterMark 水印,就是为了一定程度的解决数据,延迟乱序问题的。
使用 WaterMark 一般有以下几个步骤:
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// 分配时间戳和水位线
DataStream<Tuple2<Long, Integer>> withTimestampsAndWatermarks = parsedStream
.assignTimestampsAndWatermarks(WatermarkStrategy
.<Tuple2<Long, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((element, recordTimestamp) -> element.f0));
水位线是插入到数据流中的一个标记,可以认为是一个特殊数据。
水位线主要的内容是一个时间戳,用来表示当前事件时间的进展。
水位线是基于数据的时间戳生成的。
水位线必须**单调递增,**以确保任务的时间时间时钟一直向前推进。
水位线可以设置延迟,来尽量保证正确处理乱序数据。
一个水位线 Watermark (t), 表示在当前流中事件时间已经达到了时间戳 t,这代表 t 之前的所有数据都到齐了,之后不会出现在时间戳 (t) 之前的数据。出现了在 t 之前的数据就会被抛弃不处理。
话不多说,直接给个 Watermark 水印样例代码。
public class SimpleWatermarkExample { public static void main(String[] args) throws Exception { // 设置流执行环境 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 从 socket 文本流接收数据 DataStream<String> input = env.addSource(new SocketTextStreamFunction("localhost", 9999, "\n", -1)); // 解析输入的数据 DataStream<Tuple2<Long, Integer>> parsedStream = input .map(new MapFunction<String, Tuple2<Long, Integer>>() { @Override public Tuple2<Long, Integer> map(String value) throws Exception { String[] parts = value.split(","); return new Tuple2<>(Long.parseLong(parts[0]), Integer.parseInt(parts[1])); } }); // 分配时间戳和水位线 DataStream<Tuple2<Long, Integer>> withTimestampsAndWatermarks = parsedStream .assignTimestampsAndWatermarks(WatermarkStrategy .<Tuple2<Long, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(5)) .withTimestampAssigner((element, recordTimestamp) -> element.f0)); // 使用窗口函数统计每10秒内的最大值 DataStream<String> maxValues = withTimestampsAndWatermarks .windowAll(TumblingEventTimeWindows.of(Time.seconds(10))) .apply(new WindowFunction<Tuple2<Long, Integer>, String, TimeWindow>() { @Override public void apply(TimeWindow window, Iterable<Tuple2<Long, Integer>> values, Collector<String> out) throws Exception { int maxValue = Integer.MIN_VALUE; for (Tuple2<Long, Integer> value : values) { maxValue = Math.max(maxValue, value.f1); } out.collect("Window: " + window + " Max Value: " + maxValue); } }); // 打印结果 maxValues.print(); // 执行程序 env.execute("Simple Flink Watermark Example"); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。