赞
踩
1.4.Flink集群安装部署standalone+yarn
1.4.1.Standalone集群模式
1.4.2.Flink-Standalone集群重要参数详解
1.4.3.集群节点重启及扩容
1.4.3.1.启动jobmanager
1.4.3.2.启动taskmanger
1.4.3.3.Flink standalone集群中job的容错
1.4.4.Flink on Yarn
1.4.4.1.原理介绍
1.4.4.2.FLINK on yarn集群部署
1.4.5.Flink on Yarn的两种运行方式
1.4.5.1.第一种[yarn-session.sh(开辟资源)+flink run(提交任务)]
1.4.5.2.第二种[flink run -m yarn-cluster(开辟资源 + 提交任务)]
1.4.5.3…/bin/yarn-session.sh命令分析
1.4.5.4./bin/flink run命令分析
1.4.5.5.Flink在yarn上的分布
1.4.5.6.Flink on yarn内部实现
Standalone集群架构展示:Client客户端提交任务给JobManager,JobManager负责Flink集群计算资源管理,并分发任务给TaskManager执行,TaskManager定期向JobManager汇报状态。
依赖环境
Jdk1.8及以上[配置JAVA_HOME环境变量]
ssh免密登录[集群内节点之间免密登录]
集群规划
master(JobManager) + slave/worker(TaskManager)
hadoop4(master)
hadoop5(salve)
hadoop6(salve)
集群安装
A:修改conf/flink-conf.yaml
jobmanager.rpc.address: hadoop4
taskmanager.numberOfTaskSlots: 32 (由于服务器上的CPU核数是32核,所以,此处写成32)
B:修改conf/masters文件内容:
hadoop4:8081
C:修改conf/workers (这里将三台机器都作为worker节点使用)
hadoop4
hadoop5
hadoop6
D:拷贝到其它节点
cd /home/admin/installed/flink-1.11.1 (在hadoop4机器上)
scp -r * root@hadoop5:$PWD
scp -r * root@hadoop6:$PWD
E:在hadoop4节点启动
[root@hadoop4 flink-1.11.1]# pwd
/home/admin/installed/flink-1.11.1
[root@hadoop4 flink-1.11.1]# bin/start-cluster.sh
F:访问集群:http://xxx.xxx.xxxx.xxx:8081/#/overview
jobmanager.memory.process.size: 1600m The total process memory size for the JobManager.
taskmanager.memory.process.size: 1728m The total process memory size for the TaskManager
taskmanager.numberOfTaskSlots: 20 每台机器上可用CPU数量
parallelism.default: 1 The parallelism used for programs that did not specify and other parallelism
slot和parallelism总结
1.slot是静态的概念,是指taskmanager具有的并发执行能力。
2.parallelism是动态的概念,是指程序运行时实际使用的并发能力。
3.设置合适的parallelism能提高运算效率,太多了和太少了都不行。
如果集群中的jobmanager进程挂了,执行下面命令启动。
bin/jobmanager.sh start
bin/jobmanager.sh stop
添加新的taskmanager节点或者重启taskmanager节点
bin/taskmanager.sh start
bin/taskmanager.sh stop
jobmanager挂掉
正在执行的任务会失败
存在单点故障,(Flink支持HA)
taskmanager挂掉
如果有多余的taskmanger节点,flink会自动把任务调度到其它节点执行。
FLINK on yarn模式的原理是依靠YARN来调度FLINK任务,目前在企业中使用较多。这种模式的好处是可以充分利用集群资源,提高集群机器的利用率,并且只需要1套Hadoop集群,就可以执行MapReduce、Spark和FLINK任务,操作非常方便,运维方面也很轻松。
1)当启动一个新的 Flink YARN Client 会话时,客户端首先会检查所请求的资源(容器和内存)是否可用。之后,它会上传 Flink 配置和 JAR 文件到 HDFS。。
2)客 户 端 请 求 一个 YARN 容 器 启动 ApplicationMaster 。 JobManager 和ApplicationMaster(AM)运行在同一个容器中,一旦它们成功地启动了,AM 就能够知道JobManager 的地址,它会为 TaskManager 生成一个新的 Flink 配置文件(这样它才能连上 JobManager),该文件也同样会被上传到 HDFS。另外,AM 容器还提供了 Flink 的Web 界面服务。Flink 用来提供服务的端口是由用户和应用程序 ID 作为偏移配置的,这使得用户能够并行执行多个 YARN 会话。
3)之后,AM 开始为 Flink 的 TaskManager 分配容器(Container),从 HDFS 下载 JAR 文件和修改过的配置文件。一旦这些步骤完成了,Flink 就安装完成并准备接受任务了。
Flink n on n Yarn 模式在使用的时候又可以分为两Session-Cluster和Per-Job-Cluster
Session-Cluster
这种模式是在 YARN 中提前初始化一个 Flink 集群(称为 Flinkyarn-session),开辟指定的资源,以后的 Flink 任务都提交到这里。这个 Flink 集群会常驻在 YARN 集群中,除非手工停止。这种方式创建的 Flink 集群会独占资源,不管有没有 Flink 任务在执行,YARN 上面的其他任务都无法使用这些资源。
Per-Job-Cluster
这种模式,每次提交 Flink 任务都会创建一个新的 Flink 集群,每个 Flink 任务之间相互独立、互不影响,管理方便。任务执行完成之后创建的 Flink集群也会消失,不会额外占用资源,按需使用,这使资源利用率达到最大,在工作中推荐使用这种模式。
依赖环境
本次部署hadoop3.1.0版本
Hdfs & yarn
Flink on Yarn的两种使用方式
第一种: 在Yarn中初始化一个flink集群,开辟指定的资源,以后提交任务都向这里提交。这个flink集群会常驻在Yarn集群中,除非手工停止。
第二中(推荐): 每次提交都会创建一个新的flink集群,任务之间互相独立,互不影响,方便管理。任务执行完成之后创建的集群也会消失。
启动一个一直运行的flink集群
./bin/yarn-session.sh -n 2 -jm 1024 -tm 1024 [-d]
附着到一个已存在的flink yarn session
./bin/yarn-session.sh -id application_1463870264508_0029
执行任务
./bin/flink run ./examples/batch/WordCount.jar -input hdfs://hadoop100:9000/LICENSE -output hdfs://hadoop100:9000/wordcount-result.txt
停止任务 【web界面或者命令行执行cancel命令】
关于命令含义:
[root@hadoop4 flink-1.11.1]# ./bin/yarn-session.sh --help Usage: 必选: -n,--container <arg> 表示分配容器的数量(也就是 TaskManager 的数量) 可选 -at,--applicationType <arg> Set a custom application type for the application on YARN -D <property=value> use value for given property (动态属性) -d,--detached If present, runs the job in detached mode (独立运行) -h,--help Help for the Yarn session CLI. -id,--applicationId <arg> Attach to running YARN session (附着到一个已存在的flink yarn session) -j,--jar <arg> Path to Flink jar file -jm,--jobManagerMemory <arg> Memory for JobManager Container with optional unit (default: MB) (JobManager容器的内存大小) -m,--jobmanager <arg> Address of the JobManager to which to connect. Use this flag to connect to a different JobManager than the one specified in the configuration. -nl,--nodeLabel <arg> Specify YARN node label for the YARN application -nm,--name <arg> Set a custom name for the application on YARN (在YARN上为一个自定义的应用设置一个名字) -q,--query Display available YARN resources (memory, cores) 显示yarn中可用的资源 (内存, cpu核数) -qu,--queue <arg> Specify YARN queue. 指定YARN队列 -s,--slots <arg> taskmanager分配多少个slots(处理进程)。建议设置为每个机器的CPU核数 -t,--ship <arg> Ship files in the specified directory (t for transfer) -tm,--taskManagerMemory <arg> Memory per TaskManager Container with optional unit (default: MB) 每个TaskManager容器的内存 [in MB] -yd,--yarndetached If present, runs the job in detached mode (deprecated; use non-YARN specific option instead) -z,--zookeeperNamespace <arg> Namespace to create the Zookeeper sub-paths for high availability mode 针对HA模式在zookeeper上创建NameSpace
启动集群,执行任务
$FLINK_HOME/bin/flink run -d -m yarn-cluster \
-p 1 \
-yjm 1024m \
-ytm 1024m \
-ynm IssuePassComplete \
-c com.tianque.issue.flink.handler.IssuePassCompleteFlinkHandlerByCustomRedisSink \
/home/admin/installed/flink-jars/IssuePassCompleteFlinkHandler.jar \
--port 38092
注意:client端必须要设置YARN_CONF_DIR或者HADOOP_CONF_DIR或者HADOOP_HOME环境变量,通过这个环境变量来读取YARN和HDFS的配置信息,否则启动会失败
用法:
必选
-n,--container <arg> 分配多少个yarn容器 (=taskmanager的数量)
可选
-D <arg> 动态属性
-d,--detached 独立运行
-jm,--jobManagerMemory <arg> JobManager的内存 [in MB]
-nm,--name 在YARN上为一个自定义的应用设置一个名字
-q,--query 显示yarn中可用的资源 (内存, cpu核数)
-qu,--queue <arg> 指定YARN队列.
-s,--slots <arg> 每个TaskManager使用的slots数量
-tm,--taskManagerMemory <arg> 每个TaskManager的内存 [in MB]
-z,--zookeeperNamespace <arg> 针对HA模式在zookeeper上创建NameSpace
-id,--applicationId <yarnAppId> YARN集群上的任务id,附着到一个后台运行的yarn session中
run [OPTIONS] <jar-file> <arguments> "run" 操作参数: -c,--class <classname> 如果没有在jar包中指定入口类,则需要在这里通过这个参数指定 -m,--jobmanager <host:port> 指定需要连接的jobmanager(主节点)地址,使用这个参数可以指定一个不同于配置文件中的jobmanager -p,--parallelism <parallelism> 指定程序的并行度。可以覆盖配置文件中的默认值。 默认查找当前yarn集群中已有的yarn-session信息中的jobmanager【/tmp/.yarn-properties-root】: ./bin/flink run ./examples/batch/WordCount.jar -input hdfs://hostname:port/hello.txt -output hdfs://hostname:port/result1 连接指定host和port的jobmanager: ./bin/flink run -m hadoop100:1234 ./examples/batch/WordCount.jar -input hdfs://hostname:port/hello.txt -output hdfs://hostname:port/result1 启动一个新的yarn-session: ./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar -input hdfs://hostname:port/hello.txt -output hdfs://hostname:port/result1 注意:yarn session命令行的选项也可以使用./bin/flink 工具获得。它们都有一个y或者yarn的前缀 例如:./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar
再如以下运行案例:
如果一直报akka,AskTimeoutException错误,可以尝试添加akka.ask.timeout=120000s, 依然显示该错误。
注意这里数字和时间单位之间,必须有个空格。
flink run \
-m yarn-cluster \
-ynm applicaiton-name \
-yqu queue \
-p 4000 \
-ys 4 \
-yjm 10g \
-ytm 10g \
-yD akka.client.timeout="120000 s" \
-yD akka.ask.timeout="120000 s" \
-yD web.timeout=120000 \
-yD rest.retry.max-attempts=100 \
-c your-class \
your-jar your-input your-output
Flink on Yarn
ResourceManager
NodeManager
AppMaster(jobmanager和它运行在一个Container中)
Container(taskmanager运行在上面)
使用on-yarn的好处
提高集群机器的利用率
一套集群,可以执行MR任务,spark任务,flink任务等…
步骤如下:
1、上传flink jar包和配置
2、申请资源和请求AppMaster容器
3、分配AppMaster容器
分配worker
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。