当前位置:   article > 正文

CentOS7安装kafka3.4.0集群_centos kafka

centos kafka

 参考文章: 

Kafka集群部署

  • Kafka 2.8.0版本: 实现了 Raft 分布式一致性机制, 弃用zookeeper,
  • Zookeeper-less Kafka 还属于早期版本, 并不完善, 所以, 现在不要应用在线上产品环境中。
  • 1.7版本的jdk只能支持kafka 2.0.0之前的版本;    
  • 早期版本(3.x以下): Kafka支持java8, 11和15(即将为16);
  • Kafka3.x版本: 弃用java8, 但依然可用, 官方建议更新至java11, 未来将支持jdk11, jdk16, jdk16, jdk17(非LTS版本);
  • 展望Kafka4.0:完全放弃java8 

一、下载kafka

下载地址:Apache Kafka

https://downloads.apache.org/kafka/3.4.0/kafka_2.12-3.4.0.tgz

不要下载源码,下载编译过的

二、安装kafka集群

2.1 前提条件

1、部署Kafka集群搭建需要服务器至少3台,奇数台

2、Kafka的安装需要java环境,jdk1.8

3、Kafka安装包版本:kafka_2.12-3.4.0.tgz

4、假设3台服务器分别为:kafka1、kafka2、kafka3

kafka服务器名

IP

域名

kafka1

192.168.172.134

kafka1.sd.cn

kafka2

192.168.172.141

kafka2.sd.cn

kafka3

192.168.172.142

kafka3.sd.cn

5、增加host配置:

2.2 基于Zookeeper集群搭建

直接使用kafka自带的zookeeper建立zk集群

1、将安装包kafka_2.12-3.4.0.tgz上传到/opt 目录下

2、解压:tar -xf kafka_2.12-3.4.0.tgz

3、进入目录:cd /opt/kafka_2.12-3.4.0/

4、创建zookeeper目录:mkdir zk_kfk_data

5、进入目录:cd /opt/kafka_2.12-3.4.0/config

6、修改zookeeper.properties文件:

三台机器上的zookeeper.properties文件配置相同,data.Dir 为zk的数据目录,server.1、server.2、server.3 为集群信息。

2888端口号是zookeeper服务之间通信的端口

3888端口是zookeeper与其他应用程序通信的端口。

tickTime:CS通信心跳数

Zookeeper 服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每个 tickTime 时间就会发送一个心跳。

      tickTime以毫秒为单位。

      tickTime:该参数用来定义心跳的间隔时间,zookeeper的客户端和服务端之间也有和web开发里类似的session的概念,而zookeeper里最小的session过期时间就是tickTime的两倍。

initLimit:LF初始通信时限

集群中的follower服务器(F)与leader服务器(L)之间 初始连接 时能容忍的最多心跳数(tickTime的数量)

syncLimit:LF同步通信时限

集群中的follower服务器(F)与leader服务器(L)之间 请求和应答 之间能容忍的最多心跳数(tickTime的数量)

7、创建myid文件:

进入/opt/kafka_2.12-3.4.0/zk_kfk_data目录,创建myid文件,将三台服务器上的myid文件分别写入1,2,3。myid是zookeeper集群用来发现彼此的标识,必须创建,且不能相同。

8、执行启动zookeeper命令:

nohup /opt/kafka_2.12-3.4.0/bin/zookeeper-server-start.sh /opt/kafka_2.12-3.4.0/config/zookeeper.properties &>> /opt/kafka_2.12-3.4.0/zookeeper.log &

三台机器都执行启动命令,查看zookeeper的日志文件,没有报错就说明zookeeper集群启动成功了。

2.3 Kafka集群搭建(摒弃Zookeeper)

2.3.1 修改配置文件

1、创建kafka日志数据目录:mkdir /data/logs/kafka -p
2、进入目录:cd /opt/kafka_2.12-3.4.0/config/kraft/
3、修改server.properties配置文件:

修改参数如下:

参数名称

参数值

备注

broker.id

0

broker.id的值三个节点要配置不同的值,分别配置为0,1,2

advertised.host.name

kafka1.sd.cn

在hosts文件配置kafka1域名,另外两台分别为:kafka2.sd.cn,kafka3.sd.cn

advertised.port

9092

默认端口,不需要改

log.dirs

/opt/kafka_2.11-0.10.0.1/kafka-logs-1

Kafka日志数据目录

num.partitions

40

分区数,根据自行修改

log.retention.hours

24

日志保存时间

zookeeper.connect

kafka1.sd.cn:3181,kafka2.sd.cn:3181,kafka3.sd.cn:3181

zookeeper连接地址,多个以逗号隔开

