当前位置:   article > 正文

10.Flink集群安装操作

flink集群安装
  1. Flink集群安装操作

Flink支持多种安装模式。

  1. local(本地)——本地模式
  2. standalone——独立模式,Flink自带集群,开发测试环境使用
  3. standaloneHA—独立集群高可用模式,Flink自带集群,开发测试环境使用
  4. yarn——计算资源统一由Hadoop YARN管理,生产环境测试

    1. [了解] - Standalone - 伪分布环境(开发测试)

和Local模式不同的是,Standalone模式中Flink的各个角色都是独立的进程

      1. 架构图

  1. Flink程序需要提交给JobClient
  2. JobClient将作业提交给JobManager
  3. JobManager负责协调资源分配和作业执行。 资源分配完成后,任务将提交给相应的TaskManager
  4. TaskManager启动一个线程以开始执行。TaskManager会向JobManager报告状态更改。例如开始执行,正在进行或已完成。
  5. 作业执行完成后,结果将发送回客户端(JobClient)
      1. 环境准备
  6. jdk1.8及以上【配置JAVA_HOME环境变量】
  7. ssh免密码登录【集群内节点之间免密登录】
      1. 下载安装包

https://archive.apache.org/dist/flink/flink-1.14.0/flink-1.14.0-bin-scala_2.12.tgz

      1. 服务器规划
  1. 服务器: node1(Master + Slave)
      1. 安装步骤
  2. 1) 上传flink到/export/software下, 并解压

cd /export/software/

rz 上传

解压:

tar -zxf flink-1.14.0-bin-scala_2.12.tgz -C /export/server/

为flink创建软连接

cd /export/server/

ln -s flink-1.14.0/ flink

  1. 2) 启动flink

cd /export/server/flink

bin/start-cluster.sh

启动后, 通过JPS查看进程信息:

                 

  1. 6) 访问flink的web界面

slot在flink里面可以认为是资源组,Flink是通过将任务分成子任务并且将这些子任务分 配到slot来并行执行程序

  1. 7) 运行测试任务

cd /export/server/flink

bin/flink run /export/server/flink/examples/batch/WordCount.jar

         

  1. 8) 查看WEBUI显示

  1. 9) 如何查看执行日志

JobManager 和 TaskManager 的启动日志可以在 Flink binary 目录下的 log 子目录中找到

cd /export/server/flink/log

log 目录中以“flink-${user}-standalonesession-${id}-${hostname}”为前缀的文件对应的即是 JobManager 的输出,其中有三个文件:

  1. flink-${user}-standalonesession-${id}-${hostname}.log:代码中的日志输出
  2. flink-${user}-standalonesession-${id}-${hostname}.out:进程执行时的 stdout 输出
  3. flink-${user}-standalonesession-${id}-${hostname}-gc.log:JVM 的 GC 的日志

log 目录中以“flink-${user}-taskexecutor-${id}-${hostname}”为前缀的文件对应的是 TaskManager 的输出,也包括三个文件,和 JobManager 的输出一致。

    1.  [了解] - Standalone – 完全分布式集群环境(开发测试)
      1. 架构图

  1. client客户端提交任务给JobManager
  2. JobManager负责Flink集群计算资源管理,并分发任务给TaskManager执行
  3. TaskManager定期向JobManager汇报状态
  4. flink的TM就是运行在不同节点上的JVM进程(process),这个进程会拥有一定量的资源。比如内存,cpu,网络,磁盘等。flink将进程的内存进行了划分到多个slot中.图中有2个TaskManager,每个TM有2个slot的,每个slot占有1/2的内存。
      1. 集群规划
  5. 服务器: node1(Master + Slave)
  6. 服务器: node2(Slave)
  7. 服务器: node3(Slave)
      1. 安装步骤
        1. 修改flink-conf.yaml

cd /export/server/flink/conf

vim flink-conf.yaml

修改内容内容:

# jobManager 的IP地址

jobmanager.rpc.address: node1.itcast.cn

# JobManager 的端口号

jobmanager.rpc.port: 6123

# JobManager JVM heap 内存大小

jobmanager.memory.process.size: 1024m 

# TaskManager JVM heap 内存大小

