赞
踩
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/windows.html
streaming流式计算是⼀种被设计用于处理⽆限数据集的数据处理引擎,而⽆限数据集是指一种不断增长的本质上无限数据集,⽽window是一种切割无限数据为有限块进行处理的手段。Window是无限数据流处理的核心,Window将⼀个⽆限stream拆分成有限大小的”buckets”桶,我们可以在这些桶上做计算操作。
Window可以分成两类:
对于TimeWindow,可以根据窗⼝实现原理的不同分成三类:滚动窗⼝(Tumbling Window)、滑动
窗⼝(Sliding Window)和会话窗⼝(Session Window)。
将数据依据固定的窗⼝⻓度对数据进行切片。
特点:时间对⻬,窗口⻓度固定,没有重叠。
滚动窗⼝分配器将每个元素分配到⼀个指定窗⼝⼤小的窗口中,滚动窗口有一个固定的大小,并且不会出现重叠。例如:如果你指定了一个5分钟大小的滚动窗口,⼝口的创建如下图所示:
适用场景:适合做BI统计等(做每个时间段的聚合计算)。
滑动窗⼝是固定窗口的更⼴义的⼀种形式,滑动窗口由固定的窗口长度和滑动间隔组成。
特点:时间对齐,窗口长度固定,有重叠
该滑动窗口分配器分配元件以固定长度的窗口。与翻滚窗口分配器类似,窗口大小由窗口大小参数配置。附加的窗口滑动参数控制滑动窗口的启动频率。因此,如果幻灯片小于窗口大小,则滑动窗口可以重叠。在这种情况下,元素被分配给多个窗口。
例如,您可以将大小为10分钟的窗口滑动5分钟。有了这个,你每隔5分钟就会得到一个窗口,其中包含过去10分钟内到达的事件,如下图所示。
适⽤场景:对最近⼀个时间段内的统计(求某接口最近5min的失败率来决定是否要报警)。
由⼀系列事件组合⼀个指定时间长度的timeout间隙组成,类似于web应用的session,也就是一段时间没有接收到新数据就会生成新的窗口。
特点:时间⽆对⻬。
在会话窗口中按活动会话分配器组中的元素。会话窗口不重叠,没有固定的开始和结束时间,与翻滚窗口和滑动窗口相反。相反,当会话窗口在一段时间内没有接收到元素时,即当发生不活动的间隙时,会关闭会话窗口。会话窗口分配器可以配置静态会话间隙或 会话间隙提取器功能,该功能定义不活动时间段的长度。当此期限到期时,当前会话将关闭,后续元素将分配给新的会话窗口。
Count Window根据窗⼝中相同key元素的数量来触发执行,执行时只计算元素数量达到窗⼝⼤小的key对应的结果**。
注意:CountWindow的window_size指的是相同Key的元素的个数,不是输入的所有元素的总数。
默认的CountWindow是⼀个滚动窗⼝,只需要指定窗⼝⼤小即可,当元素数量达到窗口⼤小时,就会触发窗⼝的执⾏。
- package com.wedoctor.flink.window
-
- import org.apache.flink.api.java.tuple.Tuple
- import org.apache.flink.streaming.api.scala._
- import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
-
- /**
- * 目的:测试分组后的CountWindow
- * CountWindow是以条数划分Window(逻辑划分的一个部分数据,然后对这些数据进行处理)
- *
- */
- object CountWindowDemo2 {
-
- def main(args: Array[String]): Unit = {
-
- val env = StreamExecutionEnvironment.getExecutionEnvironment
-
- val lines: DataStream[String] = env.socketTextStream("localhost", 8888)
-
- val wordAndOne = lines.map((_, 1))
-
- val keyed: KeyedStream[(String, Int), Tuple] = wordAndOne.keyBy(0)
-
- //KeyedStream才可以调用countWindow
-
- val window = keyed.countWindow(5)
-
- val result: DataStream[(String, Int)] = window.sum(1)
-
- result.print()
-
- env.execute(this.getClass.getSimpleName)
- }
- }
滑动窗⼝和滚动窗口的函数名是完全一致的,只是在传参数时需要传入两个参数,一个是window_size,一个是sliding_size。
下⾯代码中的sliding_size设置为了2,也就是说,每收到两个相同key的数据就计算一次,每一次计算的window范围是5个元素。
- // 当相同key的元素个数达到2个时,触发窗口计算,计算的窗口范围为5
- val streamWindow = streamKeyBy.countWindow(5,2)
TimeWindow是将指定时间范围内的所有数据组成⼀个window,⼀次对一个window⾥面的所有数据进行计算。
Flink默认的时间窗⼝根据Processing Time 进⾏窗⼝的划分,将Flink获取到的数据根据进入Flink的时间划分到不同的窗口中。
- package com.wedoctor.flink.window
-
- import org.apache.flink.api.java.tuple.Tuple
- import org.apache.flink.streaming.api.scala._
- import org.apache.flink.streaming.api.windowing.time.Time
- import org.apache.flink.streaming.api.windowing.windows.TimeWindow
- import org.apache.log4j.{Level, Logger}
-
- object TumblingTimeWindow {
-
- Logger.getLogger("org").setLevel(Level.ERROR)
- def main(args: Array[String]): Unit = {
-
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val lines: DataStream[String] = env.socketTextStream("localhost",8888)
- val words: DataStream[String] = lines.flatMap(_.split(" "))
- var wordAndOne: DataStream[(String, Int)] = words.map((_, 1))
- val keyed = wordAndOne.keyBy(0)
- //分组后可以调用timeWindow
- val window: WindowedStream[(String, Int), Tuple, TimeWindow] = keyed.timeWindow(Time.seconds(5))
- val result = window.sum(1)
- result.print()
- env.execute(this.getClass.getSimpleName)
- }
- }
时间间隔可以通过Time.milliseconds(x),Time.seconds(x),Time.minutes(x)等其中的⼀个来指定。
滑动窗口和滚动窗口的函数名是完全一致的,只是在传参数时需要传入两个参数,一个是window_size,一个是sliding_size。
下⾯代码中的sliding_size设置为了2s,也就是说,窗口每2s就计算一次,每一次计算的window范围是5s内的所有元素。
- package com.wedoctor.flink.window
-
- import org.apache.flink.streaming.api.scala._
- import org.apache.flink.streaming.api.windowing.time.Time
- import org.apache.flink.streaming.api.windowing.windows.TimeWindow
- import org.apache.log4j.{Level, Logger}
-
- object SlideTimeWindow {
- Logger.getLogger("org").setLevel(Level.ERROR)
-
- def main(args: Array[String]): Unit = {
-
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val lines = env.socketTextStream("localhost",8888)
- val nums = lines.map(_.toInt)
- //如果是timeWindowAll 则不需要分组
- val window: AllWindowedStream[Int, TimeWindow] = nums.timeWindowAll(Time.seconds(10),Time.seconds(5))
- val result: DataStream[Int] = window.sum(0)
- result.print()
- env.execute(this.getClass.getSimpleName)
- }
-
- }
时间间隔可以通过Time.milliseconds(x),Time.seconds(x),Time.minutes(x)等其中的一个来指定。
WindowedStream 转换成 DataStream:给window赋一个reduce功能的函数,并返回⼀个聚合的结果。
- // 引⼊时间窗⼝
- val streamWindow = streamKeyBy.timeWindow(Time.seconds(5))
- val streamReduce = streamWindow.reduce(
- (a, b) => (a._1, a._2 + b._2)
- )
在Flink的流式处理中,绝⼤部分的业务都会使⽤eventTime,一般只在eventTime无法使用时,才会被迫使用ProcessingTime或者IngestionTime。
如果要使用EventTime,那么需要引入EventTime的时间属性,引入方式如下所示:
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- // 从调⽤时刻开始给env创建的每一个stream追加时间特征
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
我们知道,流处理从事件产⽣,到流经source,再到operator,中间是有⼀个过程和时间的,虽然⼤部分情况下,流到operato的数据都是按照事件产⽣的时间顺序来的,但是也不排除由于⽹络、背压等原因,导致乱序的产⽣,所谓乱序,就是指Flink接收到的事件的先后顺序不是严格按照事件的Event Time顺序排列的。
那么此时出现⼀个问题,⼀旦出现乱序,如果只根据eventTime决定window的运⾏,我们不能明确数据是否全部到位,但⼜不能⽆限期的等下去,此时必须要有个机制来保证⼀个特定的时间后,必须触发window去进⾏计算了,这个特别的机制,就是Watermark。Watermark是⼀种衡量Event Time进展的机制,它是数据本身的⼀个隐藏属性,数据本身携带着对应Watermark。
Watermark是⽤于处理乱序事件的,⽽正确的处理乱序事件,通常⽤Watermark机制结合window来实现。数据流中的Watermark⽤于表示timestamp⼩于Watermark的数据,都已经到达了,因此,window的执⾏也是由Watermark触发的。
Watermark可以理解成⼀个延迟触发机制,我们可以设置Watermark的延时时⻓t,每次系统会校验已经到达的数据中最⼤的maxEventTime,然后认定eventTime⼩于maxEventTime- t的所有数据都已经到达,如果有窗⼝的停⽌时间等于maxEventTime – t,那么这个窗⼝被触发执⾏。
有序流的Watermarker如下图所示:(Watermark设置为0)
乱序流的Watermarker如下图所示:(Watermark设置为2)
当Flink接收到每一条数据时,都会产⽣一条Watermark,这条Watermark就等于当前所有到达数据中的maxEventTime - 延迟时⻓长,也就是说,Watermark是由数据携带的,一旦数据携带的Watermark比当前未触发的窗口的停止时间要晚,那么就会触发相应窗口的执行。由于Watermark是由数据携带的,因此,如果运行过程中⽆法获取新的数据,那么没有被触发的窗口将永远都不不被触发。
上图中,我们设置的允许最大延迟到达时间为2s,所以时间戳为7s的事件对应的Watermark是5s,时间戳为12s的事件的Watermark是10s,如果我们的窗口1是1s~5s,窗口2是6s~10s,那么时间戳为7s的事件到达时的Watermarker恰好触发窗口1,时间戳为12s的事件到达时的Watermark恰好触发窗口2。
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- // 从调⽤时刻开始给env创建的每⼀个stream追加时间特征
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- val stream = env.readTextFile("eventTest.txt").assignTimestampsAndWatermarks(
- new BoundedOutOfOrdernessTimestampExtractor[String](Time.milliseconds(200)) {
- override def extractTimestamp(t: String): Long = {
- // EventTime是⽇志⽣成时间,我们从⽇志中解析EventTime
- t.split(" ")(0).toLong
- }
- })
当使⽤EventTimeWindow时,所有的Window在EventTime的时间轴上进⾏划分,也就是说,在Window启动后,会根据初始的EventTime时间每隔⼀段时间划分⼀个窗⼝,如果Window⼤⼩是3秒,那么1分钟内会把Window划分为如下的形式:
- [00:00:00,00:00:03)
- [00:00:03,00:00:06)
- ...
- [00:00:57,00:01:00)
如果Window⼤⼩是10秒,则Window会被分为如下的形式:
- [00:00:00,00:00:10)
- [00:00:10,00:00:20)
- ...
- [00:00:50,00:01:00)
注意,窗⼝是左闭右开的,形式为:[window_start_time,window_end_time)。
Window的设定⽆关数据本身,⽽是系统定义好了的,也就是说,Window会⼀直按照指定的时间间隔进⾏划分,不论这个Window中有没有数据,EventTime在这个Window期间的数据会进⼊这个Window。
Window会不断产⽣,属于这个Window范围的数据会被不断加⼊到Window中,所有未被触发的Window都会等待触发,只要Window还没触发,属于这个Window范围的数据就会⼀直被加⼊到Window中,直到Window被触发才会停⽌数据的追加,⽽当Window触发之后才接受到的属于被触发Window的数据会被丢弃。
Window会在以下的条件满⾜时被触发执⾏:
l watermark时间 >= window_end_time;
l 在[window_start_time,window_end_time)中有数据存在。
我们通过下图来说明Watermark、EventTime和Window的关系。
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- // 创建SocketSource
- val stream = env.socketTextStream("localhost", 8888)
- // 对stream进⾏处理并按key聚合
- val streamKeyBy = stream.assignTimestampsAndWatermarks(
- new BoundedOutOfOrdernessTimestampExtractor[String](Time.milliseconds(3000)) {
- override def extractTimestamp(element: String): Long = {
- val sysTime = element.split(" ")(0).toLong
- println(sysTime)
- sysTime
- }}).map(item => (item.split(" ")(1), 1)).keyBy(0)
- // 引⼊滚动窗⼝
- val streamWindow = streamKeyBy.window(TumblingEventTimeWindows.of(Time.seconds(10)))
- // 执⾏聚合操作
- val streamReduce = streamWindow.reduce(
- (a, b) => (a._1, a._2 + b._2)
- )
- // 将聚合数据写⼊⽂件
- streamReduce.print
- // 执⾏程序
- env.execute("TumblingWindow")
结果是按照Event Time的时间窗⼝计算得出的,⽽⽆关系统的时间(包括输⼊的快慢)。
- // 获取执⾏环境
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- // 创建SocketSource
- val stream = env.socketTextStream("localhost", 11111)
- // 对stream进⾏处理并按key聚合
- val streamKeyBy = stream.assignTimestampsAndWatermarks(
- new BoundedOutOfOrdernessTimestampExtractor[String](Time.milliseconds(0)) {
- override def extractTimestamp(element: String): Long = {
- val sysTime = element.split(" ")(0).toLong
- println(sysTime)
- sysTime
- }}).map(item => (item.split(" ")(1), 1)).keyBy(0)
- // 引⼊滚动窗⼝
- val streamWindow = streamKeyBy.window(SlidingEventTimeWindows.of(Time.seconds(10),
- Time.seconds(5)))
- // 执⾏聚合操作
- val streamReduce = streamWindow.reduce(
- (a,b) => (a._1, a._2 + b._2)
- )
- // 将聚合数据写⼊⽂件
- streamReduce.print
- // 执⾏程序
- env.execute("TumblingWindow")
相邻两次数据的EventTime的时间差超过指定的时间间隔就会触发执⾏。如果加⼊Watermark,那么当触发执⾏时,所有满⾜时间间隔⽽还没有触发的Window会同时触发执⾏。
- // 获取执⾏环境
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- // 创建SocketSource
- val stream = env.socketTextStream("localhost", 11111)
- // 对stream进⾏处理并按key聚合
- val streamKeyBy = stream.assignTimestampsAndWatermarks(
- new BoundedOutOfOrdernessTimestampExtractor[String](Time.milliseconds(0)) {
- override def extractTimestamp(element: String): Long = {
- val sysTime = element.split(" ")(0).toLong
- println(sysTime)
- sysTime
- }}).map(item => (item.split(" ")(1), 1)).keyBy(0)
- // 引⼊滚动窗⼝
- val streamWindow =
- streamKeyBy.window(EventTimeSessionWindows.withGap(Time.seconds(5)))
- // 执⾏聚合操作
- val streamReduce = streamWindow.reduce(
- (a, b) => (a._1, a._2 + b._2)
- )
- // 将聚合数据写⼊⽂件
- streamReduce.print
- // 执⾏程序
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。