赞
踩
watermark是一种衡量Event Time进展的机制,它是数据本身的一个隐藏属性。通常基于Event Time的数据,自身都包含一个timestamp,例如1472693399700(2016-09-01 09:29:59.700),而这条数据的watermark时间则可能是:
watermark(1472693399700) = 1472693396700(2016-09-01 09:29:56.700)
这条数据的watermark时间是什么含义呢?即:timestamp小于1472693396700(2016-09-01 09:29:56.700)的数据,都已经到达了。
图中蓝色虚线和实线代表着watermark的时间。
watermark是用于处理乱序事件的,而正确的处理乱序事件,通常用watermark机制结合window来实现。
我们知道,流处理从事件产生,到流经source,再到operator,中间是有一个过程和时间的。虽然大部分情况下,流到operator的数据都是按照事件产生的时间顺序来的,但是也不排除由于网络、背压等原因,导致乱序的产生(out-of-order或者说late element)。
但是对于late element,我们又不能无限期的等下去,必须要有个机制来保证一个特定的时间后,必须触发window去进行计算了。这个特别的机制,就是watermark。
通常,在接收到source的数据后,应该立刻生成watermark;但是,也可以在source后,应用简单的map或者filter操作,然后再生成watermark。
生成watermark的方式主要有2大类:
(1):With Periodic Watermarks
(2):With Punctuated Watermarks
第一种可以定义一个最大允许乱序的时间,这种情况应用较多。
我们主要来围绕Periodic Watermarks来说明,下面是生成periodic watermark的方法:
/**
* This generator generates watermarks assuming that elements come out of order to a certain degree only.
* The latest elements for a certain timestamp t will arrive at most n milliseconds after the earliest
* elements for timestamp t.
*/
class BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermarks[MyEvent] {
val maxOutOfOrderness = 3500L; // 3.5 seconds
var currentMaxTimestamp: Long;
override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
val timestamp = element.getCreationTime()
currentMaxTimestamp = max(timestamp, currentMaxTimestamp)
timestamp;
}
override def getCurrentWatermark(): Watermark = {
// return the watermark as current highest timestamp minus the out-of-orderness bound
new Watermark(currentMaxTimestamp - maxOutOfOrderness);
}
}

程序中有一个extractTimestamp方法,就是根据数据本身的Event time来获取;还有一个getCurrentWatermar方法,是用currentMaxTimestamp - maxOutOfOrderness来获取的。
这里的概念有点抽象,下面我们结合数据,在window中来实际演示下每个element的timestamp和watermark是多少,以及何时触发window。
4.1、程序说明
我们从socket接收数据,然后经过map后立刻抽取timetamp并生成watermark,之后应用window来看看watermark和event time如何变化,才导致window被触发的。
4.2、代码如下
import java.text.SimpleDateFormat
import org.apache.flink.streaming.api.scala._
import org.apache.flink
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。