当前位置:   article > 正文

Flink WordCount实践_wordcount 流处理

wordcount 流处理

目录

前提条件

基本准备

批处理API实现WordCount

流处理API实现WordCount

数据源是文件

数据源是socket文本流

打包

提交到集群运行

命令行提交作业

Web UI提交作业

上传代码到gitee


前提条件

Windows安装好jdk8、Maven3、IDEA

Linux安装好Flink集群,可参考:CentOS7安装flink1.17完全分布式
 

基本准备

创建项目

使用IDEA创建一个新的Maven项目,项目名称,例如:flinkdemo

添加依赖

在项目的pom.xml文件中添加Flink的依赖。

  1. <properties>
  2. <flink.version>1.17.1</flink.version>
  3. </properties>
  4. <dependencies>
  5. <dependency>
  6. <groupId>org.apache.flink</groupId>
  7. <artifactId>flink-streaming-java</artifactId>
  8. <version>${flink.version}</version>
  9. </dependency>
  10. <dependency>
  11. <groupId>org.apache.flink</groupId>
  12. <artifactId>flink-clients</artifactId>
  13. <version>${flink.version}</version>
  14. </dependency>
  15. </dependencies>

刷新依赖

刷新依赖后,能看到相关依赖如下

刷新依赖过程需要等待一些时间来下载相关依赖。

如果依赖下载慢,可以设置阿里云仓库镜像:

 1.设置maven的settings.xml

</mirrors>上面一行添加阿里云仓库镜像

  1. <mirror>
  2. <id>alimaven</id>
  3. <name>aliyun maven</name>
  4. <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
  5. <mirrorOf>central</mirrorOf>
  6. </mirror>

2.IDEA设置maven

数据准备

在工程的根目录下,新建一个data文件夹

并在data文件夹下创建文本文件words.txt

内容如下

hello world
hello java
hello flink

新建包

右键src/main下的java,新建Package

填写包名org.example,包名与groupId的内容一致。

批处理API实现WordCount

org.exmaple下新建wc包及BatchWordCount

填写wc.BatchWordCount

效果如下

BatchWordCount.java代码如下:

  1. package org.example.wc;
  2. import org.apache.flink.api.common.functions.FlatMapFunction;
  3. import org.apache.flink.api.java.ExecutionEnvironment;
  4. import org.apache.flink.api.java.operators.AggregateOperator;
  5. import org.apache.flink.api.java.operators.DataSource;
  6. import org.apache.flink.api.java.operators.FlatMapOperator;
  7. import org.apache.flink.api.java.operators.UnsortedGrouping;
  8. import org.apache.flink.api.java.tuple.Tuple2;
  9. import org.apache.flink.util.Collector;
  10. public class BatchWordCount {
  11. public static void main(String[] args) throws Exception {
  12. // 1. 创建执行环境
  13. ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  14. // 2. 从文件读取数据 按行读取
  15. DataSource<String> lineDS = env.readTextFile("data/words.txt");
  16. // 3. 转换数据格式
  17. FlatMapOperator<String, Tuple2<String, Long>> wordAndOne = lineDS.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
  18. @Override
  19. public void flatMap(String line, Collector<Tuple2<String, Long>> out) throws Exception {
  20. String[] words = line.split(" ");
  21. for (String word : words) {
  22. out.collect(Tuple2.of(word,1L));
  23. }
  24. }
  25. });
  26. // 4. 按照 word 进行分组
  27. UnsortedGrouping<Tuple2<String, Long>> wordAndOneUG = wordAndOne.groupBy(0);
  28. // 5. 分组内聚合统计
  29. AggregateOperator<Tuple2<String, Long>> sum = wordAndOneUG.sum(1);
  30. // 6. 打印结果
  31. sum.print();
  32. }
  33. }

运行程序,查看结果

注意,以上代码的实现方式是基于DataSet API的,是批处理API。而Flink本身是流批统一的处理架构,批量的数据集本质上也是流,没有必要用两套不同的API来实现。从Flink 1.12开始,官方推荐直接使用DataStream API,在提交任务时通过将执行模式设为BATCH来进行批处理:

