
Getting Started with Flink: WordCount (Part 1)

I. Requirements

1. On the Linux side, start a service with `nc -lk` listening on port 9999, then keep typing words into it;

2. Write a Flink program that connects to port 9999 of that server, reads the words in real time, and accumulates a running count per word.

II. Implementation

1. Server

[root@master ~]# nc -lk 9999

Tip: if nc is not installed, install it as follows (CentOS):

[root@master ~]# yum install -y nc

2. Flink code

package com.wakedata.study;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class WordCount {
    public static void main(String[] args) throws Exception {
        // 1. Get the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. Connect to port 9999 of the server via socketTextStream to obtain the input stream
        DataStreamSource<String> inputStream = env.socketTextStream("master", 9999);

        // 3. Split each line into (word, 1) tuples
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordCounts =
                inputStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
                        for (String word : line.split("\\s+")) {
                            out.collect(new Tuple2<>(word, 1));
                        }
                    }
                });

        // 4. Group by word
        KeyedStream<Tuple2<String, Integer>, String> wordGroup =
                wordCounts.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Integer> tuple) throws Exception {
                        // the word (field 0) is the key
                        return tuple.f0;
                    }
                });

        // 5. Sum the counts (field 1) per key
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = wordGroup.sum(1);

        // Print the running counts
        result.print();

        // Start the Flink job
        env.execute();
    }
}
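Conceptually, `keyBy` plus `sum(1)` keeps one running total per word and emits the updated tuple for every incoming record. Setting aside Flink's parallelism and networking, the per-record semantics can be sketched in plain Java (the class and method names below are illustrative, not part of the Flink API):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative sketch of what flatMap + keyBy + sum(1) do per record. */
public class WordCountSketch {
    private final Map<String, Integer> totals = new HashMap<>();

    /** Processes one input line and returns the tuples that would be emitted. */
    public List<String> processLine(String line) {
        List<String> emitted = new ArrayList<>();
        for (String word : line.split("\\s+")) {               // flatMap: one tuple per word
            int count = totals.merge(word, 1, Integer::sum);   // keyBy + sum(1): running total per key
            emitted.add("(" + word + "," + count + ")");       // what result.print() would show
        }
        return emitted;
    }

    public static void main(String[] args) {
        WordCountSketch sketch = new WordCountSketch();
        System.out.println(sketch.processLine("hello hello")); // [(hello,1), (hello,2)]
        System.out.println(sketch.processLine("hello"));       // [(hello,3)]
    }
}
```

Note how a word that appears twice in one line is emitted twice, with the count advancing each time; this matches the incremental output of the streaming job.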

3. Execution results

(1) Keep typing words on the server side:

hadoop hive spark
jave
hello hello
hello
hi
word
word

(2) The Flink program computes the cumulative word counts in real time.
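With the seven input lines above, the job emits one updated tuple per word occurrence, so `hello` is printed with counts 1, 2, 3 and `word` with counts 1, 2 (Flink's `print()` also prefixes each line with a subtask index such as `3>`, omitted here). The final totals can be checked with a plain-Java replay of the sample input (a hypothetical helper, not Flink API):

```java
import java.util.HashMap;
import java.util.Map;

public class FinalTotals {
    /** Replays the sample input lines and returns the final per-word totals. */
    public static Map<String, Integer> totals(String[] lines) {
        Map<String, Integer> counts = new HashMap<>();
        for (String line : lines) {
            for (String word : line.split("\\s+")) {
                counts.merge(word, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        String[] input = {"hadoop hive spark", "jave", "hello hello", "hello", "hi", "word", "word"};
        Map<String, Integer> counts = totals(input);
        System.out.println(counts.get("hello")); // 3
        System.out.println(counts.get("word"));  // 2
    }
}
```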
