assignTimestampsAndWatermarks() assigns timestamps to the elements of a stream and generates watermarks that indicate the progress of event time. To use it, call the method directly on a DataStream, just as you would any ordinary transformation:

DataStream<Event> stream = env.addSource(new ClickSource());
DataStream<Event> withTimestampsAndWatermarks = stream.assignTimestampsAndWatermarks(<watermark strategy>);
The method is implemented in DataStream as follows. The new transformation takes the parallelism of its input, so each source subtask feeds exactly one timestamp/watermark subtask and the two can be chained:

public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(
        WatermarkStrategy<T> watermarkStrategy) {
    final WatermarkStrategy<T> cleanedStrategy = clean(watermarkStrategy);
    // match parallelism to input, to have a 1:1 source -> timestamps/watermarks relationship
    // and chain
    final int inputParallelism = getTransformation().getParallelism();
    final TimestampsAndWatermarksTransformation<T> transformation =
            new TimestampsAndWatermarksTransformation<>(
                    "Timestamps/Watermarks",
                    inputParallelism,
                    getTransformation(),
                    cleanedStrategy);
    getExecutionEnvironment().addOperator(transformation);
    return new SingleOutputStreamOperator<>(getExecutionEnvironment(), transformation);
}
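The argument is a WatermarkStrategy, which contains two parts. The first is a timestamp assigner (TimestampAssigner), which extracts the timestamp from each element. The second is a watermark generator (WatermarkGenerator), which decides how watermarks are produced:

stream.assignTimestampsAndWatermarks(new WatermarkStrategy<Event>() {
    // Creates the generator that decides when, and with which value, watermarks are emitted
    @Override
    public WatermarkGenerator<Event> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
        return null; // skeleton only: a real strategy must return a generator here
    }

    // Creates the assigner that extracts each element's event timestamp
    @Override
    public TimestampAssigner<Event> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
        return null; // skeleton only: a real strategy must return an assigner here
    }
});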
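The following plain-Java sketch shows the two roles side by side. It is only a model and does not use Flink's API. The interfaces Assigner, Generator and Output and the helper class StrategyDemo are hypothetical stand-ins for TimestampAssigner, WatermarkGenerator and WatermarkOutput. To keep the sketch short, the driver calls the periodic hook after every element rather than on a processing-time timer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, Flink-free stand-in for TimestampAssigner.
interface Assigner<T> {
    long extractTimestamp(T element, long recordTimestamp);
}

// Hypothetical stand-in for WatermarkOutput.
interface Output {
    void emitWatermark(long watermark);
}

// Hypothetical stand-in for WatermarkGenerator.
interface Generator<T> {
    void onEvent(T event, long eventTimestamp, Output output);
    void onPeriodicEmit(Output output);
}

// A strategy simply bundles one assigner and one generator.
final class Strategy<T> {
    final Assigner<T> assigner;
    final Generator<T> generator;

    Strategy(Assigner<T> assigner, Generator<T> generator) {
        this.assigner = assigner;
        this.generator = generator;
    }
}

final class StrategyDemo {
    // Drives a strategy over the elements and collects every emitted watermark.
    static <T> List<Long> runStrategy(List<T> elements, Strategy<T> strategy) {
        List<Long> emitted = new ArrayList<>();
        Output out = wm -> emitted.add(wm);
        for (T element : elements) {
            // 1. the assigner decides the event time (Long.MIN_VALUE = "no previous timestamp")
            long ts = strategy.assigner.extractTimestamp(element, Long.MIN_VALUE);
            // 2. the generator sees every event together with its timestamp
            strategy.generator.onEvent(element, ts, out);
            // 3. simplification: call the "periodic" hook once per element
            strategy.generator.onPeriodicEmit(out);
        }
        return emitted;
    }

    // Example: elements are "user,timestamp" strings; the generator emits the max timestamp seen.
    static List<Long> demo() {
        Assigner<String> assigner = (element, recordTs) -> Long.parseLong(element.split(",")[1]);
        Generator<String> generator = new Generator<String>() {
            private long maxTs = Long.MIN_VALUE;

            @Override
            public void onEvent(String event, long eventTimestamp, Output output) {
                maxTs = Math.max(maxTs, eventTimestamp);
            }

            @Override
            public void onPeriodicEmit(Output output) {
                output.emitWatermark(maxTs);
            }
        };
        return runStrategy(Arrays.asList("Mary,1000", "Bob,2000", "Alice,1500"),
                new Strategy<>(assigner, generator));
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [1000, 2000, 2000]
    }
}
```

The third record (1500) is older than the second (2000). Because the generator keeps the maximum, the watermark stays at 2000 instead of moving backwards.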
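The WatermarkGenerator interface has two main methods, onEvent() and onPeriodicEmit():

onEvent(): called for every incoming event. Its arguments are the current event, its timestamp, and a WatermarkOutput that can emit watermarks, so any event-based logic can run here.
onPeriodicEmit(): called periodically. It can emit a watermark through the WatermarkOutput. The period is measured in processing time and is set with setAutoWatermarkInterval() on the execution config. The default is 200 ms:

env.getConfig().setAutoWatermarkInterval(60 * 1000L); // emit periodic watermarks every 60 s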
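The effect of the interval can be sketched with a simulated processing-time clock. PeriodicEmitModel is a hypothetical class and not part of Flink. In this model each record carries an arrival (processing) time and an event timestamp. The generator tracks the largest event timestamp, and the periodic hook fires once per interval.

```java
import java.util.ArrayList;
import java.util.List;

// Flink-free model: watermarks emitted by a periodic hook driven by (simulated) processing time.
final class PeriodicEmitModel {

    // Each record is {arrivalProcessingTimeMs, eventTimestampMs}, sorted by arrival time.
    // The hook fires at intervalMs, 2 * intervalMs, ... up to endMs and emits the
    // largest event timestamp seen so far.
    static List<Long> simulate(long[][] records, long intervalMs, long endMs) {
        List<Long> emitted = new ArrayList<>();
        long maxTs = Long.MIN_VALUE;
        int next = 0;
        for (long tick = intervalMs; tick <= endMs; tick += intervalMs) {
            // onEvent(): consume every record that arrived before this tick
            while (next < records.length && records[next][0] < tick) {
                maxTs = Math.max(maxTs, records[next][1]);
                next++;
            }
            // onPeriodicEmit(): emit once at least one record has been seen
            if (maxTs != Long.MIN_VALUE) {
                emitted.add(maxTs);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        long[][] records = {{50, 1000}, {250, 2000}, {450, 1500}};
        System.out.println(simulate(records, 200, 600)); // prints [1000, 2000, 2000]
        System.out.println(simulate(records, 600, 600)); // prints [2000]
    }
}
```

Within the same 600 ms, a 200 ms interval produces three watermarks and a 600 ms interval produces only one. A longer interval therefore means fewer, coarser watermarks.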
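In an ordered stream, timestamps increase monotonically and no record ever arrives late. This is the simplest case, and calling WatermarkStrategy.forMonotonousTimestamps() is enough. Put simply, the current maximum timestamp is used as the watermark (minus 1 ms, as explained below). The example below extracts the timestamp field of each record and assigns it as that record's timestamp. It then uses the built-in ordered-stream watermark generator to build the strategy. The extracted timestamps thus become the event time used in the computation. Note that timestamps and watermarks must both be expressed in milliseconds.

In an out-of-order stream, the watermark must lag behind by a fixed delay so that late records can still arrive. Calling WatermarkStrategy.forBoundedOutOfOrderness() achieves this. The method takes a maxOutOfOrderness parameter, the "maximum out-of-orderness": the largest timestamp gap between out-of-order records in the stream. If the degree of disorder is known, setting a delay of that length is enough to wait for all out-of-order data. The example again extracts the timestamp field as the timestamp and creates an out-of-order watermark generator with a 5-second delay.

In fact, the ordered-stream generator is essentially the same as the out-of-order one. It is equivalent to an out-of-order generator with a delay of 0, as Flink's source code shows. forMonotonousTimestamps() returns an AscendingTimestampsWatermarks, which simply extends BoundedOutOfOrdernessWatermarks with a delay of 0. In onPeriodicEmit(), BoundedOutOfOrdernessWatermarks emits maxTimestamp - outOfOrdernessMillis - 1. A watermark of t declares that no more records with timestamps up to t will arrive. The extra 1 ms therefore ensures that a record whose timestamp equals exactly maxTimestamp - delay is not yet treated as late. The following program shows both strategies:

package com.atguigu.chapter06;

import com.atguigu.chapter05.Event;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.time.Duration;

public class WatermarkTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<Event> stream = env.fromElements(
                new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L),
                new Event("Alice", "./prod?id=100", 3000L),
                new Event("Bob", "./prod?id=1", 3300L),
                new Event("Bob", "./home", 3500L),
                new Event("Alice", "./prod?id=200", 3000L),
                new Event("Bob", "./prod?id=2", 3800L),
                new Event("Bob", "./prod?id=3", 4200L)
        );

        // Watermarks for an ordered stream (the returned stream is not used further here)
        stream.assignTimestampsAndWatermarks(
                WatermarkStrategy.<Event>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.timestamp;
                            }
                        })
        );

        // Watermarks for an out-of-order stream
        stream.assignTimestampsAndWatermarks(
                // insert watermarks for the out-of-order stream with a 5 s delay
                WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            // logic for extracting the timestamp
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.timestamp;
                            }
                        })
        ).print();

        env.execute();
    }
}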
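The arithmetic behind bounded out-of-orderness watermarks can be checked with a small plain-Java model. BoundedOutOfOrdernessModel is a hypothetical class, not Flink's own. It reproduces the rule "watermark = max timestamp seen - delay - 1 ms", starting from Long.MIN_VALUE + delay + 1 so the first watermark cannot underflow. It is fed the timestamps of the WatermarkTest example.

```java
import java.util.Arrays;

// Flink-free model of the bounded out-of-orderness rule:
// watermark = (largest timestamp seen so far) - delay - 1 ms.
final class BoundedOutOfOrdernessModel {
    private final long delayMs;
    private long maxTs;

    BoundedOutOfOrdernessModel(long delayMs) {
        this.delayMs = delayMs;
        // start value chosen so that maxTs - delayMs - 1 does not underflow
        this.maxTs = Long.MIN_VALUE + delayMs + 1;
    }

    void onEvent(long eventTimestamp) {
        maxTs = Math.max(maxTs, eventTimestamp);
    }

    long currentWatermark() {
        return maxTs - delayMs - 1;
    }

    // Feeds all timestamps in order and records the watermark after each one.
    static long[] watermarksAfterEach(long[] timestamps, long delayMs) {
        BoundedOutOfOrdernessModel gen = new BoundedOutOfOrdernessModel(delayMs);
        long[] result = new long[timestamps.length];
        for (int i = 0; i < timestamps.length; i++) {
            gen.onEvent(timestamps[i]);
            result[i] = gen.currentWatermark();
        }
        return result;
    }

    public static void main(String[] args) {
        long[] ts = {1000, 2000, 3000, 3300, 3500, 3000, 3800, 4200};
        // delay 0 behaves like the ordered-stream generator
        System.out.println(Arrays.toString(watermarksAfterEach(ts, 0L)));
        // prints [999, 1999, 2999, 3299, 3499, 3499, 3799, 4199]
        System.out.println(Arrays.toString(watermarksAfterEach(ts, 5000L)));
        // prints [-4001, -3001, -2001, -1701, -1501, -1501, -1201, -801]
    }
}
```

In both runs, the late Alice record at 3000 ms leaves the watermark unchanged (3499 and -1501 respectively) instead of moving it backwards.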
A custom periodic watermark generator, plugged in through a custom WatermarkStrategy, can be written as follows:

package com.atguigu.chapter06;

import com.atguigu.chapter05.ClickSource;
import com.atguigu.chapter05.Event;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Custom watermark generation
public class CustomWatermarkTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        env.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(new CustomWatermarkStrategy())
                .print();

        env.execute();
    }

    public static class CustomWatermarkStrategy implements WatermarkStrategy<Event> {
        @Override
        public TimestampAssigner<Event> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
            return new SerializableTimestampAssigner<Event>() {
                @Override
                public long extractTimestamp(Event element, long recordTimestamp) {
                    return element.timestamp; // tell the program which field holds the timestamp
                }
            };
        }

        @Override
        public WatermarkGenerator<Event> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
            return new CustomPeriodicGenerator();
        }
    }

    public static class CustomPeriodicGenerator implements WatermarkGenerator<Event> {
        private final long delayTime = 5000L; // delay in ms
        private long maxTs = Long.MIN_VALUE + delayTime + 1L; // largest timestamp seen so far

        @Override
        public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {
            // called once for every incoming record: update the largest timestamp
            maxTs = Math.max(event.timestamp, maxTs);
        }

        @Override
        public void onPeriodicEmit(WatermarkOutput output) {
            // emit the watermark; called every 200 ms by default
            output.emitWatermark(new Watermark(maxTs - delayTime - 1L));
        }
    }
}
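A punctuated generator, by contrast, keeps checking events in onEvent() and emits a watermark as soon as it sees a special event that carries watermark information. In general it does not emit watermarks through onPeriodicEmit(). A custom punctuated watermark generator looks like this:

import com.atguigu.chapter05.Event;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;

public class CustomPunctuatedGenerator implements WatermarkGenerator<Event> {
    @Override
    public void onEvent(Event r, long eventTimestamp, WatermarkOutput output) {
        // emit a watermark only when the special user "Mary" is seen
        if (r.user.equals("Mary")) {
            output.emitWatermark(new Watermark(r.timestamp - 1));
        }
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // nothing to do: watermarks are already emitted in onEvent()
    }
}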
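The punctuated rule can also be modelled in plain Java. PunctuatedModel is a hypothetical helper and not part of Flink. It replays events in arrival order and emits "timestamp - 1" only for events whose user is "Mary", which mirrors the onEvent() logic of CustomPunctuatedGenerator.

```java
import java.util.ArrayList;
import java.util.List;

// Flink-free model of a punctuated generator: watermarks come only from special events.
final class PunctuatedModel {

    // users[i] and timestamps[i] describe the i-th event, in arrival order.
    static List<Long> emittedWatermarks(String[] users, long[] timestamps) {
        List<Long> emitted = new ArrayList<>();
        for (int i = 0; i < users.length; i++) {
            if ("Mary".equals(users[i])) {
                // same rule as onEvent(): watermark = this event's timestamp - 1
                emitted.add(timestamps[i] - 1);
            }
            // the periodic hook would do nothing, so no other watermarks appear
        }
        return emitted;
    }

    public static void main(String[] args) {
        String[] users = {"Bob", "Mary", "Alice", "Mary"};
        long[] timestamps = {1000, 2000, 3000, 4000};
        System.out.println(emittedWatermarks(users, timestamps)); // prints [1999, 3999]
    }
}
```

Only the two Mary events produce watermarks. The Bob and Alice events never advance event time on their own.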
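In onEvent() we check the user field of the current event and call output.emitWatermark() only when it has the special value Mary. This process is driven entirely by events, so a watermark is always generated after some record has arrived.

Watermarks can also be generated inside a custom data source. However, once a source emits watermarks itself, the program can no longer use the assignTimestampsAndWatermarks method to generate them. You must choose one approach or the other: generate watermarks in the custom source, or generate them with assignTimestampsAndWatermarks.

import com.atguigu.chapter05.Event;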
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import java.util.Calendar;
import java.util.Random;
public class EmitWatermarkInSourceFunction {
public static void main(String[] args) throws