赞
踩
推荐链接:
总结——》【Java】
总结——》【Mysql】
总结——》【Redis】
总结——》【Kafka】
总结——》【Spring】
总结——》【SpringBoot】
总结——》【MyBatis、MyBatis-Plus】
KAFKA_HOME=/opt/app/install/kafka
参数 | 描述 |
---|---|
--bootstrap-server | kafka服务地址 |
--list | 查看消费者组列表 |
--group | 指定消费者组 |
--all-groups | 所有消费者组 |
--describe | 查看消费者组详情 |
--state | 查看消费者组状态 |
--members | 查看消费者组成员 |
--delete | 删除消费者组 |
--reset-offsets | 重置消费组的偏移量 |
--delete-offsets | 删除消费组的偏移量 |
--dry-run | 预先执行重置偏移量 |
--excute | 真正执行重置偏移量 |
--to-earliest | 将offset重置到最早 |
--to-latest | 将offset重置到最近 |
# 查看消费者组列表
bin/kafka-consumer-groups.sh --bootstrap-server x.x.x.x:9092 --list
# 查看所有消费者组详情
bin/kafka-consumer-groups.sh --bootstrap-server 10.116.192.27:9092 --describe --all-groups
# 查看指定消费者组详情
bin/kafka-consumer-groups.sh --bootstrap-server 10.116.192.27:9092 --describe --group listenerForSyncEsfCommunity1
结果列 | GROUP | TOPIC | PARTITION | CURRENT-OFFSET | LOG-END-OFFSET | LAG | CONSUMER-ID | HOST | CLIENT-ID |
---|---|---|---|---|---|---|---|---|---|
描述 | 消费组 | 主题 | 分区编号 | 当前offset | 最新offset | 消息滞后(未消费)数量 | 消费者ID | 主机 | 客户端ID |
# 查看所有消费者组状态
bin/kafka-consumer-groups.sh --bootstrap-server 10.116.192.27:9092 --describe --all-groups --state
# 查看指定消费者组状态
bin/kafka-consumer-groups.sh --bootstrap-server 10.116.192.27:9092 --describe --group listenerForSyncEsfCommunity1 --state
结果列 | GROUP | COORDINATOR (ID) | ASSIGNMENT-STRATEGY | STATE | #MEMBERS |
---|---|---|---|---|---|
描述 | 消费组 | 协调者ID | 分配策略 | 状态 - Stable:有消费者成员 - Empty:没有消费者成员 - Dead - PreparingRebalance - CompletingRebalance | 成员数量 |
# 查看所有消费者组成员
bin/kafka-consumer-groups.sh --bootstrap-server 10.116.192.27:9092 --describe --all-groups --members
# 查看指定消费者组成员
bin/kafka-consumer-groups.sh --bootstrap-server 10.116.192.27:9092 --describe --group listenerForSyncEsfCommunity1 --members
结果列 | GROUP | CONSUMER-ID | HOST | CLIENT-ID | #PARTITION |
---|---|---|---|---|---|
描述 | 消费组 | 消费者ID | 主机 | 客户端ID | 分区 |
# 删除所有消费者组
bin/kafka-consumer-groups.sh --bootstrap-server 10.122.139.40:9092 --delete --all-groups
# 删除指定消费者组
bin/kafka-consumer-groups.sh --bootstrap-server 10.122.139.40:9092 --delete --group listenerForSyncEsfCommunity1
# 只有这个消费组的所有客户端都停止消费/不在线才能够成功删除,否则会报下面异常
Error: Deletion of some consumer groups failed:
* Group 'listenerForSyncEsfCommunity1' could not be deleted due to: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.GroupNotEmptyException: The group is not empty.
重置Offset的模式 | 描述 |
---|---|
--to-earliest | 重置到最早offset |
--to-latest | 重置到最近offset |
--to-current | 重置到当前offset |
--to-datetime | 重置到指定时间offset 格式:YYYY-MM-DDTHH:mm:SS.sss 示例:2021-6-26T00:00:00.000 |
--to-offset | 重置到指定offset(多个分区都重置,一般不用这个) |
--shift-by | 按照偏移量增加或者减少offset - 正数:往前增加 - 负数:往后减少 |
--from-file | 根据CVS文档来重置 |
--dry-run | 预先执行重置偏移量 |
--excute | 真正执行重置偏移量 |
# 重置指定消费组的所有Topic的偏移量 bin/kafka-consumer-groups.sh --bootstrap-server 10.122.139.40:9092 --reset-offsets --to-earliest --group listenerForSyncEsfCommunity1 --all-topic --dry-run # 重置指定消费组的指定Topic的偏移量 bin/kafka-consumer-groups.sh --bootstrap-server 10.122.139.40:9092 --reset-offsets --to-earliest --group listenerForSyncEsfCommunity1 --topic test_topic --dry-run # 重置所有消费组的所有Topic的偏移量 bin/kafka-consumer-groups.sh --bootstrap-server 10.122.139.40:9092 --reset-offsets --to-earliest --all-group --all-topic --dry-run # 重置所有消费组的指定Topic的偏移量 bin/kafka-consumer-groups.sh --bootstrap-server 10.122.139.40:9092 --reset-offsets --to-earliest --all-group --topic test_topic --dry-run # 只有这个消费组不可用状态才能重置成功,否则会报下面异常: Error: Assignments can only be reset if the group 'listenerForSyncEsfCommunity1' is inactive, but the current state is Stable. TOPIC PARTITION NEW-OFFSET
# 删除指定消费组的指定Topic的偏移量(下一次从头消费)
bin/kafka-consumer-groups.sh --bootstrap-server 10.122.139.40:9092 --delete-offsets --group listenerForSyncEsfCommunity1 --topic test_topic
# 只有这个消费组不可用状态才能删除成功,否则会报异常
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。