赞
踩
对于流式数据处理,最大的特点是数据上具有时间的属性特征,Flimk 根据时间产生的位置不同,将时间区分为三种时间语义,分别为事件生成时间(Event Time)、事件接入时间(Ingestion Time)和事件处理时间(Processing Time)。
在 Flink 中默认情况下使用是 Process Time 时间语义,如果用户选择使用 Event Time或 者 Ingestion Time 语 义 , 则 需 要 在 创 建 的 StreamExecutionEnvironment 中 调 用setStreamTimeCharacteristic() 方 法 设 定 系 统 的 时 间 概 念 , 如 下 代 码 使 用TimeCharacteristic.EventTime 作为系统的时间语义:
//设置使用EventTime
streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//设置使用IngestionTime
streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
注意:但是上面的代码还没有指定具体的时间到底是什么值,所以后面还有代码需要设置!
在使用 EventTime 处理 Stream 数据的时候会遇到数据乱序的问题,流处理从 Event(事件)产生,流经 Source,再到 Operator,这中间需要一定的时间。虽然大部分情况下,传输到 Operator 的数据都是按照事件产生的时间顺序来的,但是也不排除由于网络延迟等原因而导致乱序的产生,特别是使用 Kafka 的时候,多个分区之间的数据无法保证有序。因此,在进行 Window 计算的时候,不能无限期地等下去,必须要有个机制来保证在特定的时间后,必须触发 Window 进行计算,这个特别的机制就是 Watermark(水位线)。Watermark 是用于处理乱序事件的。
在 Flink 的窗口处理过程中,如果确定全部数据到达,就可以对 Window 的所有数据做窗口计算操作(如汇总、分组等),如果数据没有全部到达,则继续等待该窗口中的数据全部到达才开始处理。这种情况下就需要用到水位线(WaterMarks)机制,它能够衡量数据处理进度(表达数据到达的完整性),保证事件数据(全部)到达 Flink 系统,或者在乱序及延迟到达时,也能够像预期一样计算出正确并且连续的结果。当任何 Event 进入到 Flink系统时,会根据当前最大事件时间产生 Watermarks 时间戳。那么 Flink 是怎么计算 Watermak 的值呢?
Watermark = = 进入 Flink 的最大的事件时间( mxtEventTime )— 指定的延迟时间(t)
那么有 Watermark 的 Window 是怎么触发窗口函数的呢?
如果有窗口的停止时间等于或者小于 e maxEventTime – t t (当时的 warkmark ),那么这个窗口被触发执行。
注意:Watermark 本质可以理解成一个延迟触发机制。
Watermark 的使用存在三种情况:
1.4.1有序数据流中引入 Watermark 和 EventTime
对于有序的数据,代码比较简洁,主要需要从源 Event 中抽取 EventTime。
//读取文件数据
val data = streamEnv.socketTextStream("hadoop101",8888)
.map(line=>{
var arr =line.split(",")
new StationLog(arr(0).trim,arr(1).trim,arr(2).trim,arr(3).trim,arr(4).trim.toLong,arr(5).trim.toLong)
})
//根据EventTime有序的数据流
data.assignAscendingTimestamps(_.callTime)
//StationLog对象中抽取EventTime就是callTime属性
1.4.2乱序序数据流中引入 Watermark 和 EventTime
对于乱序数据流,有两种常见的引入方法:周期性和间断性。
With Periodic(周期性的) Watermark
周期性地生成 Watermark 的生成,默认是 100ms。每隔 N 毫秒自动向流里注入一个Watermark,时间间隔由 streamEnv.getConfig.setAutoWatermarkInterval()决定。最简单的写法如下:
//读取文件数据
val data = streamEnv.socketTextStream("hadoop101",8888)
.map(line=>{
var arr =line.split(",")
new StationLog(arr(0).trim,arr(1).trim,arr(2).trim,arr(3).trim,arr(4).trim.toLong,arr(5).trim.toLong)
})
//如果EventTime是乱序的,需要考虑一个延迟时间t
//当前代码设置的延迟时间为3秒
data.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[StationLog](Time.seconds(3)) //延迟时间
{
override def extractTimestamp(element: StationLog) = {
element.callTime //设置EventTime的值
}
})
另外还有一种复杂的写法:
//读取文件数据 val data = streamEnv.socketTextStream("hadoop101",8888) .map(line=>{ var arr =line.split(",") new StationLog(arr(0).trim,arr(1).trim,arr(2).trim,arr(3).trim,arr(4).trim.toLong,arr(5).trim.toLong) }) //如果EventTime是乱序的,需要考虑一个延迟时间t //当前代码设置的延迟时间为3秒 data.assignTimestampsAndWatermarks( new MyCustomerPeriodicWatermark(3000L)) //自定义延迟3秒 } class MyCustomerPeriodicWatermark(delay: Long) extends AssignerWithPeriodicWatermarks[StationLog]{ var maxTime :Long=0 override def getCurrentWatermark: Watermark = { new Watermark(maxTime-delay) //创建水位线 } override def extractTimestamp(element: StationLog, previousElementTimestamp: Long): Long = { maxTime=maxTime.max(element.callTime) //maxtime永远是最大值 element.callTime } }
With Punctuated(间断性的) Watermark
间断性的生成 Watermark 一般是基于某些事件触发 Watermark 的生成和发送,比如:在我们的基站数据中,有一个基站的 CallTime 总是没有按照顺序传入,其他基站的时间都是正常的,那我们需要对这个基站来专门生成 Watermark。
//读取文件数据 val data = streamEnv.socketTextStream("hadoop101",8888) .map(line=>{ var arr =line.split(",") new StationLog(arr(0).trim,arr(1).trim,arr(2).trim,arr(3).trim,arr(4).trim.toLong,arr(5).trim.toLong) }) //只有station_1的EventTime是无序的,所以只需要针对station_1做处理 //当前代码设置station_1基站的延迟处理时间为3秒 data.assignTimestampsAndWatermarks( new MyCustomerPunctuatedWatermarks(3000L)) //自定义延迟 } class MyCustomerPunctuatedWatermarks(delay:Long) extends AssignerWithPunctuatedWatermarks[StationLog]{ var maxTime :Long=0 override def checkAndGetNextWatermark(element: StationLog, extractedTimestamp: Long): Watermark = { if(element.sid.equals("station_1")){//当基站ID为:station_1 才生成水位线 maxTime =maxTime.max(extractedTimestamp) new Watermark(maxTime-delay) }else{ return null //其他情况下不返回水位线 } } override def extractTimestamp(element: StationLog, previousElementTimestamp: Long): Long = { element.callTime //抽取EventTime的值 } }
需求:每隔 5 秒中统计一下最近 10 秒内每个基站中通话时间最长的一次通话发生的呼叫时间、主叫号码,被叫号码,通话时长。并且还得告诉我到底是哪个时间范围(10 秒)内的。
注意:基站日志数据传入的时候是无序的,通过观察发现时间最多延迟了 3 秒。
/** * 每隔5秒中统计一下最近10秒内每个基站中通话时间最长的一次通话发生的 * 呼叫时间、主叫号码,被叫号码,通话时长。 * 并且还得告诉我到底是哪个时间范围(10秒)内的。 */ object MaxLongCallTime { def main(args: Array[String]): Unit = { //初始化Flink的Streaming(流计算)上下文执行环境 val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment streamEnv.setParallelism(1) streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) //导入隐式转换,建议写在这里,可以防止IDEA代码提示出错的问题 import org.apache.flink.streaming.api.scala._ //读取文件数据 val data = streamEnv.socketTextStream("hadoop101",8888) .map(line=>{ var arr =line.split(",") new StationLog(arr(0).trim,arr(1).trim,arr(2).trim,arr(3).trim,arr(4).trim.toLong,arr(5).trim.to Long) }) .assignTimestampsAndWatermarks( //引入Watermark new BoundedOutOfOrdernessTimestampExtractor[StationLog](Time.seconds(3)){//延迟3秒 override def extractTimestamp(element: StationLog) = { element.callTime } }) //分组,开窗处理 data.keyBy(_.sid) .timeWindow(Time.seconds(10),Time.seconds(5)) //reduce 函数做增量聚合 ,MaxTimeAggregate能做到来一条数据处理一条, //ReturnMaxTime 在窗口触发的时候调用 .reduce(new MaxTimeReduce,new ReturnMaxTime) .print() streamEnv.execute() } class MaxTimeReduce extends ReduceFunction[StationLog]{ override def reduce(t: StationLog, t1: StationLog): StationLog = { //通话时间比较 if(t.duration > t1.duration) t else t1 } } class ReturnMaxTime extends WindowFunction[StationLog,String,String,TimeWindow]{ override def apply(key: String, window: TimeWindow, input: Iterable[StationLog], out: Collector[String]): Unit = { var sb =new StringBuilder sb.append("窗口范围是:").append(window.getStart).append("----").append(window.getEnd) sb.append("\n") sb.append("通话日志:").append(input.iterator.next()) out.collect(sb.toString()) } } }
测流:
注意:如果有 k Watermark 同时也有 d Allowed Lateness 。那么窗口函数再次触发的条件是:k watermark < < w end-of-window + + allowedLateness
案例:每隔 5 秒统计最近 10 秒,每个基站的呼叫数量。要求:
1、每个基站的数据会存在乱序
2、大多数数据延迟 2 秒到,但是有些数据迟到时间比较长
3、迟到时间超过两秒的数据不能丢弃,放入侧流
object LateDataOnWindow { def main(args: Array[String]): Unit = { //初始化Flink的Streaming(流计算)上下文执行环境 val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment streamEnv.setParallelism(1) streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) //导入隐式转换,建议写在这里,可以防止IDEA代码提示出错的问题 import org.apache.flink.streaming.api.scala._ //读取文件数据 val data = streamEnv.socketTextStream("hadoop101",8888) .map(line=>{ var arr =line.split(",") new StationLog(arr(0).trim,arr(1).trim,arr(2).trim,arr(3).trim,arr(4).trim.toLong,arr(5).trim.toLong) }) .assignTimestampsAndWatermarks( //引入Watermark new BoundedOutOfOrdernessTimestampExtractor[StationLog](Time.seconds(2)){//延迟2秒 override def extractTimestamp(element: StationLog) = { element.callTime } }) 84 //分组,开窗处理 //定义一个侧输出流 的标签 var lateTag =new OutputTag[StationLog]("late") val mainStream: DataStream[String] = data.keyBy(_.sid) .timeWindow(Time.seconds(10), Time.seconds(5)) //注意:只要符合watermark < end-of-window + allowedLateness之内到达的数据,都会被再次触发窗口的计算 //超过之外的迟到数据会被放入侧输出流 .allowedLateness(Time.seconds(5)) //允许数据迟到5秒 .sideOutputLateData(lateTag) .aggregate(new AggregateCount, new OutputResult) mainStream.getSideOutput(lateTag).print("late")//迟到很久的数据可以另外再处理 mainStream.print("main") streamEnv.execute() } class AggregateCount extends AggregateFunction[StationLog,Long,Long]{ override def createAccumulator(): Long = 0 override def add(in: StationLog, acc: Long): Long = acc+1 override def getResult(acc: Long): Long = acc override def merge(acc: Long, acc1: Long): Long = acc+acc1 } class OutputResult extends WindowFunction[Long,String,String,TimeWindow]{ override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[String]): Unit = { var sb =new StringBuilder sb.append("窗口范围是:").append(window.getStart).append("----").append(window.getEnd) sb.append("\n") sb.append("当前基站是:").append(key) .append(" 呼叫数量是: ").append(input.iterator.next()) out.collect(sb.toString()) } } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。