赞
踩
Streaming(流式)计算是一种被设计用于处理无限数据集的数据处理引擎,而无限数据集是指一种不断增长的、本质上无限的数据集;window则是一种将无限数据切割为有限块进行处理的手段。Window是无限数据流处理的核心,它将一个无限的stream拆分成有限大小的“buckets”(桶),我们可以在这些桶上做计算操作。
TimeWindow是将指定时间范围内的所有数据组成一个window,一次对一个window里面的所有数据进行计算。
Flink默认的时间窗口根据Processing Time进行窗口的划分,将Flink获取到的数据根据进入Flink的时间划分到不同的窗口中。(注:这是Flink 1.12之前的默认行为;从1.12开始默认的时间语义改为Event Time,因此建议显式指定窗口分配器,例如TumblingProcessingTimeWindows。)
将数据依据固定的窗口长度进行切片。
特点:时间对齐,窗口长度固定,没有重叠。
滚动窗口分配器将每个元素分配到一个指定窗口大小的窗口中,滚动窗口有一个固定的大小,并且不会出现重叠。例如:如果指定了一个5分钟大小的滚动窗口,那么每隔5分钟就会计算当前窗口并开启一个新的窗口,如下图所示:
适用场景:适合做BI统计等(做每个时间段的聚合计算)。
2.1.1.1 timeWindowAll(全局数据,默认Processing Time)
package com.wedoctor.flink; import org.apache.flink.streaming.api.datastream.AllWindowedStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; public class TumblingTimeWindow { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> lines = env.socketTextStream("192.168.xx.xx", 9999); //默认的CountWindow是⼀个滚动窗⼝,只需要指定窗⼝⼤小即可,当元素数量达到窗口⼤小时,就会触发窗⼝的执⾏。 SingleOutputStreamOperator<Integer> num = lines.map(Integer::parseInt); //划分窗口 AllWindowedStream<Integer, TimeWindow> timeWindowAll = num.timeWindowAll(Time.seconds(5)); //对窗口数据进行计算 SingleOutputStreamOperator<Integer> sum = timeWindowAll.sum(0); sum.print(); env.execute(); } }
2.1.1.2 timeWindow(窗口滚动的时候,所有组都要执行,并行处理,默认Processing Time)
package com.wedoctor.flink; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.datastream.WindowedStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.GlobalWindow; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; public class TumblingTimeWindow2 { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> lines = env.socketTextStream("192.168.xx.xx", 9999); //默认的CountWindow是⼀个滚动窗⼝,只需要指定窗⼝⼤小即可,当元素数量达到窗口⼤小时,就会触发窗⼝的执⾏。 SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndCount = lines.map(line -> { String[] fileds = line.split(","); return Tuple2.of(fileds[0], Integer.parseInt(fileds[1])); }).returns(Types.TUPLE(Types.STRING,Types.INT)); KeyedStream<Tuple2<String, Integer>, String> keyedStream = wordAndCount.keyBy(t -> t.f0); WindowedStream<Tuple2<String, Integer>, String, TimeWindow> timeWindow = keyedStream.timeWindow(Time.seconds(5)); timeWindow.sum(1).print(); env.execute(); } }
2.1.1.3 timeWindowAll(全局数据,使用Event Time)
package com.wedoctor.flink; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.AllWindowedStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import java.text.ParseException; import java.text.SimpleDateFormat; public class EventTimeTumbingWindwAllDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //设置EventTime作为时间标准 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); //创建一个DataStream //2020-11-08 18:22:43,1 DataStreamSource<String> lines = env.socketTextStream("192.168.xx.xx", 9999); //提取数据中的时间 SingleOutputStreamOperator<String> watermarksDataStream = lines.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(0)) { private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); @Override public long extractTimestamp(String element) { long timestamp = 0; try { timestamp = sdf.parse(element.split(",")[0]).getTime(); } catch (ParseException e) { timestamp = System.currentTimeMillis(); } return timestamp; } }); SingleOutputStreamOperator<Integer> nums = watermarksDataStream.map(new MapFunction<String, Integer>() { @Override public Integer map(String value) throws Exception { return Integer.parseInt(value.split(",")[1]); } }); AllWindowedStream<Integer, TimeWindow> windowed = 
nums.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))); SingleOutputStreamOperator<Integer> summed = windowed.sum(0); summed.print(); env.execute(); } }
2.1.1.4 timeWindow(分组数据,使用Event Time)
package com.wedoctor.flink; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.datastream.WindowedStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; public class EventTimeTumblingWindowDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); //设置EventTime作为时间标准 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); //1000,hadoop,1 DataStreamSource<String> lines = env.socketTextStream("192.168.xx.xx", 9999); SingleOutputStreamOperator<String> watermarksDataStream = lines.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(0)) { @Override public long extractTimestamp(String element) { return Long.parseLong(element.split(",")[0]); } }); SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndCount = watermarksDataStream.map(new MapFunction<String, Tuple2<String, Integer>>() { @Override public Tuple2<String, Integer> map(String value) throws Exception { String[] fileds = value.split(","); String word = fileds[1]; int count = Integer.parseInt(fileds[2]); return Tuple2.of(word, count); } }); //先分组 
KeyedStream<Tuple2<String, Integer>, String> keyed = wordAndCount.keyBy(t -> t.f0); //划分窗口 WindowedStream<Tuple2<String, Integer>, String, TimeWindow> window = keyed.window(TumblingEventTimeWindows.of(Time.seconds(5))); SingleOutputStreamOperator<Tuple2<String, Integer>> summed = window.sum(1); summed.print(); env.execute(); } }
2.1.2 滑动窗口
滑动窗口是固定窗口的一种更广义的形式,滑动窗口由固定的窗口长度和滑动间隔组成。
特点:时间对齐,窗口长度固定,有重叠
滑动窗口分配器将元素分配到固定长度的窗口中。与滚动窗口分配器类似,窗口大小由窗口大小(size)参数配置;额外的窗口滑动(slide)参数控制新的滑动窗口多久启动一次。因此,如果滑动步长小于窗口大小,滑动窗口就会重叠,在这种情况下,一个元素会被分配到多个窗口中。
例如,可以设置一个大小为10分钟、每5分钟滑动一次的窗口。这样每隔5分钟就会得到一个窗口,其中包含最近10分钟内到达的事件,如下图所示。
适用场景:对最近一个时间段内的统计(例如求某接口最近5min的失败率来决定是否要报警)。
2.1.2.1 全局滑动(默认Processing Time)
package com.wedoctor.flink; import org.apache.flink.streaming.api.datastream.AllWindowedStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; public class TumblingTimeWindow { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> lines = env.socketTextStream("192.168.xx.xx", 9999); //默认的CountWindow是⼀个滚动窗⼝,只需要指定窗⼝⼤小即可,当元素数量达到窗口⼤小时,就会触发窗⼝的执⾏。 SingleOutputStreamOperator<Integer> num = lines.map(Integer::parseInt); //划分滑动窗口 AllWindowedStream<Integer, TimeWindow> timeWindowAll = num.timeWindowAll(Time.seconds(10),Time.seconds(5)); //对窗口数据进行计算 SingleOutputStreamOperator<Integer> sum = timeWindowAll.sum(0); sum.print(); env.execute(); } }
2.1.2.2 分组滑动(默认Processing Time)
package com.wedoctor.flink; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.datastream.WindowedStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.GlobalWindow; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; public class TumblingTimeWindow2 { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> lines = env.socketTextStream("192.168.xx.xx", 9999); //默认的CountWindow是⼀个滚动窗⼝,只需要指定窗⼝⼤小即可,当元素数量达到窗口⼤小时,就会触发窗⼝的执⾏。 SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndCount = lines.map(line -> { String[] fileds = line.split(","); return Tuple2.of(fileds[0], Integer.parseInt(fileds[1])); }).returns(Types.TUPLE(Types.STRING,Types.INT)); KeyedStream<Tuple2<String, Integer>, String> keyedStream = wordAndCount.keyBy(t -> t.f0); WindowedStream<Tuple2<String, Integer>, String, TimeWindow> timeWindow = keyedStream.timeWindow(Time.seconds(10),Time.seconds(5)); timeWindow.sum(1).print(); env.execute(); } }
2.1.3 会话窗口
由一系列事件加上一个指定时间长度的timeout间隙组成,类似于Web应用的session,也就是说一段时间内没有接收到新数据时,当前窗口就会关闭,之后到达的数据会进入新的窗口。
特点:时间无对齐。
会话窗口分配器按活跃会话对元素进行分组。与滚动窗口和滑动窗口不同,会话窗口互不重叠,也没有固定的开始和结束时间;当会话窗口在一段时间内没有接收到元素,即出现一段不活跃的间隙(gap)时,该会话窗口就会关闭。会话窗口分配器既可以配置静态的会话间隙(session gap),也可以配置会话间隙提取函数(session gap extractor)来定义不活跃时间段的长度。当这段时间到期时,当前会话关闭,后续元素会被分配到新的会话窗口中。
2.1.3.1 不分组(默认Processing Time)
package com.wedoctor.flink;
import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.AllWindowedStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; public class ProcessingTimeSessionWindowAllDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); DataStreamSource<String> lines = env.socketTextStream("192.168.xx.xx", 9999); SingleOutputStreamOperator<Integer> nums = lines.map(Integer::parseInt); //不分组,划分会话窗口 AllWindowedStream<Integer, TimeWindow> windowed = nums.windowAll(ProcessingTimeSessionWindows.withGap(Time.seconds(5))); //划分完窗口要调用WindowFunction对窗口内的数据进行计算 SingleOutputStreamOperator<Integer> summed = windowed.sum(0); summed.print(); env.execute(); } }
2.1.3.2 分组(单个组触发,不是全部触发,默认Processing Time)
package com.wedoctor.flink; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.datastream.WindowedStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; public class ProcessingTimeSessionWindwDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); //spark,3 //hadoop,2 //flink,1 DataStreamSource<String> lines = env.socketTextStream("192.168.xx.xx", 9999); SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndCount = lines.map(line -> { String[] fields = line.split(","); return Tuple2.of(fields[0], Integer.parseInt(fields[1])); }).returns(Types.TUPLE(Types.STRING, Types.INT)); //先分组 KeyedStream<Tuple2<String, Integer>, String> keyed = wordAndCount.keyBy(t -> t.f0); WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowed = keyed.window(ProcessingTimeSessionWindows.withGap(Time.seconds(5))); SingleOutputStreamOperator<Tuple2<String, Integer>> summed = windowed.sum(1); summed.print(); env.execute(); } }
2.1.3.3 不分组(使用Event time)
package com.wedoctor.flink; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.AllWindowedStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor; import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; public class EventTimeSessionWindowAllDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); //1000,1 DataStreamSource<String> lines = env.socketTextStream("192.168.xx.xx", 9999); //提取数据中的时间 SingleOutputStreamOperator<String> watermarksDataStream = lines.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(0)) { @Override public long extractTimestamp(String element) { return Long.parseLong(element.split(",")[0]); } }); SingleOutputStreamOperator<Integer> nums = watermarksDataStream.map(new MapFunction<String, Integer>() { @Override public Integer map(String value) throws Exception { return Integer.parseInt(value.split(",")[1]); } }); //不分组划分窗口 AllWindowedStream<Integer, TimeWindow> windowed = nums.windowAll(EventTimeSessionWindows.withGap(Time.seconds(5))); windowed.sum(0).print(); env.execute(); } }
2.1.3.4 分组(使用Event time)
package com.wedoctor.flink; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.datastream.WindowedStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor; import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; public class EventTimeSessionWindowDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); //设置EventTime作为时间标准 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); //1000,spark,1 DataStreamSource<String> lines = env.socketTextStream("192.168.xx.xx", 9999); SingleOutputStreamOperator<String> watermarksDataStream = lines.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(0)) { @Override public long extractTimestamp(String element) { return Long.parseLong(element.split(",")[0]); } }); SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndCount = watermarksDataStream.map(new MapFunction<String, Tuple2<String, Integer>>() { @Override public Tuple2<String, Integer> map(String value) throws Exception { String[] fileds = value.split(","); String word = fileds[1]; int count = Integer.parseInt(fileds[2]); return Tuple2.of(word, count); } }); //先分组 KeyedStream<Tuple2<String, Integer>, String> keyed = 
wordAndCount.keyBy(t -> t.f0); //划分窗口 //keyed.timeWindow(Time.seconds(5)); WindowedStream<Tuple2<String, Integer>, String, TimeWindow> window = keyed.window(EventTimeSessionWindows.withGap(Time.seconds(5))); SingleOutputStreamOperator<Tuple2<String, Integer>> summed = window.sum(1); summed.print(); env.execute(); } }
2.2 GlobalWindow(CountWindow)
按照指定的数据条数生成一个Window,与时间无关
2.2.1 countWindowAll
全部数据发送到一个task里面,并不是分布式执行
package com.wedoctor.flink; import org.apache.flink.streaming.api.datastream.AllWindowedStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.windows.GlobalWindow; public class CountWindow { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> lines = env.socketTextStream("192.168.xx.xx", 9999); //默认的CountWindow是⼀个滚动窗⼝,只需要指定窗⼝⼤小即可,当元素数量达到窗口⼤小时,就会触发窗⼝的执⾏。 SingleOutputStreamOperator<Integer> num = lines.map(Integer::parseInt); //划分窗口 AllWindowedStream<Integer, GlobalWindow> windowd = num.countWindowAll(5); //对窗口数据进行计算 SingleOutputStreamOperator<Integer> sum = windowd.sum(0); sum.print(); env.execute(); } }
2.2.2 countWindow
各分组独立计数,某个分组满足触发条件(元素数量达到窗口大小)即触发该分组的窗口,并不是触发后每个分组都会执行
package com.wedoctor.flink; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.datastream.WindowedStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.windows.GlobalWindow; public class CountWindow2 { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> lines = env.socketTextStream("192.168.xx.xx", 9999); //默认的CountWindow是⼀个滚动窗⼝,只需要指定窗⼝⼤小即可,当元素数量达到窗口⼤小时,就会触发窗⼝的执⾏。 SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndCount = lines.map(line -> { String[] fileds = line.split(","); return Tuple2.of(fileds[0], Integer.parseInt(fileds[1])); }).returns(Types.TUPLE(Types.STRING,Types.INT)); KeyedStream<Tuple2<String, Integer>, String> keyedStream = wordAndCount.keyBy(t -> t.f0); WindowedStream<Tuple2<String, Integer>, String, GlobalWindow> countWindow = keyedStream.countWindow(5); countWindow.sum(1).print(); env.execute(); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。