
Flink Learning (7): Word Count

Preface

Flink is a unified stream and batch processing framework: the same program structure can process data either as a continuous stream or as a bounded batch.

1. Basic Code Structure

  // 1st: set up the execution environment
  xxxEnvironment env = xxxEnvironment.getExecutionEnvironment();
  // 2nd: create the source
  DataSource xxxDS = env.xxxx();
  // 3rd: apply transformations
  Xxx transformation = xxxDS.xxxx();
  // 4th: set up the sink
  transformation.print();
  // 5th: may be required (triggers execution for streaming jobs)
  env.execute();

2. Demo 1: Batch Processing

  • Source code

  public static void main(String[] args) throws Exception {
      // 1. Create an execution environment
      ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
      // 2. Read the input
      DataSource<String> lineDS = env.readTextFile("input/word.txt");
      // 3. Process the data
      FlatMapOperator<String, Tuple2<String, Integer>> wordDS = lineDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
          @Override
          public void flatMap(String value, Collector<Tuple2<String, Integer>> collector) throws Exception {
              // 3.1 Split the line into words
              String[] values = value.split(" ");
              // 3.2 Emit (word, 1) for each word
              for (String word : values) {
                  Tuple2<String, Integer> wordTuple = Tuple2.of(word, 1);
                  collector.collect(wordTuple);
              }
          }
      });
      // 4. Group by word
      UnsortedGrouping<Tuple2<String, Integer>> tuple2UnsortedGrouping = wordDS.groupBy(0);
      // 5. Aggregate within each group
      AggregateOperator<Tuple2<String, Integer>> sum = tuple2UnsortedGrouping.sum(1);
      // 6. Print the result
      sum.print();
  }
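Semantically, the flatMap → groupBy(0) → sum(1) chain above is just a grouped counter. The following plain-Java sketch (no Flink required; the sample lines stand in for the contents of input/word.txt) shows what the pipeline computes:

```java
import java.util.HashMap;
import java.util.Map;

public class WordCountSketch {
    public static void main(String[] args) {
        String[] lines = { "hello flink", "hello world" }; // stand-in for input/word.txt

        Map<String, Integer> counts = new HashMap<>();
        for (String line : lines) {
            for (String word : line.split(" ")) {    // the flatMap step
                counts.merge(word, 1, Integer::sum); // groupBy(0) + sum(1)
            }
        }
        // Print in Flink's (word,count) tuple style; HashMap order is unspecified
        counts.forEach((w, c) -> System.out.println("(" + w + "," + c + ")"));
    }
}
```

In the batch demo, Flink performs this grouping and summation in a distributed fashion, but the per-word totals are the same.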
  • Output

3. Demo 2: Stream Processing

  • Source code

  public static void main(String[] args) throws Exception {
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      DataStreamSource<String> lineDS = env.readTextFile("input/word.txt");
      SingleOutputStreamOperator<Tuple2<String, Integer>> wordDS = lineDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
          @Override
          public void flatMap(String value, Collector<Tuple2<String, Integer>> collector) throws Exception {
              String[] words = value.split(" ");
              for (String word : words) {
                  Tuple2<String, Integer> temp = Tuple2.of(word, 1);
                  collector.collect(temp);
              }
          }
      });
      KeyedStream<Tuple2<String, Integer>, Tuple> wordCountKeyBy = wordDS.keyBy(0);
      SingleOutputStreamOperator<Tuple2<String, Integer>> sum = wordCountKeyBy.sum(1);
      sum.print();
      env.execute();
  }
  • Output

4. Demo 3: Unbounded Stream Processing

  • Source code

  public static void main(String[] args) throws Exception {
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      DataStreamSource<String> lineDS = env.socketTextStream("192.168.3.11", 9999);
      SingleOutputStreamOperator<Tuple2<String, Integer>> sum = lineDS.flatMap(
              (String value, Collector<Tuple2<String, Integer>> out) -> {
                  String[] words = value.split(" ");
                  for (String word : words) {
                      out.collect(Tuple2.of(word, 1));
                  }
              }
      ).returns(Types.TUPLE(Types.STRING, Types.INT))
       .keyBy(value -> value.f0)
       .sum(1);
      sum.print();
      env.execute();
  }
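Unlike the anonymous FlatMapFunction classes in Demos 1 and 2, a Java lambda keeps no generic type information at runtime, which is why this version needs the explicit .returns(Types.TUPLE(...)) hint. A small stand-alone sketch of that difference (no Flink needed; the Function types here are just for illustration):

```java
import java.lang.reflect.Type;
import java.util.function.Function;

public class ErasureDemo {
    public static void main(String[] args) {
        // Anonymous class: the generic arguments survive in the class file,
        // so reflection (and Flink's type extractor) can recover them.
        Function<String, Integer> anon = new Function<String, Integer>() {
            @Override public Integer apply(String s) { return s.length(); }
        };
        // Lambda: the generated class implements only the raw interface,
        // so the String/Integer arguments are gone at runtime.
        Function<String, Integer> lambda = s -> s.length();

        for (Type t : anon.getClass().getGenericInterfaces()) {
            System.out.println("anon:   " + t);
        }
        for (Type t : lambda.getClass().getGenericInterfaces()) {
            System.out.println("lambda: " + t);
        }
    }
}
```

The anonymous class reports a parameterized Function<String, Integer>, while the lambda reports only the raw Function interface; Flink hits the same wall, hence the returns() call.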
  • Output

Keep a text source listening on port 9999 of 192.168.3.11 (for example, `nc -lk 9999` on that host) and type lines into it; the program then prints a running count for each word as the data arrives.
