当前位置:   article > 正文

04_Flink-HA高可用、Standalone集群模式、Flink-Standalone集群重要参数详解、集群节点重启及扩容、启动组件、Flink on Yarn、启动命令等_flink 扩容步骤

flink 扩容步骤

1.4.Flink集群安装部署standalone+yarn
1.4.1.Standalone集群模式
1.4.2.Flink-Standalone集群重要参数详解
1.4.3.集群节点重启及扩容
1.4.3.1.启动jobmanager
1.4.3.2.启动taskmanger
1.4.3.3.Flink standalone集群中job的容错
1.4.4.Flink on Yarn
1.4.4.1.原理介绍
1.4.4.2.FLINK on yarn集群部署
1.4.5.Flink on Yarn的两种运行方式
1.4.5.1.第一种[yarn-session.sh(开辟资源)+flink run(提交任务)]
1.4.5.2.第二种[flink run -m yarn-cluster(开辟资源 + 提交任务)]
1.4.5.3…/bin/yarn-session.sh命令分析
1.4.5.4./bin/flink run命令分析
1.4.5.5.Flink在yarn上的分布
1.4.5.6.Flink on yarn内部实现

1.4.Flink集群安装部署standalone+yarn

1.4.1.Standalone集群模式

Standalone集群架构展示:Client客户端提交任务给JobManager,JobManager负责Flink集群计算资源管理,并分发任务给TaskManager执行,TaskManager定期向JobManager汇报状态。
在这里插入图片描述
依赖环境
Jdk1.8及以上[配置JAVA_HOME环境变量]
ssh免密登录[集群内节点之间免密登录]
集群规划
master(JobManager) + slave/worker(TaskManager)
hadoop4(master)
hadoop5(salve)
hadoop6(salve)
集群安装
A:修改conf/flink-conf.yaml

jobmanager.rpc.address: hadoop4
taskmanager.numberOfTaskSlots: 32    (由于服务器上的CPU核数是32核,所以,此处写成32)
  • 1
  • 2

B:修改conf/masters文件内容:

hadoop4:8081
  • 1

C:修改conf/workers (这里将三台机器都作为worker节点使用)
hadoop4
hadoop5
hadoop6
D:拷贝到其它节点

cd /home/admin/installed/flink-1.11.1        (在hadoop4机器上)
scp -r * root@hadoop5:$PWD
scp -r * root@hadoop6:$PWD
  • 1
  • 2
  • 3

E:在hadoop4节点启动

[root@hadoop4 flink-1.11.1]# pwd
/home/admin/installed/flink-1.11.1
[root@hadoop4 flink-1.11.1]# bin/start-cluster.sh
  • 1
  • 2
  • 3

F:访问集群:http://xxx.xxx.xxxx.xxx:8081/#/overview
在这里插入图片描述

1.4.2.Flink-Standalone集群重要参数详解

jobmanager.memory.process.size: 1600m         The total process memory size for the JobManager.
taskmanager.memory.process.size: 1728m        The total process memory size for the TaskManager
taskmanager.numberOfTaskSlots: 20             每台机器上可用CPU数量
parallelism.default: 1                          The parallelism used for programs that did not specify and other parallelism
  • 1
  • 2
  • 3
  • 4

slot和parallelism总结
1.slot是静态的概念,是指taskmanager具有的并发执行能力。
2.parallelism是动态的概念,是指程序运行时实际使用的并发能力。
3.设置合适的parallelism能提高运算效率,太多了和太少了都不行。

1.4.3.集群节点重启及扩容

1.4.3.1.启动jobmanager

如果集群中的jobmanager进程挂了,执行下面命令启动。

bin/jobmanager.sh start
bin/jobmanager.sh stop
  • 1
  • 2
1.4.3.2.启动taskmanger

添加新的taskmanager节点或者重启taskmanager节点

bin/taskmanager.sh start
bin/taskmanager.sh stop
  • 1
  • 2
1.4.3.3.Flink standalone集群中job的容错

jobmanager挂掉
正在执行的任务会失败
存在单点故障,(Flink支持HA)
taskmanager挂掉
如果有多余的taskmanger节点,flink会自动把任务调度到其它节点执行。

1.4.4.Flink on Yarn

FLINK on yarn模式的原理是依靠YARN来调度FLINK任务,目前在企业中使用较多。这种模式的好处是可以充分利用集群资源,提高集群机器的利用率,并且只需要1套Hadoop集群,就可以执行MapReduce、Spark和FLINK任务,操作非常方便,运维方面也很轻松。

1.4.4.1.原理介绍

在这里插入图片描述

