赞
踩
上一节,我们基于Flink批处理工作模式实现了词频统计任务,但是Flink最强大之处在于是流处理,后续我们会发现,Flink批处理也是基于流处理实现的,因此本节我们基于Flink流处理模式来实现词频统计任务。主要内容:
还是基于之前的WordCount项目,我们直接修改WordCount.java,输入数据直接修改成监听本地9000端口,修改后的源码如下
- package com.windy.myflink;
-
- import org.apache.flink.api.common.functions.FlatMapFunction;
- import org.apache.flink.api.java.tuple.Tuple2;
- import org.apache.flink.streaming.api.datastream.DataStream;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.util.Collector;
-
- public class WordCount {
- private static String host = "localhost";
- private static int port = 9000;
-
- public static void main(String[] args) throws Exception {
- StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
- DataStream<String> dataStream = streamEnv.socketTextStream(host, port);
- dataStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
- @Override
- public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
- String[] fields = s.toLowerCase().split("\\s");
- for (String field : fields) {
- collector.collect(new Tuple2<>(field.trim(), 1));
- }
- }
- }).keyBy(x -> x.f0)
- .sum(1)
- .print();
- streamEnv.execute();
- }
- }
使用Build => Build Artifacts => build进行编译打包。
Flink流处理需要在集群模式下运行,因此我们需要先在本地启动Flink集群,首先进入flink安装目录,然后执行start脚本。
cd /Users/windy/Package/flink-1.15.0/bin && ./start-cluster.sh
执行成功后,打开浏览器,输入http://localhost:8081,查看Flink dashboard,由于当前没有提交任何任务,所以Running Jobs数是0。
由于我们Job输入设置的是监听本地9000端口,因为启动任务前,需要先在终端下启动9000端口监听,并输入部分内容。
- WINDY-MB0:bin windy$ nc -l 9000
- hi, world
- hello world
使用fink run命令提交任务,返回Job has been submitted代表提交成功。
- WINDY-MB0:word-count windy$ flink run out/artifacts/word_count_jar/word-count.jar
- Job has been submitted with JobID 5be2909113ca3418f2ececf2d20e61cc
浏览器打开Flin Dashboard,发现我们提交的任务已经处于RUNNING状态。
点击上图红框中的“RUNNING”,进入任务运行界面,中间区域显示的任务的拓扑图
最终的统计结果是在Sink阶段,因此点击上图红框中的蓝色方块,找到LOG,并点击进入。
选择Stdout,并将进度条拉到最后,可以看到词频统计结果。细心的小伙伴可能会发现,world输出了两条统计结果,这是因为在FlinK流处理下,每处理一条输入,就会对应一条输出。
至此,我们完成了Flink流处理的词频统计任务,感兴趣的小伙伴可以自行拓展。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。