taskmanager.memory.process.size: 1024m 

# 每个 TaskManager 提供的任务 slots 数量大小

taskmanager.numberOfTaskSlots: 2

# 程序默认并行计算的个数

parallelism.default: 1

slot和parallelism总结:

taskmanager.numberOfTaskSlots:2

每一个taskmanager中的分配2个TaskSlot,3个taskmanager一共有6个TaskSlot

parallelism.default:1 运行程序默认的并行度为1,6个TaskSlot只用了1个,有5个空闲

slot是静态的概念,是指taskmanager具有的最大并发执行能力

parallelism是动态的概念,是指程序运行时实际使用的并发能力

        1. 修改workers配置文件

cd /export/server/flink/conf

vim workers

添加以下内容:

node1.itcast.cn

node2.itcast.cn

node3.itcast.cn

        1. 修改环境变量

vim /etc/profile

添加:

export HADOOP_CONF_DIR=/export/server/hadoop/etc/hadoop

重新加载环境变量:

source /etc/profile

        1. 将flink支持hadoop的集成包导入

5) 将资料中的相关jar包全部上传至flink的lib目录下

cd /export/server/flink/lib

rz上传

        1. 分发其他两个节点并修改相关配置

cd /export/server

scp -r flink-1.14.0/ node2:$PWD

scp -r flink-1.14.0/ node3:$PWD

在node2和node3创建软链接

cd /export/server

ln -s flink-1.14.0/ flink

修改环境变量:

vim /etc/profile

添加内容:

export HADOOP_CONF_DIR=/export/server/hadoop/etc/hadoop

加载环境变量:

source /etc/profile

        1. 启动flink集群

cd /export/server/flink

bin/start-cluster.sh

Jps 查看各个节点进程:

        1. 测试

1) 首先启动 hadoop集群

start-all.sh

2) 接着在hdfs上创建目录:

hadoop fs -mkdir -p /test/input

3) 在linux上创建wordcount.txt文件:

cd ~

vim wordcount.txt

内容如下:

hadoop hello world hive

hive hadoop hello hadoop

hadoop hive hive

4) 将其上传至hdfs上:

hadoop fs -put /root/wordcount.txt /test/input

5) 执行flink程序:

cd /export/server/flink

bin/flink run /export/server/flink/examples/batch/WordCount.jar --input hdfs://node1:8020/test/input/wordcount.txt

6) 查看web浏览器:

http://node1:8081

      1. [说明]启动/停止flink集群
  1. 启动:./bin/start-cluster.sh  
  2. 停止:./bin/stop-cluster.sh
      1. [说明]Flink集群的重启或扩容
  3. 启动/停止jobmanager
    1. 启动:./bin/start-cluster.sh
    2. 停止:./bin/stop-cluster.sh
  4. 如果集群中的jobmanager进程挂了,执行下面命令启动
    1. bin/jobmanager.sh start
    2. bin/jobmanager.sh stop
  5. 添加新的taskmanager节点或者重启taskmanager节点
    1. bin/taskmanager.sh start
    2. bin/taskmanager.sh stop
    1. [了解] - Standalone – 完全分布式之高可用HA模式(生产可用)

从上述架构图中,可发现JobManager存在单点故障,一旦JobManager出现意外,整个集群无法工作。所以,为了确保集群的高可用,需要搭建Flink的HA。(如果是部署在YARN上,部署YARN的HA),我们这里演示如何搭建Standalone 模式HA。

      1. HA架构图

      1. 集群规划
  1. 服务器: node1(Master + Slave)
  2. 服务器: node2(Master + Slave)
  3. 服务器: node3(Slave)
      1. 前置条件(此操作再第二部分已经搞定)

需要将hadoop组件上传到Flink安装包的lib目录下,因为Flink1.8开始,安装包不再基于flink版本进行划分,因此需要手动下载hadoop组件,同时需要注意Hadoop版本号需要与开发环境版本保持一致,以生产环境使用Hadoop3为例

操作步骤

说明

1

下载hadoop的组件Jar包

https://mvnrepository.com/

2