2.3.2 初始化部署集群

1、初始化集群

  1. # 1. 生成本次安装的集群统一UUID号:
  2. $. /opt/kafka_2.12-3.4.0/bin/kafka-storage.sh random-uuid
  3. q9ztDs1fQ-OsoBYlYoJykw
  4. # 2. 根据集群id,初始化元数据(各个节点均执行):
  5. $. /opt/kafka_2.12-3.4.0/bin/kafka-storage.sh format \
  6. -t q9ztDs1fQ-OsoBYlYoJykw \
  7. -c /opt/kafka_2.12-3.4.0/config/kraft/server.properties
  8. # use
  9. bin/kafka-storage.sh format \
  10. -t x4-gd5zeRRS-g-c_yH6q3A \
  11. -c /opt/kafka_2.12-3.4.0/config/kraft/server.properties --ignore-formatted

注意:集群中的每台机器上均执行相同操作,如果集群ip地址发生变化,也要重新执行初始化集群的操作,否则报错

  1. [2022-05-01 17:16:57,982] ERROR [RaftManager nodeId=1] Unexpected error INCONSISTENT_CLUSTER_ID in VOTE response: InboundResponse(correlationId=38, data=VoteResponseData(errorCode=104, topics=[]), sourceId=2) (org.apache.kafka.raft.KafkaRaftClient)
  2. [2022-05-01 17:16:58,008] ERROR [RaftManager nodeId=1] Unexpected error INCONSISTENT_CLUSTER_ID in VOTE response: InboundResponse(correlationId=39, data=VoteResponseData(errorCode=104, topics=[]), sourceId=2) (org.apache.kafka.raft.KafkaRaftClient)
  3. [2022-05-01 17:16:58,036] ERROR [RaftManager nodeId=1] Unexpected error INCONSISTENT_CLUSTER_ID in VOTE response: InboundResponse(correlationId=40, data=VoteResponseData(errorCode=104, topics=[]), sourceId=2) (org.apache.kafka.raft.KafkaRaftClient)
  4. [2022-05-01 17:16:58,064] ERROR [RaftManager nodeId=1] Unexpected error INCONSISTENT_CLUSTER_ID in VOTE response: InboundResponse(correlationId=41, data=VoteResponseData(errorCode=104, topics=[]), sourceId=2) (org.apache.kafka.raft.KafkaRaftClient)
  5. [2022-05-01 17:16:58,087] ERROR [RaftManager nodeId=1] Unexpected error INCONSISTENT_CLUSTER_ID in VOTE response: InboundResponse(correlationId=42, data=VoteResponseData(errorCode=104, topics=[]), sourceId=2) (org.apache.kafka.raft.KafkaRaftClient)
  6. [2022-05-01 17:16:58,111] ERROR [RaftManager nodeId=1] Unexpected error INCONSISTENT_CLUSTER_ID in VOTE response: InboundResponse(correlationId=43, data=VoteResponseData(errorCode=104, topics=[]), sourceId=2) (org.apache.kafka.raft.KafkaRaftClient)
  7. [2022-05-01 17:16:58,134] ERROR [RaftManager nodeId=1] Unexpected error INCONSISTENT_CLUSTER_ID in VOTE response: InboundResponse(correlationId=44, data=VoteResponseData(errorCode=104, topics=[]), sourceId=2) (org.apache.kafka.raft.KafkaRaftClient)
  8. [2022-05-01 17:16:58,162] ERROR [RaftManager nodeId=1] Unexpected error INCONSISTENT_CLUSTER_ID in VOTE response: InboundResponse(correlationId=45, data=VoteResponseData(errorCode=104, topics=[]), sourceId=2) (org.apache.kafka.raft.KafkaRaftClient)

2、 启动kafka集群:

  1. $. /opt/kafka_2.12-3.4.0/bin/kafka-server-start.sh \
  2. -daemon \
  3. /opt/kafka_2.12-3.4.0/config/server.properties &>> \
  4. /opt/kafka_2.12-3.4.0/kafka.log &
  5. # 参考
  6. ./bin/kafka-server-start.sh -daemon ./config/kraft/server.properties &

三个节点均要启动;启动无报错,即搭建成功。

Kafka集群节点服务管理命令:

  1. # 启动kafka集群
  2. /opt/kafka_2.12-3.4.0/bin/kafka-server-start.sh \
  3. /opt/kafka_2.12-3.4.0/config/kraft/server.properties
  4. # 停止运行
  5. /opt/kafka_2.12-3.4.0/bin/kafka-server-stop.sh \
  6. /opt/kafka_2.12-3.4.0/config/kraft/server.properties
  7. # 后台运行
  8. cd /opt/kafka_2.12-3.4.0
  9. .bin/kafka-server-start.sh -daemon ./config/kraft/server.properties &