1)当启动一个新的 Flink YARN Client 会话时,客户端首先会检查所请求的资源(容器和内存)是否可用。之后,它会上传 Flink 配置和 JAR 文件到 HDFS。。
2)客 户 端 请 求 一个 YARN 容 器 启动 ApplicationMaster 。 JobManager 和ApplicationMaster(AM)运行在同一个容器中,一旦它们成功地启动了,AM 就能够知道JobManager 的地址,它会为 TaskManager 生成一个新的 Flink 配置文件(这样它才能连上 JobManager),该文件也同样会被上传到 HDFS。另外,AM 容器还提供了 Flink 的Web 界面服务。Flink 用来提供服务的端口是由用户和应用程序 ID 作为偏移配置的,这使得用户能够并行执行多个 YARN 会话。
3)之后,AM 开始为 Flink 的 TaskManager 分配容器(Container),从 HDFS 下载 JAR 文件和修改过的配置文件。一旦这些步骤完成了,Flink 就安装完成并准备接受任务了。
Flink n on n Yarn 模式在使用的时候又可以分为两Session-Cluster和Per-Job-Cluster
Session-Cluster
这种模式是在 YARN 中提前初始化一个 Flink 集群(称为 Flinkyarn-session),开辟指定的资源,以后的 Flink 任务都提交到这里。这个 Flink 集群会常驻在 YARN 集群中,除非手工停止。这种方式创建的 Flink 集群会独占资源,不管有没有 Flink 任务在执行,YARN 上面的其他任务都无法使用这些资源。
在这里插入图片描述
Per-Job-Cluster
这种模式,每次提交 Flink 任务都会创建一个新的 Flink 集群,每个 Flink 任务之间相互独立、互不影响,管理方便。任务执行完成之后创建的 Flink集群也会消失,不会额外占用资源,按需使用,这使资源利用率达到最大,在工作中推荐使用这种模式。

1.4.4.2.FLINK on yarn集群部署

依赖环境
本次部署hadoop3.1.0版本
Hdfs & yarn
Flink on Yarn的两种使用方式
第一种: 在Yarn中初始化一个flink集群,开辟指定的资源,以后提交任务都向这里提交。这个flink集群会常驻在Yarn集群中,除非手工停止。
第二中(推荐): 每次提交都会创建一个新的flink集群,任务之间互相独立,互不影响,方便管理。任务执行完成之后创建的集群也会消失。
在这里插入图片描述

1.4.5.Flink on Yarn的两种运行方式

1.4.5.1.第一种[yarn-session.sh(开辟资源)+flink run(提交任务)]

启动一个一直运行的flink集群

./bin/yarn-session.sh -n 2 -jm 1024 -tm 1024 [-d]
  • 1

附着到一个已存在的flink yarn session

./bin/yarn-session.sh -id application_1463870264508_0029
  • 1

执行任务

./bin/flink run ./examples/batch/WordCount.jar -input hdfs://hadoop100:9000/LICENSE -output hdfs://hadoop100:9000/wordcount-result.txt
  • 1

停止任务 【web界面或者命令行执行cancel命令】

关于命令含义:

[root@hadoop4 flink-1.11.1]# ./bin/yarn-session.sh --help
Usage:
   必选:
     -n,--container <arg>           表示分配容器的数量(也就是 TaskManager 的数量)
   可选
     -at,--applicationType <arg>     Set a custom application type for the application on YARN
     -D <property=value>             use value for given property (动态属性)
     -d,--detached                   If present, runs the job in detached mode  (独立运行)
     -h,--help                       Help for the Yarn session CLI.
     -id,--applicationId <arg>       Attach to running YARN session  (附着到一个已存在的flink yarn session)
     -j,--jar <arg>                  Path to Flink jar file   
     -jm,--jobManagerMemory <arg>    Memory for JobManager Container with optional unit (default: MB) (JobManager容器的内存大小)
     -m,--jobmanager <arg>           Address of the JobManager to which to connect. Use this flag to connect to a different JobManager than the one specified in the configuration.
     -nl,--nodeLabel <arg>           Specify YARN node label for the YARN application
     -nm,--name <arg>                Set a custom name for the application on YARN     (在YARN上为一个自定义的应用设置一个名字)
     -q,--query                      Display available YARN resources (memory, cores)   显示yarn中可用的资源 (内存, cpu核数)
     -qu,--queue <arg>               Specify YARN queue.                           指定YARN队列
     -s,--slots <arg>                taskmanager分配多少个slots(处理进程)。建议设置为每个机器的CPU核数
     -t,--ship <arg>                 Ship files in the specified directory (t for transfer)     
     -tm,--taskManagerMemory <arg>   Memory per TaskManager Container with optional unit (default: MB)    每个TaskManager容器的内存 [in MB]
     -yd,--yarndetached              If present, runs the job in detached mode (deprecated; use non-YARN specific option instead)
     -z,--zookeeperNamespace <arg>   Namespace to create the Zookeeper sub-paths for high availability mode    针对HA模式在zookeeper上创建NameSpace
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
1.4.5.2.第二种[flink run -m yarn-cluster(开辟资源 + 提交任务)]