将下载的jar文件拷贝到flink安装目录lib目录下(每个节点都需要拷贝

3

拷贝完成jar到每个节点以后需要重启flink集群

      1. 安装步骤
        1. 修改fink-conf.yaml

cd /export/server/flink/conf/

vim flink-conf.yaml

修改以下内容:

#开启HA,使用文件系统作为快照存储:126

state.backend: filesystem

#默认为none,用于指定checkpoint的data files和meta data存储的目录:131

state.checkpoints.dir: hdfs://node1.itcast.cn:8020/flink-checkpoints

#默认为none,用于指定savepoints的默认目录:135

state.savepoints.dir: hdfs://node1.itcast.cn:8020/flink-savepoints

#使用zookeeper搭建高可用:81

high-availability: zookeeper

# 存储JobManager的元数据到HDFS,用来恢复JobManager 所需的所有元数据 :90

high-availability.storageDir: hdfs://node1.itcast.cn:8020/flink/ha/

high-availability.zookeeper.quorum: node1.itcast.cn:2181,node2.itcast.cn:2181,node3.itcast.cn:2181

分发到node2和node3

cd /export/server/flink/conf

scp flink-conf.yaml node2:$PWD

scp flink-conf.yaml node3:$PWD

        1. 修改node2的flink-conf.yaml

cd /export/server/flink/conf

vim flink-conf.yaml

修改:

jobmanager.rpc.address: node2.itcast.cn

        1. 修改master文件

cd /export/server/flink/conf

vim masters

修改为以下内容:

node1.itcast.cn:8081

node2.itcast.cn:8081

分发给node2和node3

cd /export/server/flink/conf

scp masters node2:$PWD

scp masters node3:$PWD

        1. 启动集群

1) 先启动zookeeper集群: 三个节点启动

cd /export/server/zookeeper/bin/

./zkServer.sh start

2) 启动hadoop集群

start-all.sh

3) 启动flink集群

cd /export/server/flink

./bin/start-cluster.sh

查看node1和node2的8081 webUI

注意事项

切记搭建HA,需要将第二个节点的 jobmanager.rpc.address 修改为node2.itcast.cn

    1. [理解] - yarn集群环境(生产推荐)

Local模式:通过一个JVM进程中,通过线程模拟出各个Flink角色来得到Flink环境

Standalone模式:各个角色是独立的进程存在

YARN模式:Flink的各个角色,均运行在多个YARN的容器内,其整体上是一个YARN的任务

flink on yarn的前提是:hdfs、yarn均启动

企业实际开发中,使用Flink时,更多的使用方式是Flink On Yarn模式原因如下:

  1. Yarn的资源可以按需使用,提高集群的资源利用率
  2. Yarn的任务有优先级,根据优先级运行作业
  3. 基于Yarn调度系统,能够自动化地处理各个角色的 Failover(容错)
    1. JobManager 进程和 TaskManager 进程都由 Yarn NodeManager 监控
    2. 如果 JobManager 进程异常退出,则 Yarn ResourceManager 会重新调度 JobManager 到其他机器
    3. 如果 TaskManager 进程异常退出,JobManager 会收到消息并重新向 Yarn ResourceManager 申请资源,重新启动 TaskManager

      1. 准备工作
  1. jdk1.8及以上【配置JAVA_HOME环境变量】
  2. ssh免密码登录【集群内节点之间免密登录】
  3. 至少hadoop2.2
  4. hdfs & yarn均启动
      1. 集群规划
  5. 服务器: node1(Master + Slave)
  6. 服务器: node2(Slave)
  7. 服务器: node3(Slave)
      1. 修改hadoop的配置参数

操作步骤

说明

1

