赞
踩
在Flink的流式操作中, 会涉及不同的时间概念
处理时间是指的执行操作的各个设备的时间
对于运行在处理时间上的流程序,所有的基于时间的操作(比如时间窗口)都是使用的设备时钟。例如一个长度为1个小时的窗口将会包含设备时钟表示的1个小时内所有的数据。假设应用程序在 9:15am分启动,第1个小时窗口将会包含9:15am到10:00am所有的数据,然后下个窗口是10:00am-11:00am,以此类推。
处理时间是最简单时间语义,数据流和设备之间不需要做任何的协调。他提供了最好的性能和最低的延迟。但在分布式和异步的环境下,处理时间没有办法保证确定性,容易受到数据传递速度的影响: 事件的延迟和乱序。
在使用窗口的时候,如果使用处理时间,就指定时间分配器为处理时间分配器
事件时间是指的这个事件发生的时间.
在event进入Flink之前,通常被嵌入到了event中,一般作为这个event的时间戳存在。
在事件时间体系中,时间的进度依赖于数据本身,和任何设备的时间无关。事件时间程序必须制定如何产生Event Time Watermarks(水印)。在事件时间体系中,水印是表示时间进度的标志(作用就相当于现实时间的时钟)。
在理想情况下,不管事件时间何时到达或者他们的到达的顺序如何,事件时间处理将产生完全一致且确定的结果。事件时间处理会在等待无序事件(迟到事件)时产生一定的延迟。由于只能等待有限的时间,因此这限制了确定性事件时间应用程序的可使用性。
假设所有数据都已到达,事件时间操作将按预期方式运行,即使在处理无序或迟到的事件或重新处理历史数据时,也会产生正确且一致的结果。例如,每小时事件时间窗口将包含带有事件时间戳的所有记录,该记录落入该小时,无论它们到达的顺序或处理时间。
在使用窗口的时候,如果使用事件时间,就指定时间分配器为事件时间分配器。
支持event time的流式处理框架需要一种能够测量event time 进度的方式。比如一个窗口算子创建了一个长度为1小时的窗口,那么这个算子需要知道事件时间已经到达了这个窗口的关闭时间,从而在程序中去关闭这个窗口。
事件时间可以不依赖处理时间来表示时间的进度。例如在程序中, 即使处理时间和事件时间有相同的速度,事件时间可能会轻微的落后处理时间。另外一方面,使用事件时间可以在几秒内处理已经缓存在Kafka中多周的数据,这些数据可以照样被正确处理,就像实时发生的一样能够进入正确的窗口。
这种在Flink中去测量事件时间的进度的机制就是 watermark(水印)。watermark作为数据流的一部分在流动,并且携带一个时间戳。
一个Watermark(t)表示在这个流里面事件时间已经到了时间t,意味着此时,流中不应该存在这样的数据: 他的时间戳t2<=t (时间比较旧或者等于时间戳)。
在下图中,事件是有序的,watermark是流中一个简单的周期性的标记
在下图中,按照他们时间戳来看,这些事件是乱序的,则watermark对于这些乱序的流来说至关重要。
通常情况下,水印是一种标记,是流中的一个点,所有在这个时间戳(水印中的时间戳)前的数据应该已经全部到达。一旦水印到达了算子,则这个算子会提高他内部的时钟的值为这个水印的值。
在 Flink 中,水印由应用程序开发人员生成,这通常需要对相应的领域有 一定的了解。完美的水印永远不会错:时间戳小于水印标记时间的事件不会再出现。在特殊情况下(例如非乱序事件流),最近一次事件的时间戳就可能是完美的水印。
启发式水印则相反,它只估计时间,因此有可能出错, 即迟到的事件 (其时间戳小于水印标记时间)晚于水印出现。针对启发式水印,Flink 提供了处理迟到元素的机制。
设定水印通常需要用到领域知识。举例来说,如果知道事件的迟到时间不会超过 5 秒,就可以将水印标记时间设为收到的最大时间戳减去 5 秒。另 一种做法是,采用一个 Flink 作业监控事件流,学习事件的迟到规律,并以此构建水印生成模型。
代码示例:
package com.zenitera.bigdata.watermark; import com.zenitera.bigdata.bean.WaterSensor; import com.zenitera.bigdata.util.BigdataUtil; import org.apache.flink.api.common.eventtime.*; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; import java.util.List; public class Flink01_Watermark_01 { public static void main(String[] args) { Configuration conf = new Configuration(); conf.setInteger("rest.port", 2000); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf); env.setParallelism(1); env.getConfig().setAutoWatermarkInterval(2000); env .socketTextStream("localhost", 6666) .map(line -> { String[] data = line.split(","); return new WaterSensor( String.valueOf(data[0]), Long.valueOf(data[1]), Integer.valueOf(data[2]) ); }) .assignTimestampsAndWatermarks(new WatermarkStrategy<WaterSensor>() { @Override public WatermarkGenerator<WaterSensor> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) { return new NewWaterMark(); } } .withTimestampAssigner((ele, ts) -> ele.getTs()) ) .keyBy(WaterSensor::getId) .window(TumblingEventTimeWindows.of(Time.seconds(5))) .process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() { @Override public void process(String key, Context ctx, Iterable<WaterSensor> elements, Collector<String> out) throws Exception { List<WaterSensor> list = BigdataUtil.toList(elements); String stt = BigdataUtil.toDateTime(ctx.window().getStart()); String edt = BigdataUtil.toDateTime(ctx.window().getEnd()); out.collect(key + " " + stt + " " + edt + " " + list); } }) .print(); try { env.execute(); } catch (Exception e) { e.printStackTrace(); } } public static class NewWaterMark implements WatermarkGenerator<WaterSensor> { long maxTs = Long.MIN_VALUE + 3000 + 1; // 每来一条数据执行一次 @Override public void onEvent(WaterSensor event, long eventTimestamp, WatermarkOutput output) { System.out.println("执行了-onEvent"); maxTs = Math.max(maxTs, eventTimestamp); output.emitWatermark(new Watermark(maxTs - 3000)); } // 周期性的值: 200ms执行一次 @Override public void onPeriodicEmit(WatermarkOutput output) { System.out.println("执行了-onPeriodicEmit"); output.emitWatermark(new Watermark(maxTs - 3000)); } } }
代码示例:
package com.zenitera.bigdata.watermark; import com.zenitera.bigdata.bean.WaterSensor; import com.zenitera.bigdata.util.BigdataUtil; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; import java.time.Duration; import java.util.List; public class Flink01_Watermark_02 { public static void main(String[] args) { Configuration conf = new Configuration(); conf.setInteger("rest.port", 2000); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf); env.setParallelism(1); env.getConfig().setAutoWatermarkInterval(2000); env .socketTextStream("localhost", 6666) .map(line -> { String[] data = line.split(","); return new WaterSensor( String.valueOf(data[0]), Long.valueOf(data[1]), Integer.valueOf(data[2]) ); } ) .assignTimestampsAndWatermarks( WatermarkStrategy .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)) .withTimestampAssigner((ele, ts) -> ele.getTs()) // 当某个并行度水印超过5s没有更新, 则以其此为准 .withIdleness(Duration.ofSeconds(5)) ) .keyBy(WaterSensor::getId) .window(TumblingEventTimeWindows.of(Time.seconds(5))) .process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() { @Override public void process(String key, Context ctx, Iterable<WaterSensor> elements, Collector<String> out) throws Exception { List<WaterSensor> list = BigdataUtil.toList(elements); String stt = BigdataUtil.toDateTime(ctx.window().getStart()); String edt = BigdataUtil.toDateTime(ctx.window().getEnd()); out.collect(key + " " + stt + " " + edt + " " + list); } }) .print(); try { env.execute(); } catch (Exception e) { e.printStackTrace(); } } }
多并行度的条件下,向下游传递WaterMark的时候,总是以最小的那个WaterMark为准。
已经添加了wartemark之后,但仍有数据会迟到的情况发生,Flink的窗口,也允许迟到数据。
当触发了窗口计算后,会先计算当前的结果,但是此时并不会关闭窗口。以后每来一条迟到数据,则触发一次这条数据所在窗口计算(增量计算)。
那么什么时候会真正的关闭窗口呢? wartermark 超过了窗口结束时间+等待时间。
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.allowedLateness(Time.seconds(3))
注意: 允许迟到只能运用在Event time上
代码示例:
package com.zenitera.bigdata.watermark; import com.zenitera.bigdata.bean.WaterSensor; import com.zenitera.bigdata.util.BigdataUtil; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; import java.time.Duration; import java.util.List; public class Flink01_Watermark_03 { public static void main(String[] args) { Configuration conf = new Configuration(); conf.setInteger("rest.port", 2000); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf); env.setParallelism(1); env.getConfig().setAutoWatermarkInterval(2000); env .socketTextStream("localhosst", 6666) .map(line -> { String[] data = line.split(","); return new WaterSensor( data[0], Long.valueOf(data[1]), Integer.valueOf(data[2]) ); }) .assignTimestampsAndWatermarks( WatermarkStrategy .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)) .withTimestampAssigner((ele, ts) -> ele.getTs()) ) .keyBy(WaterSensor::getId) .window(TumblingEventTimeWindows.of(Time.seconds(5))) // 允许迟到: 当到了窗口的关闭时间,先对窗口内的元素做计算,窗口不管. 过2s后窗口真正的关闭 .allowedLateness(Time.seconds(2)) .process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() { // 在允许迟到期间, 每来一个属于这个窗口的元素, 这个方法就会执行 @Override public void process(String key, Context ctx, Iterable<WaterSensor> elements, Collector<String> out) throws Exception { List<WaterSensor> list = BigdataUtil.toList(elements); String stt = BigdataUtil.toDateTime(ctx.window().getStart()); String edt = BigdataUtil.toDateTime(ctx.window().getEnd()); out.collect(key + " " + stt + " " + edt + " " + list); } }) .print(); try { env.execute(); } catch (Exception e) { e.printStackTrace(); } } }
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。