赞
踩
节点服务器 | hadoop1 | hadoop2 | hadoop3 | hadoop4 |
角色 | JobManager TaskManager | TaskManager | TaskManager | TaskManager |
从16版本开始1-9行必须改集群才能用
# JobManager节点地址. jobmanager.rpc.address: hadoop1 jobmanager.bind-host: 0.0.0.0 jobmanager.rpc.port: 6123 rest.address: hadoop1 rest.bind-address: 0.0.0.0 # TaskManager节点地址.需要配置为当前机器名 taskmanager.host: hadoop1 #hadoopN节点这里为hadoopN N=1,2,3,4 taskmanager.bind-host: 0.0.0.0 jobmanager.memory.process.size: 1600m #对JobManager进程可使用到的全部内存 #其中一部分是JobManager使用的内存,还有一部分作为jvm进程本身的开销,如元数据的保存。 #默认1600M taskmanager.memory.process.size: 1728m #TaskManager进程可使用到的全部内存 #其中一部分是TaskManager使用的内存,还有一部分作为jvm进程本身的开销,如元数据的保存。 #默认1728m taskmanager.numberOfTaskSlots: 1 #一个taskmanager中一共有多少个任务槽即一个taskmanager最多能够并行执行的多少的任务. #默认为1 parallelism.default: 1 #并行度,实际运行的并行度
masters
hadoop1:8081
workers
- hadoop1
- hadoop2
- hadoop3
- hadoop4
vim /etc/profile.d/my_env.sh
- export FLINK_HOME=/data/cluster/flink-1.17.0
- export PATH=$PATH:$FLINK_HOME/bin
- start-cluster.sh
- stop-cluster.sh
StandaloneSessionClusterEntrypoint->JobmManager
TaskManagerRunner->TaskManager
不知道为啥hadoop1节点没有启动TaskManagerRunner进程,看了配置文件也没问题呀
获得入口类:
- flink run \
- -m hadoop1:8081 \#指定jobmanager的位置,和webUI端口号
- -c chapter02.StreamWordCount \ #指定入口类
- -p 2 \ #指定并行度
- /data/yzw/flink_jars/original-flink-1.0-SNAPSHOT.jar \
- --host hadoop1 --port 7777
ctrl+c并不会将任务退出
【1】webUI界面看 Hadoop:8081
【2】命令行
flink list jobID 查看这个job的运行状态
flink list 查看所有正在运行的job
flink list -a 所有的job,包括执行完的
flink cancel jobID退出job
在一些应用场景中,对于集群资源分配和占用的方式,可能会有特定的需求。Flink为各种场景提供了不同的部署模式,主要有以下三种:
会话模式(Session Mode)
单作业模式(Per-Job Mode)
应用模式(Application Mode)
他们的区别在于:集群的生命周期以及资源的分配方式;以及应用的main方法到底在哪里执行:客户端、JobManager
会话模式其实最符合常规思维。我们需要先启动一个集群,保持一个会话,在这个会话中通过客户端提交作业(需要客户端先跑一遍代码,把应用拆分成作业,比如并行度为2,会拆成两个作业,然后将作业提交给jobManager)。集群启动时所有资源都已经确定,所以所有提交的作业会竞争集群中的资源。集群的生命周期超越任何作业。
当前的资源被占用,只有当资源被释放掉,才能提交别的作业。因此需要结合资源管理平台(yarn等)。
适合每个作业都很小,执行时间很短,执行完就能释放资源,接下来可以跑其他的作业。
会话:即StandaloneSessionClusterEntrypoint进程
会话模式因为资源共享会导致很多问题,所以为了更好地隔离资源,我们可以考虑为每个作业启动一个集群,就是所谓的单作业模式。
单作业模式也很好理解,就是严格的一对一,集群只为这个作业而生。同样由客户端运行应用程序(需要客户端先跑一遍代码,把应用拆分成作业,比如并行度为2,会拆成两个作业,需要启动两个集群),然后启动集群,作业提交给JobManager,进而分发给TaskManager执行。作业完成后,集群就会关闭,所有资源也会释放。
这些特性使得单作业模式在生产环境运行更加稳定,所以是实际应用的首选模式。
需要注意的是,Flink本身无法直接这样运行,所以单作业模式一般需要借助一些资源管理框架来启动集群,如yarn、kubernetes。因为在一开始不启动集群,有作业提交之后,需要为作业单独的取创建容器,去跑作业。
一个应用启动一个集群。会话模式、单作业模式,都需要客户端先跑一遍代码,把应用拆分成作业,比如并行度为2,会拆成两个作业。单作业模式在这种情况下,会启动两个集群,而应用模式会启动一个集群。
会话模式、单作业模式都需要在客户端跑一遍代码,然后由客户端将作业提交给JobManager。但是这种方式客户端需要占用大量网络带宽,去下载依赖和把二进制数据发送给JobManager。加上很多情况下我们提交作业用的是一个客户端,会加重客户端所在节点的资源消耗。
所以解决办法就是,我们不要客户端了,直接把应用提交到JobManager上运行,所有的代码的解析,转换成作业,都由JobManager处理。而这也就代表着,我们需要为每一个提交的应用启动一个JobManager,也就是创建一个集群。这个JobManager只为执行一个应用而存在,执行结束之后JobManager也就关闭了,这就是所谓的应用模式。
和单作业模式的区别在于:单作业模式是由客户端将应用解析成作业,每个作业启动一个Flink集群。
应用模式是由JobManager将应用解析成作业,每个应用启动一个Flink集群。
需要先启动单节点的集群
- 启动start-cluster.sh
- 停止stop-cluster.sh
不借助任何资源管理平台,这时候创建的Flink集群中的组件,就是一个个JVM进程。资源需要手动管理,在实际项目中不常用。
测试用的就是。
不支持。Flink无法直接以单作业的方式启动集群,必须需要借助一些资源管理平台。
支持。不常用。
Yarn上部署的过程是:客户端把Flink应用交给Yarn的ResourceManager,Yarn的ResourceManager会向Yarn的NodeManager申请容器。在这些容器上,Flink会部署JobManager和TaskManager的实例,从而启动集群。Flink会根据运行在JobManager上的作业所需要的Slot数量动态分配TaskManager资源。
修改环境变量vim /etc/profile.d/my_env.sh
常用的是会话模式和单作业模式。
- #FLINK
- export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop
- export HADOOP_CLASSPATH=`hadoop classpath`
- export FLINK_HOME=/data/cluster/flink-1.13.0
- export PATH=$PATH:$FLINK_HOME/bin
yarn-session.sh -nm test -d
向yarn集群申请资源,开启一个YARNsession并启动Flink集群,之后就可以向集群提交作业。
可以打开这个链接,发现可用资源为0。这里资源不是写死的,而是动态分配的,运行应用就会自动分配了。
- -d 分离模式,如果你不想让flink yarn session客户端一直前台运行.可以使用这个参数,改为在后台运行
- 即使关掉当前对话窗口,yarn session也可以后台运行。
- -jm(--jobManagerMemory):配置jobManager所需内存,默认单位MB
- -nm(--name):配置在yarn ui界面上显示的任务名,即8088端口显示的名字
- -qu(--queue):指定yarn队列名
- -tm(--taskManager):配置每个TaskManager所使用内存
- flink run \
- -c chapter02.StreamWordCount \
- -p 2 \
- /data/yzw/flink_jars/original-flink-1.0-SNAPSHOT.jar \
- --host hadoop1 --port 7777
执行一个作业启动一个flink集群。
不需要提前启动yarn-session.sh
执行命令提交作业:
- flink run -d \
- -t yarn-per-job \ #使用Yarn的单作业模式
- -c chapter02.StreamWordCount \
- -p 2 \
- /data/yzw/flink_jars/original-flink-1.0-SNAPSHOT.jar \
- --host hadoop1 --port 7777
- flink run-application -d \
- -t yarn=application \ #使用Yarn的单作业模式
- -c chapter02.StreamWordCount \
- -p 2 \
- /data/yzw/flink_jars/original-flink-1.0-SNAPSHOT.jar \
- --host hadoop1 --port 7777
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。