赞
踩
前篇中,我们已经初略讲解了Flink中的数量窗口与时间窗口。
无论是哪一种窗口,他们的作用都类似于计算器(计算数量、时间)仅仅只是让数据堆积(不会像默认的流处理,来一条处理一条),只有当满足触发计算时机的时候,便开始计算,比如五个数据才计算,或者五秒钟才计算一次…
时间窗口的核心,便是时间的定义,与数据窗口不同的是,我们只能定义时间窗口的时间范围,无法从定义上确定每个时间窗口元素的个数;比如我们定义了时间窗口五秒钟触发一次,但我们无法确定,上一个五秒与下一个五秒窗口中数据量一致…
那么时间窗口究竟如何使用呢?时间指的是什么时间呢?哎,让我们来一探究竟!
时间窗口中时间指的是什么时间呢?
Flink程序支持针对三种时间进行数据处理:EventTime、IngestionTime、ProcessingTime
EventTime:事件时间
即事件元素中本身的时间(通常指时间元素中含有某时间列)
IngestionTime:摄入时间
事件元素到达Flink的时间
ProcessingTime:处理时间
事件元素经过第一个Flink算子处理时的时间
1.12版本默认是根据EventTime
,之前版本默认是采用ProcessingTime
我们可以手动指定根据什么时间来处理事件
Ex:
渐渐的,摄入时间(IngestionTime)越来越不推荐使用了,实际上就生产而言,我们更多选择是根据业务采用不同的窗口分配器,选择根据事件时间还是处理时间进行计算。
官网示例
时间滚动窗口核心代码
时间间隔可以通过使用Time.milliseconds(x)
,Time.seconds(x)
, Time.minutes(x)
…等等指定窗口大小(毫秒、秒、分、时、天…)。
EX:设置了每隔五秒滚动一次,即每五秒触发一次窗口计算逻辑
TumblingProcessingTimeWindows.of(Time.seconds(5))
如果按照窗口区间来划分的话,将会是这样(假如第一个事件元素的处理时间为12点,那么每五秒滚动窗口生命周期将会按照如下进行)
即每五秒窗口执行一次,但需要注意的是,由于我们采用了KeyBy ,每个不同的KEY 均会有各自的窗口不断触发,但仍每个并行度的subTask只能同时处理一个KEY的数据。
代码示例
public class TumbleWindowFunction extends RichWindowFunction<Location,List<Location>, Integer,TimeWindow> { public TumbleWindowFunction() {} String uuid; /** * @param window 当前窗口 * @param locations 入站数据集合 * @param out 出站数据收集器 泛型为出站数据类型 */ @Override public void apply(Integer integer, TimeWindow window, Iterable<Location> locations, Collector<List<Location>> out) { out.collect(Lists.newArrayList(locations)); } @Override public void open(Configuration parameters) throws Exception { uuid = UUID.randomUUID().toString(); System.out.println(uuid + "窗口打开了"); } @Override public void close() throws Exception { System.out.println(uuid + "窗口关闭了"); } }
滚动窗口也是支持时间偏移量的
// TumblingEventTimeWindows.of(窗口大小, 偏移量) 例如窗口大小为5s,偏移时间为5s
TumblingEventTimeWindows.of(Time.seconds(5), Time.seconds(5))
注意:偏移量必须小于窗口大小
时间滑动窗口核心代码
// 设置窗口为滑动处理时间滑动窗口,且窗口大小为10s,滑动为5s 即每5秒,计算一次最近10秒的数据
SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))
滑动窗口也支持偏移量设置
如果窗口是基于事件时间,在中国时区内,必须指定的偏移量Time.hours(-8)
。
有时候,我们的业务决定了我们必须以事件事件进行处理
比如,某用户在2021-05-24 11:59:28
进行了支付操作,由于某原因(网络延迟、数据量过大…)此条支付消息到Flink时已经 2021-05-24 12:01:10
了,如果我们现在要计算平台在2021-05-24 11:00-2021-05-24 12:00
这个时间段的支付总量,那么此条数据就应包括进来…如果按照处理时间计算,那么此条数据归属窗口则是2021-05-24 12:01:10
之内(后)了,就会导致2021-05-24 11:00-2021-05-24 12:00
这个时间段的支付总量漏计算了数据。
正是由于这些场景的存在,Flink推出了基于事件时间的处理窗口,根据事件本身的事件进行归属窗口分配,然后触发对应窗口计算
事件时间,就是事件元素中,本身携带着的一列时间属性(必须为1970后的时间戳)
模拟定位数据源产生
核心代码
指定窗口分配器为TumblingEventTimeWindows
,且窗口大小为10s
TumblingEventTimeWindows.of(Time.seconds(10))
启动Job发现…报错
完整错误信息:
Caused by: java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp (= no timestamp marker).
Is the time characteristic set to 'ProcessingTime', or did you forget to call
'DataStream.assignTimestampsAndWatermarks(...)'?
错误意思是当前时间没有时间戳标记,您需要将其切换为基于处理时间ProcessTime
进行作业或者是您忘记了设置Watermarks
这难道不是时间戳标记吗?为什么还是提示说找不到时间戳标记呢?
因为Flink不会识别事件中的时间列(避免属性中出现多个时间列无法选择),我们必须要手动指定事件元素的时间列属性
根据错误提示,我们便来玩上一玩
指定水印,设置事件时间列
dataStreamSource.assignTimestampsAndWatermarks();
我们先将DEMO跑起来,后边有watermaker-blog
详细讲解水印机制
EX:
SingleOutputStreamOperator<Location> watermarks = locationSource.
assignTimestampsAndWatermarks(WatermarkStrategy.
<Location>forBoundedOutOfOrderness(Duration.ofSeconds(5))
// 设置事件时间为devTime属性
.withTimestampAssigner((event, timestamp) -> event.getDevTime()));
先放出设置后运算结果吧,WaterMaker后边会详细讲到以及验证!
时间窗口大小由size决定
时间窗口运行示例:
注意点:时间窗口大小为左闭右开属性,即窗口中,包含着12:00:05
的数据,但不含12:00:10
的数据
注意:一个事件窗口窗口中最大的时间(根据窗口的类型 ,ex:所有元素中 最大的摄入时间、处理时间、事件时间)为窗口结束毫秒时间戳-1
一旦应属于该窗口的第一个元素到达,就会创建一个窗口,并且当时间(事件时间、处理时间、摄入时间)超过其结束时间戳(可加上用户指定的时间)后 ,该窗口将被完全删除,Flink只会删除基于时间的窗口。
Ex:采用基于处理时间的开窗策略,该策略每5分钟创建一次不重叠(或翻滚)的窗口
现在第一个元素的处理时间为2021-05-26:12:00,那么Flink会创建一个2021-05-26 12:00 - 2021-05-26 12:05
的时间窗口,当处理时间大于等于12:05时,便会触发该窗口进行计算,且计算后将此窗口删除,后续在无法打开2021-05-26 12:00 - 2021-05-26 12:05
这个窗口
但在使用事件时间窗口时,元素可能会延迟到达,即Flink 用来跟踪事件时间进度的水印已经超过了元素所属窗口的结束时间戳。有关Flink 如何处理事件时间的更深入讨论,请参阅 事件时间,尤其是后期元素。
默认情况下,当水印超过窗口末尾时,将删除后期元素。但是,Flink 允许为窗口操作符指定最大允许延迟。Allowed lateness 指定元素在被丢弃之前可以延迟多少时间,其默认值为 0。 在 watermark 已经通过窗口末尾之后但在它通过窗口末尾之前到达的元素加上允许的延迟,仍然添加到窗口中。根据使用的触发器,延迟但未丢弃的元素可能会导致窗口再次触发。对于EventTimeTrigger
.
为了完成这项工作,Flink 会保持窗口的状态直到它们允许的延迟到期。一旦发生这种情况,Flink 会移除窗口并删除其状态,如窗口生命周期部分所述。
默认情况下,允许的延迟设置为0
。也就是说,到达水印之后的元素将被丢弃。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。