赞
踩
# 部署教程参考 # 官方教程: https://kafka.apache.org/quickstart # 单机部署kafka参考: https://blog.csdn.net/u013416034/article/details/123875299 # 集群部署kafka参考: # https://blog.csdn.net/zhangzjx/article/details/123679453 # https://www.cnblogs.com/Andrew-Zhou/p/15366574.html
1-服务启动
- # 服务启动
- ./bin/kafka-server-stop.sh
- ./bin/zookeeper-server-stop.sh
-
- ./bin/zookeeper-server-start.sh ./config/zookeeper.properties
- ./bin/kafka-server-start.sh ./config/server.properties
2-主题topic操作
- ./bin/kafka-topics.sh --list --bootstrap-server ip:9002 #topic列表
- ./bin/kafka-topics.sh --describe --bootstrap-server ip:9002 --topic topic_name #查看某topic信息
- ./bin/kafka-topics.sh --create --bootstrap-server ip:9002 --replication-factor 1 --partition 1 --topic topic_name #创建某topic
- ./bin/kafka-topics.sh --delete --bootstrap-server ip:9002 --topic topic_name #删除某topic
- ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list ip:9092 --topic topic_name --time-1 #查看消息数量
- # 查看某个分区下的消息总量
- ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list ip:9092 --topic topic_name --partition 0
3-消费者组操作
- ./bin/kafka-consumer-groups.sh --list --bootstrap-server ip:9092 #所有消费者组列表
- ./bin/kafka-consumer-groups.sh --describe --state --all-groups --bootstrap-server ip:9092 #所有消费者组状态
- ./bin/kafka-consumer-groups.sh --describe --all-groups --members --bootstrap-server ip:9092 #所有消费者组成员
- ./bin/kafka-consumer-groups.sh --describe --state --group group_name --bootstrap-server ip:9092 #指定消费者组状态
- ./bin/kafka-consumer-groups.sh --describe --group group_name --bootstrap-server ip:9092 #指定消费者组详情
- ./bin/kafka-consumer-groups.sh --delete --group group_name --bootstrap-server ip:9092 #删除指定消费者组
- ./bin/kafka-consumer-groups.sh --execute --reset-offsets --to-latest --group group_name --bootstrap-server ip:9092 #创建消费者组
- # 查看某个用户组的消息积压情况
- ./bin/kafka-consumer-groups.sh --describe --bootstrap-server ip:9092 --group group_name #所有消费者组状态
- # 注意:LAG为积压的消息数量,Current-Offset、Log-Offset分别为当前offset以及总的Offset
4-Console操作
- ./bin/kafka-console-consumer.sh --bootstrap-server ip:9092 --topic topic_name --from-beginning #打印指定topic的内容
- ./bin/kafka-console-consumer.sh --bootstrap-server ip:9092 --topic topic_name --from-beginning | grep "**" #打印指定topic的内容
- ./bin/kafka-console-producer.sh --bootstrap-server ip:9092 --topic topic_name #生产消息
5-Configs操作
- ./bin/kafka-configs.sh --bootstrap-server ip:9092 --describe --entity-type topics --entity-type topic_name #查看topic数据清理策略
- ./bin/kafka-configs.sh --bootstrap-server ip:9092 --entity-type topics --entity-type topic_name --alter --add-config retention.ms=1000 #修改topic数据清理策略
- # 注意:修改保留时间为1000ms,但不是修改后1000ms就马上删除,kafka采用轮训的方式,轮训到该topic发现是1000ms前的数据就删掉
- ./bin/kafka-configs.sh --bootstrap-server ip:9092 --entity-type topics --entity-type topic_name --alter --config cleanup.policy=delete #修改topic数据清理策略为立即删除
- ./bin/kafka-configs.sh --bootstrap-server ip:9092 --entity-type topics --entity-type topic_name --alter --add-config partitions=3
6-【不同服务器间的消费配置】
listeners=PLAINTEXT://IP:9092 # vim config/server.properties
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。