当前位置:   article > 正文

Flink实战 - 搭建HA高可用集群

Flink实战 - 搭建HA高可用集群

一、部署说明

        废话不多说,直接上链接。

        flink-1.18.1

        hadoop-3.4.0

        zookeeper-3.8.4

        下载完成后上传至服务器中,执行如下命令解压:

tar -zxvf xxx.tar.gz

        解压完成后,将安装路径加入到 /etc/profile 或者 .bashrc文件中,然后使用 source /etc/profile 或者 source ~/.bashrc 使环境生效。

  1. export JAVA_HOME=/usr/lib/jdk1.8.0_131
  2. export PATH=$PATH:$JAVA_HOME/bin
  3. export FLINK_HOME=/home/web/zhangxiang/flink-1.18.1
  4. export PATH=$PATH:$FLINK_HOME/bin
  5. export HADOOP_HOME=/home/web/zhangxiang/ha/hadoop-3.4.0
  6. export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
  7. export PATH=${HADOOP_HOME}/bin:$PATH
  8. export PATH=$PATH:$HADOOP_HOME/sbin:$PATH
  9. export HADOOP_CLASSPATH=`hadoop classpath`
  10. export ZOOKEEPER_HOME=/home/web/zhangxiang/zookeeper-3.8.4
  11. export PATH=$PATH:$ZOOKEEPER_HOME/bin

二、集群规划

2.1 Flink HA

r3s3s4
JobManager
JobManager
TaskManager
TaskManager
TaskManager
HistoryServer

        服务器之间要设置互免登录,具体操作可询问ChatGPT或者通义千问,不在此赘述;并且更改 /etc/hosts 文件,命令如下:

vim /etc/hosts

内容如下:

xxx.xx.xx.xx    r3    r3
yyy.yy.yyy.y    s3   s3
yyy.yy.yyy.y    s4   s4

2.1.1 配置文件

2.1.1.1 flink-conf.yaml
        在不同的服务器上更改: taskmanager.host   为服务器IP,并在高可用节点更改 jobmanager.rpc.address 为服务器 IP, 切记不要填写 xxx.bind-host   否则会出错, 历史服务器需要单独执行命令 '/bin/historyserver.sh start' 进行开启。
        下面配置选项具体含义可看官网配置参数
  1. jobmanager.rpc.address: r3
  2. jobmanager.rpc.port: 6123
  3. jobmanager.memory.process.size: 2048m
  4. taskmanager.host: r3
  5. taskmanager.memory.process.size: 4096m
  6. taskmanager.numberOfTaskSlots: 3
  7. parallelism.default: 3
  8. high-availability.type: zookeeper
  9. high-availability.storageDir: hdfs://mycluster/flink/ha/
  10. high-availability.zookeeper.path.root: /flink
  11. high-availability.cluster-id: /cluster_one
  12. high-availability.zookeeper.quorum: r3:2181,s3:2181,s4:2181
  13. execution.checkpointing.interval: 30000
  14. execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
  15. execution.checkpointing.max-concurrent-checkpoints: 2
  16. execution.checkpointing.min-pause: 500
  17. execution.checkpointing.mode: EXACTLY_ONCE
  18. execution.checkpointing.timeout: 600000
  19. execution.checkpointing.tolerable-failed-checkpoints: 3
  20. restart-strategy.type: fixed-delay
  21. restart-strategy.fixed-delay.attempts: 3
  22. restart-strategy.fixed-delay.delay: 10000
  23. state.backend: filesystem
  24. state.checkpoints.dir: hdfs://mycluster/flink-checkpoints
  25. state.savepoints.dir: hdfs://mycluster/flink-savepoints
  26. jobmanager.execution.failover-strategy: region
  27. # 另外的高可用节点上关于此行不用更改,保持一致即可。
  28. rest.port: 8081
  29. rest.address: r3
  30. jobmanager.archive.fs.dir: hdfs://mycluster/logs/flink-job
  31. historyserver.web.address: r3
  32. historyserver.web.port: 8082
  33. historyserver.archive.fs.dir: hdfs://mycluster/logs/flink-job
  34. historyserver.archive.fs.refresh-interval: 5000
