当前位置:   article > 正文

Flink1.8实现wordcount_flink开发wordcount

flink开发wordcount

之前已经记录了在Mac上安装Flink1.8

https://blog.csdn.net/zhangvalue/article/details/93166895

1️⃣、开始创建一个项目名为flink_begin的maven项目

2️⃣、pom.xml文件添加如下dependency:

  1. <dependencies>
  2. <dependency>
  3. <groupId>org.apache.flink</groupId>
  4. <artifactId>flink-java</artifactId>
  5. <version>1.8.0</version>
  6. </dependency>
  7. <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
  8. <dependency>
  9. <groupId>org.apache.flink</groupId>
  10. <artifactId>flink-streaming-java_2.12</artifactId>
  11. <version>1.8.0</version>
  12. <scope>provided</scope>
  13. </dependency>
  14. <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
  15. <dependency>
  16. <groupId>org.apache.flink</groupId>
  17. <artifactId>flink-clients_2.12</artifactId>
  18. <version>1.8.0</version>
  19. </dependency>
  20. </dependencies>

wordcount.java 文件如下:

  1. /**
  2. * @ Author zhangsf
  3. * @CreateTime 2019-06-20 - 18:58
  4. */
  5. import org.apache.flink.api.common.functions.FlatMapFunction;
  6. import org.apache.flink.api.java.utils.ParameterTool;
  7. import org.apache.flink.streaming.api.datastream.DataStream;
  8. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  9. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  10. import org.apache.flink.streaming.api.windowing.time.Time;
  11. import org.apache.flink.util.Collector;
  12. public class WordCount {
  13. public static void main(String[] args) throws Exception {
  14. //定义socket的端口号
  15. int port;
  16. try {
  17. ParameterTool parameterTool = ParameterTool.fromArgs(args);
  18. port = parameterTool.getInt("port");
  19. } catch (Exception e) {
  20. System.err.println("指定port参数,默认值为9000");
  21. port = 9000;
  22. }
  23. //获取运行环境
  24. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  25. //连接socket获取输入的数据
  26. DataStreamSource<String> text = env.socketTextStream("127.0.0.1", port, "\n");
  27. //计算数据
  28. DataStream<WordWithCount> windowCount = text.flatMap(new FlatMapFunction<String, WordWithCount>() {
  29. public void flatMap(String value, Collector<WordWithCount> out) throws Exception {
  30. String[] splits = value.split("\\s");
  31. for (String word : splits) {
  32. out.collect(new WordWithCount(word, 1L));
  33. }
  34. }
  35. })//打平操作,把每行的单词转为<word,count>类型的数据
  36. .keyBy("word")//针对相同的word数据进行分组
  37. .timeWindow(Time.seconds(2), Time.seconds(1))//指定计算数据的窗口大小和滑动窗口大小
  38. .sum("count");
  39. //把数据打印到控制台
  40. windowCount.print()
  41. .setParallelism(1);//使用一个并行度
  42. //注意:因为flink是懒加载的,所以必须调用execute方法,上面的代码才会执行
  43. env.execute("streaming word count");
  44. }
  45. /**
  46. * 主要为了存储单词以及单词出现的次数
  47. */
  48. public static class WordWithCount {
  49. public String word;
  50. public long count;
  51. public WordWithCount() {
  52. }
  53. public WordWithCount(String word, long count) {
  54. this.word = word;
  55. this.count = count;
  56. }
  57. @Override
  58. public String toString() {
  59. return "WordWithCount{" +
  60. "word='" + word + '\'' +
  61. ", count=" + count +
  62. '}';
  63. }
  64. }
  65. }

此时需要将本机的终端上开一个端口号为9000的监听

nc -l 9000

准备就绪就可以开始run 起来了

可能会出现问题 

java.lang.NoClassDefFoundError: org/apache/flink/streaming/api/datastream/DataStream

 https://blog.csdn.net/zhangvalue/article/details/93165357 解决

启动起来之后在

输入hello hello world world world world flink flink flink flink flink,回车。在IDEA的控制台会显示如下单词和词频的信息

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小惠珠哦/article/detail/772752
推荐阅读
相关标签
  

闽ICP备14008679号