赞
踩
flink 下载地址:https://flink.apache.org/downloads/
下载 flink 安装包:flink-1.15.4-bin-scala_2.12.tgz
将安装包上传到虚拟机节点并解压缩
tar -xzf flink-1.15.4-bin-scala_2.12.tgz -C /opt/module
cd /opt/module
mv flink-1.10.1 flink
进入 flink 安装目录,执行启动命令,并查看进程
cd /opt/module/flink
bin/start-cluster.sh
通过jps 查询进程
17409 TaskManagerRunner
13597 StandaloneSessionClusterEntrypoint
若出现上述进程,则代表启动成功。StandaloneSessionClusterEntrypoint为Flink主进程,即JobManager;TaskManagerRunner为Flink从进程,即TaskManager。
访问 http://node1:8081 进入 flink 集群和任务监控管理 Web 页面,从WebUI中可以看出,当前本地模式的Task Slot数量和TaskManager数量。访问结果如下图所示:
关闭 flink:bin/stop-cluster.sh
注:如果出现访问不通的情况,使用如下方式解决:
正常启动集群后,只能通过http://localhost:8081访问Flink Web UI,如果通过IP地址访问,则出现连接被拒绝的情况。此时需要修改Flink配置文件flink-conf.yaml,将localhost改成0.0.0.0,修改之后重启服务就可以使用IP地址访问。修改部分见下图:稍后再集群中会统一讲解
1:环境准备
根据业务需要是否依赖:
2:给集群的每台服务器分配好角色
节点IP | 节点名称 | Flink服务 |
---|---|---|
192.168.180.1 | node1 | JobManager,TaskManager |
192.168.180.2 | node2 | TaskManager |
192.168.180.3 | node3 | TaskManager |
3:集群的服务器之间配置好ssh免密登录
* 在master机器执行ssh-keygen -t rsa
* 在master机器执行命令,将密钥拷贝到其余服务器: ssh-copy-id -i /root/.ssh/id_rsa.pub 192.168.180.2
* 测试每个节点:
ssh -p22 isi@node2
flink-1.15.4-bin-scala_2.12.tgz
flink 下载地址:https://flink.apache.org/downloads/
将安装包上传到虚拟机节点并解压缩
tar -xzf flink-1.15.4-bin-scala_2.12.tgz -C /opt/module
cd /opt/module
mv flink-1.10.1 flink
Flink的配置文件都存放于安装目录下的conf目录。在JobManager服务器,进入该目录,执行以下操作。
cd flink
vim conf/flink-conf.yaml
# JobManager地址
jobmanager.rpc.address: node1
jobmanager.rpc.port: 6123
# JobManager JVM heap 内存大小
jobmanager.heap.size: 1024
# JobManager地址绑定设置
jobmanager.bind-host: 0.0.0.0
# TaskManager JVM heap 内存大小
taskmanager.heap.size: 1024
# TaskManager地址(不同TaskManager节点host配置对应的host)
taskmanager.host: node1
# 每个 TaskManager 提供的任务 slots 数量大小(每台机器可用的CPU数量,默认值:1)
taskmanager.numberOfTaskSlots: 3
# 是否进行预分配内存,默认不进行预分配,这样在我们不使用flink集群时候不会占用集群资源
taskmanager.memory.preallocate: false
# 程序默认并行计算的个数(就是flink算子的并行度)
parallelism.default: 1
# WEB UI 节点(只需JobManager节点设置,TaskManager节点设置了也无所谓)
rest.address: node1
# WEB UI 节点端口,默认8081
rest.port: 8081
#如果 rest.port 的端口占用, 则使用 rest.bind-port 指定的端口范围
rest.bind-port: 8080-8090
# WEB UI节点绑定设置(只需JobManster节点设置)
rest.bind-address: 0.0.0.0
#是否从基于 Web 的 jobmanager 启用作业提交
# 注意: 即使禁用此功能,会话集群仍会通过 REST 请求(HTTP 调用)接受作业, 该配置仅保护在 UI 中上传作业的功能
web.submit.enable: true
# 启用通过 Flink UI 取消作业(默认为 true)
# 注意: 即使禁用此功能,会话集群仍会通过 REST 请求(HTTP 调用)取消作业, 该配置仅保护取消 UI 中的作业的功能
web.cancel.enable: true
# 存储上传作业的目录, 仅在 web.submit.enable 为 true 时使用
web.upload.dir
注意:以上设置的0.0.0.0代表监听当前节点每一个可用的网络接口,0.0.0.0不再是一个真正意义上的ip地址,而表示一个集合,监听0.0.0.0的端口相当于是可以监听本机中的所有ip端口。以上配置的0.0.0.0 表示想要让外部访问需要设置具体ip,或者直接设置为"0.0.0.0"。
另外,在 flink-conf.yaml 文件中还可以对集群中的 JobManager 和 TaskManager 组件进行优化配置,主要配置项如下:
配置Master节点就是配置JobManager节点,在$FLINK_HOME/conf/masters文件中配置jobManager节点如下:
#vim $FLINK_HOME/conf/masters
node1:8081
配置Worker节点就是配置TaskManager节点,在$FLINK_HOME/conf/workers文件中配置taskManager节点如下:
#vim $FLINK_HOME/conf/workers
node1
node2
node3
#分发到node2、node3节点上
[root@node1 ~]# scp -r /opt/module/flink node2:/opt/module/
[root@node1 ~]# scp -r /opt/module/flink node3:/opt/module/
#修改node2、node3 节点flink-conf.yaml文件中的TaskManager,TaskManager地址(不同TaskManager节点host配置对应的host)
【node2节点】 vim flink-conf.yaml
【node2节点】 taskmanager.host: node2
【node3节点】 vim flink-conf.yaml
【node3节点】 taskmanager.host: node3
#在node1节点中,启动Flink集群
[root@node1 ~]# cd /opt/module/flink/bin/
[root@node1 bin]# ./start-cluster.sh
将编码好的 Flink maven 工程打成 jar 包
访问 http://node1:8081 进入 flink 监控页面,选择左侧的 Submit New Job 选项菜单
点击 +Add New 按钮,然后选择 jar 包进行上传
点击页面上上传好的 jar 包项,配置填写主程序类全类名、启动参数项、并行度等;点击 submit 提交任务
在页面左侧的 overview 和 jobs 等菜单选项下查看任务运行情况
一个 job 所占据的 TaskSlots 数等于该 job 中最大的并行度
在WebUI中点击对应的任务Job,进入如下页面点击"Cancel Job"取消任务执行:
#提交任务:bin/flink run -m [jobmanager主机和端口] -c [主程序类全类名] -p [并行度] [jar包的绝对路径] [--param1 value1 --param2 value2 ...]
cd flink
bin/flink run -m node1:8081 -c com.app.wc.StreamWordCount2 -p 3 /project/FlinkTutorial/target/FlinkTutorial-1.0-SNAPSHOT.jar --host localhost --port 7777
#查看job:-a 可以查看已经取消的job
bin/flink list [-a]
#取消job
bin/flink cancel [jobId]
不能使用 start-cluster.sh 命令启动集群
将编码好的 Flink maven 工程打成 jar 包,并将 jar 包上传到 flink 安装目录下的 lib 目录
启动 JobManager
cd /opt/module/flink
bin/standalone-job.sh start --job-classname com.app.wc.StreamWordCount2
启动 TaskManager
cd /opt/module/flink
bin/taskmanager.sh start
访问 http://node1:8081 查看 flink 监控页面的作业执行
关闭
cd /opt/module/flink
bin/standalone-job.sh stop
bin/taskmanager.sh stop
Flink可以基于Yarn来运行任务,Yarn作为资源提供方,可以根据Flink任务资源需求动态的启动TaskManager来提供资源。Flink基于Yarn提交任务通常叫做Flink On Yarn,Yarn资源调度框架运行需要有Hadoop集群,Hadoop版本最低是2.8.5。
Flink 基于Yarn提交任务,向Yarn集群中提交Flink任务的客户端需要满足以下两点
#修改
jobmanager.memory.process.size: 1600m
taskmanager.memory.process.size: 1728m
taskmanager.numberOfTaskSlots: 8
parallelism.default: 1
Yarn Session模式首先需要在Yarn中初始化一个Flink集群(称为Flink Yarn Session 集群),开辟指定的资源,以后的Flink任务都提交到这里。这个Flink集群会常驻在YARN集群中,除非手工停止(yarn application -kill id),当手动停止yarn application对应的id时,运行在当前application上的所有flink任务都会被kill。这种方式创建的Flink集群会独占资源,不管有没有Flink任务在执行,YARN上面的其他任务都无法使用这些资源。
[root@node3 ~] cd /software/flink-1.16.0/bin/
#启动Yarn Session集群,名称为lansonjy,每个TM有3个slot
[root@node3 bin] ./yarn-session.sh -s 3 -nm lansonjy -d
参数 | 解释 |
---|---|
-d | –detached,Yarn Session集群启动后在后台独立运行,退出客户端,也可不指定,则客户端不退出。 |
-nm | –name,自定义在YARN上运行Application应用的名字。 |
-jm | –jobManagerMemory,指定JobManager所需内存,单位MB。 |
-tm | –taskManagerMemory,指定每个TaskManager所需的内存,单位MB。 |
-s | –slots,指定每个TaskManager上Slot的个数。 |
-id | –applicationId,指定YARN集群上的任务ID,附着到一个后台独立运行的yarn session中。 |
-qu | –queue,指定Yarn的资源队列。 |
目前在Yarn Session集群WebUI中看不到启动的TaskManager ,这是因为Yarn会按照提交任务的需求动态分配TaskManager数量,所以Flink 基于Yarn Session运行任务资源是动态分配的。
此外,创建出Yarn Session集群后会在node5节点/tmp/下创建一个隐藏的".yarn-properties-<用户名>" Yarn属性文件,有了该文件后,在当前节点提交Flink任务时会自动发现Yarn Session集群并进行任务提交。
[root@node3 ~] cd /software/flink-1.16.0/bin/
#执行如下命令,会根据.yarn-properties-<用户名>文件,自动发现yarn session 集群
[root@node3 bin] ./flink run -c com.lanson.flinkjava.code.chapter3.FlinkAppWithMultiJob /root/FlinkJavaCode-1.0-SNAPSHOT-jar-with-dependencies.jar
#也可以使用如下命令指定Yarn Session集群提交任务,-t 指定运行的模式
[root@node3 bin] ./flink run -t yarn-session -Dyarn.application.id=application_1671607810626_0001 -c com.lanson.flinkjava.code.chapter3.FlinkAppWithMultiJob /root/FlinkJavaCode-1.0-SNAPSHOT-jar-with-dependencies.jar
以上命令执行之后,可以查看对应的Yarn Session 对应的Flink集群,可以看到启动了2个Flink Job任务、启动1个TaskManager,分配了3个Slot。
停止Yarn Session集群可以在Yarn WebUI中找到对应的ApplicationId,执行如下命令关闭任务即可
[root@node3 bin]# yarn application -kill application_1671607810626_0001
Yarn Session 模式下提交任务首先创建Yarn Session 集群,创建该集群实际上就是启动了JobManager,启动JobManager同时会启动Dispatcher和ResourceManager,当客户端提交任务时,才会启动JobMaster以及根据提交的任务需求资源情况来动态分配启动TaskManager。
Yarn Session模式下提交任务流程如下:
Per-Job 模式目前只有yarn支持,Per-job模式在Flink1.15中已经被弃用,后续版本可能会完全剔除。Per-Job模式就是直接由客户端向Yarn中提交Flink作业,每个作业形成一个单独的Flink集群。
Flink On Yarn Per-Job模式提交命令如下:
[root@node5 bin] ./flink run -t yarn-per-job -d -c com.lanson.flinkjava.code.chapter3.FlinkAppWithMultiJob /root/FlinkJavaCode-1.0-SNAPSHOT-jar-with-dependencies.jar
参数 | 解释 |
---|---|
-t | –target,指定运行模式,可以跟在flink run 命令后,可以指定"remote", “local”, “kubernetes-session”, “yarn-per-job”(deprecated), “yarn-session”;也可以跟在 flink run-application 命令后,可以指定"kubernetes-application", “yarn-application”。 |
-c | –class,指定运行的class主类。 |
-d | –detached,任务提交后在后台独立运行,退出客户端,也可不指定。 |
-p | –parallelism,执行应用程序的并行度。 |
以上命令提交后,我们可以通过Yarn WebUI看到有2个Application 启动,对应2个Flink的集群,进入对应的Flink集群WebUI可以看到运行提交的Flink Application中的不同Job任务:
这说明Per-Job模式针对每个Flink Job会启动一个Flink集群。
可以使用yarn application -kill ApplicationId也可以执行如下命令:
#取消任务命令执行后对应的 Flink集群也会停止 :flink cancel -t yarn-per-job -Dyarn.application.id=application_XXXX_YY <jobId>
[root@node5 bin] ./flink cancel -t yarn-per-job -Dyarn.application.id=application_1671610064817_0002 805542d84c9944480196ef73911d1b59
[root@node5 bin] ./flink cancel -t yarn-per-job -Dyarn.application.id=application_1671610064817_0003 56365ae67b8e93b1184d22fa567d7ddf
Flink基于Yarn Per-Job 提交任务时,在提交Flink Job作业的同时启动JobManager并启动Flink的集群,根据提交任务所需资源的情况会动态申请启动TaskManager给当前提交的job任务提供资源。
Yarn Per-Job模式下提交任务流程如下:
Yarn Per-job模式在客户端提交任务,如果在客户端提交大量的Flink任务会对客户端节点性能又非常大的压力,所以在Flink1.15中已经被弃用,后续版本可能会完全剔除,使用Yarn Application模式来替代。
Yarn Application 与Per-Job 模式类似,只是提交任务不需要客户端进行提交,直接由JobManager来进行任务提交,每个Flink Application对应一个Flink集群,如果该Flink Application有多个job任务,所有job任务共享该集群资源,TaskManager也是根据提交的Application所需资源情况动态进行申请。
#Yarn Application模式提交任务命令
[root@node5 bin] ./flink run-application -t yarn-application -c com.lanson.flinkjava.code.chapter3.FlinkAppWithMultiJob /root/FlinkJavaCode-1.0-SNAPSHOT-jar-with-dependencies.jar
参数 | 解释 |
---|---|
-t | –target,指定运行模式,可以跟在flink run 命令后,可以指定"remote", “local”, “kubernetes-session”, “yarn-per-job”(deprecated), “yarn-session”;也可以跟在 flink run-application 命令后,可以指定"kubernetes-application", “yarn-application”。 |
-c | –class,指定运行的class主类。 |
-d | –detached,任务提交后在后台独立运行,退出客户端,也可不指定。 |
-p | –parallelism,执行应用程序的并行度。 |
#查看Flink 集群中的Job作业:flink list -t yarn-application -Dyarn.application.id=application_XXXX_YY
[root@node3 bin] flink list -t yarn-application -Dyarn.application.id=application_1671610064817_0004
------------------ Running/Restarting Jobs -------------------
108a7b91cf6b797d4b61a81156cd4863 : first job (RUNNING)
5adacb416f99852408224234d9027cc7 : second job (RUNNING)
--------------------------------------------------------------
#取消Flink集群中的Job作业:flink cancel -t yarn-application -Dyarn.application.id=application_XXXX_YY <jobId>
[root@node3 bin] flink cancel -t yarn-application -Dyarn.application.id=application_1671610064817_0004 108a7b91cf6b797d4b61a81156cd4863
#停止集群,当取消Flink集群中所有任务后,Flink集群停止,也可以使用yarn application -kill ApplicationID 停止集群
[root@node3 bin] yarn application -kill application_1671610064817_0004
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。