启动集群,执行任务

$FLINK_HOME/bin/flink run -d -m yarn-cluster \
-p 1 \
-yjm 1024m \
-ytm 1024m \
-ynm IssuePassComplete \
-c com.tianque.issue.flink.handler.IssuePassCompleteFlinkHandlerByCustomRedisSink \
/home/admin/installed/flink-jars/IssuePassCompleteFlinkHandler.jar \
--port 38092
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

注意:client端必须要设置YARN_CONF_DIR或者HADOOP_CONF_DIR或者HADOOP_HOME环境变量,通过这个环境变量来读取YARN和HDFS的配置信息,否则启动会失败

1.4.5.3…/bin/yarn-session.sh命令分析
用法:  
   必选  
     -n,--container <arg>                             分配多少个yarn容器 (=taskmanager的数量)  
   可选  
     -D <arg>                                       动态属性  
     -d,--detached                                   独立运行  
     -jm,--jobManagerMemory <arg>                    JobManager的内存 [in MB]  
     -nm,--name                                     在YARN上为一个自定义的应用设置一个名字  
     -q,--query                                      显示yarn中可用的资源 (内存, cpu核数)  
     -qu,--queue <arg>                                指定YARN队列.  
     -s,--slots <arg>                                   每个TaskManager使用的slots数量  
     -tm,--taskManagerMemory <arg>                    每个TaskManager的内存 [in MB]  
     -z,--zookeeperNamespace <arg>                     针对HA模式在zookeeper上创建NameSpace 
     -id,--applicationId <yarnAppId>                      YARN集群上的任务id,附着到一个后台运行的yarn session中
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
1.4.5.4./bin/flink run命令分析
run [OPTIONS] <jar-file> <arguments>  
 "run" 操作参数:  
-c,--class <classname>  如果没有在jar包中指定入口类,则需要在这里通过这个参数指定  
-m,--jobmanager <host:port>  指定需要连接的jobmanager(主节点)地址,使用这个参数可以指定一个不同于配置文件中的jobmanager  
-p,--parallelism <parallelism>   指定程序的并行度。可以覆盖配置文件中的默认值。

默认查找当前yarn集群中已有的yarn-session信息中的jobmanager【/tmp/.yarn-properties-root】:
./bin/flink run ./examples/batch/WordCount.jar -input hdfs://hostname:port/hello.txt -output hdfs://hostname:port/result1

连接指定host和port的jobmanager:
./bin/flink run -m hadoop100:1234 ./examples/batch/WordCount.jar -input hdfs://hostname:port/hello.txt -output hdfs://hostname:port/result1

启动一个新的yarn-session:
./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar -input hdfs://hostname:port/hello.txt -output hdfs://hostname:port/result1
注意:yarn session命令行的选项也可以使用./bin/flink 工具获得。它们都有一个y或者yarn的前缀
例如:./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

再如以下运行案例:
如果一直报akka,AskTimeoutException错误,可以尝试添加akka.ask.timeout=120000s, 依然显示该错误。
注意这里数字和时间单位之间,必须有个空格。

flink run \
  -m yarn-cluster \
  -ynm applicaiton-name \
  -yqu queue \
  -p 4000 \
  -ys 4 \
  -yjm 10g \
  -ytm 10g \
  -yD akka.client.timeout="120000 s" \
  -yD akka.ask.timeout="120000 s" \
  -yD web.timeout=120000 \
  -yD rest.retry.max-attempts=100 \
  -c your-class \
  your-jar your-input your-output
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
1.4.5.5.Flink在yarn上的分布

Flink on Yarn
ResourceManager
NodeManager
AppMaster(jobmanager和它运行在一个Container中)
Container(taskmanager运行在上面)
使用on-yarn的好处
提高集群机器的利用率
一套集群,可以执行MR任务,spark任务,flink任务等…

1.4.5.6.Flink on yarn内部实现

步骤如下:
1、上传flink jar包和配置
2、申请资源和请求AppMaster容器
3、分配AppMaster容器
分配worker

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/酷酷是懒虫/article/detail/764766
推荐阅读
相关标签
  

闽ICP备14008679号