当前位置:   article > 正文

查询kafka信息,并设置Kafka的偏移量Offset_kafka 查下当下各分区offset

kafka 查下当下各分区offset

一、登录Kafka,进入Kafka的bin目录下

        --bootstrap-server:Kafka监听的IP和端口

        --topic:指定的topic

        --group:指定的消费组

        --describe:描述

二、获取命令帮助

./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --list --help

三、查询某个kafka的消费者信息

./kafka-consumer-groups.sh --bootstrap-server 10.0.0.198:9092 --list 

四、查询某个topic的分区情况

./kafka-topics.sh --zookeeper 10.0.0.198:2181 --topic alikafka-topic_can_information_statistics --describe

五、获取某个topic的某个分区是由哪个IP消费

./kafka-consumer-groups.sh --bootstrap-server 10.0.0.198:9092 --describe --group CID_VehicleAlarmGroup


六、查看某个消费者对某个topic的消费情况

./kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group **  --topic **  --zookeeper 10.0.0.198:2181


group 表示group名称;

topic:创建时topic名称;

partition:分区编号;

offset:表示该parition已经消费了多少条message;

logSize:表示该partition已经写了多少条message;

Lag:表示有多少条message没有被消费;

Owner:表示消费者;

Created:该partition创建时间;

Last Seen:消费状态刷新最新时间。

 七、设置Kafka偏移量:

  1. # --to-earliest: 将消费者的偏移量重置为最早的偏移量,即分区中的第一条消息。
  2. # --to-latest: 将消费者的偏移量重置为最新的偏移量,即分区中的最后一条消息。
  3. # --to-current: 将消费者的偏移量重置为当前消费的偏移量,通常用于查看当前消费者的偏移量位置,而不是用于重置。
  4. # --to-offset <offset>: 将消费者的偏移量重置为指定的偏移量。你需要提供具体的偏移量值。
  5. # --shift-by N: 将消费者的偏移量向前或向后移动N个位置。N为正数时向前移动,N为负数时向后移动。
  6. # --to-datetime <datetime>: 将消费者的偏移量重置为大于或等于指定日期时间的最早偏移量。你需要提供符合yyyy-MM-ddTHH:mm:ss.xxx格式的日期时间。
  7. # --by-duration <duration>: 将消费者的偏移量重置为当前时间减去指定时长的位置。例如,P1D表示一天前,P2H表示两小时前。
  8. # --from-file <file>: 从CSV文件中读取重置策略。CSV文件应包含消费者组、主题和分区信息,以及相应的偏移量或时间戳。
  9. ./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group test.group --reset-offsets --topic topic.test --to-earliest --execute
  10. ./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group test.group --reset-offsets --all-topics --to-earliest --execute

在Kafka中,消费者组的状态可以是以下几种之一,只有Inactive才可以重置消费点位

  • Stable:消费者组正在正常消费,没有任何重平衡操作正在进行。
  • Rebalancing:消费者组正在进行重平衡操作,此时不能执行偏移量重置。
  • Joining:新的消费者正在加入消费者组。
  • PreparingRebalance:消费者组即将进行重平衡。
  • Syncing:消费者组正在进行同步操作,通常发生在重平衡之后。

Assignments can only be reset if the group 'xxxx' is inactive, but the current state is Stable  

查询状态 

./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --list --state

需要停止消费服务,如需要重置某个Group(test.group),test.group消费是在test-server中,可以先关闭test-server服务,可以杀进程,用来关闭当前Group的服务的消费进程,不能关闭Kafka,要不然不能设置Offset,之后从新设置offset。

停止服务可以通过设置offset指定消费位置,还有一种方式,直接删除group,这样会直接清理group,也就没有offset了,也能达到重置消费点位的效果

./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --delete --group test.group --execute

八、可以下载一个kafka工具如kafka-tool

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/酷酷是懒虫/article/detail/907569
推荐阅读
相关标签
  

闽ICP备14008679号