2.1.1.2 master
r3:8081

s4:8081

2.1.1.3 workers
r3

s3

s4

2.2 Hadoop HA

r3s3s4
NameNode
NameNode
JournalNode
JournalNode
DataNode
DataNode
DataNode
ZooKeeper
ZooKeeper
ZooKeeper
ZKFC
ZKFC
NodeManager
NodeManager
NodeManager

2.2.1 配置文件

2.2.1.1 hdfs-site.xml
  1. <configuration>
  2. <!-- NameNode 数据存储目录 -->
  3. <property>
  4. <name>dfs.namenode.name.dir</name>
  5. <value>file://${hadoop.tmp.dir}/name</value>
  6. </property>
  7. <!-- DataNode 数据存储目录 -->
  8. <property>
  9. <name>dfs.datanode.data.dir</name>
  10. <value>file://${hadoop.tmp.dir}/data</value>
  11. </property>
  12. <!-- JournalNode 数据存储目录 -->
  13. <property>
  14. <name>dfs.journalnode.edits.dir</name>
  15. <value>${hadoop.tmp.dir}/jn</value>
  16. </property>
  17. <!-- 完全分布式集群名称 -->
  18. <property>
  19. <name>dfs.nameservices</name>
  20. <value>mycluster</value>
  21. </property>
  22. <!-- 集群中 NameNode 节点都有哪些 -->
  23. <property>
  24. <name>dfs.ha.namenodes.mycluster</name>
  25. <value>nn1,nn2</value>
  26. </property>
  27. <!-- NameNode 的 RPC 通信地址 -->
  28. <property>
  29. <name>dfs.namenode.rpc-address.mycluster.nn1</name>
  30. <value>r3:8020</value>
  31. </property>
  32. <property>
  33. <name>dfs.namenode.rpc-address.mycluster.nn2</name>
  34. <value>s4:8020</value>
  35. </property>
  36. <!-- NameNode 的 http 通信地址 -->
  37. <property>
  38. <name>dfs.namenode.http-address.mycluster.nn1</name>
  39. <value>r3:9870</value>
  40. </property>
  41. <property>
  42. <name>dfs.namenode.http-address.mycluster.nn2</name>
  43. <value>s4:9870</value>
  44. </property>
  45. <!-- 指定 NameNode 元数据在 JournalNode 上的存放位置 -->
  46. <property>
  47. <name>dfs.namenode.shared.edits.dir</name>
  48. <value>qjournal://r3:8485;s4:8485/mycluster</value>
  49. </property>
  50. <!-- 访问代理类:client 用于确定哪个 NameNode 为 Active -->
  51. <property>
  52. <name>dfs.client.failover.proxy.provider.mycluster</name>
  53. <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
  54. </property>
  55. <!-- 配置隔离机制,即同一时刻只能有一台服务器对外响应 -->
  56. <property>
  57. <name>dfs.ha.fencing.methods</name>
  58. <value>
  59. sshfence
  60. shell(/bin/true)
  61. </value>
  62. </property>
  63. <!-- 使用隔离机制时需要 ssh 秘钥登录-->
  64. <property>
  65. <name>dfs.ha.fencing.ssh.private-key-files</name>
  66. <value>/home/web/.ssh/id_rsa</value>
  67. </property>
  68. <!-- 配置sshfence隔离机制超时时间 -->
  69. <property>
  70. <name>dfs.ha.fencing.ssh.connect-timeout</name>
  71. <value>30000</value>
  72. </property>
  73. <property>
  74. <name>ha.failover-controller.cli-check.rpc-timeout.ms</name>
  75. <value>60000</value>
  76. </property>
  77. <!-- 启用 nn 故障自动转移 -->
  78. <property>
  79. <name>dfs.ha.automatic-failover.enabled</name>
  80. <value>true</value>
  81. </property>
  82. </configuration>
