赞
踩
代码会同步到 GitHub:
https://github.com/turbo-duck/flink-demo
上节完成了滑动窗口(SlideWindow)的两种实现方式:时间驱动(基于时间)和事件驱动(基于计数),核心代码如下:
// Time-driven: sliding time window (size 10 s, slide 5 s).
// The key type is String, matching the String key used by countWindow below and by
// MyTimeWindowFunction (WindowFunction<Tuple2<String, Integer>, String, String, TimeWindow>).
WindowedStream<Tuple2<String, Integer>, String, TimeWindow> timeWindow = keyedStream.timeWindow(Time.seconds(10), Time.seconds(5));
timeWindow.sum(1).print();
timeWindow.apply(new MyTimeWindowFunction()).print();
// Event-driven (count-based): sliding count window (size 3 elements, slide 2 elements).
WindowedStream<Tuple2<String, Integer>, String, GlobalWindow> countWindow = keyedStream
        .countWindow(3, 2);
countWindow.sum(1).print();
countWindow.apply(new MyCountWindowFunction()).print();
env.execute();
Flink 会话窗口(Session Window)是一种基于会话活动来划分窗口的机制。与滚动窗口(Tumbling Window)和滑动窗口(Sliding Window)这类固定时长的时间窗口不同,会话窗口不依赖固定的时间间隔,而是根据数据的活跃度来动态地划分窗口。具体来说,当数据流中出现超过设定的会话间隔(gap)的空闲期(即这段时间内没有数据到达)时,会话窗口会结束当前窗口,并在新数据到来时开启一个新的窗口。例如下面的代码将 gap 设为 10 秒:某个 key 连续 10 秒没有新数据,该 key 的会话窗口就会关闭并触发计算。
package icu.wzk.demo07; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import icu.wzk.demo06.MyTimeWindowFunction; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.datastream.WindowedStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import java.text.SimpleDateFormat; import java.util.Random; /** * 会话窗口 * @author wzk * @date 14:10 2024/6/24 **/ public class SessionWindow { private static final Random RANDOM = new Random(); public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DataStreamSource<String> dataStreamSource = env.socketTextStream("0.0.0.0", 9999); SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = dataStreamSource.map(new MapFunction<String, Tuple2<String, Integer>>() { @Override public Tuple2<String, Integer> map(String value) throws Exception { SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); long timeMillis = System.currentTimeMillis(); int random = RANDOM.nextInt(10); System.err.println("value : " + value + " random : " + random + " timestamp : " + timeMillis + "|" + format.format(timeMillis)); return new Tuple2<>(value, random); } }); KeyedStream<Tuple2<String, Integer>, String> keyedStream = mapStream .keyBy(new KeySelector<Tuple2<String, Integer>, String>() { @Override public String getKey(Tuple2<String, Integer> value) throws Exception { return 
value.f0; } }); // 如果连续10s内,没有数据进来,则会话窗口断开。 WindowedStream<Tuple2<String, Integer>, String, TimeWindow> window = keyedStream .window(ProcessingTimeSessionWindows.withGap(Time.seconds(10))); window.sum(1).print(); window.apply(new MyTimeWindowFunction()).print(); env.execute(); } }
package icu.wzk.demo07; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import icu.wzk.demo06.MyTimeWindowFunction; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.datastream.WindowedStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import java.text.SimpleDateFormat; import java.util.Random; /** * 会话窗口 * @author wzk * @date 14:10 2024/6/24 **/ public class SessionWindow { private static final Random RANDOM = new Random(); public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DataStreamSource<String> dataStreamSource = env.socketTextStream("0.0.0.0", 9999); SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = dataStreamSource.map(new MapFunction<String, Tuple2<String, Integer>>() { @Override public Tuple2<String, Integer> map(String value) throws Exception { SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); long timeMillis = System.currentTimeMillis(); int random = RANDOM.nextInt(10); System.err.println("value : " + value + " random : " + random + " timestamp : " + timeMillis + "|" + format.format(timeMillis)); return new Tuple2<>(value, random); } }); KeyedStream<Tuple2<String, Integer>, String> keyedStream = mapStream .keyBy(new KeySelector<Tuple2<String, Integer>, String>() { @Override public String getKey(Tuple2<String, Integer> value) throws Exception { return 
value.f0; } }); // 如果连续10s内,没有数据进来,则会话窗口断开。 WindowedStream<Tuple2<String, Integer>, String, TimeWindow> window = keyedStream .window(ProcessingTimeSessionWindows.withGap(Time.seconds(10))); window.sum(1).print(); window.apply(new MyCountWindowFunction()).print(); env.execute(); } }
package icu.wzk.demo06; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; import java.text.SimpleDateFormat; /** * 基于时间驱动 TimeWindow * @author wzk * @date 10:26 2024/6/22 **/ public class MyTimeWindowFunction implements WindowFunction<Tuple2<String,Integer>, String, String, TimeWindow> { @Override public void apply(String s, TimeWindow window, Iterable<Tuple2<String, Integer>> input, Collector<String> out) throws Exception { SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); int sum = 0; for(Tuple2<String,Integer> tuple2 : input){ sum +=tuple2.f1; } long start = window.getStart(); long end = window.getEnd(); out.collect("key:" + s + " value: " + sum + "| window_start :" + format.format(start) + " window_end :" + format.format(end) ); } }
package icu.wzk.demo06; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.windowing.windows.GlobalWindow; import org.apache.flink.util.Collector; import java.text.SimpleDateFormat; /** * 基于事件驱动 GlobalWindow * @author wzk * @date 10:27 2024/6/22 **/ public class MyCountWindowFunction implements WindowFunction<Tuple2<String, Integer>, String, String, GlobalWindow> { @Override public void apply(String s, GlobalWindow window, Iterable<Tuple2<String, Integer>> input, Collector<String> out) throws Exception { SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); int sum = 0; for (Tuple2<String, Integer> tuple2 : input){ sum += tuple2.f1; } // 无用的时间戳,默认值为: Long.MAX_VALUE,因为基于事件计数的情况下,不关心时间。 long maxTimestamp = window.maxTimestamp(); out.collect("key:" + s + " value: " + sum + "| maxTimeStamp :" + maxTimestamp + "," + format.format(maxTimestamp) ); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。