赞
踩
介绍:流式计算引擎是一种被设计用于处理无限数据集的数据处理引擎,无限数据集是指一种不断增长、本质上无限的数据集;而 window(窗口)是一种将无限数据切割为有限块进行处理的手段,分为两种类型:1. 时间窗口;2. 计数窗口。
时间窗口根据窗口实现原理的不同分成三类:滚动窗口(Tumbling Window)、滑动窗口(Sliding Window)和会话窗口(Session Window)
介绍:依据固定的窗口长度(时间)对数据进行切片,每条数据只属于一个窗口
特点:时间对齐,窗口长度固定,没有重叠
package com.xx.window; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.datastream.WindowedStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; /** * @author aqi * @since 2023/8/30 15:46 */ @Slf4j public class WindowReduceDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 接收socket数据(windows使用:nc -lp 7777,linux使用:nc -lk 7777进行数据推送) SingleOutputStreamOperator<Demo> sensorDS = env .socketTextStream("127.0.0.1", 7777) .map(new DemoMapFunction()); // 滚动窗口(固定窗口长度为:10秒,每隔10s统计一次) WindowedStream<Demo, String, TimeWindow> sensorWS = sensorDS .keyBy(Demo::getId) .window(TumblingProcessingTimeWindows.of(Time.seconds(10))); // 聚合(也可以使用别的算子进行聚合) SingleOutputStreamOperator<Demo> reduce = sensorWS.reduce( (value1, value2) -> new Demo(value1.getId(), value1.getValue() + value2.getValue()) ); // 打印计算结果 reduce.print(); // 触发计算 env.execute(); } } @Data @AllArgsConstructor @NoArgsConstructor class Demo { private String id; private Long value; } class DemoMapFunction implements MapFunction<String, Demo> { @Override public Demo map(String value) { String[] datas = value.split(","); return new Demo(datas[0], Long.valueOf(datas[1])); } }
介绍:滑动窗口是滚动窗口(固定窗口)更广义的一种形式,由固定的窗口长度和滑动间隔组成;当滑动间隔等于窗口长度时,滑动窗口即退化为滚动窗口
特点:时间对齐,窗口长度固定,有重叠(滑动间隔小于窗口长度时,一条数据会同时属于多个窗口)
package com.xx.window; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.datastream.WindowedStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; /** * @author aqi * @since 2023/8/30 15:46 */ @Slf4j public class WindowReduceDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 接收socket数据(windows使用:nc -lp 7777,linux使用:nc -lk 7777进行数据推送) SingleOutputStreamOperator<Demo> sensorDS = env .socketTextStream("127.0.0.1", 7777) .map(new DemoMapFunction()); // 滚动窗口(固定窗口长度为:10秒,每隔10s统计一次) // WindowedStream<Demo, String, TimeWindow> sensorWS = sensorDS // .keyBy(Demo::getId) // .window(TumblingProcessingTimeWindows.of(Time.seconds(10))); // 滑动窗口(每5秒钟统计一次,过去的10秒钟内的数据) WindowedStream<Demo, String, TimeWindow> sensorWS = sensorDS .keyBy(Demo::getId) .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(2))); // 聚合(也可以使用别的算子进行聚合) SingleOutputStreamOperator<Demo> reduce = sensorWS.reduce( (value1, value2) -> new Demo(value1.getId(), value1.getValue() + value2.getValue()) ); // 打印计算结果 reduce.print(); // 触发计算 env.execute(); } } @Data @AllArgsConstructor @NoArgsConstructor class Demo { private String id; private Long value; } class DemoMapFunction implements MapFunction<String, Demo> { @Override public Demo map(String value) { String[] datas = value.split(","); return new Demo(datas[0], 
Long.valueOf(datas[1])); } }
介绍:会话窗口由一系列事件加上一个指定时长的超时间隙(gap)组成:如果在 gap 时长内没有收到新数据,当前会话窗口就会关闭,之后到达的数据会开启一个新的窗口
特点:时间不对齐,窗口长度不固定,同一 key 的会话窗口之间没有重叠
package com.xx.window; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.datastream.WindowedStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; /** * @author aqi * @since 2023/8/30 15:46 */ @Slf4j public class WindowReduceDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 接收socket数据(windows使用:nc -lp 7777,linux使用:nc -lk 7777进行数据推送) SingleOutputStreamOperator<Demo> sensorDS = env .socketTextStream("127.0.0.1", 7777) .map(new DemoMapFunction()); // 滚动窗口(固定窗口长度为:10秒,每隔10s统计一次) // WindowedStream<Demo, String, TimeWindow> sensorWS = sensorDS // .keyBy(Demo::getId) // .window(TumblingProcessingTimeWindows.of(Time.seconds(10))); // 滑动窗口(每5秒钟统计一次,过去的10秒钟内的数据) // WindowedStream<Demo, String, TimeWindow> sensorWS = sensorDS // .keyBy(Demo::getId) // .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(2))); // 会话窗口(超时间隔5s) WindowedStream<Demo, String, TimeWindow> sensorWS = sensorDS .keyBy(Demo::getId) .window(ProcessingTimeSessionWindows.withGap(Time.seconds(5))); // 聚合(也可以使用别的算子进行聚合) SingleOutputStreamOperator<Demo> reduce = sensorWS.reduce( (value1, value2) -> new Demo(value1.getId(), value1.getValue() + value2.getValue()) ); // 打印计算结果 reduce.print(); // 触发计算 env.execute(); } } @Data @AllArgsConstructor @NoArgsConstructor class Demo { private String id; private Long value; } class DemoMapFunction implements MapFunction<String, Demo> { @Override public Demo map(String 
value) { String[] datas = value.split(","); return new Demo(datas[0], Long.valueOf(datas[1])); } }
滚动窗口:TumblingProcessingTimeWindows.of(Time.seconds(10))
滑动窗口:SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(2))
会话窗口:ProcessingTimeSessionWindows.withGap(Time.seconds(5))
计数窗口与时间窗口类似,但只分为滚动计数窗口和滑动计数窗口两种(没有会话计数窗口),使用方法与时间窗口基本相同
滚动计数窗口:窗口长度=5个元素
sensorKs.countWindow(5);
滑动计数窗口:窗口长度=5个元素,滑动步长=2个元素
sensorKs.countWindow(5, 2);
增量聚合:每来一条数据就计算一次(更新中间结果),窗口触发时只输出最终的计算结果
函数:reduce、aggregate 等为增量聚合函数;process(以及 apply)属于全窗口函数
全窗口函数:数据到来时不计算,先存储起来,窗口触发时再统一计算并输出结果;同时可以获取到窗口信息、上下文信息等,灵活性非常强
函数:process
package com.xx.window; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.time.DateFormatUtils; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.datastream.WindowedStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; /** * @author aqi * @since 2023/8/30 15:46 */ @Slf4j public class WindowReduceDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 接收socket数据(windows使用:nc -lp 7777,linux使用:nc -lk 7777进行数据推送) SingleOutputStreamOperator<Demo> sensorDS = env .socketTextStream("127.0.0.1", 7777) .map(new DemoMapFunction()); // 滚动窗口(固定窗口长度为:10秒,每隔10s统计一次) WindowedStream<Demo, String, TimeWindow> sensorWS = sensorDS .keyBy(Demo::getId) .window(TumblingProcessingTimeWindows.of(Time.seconds(10))); SingleOutputStreamOperator<String> process = sensorWS.process(new ProcessWindowFunction<Demo, String, String, TimeWindow>() { /** * 全窗口函数的计算逻辑,窗口触发时才会调用一次,统一计算窗口的所有数据 * @param s 分组的key * @param context 上下文 * @param elements 存的数据 * @param out 采集器 */ @Override public void process(String s, ProcessWindowFunction<Demo, String, String, TimeWindow>.Context context, Iterable<Demo> elements, Collector<String> out) { long start = context.window().getStart(); long end = context.window().getEnd(); String startWindow = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss"); String endWindow = 
DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss"); long count = elements.spliterator().estimateSize(); out.collect("key=" + s + "的窗口[" + startWindow + "," + endWindow + "]包含:" + count + "条数据===>" + elements); } }); // 打印计算结果 process.print(); // 触发计算 env.execute(); } } @Data @AllArgsConstructor @NoArgsConstructor class Demo { private String id; private Long value; } class DemoMapFunction implements MapFunction<String, Demo> { @Override public Demo map(String value) { String[] datas = value.split(","); return new Demo(datas[0], Long.valueOf(datas[1])); } }
package com.xx.window; import com.xx.entity.WaterSensor; import com.xx.functions.WaterSensorMapFunction; import org.apache.commons.lang3.time.DateFormatUtils; import org.apache.flink.api.common.functions.AggregateFunction; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.datastream.WindowedStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; /** * @author aqi * @since 2023/8/30 15:46 */ public class WindowAggregateAndProcessDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); SingleOutputStreamOperator<WaterSensor> sensorDS = env .socketTextStream("127.0.0.1", 7777) .map(new WaterSensorMapFunction()); KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(WaterSensor::getId); WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10))); SingleOutputStreamOperator<String> result = sensorWS.aggregate( // 第一个参数:输入数据的类型,第二个参数:累加器的类型,存储的中间计算结果的类型,第三个参数:输出的类型 new AggregateFunction<WaterSensor, Integer, String>() { @Override public Integer createAccumulator() { System.out.println("初始化累加器"); return null; } @Override public Integer add(WaterSensor value, Integer accumulator) { if (accumulator == null) { accumulator = 0; } Integer add = value.getVc() + accumulator; System.out.println("调用add方法,累加结果:" + add); return add; } @Override public String getResult(Integer accumulator) { System.out.println("获取最终结果"); return 
accumulator.toString(); } @Override public Integer merge(Integer a, Integer b) { System.out.println("调用merge方法"); return null; } }, new ProcessWindowFunction<String, String, String, TimeWindow>() { @Override public void process(String s, ProcessWindowFunction<String, String, String, TimeWindow>.Context context, Iterable<String> elements, Collector<String> out) throws Exception { long start = context.window().getStart(); long end = context.window().getEnd(); String startWindow = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss"); String endWindow = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss"); long count = elements.spliterator().estimateSize(); out.collect("key=" + s + "的窗口[" + startWindow + "," + endWindow + "]包含:" + count + "条数据===>" + elements); } }); result.print(); env.execute(); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。