赞
踩
目录
Windows安装好jdk8、Maven3、IDEA
Linux安装好Flink集群,可参考:CentOS7安装flink1.17完全分布式
使用IDEA创建一个新的Maven项目,项目名称,例如:flinkdemo
在项目的pom.xml文件中添加Flink的依赖。
- <properties>
- <flink.version>1.17.1</flink.version>
- </properties>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-java</artifactId>
- <version>${flink.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-clients</artifactId>
- <version>${flink.version}</version>
- </dependency>
- </dependencies>
刷新依赖
刷新依赖后,能看到相关依赖如下
刷新依赖过程需要等待一些时间来下载相关依赖。
如果依赖下载慢,可以设置阿里云仓库镜像:
1.设置maven的settings.xml
在</mirrors>
上面一行添加阿里云仓库镜像
- <mirror>
- <id>alimaven</id>
- <name>aliyun maven</name>
- <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
- <mirrorOf>central</mirrorOf>
- </mirror>
2.IDEA设置maven
在工程的根目录下,新建一个data文件夹
并在data文件夹下创建文本文件words.txt
内容如下
hello world hello java hello flink
右键src/main下的java,新建Package
填写包名org.example
,包名与groupId
的内容一致。
在org.exmaple
下新建wc
包及BatchWordCount
类
填写wc.BatchWordCount
效果如下
BatchWordCount.java
代码如下:
- package org.example.wc;
-
- import org.apache.flink.api.common.functions.FlatMapFunction;
- import org.apache.flink.api.java.ExecutionEnvironment;
- import org.apache.flink.api.java.operators.AggregateOperator;
- import org.apache.flink.api.java.operators.DataSource;
- import org.apache.flink.api.java.operators.FlatMapOperator;
- import org.apache.flink.api.java.operators.UnsortedGrouping;
- import org.apache.flink.api.java.tuple.Tuple2;
- import org.apache.flink.util.Collector;
-
- public class BatchWordCount {
- public static void main(String[] args) throws Exception {
- // 1. 创建执行环境
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- // 2. 从文件读取数据 按行读取
- DataSource<String> lineDS = env.readTextFile("data/words.txt");
-
- // 3. 转换数据格式
- FlatMapOperator<String, Tuple2<String, Long>> wordAndOne = lineDS.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
-
- @Override
- public void flatMap(String line, Collector<Tuple2<String, Long>> out) throws Exception {
-
- String[] words = line.split(" ");
-
- for (String word : words) {
- out.collect(Tuple2.of(word,1L));
- }
- }
- });
-
- // 4. 按照 word 进行分组
- UnsortedGrouping<Tuple2<String, Long>> wordAndOneUG = wordAndOne.groupBy(0);
-
- // 5. 分组内聚合统计
- AggregateOperator<Tuple2<String, Long>> sum = wordAndOneUG.sum(1);
-
- // 6. 打印结果
- sum.print();
- }
- }
运行程序,查看结果
注意,以上代码的实现方式是基于DataSet API的,是批处理API。而Flink本身是流批统一的处理架构,批量的数据集本质上也是流,没有必要用两套不同的API来实现。从Flink 1.12开始,官方推荐直接使用DataStream API,在提交任务时通过将执行模式设为BATCH来进行批处理:
$ flink run -Dexecution.runtime-mode=BATCH BatchWordCount.jar
在org.example.wc
包下新建Java类StreamWordCount,代码如下:
- package org.example.wc;
-
- import org.apache.flink.api.common.functions.FlatMapFunction;
- import org.apache.flink.api.java.tuple.Tuple2;
- import org.apache.flink.streaming.api.datastream.DataStreamSource;
- import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.util.Collector;
-
- public class StreamWordCount {
- public static void main(String[] args) throws Exception {
- // 1. 创建流式执行环境
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- // 2. 读取文件
- DataStreamSource<String> lineStream = env.readTextFile("input/words.txt");
-
- // 3. 转换、分组、求和,得到统计结果
- SingleOutputStreamOperator<Tuple2<String, Long>> sum = lineStream.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
- @Override
- public void flatMap(String line, Collector<Tuple2<String, Long>> out) throws Exception {
-
- String[] words = line.split(" ");
-
- for (String word : words) {
- out.collect(Tuple2.of(word, 1L));
- }
- }
- }).keyBy(data -> data.f0)
- .sum(1);
-
- // 4. 打印
- sum.print();
-
- // 5. 执行
- env.execute();
- }
- }
运行结果
与批处理程序BatchWordCount的区别:
创建执行环境的不同,流处理程序使用的是StreamExecutionEnvironment。
转换处理之后,得到的数据对象类型不同。
分组操作调用的是keyBy方法,可以传入一个匿名函数作为键选择器(KeySelector),指定当前分组的key是什么。
代码末尾需要调用env的execute方法,开始执行任务。
流处理的输入数据通常是流数据,将StreamWordCount代码中读取文件数据的readTextFile方法,替换成读取socket文本流的方法socketTextStream。
在org.example.wc
包下新建Java类SocketStreamWordCount,代码如下:
- package org.example.wc;
-
- import org.apache.flink.api.common.typeinfo.Types;
- import org.apache.flink.api.java.tuple.Tuple2;
- import org.apache.flink.streaming.api.datastream.DataStreamSource;
- import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.util.Collector;
-
- public class SocketStreamWordCount {
- public static void main(String[] args) throws Exception {
- // 1. 创建流式执行环境
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- // 2. 读取文本流:node2表示发送端主机名(根据实际情况修改)、7777表示端口号
- DataStreamSource<String> lineStream = env.socketTextStream("node2", 7777);
-
- // 3. 转换、分组、求和,得到统计结果
- SingleOutputStreamOperator<Tuple2<String, Long>> sum = lineStream.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
- String[] words = line.split(" ");
-
- for (String word : words) {
- out.collect(Tuple2.of(word, 1L));
- }
- }).returns(Types.TUPLE(Types.STRING, Types.LONG))
- .keyBy(data -> data.f0)
- .sum(1);
-
- // 4. 打印
- sum.print();
-
- // 5. 执行
- env.execute();
- }
- }
进入node2终端,如果没有nc命令,需要先安装nc命令,安装nc命令如下:
[hadoop@node2 ~]$ sudo yum install nc -y
开启nc监听
[hadoop@node2 ~]$ nc -lk 7777
IDEA中,运行SocketStreamWordCount程序。
往7777端口发送数据,例如发送hello world
控制台输出
继续往7777端口发送数据,例如发送hello flink
控制台输出
停止SocketStreamWordCount程序。
按Ctrl+c停止nc命令。
这里的打包是将写好的程序打成jar包。
点击IDEA右侧的Maven,按住Ctrl键同时选中clean和package(第一次打包可以只选中package),点击执行打包。
打包成功后,看到如下输出信息,生成的jar包在项目的target目录下
把jar包提交到flink集群运行有两种方式:
1.通过命令行提交作业
2.通过Web UI提交作业
[hadoop@node2 ~]$ start-cluster.sh Starting cluster. Starting standalonesession daemon on host node2. Starting taskexecutor daemon on host node2. Starting taskexecutor daemon on host node3. Starting taskexecutor daemon on host node4.
[hadoop@node2 ~]$ nc -lk 7777
开启另一个node2终端,使用flink run
命令提交作业到flink集群
[hadoop@node2 ~]$ flink run -m node2:8081 -c org.example.wc.SocketStreamWordCount flinkdemo-1.0-SNAPSHOT.jar
-m指定提交到的JobManager,-c指定程序入口类。
在nc监听终端,往7777端口发送数据
浏览器访问
node2:8081
看到正在运行的作业如下
查看结果
在nc终端继续发送数据
打开新的node2终端,查看结果
[hadoop@node2 ~]$ cd $FLINK_HOME/log [hadoop@node2 log]$ ls flink-hadoop-client-node2.log flink-hadoop-standalonesession-0-node2.out flink-hadoop-standalonesession-0-node2.log flink-hadoop-taskexecutor-0-node2.log flink-hadoop-standalonesession-0-node2.log.1 flink-hadoop-taskexecutor-0-node2.log.1 flink-hadoop-standalonesession-0-node2.log.2 flink-hadoop-taskexecutor-0-node2.log.2 flink-hadoop-standalonesession-0-node2.log.3 flink-hadoop-taskexecutor-0-node2.log.3 flink-hadoop-standalonesession-0-node2.log.4 flink-hadoop-taskexecutor-0-node2.log.4 flink-hadoop-standalonesession-0-node2.log.5 flink-hadoop-taskexecutor-0-node2.out [hadoop@node2 log]$ cat flink-hadoop-taskexecutor-0-node2.out (hello,1) (flink,1) (hello,2) (world,1)
点击Cancel Job取消作业
按Ctrl+c停止nc命令
开启nc监听发送数据
[hadoop@node2 ~]$ nc -lk 7777
浏览器访问
node2:8081
点击Submit New Job
点击Add New
选择flink作业jar包所在路径
点击jar包名称
填写相关内容,点击Submit提交作业
Entry Class填写运行的主类,例如:org.example.wc.SocketStreamWordCount
Parallesim填写作业的并行度,例如:1
提交后,在Running Jobs里看到运行的作业
往7777端口发送数据
按住Ctrl+c停止nc命令
[hadoop@node2 ~]$ stop-cluster.sh Stopping taskexecutor daemon (pid: 2283) on host node2. Stopping taskexecutor daemon (pid: 1827) on host node3. Stopping taskexecutor daemon (pid: 1829) on host node4. Stopping standalonesession daemon (pid: 1929) on host node2.
https://gitee.com/
注意:如果还没有gitee账号,需要先注册;如果之前没有设置过SSH公钥,需要先设置SSH公钥。
使用IDEA提交代码
提示有警告,忽略警告,继续提交
提交成功后,IDEA显示如下
刷新浏览器查看gitee界面,看到代码已上传成功
完成!enjoy it!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。