赞
踩
Flink可以部署于各种各样的集群之中,比如Flink自己的standalone集群(不依赖于其他资源调度框架,是Flink自带的),flink on yarn集群等。而不管是standalone还是flink on yarn都属于集群,还有一种特殊的单机flink——local。
Flink真正用来做执行操作的叫做worker,进程在不同的环境模式下运行,名称有所不同。如在Standalone集群模式启动,JobManager叫做StandaloneSessionClusterEntrypoint,TaskManager中叫做TaskManagerRunner,而使用yarn集群启动,名称又不相同。
hadoop-2.7.4
java 1.8
zookeeper 3.4.14
os:centos 7.5
wget +网址
wget +网址
tar -zxvf flink-1.7.1-bin-hadoop27-scala_2.11.tgz
tar -zxvf /scala-2.11.8
vi /etc/profile
vi …/conf/flink-conf.yaml
jobmanager.rpc.address: 192.168.2.100(主机ip地址)
jobmanager.rpc.port: 6123
jobmanager.heap.size: 1024m
taskmanager.heap.size: 1024m
taskmanager.numberOfTaskSlots: 2
parallelism.default: 1
vi …/conf/slaves
vi …/conf/master
localhost:8081
注:此配置关系到在web查看flink运行的整体情况
Scp -r flink root@slave1.hadoop:/usr/local/flink
Scp -r flink root@slaves2.hadoop:/usr/local/flink
…/bin/start-cluster.sh
分别查看主节点jps是否有 StandaloneSessionClusterEntrypoint
从节点jps是否有TaskManagerRunner
若有,则表明flink-standalone模式部署完成
…/bin/start-scala-shell.sh remote 192.168.2.100 8081
Scala> val text = benv.fromElements(
"To be, or not to be,--that is the question:--",
"Whether 'tis nobler in the mind to suffer",
"The slings and arrows of outrageous fortune",
"Or to take arms against a sea of troubles,")
Scala> val counts = text.flatMap { _.toLowerCase.split("\\W+") .map { (_, 1) }.groupBy(0).sum(1)
Scala> counts.print()
与下图相同则表示配置成功
因为yarn属于hadoop集群的组件
Start-all.sh
打开flink的bin目录,输入如下指令
/yarn-session.sh -n 2 -s 2 -jm 1024 -tm 1024 -nm test -d (参数照着填,否则会报错;内存不足也会报错)
成功启动后从节点会多出一个名叫的 YarnSessionClusterEntrypoint进程
flink run --class org.example.Streaming flinkDemo4-1.0-SNAPSHOT.jar
kill -9 application_1645859510311_0002(app的id会有提示)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。