当前位置:   article > 正文

Flink On Yarn_flink1.14 on yarn 安装

flink1.14 on yarn 安装

序言

Flink的Standalone和on Yarn模式都属于集群运行模式,但是有很大的不同,在实际环境中,使用Flink on Yarn模式者居多。

另Flink On Yarn要做的事情不多, 总的来说就是使用Flink包的自带命令提交Jar包到Yarn就行了cuiyaonan200)@163.com

Flink要运行在Yarn上则需要使用Flink的命令来在Yarn申请资源以及提交任务,所以就需要使用到相关的Yarn的命令.则需要下载相关的jar包到 /soft/flink/flink-1.14.4/lib 目录下.同理如果要使用Kakfa也要去下载相关的jar放置到这里cuiyaonan2000@163.com

关联的jar在网址:Downloads | Apache Flink   中可以找到.比如Flink支持的Hadoop的包如下所示(我就不信邪 使用2.8.3的版本 发布到Hadoop3.3.1的服务上,不想被Flink限制版本):

优点

  • Yarn的资源可以按需使用,提高集群的资源利用率.能会有很多的集群实例包括MapReduce、Spark、Flink等等,那么如果它们全基于on Yarn就可以完成资源分配,减少单个实例集群的维护,提高集群的利用率。
  • Yarn的任务有优先级,根据优先级运行作业
  • 基于Yarn调度系统,能够自动化地处理各个角色的 Failover(容错)
  • yarn当中 jobManger 宕机挂掉了之后, appmaster会重新跟RM申请资源 去运行,间接的实现了高可用.同时Flink的checkpoint也可以存放在Hdfs上,这样也是实现了高可用性


 

模式

Flink ON YARN有两种使用方式:

  • 长占Yarn内存模式:  在Yarn中初始化一个Flink集群,开辟指定的资源,之后我们提交的Flink Jon都在这个Flink yarn-session中,也就是说不管提交多少个job,这些job都会共用开始时在yarn中申请的资源。这个Flink集群会常驻在Yarn集群中,除非手动停止。
  • 短占Yarn内存模式: 在Yarn中,每次提交job都会创建一个新的Flink集群,任务之间相互独立,互不影响并且方便管理。任务执行完成之后创建的集群也会消失。 

长占Yarn内存模式(Session模式)

申请资源

首先需要将Yarn关联的Jar 放置到flink安装包的/lib目录下

首先需要再Yarn中申请资源,用于运行Flink集群(这个申请资源的命令是Flink包的bin目录下的cuiyaonan2000@163.com)

命令(这里里面有个Flink的坑就是 Flink没有检查创建参数是否合法,在不同的Flink版本中有的参数被取消了,如果有了错误的参数,则其它正确的启动参数不会生效cuiyaonan2000@163.com):

  1. [root@localhost bin]# ./yarn-session.sh -nm cuiyaonan_name -jm 1024m -tm 1024m -s 1 -d
  2. #很容易理解上面的参数
  3. 参数解释:
  4. -n 2 -n 表示申请2个容器,这里指的就是多少个taskmanager ---被取消了在1.14.4
  5. -s 表示每个TaskManager的slots数量
  6. -jm 1024 表示jobmanager 1024M内存
  7. -tm 1024表示taskmanager 1024M内存
  8. -d 任务后台运行
  9. -nm,--name YARN上为一个自定义的应用设置一个名字
  10. -q,--query 显示yarn中可用的资源 (内存, cpu核数)
  11. -z,--zookeeperNamespace <arg> 针对HA模式在zookeeper上创建NameSpace
  12. -id,--applicationId <yarnAppId> YARN集群上的任务id,附着到一个后台运行的yarn session中

运行日志如下所示: 虽然使用了-d 后台运行,但是不能生效,因为参数中有个错误参数,会造成其它正确参数全都失效cuiyaonan2000@163.com

 如上FlinkOnYarn 其实就是在Yarn管理的Node里创建JobManager.可以通过ip:38495 这个地址访问访问Flink的管理界面cuiyaonan2000@163.com

也可以在Yarn的管理界面中直接访问

 展示结果如下所示:

另在1.14.4 版本中我们不需要使用-n来设置TaskManager的数量,同时可以看到启动的JobManager中的TaskManager的数量为空.只有我们在提交了一个任务的时候,才会动态的去Yarn中申请资源创建TaskManager 资源cuiyaonan2000@163.com

释放资源

关闭该Yarn-Session使用命令:

 yarn application -kill application的id

id位置在管理界面中可以查询到,同时也可以在日志中看到

同时Flink也会在HDFS上创建JobManager先关的数据信息

提交任务

 bin/flink run ./cuiyaonan2000@163.com.jar

更多信息的配置

比如jobmanager的内存大小,taskManager的内存大小 ,taskmanager的数量,算子的并行度设置可以修改/soft/flink/flink-1.14.4/conf/flink-conf.yaml 中的默认配置.

短占Yarn内存模式(Per-Job模式)

这个就简单了,直接使用flink  run 命令就可以.

./flink run  -m yarn-cluster -yjm 1024 -ytm 1024 -yd ./spring_flink_tableapi_sqlapi-0.0.1-SNAPSHOT.jar 

因为-m 可以选择是在on yarn上执行,也可以是 ip:port 表示jobmanager地址 2中方式,故此分了2中类型的配置信息cuiyaonan2000@163.com

On Yarn

特点显然就是加了一个y

