赞
踩
1)、针对KeyedStream的窗口API:window
先使用keyBy函数分组,获取KeyedStream,再调用KeyedStream.window设置窗口。也就是说,调用了keyBy算子的,使用window。
// TODO: KeyedStream窗口操作,先分组,再窗口,最后聚合 SingleOutputStreamOperator<String> windowDataStream = tupleStream // 设置Key进行分组 .keyBy(0) // 设置窗口,每5秒中统计最近5秒的数据,滚动窗口 .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) // 聚合操作 .apply(new WindowFunction<Tuple2<String, Integer>, String, Tuple, TimeWindow>() { @Override public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<String, Integer>> input, Collector<String> out) throws Exception { } });
2)、非KeyedStream的窗口API:windowAll
windowAll对窗口中的所有数据进行处理,不进行分组;窗口数据可通过reduce、fold、aggregate函数或apply()函数进行计算。也就是说,没有使用keyBy算子的,使用windowAll。
// TODO: no grouping — window the whole (non-keyed) stream directly, then aggregate
SingleOutputStreamOperator<String> allWindowDataStream =
        tupleStream
                .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .apply(
                        new AllWindowFunction<Tuple2<String, Integer>, String, TimeWindow>() {
                            @Override
                            public void apply(
                                    TimeWindow timeWindow,
                                    Iterable<Tuple2<String, Integer>> elements,
                                    Collector<String> collector) throws Exception {
                                // Skeleton: intentionally emits nothing; put the all-window computation here
                            }
                        });
import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.time.Time; /** * @author liu a fu * @version 1.0 * @date 2021/3/7 0007 * @DESC 窗口统计案例演示:滚动时间窗口(Tumbling Time Window),实时交通卡口车流量统计 * TODO: 滚动窗口数据不会重复 想成水流在流动 * 滑动窗口数据会重复 想成窗口在走动 */ public class StreamTumblingTimeWindow { public static void main(String[] args) throws Exception { //1-环境准备 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); //2-数据源source DataStreamSource<String> inputDataStream = env.socketTextStream("node1.itcast.cn", 9999); //3-数据的transformation /* 数据: a,3 a,2 a,7 d,9 b,6 a,5 b,3 e,7 e,4 */ SingleOutputStreamOperator<Tuple2<String, Integer>> mapDataStream = inputDataStream .filter(line -> line != null && line.trim().split(",").length == 2) .map(new MapFunction<String, Tuple2<String, Integer>>() { @Override public Tuple2<String, Integer> map(String line) throws Exception { String[] split = line.trim().split(","); return new Tuple2<String, Integer>(split[0], Integer.parseInt(split[1])); } }); // TODO: 先按照卡口分组,再进行窗口操作,最后聚合累加 使用了keyBy 所以使用window SingleOutputStreamOperator<Tuple2<String, Integer>> resultDataStream = mapDataStream .keyBy(0) // 下标索引,卡口编号 .timeWindow(Time.seconds(5)) // 滚动时间窗口,仅仅设置窗口大小即可 .sum(1); //4-数据的sink resultDataStream.printToErr(); //5-execute env.execute(StreamTumblingTimeWindow.class.getSimpleName()) ; } }
import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.time.Time; /** * 窗口统计案例演示:滑动时间窗口(Sliding Time Window),实时交通卡口车流量统计 */ public class StreamSlidingTimeWindow { public static void main(String[] args) throws Exception { // 1. 执行环境-env StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // 2. 数据源-source DataStreamSource<String> inputDataStream = env.socketTextStream("node1.itcast.cn", 9999); // 3. 数据转换-transformation /* 数据: a,3 a,2 a,7 d,9 b,6 a,5 b,3 e,7 e,4 */ SingleOutputStreamOperator<Tuple2<String, Integer>> mapDataStream = inputDataStream .filter(line -> null != line && line.trim().split(",").length == 2) .map(new MapFunction<String, Tuple2<String, Integer>>() { @Override public Tuple2<String, Integer> map(String line) throws Exception { String[] split = line.trim().split(","); return new Tuple2<String, Integer>(split[0], Integer.parseInt(split[1])); } }); // TODO: 先按照卡口分组,再进行窗口操作,最后聚合累加 SingleOutputStreamOperator<Tuple2<String, Integer>> sumDataStream = mapDataStream .keyBy(0) // 下标索引,卡口编号 // public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size, Time slide) .timeWindow(Time.seconds(10), Time.seconds(5)) // 滑动时间窗口 大小10s 滑动时间5秒 .sum(1); // 4. 数据终端-sink sumDataStream.printToErr(); // 5. 触发执行-execute env.execute(StreamSlidingTimeWindow.class.getSimpleName()) ; } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。