当前位置:   article > 正文

Flink学习3-WordCount词频统计(流处理)_datastream wordcount =

datastream wordcount =

上一节,我们基于Flink批处理工作模式实现了词频统计任务,但是Flink最强大之处在于是流处理,后续我们会发现,Flink批处理也是基于流处理实现的,因此本节我们基于Flink流处理模式来实现词频统计任务。主要内容:

1.功能开发

还是基于之前的WordCount项目,我们直接修改WordCount.java,输入数据直接修改成监听本地9000端口,修改后的源码如下

  1. package com.windy.myflink;
  2. import org.apache.flink.api.common.functions.FlatMapFunction;
  3. import org.apache.flink.api.java.tuple.Tuple2;
  4. import org.apache.flink.streaming.api.datastream.DataStream;
  5. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  6. import org.apache.flink.util.Collector;
  7. public class WordCount {
  8. private static String host = "localhost";
  9. private static int port = 9000;
  10. public static void main(String[] args) throws Exception {
  11. StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
  12. DataStream<String> dataStream = streamEnv.socketTextStream(host, port);
  13. dataStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
  14. @Override
  15. public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
  16. String[] fields = s.toLowerCase().split("\\s");
  17. for (String field : fields) {
  18. collector.collect(new Tuple2<>(field.trim(), 1));
  19. }
  20. }
  21. }).keyBy(x -> x.f0)
  22. .sum(1)
  23. .print();
  24. streamEnv.execute();
  25. }
  26. }

使用Build => Build Artifacts => build进行编译打包。

2.任务运行

2.1 启动集群

Flink流处理需要在集群模式下运行,因此我们需要先在本地启动Flink集群,首先进入flink安装目录,然后执行start脚本。

cd /Users/windy/Package/flink-1.15.0/bin && ./start-cluster.sh

执行成功后,打开浏览器,输入http://localhost:8081,查看Flink dashboard,由于当前没有提交任何任务,所以Running Jobs数是0。

2.2 提交任务

由于我们Job输入设置的是监听本地9000端口,因为启动任务前,需要先在终端下启动9000端口监听,并输入部分内容。

  1. WINDY-MB0:bin windy$ nc -l 9000
  2. hi, world
  3. hello world

使用fink run命令提交任务,返回Job has been submitted代表提交成功。

  1. WINDY-MB0:word-count windy$ flink run out/artifacts/word_count_jar/word-count.jar
  2. Job has been submitted with JobID 5be2909113ca3418f2ececf2d20e61cc

2.3 查看任务

浏览器打开Flin Dashboard,发现我们提交的任务已经处于RUNNING状态。

点击上图红框中的“RUNNING”,进入任务运行界面,中间区域显示的任务的拓扑图

 最终的统计结果是在Sink阶段,因此点击上图红框中的蓝色方块,找到LOG,并点击进入。

选择Stdout,并将进度条拉到最后,可以看到词频统计结果。细心的小伙伴可能会发现,world输出了两条统计结果,这是因为在FlinK流处理下,每处理一条输入,就会对应一条输出。

 

至此,我们完成了Flink流处理的词频统计任务,感兴趣的小伙伴可以自行拓展。 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Cpp五条/article/detail/467604
推荐阅读
相关标签
  

闽ICP备14008679号