打开yarn配置页面(每台hadoop节点都需要修改

vim etc/hadoop/yarn-site.xml

添加

<property>

    <name>yarn.nodemanager.vmem-check-enabled</name>

    <value>false</value>

</property>

是否启动一个线程检查每个任务正使用的虚拟内存量,如果任务超出分配值,则直接将其杀掉,默认是true。

在这里面我们需要关闭,因为对于flink使用yarn模式下,很容易内存超标,这个时候yarn会自动杀掉job

2

分发yarn-site.xml到其它服务器节点

scp yarn-site.xml node2:$PWD

scp yarn-site.xml node3:$PWD

3

启动HDFS、YARN集群

start-all.sh

      1. Flink on Yarn的运行机制

从图中可以看出,Yarn的客户端需要获取hadoop的配置信息,连接Yarn的ResourceManager。所以要有设置有 YARN_CONF_DIR或者HADOOP_CONF_DIR或者HADOOP_CONF_PATH,只要设置了其中一个环境变量,就会被读取。如果读取上述的变量失败了,那么将会选择hadoop_home的环境变量,都区成功将会尝试加载$HADOOP_HOME/etc/hadoop的配置文件。

  1. 当启动一个Flink Yarn会话时,客户端首先会检查本次请求的资源是否足够。资源足够将会上传包含HDFS配置信息和Flink的jar包到HDFS。
  2. 随后客户端会向Yarn发起请求,启动applicationMaster,随后NodeManager将会加载有配置信息和jar包,一旦完成,ApplicationMaster(AM)便启动。
  3. 当JobManager and AM 成功启动时,他们都属于同一个container,从而AM就能检索到JobManager的地址。此时会生成新的Flink配置信息以便TaskManagers能够连接到JobManager。同时,AM也提供Flink的WEB接口。用户可并行执行多个Flink会话。
  4. 随后,AM将会开始为分发从HDFS中下载的jar以及配置文件的container给TaskMangers.完成后Fink就完全启动并等待接收提交的job.
      1. Flink on Yarn的三种部署方式介绍
        1. Session模式

这种模式会预先在yarn或者或者k8s上启动一个flink集群,然后将任务提交到这个集群上,这种模式,集群中的任务使用相同的资源,如果某一个任务出现了问题导致整个集群挂掉,那就得重启集群中的所有任务,这样就会给集群造成很大的负面影响。

特点:需要事先申请资源,使用Flink中的yarn-session(yarn客户端),启动JobManager和TaskManger

优点:不需要每次递交作业申请资源,而是使用已经申请好的资源,从而提高执行效率

缺点:作业执行完成以后,资源不会被释放,因此一直会占用系统资源

应用场景:适合作业递交比较频繁的场景,小作业比较多的场景

        1. Per-Job模式

考虑到集群的资源隔离情况,一般生产上的任务都会选择per job模式,也就是每个任务启动一个flink集群,各个集群之间独立运行,互不影响,且每个集群可以设置独立的配置。

特点:每次递交作业都需要申请一次资源

优点:作业运行完成,资源会立刻被释放,不会一直占用系统资源

缺点:每次递交作业都需要申请资源,会影响执行效率,因为申请资源需要消耗时间

应用场景:适合作业比较少的场景、大作业的场景

      1. application模式
        1. 背景

flink-1.11 引入了一种新的部署模式,即 Application 模式。目前,flink-1.11 已经可以支持基于 Yarn 和 Kubernetes 的 Application 模式。

        1. 优势

Session模式:所有作业共享集群资源,隔离性差,JM 负载瓶颈,main 方法在客户端执行。
Per-Job模式:每个作业单独启动集群,隔离性好,JM 负载均衡,main 方法在客户端执行。

通过以上两种模式的特点描述,可以看出,main方法都是在客户端执行,社区考虑到在客户端执行 main() 方法来获取 flink 运行时所需的依赖项,并生成 JobGraph,提交到集群的操作都会在实时平台所在的机器上执行,那么将会给服务器造成很大的压力。尤其在大量用户共享客户端时,问题更加突出。

此外这种模式提交任务的时候会把本地flink的所有jar包先上传到hdfs上相应的临时目录,这个也会带来大量的网络的开销,所以如果任务特别多的情况下,平台的吞吐量将会直线下降。

因此,社区提出新的部署方式 Application 模式解决该问题。

        1. 原理

Application 模式下,用户程序的 main 方法将在集群中而不是客户端运行,用户将程序逻辑和依赖打包进一个可执行的 jar 包里,集群的入口程序 (ApplicationClusterEntryPoint) 负责调用其中的 main 方法来生成 JobGraph。Application 模式为每个提交的应用程序创建一个集群,该集群可以看作是在特定应用程序的作业之间共享的会话集群,并在应用程序完成时终止。在这种体系结构中,Application 模式在不同应用之间提供了资源隔离和负载平衡保证。在特定一个应用程序上,JobManager 执行 main() 可以节省所需的 CPU 周期,还可以节省本地下载依赖项所需的带宽。

      1. Flink on Yarn的三种部署方式使用说明
        1. 第一种方式:YARN session

操作步骤

说明

1

yarn-session.sh(开辟资源)+flink run(提交任务)

这种模式下会启动yarn session,并且会启动Flink的两个必要服务:JobManager和Task-managers,然后你可以向集群提交作业。同一个Session中可以提交多个Flink作业。需要注意的是,这种模式下Hadoop的版本至少是2.2,而且必须安装了HDFS(因为启动YARN session的时候会向HDFS上提交相关的jar文件和配置文件)

通过./bin/yarn-session.sh脚本启动YARN Session

脚本可以携带的参数:

-n(--container):TaskManager的数量。(1.10 已经废弃)

-s(--slots): 每个TaskManager的slot数量,默认一个slot一个core,默认每个taskmanager的slot的个数为1,有时可以多一些taskmanager,做冗余。

-jm:JobManager的内存(单位MB)。

-q显示可用的YARN资源(内存,内核);

-tm:每个TaskManager容器的内存(默认值:MB)

-nm:yarn 的appName(现在yarn的ui上的名字)。  

-d:后台执行。

注意:

如果不想让Flink YARN客户端始终运行,那么也可以启动分离的 YARN会话。该参数被称为-d--detached

确定TaskManager数

Flink on YARN时,TaskManager的数量就是:max(parallelism) / yarnslots(向上取整)。例如,一个最大并行度为10,每个TaskManager有两个任务槽的作业,就会启动5个TaskManager。

2

启动:

bin/yarn-session.sh -tm 1024  -s 4 -d

上面的命令的意思是,每个 TaskManager 拥有4个 Task Slot(-s 4),并且被创建的每个 TaskManager 所在的YARN Container 申请 1024M  的内存,同时额外申请一个Container用以运行ApplicationMaster以及Job Manager。

TM的数量取决于并行度,如下图:

执行:bin/flink run -p 8 examples/batch/WordCount.jar

3

启动成功之后,控制台显示:

4

去yarn页面:ip:8088可以查看当前提交的flink session

5

然后使用flink提交任务

bin/flink run examples/batch/WordCount.jar

在控制台中可以看到wordCount.jar计算出来的任务结果

6

在yarn-session.sh提交后的任务页面中也可以观察到当前提交的任务:

7

点击查看任务细节:

8

停止当前任务:

yarn application -kill  application_1527077715040_0007

        1. 第二种方式:在YARN上运行一个Flink作业

上面的YARN session是在Hadoop YARN环境下启动一个Flink cluster集群,里面的资源是可以共享给其他的Flink作业。我们还可以在YARN上启动一个Flink作业,这里我们还是使用./bin/flink,但是不需要事先启动YARN session:

  1. 使用flink直接提交任务

bin/flink run -m yarn-cluster ./examples/batch/WordCount.jar

常用参数:

  • -p 程序默认并行度

下面的参数仅可用于 -m yarn-cluster 模式

  • -yjm JobManager可用内存,单位兆
  • -ynm YARN程序的名称
  • -yq 查询YARN可用的资源
  • -yqu 指定YARN队列是哪一个
  • -ys 每个TM会有多少个Slot
  • -ytm 每个TM所在的Container可申请多少内存,单位兆
  • -yD 动态指定Flink参数
  • -yd 分离模式(后台运行,不指定-yd, 终端会卡在提交的页面上)

在8088页面观察:

  1. 停止yarn-cluster

yarn application -kill application的ID

注意:

在创建集群的时候,集群的配置参数就写好了,但是往往因为业务需要,要更改一些配置参数,这个时候可以不必因为一个实例的提交而修改conf/flink-conf.yaml;

可以通过:-yD <arg>                        Dynamic properties

来覆盖原有的配置信息:比如:

bin/flink run -m yarn-cluster -yD fs.overwrite-files=true examples/batch/WordCount.jar

-yD fs.overwrite-files=true -yD taskmanager.network.numberOfBuffers=16368

        1. 第三种方式:Application Mode

application 模式使用 bin/flink run-application 提交作业;通过 -t 指定部署环境,目前 application 模式支持部署在 yarn 上(-t yarn-application) 和 k8s 上(-t kubernetes-application);并支持通过 -D 参数指定通用的 运行配置,比如 jobmanager/taskmanager 内存、checkpoint 时间间隔等。
通过 bin/flink run-application -h 可以看到 -D/-t 的详细说明:(-e 已经被废弃,可以忽略)

bin/flink run-application -h 

参数:

Options for Generic CLI mode:

     -D <property=value>  Generic configuration options for execution/deployment and for the configured executor.The available options can be found at

                           https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html

     -e,--executor <arg>   DEPRECATED: Please use the -t option instead which is also available with the "Application Mode". The name of the executor to be used for executing the

                           given job, which is equivalent to the "execution.target" config option. The currently available executors are: "collection", "remote",

                           "local", "kubernetes-session", "yarn-per-job", "yarn-session".

     -t,--target <arg>     The deployment target for the given application, which is equivalent to the "execution.target" config option. The currently available targets are:

                           "collection", "remote", "local", "kubernetes-session", "yarn-per-job", "yarn-session", "yarn-application" and "kubernetes-application".

  1. 下面列举几个使用 Application 模式提交作业到 yarn 上运行的命令:

第一种方式

带有 JM 和 TM 内存设置的命令提交:

./bin/flink run-application -t yarn-application \

-Djobmanager.memory.process.size=1024m \

-Dtaskmanager.memory.process.size=1024m \

-Dyarn.application.name="MyFlinkWordCount" \

./examples/batch/WordCount.jar --output hdfs://node1:8020/wordcount/output_51

第二种方式

在上面例子 的基础上自己设置 TaskManager slots 个数为3,以及指定并发数为3:

./bin/flink run-application -t yarn-application -p 3 \

-Djobmanager.memory.process.size=1024m \

-Dtaskmanager.memory.process.size=1024m \

-Dyarn.application.name="MyFlinkWordCount" \

-Dtaskmanager.numberOfTaskSlots=3 \

./examples/batch/WordCount.jar --output hdfs://node1:8020/wordcount/output_52

当然,指定并发还可以使用 -Dparallelism.default=3,而且社区目前倾向使用 -D+通用配置代替客户端命令参数(比如 -p)。所以这样写更符合规范:

./bin/flink run-application -t yarn-application \

-Dparallelism.default=3 \

-Djobmanager.memory.process.size=1024m \

-Dtaskmanager.memory.process.size=1024m \

-Dyarn.application.name="MyFlinkWordCount" \

-Dtaskmanager.numberOfTaskSlots=3 \

./examples/batch/WordCount.jar --output hdfs://node1:8020/wordcount/output_53

第三种方式

 yarn.provided.lib.dirs 参数一起使用,可以充分发挥 application 部署模式的优势:我们看 官方配置文档 对这个配置的解释:

yarn.provided.lib.dirs: A semicolon-separated list of provided lib directories. They should be pre-uploaded and world-readable. Flink will use them to exclude the local Flink jars(e.g. flink-dist, lib/, plugins/)uploading to accelerate the job submission process. Also YARN will cache them on the nodes so that they doesn't need to be downloaded every time for each application. An example could be hdfs://$namenode_address/path/of/flink/lib

意思是我们可以预先上传 flink 客户端依赖包 (flink-dist/lib/plugin) 到远端存储(一般是 hdfs,或者共享存储),然后通过 yarn.provided.lib.dirs 参数指定这个路径,flink 检测到这个配置时,就会从该地址拉取 flink 运行需要的依赖包,省去了依赖包上传的过程,yarn-cluster/per-job 模式也支持该配置。在之前的版本中,使用 yarn-cluster/per-job 模式,每个作业都会单独上传 flink 依赖包(一般会有 180MB左右)导致 hdfs 资源浪费,而且程序异常退出时,上传的 flink 依赖包往往得不到自动清理。通过指定 yarn.provided.lib.dirs,所有作业都会使用一份远端 flink 依赖包,并且每个 yarn nodemanager 都会缓存一份,提交速度也会大大提升,对于跨机房提交作业会有很大的优化。
使用示例如下:
my-application.jar 是用户 jar 包

上传 Flink 相关 plugins 到hdfs

cd /export/server/flink/plugins

hdfs dfs -mkdir /flink/plugins

hdfs dfs -put \

external-resource-gpu/flink-external-resource-gpu-1.14.0.jar \

metrics-datadog/flink-metrics-datadog-1.14.0.jar \

metrics-graphite/flink-metrics-graphite-1.14.0.jar \

metrics-influx/flink-metrics-influxdb-1.14.0.jar \

metrics-jmx/flink-metrics-jmx-1.14.0.jar \

metrics-prometheus/flink-metrics-prometheus-1.14.0.jar \

metrics-slf4j/flink-metrics-slf4j-1.14.0.jar \

metrics-statsd/flink-metrics-statsd-1.14.0.jar \

/flink/plugins

根据自己业务需求上传相关的 jar

cd /export/server/flink/lib

hdfs dfs -mkdir /flink/lib

hdfs dfs -put flink-csv-1.14.0.jar \

flink-dist_2.12-1.14.0.jar \

flink-json-1.14.0.jar \

flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar \

commons-cli-1.4.jar \

flink-shaded-zookeeper-3.4.14.jar \

flink-table_2.12-1.14.0.jar \

log4j-1.2-api-2.14.1.jar \

log4j-api-2.14.1.jar \

log4j-core-2.14.1.jar \

log4j-slf4j-impl-2.14.1.jar \

/flink/lib

上传用户 jar 到 hdfs

cd /export/server/flink

hdfs dfs -mkdir /flink/user-libs

hdfs dfs -put ./examples/batch/WordCount.jar /flink/user-libs

提交任务

bin/flink run-application -t yarn-application \

-Djobmanager.memory.process.size=1024m \

-Dtaskmanager.memory.process.size=1024m \

-Dtaskmanager.numberOfTaskSlots=2 \

-Dparallelism.default=2 \

-Dyarn.provided.lib.dirs="hdfs://node1:8020/flink/lib;hdfs://node1:8020/flink/plugins" \

-Dyarn.application.name="batchWordCount" \

hdfs://node1:8020/flink/user-libs/WordCount.jar --output hdfs://node1:8020/wordcount/output_54

也可以将 yarn.provided.lib.dirs 配置到 conf/flink-conf.yaml,这时提交作业就和普通作业没有区别了:

./bin/flink run-application -t yarn-application \

-Djobmanager.memory.process.size=1024m \

-Dtaskmanager.memory.process.size=1024m \

-Dyarn.application.name="MyFlinkWordCount" \

-Dtaskmanager.numberOfTaskSlots=3 \

/local/path/to/my-application.jar

注意:如果自己指定 yarn.provided.lib.dirs,有以下注意事项:

  1. 需要将 lib 包和 plugins 包地址用;分开,从上面的例子中也可以看到,将 plugins 包放在 lib 目录下可能会有包冲突错误
  2. plugins 包路径地址必须以 plugins 结尾,例如上面例子中的 hdfs://node1:8020/flink/plugins
  3. hdfs 路径必须指定 nameservice(或 active namenode 地址),而不能使用简化方式(例如 hdfs://node1:8020/flink/libs)

该种模式的操作使得 flink 作业提交变得很轻量,因为所需的 Flink jar 包和应用程序 jar 将到指定的远程位置获取,而不是由客户端下载再发送到集群。这也是社区在 flink-1.11 版本引入新的部署模式的意义所在。

Application 模式在停止、取消或查询正在运行的应用程序的状态等方面和 flink-1.11 之前的版本一样,可以采用现有的方法。

      1. 注意

如果使用的是flink on yarn方式,想切换回standalone模式的话,需要删除文件:【/tmp/.yarn-properties-root】

因为默认查找当前yarn集群中已有的yarn-session信息中的jobmanager

如果是分离模式运行的YARN JOB后,其运行完成会自动删除这个文件

但是会话模式的话,如果是kill掉任务,其不会执行自动删除这个文件的步骤,所以需要我们手动删除这个文件。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/AllinToyou/article/detail/662178
推荐阅读
相关标签
  

闽ICP备14008679号