赞
踩
在滚动窗口中使用 WindowFunction 函数来实现对数据的统计。
样例数据:
9,3
9,2
9,7
4,9
2,6
- import org.apache.flink.api.common.functions.MapFunction;
- import org.apache.flink.api.common.functions.ReduceFunction;
- import org.apache.flink.api.java.tuple.Tuple2;
- import org.apache.flink.streaming.api.datastream.DataStreamSource;
- import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
- import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
- import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
- import org.apache.flink.streaming.api.windowing.time.Time;
- import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
- import org.apache.flink.util.Collector;
-
- public class WindowsReduceDemo {
- public static void main(String[] args) throws Exception {
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- DataStreamSource<String> source = env.socketTextStream("192.168.88.161", 9999);
- source.setParallelism(1);
- SingleOutputStreamOperator<Tuple2<Integer, Integer>> mapData = source.map(new MapFunction<String, Tuple2<Integer, Integer>>() {
- @Override
- public Tuple2<Integer, Integer> map(String value) throws Exception {
- String[] arr = value.split(",");
- return Tuple2.of(Integer.valueOf(arr[0]), Integer.valueOf(arr[1]));
- }
- });
- mapData.keyBy(line->line.f0)
- // todo 滚动窗口 每三秒一计算
- .window(TumblingProcessingTimeWindows.of(Time.seconds(3)))
- .apply(new WindowFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Integer, TimeWindow>() {
- @Override
- public void apply(Integer integer, TimeWindow window, Iterable<Tuple2<Integer, Integer>> input, Collector<Tuple2<Integer, Integer>> out) throws Exception {
- Integer key = 0;
- Integer count = 0;
- for (Tuple2<Integer, Integer> line : input){
- // key 获取的是 参数
- key = line.f0;
- // count 获取的是对指定参数进行累加操作
- count += line.f1;
- }
- out.collect(Tuple2.of(key,count));
- }
- })
- .print();
- env.execute();
- }
- }
运行程序之前，先在 192.168.88.161 上执行 nc -lk 9999 开启一个监听端口（程序通过 socketTextStream 作为客户端连接该端口读取数据）。
程序启动后，在 nc 所在的终端中逐行输入上面的样例数据，控制台会打印出统计结果：
在滑动窗口中使用 ReduceFunction 实现对数据的统计并输出。
样例数据:
9,3
9,2
9,7
4,9
2,6
1,5
2,3
5,7
5,4
- import org.apache.flink.api.common.functions.MapFunction;
- import org.apache.flink.api.common.functions.ReduceFunction;
- import org.apache.flink.api.java.tuple.Tuple2;
- import org.apache.flink.streaming.api.datastream.DataStreamSource;
- import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
- import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
- import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
- import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
- import org.apache.flink.streaming.api.windowing.time.Time;
- import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
- import org.apache.flink.util.Collector;
-
- public class WindowsReduceDemo {
- public static void main(String[] args) throws Exception {
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- DataStreamSource<String> source = env.socketTextStream("192.168.88.161", 9999);
- source.setParallelism(1);
- SingleOutputStreamOperator<Tuple2<Integer, Integer>> mapData = source.map(new MapFunction<String, Tuple2<Integer, Integer>>() {
- @Override
- public Tuple2<Integer, Integer> map(String value) throws Exception {
- String[] arr = value.split(",");
- return Tuple2.of(Integer.valueOf(arr[0]), Integer.valueOf(arr[1]));
- }
- });
- mapData.keyBy(line->line.f0)
-
- // todo 滑动窗口 每三秒计算一次前六秒的数据量
- .window(SlidingProcessingTimeWindows.of(Time.seconds(6),Time.seconds(3)))
- .reduce(new ReduceFunction<Tuple2<Integer, Integer>>() {
- @Override
- public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> value1, Tuple2<Integer, Integer> value2) throws Exception {
- return Tuple2.of(value1.f0,value1.f1 + value2.f1);
- }
- })
-
- .print();
- env.execute();
- }
- }
运行程序之前，先在 192.168.88.161 上执行 nc -lk 9999 开启一个监听端口（程序通过 socketTextStream 作为客户端连接该端口读取数据）。
程序启动后，在 nc 所在的终端中逐行输入上面的样例数据，控制台会打印出统计结果：
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。