赞
踩
flink Standalone模式集群部署,使用flink1.11版本 flink-1.11.1-bin-scala_2.12 .tgz ,安装环境为七个节点,一个jobmanager七个taskmanager。
1、基础环境准备
1.1、jdk1.8或者更高
默认已安装
1.2、主机名和hosts文件集群内完全对应。如下添加:
IP1 hostname1
IP2 hostname2
IP3 hostname3
IP4 hostname4
IP5 hostname5
IP6 hostname6
IP7 hostname7
1.3、集群之间通信正常
1.4、配置免密登录
1.5、时间同步, ntp或者chrony服务 。
创建flink目录,便于维护,所有节点的目录一致/data/flink/flink
解压安装包:tar zxvf flink-1.11.1-bin-scala_2.12 .tgz
修改路径名称:mv flink-1.11 flink
进入flink/conf目录,编辑配置
vi flink-conf.yaml
jobmanager.rpc.address: hostname1
jobmanager.rpc.port: 6123
jobmanager.heap.size: 2048m
taskmanager.heap.size: 4096m
taskmanager.numberOfTaskSlots: 3
parallelism.default: 8
配置文件说明:
jobmanager.rpc.address
表示jobmanager rpc通信绑定的地址,这里就是jobmanager的主机名。
jobmanager.rpc.port
是jobmanager的rpc端口,默认是6123
jobmanager.heap.size
这个是 jobmanager jvm进程的堆内存大小,默认是1024M,这里设置成2048m,也就是2G
taskmanager.heap.size
这个是taskmanager jvm进程的堆内存大小,也就是实际运行任务的jvm最大所能占用的堆内存,默认也是1024m,这里设置成4g
taskmanager.numberOfTaskSlots
这个表示每个taskmanager所能提供的slots数量,也就是flink节点的计算能力,这个和算子的并行度配合使用,每个slot运行1个pipeline[source,transformation,slink],多个slot使得flink的subtasks是并行的,这个一般设置成和机器cpu核数一致,比如我们这里是3个taskmanager,每个taskmanager是4个slots,那么这个集群的slots个数为12.
parallelism.default
这个是默认任务的并行度,也就是说当代码中或者提交时没指定并行度,则按照这里的并行度执行任务,并行度是按照整个集群来算的,比如上面slots个数为12,那么支持subtasks最大的并行度就是12,因为在代码中通常会针对单个任务设置并行度,所以这里的默认并行度可以不设置.
上面这几项是flink独立集群最基本的配置,另外还有关于rest和web ui的配置,如果需要可以配置一下,我这里按照默认的配置.
web ui和jobmanager同时运行,端口默认为8081,可以根据需要修改,另外还有个参数:web.submit.enable,也就是是否可以从界面提交任务,默认是开启的,取消注释就可以关闭.
conf/masters,配置jobmanager的机器列表,这里独立集群非HA模式下配置为:hostname1:8081
conf/slaves,配置taskmanager的机器列表,这里配置如下:
hostname1
hostname2
hostname3
hostname4
hostname5
hostname6
hostname7
确保正确后保存,分发到各个节点:
scp -P 53322 -r /data/user/flink/flink:root@hostname1:/data/user/flink/
按如上命令进行分发。
可以在任意节点启动集群: bin/start-cluster.sh
然后可以访问浏览器界面查看web ui,这里是:http://hostname1:8081
单个jobmanager的启动或停止: bin/jobmanager.sh start|start-foreground|stop|stop-all
单个taskmanager的启动或停止: bin/taskmanager.sh start|start-foreground|stop|stop-all
1、集群之间ssh开放端口被修改
给flink指定ssh连接端口,在flink-conf.yaml中添加如下一行
env.ssh.opts: -p 5000
2、
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: No pooled slot available and request to ResourceManager for new slot failed
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:607)
at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
... 31 more
资源不够一般为内存不够,按照实际情况怎加配置项:一般为以下几项:
jobmanager.heap.size:4096m
taskmanager.memory.process.size1024m
taskmanager.memory.flink.size:1024
jobmanager.heap.size:8192m
taskmanager.heap.size:4096m
3、
java.util.concurrent.CompletionException: org.apache.flink.runtime.resourcemanager.exceptions.UnfulfillableSlotRequestException: Could not fulfill slot request 822ebe74ed5812ce780135edf4012e03. Requested resource profile (ResourceProfile{UNKNOWN}) is unfulfillable. at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331) ~[?:?] at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346) ~[?:?] at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:632) ~[?:?] at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?] at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[?:?] at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.failPendingRequest(SlotPoolImpl.java:517) ~[flink-dist_2.12-1.11.1.jar:1.11.1] at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.failAllocation(SlotPoolImpl.java:708) ~[flink-dist_2.12-1.11.1.jar:1.11.1] at org.apache.flink.runtime.jobmaster.JobMaster.internalFailAllocation(JobMaster.java:562) ~[flink-dist_2.12-1.11.1.jar:1.11.1] at org.apache.flink.runtime.jobmaster.JobMaster.notifyAllocationFailure(JobMaster.java:700) ~[flink-dist_2.12-1.11.1.jar:1.11.1] at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?] at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:?] at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?] at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?] at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279) ~[flink-dist_2.12-1.11.1.jar:1.11.1] at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199) ~[flink-dist_2.12-1.11.1.jar:1.11.1] at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74) ~[flink-dist_2.12-1.11.1.jar:1.11.1] at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) ~[flink-dist_2.12-1.11.1.jar:1.11.1] at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.12-1.11.1.jar:1.11.1] at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.12-1.11.1.jar:1.11.1] at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) [flink-dist_2.12-1.11.1.jar:1.11.1] at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) [flink-dist_2.12-1.11.1.jar:1.11.1] at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.12-1.11.1.jar:1.11.1] at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.12-1.11.1.jar:1.11.1] at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-dist_2.12-1.11.1.jar:1.11.1] at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-dist_2.12-1.11.1.jar:1.11.1] at akka.actor.Actor.aroundReceive(Actor.scala:517) [flink-dist_2.12-1.11.1.jar:1.11.1] at akka.actor.Actor.aroundReceive$(Actor.scala:515) [flink-dist_2.12-1.11.1.jar:1.11.1] at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.12-1.11.1.jar:1.11.1] at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.12-1.11.1.jar:1.11.1] at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.12-1.11.1.jar:1.11.1] at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.12-1.11.1.jar:1.11.1] at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.12-1.11.1.jar:1.11.1] at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.12-1.11.1.jar:1.11.1] at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.12-1.11.1.jar:1.11.1] at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.12-1.11.1.jar:1.11.1] at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.12-1.11.1.jar:1.11.1] at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.12-1.11.1.jar:1.11.1] Caused by: org.apache.flink.runtime.resourcemanager.exceptions.UnfulfillableSlotRequestException: Could not fulfill slot request 822ebe74ed5812ce780135edf4012e03. Requested resource profile (ResourceProfile{UNKNOWN}) is unfulfillable. at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.setFailUnfulfillableRequest(SlotManagerImpl.java:577) ~[flink-dist_2.12-1.11.1.jar:1.11.1] at org.apache.flink.runtime.resourcemanager.ResourceManager.setFailUnfulfillableRequest(ResourceManager.java:1132) ~[flink-dist_2.12-1.11.1.jar:1.11.1] at org.apache.flink.runtime.resourcemanager.StandaloneResourceManager.lambda$startStartupPeriod$0(StandaloneResourceManager.java:116) ~[flink-dist_2.12-1.11.1.jar:1.11.1] at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402) ~[flink-dist_2.12-1.11.1.jar:1.11.1] at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195) ~[flink-dist_2.12-1.11.1.jar:1.11.1] ... 22 more
可能是资源不够,同第二项检查
4、web界面slot数为0
检查各个节点,集群是否正常
5、注意集群IP和hostname的对应关系。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。