赞
踩
三台centos7系统的机器,虚拟机或者其他方式自行选择。我这准备了三台虚拟机,IP分别为192.168.169.140、192.168.169.141、192.168.169.142
kafka集群依赖zookeeper进行管理,zookeeper依赖java环境,三者的软件包可以在通过以下链接下载,也可自行官网上下载。
https://pan.baidu.com/s/1lZB3ONRi4a9S8S3JdsuVtw,提取码:szum
PS:kafka2建议使用的jdk1.8的中161及以上版本,否则会有kafka启动失败的情况,个人踩的坑。。。
centos下安装jdk这里就不在赘述了,有问题的可以在网上自行搜索。
1、在/opt/zookeeper目录下(自己新建)上传zookeeper安装包至三台虚拟机,解压tar -zxvf zookeeper-3.4.14.tar.gz
2、进入zookeeper的conf目录,拷贝一份zoo_sample.cfg命名为zoo.cfg
3、修改zoo.cfg的配置文件
提前创建好目录文件:
dataDir=/opt/data/zookeeper/data
dataLogDir=/opt/data/zookeeper/logs
三台服务器的配置:2888服务器之间通信端口,3888服务器选举端口,clientPort=2181对client端提供服务端口。
server.1=192.168.169.140:2888:3888
server.2=192.168.169.141:2888:3888
server.3=192.168.169.142:2888:3888
配置好后退出保存文件。
4、在指定的data文件里需要创建文件myid
在myid里面写入值为1,这里的1对应服务器192.168.169.140。同样的操作在141、142两台机器上也操作一遍,区别是myid里面的内容与各自的服务相对应。
5、分别启动三台机器上面的zookeeper服务。
启动命令:/opt/zookeeper/zookeeper-3.4.14/bin/zkServer.sh start
检查命名:/opt/zookeeper/zookeeper-3.4.14/bin/zkServer.sh status
一台leader两台follower,启动成功。
1、在/opt/kafka目录下(自己新建)上传kafka安装包至三台虚拟机,解压tar -zxvf kafka_2.12-2.1.1.tgz
2、进入config文件下,编辑server.properties文件
修改以下参数
broker.id为集群节点唯一标识号
内网之间访问
listeners=PLAINTEXT://192.168.169.141:9092
外网也可以进行访问
dvertised.listeners=PLAINTEXT://192.168.169.141:9092
对客户端提供的端口号
port=9092
机器主机名
host.name=kafka02
日志存储路径自己定义
注册到zookeeper
修改其他两台机器,注意broker.id不能重复
3、启动kafka,并进行测试
分别后台启动三台机器:
/opt/kafka/kafka_2.12-2.1.1/bin/kafka-server-start.sh -daemon /opt/kafka/kafka_2.12-2.1.1/config/server.properties
检查一下jps
登录zookeeper的客户端查看所以节点
kafka集群就启动成功了
1、启动命令
bin/kafka-server-start.sh -daemon config/server.properties
2、查看哪些主题:
bin/kafka-topics.sh --list --zookeeper 192.168.169.140:2181
3、创建主题topic
bin/kafka-topics.sh --create --zookeeper 192.168.169.140:2181 --replication-factor 3 --partitions 3 --topic test
4、查看主题topic
bin/kafka-topics.sh -zookeeper 192.168.169.140:2181 -describe -topic test
5、修改主题topic
bin/kafka-topics.sh --zookeeper 192.168.169.140:2181 --alter --partitions 5 --topic test
6、删除主题topic
bin/kafka-topics.sh --zookeeper 192.168.169.140:2181 --delete --topic test
7、启动生产者测试
bin/kafka-console-producer.sh --broker-list 192.168.169.140:9092 --topic test
8、启动消费者测试
bin/kafka-console-consumer.sh --bootstrap-server 192.168.169.140:9092 --topic test --from-beginning
9、查看消费者群组消费消息的情况
bin/kafka-consumer-groups.sh --bootstrap-server 192.168.169.140:9092 --describe --group test-group
10、查看有哪些消费者组
bin/kafka-consumer-groups.sh --bootstrap-server 192.168.169.140:9092 –list
11、查看某主题数据信息
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.169.140:9092 --topic test
12、针对主题进行单独调整配置项,修改后立即生效无需重启服务器
例:将test主题调整为保留5小时内的数据
bin/kafka-configs.sh --zookeeper 192.168.169.140:2181 --entity-type topics --entity-name test --alter --add-config retention.ms=18000000
查看test主题的自定义配置项:
bin/kafka-configs.sh --zookeeper 192.168.169.140:2181 --entity-type topics --entity-name test --describe
删除test主题的自定义配置项:
bin/kafka-configs.sh --zookeeper 192.168.169.140:2181 --entity-type topics --entity-name test --alter --delete-config retention.ms
13、服务器端重置消费者偏移量
查看消费者组test-group情况
bin/kafka-consumer-groups.sh --bootstrap-server 192.168.169.140:9092 --group test-group --describe
指定消费者组test-group中test主题的0分区偏移量
bin/kafka-consumer-groups.sh --bootstrap-server 192.168.169.140:9092 --group test-group --reset-offsets --topic test:0 --to-offset 4800 –execute
消费者组test-group中所有偏移量从最末尾开始
bin/kafka-consumer-groups.sh --bootstrap-server 192.168.169.140:9092 --group test-group --reset-offsets --all-topics --to-latest –execute
14、内部主题:__transaction_state和__consumer_offsets清除策略不会使用全局服务器的配置,默认为压缩不删除策略,需要修改执行语句如下:
bin/kafka-configs.sh --zookeeper 192.168.169.140:2181 --entity-type topics --entity-name __consumer_offsets --alter --add-config cleanup.policy=delete
bin/kafka-configs.sh --zookeeper 192.168.169.140:2181 --entity-type topics --entity-name __transaction_state --alter --add-config cleanup.policy=delete
kafka集群搭建先就到这了,下一篇说一下springboot工程集成调用kafka集群以及优化的问题。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。