-m 是yarn-cluster模式
-yd,–yarndetached : 后台
-yjm,–yarnjobManager : jobmanager的内存
-ytm,–yarntaskManager : taskmanager的内存
-yn,–yarncontainer : TaskManager的个数
-yid,–yarnapplicationId : job依附的applicationId
-ynm,–yarnname : application的名称
-ys,–yarnslots : 分配的slots个数
 

On JobManager

-c,–class : 需要指定的main方法的类,如果打的包指定了mainclass 就不需要

-C,–classpath : 向每个用户代码添加url,他是通过UrlClassLoader加载。url需要指定文件的schema如(file://)

-d,–detached : 在后台运行

-p,–parallelism : job需要指定env的并行度,这个一般都需要设置。

-q,–sysoutLogging : 禁止logging输出作为标准输出。

-s,–fromSavepoint : 基于savepoint保存下来的路径,进行恢复。

-sae,–shutdownOnAttachedExit : 如果是前台的方式提交,当客户端中断,集群执行的job任务也会shutdown。

Application Model(Per-Job模式 升级版)

具体优化的地方有2方面

  1. Session模式,PreJob模式都是使用命令提交jar包到yarn服务器上.但是不是我们自以为的物理上传,而是在当前上传的客户机器运行Jar程序,然后生成了Flink可以运行的程序上传到Yarn.Application的第一个优化就是,让这部分的程序运行放在了Jobmanager上,并且跟PerJob一样,提交一次就是一个独立的Flink集群cuiyaonan2000@163.com
  2. 第二个就是Flink的应用所需的Jar,我们程序所需的Jar可以放在HDFS,是上传服务器的IO负载减轻.同时也可以重复利用

Upload File

这里有2中类型的jar上传可以共享

  1. Flink程序运行相关的Jar,同理适用于Session和PreJob模式,避免每次submit任务都会上传jar,且重复占用空间cuiyaonan2000@163.com
  2. 我们要提交的任务Jar

使用命令创建/flink/lib和/flink/plugins 目录同时对应把Flink安装包队形目录下的文件上传,注意plugins这个文件夹必须要命名成这个cuiyaonan2000@163.com

  1. [root@cuiyaonan2000@163.com bin]# hdfs dfs -mkdir /flink
  2. [root@cuiyaonan2000@163.com bin]# hdfs dfs -mkdir /flink/lib
  3. [root@cuiyaonan2000@163.com bin]# hdfs dfs -mkdir /flink/plugin
  4. [root@cuiyaonan2000@163.com bin]# hdfs dfs -put ./* /flink/lib
  5. [root@cuiyaonan2000@163.com bin]# hdfs dfs -put ./* /flink/plugin
  6. [root@cuiyaonan2000@163.com bin]# hdfs dfs -mv /flink/plugin /flink/plugins

Launch

application 模式使用 bin/flink run-application 提交作业;通过 -t 指定部署环境,目前 application 模式支持部署在 yarn 上(-t yarn-application)k8s 上(-t kubernetes-application);并支持通过 -D 参数指定通用的 运行配置,比如 jobmanager/taskmanager 内存、checkpoint 时间间隔等。
通过 bin/flink run-application -h 可以看到 -D/-t 的详细说明:(-e 已经被废弃,可以忽略)

这个命令参数比较少,主要是通过通过-D来设置Jobmanager和TaskManager的服务参数

  1. [root@localhost bin]# ./flink run-application -h
  2. Action "run-application" runs an application in Application Mode.
  3. Syntax: run-application [OPTIONS] <jar-file> <arguments>
  4. Options for Generic CLI mode:
  5. -D <property=value> Allows specifying multiple generic configuration
  6. options. The available options can be found at
  7. https://nightlies.apache.org/flink/flink-docs-stable/
  8. ops/config.html
  9. -e,--executor <arg> DEPRECATED: Please use the -t option instead which is
  10. also available with the "Application Mode".
  11. The name of the executor to be used for executing the
  12. given job, which is equivalent to the
  13. "execution.target" config option. The currently
  14. available executors are: "remote", "local",
  15. "kubernetes-session", "yarn-per-job", "yarn-session".
  16. -t,--target <arg> The deployment target for the given application,
  17. which is equivalent to the "execution.target" config
  18. option. For the "run" action the currently available
  19. targets are: "remote", "local", "kubernetes-session",
  20. "yarn-per-job", "yarn-session". For the
  21. "run-application" action the currently available
  22. targets are: "kubernetes-application",
  23. "yarn-application".

启动命令

[root@localhost bin]# ./flink run-application -t yarn-application -Djobmanager.memory.process.size=2048m -Dtaskmanager.memory.process.size=4096m -Dyarn.application.name="MyFlinkWordCount" -Dtaskmanager.numberOfTaskSlots=3 -Dyarn.provided.lib.dirs="hdfs://localhost:9000/flink/lib;hdfs://localhost:9000/flink/plugins" ./spring_flink_tableapi_sqlapi-0.0.1-SNAPSHOT.jar

补1

指定并发还可以使用 -Dparallelism.default=3,而且社区目前倾向使用 -D+通用配置代替客户端命令参数(比如 -p)。

补2

同时我们的业务jar也可以上传到hdfs上,

然后直接使用hdfs://ip:端口/路径 来访问

验证

如上如果上传了共享的Flink的lib和plugins,则创建的任务不会有如下红框内的内容

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/羊村懒王/article/detail/421525
推荐阅读
相关标签
  

闽ICP备14008679号