$ flink run -Dexecution.runtime-mode=BATCH BatchWordCount.jar

流处理API实现WordCount

数据源是文件

org.example.wc包下新建Java类StreamWordCount,代码如下:

  1. package org.example.wc;
  2. import org.apache.flink.api.common.functions.FlatMapFunction;
  3. import org.apache.flink.api.java.tuple.Tuple2;
  4. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  5. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  6. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  7. import org.apache.flink.util.Collector;
  8. public class StreamWordCount {
  9. public static void main(String[] args) throws Exception {
  10. // 1. 创建流式执行环境
  11. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  12. // 2. 读取文件
  13. DataStreamSource<String> lineStream = env.readTextFile("input/words.txt");
  14. // 3. 转换、分组、求和,得到统计结果
  15. SingleOutputStreamOperator<Tuple2<String, Long>> sum = lineStream.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
  16. @Override
  17. public void flatMap(String line, Collector<Tuple2<String, Long>> out) throws Exception {
  18. String[] words = line.split(" ");
  19. for (String word : words) {
  20. out.collect(Tuple2.of(word, 1L));
  21. }
  22. }
  23. }).keyBy(data -> data.f0)
  24. .sum(1);
  25. // 4. 打印
  26. sum.print();
  27. // 5. 执行
  28. env.execute();
  29. }
  30. }

运行结果

与批处理程序BatchWordCount的区别:

  • 创建执行环境的不同,流处理程序使用的是StreamExecutionEnvironment。

  • 转换处理之后,得到的数据对象类型不同。

  • 分组操作调用的是keyBy方法,可以传入一个匿名函数作为键选择器(KeySelector),指定当前分组的key是什么。

  • 代码末尾需要调用env的execute方法,开始执行任务。

数据源是socket文本流

流处理的输入数据通常是流数据,将StreamWordCount代码中读取文件数据的readTextFile方法,替换成读取socket文本流的方法socketTextStream。

org.example.wc包下新建Java类SocketStreamWordCount,代码如下:

  1. package org.example.wc;
  2. import org.apache.flink.api.common.typeinfo.Types;
  3. import org.apache.flink.api.java.tuple.Tuple2;
  4. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  5. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  6. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  7. import org.apache.flink.util.Collector;
  8. public class SocketStreamWordCount {
  9. public static void main(String[] args) throws Exception {
  10. // 1. 创建流式执行环境
  11. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  12. // 2. 读取文本流:node2表示发送端主机名(根据实际情况修改)、7777表示端口号
  13. DataStreamSource<String> lineStream = env.socketTextStream("node2", 7777);
  14. // 3. 转换、分组、求和,得到统计结果
  15. SingleOutputStreamOperator<Tuple2<String, Long>> sum = lineStream.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
  16. String[] words = line.split(" ");
  17. for (String word : words) {
  18. out.collect(Tuple2.of(word, 1L));
  19. }
  20. }).returns(Types.TUPLE(Types.STRING, Types.LONG))
  21. .keyBy(data -> data.f0)
  22. .sum(1);
  23. // 4. 打印
  24. sum.print();
  25. // 5. 执行
  26. env.execute();
  27. }
  28. }

进入node2终端,如果没有nc命令,需要先安装nc命令,安装nc命令如下:

[hadoop@node2 ~]$ sudo yum install nc -y

开启nc监听

[hadoop@node2 ~]$ nc -lk 7777

IDEA中,运行SocketStreamWordCount程序。

往7777端口发送数据,例如发送hello world

控制台输出

继续往7777端口发送数据,例如发送hello flink

控制台输出

停止SocketStreamWordCount程序。

按Ctrl+c停止nc命令。

打包

这里的打包是将写好的程序打成jar包。

点击IDEA右侧的Maven,按住Ctrl键同时选中clean和package(第一次打包可以只选中package),点击执行打包。

