当前位置:   article > 正文

4.1.5 Flink-流处理框架-HA-Flink集群环境搭建(Standalone模式)_flink ha部署

flink ha部署

目录

1.前置准备

2.安装HA-Flink集群环境搭建(Standalone模式)

一、集群规划

二、集群配置

2.1 flink-conf.yaml

2.2 masters

2.3 slaves

2.4 上传hadoop依赖包

2.5 分发

三、启动集群

3.1 启动ha-hadoop集群

3.2 启动Flink集群和任务历史服务器

四、查看集群

4.1 jps进程查看

4.2 Web UI查看

五、提交任务


1.前置准备

CentOS7、jdk1.8、flink-1.10.1、hadoop-2.7.7、zookeeper-3.5.7

2.安装HA-Flink集群环境搭建(Standalone模式)

一、集群规划

图片

二、集群配置

  1. # 配置使用zookeeper来开启高可用模式
  2. high-availability: zookeeper
  3. # 配置zookeeper的地址,采用zookeeper集群时,可以使用逗号来分隔多个节点地址
  4. high-availability.zookeeper.quorum: hadoop01:2181,hadoop02:2181,hadoop03:2181
  5. # 在zookeeper上存储flink集群元信息的路径
  6. high-availability.zookeeper.path.root: /ha-flink
  7. # 持久化存储JobManager元数据的地址,zookeeper上存储的只是指向该元数据的指针信息
  8. high-availability.storageDir: hdfs://hacluster:8020/flink/recovery
  9. # 将已完成的作业上传到此目录中,让任务历史服务器进行监控
  10. jobmanager.archive.fs.dir: hdfs://hacluster:8020/flink-jobhistory
  11. historyserver.web.address: hadoop01
  12. historyserver.web.port: 18082
  13. # 任务历史服务器监控目录中已存档的作业
  14. historyserver.archive.fs.dir: hdfs://hacluster:8020/flink-jobhistory
  15. historyserver.web.refresh-interval: 10000

2.2 masters

  1. hadoop01:8081
  2. hadoop02:8081

2.3 slaves

  1. hadoop01
  2. hadoop02
  3. hadoop03

2.4 上传hadoop依赖包

作业归档需要记录在hdfs上,但是当前版本的flink把hadoop的一些依赖删除了,需要手动将jar包放到lib目录下 ,这里我用的是flink-shaded-hadoop-2-uber-2.7.5-10.0.jar

jar包(hadoop依赖包和wordcount依赖包)给大家提供下载地址:https://lanzous.com/b02b6wnfa,密码:2oby

2.5 分发

[xiaokang@hadoop01 ~]$ distribution.sh /opt/software/flink-1.10.1

三、启动集群

3.1 启动ha-hadoop集群

  1. [xiaokang@hadoop01 ~]ha-hadoop.sh start
  2. # 创建作业归档目录
  3. [xiaokang@hadoop01 ~]hdfs dfs -mkdir /flink-jobhistory

3.2 启动Flink集群和任务历史服务器

  1. [xiaokang@hadoop01 ~]start-cluster.sh
  2. [xiaokang@hadoop01 ~]historyserver.sh start

四、查看集群

4.1 jps进程查看

  1. [xiaokang@hadoop01 ~]$ call-cluster.sh jps
  2. --------hadoop01--------
  3. 10369 QuorumPeerMain
  4. 11297 NodeManager
  5. 12241 TaskManagerRunner
  6. 10885 JournalNode
  7. 10551 NameNode
  8. 12599 Jps
  9. 12538 HistoryServer
  10. 11083 DFSZKFailoverController
  11. 11211 JobHistoryServer
  12. 10669 DataNode
  13. 11823 StandaloneSessionClusterEntrypoint
  14. --------hadoop02--------
  15. 8977 TaskManagerRunner
  16. 7459 QuorumPeerMain
  17. 7956 ResourceManager
  18. 7542 NameNode
  19. 7623 DataNode
  20. 8616 StandaloneSessionClusterEntrypoint
  21. 9066 Jps
  22. 7821 DFSZKFailoverController
  23. 7726 JournalNode
  24. 8047 NodeManager
  25. --------hadoop03--------
  26. 7456 QuorumPeerMain
  27. 7636 JournalNode
  28. 7764 ResourceManager
  29. 7878 NodeManager
  30. 8345 TaskManagerRunner
  31. 8410 Jps
  32. 7532 DataNode

4.2 Web UI查看

两个 JobManager 和 任务历史服务器的端口号分别为 8081 、8081和 18082,界面应该如下:

图片

图片

图片

五、提交任务

流计算词频统计案例源码:

  1. package cool.xiaokang.wordcount
  2. import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
  3. /**
  4.  * TODO: Flink流计算WordCount
  5.  *
  6.  * @author: xiaokang
  7.  * @date2020/6/8 14:19
  8.  *      
  9.  */
  10. object FlinkStreamingWordCount1 {
  11.   def main(args: Array[String]): Unit = {
  12.     if(args==null || args.length!=2){
  13.       println("缺少参数,使用方法:flink run FlinkStreamingWordCount-1.0.jar -c cool.xiaokang.wordcount.FlinkStreamingWordCount hadoop 1124")
  14.       System.exit(1)
  15.     }
  16.     val host=args(0)
  17.     val port=args(1)
  18.     //1.初始化流计算环境
  19.     val streamEnv:StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  20.     //2.导入隐式转换
  21.     import org.apache.flink.streaming.api.scala._
  22.     //3.读取数据(Socket流)
  23.     val lines:DataStream[String= streamEnv.socketTextStream(host, port.toInt)
  24.     //4.转换和处理数据并打印结果
  25.     val result=lines.flatMap(_.split(" "))
  26.       .map((_,1))
  27.       .keyBy(0//分组算子 01代表前面的DataStream[(String,Int)]的下标,0代表单词,1代表词频
  28.       .sum(1//聚合累加算子
  29.     result.print("Result:")
  30.     //5.启动流计算程序
  31.     streamEnv.execute("FlinkSteamWordCount-微信公众号:小康新鲜事儿")
  32.   }
  33. }

提交作业:

  1. 1.虚拟机内开启端口
  2. [xiaokang@hadoop01 ~]$ nc -lk 1124
  3. 2.提交作业
  4. [xiaokang@hadoop01 ~]$ flink run -c cool.xiaokang.wordcount.FlinkStreamingWordCount ~/FlinkStreamingWordCount-1.0.jar hadoop01 1124

图片

输入数据以及结果:

图片

图片

运行过程中将正在服务的JobManager给kill掉,测试是否高可用

[xiaokang@hadoop01 ~]$ kill -9 11641

此时hadoop01的8081无法访问,hadoop02会进行接管(重新提交刚才被中断的作业),这个过程需要稍等一会儿

再次输入数据后可以从结果看出是一个新作业:

图片

图片

结束任务后可以在任务历史服务器WebUI中进行查看:

图片

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/秋刀鱼在做梦/article/detail/764764
推荐阅读
相关标签
  

闽ICP备14008679号