kafka自带了很多工具类,在源码kafka.tools里可以看到:
这些类该如何使用呢,kafka的设计者早就为我们考虑到了,在${KAFKA_HOME}/bin下,有很多的脚本,其中有一个kafka-run-class.sh,通过这个脚本,可以调用其中的tools的部分功能,如调用kafka.tools里的ConsumerOffsetChecker.scala,
$ kfka-run-class.sh kafka.tools.ConsumerOffsetChecker --zkconnect=192.168.199.129:2181,192.168.199.130:2181,192.168.199.131:2181 --group=group-1
执行结果如下:列出了所有消费者组的所有信息,包括Group(消费者组)、Topic、Pid(分区id)、Offset(当前已消费的条数)、LogSize(总条数)、Lag(未消费的条数)、Owner
细看kafka-run-class.sh脚本,它是调用 了ConsumerOffsetChecker的main方法,所以,我们也可以通过java代码来访问scala的ConsumerOffsetChecker类,代码如下:
- package com.wxj.kafka.monitor.jmx;
-
- import kafka.tools.ConsumerOffsetChecker;
-
- /**
- * kafka自带很多工具类,其中ConsumerOffsetChecker能查看到消费者消费的情况,
- * 只可惜,ConsumerOffsetChecker只是将信息打印到标准的输出流中
- * @author root
- *
- */
- public class RunClass
- {
-
- public static void main(String[] args)
- {
- //group-1是消费者的group名称,可以在zk中
- String[] arr = new String[]{"--zkconnect=192.168.199.129:2181,192.168.199.130:2181,192.168.199.131:2181","--group=group-1"};
- ConsumerOffsetChecker.main(arr);
-
- }
-
- }
跟通过kafa-run-class.sh执行的结果是一样一样的