打包成功后,看到如下输出信息,生成的jar包在项目的target目录下

提交到集群运行

把jar包提交到flink集群运行有两种方式:

1.通过命令行提交作业   

2.通过Web UI提交作业

命令行提交作业

将jar包上传Linux

启动flink集群
[hadoop@node2 ~]$ start-cluster.sh 
Starting cluster.
Starting standalonesession daemon on host node2.
Starting taskexecutor daemon on host node2.
Starting taskexecutor daemon on host node3.
Starting taskexecutor daemon on host node4.
​
开启nc监听
[hadoop@node2 ~]$ nc -lk 7777
​
命令提交作业

开启另一个node2终端,使用flink run命令提交作业到flink集群

[hadoop@node2 ~]$ flink run -m node2:8081 -c org.example.wc.SocketStreamWordCount flinkdemo-1.0-SNAPSHOT.jar

-m指定提交到的JobManager,-c指定程序入口类。

发送测试数据

在nc监听终端,往7777端口发送数据

查看结果
Web UI查看结果

浏览器访问

node2:8081

看到正在运行的作业如下

查看结果

继续发送测试数据

在nc终端继续发送数据

Web UI刷新结果

命令行查看结果

打开新的node2终端,查看结果

[hadoop@node2 ~]$ cd $FLINK_HOME/log
[hadoop@node2 log]$ ls
flink-hadoop-client-node2.log                 flink-hadoop-standalonesession-0-node2.out
flink-hadoop-standalonesession-0-node2.log    flink-hadoop-taskexecutor-0-node2.log
flink-hadoop-standalonesession-0-node2.log.1  flink-hadoop-taskexecutor-0-node2.log.1
flink-hadoop-standalonesession-0-node2.log.2  flink-hadoop-taskexecutor-0-node2.log.2
flink-hadoop-standalonesession-0-node2.log.3  flink-hadoop-taskexecutor-0-node2.log.3
flink-hadoop-standalonesession-0-node2.log.4  flink-hadoop-taskexecutor-0-node2.log.4
flink-hadoop-standalonesession-0-node2.log.5  flink-hadoop-taskexecutor-0-node2.out
[hadoop@node2 log]$ cat flink-hadoop-taskexecutor-0-node2.out 
(hello,1)
(flink,1)
(hello,2)
(world,1)
​

取消flink作业

点击Cancel Job取消作业 

停止nc监听

按Ctrl+c停止nc命令

Web UI提交作业

开启nc监听

开启nc监听发送数据

[hadoop@node2 ~]$ nc -lk 7777

Web UI提交作业

浏览器访问

node2:8081

点击Submit New Job

点击Add New

选择flink作业jar包所在路径

点击jar包名称

填写相关内容,点击Submit提交作业

Entry Class填写运行的主类,例如:org.example.wc.SocketStreamWordCount

Parallesim填写作业的并行度,例如:1

提交后,在Running Jobs里看到运行的作业

发送测试数据

往7777端口发送数据

查看结果

继续发送测试数据

刷新结果

取消作业

停止nc监听

按住Ctrl+c停止nc命令

关闭flink集群
[hadoop@node2 ~]$ stop-cluster.sh 
Stopping taskexecutor daemon (pid: 2283) on host node2.
Stopping taskexecutor daemon (pid: 1827) on host node3.
Stopping taskexecutor daemon (pid: 1829) on host node4.
Stopping standalonesession daemon (pid: 1929) on host node2.

上传代码到gitee

登录gitee

https://gitee.com/

注意:如果还没有gitee账号,需要先注册;如果之前没有设置过SSH公钥,需要先设置SSH公钥。

创建仓库

提交代码

使用IDEA提交代码

提示有警告,忽略警告,继续提交

提交成功后,IDEA显示如下

刷新浏览器查看gitee界面,看到代码已上传成功

完成!enjoy it!

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/菜鸟追梦旅行/article/detail/562804
推荐阅读
相关标签
  

闽ICP备14008679号