2.4 测试Kafka集群

2.4.1 创建topic:test

/opt/kafka_2.12-3.4.0/bin/kafktopics.sh --create --zookeeper kafka1.sd.cn:3181,kafka2.sd.cn:3181,kafka3.sd.cn:3181 --replication-factor 1 --partitions 1 --topic test

2.4.2 列出已创建的topic列表

/opt/kafka_2.12-3.4.0/bin/kafka-topics.sh --list --zookeeper localhost:3181

2.4.3 模拟客户端去发送消息

/opt/kafka_2.12-3.4.0/bin/kafka-console-producer.sh --broker-list kafka1.sd.cn:9092,kafka2.sd.cn:9092,kafka3.sd.cn:9092 --topic test

2.4.4 模拟客户端去接受消息

/opt/kafka_2.12-3.4.0/bin/kafka-console-consumer.sh --zookeeper kafka1.sd.cn:3181,kafka2.sd.cn:3181,kafka3.sd.cn:3181 --from-beginning --topic test

三、Kafka的一些基本命令

3.1 基于zookeeper的命令

  1. #启动zk
  2. bin/zookeeper-server-start.sh  config/zookeeper.properties
  3. #启动kafka
  4. bin/kafka-server-start.sh config/server.properties
  5. #创建tocic
  6. bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
  7. #查询topic列表
  8. bin/kafka-topics.sh --list --zookeeper localhost:2181
  9. #启动 Producer
  10. bin/kafka-console-producer.sh --broker-list 192.168.100.129:9092 --topic test
  11. #启动 Consumer
  12. bin/kafka-console-consumer.sh --bootstrap-server 192.168.100.129:9092 --topic test --from-beginning
  13. #topic的信息
  14. bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test

 3.2 无Zookeeper的Kafka集群

  1. # 创建topic
  2. bin/kafka-topics.sh \
  3. --create \
  4. --topic quickstart-events \
  5. --bootstrap-server 110.112.0.134:9092,110.112.0.135:9092,110.112.0.136:9092
  6. # 查看topic列表
  7. bin/kafka-topics.sh \
  8. --list \
  9. --bootstrap-server 110.112.0.134:9092,110.112.0.135:9092,110.112.0.136:9092
  10. # 查看消息详情
  11. bin/kafka-topics.sh \
  12. --describe \
  13. --topic quickstart-events \
  14. --bootstrap-server 110.112.0.134:9092,110.112.0.135:9092,110.112.0.136:9092
  15. # 生产消息
  16. bin/kafka-console-producer.sh \
  17. --topic quickstart-events \
  18. --bootstrap-server 110.112.0.134:9092,110.112.0.135:9092,110.112.0.136:9092
  19. # 消费消息
  20. bin/kafka-console-consumer.sh \
  21. --topic quickstart-events \
  22. --from-beginning \
  23. --bootstrap-server 110.112.0.134:9092,110.112.0.135:9092,110.112.0.136:9092
  24. # 查看消费者组
  25. bin/kafka-consumer-groups.sh \
  26. --bootstrap-server localhost:9092 \
  27. --describe \
  28. --group my-group
  29. # 查看消费者组列表
  30. bin/kafka-consumer-groups.sh \
  31. --bootstrap-server 110.112.0.134:9092,110.112.0.135:9092,110.112.0.136:9092 \
  32. --list

四、安装Kafka单机(测试使用)

将kafka上传到 Linux里面并解压,kafka需要JDK的环境。

  1. #解压kafka
  2. tar -xf kafka_2.12-3.4.0.tgz
  3. #重命名
  4. mv kafka_2.12-3.4.0 kafka

配置config下面的server.properties文件

  1. broker.id=1    #改为1
  2. 增加ip映射,如果不增加就是locahost。这样没办法使用java连接
  3. host.name=192.168.100.129

  1. # 启动kafka集群
  2. /opt/kafka_2.12-3.4.0/bin/kafka-server-start.sh \
  3. /opt/kafka_2.12-3.4.0/config/kraft/server.properties
  4. # 停止运行
  5. /opt/kafka_2.12-3.4.0/bin/kafka-server-stop.sh \
  6. /opt/kafka_2.12-3.4.0/config/kraft/server.properties
  7. # 后台运行
  8. cd /opt/kafka_2.12-3.4.0
  9. .bin/kafka-server-start.sh -daemon ./config/kraft/server.properties &

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/AllinToyou/article/detail/564750
推荐阅读
相关标签
  

闽ICP备14008679号