赞
踩
Flink支持多种安装部署方式
这些安装方式我们主要讲一下standalone和on yarn。
如果是一个独立环境的话,可能会用到standalone集群模式。
在生产环境下一般还是用on yarn 这种模式比较多,因为这样可以综合利用集群资源。和我们之前讲的
spark on yarn是一样的效果,这个时候我们的Hadoop集群上面既可以运行MapReduce任务,Spark任务,还可以运行Flink任务,一举三得。
依赖环境
jdk1.8及以上【配置JAVA_HOME环境变量】
ssh免密码登录
在这我们使用bigdata01、02、03这三台机器,这几台机器的基础环境都是ok的,可以直接使用。
集群规划如下:
master:bigdata01
slave:bigdata02、bigdata03
注意:由于目前Flink各个版本之间差异比较大,属于快速迭代阶段,所以在这我们就使用最新版本了,使用Flink1.11.1版本。
1.安装包下载好以后上传到bigdata01的/data/soft目录中
- [root@bigdata01 soft]# ll flink-1.11.1-bin-scala_2.12.tgz
- -rw-r--r--. 1 root root 312224884 Aug 5 2026 flink-1.11.1-bin-scala_2.12.
- tgz
2. 解压
[root@bigdata01 soft]# tar -zxvf flink-1.11.1-bin-scala_2.12.tgz
3.修改配置
- [root@bigdata01 soft]# cd flink-1.11.1
- [root@bigdata01 flink-1.11.1]# cd conf/
- [root@bigdata01 conf]# vi flink-conf.yaml
- ......
- jobmanager.rpc.address: bigdata01
- ......
- [root@bigdata01 conf]# vi masters
- bigdata01:8081
- [root@bigdata01 conf]# vi workers
- bigdata02
- bigdata03
3:将修改完配置的flink目录拷贝到其它两个从节点
- [root@bigdata01 soft]# scp -rq flink-1.11.1 bigdata02:/data/soft/
- [root@bigdata01 soft]# scp -rq flink-1.11.1 bigdata03:/data/soft/
4:启动Flink集群
- [root@bigdata01 soft]# cd flink-1.11.1
- [root@bigdata01 flink-1.11.1]# bin/start-cluster.sh
- Starting cluster.
- Starting standalonesession daemon on host bigdata01.
- Starting taskexecutor daemon on host bigdata02.
- Starting taskexecutor daemon on host bigdata03.
5:验证一下进程
在bigdata01上执行jps
- [root@bigdata01 flink-1.11.1]# jps
- 3986 StandaloneSessionClusterEntrypoint
在bigdata02上执行jps
- [root@bigdata02 ~]# jps
- 2149 TaskManagerRunner
在bigdata03上执行jps
- [root@bigdata03 ~]# jps
- 2150 TaskManagerRunner
6:访问Flink的web界面
http://bigdata01:8081
7:停止集群,在主节点上执行停止集群脚本
- [root@bigdata01 flink-1.11.1]# bin/stop-cluster.sh
- Stopping taskexecutor daemon (pid: 2149) on host bigdata02.
- Stopping taskexecutor daemon (pid: 2150) on host bigdata03.
- Stopping standalonesession daemon (pid: 3986) on host bigdata01.
1:slot是静态的概念,是指taskmanager具有的并发执行能力
2:parallelism是动态的概念,是指程序运行时实际使用的并发能力
3:设置合适的parallelism能提高程序计算效率,太多了和太少了都不好
Flink ON YARN模式就是使用客户端的方式,直接向Hadoop集群提交任务即可。不需要单独启动Flink进程。
注意:
1:Flink ON YARN 模式依赖Hadoop 2.4.1及以上版本
2:Flink ON YARN支持两种使用方式
下面来看一下第一种方式
第一步:在集群中初始化一个长时间运行的Flink集群
使用yarn-session.sh脚本
第二步:使用flink run命令向Flink集群中提交任务
注意:使用flink on yarn需要确保hadoop集群已经启动成功
1. 首先在bigdata04机器上安装一个Flink客户端,其实就是把Flink的安装包上传上去解压即可,不需要启动
[root@bigdata04 soft]# tar -zxvf flink-1.11.1-bin-scala_2.12.tgz
2. 接下来在执行 yarn-session.sh 脚本之前我们需要先设置 HADOOP_CLASSPATH 这个环境变量,否则,执行yarn-session.sh 是会报错的,提示找不到hadoop的一些依赖。
- [root@bigdata01 flink-1.11.1]# bin/yarn-session.sh -jm 1024m -tm 1024m -d
- Error: A JNI error has occurred, please check your installation and try again
- Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/hadoop/yarn/exceptions/YarnException
- at java.lang.Class.getDeclaredMethods0(Native Method)
- at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
- at java.lang.Class.privateGetMethodRecursive(Class.java:3048)
- at java.lang.Class.getMethod0(Class.java:3018)
- at java.lang.Class.getMethod(Class.java:1784)
- at sun.launcher.LauncherHelper.validateMainClass(LauncherHelper.java:544)
- at sun.launcher.LauncherHelper.checkAndLoadMain(LauncherHelper.java:526)
- Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.yarn.exceptions.YarnException
- at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
- at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
- at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
- at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
- ... 7 more
在 /etc/profile 中配置 HADOOP_CLASSPATH
- [root@bigdata04 flink-1.11.1]# vi /etc/profile
- export JAVA_HOME=/data/soft/jdk1.8
- export HADOOP_HOME=/data/soft/hadoop-3.2.0
- export HIVE_HOME=/data/soft/apache-hive-3.1.2-bin
- export SPARK_HOME=/data/soft/spark-2.4.3-bin-hadoop2.7
- export SQOOP_HOME=/data/soft/sqoop-1.4.7.bin__hadoop-2.6.0
- export HADOOP_CLASSPATH=`${HADOOP_HOME}/bin/hadoop classpath`
- export PATH=.:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HIVE_HOME/bin:$SPARK_HO
- ME/bin:$SQOOP_HOME/bin:$PATH
刷新配置
[root@bigdata01 flink-1.11.1]# source /etc/profile
3. 接下来,使用 yarn-session.s h在YARN中创建一个长时间运行的Flink集群
[root@bigdata04 flink-1.11.1]# bin/yarn-session.sh -jm 1024m -tm 1024m -d
这个表示创建一个Flink集群, -jm 是指定主节点的内存, -tm 是指定从节点的内存, -d 是表示把这个进程放到后台去执行。启动之后,会看到类似这样的日志信息,这里面会显示flink web界面的地址,以及这个flink集群在yarn中对应的applicationid。
此时到YARN的web界面中确实可以看到这个flink集群。
可以使用屏幕中显示的flink的web地址或者yarn中这个链接都是可以进入这个flink的web界面的
4. 接下来向这个Flink集群中提交任务,此时使用Flink中的内置案例
[root@bigdata04 flink-1.11.1]# bin/flink run ./examples/batch/WordCount.jar
注意:这个时候我们使用flink run的时候,它会默认找这个文件,然后根据这个文件找到刚才我们
创建的那个永久的Flink集群,这个文件里面保存的就是刚才启动的那个Flink集群在YARN中对应
的applicationid。
- 2023-02-19 02:11:19,306 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli [] - Found Yarn properties file under /tmp/.yarn-properties-root.
- 2023-02-19 02:11:19,306 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli [] - Found Yarn properties file under /tmp/.yarn-properties-root.
-
- [root@bigdata04 flink-1.11.1]# more /tmp/.yarn-properties-root
- #Generated YARN properties file
- #Tue Jan 20 22:50:06 CST 2026
- dynamicPropertiesString=
- applicationID=application_1768906309581_0005
5.任务提交上去执行完成之后,再来看flink的web界面,发现这里面有一个已经执行结束的任务了。
注意:这个任务在执行的时候,会动态申请一些资源执行任务,任务执行完毕之后,对应的资源会自动释放掉。
6. 最后把这个Flink集群停掉,使用yarn的kill命令
[root@bigdata04 flink-1.11.1]# yarn application -kill application_1768906309581_0005
7. 针对 yarn-session 命令,它后面还支持一些其它参数,可以在后面传一个 -help 参数
- [root@bigdata04 flink-1.11.1]# bin/yarn-session.sh -help
- Usage:
- Optional
- -at,--applicationType <arg> Set a custom application type for the
- application on YARN
- -D <property=value> use value for given property
- -d,--detached If present, runs the job in detached m
- ode
- -h,--help Help for the Yarn session CLI.
- -id,--applicationId <arg> Attach to running YARN session
- -j,--jar <arg> Path to Flink jar file
- -jm,--jobManagerMemory <arg> Memory for JobManager Container with o
- ptional unit (default: MB)
- -m,--jobmanager <arg> Address of the JobManager to which to
- connect. Use this flag to connect to a different JobManager than the one sp
- ecified in the configuration.
- -nl,--nodeLabel <arg> Specify YARN node label for the YARN a
- pplication
- -nm,--name <arg> Set a custom name for the application
- on YARN
- -q,--query Display available YARN resources (memo
- ry, cores)
- -qu,--queue <arg> Specify YARN queue.
- -s,--slots <arg> Number of slots per TaskManager
- -t,--ship <arg> Ship files in the specified directory
- (t for transfer)
- -tm,--taskManagerMemory <arg> Memory per TaskManager Container with
- optional unit (default: MB)
- -yd,--yarndetached If present, runs the job in detached m
- ode (deprecated; use non-YARN specific option instead)
- -z,--zookeeperNamespace <arg> Namespace to create the Zookeeper sub-
- paths for high availability mode
在这我对一些常见的命令进行了整理,添加了中文注释
注意:这里的-j 是指定Flink任务的jar包,此参数可以省略不写也可以
flink run -m yarn-cluster (创建Flink集群+提交任务)
使用flink run直接创建一个临时的Flink集群,并且提交任务
此时这里面的参数前面加上了一个 y 参数
- [root@bigdata04 flink-1.11.1]# bin/flink run -m yarn-cluster -yjm 1024
- -ytm 1024 ./examples/batch/WordCount.jar
提交上去之后,会先创建一个Flink集群,然后在这个Flink集群中执行任务。
针对Flink命令的一些用法汇总:
1:提高大数据集群机器的利用率
2:一套集群,可以执行MR任务,Spark任务,Flink任务等
接下来我们希望把前面我们自己开发的Flink任务提交到集群上面,在这我就使用flink on yarn的第二种方式来向集群提交一个Flink任务。
1. 在pom.xml中添加打包配置
- <build>
- <plugins>
- <!-- 编译插件 -->
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <version>3.6.0</version>
- <configuration>
- <source>1.8</source>
- <target>1.8</target>
- <encoding>UTF-8</encoding>
- </configuration>
- </plugin>
- <!-- scala编译插件 -->
- <plugin>
- <groupId>net.alchim31.maven</groupId>
- <artifactId>scala-maven-plugin</artifactId>
- <version>3.1.6</version>
- <configuration>
- <scalaCompatVersion>2.12</scalaCompatVersion>
- <scalaVersion>2.12.11</scalaVersion>
- <encoding>UTF-8</encoding>
- </configuration>
- <executions>
- <execution>
- <id>compile-scala</id>
- <phase>compile</phase>
- <goals>
- <goal>add-source</goal>
- <goal>compile</goal>
- </goals>
- </execution>
- <execution>
- <id>test-compile-scala</id>
- <phase>test-compile</phase>
- <goals>
- <goal>add-source</goal>
- <goal>testCompile</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- <!-- 打jar包插件(会包含所有依赖) -->
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-assembly-plugin</artifactId>
- <version>2.6</version>
- <configuration>
- <descriptorRefs>
- <descriptorRef>jar-with-dependencies</descriptorRef>
- </descriptorRefs>
- <archive>
- <manifest>
- <!-- 可以设置jar包的入口类(可选) -->
- <mainClass></mainClass>
- </manifest>
- </archive>
- </configuration>
- <executions>
- <execution>
- <id>make-assembly</id>
- <phase>package</phase>
- <goals>
- <goal>single</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
2. 打包代码
mvn clean package -DskipTests
3. 将 db_flink-1.0-SNAPSHOT-jar-with-dependencies.jar 上传到bigdata04机器上
的 /data/soft/flink-1.11.1 目录中(上传到哪个目录都可以)
4. 提交Flink任务
注意:提交任务之前,先开启socket
[root@bigdata04 ~]# nc -l 9001
[root@bigdata04 flink-1.11.1]#bin/flink run -m yarn-cluster -c com.imooc.scala.stream.SocketWindowWordCountScala -yjm 1024 -ytm 1024 db_flink-1.0-SNAPSHOT-jar-with-dependencies.jar
6. 此时到yarn上面可以看到确实新增了一个任务,点击进去可以看到flink的web界面
通过socket输入一串内容
然后到flink的web界面查看日志
7. 接下来我们希望把这个任务停掉,因为这个任务是一个流处理的任务,提交成功之后,它会一直运行。
注意:此时如果我们使用ctrl+c关掉之前提交任务的那个进程,这里的flink任务是不会有任何影响的,可以一直运行,因为flink任务已经提交到hadoop集群里面了。
此时如果想要停止Flink任务,有两种方式:
方式一:停止yarn中任务
[root@bigdata04 flink-1.11.1]# yarn application -kill application_1768962956138_0001
方式二:停止flink任务。可以在界面上点击这个按钮,或者在命令行中执行flink cancel停止都可以
或者
[root@bigdata04 flink-1.11.1]# bin/flink cancel -yid application_1768962956138_0001 7b99bfb261a92f84a89d87bcca3a3e23
这个flink任务停止之后,对应的那个yarn-session(Flink集群)也就停止了。
注意:此时flink任务停止之后就无法再查看flink的web界面了,如果想看查看历史任务的执行信息就看不了了,怎么办呢?
咱们之前在学习spark的时候其实也遇到过这种问题,当时是通过启动spark的historyserver进程解决的。flink也有historyserver进程,也是可以解决这个问题的。historyserver进程可以在任意一台机器上启动,在这我们选择在bigdata04机器上启动在启动historyserver进程之前,需要先修改bigdata04中的flink-conf.yaml配置文件。
- [root@bigdata04 flink-1.11.1]# vi conf/flink-conf.yaml
- jobmanager.archive.fs.dir: hdfs://bigdata01:9000/completed-jobs/
- historyserver.web.address: 192.168.182.103
- historyserver.web.port: 8082
- historyserver.archive.fs.dir: hdfs://bigdata01:9000/completed-jobs/
- historyserver.archive.fs.refresh-interval: 10000
然后启动flink的historyserver进程
[root@bigdata04 flink-1.11.1]# bin/historyserver.sh start
注意:hadoop集群中的historyserver进程也需要启动
此时Flink任务停止之后也是可以访问flink的web界面的。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。