Using Functions in Flink

1. WindowFunction

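Use a WindowFunction on a tumbling window to aggregate the data. Each input line is a "key,value" pair, and the job sums the values per key within every window.

Sample data: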

9,3
9,2
9,7
4,9
2,6

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;

    public class WindowsReduceDemo {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Read lines of text from the socket opened with nc
            DataStreamSource<String> source = env.socketTextStream("192.168.88.161", 9999);
            source.setParallelism(1);
            // Parse each "key,value" line into a Tuple2<key, value>
            SingleOutputStreamOperator<Tuple2<Integer, Integer>> mapData = source.map(new MapFunction<String, Tuple2<Integer, Integer>>() {
                @Override
                public Tuple2<Integer, Integer> map(String value) throws Exception {
                    String[] arr = value.split(",");
                    return Tuple2.of(Integer.valueOf(arr[0].trim()), Integer.valueOf(arr[1].trim()));
                }
            });
            mapData.keyBy(line -> line.f0)
                    // Tumbling window: compute once every 3 seconds
                    .window(TumblingProcessingTimeWindows.of(Time.seconds(3)))
                    .apply(new WindowFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Integer, TimeWindow>() {
                        @Override
                        public void apply(Integer key, TimeWindow window, Iterable<Tuple2<Integer, Integer>> input, Collector<Tuple2<Integer, Integer>> out) throws Exception {
                            // key is the keyBy field shared by every element in input;
                            // sum accumulates the second field over the whole window
                            int sum = 0;
                            for (Tuple2<Integer, Integer> line : input) {
                                sum += line.f1;
                            }
                            out.collect(Tuple2.of(key, sum));
                        }
                    })
                    .print();
            env.execute();
        }
    }

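Open a listening port for the program on the host named in the code (192.168.88.161 here): nc -lk 9999. Start it before the job, because socketTextStream connects to the port as a client.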

Once the program is running, send the sample data through that port; the results are printed to the console.
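
To make the grouping concrete, here is a minimal plain-Java sketch (no Flink dependency) of what the tumbling window plus the per-key sum amounts to. The class name, method names, and arrival timestamps are hypothetical, chosen only for illustration. The window-start arithmetic is a simplification that assumes a zero offset and non-negative timestamps. In the real job the grouping depends on when each line is actually typed, since the window uses processing time.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class TumblingWindowSketch {

    // Start of the tumbling window that contains the timestamp (zero offset assumed).
    public static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    // Sums values per (window start, key). Each record is {timestampMs, key, value}.
    public static Map<String, Integer> sumPerWindowAndKey(long[][] records, long sizeMs) {
        Map<String, Integer> sums = new LinkedHashMap<>();
        for (long[] r : records) {
            String bucket = windowStart(r[0], sizeMs) + ":" + r[1];
            sums.merge(bucket, (int) r[2], Integer::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        // Hypothetical arrival times (ms) attached to the sample data.
        long[][] records = {
            {1000, 9, 3}, {1500, 9, 2}, {2500, 9, 7}, {4000, 4, 9}, {4500, 2, 6}
        };
        // With a 3000 ms window, the three key-9 records share window 0,
        // while keys 4 and 2 land in the window starting at 3000.
        sumPerWindowAndKey(records, 3000)
                .forEach((bucket, sum) -> System.out.println(bucket + " -> " + sum));
    }
}
```

Under these assumed timestamps, key 9 sums to 12 in the first window, and keys 4 and 2 produce 9 and 6 in the second. If the same lines were typed more slowly, the key-9 records could be split across windows and emitted as separate partial sums.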

2. ReduceFunction

Use a ReduceFunction on a sliding window to aggregate the data and print the result.

Sample data:

9,3
9,2
9,7
4,9
2,6
1,5
2,3
5,7
5,4

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;

    public class WindowsReduceDemo {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Read lines of text from the socket opened with nc
            DataStreamSource<String> source = env.socketTextStream("192.168.88.161", 9999);
            source.setParallelism(1);
            // Parse each "key,value" line into a Tuple2<key, value>
            SingleOutputStreamOperator<Tuple2<Integer, Integer>> mapData = source.map(new MapFunction<String, Tuple2<Integer, Integer>>() {
                @Override
                public Tuple2<Integer, Integer> map(String value) throws Exception {
                    String[] arr = value.split(",");
                    return Tuple2.of(Integer.valueOf(arr[0].trim()), Integer.valueOf(arr[1].trim()));
                }
            });
            mapData.keyBy(line -> line.f0)
                    // Sliding window: every 3 seconds, aggregate the previous 6 seconds of data
                    .window(SlidingProcessingTimeWindows.of(Time.seconds(6), Time.seconds(3)))
                    .reduce(new ReduceFunction<Tuple2<Integer, Integer>>() {
                        @Override
                        public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> value1, Tuple2<Integer, Integer> value2) throws Exception {
                            // Both values share the same key; keep it and add the second fields
                            return Tuple2.of(value1.f0, value1.f1 + value2.f1);
                        }
                    })
                    .print();
            env.execute();
        }
    }
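
The reduce() method combines two records of the same key into one, and the result is combined with the next record in turn. The plain-Java sketch below imitates that folding for the three sample records with key 9, assuming they all fall into the same window. java.util.function.BinaryOperator is only a stand-in for Flink's ReduceFunction, and the class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

public class ReduceFoldSketch {

    // Same combining logic as the reduce() method in the job, on {key, value} pairs.
    public static final BinaryOperator<int[]> SUM_VALUES =
            (a, b) -> new int[] {a[0], a[1] + b[1]};

    // Folds the records of one key in arrival order, two at a time.
    public static int[] fold(List<int[]> recordsOfOneKey) {
        int[] acc = recordsOfOneKey.get(0);
        for (int i = 1; i < recordsOfOneKey.size(); i++) {
            acc = SUM_VALUES.apply(acc, recordsOfOneKey.get(i));
        }
        return acc;
    }

    public static void main(String[] args) {
        // (9,3) + (9,2) -> (9,5), then (9,5) + (9,7) -> (9,12)
        int[] result = fold(Arrays.asList(
                new int[] {9, 3}, new int[] {9, 2}, new int[] {9, 7}));
        System.out.println("(" + result[0] + "," + result[1] + ")"); // prints (9,12)
    }
}
```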

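Open a listening port for the program on the host named in the code (192.168.88.161 here): nc -lk 9999. Start it before the job, because socketTextStream connects to the port as a client.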

Once the program is running, send the sample data through that port; the results are printed to the console.
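
With a 6-second size and a 3-second slide, consecutive windows overlap, so every record is counted in two windows and the same key can appear in two consecutive outputs. The plain-Java sketch below lists the windows a single record belongs to. It is an approximation of the assignment arithmetic under a zero offset and non-negative timestamps; the class name, method name, and the 7000 ms arrival time are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

public class SlidingWindowSketch {

    // Start times (ms) of every sliding window [start, start + size) containing the timestamp.
    public static List<Long> windowStarts(long timestampMs, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        // Latest window start that is not after the timestamp.
        long lastStart = timestampMs - (timestampMs % slideMs);
        // Walk backwards one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // A record arriving at t = 7000 ms, window size 6 s, slide 3 s.
        System.out.println(windowStarts(7000, 6000, 3000)); // prints [6000, 3000]
    }
}
```

Here the record belongs to the windows [6000, 12000) and [3000, 9000), so its value contributes to the results emitted at the end of both.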
