赞
踩
Flink on Yarn的HA高可用模式,首先依赖于Yarn自身的高可用机制(ResourceManager高可用),并通过Yarn对JobManager进行管理,当JobManager失效时,Yarn将重新启动JobManager。其次Flink Job在恢复时,需要依赖Checkpoint进行恢复,而Checkpoint的快照依赖于远端的存储:HDFS,所以HDFS也必须是高可用,同时JobManager的元数据信息也依赖于HDFS的高可用(namenode的高可用,和多副本机制),再者JobManager元数据的指针信息要依赖于Zookeeper的高可用。
这里采用二进制包的部署方式。另外还有parcel包的方式
kerberos环境下parcel方式部署flink1.15.3 基于CDH6.3.2 Flink on Yarn_Mumunu-的博客-CSDN博客
相对于 Standalone 模式,在Yarn 模式下有以下几点好处:
1.资源按需使用,提高集群的资源利用率;
2.任务有优先级,根据优先级运行作业;
3.基于 Yarn 调度系统,能够自动化地处理各个角色的 Failover:
JobManager 进程和 TaskManager 进程都由 Yarn NodeManager 监控;
如果 JobManager 进程异常退出,则 Yarn ResourceManager 会重新调度 JobManager 到其他机器;
如果 TaskManager 进程异常退出,JobManager 会收到消息并重新向 Yarn ResourceManager 申请资源,重新启动 TaskManager。
2.1 Application
Application模式:简答的说就是直接run job,每次提交的任务Yarn都会分配一个JobManager,执行完之后整个资源会释放,包括JobManager和TaskManager。
Application模式适合比较大的任务、执行时间比较长的任务。
2.2 Session
Session模式:在Session模式中, Dispatcher 和 ResourceManager 是可以复用的;当执行完Job之后JobManager并不会释放,Session 模式也称为多线程模式,其特点是资源会一直存在不会释放。使用时先启动yarn-session,然后再提交job,每次提交job,也都会分配一个JobManager。
Session模式适合比较小的任务、执行时间比较短的任务。该模式不用频繁的申请资源和释放资源。
所以一般生产情况下我们都会选取 on Yarn 部署Application方式运行
废话不多说开始部署
官网下载安装包
解压后进入目录
打开conf/flink-conf.yaml
修改或者可能修改的都在下面写了
Flink on yarn将会覆盖掉几个参数:
jobmanager.rpc.address因为jobmanager的在集群的运行位置并不是事先确定的,它就是am的地址;
taskmanager.tmp.dirs使用yarn给定的临时目录;
parallelism.default也会被覆盖掉,如果在命令行里指定了slot数。
提前创建flink在hadoop上的逻辑数据目录
- jobmanager.rpc.address: 0.0.0.0 #感觉on yarn改这个没什么用处,随便改改
-
- jobmanager.bind-host: 0.0.0.0 #感觉on yarn改这个没什么用处,随便改改
-
- taskmanager.bind-host: 0.0.0.0 #感觉on yarn改这个没什么用处,随便改改
-
- taskmanager.host: 0.0.0.0 #感觉on yarn改这个没什么用处,随便改改
-
- high-availability.storageDir: hdfs:///flink/ha/
-
- state.checkpoints.dir: hdfs://nameservice1/flink-checkpoints
-
- state.savepoints.dir: hdfs://nameservice1/flink-savepoints
-
- rest.address: 0.0.0.0
-
- rest.bind-address: 0.0.0.0
-
-
- #web.submit.enable: false #允许web提交任务 按需 我不需要
-
- #web.cancel.enable: false #允许web取消任务 按需 我不需要
-
- 还有一些针对kafka zookeeoer的kerberos配置 我这边用不着 大同小异。另外这个配置的用处是提交任务之前不用kinit ,我这已经习惯了kinit 所以也用不着 都是字面意思很好配
- # security.kerberos.login.use-ticket-cache: true
- # security.kerberos.login.keytab: /path/to/kerberos/keytab
- # security.kerberos.login.principal: flink-user
-
- jobmanager.archive.fs.dir: hdfs:///flink/completed-jobs/
-
- historyserver.web.address: 0.0.0.0
-
- historyserver.archive.fs.dir: hdfs:///flink/completed-jobs/
-
-
- 添加一行
- classloader.check-leaked-classloader: false
-
然后就可以运行命令测试了
kinit 你的kerberos用户 如果不是hdfs用户的话需要在hdfs上的/user配置好权限,因为会在/user/{username}/.flink/ 下输出临时文件
命令行设置hadoop环境变量
export HADOOP_CLASSPATH=`hadoop classpath`
我配置了flink环境变量 如果你没配置 那就bin/flink ,在flink安装目录下运行 使用官方example 运行看看
flink run -m yarn-cluster ./examples/batch/WordCount.jar
没报错 出来一堆
这种 就是安装成功了
可以在yarn的界面和cdh的界面查到flink的任务
如果测试flink session模式,或者运行sql-client
必须首先启动一个session任务
yarn-session.sh -s 2 -jm 1024 -tm 2048 -nm test1 -d
-tm 表示每个TaskManager的内存大小
-s 表示每个TaskManager的slots数量
-d 表示以后台程序方式运行
启动了一个名叫test1的flink 常驻集群
一些参数如下
- -n,--container <arg> 表示分配容器的数量(也就是 TaskManager 的数量)。
- -D <arg> 动态属性。
- -d,--detached 在后台独立运行。
- -jm,--jobManagerMemory <arg>:设置 JobManager 的内存,单位是 MB。
- -nm,--name:在 YARN 上为一个自定义的应用设置一个名字。
- -q,--query:显示 YARN 中可用的资源(内存、cpu 核数)。
- -qu,--queue <arg>:指定 YARN 队列。
- -s,--slots <arg>:每个 TaskManager 使用的 Slot 数量。
- -tm,--taskManagerMemory <arg>:每个 TaskManager 的内存,单位是 MB。
- -z,--zookeeperNamespace <arg>:针对 HA 模式在 ZooKeeper 上创建 NameSpace。
- -id,--applicationId <yarnAppId>:指定 YARN 集群上的任务 ID,附着到一个后台独立运行的 yarn session 中
然后提交job
flink run ./example/batch/WordCount.jar
这样就完成了两种方式的测试
别忘了把包和profile里配置的环境变量 分发到所有节点。便于在任何节点提交任务
最后启动一下histrory web ui 如果是session 这种常驻集群的话会输出一个8173端口的web ui 会提示在页面输出的最后
bin/historyserver.sh start
访问ip:端口即可 具体什么端口看你的配置
查看Flink Yarn Application
yarn application -list
停止Flink Yarn Application
yarn application -kill appID
如果要用 flink sql-client的话必须启动一个常驻集群。flink sql 依赖一个正在运行的flink集群 standalone 和yarn session 都可以
- sql-client.sh
- 建表
- 跑select等语句即可
flink web界面,首先运行一个flink实时程序,然后在yarn里面查看应用程序,点击进去
当启动sesssion之后 提交任务必须指定类型 不然就直接提交到session里去了
- ./bin/flink run-application -t yarn-application ./examples/streaming/TopSpeedWindowing.jar
- 想指定id的话可以用list参数
- ./bin/flink list -t yarn-application -Dyarn.application.id=application_XXXX_YY
- 用list就必须指定 不然报错
- No cluster id was specified. Please specify a cluster to which you would like to connect
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。