2.2.1.2 core-site.xml
  1. <configuration>
  2. <!-- 把多个 NameNode 的地址组装成一个集群 mycluster -->
  3. <property>
  4. <name>fs.defaultFS</name>
  5. <value>hdfs://mycluster</value>
  6. </property>
  7. <!-- 指定 hadoop 运行时产生文件的存储目录 -->
  8. <property>
  9. <name>hadoop.tmp.dir</name>
  10. <value>/home/web/zhangxiang/ha/hadoop-3.4.0/data</value>
  11. </property>
  12. <!-- 指定 zkfc 要连接的 zkServer 地址 -->
  13. <property>
  14. <name>ha.zookeeper.quorum</name>
  15. <value>r3:2181,s3:2181,s4:2181</value>
  16. </property>
  17. <!-- 配置HDFS网页端使用的静态用户 -->
  18. <property>
  19. <name>hadoop.http.staticuser.user</name>
  20. <value>web</value>
  21. </property>
  22. </configuration>

2.2.1.3 yarn-site.xml
  1. <configuration>
  2. <property>
  3. <name>yarn.nodemanager.aux-services</name>
  4. <value>mapreduce_shuffle</value>
  5. </property>
  6. <!-- 启用 resourcemanager ha -->
  7. <property>
  8. <name>yarn.resourcemanager.ha.enabled</name>
  9. <value>true</value>
  10. </property>
  11. <!-- 声明 resourcemanager 的地址 -->
  12. <property>
  13. <name>yarn.resourcemanager.cluster-id</name>
  14. <value>cluster-yarn1</value>
  15. </property>
  16. <!--指定 resourcemanager 的逻辑列表-->
  17. <property>
  18. <name>yarn.resourcemanager.ha.rm-ids</name>
  19. <value>rm1,rm2</value>
  20. </property>
  21. <!-- ========== rm1 的配置 ========== -->
  22. <!-- 指定 rm1 的主机名 -->
  23. <property>
  24. <name>yarn.resourcemanager.hostname.rm1</name>
  25. <value>r3</value>
  26. </property>
  27. <!-- 指定 rm1 的 web 端地址 -->
  28. <property>
  29. <name>yarn.resourcemanager.webapp.address.rm1</name>
  30. <value>r3:8088</value>
  31. </property>
  32. <!-- 指定 rm1 的内部通信地址 -->
  33. <property>
  34. <name>yarn.resourcemanager.address.rm1</name>
  35. <value>r3:8032</value>
  36. </property>
  37. <!-- 指定 AM 向 rm1 申请资源的地址 -->
  38. <property>
  39. <name>yarn.resourcemanager.scheduler.address.rm1</name>
  40. <value>r3:8030</value>
  41. </property>
  42. <!-- 指定供 NM 连接的地址 -->
  43. <property>
  44. <name>yarn.resourcemanager.resource-tracker.address.rm1</name>
  45. <value>r3:8031</value>
  46. </property>
  47. <!-- ========== rm2 的配置 ========== -->
  48. <!-- 指定 rm2 的主机名 -->
  49. <property>
  50. <name>yarn.resourcemanager.hostname.rm2</name>
  51. <value>s4</value>
  52. </property>
  53. <property>
  54. <name>yarn.resourcemanager.webapp.address.rm2</name>
  55. <value>s4:8088</value>
  56. </property>
  57. <property>
  58. <name>yarn.resourcemanager.address.rm2</name>
  59. <value>s4:8032</value>
  60. </property>
  61. <property>
  62. <name>yarn.resourcemanager.scheduler.address.rm2</name>
  63. <value>s4:8030</value>
  64. </property>
  65. <property>
  66. <name>yarn.resourcemanager.resource-tracker.address.rm2</name>
  67. <value>s4:8031</value>
  68. </property>
  69. <!-- 指定 zookeeper 集群的地址 -->
  70. <property>
  71. <name>yarn.resourcemanager.zk-address</name>
  72. <value>r3:2181,s3:2181,s4:2181</value>
  73. </property>
  74. <!-- 启用自动恢复 -->
  75. <property>
  76. <name>yarn.resourcemanager.recovery.enabled</name>
  77. <value>true</value>
  78. </property>
  79. <!-- 指定 resourcemanager 的状态信息存储在 zookeeper 集群 -->
  80. <property>
  81. <name>yarn.resourcemanager.store.class</name>
  82. <value>org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore</value>
  83. </property>
  84. <!-- 环境变量的继承 -->
  85. <property>
  86. <name>yarn.nodemanager.env-whitelist</name>
  87. <value>JAVA_HOME,HADOOP_COMMON_HOME,HADOOP_HDFS_HOME,HADOOP_CONF_DIR,CLASSPATH_PREPEND_DISTCACHE,HADOOP_YARN_HOME,HADOOP_MAPRED_HOME</value>
  88. </property>
  89. </configuration>

