赞
踩
Flink支持多种安装模式。
和Local模式不同的是,Standalone模式中Flink的各个角色都是独立的进程。
https://archive.apache.org/dist/flink/flink-1.14.0/flink-1.14.0-bin-scala_2.12.tgz
cd /export/software/ rz 上传 解压: tar -zxf flink-1.14.0-bin-scala_2.12.tgz -C /export/server/ 为flink创建软连接 cd /export/server/ ln -s flink-1.14.0/ flink |
cd /export/server/flink bin/start-cluster.sh 启动后, 通过JPS查看进程信息: |
slot在flink里面可以认为是资源组,Flink是通过将任务分成子任务并且将这些子任务分 配到slot来并行执行程序
cd /export/server/flink bin/flink run /export/server/flink/examples/batch/WordCount.jar |
JobManager 和 TaskManager 的启动日志可以在 Flink binary 目录下的 log 子目录中找到 cd /export/server/flink/log |
log 目录中以“flink-${user}-standalonesession-${id}-${hostname}”为前缀的文件对应的即是 JobManager 的输出,其中有三个文件:
log 目录中以“flink-${user}-taskexecutor-${id}-${hostname}”为前缀的文件对应的是 TaskManager 的输出,也包括三个文件,和 JobManager 的输出一致。
cd /export/server/flink/conf vim flink-conf.yaml 修改内容内容: # jobManager 的IP地址 jobmanager.rpc.address: node1.itcast.cn # JobManager 的端口号 jobmanager.rpc.port: 6123 # JobManager JVM heap 内存大小 jobmanager.memory.process.size: 1024m # TaskManager JVM heap 内存大小 taskmanager.memory.process.size: 1024m # 每个 TaskManager 提供的任务 slots 数量大小 taskmanager.numberOfTaskSlots: 2 # 程序默认并行计算的个数 parallelism.default: 1 |
slot和parallelism总结:
taskmanager.numberOfTaskSlots:2
每一个taskmanager中的分配2个TaskSlot,3个taskmanager一共有6个TaskSlot
parallelism.default:1 运行程序默认的并行度为1,6个TaskSlot只用了1个,有5个空闲
slot是静态的概念,是指taskmanager具有的最大并发执行能力
parallelism是动态的概念,是指程序运行时实际使用的并发能力
cd /export/server/flink/conf vim workers 添加以下内容: node1.itcast.cn node2.itcast.cn node3.itcast.cn |
vim /etc/profile 添加: export HADOOP_CONF_DIR=/export/server/hadoop/etc/hadoop 重新加载环境变量: source /etc/profile |
5) 将资料中的相关jar包全部上传至flink的lib目录下 cd /export/server/flink/lib rz上传 |
cd /export/server scp -r flink-1.14.0/ node2:$PWD scp -r flink-1.14.0/ node3:$PWD 在node2和node3创建软链接 cd /export/server ln -s flink-1.14.0/ flink 修改环境变量: vim /etc/profile 添加内容: export HADOOP_CONF_DIR=/export/server/hadoop/etc/hadoop 加载环境变量: source /etc/profile |
cd /export/server/flink bin/start-cluster.sh |
Jps 查看各个节点进程:
1) 首先启动 hadoop集群 start-all.sh 2) 接着在hdfs上创建目录: hadoop fs -mkdir -p /test/input 3) 在linux上创建wordcount.txt文件: cd ~ vim wordcount.txt 内容如下: hadoop hello world hive hive hadoop hello hadoop hadoop hive hive 4) 将其上传至hdfs上: hadoop fs -put /root/wordcount.txt /test/input 5) 执行flink程序: cd /export/server/flink bin/flink run /export/server/flink/examples/batch/WordCount.jar --input hdfs://node1:8020/test/input/wordcount.txt |
6) 查看web浏览器: http://node1:8081 |
从上述架构图中,可发现JobManager存在单点故障,一旦JobManager出现意外,整个集群无法工作。所以,为了确保集群的高可用,需要搭建Flink的HA。(如果是部署在YARN上,部署YARN的HA),我们这里演示如何搭建Standalone 模式HA。
需要将hadoop组件上传到Flink安装包的lib目录下,因为Flink1.8开始,安装包不再基于flink版本进行划分,因此需要手动下载hadoop组件,同时需要注意Hadoop版本号需要与开发环境版本保持一致,以生产环境使用Hadoop3为例
操作步骤 | 说明 |
1 | 下载hadoop的组件Jar包 |
2 | 将下载的jar文件拷贝到flink安装目录lib目录下(每个节点都需要拷贝) |
3 | 拷贝完成jar到每个节点以后需要重启flink集群 |
cd /export/server/flink/conf/ vim flink-conf.yaml 修改以下内容: #开启HA,使用文件系统作为快照存储:126行 state.backend: filesystem #默认为none,用于指定checkpoint的data files和meta data存储的目录:131行 state.checkpoints.dir: hdfs://node1.itcast.cn:8020/flink-checkpoints #默认为none,用于指定savepoints的默认目录:135行 state.savepoints.dir: hdfs://node1.itcast.cn:8020/flink-savepoints #使用zookeeper搭建高可用:81行 high-availability: zookeeper # 存储JobManager的元数据到HDFS,用来恢复JobManager 所需的所有元数据 :90行 high-availability.storageDir: hdfs://node1.itcast.cn:8020/flink/ha/ high-availability.zookeeper.quorum: node1.itcast.cn:2181,node2.itcast.cn:2181,node3.itcast.cn:2181 |
分发到node2和node3
cd /export/server/flink/conf scp flink-conf.yaml node2:$PWD scp flink-conf.yaml node3:$PWD |
cd /export/server/flink/conf vim flink-conf.yaml 修改: jobmanager.rpc.address: node2.itcast.cn |
cd /export/server/flink/conf vim masters 修改为以下内容: node1.itcast.cn:8081 node2.itcast.cn:8081 |
分发给node2和node3
cd /export/server/flink/conf scp masters node2:$PWD scp masters node3:$PWD |
1) 先启动zookeeper集群: 三个节点启动 cd /export/server/zookeeper/bin/ ./zkServer.sh start 2) 启动hadoop集群 start-all.sh 3) 启动flink集群 cd /export/server/flink ./bin/start-cluster.sh |
查看node1和node2的8081 webUI
注意事项
切记搭建HA,需要将第二个节点的 jobmanager.rpc.address 修改为node2.itcast.cn
Local模式:通过一个JVM进程中,通过线程模拟出各个Flink角色来得到Flink环境
Standalone模式:各个角色是独立的进程存在
YARN模式:Flink的各个角色,均运行在多个YARN的容器内,其整体上是一个YARN的任务
flink on yarn的前提是:hdfs、yarn均启动
在企业实际开发中,使用Flink时,更多的使用方式是Flink On Yarn模式,原因如下:
操作步骤 | 说明 |
1 | 打开yarn配置页面(每台hadoop节点都需要修改) |
vim etc/hadoop/yarn-site.xml | |
添加 | |
<property> <name>yarn.nodemanager.vmem-check-enabled</name> <value>false</value> </property> | |
是否启动一个线程检查每个任务正使用的虚拟内存量,如果任务超出分配值,则直接将其杀掉,默认是true。 在这里面我们需要关闭,因为对于flink使用yarn模式下,很容易内存超标,这个时候yarn会自动杀掉job | |
2 | 分发yarn-site.xml到其它服务器节点 |
scp yarn-site.xml node2:$PWD scp yarn-site.xml node3:$PWD | |
3 | 启动HDFS、YARN集群 |
start-all.sh |
从图中可以看出,Yarn的客户端需要获取hadoop的配置信息,连接Yarn的ResourceManager。所以要有设置有 YARN_CONF_DIR或者HADOOP_CONF_DIR或者HADOOP_CONF_PATH,只要设置了其中一个环境变量,就会被读取。如果读取上述的变量失败了,那么将会选择hadoop_home的环境变量,都区成功将会尝试加载$HADOOP_HOME/etc/hadoop的配置文件。
这种模式会预先在yarn或者或者k8s上启动一个flink集群,然后将任务提交到这个集群上,这种模式,集群中的任务使用相同的资源,如果某一个任务出现了问题导致整个集群挂掉,那就得重启集群中的所有任务,这样就会给集群造成很大的负面影响。
特点:需要事先申请资源,使用Flink中的yarn-session(yarn客户端),启动JobManager和TaskManger
优点:不需要每次递交作业申请资源,而是使用已经申请好的资源,从而提高执行效率
缺点:作业执行完成以后,资源不会被释放,因此一直会占用系统资源
应用场景:适合作业递交比较频繁的场景,小作业比较多的场景
考虑到集群的资源隔离情况,一般生产上的任务都会选择per job模式,也就是每个任务启动一个flink集群,各个集群之间独立运行,互不影响,且每个集群可以设置独立的配置。
特点:每次递交作业都需要申请一次资源
优点:作业运行完成,资源会立刻被释放,不会一直占用系统资源
缺点:每次递交作业都需要申请资源,会影响执行效率,因为申请资源需要消耗时间
应用场景:适合作业比较少的场景、大作业的场景
flink-1.11 引入了一种新的部署模式,即 Application 模式。目前,flink-1.11 已经可以支持基于 Yarn 和 Kubernetes 的 Application 模式。
Session模式:所有作业共享集群资源,隔离性差,JM 负载瓶颈,main 方法在客户端执行。
Per-Job模式:每个作业单独启动集群,隔离性好,JM 负载均衡,main 方法在客户端执行。
通过以上两种模式的特点描述,可以看出,main方法都是在客户端执行,社区考虑到在客户端执行 main() 方法来获取 flink 运行时所需的依赖项,并生成 JobGraph,提交到集群的操作都会在实时平台所在的机器上执行,那么将会给服务器造成很大的压力。尤其在大量用户共享客户端时,问题更加突出。
此外这种模式提交任务的时候会把本地flink的所有jar包先上传到hdfs上相应的临时目录,这个也会带来大量的网络的开销,所以如果任务特别多的情况下,平台的吞吐量将会直线下降。
因此,社区提出新的部署方式 Application 模式解决该问题。
Application 模式下,用户程序的 main 方法将在集群中而不是客户端运行,用户将程序逻辑和依赖打包进一个可执行的 jar 包里,集群的入口程序 (ApplicationClusterEntryPoint) 负责调用其中的 main 方法来生成 JobGraph。Application 模式为每个提交的应用程序创建一个集群,该集群可以看作是在特定应用程序的作业之间共享的会话集群,并在应用程序完成时终止。在这种体系结构中,Application 模式在不同应用之间提供了资源隔离和负载平衡保证。在特定一个应用程序上,JobManager 执行 main() 可以节省所需的 CPU 周期,还可以节省本地下载依赖项所需的带宽。
操作步骤 | 说明 |
1 | yarn-session.sh(开辟资源)+flink run(提交任务) |
这种模式下会启动yarn session,并且会启动Flink的两个必要服务:JobManager和Task-managers,然后你可以向集群提交作业。同一个Session中可以提交多个Flink作业。需要注意的是,这种模式下Hadoop的版本至少是2.2,而且必须安装了HDFS(因为启动YARN session的时候会向HDFS上提交相关的jar文件和配置文件) 通过./bin/yarn-session.sh脚本启动YARN Session 脚本可以携带的参数: | |
-n(--container):TaskManager的数量。(1.10 已经废弃) -s(--slots): 每个TaskManager的slot数量,默认一个slot一个core,默认每个taskmanager的slot的个数为1,有时可以多一些taskmanager,做冗余。 -jm:JobManager的内存(单位MB)。 -q:显示可用的YARN资源(内存,内核); -tm:每个TaskManager容器的内存(默认值:MB) -nm:yarn 的appName(现在yarn的ui上的名字)。 -d:后台执行。 | |
注意: 如果不想让Flink YARN客户端始终运行,那么也可以启动分离的 YARN会话。该参数被称为-d或--detached。 | |
确定TaskManager数: Flink on YARN时,TaskManager的数量就是:max(parallelism) / yarnslots(向上取整)。例如,一个最大并行度为10,每个TaskManager有两个任务槽的作业,就会启动5个TaskManager。 | |
2 | 启动: |
bin/yarn-session.sh -tm 1024 -s 4 -d | |
上面的命令的意思是,每个 TaskManager 拥有4个 Task Slot(-s 4),并且被创建的每个 TaskManager 所在的YARN Container 申请 1024M 的内存,同时额外申请一个Container用以运行ApplicationMaster以及Job Manager。 TM的数量取决于并行度,如下图: 执行:bin/flink run -p 8 examples/batch/WordCount.jar | |
3 | 启动成功之后,控制台显示: |
4 | 去yarn页面:ip:8088可以查看当前提交的flink session |
5 | 然后使用flink提交任务 |
bin/flink run examples/batch/WordCount.jar | |
在控制台中可以看到wordCount.jar计算出来的任务结果 | |
6 | 在yarn-session.sh提交后的任务页面中也可以观察到当前提交的任务: |
7 | 点击查看任务细节: |
8 | 停止当前任务: |
yarn application -kill application_1527077715040_0007 |
上面的YARN session是在Hadoop YARN环境下启动一个Flink cluster集群,里面的资源是可以共享给其他的Flink作业。我们还可以在YARN上启动一个Flink作业,这里我们还是使用./bin/flink,但是不需要事先启动YARN session:
bin/flink run -m yarn-cluster ./examples/batch/WordCount.jar 常用参数:
下面的参数仅可用于 -m yarn-cluster 模式
|
在8088页面观察: |
yarn application -kill application的ID |
注意: |
在创建集群的时候,集群的配置参数就写好了,但是往往因为业务需要,要更改一些配置参数,这个时候可以不必因为一个实例的提交而修改conf/flink-conf.yaml; 可以通过:-yD <arg> Dynamic properties 来覆盖原有的配置信息:比如: bin/flink run -m yarn-cluster -yD fs.overwrite-files=true examples/batch/WordCount.jar -yD fs.overwrite-files=true -yD taskmanager.network.numberOfBuffers=16368 |
application 模式使用 bin/flink run-application 提交作业;通过 -t 指定部署环境,目前 application 模式支持部署在 yarn 上(-t yarn-application) 和 k8s 上(-t kubernetes-application);并支持通过 -D 参数指定通用的 运行配置,比如 jobmanager/taskmanager 内存、checkpoint 时间间隔等。
通过 bin/flink run-application -h 可以看到 -D/-t 的详细说明:(-e 已经被废弃,可以忽略)
bin/flink run-application -h 参数: Options for Generic CLI mode: -D <property=value> Generic configuration options for execution/deployment and for the configured executor.The available options can be found at https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html -e,--executor <arg> DEPRECATED: Please use the -t option instead which is also available with the "Application Mode". The name of the executor to be used for executing the given job, which is equivalent to the "execution.target" config option. The currently available executors are: "collection", "remote", "local", "kubernetes-session", "yarn-per-job", "yarn-session". -t,--target <arg> The deployment target for the given application, which is equivalent to the "execution.target" config option. The currently available targets are: "collection", "remote", "local", "kubernetes-session", "yarn-per-job", "yarn-session", "yarn-application" and "kubernetes-application". |
第一种方式 | 带有 JM 和 TM 内存设置的命令提交: |
./bin/flink run-application -t yarn-application \ -Djobmanager.memory.process.size=1024m \ -Dtaskmanager.memory.process.size=1024m \ -Dyarn.application.name="MyFlinkWordCount" \ ./examples/batch/WordCount.jar --output hdfs://node1:8020/wordcount/output_51 | |
第二种方式 | 在上面例子 的基础上自己设置 TaskManager slots 个数为3,以及指定并发数为3: |
./bin/flink run-application -t yarn-application -p 3 \ -Djobmanager.memory.process.size=1024m \ -Dtaskmanager.memory.process.size=1024m \ -Dyarn.application.name="MyFlinkWordCount" \ -Dtaskmanager.numberOfTaskSlots=3 \ ./examples/batch/WordCount.jar --output hdfs://node1:8020/wordcount/output_52 | |
当然,指定并发还可以使用 -Dparallelism.default=3,而且社区目前倾向使用 -D+通用配置代替客户端命令参数(比如 -p)。所以这样写更符合规范: | |
./bin/flink run-application -t yarn-application \ -Dparallelism.default=3 \ -Djobmanager.memory.process.size=1024m \ -Dtaskmanager.memory.process.size=1024m \ -Dyarn.application.name="MyFlinkWordCount" \ -Dtaskmanager.numberOfTaskSlots=3 \ ./examples/batch/WordCount.jar --output hdfs://node1:8020/wordcount/output_53 | |
第三种方式 | 和 yarn.provided.lib.dirs 参数一起使用,可以充分发挥 application 部署模式的优势:我们看 官方配置文档 对这个配置的解释: |
yarn.provided.lib.dirs: A semicolon-separated list of provided lib directories. They should be pre-uploaded and world-readable. Flink will use them to exclude the local Flink jars(e.g. flink-dist, lib/, plugins/)uploading to accelerate the job submission process. Also YARN will cache them on the nodes so that they doesn't need to be downloaded every time for each application. An example could be hdfs://$namenode_address/path/of/flink/lib | |
意思是我们可以预先上传 flink 客户端依赖包 (flink-dist/lib/plugin) 到远端存储(一般是 hdfs,或者共享存储),然后通过 yarn.provided.lib.dirs 参数指定这个路径,flink 检测到这个配置时,就会从该地址拉取 flink 运行需要的依赖包,省去了依赖包上传的过程,yarn-cluster/per-job 模式也支持该配置。在之前的版本中,使用 yarn-cluster/per-job 模式,每个作业都会单独上传 flink 依赖包(一般会有 180MB左右)导致 hdfs 资源浪费,而且程序异常退出时,上传的 flink 依赖包往往得不到自动清理。通过指定 yarn.provided.lib.dirs,所有作业都会使用一份远端 flink 依赖包,并且每个 yarn nodemanager 都会缓存一份,提交速度也会大大提升,对于跨机房提交作业会有很大的优化。 | |
上传 Flink 相关 plugins 到hdfs | |
cd /export/server/flink/plugins hdfs dfs -mkdir /flink/plugins hdfs dfs -put \ external-resource-gpu/flink-external-resource-gpu-1.14.0.jar \ metrics-datadog/flink-metrics-datadog-1.14.0.jar \ metrics-graphite/flink-metrics-graphite-1.14.0.jar \ metrics-influx/flink-metrics-influxdb-1.14.0.jar \ metrics-jmx/flink-metrics-jmx-1.14.0.jar \ metrics-prometheus/flink-metrics-prometheus-1.14.0.jar \ metrics-slf4j/flink-metrics-slf4j-1.14.0.jar \ metrics-statsd/flink-metrics-statsd-1.14.0.jar \ /flink/plugins | |
根据自己业务需求上传相关的 jar | |
cd /export/server/flink/lib hdfs dfs -mkdir /flink/lib | |
hdfs dfs -put flink-csv-1.14.0.jar \ flink-dist_2.12-1.14.0.jar \ flink-json-1.14.0.jar \ flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar \ commons-cli-1.4.jar \ flink-shaded-zookeeper-3.4.14.jar \ flink-table_2.12-1.14.0.jar \ log4j-1.2-api-2.14.1.jar \ log4j-api-2.14.1.jar \ log4j-core-2.14.1.jar \ log4j-slf4j-impl-2.14.1.jar \ /flink/lib | |
上传用户 jar 到 hdfs | |
cd /export/server/flink hdfs dfs -mkdir /flink/user-libs hdfs dfs -put ./examples/batch/WordCount.jar /flink/user-libs | |
提交任务 | |
bin/flink run-application -t yarn-application \ -Djobmanager.memory.process.size=1024m \ -Dtaskmanager.memory.process.size=1024m \ -Dtaskmanager.numberOfTaskSlots=2 \ -Dparallelism.default=2 \ -Dyarn.provided.lib.dirs="hdfs://node1:8020/flink/lib;hdfs://node1:8020/flink/plugins" \ -Dyarn.application.name="batchWordCount" \ hdfs://node1:8020/flink/user-libs/WordCount.jar --output hdfs://node1:8020/wordcount/output_54 | |
也可以将 yarn.provided.lib.dirs 配置到 conf/flink-conf.yaml,这时提交作业就和普通作业没有区别了: | |
./bin/flink run-application -t yarn-application \ -Djobmanager.memory.process.size=1024m \ -Dtaskmanager.memory.process.size=1024m \ -Dyarn.application.name="MyFlinkWordCount" \ -Dtaskmanager.numberOfTaskSlots=3 \ /local/path/to/my-application.jar | |
注意:如果自己指定 yarn.provided.lib.dirs,有以下注意事项:
该种模式的操作使得 flink 作业提交变得很轻量,因为所需的 Flink jar 包和应用程序 jar 将到指定的远程位置获取,而不是由客户端下载再发送到集群。这也是社区在 flink-1.11 版本引入新的部署模式的意义所在。 Application 模式在停止、取消或查询正在运行的应用程序的状态等方面和 flink-1.11 之前的版本一样,可以采用现有的方法。 |
如果使用的是flink on yarn方式,想切换回standalone模式的话,需要删除文件:【/tmp/.yarn-properties-root】
因为默认查找当前yarn集群中已有的yarn-session信息中的jobmanager
如果是分离模式运行的YARN JOB后,其运行完成会自动删除这个文件
但是会话模式的话,如果是kill掉任务,其不会执行自动删除这个文件的步骤,所以需要我们手动删除这个文件。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。