赞
踩
一、登录Kafka,进入Kafka的bin目录下
--bootstrap-server:Kafka监听的IP和端口
--topic:指定的topic
--group:指定的消费组
--describe:描述
二、获取命令帮助
./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --list --help
三、查询某个kafka的消费者信息
./kafka-consumer-groups.sh --bootstrap-server 10.0.0.198:9092 --list
四、查询某个topic的分区情况
./kafka-topics.sh --zookeeper 10.0.0.198:2181 --topic alikafka-topic_can_information_statistics --describe
五、获取某个topic的某个分区是由哪个IP消费
./kafka-consumer-groups.sh --bootstrap-server 10.0.0.198:9092 --describe --group CID_VehicleAlarmGroup
六、查看某个消费者对某个topic的消费情况
./kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group ** --topic ** --zookeeper 10.0.0.198:2181
group 表示group名称;
topic:创建时topic名称;
partition:分区编号;
offset:表示该parition已经消费了多少条message;
logSize:表示该partition已经写了多少条message;
Lag:表示有多少条message没有被消费;
Owner:表示消费者;
Created:该partition创建时间;
Last Seen:消费状态刷新最新时间。
七、设置Kafka偏移量:
- # --to-earliest: 将消费者的偏移量重置为最早的偏移量,即分区中的第一条消息。
- # --to-latest: 将消费者的偏移量重置为最新的偏移量,即分区中的最后一条消息。
- # --to-current: 将消费者的偏移量重置为当前消费的偏移量,通常用于查看当前消费者的偏移量位置,而不是用于重置。
- # --to-offset <offset>: 将消费者的偏移量重置为指定的偏移量。你需要提供具体的偏移量值。
- # --shift-by N: 将消费者的偏移量向前或向后移动N个位置。N为正数时向前移动,N为负数时向后移动。
- # --to-datetime <datetime>: 将消费者的偏移量重置为大于或等于指定日期时间的最早偏移量。你需要提供符合yyyy-MM-ddTHH:mm:ss.xxx格式的日期时间。
- # --by-duration <duration>: 将消费者的偏移量重置为当前时间减去指定时长的位置。例如,P1D表示一天前,P2H表示两小时前。
- # --from-file <file>: 从CSV文件中读取重置策略。CSV文件应包含消费者组、主题和分区信息,以及相应的偏移量或时间戳。
-
- ./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group test.group --reset-offsets --topic topic.test --to-earliest --execute
-
- ./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group test.group --reset-offsets --all-topics --to-earliest --execute
在Kafka中,消费者组的状态可以是以下几种之一,只有Inactive
才可以重置消费点位
:
Stable
:消费者组正在正常消费,没有任何重平衡操作正在进行。Rebalancing
:消费者组正在进行重平衡操作,此时不能执行偏移量重置。Joining
:新的消费者正在加入消费者组。PreparingRebalance
:消费者组即将进行重平衡。Syncing
:消费者组正在进行同步操作,通常发生在重平衡之后。
Assignments can only be reset if the group 'xxxx' is inactive, but the current state is Stable
查询状态
./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --list --state
需要停止消费服务,如需要重置某个Group(test.group),test.group消费是在test-server中,可以先关闭test-server服务,可以杀进程,用来关闭当前Group的服务的消费进程,不能关闭Kafka,要不然不能设置Offset,之后从新设置offset。
停止服务可以通过设置offset指定消费位置,还有一种方式,直接删除group,这样会直接清理group,也就没有offset了,也能达到重置消费点位的效果
./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --delete --group test.group --execute
八、可以下载一个kafka工具如kafka-tool
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。