2.3 Zookeeper 

r3s3s4
ZooKeeperZooKeeperZooKeeper

2.3.1 配置文件

2.3.1.1 zoo.cfg

        首先在安装目录 /home/web/zhangxiang/zookeeper-3.8.4/ 下分别创建:zkdata、logs 这两个目录,然后在 conf 目录下使用如下命令:

  1. cp zoo_sample.cfg zoo.cfg 
  2. vim zoo.cfg

        在 zoo.cfg 文件中修改以下内容即可  

  1. tickTime=2000
  2. initLimit=10
  3. syncLimit=5
  4. dataDir=/home/web/zhangxiang/zookeeper-3.8.4/zkdata
  5. clientPort=2181
  6. dataLogDir=/home/web/zhangxiang/zookeeper-3.8.4/logs
  7. server.1=r3:2888:3888
  8. server.2=s3:2888:3888
  9. server.3=s4:2888:3888

        然后在 zkdata 中创建 myid 文件,并在其中增加 ID , ID 即 server.x 中的 x ,这个是机器的唯一标识,具体内容如下:

        

切记不同服务器节点需要按照上述中 server.x 以及 IP 对应关系设置

三、启动集群

3.1 启动zookeeper集群

必须先启动 zookeeper 集群否则会报错,一台台启动太麻烦,创建批量脚本 zk.sh :

  1. #!/bin/bash
  2. case $1 in
  3. "start"){
  4. for i in r3 s3 s4
  5. do
  6. echo ---------- zookeeper $i 启动 ------------
  7. ssh $i "/home/web/zhangxiang/zookeeper-3.8.4/bin/zkServer.sh start"
  8. done
  9. };;
  10. "stop"){
  11. for i in r3 s3 s4
  12. do
  13. echo ---------- zookeeper $i 停止 ------------
  14. ssh $i "/home/web/zhangxiang/zookeeper-3.8.4/bin/zkServer.sh stop"
  15. done
  16. };;
  17. "status"){
  18. for i in r3 s3 s4
  19. do
  20. echo ---------- zookeeper $i 状态 ------------
  21. ssh $i "/home/web/zhangxiang/zookeeper-3.8.4/bin/zkServer.sh status"
  22. done
  23. };;
  24. esac

然后执行 chmod +x zk.sh 赋予其可执行权限,接下来执行以下命令就可以对集群进行 启动/停止/查看状态 操作了。

./zk.sh start/stop/status

启动完成后,想要使用 jps 查看每台服务器上的进程,也可以创建批量执行脚本 jpsall.sh,然后赋予脚本可执行权限即可:

  1. #! /bin/bash
  2. for host in r3 s3 s4
  3. do
  4. echo ----------$host----------
  5. ssh $host jps
  6. done

QuorumPeerMain 代表 zookeeper 集群在3台服务器上已经启动了。

3.2 启动Hadoop集群

执行 start-dfs.sh 启动Hadoop 集群。

3.3 启动 Flink 集群

执行 start-cluster.sh 启动 flink 集群

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/知新_RL/article/detail/796351
推荐阅读
相关标签
  

闽ICP备14008679号