当前位置:   article > 正文

kafka java 查询信息_Kafka Java API获取非compacted topic总消息数

java kafka 当前消息数量

目前Kafka并没有提供直接的工具来帮助我们获取某个topic的当前总消息数,需要我们自行写程序来实现。下列代码可以实现这一功能,特此记录一下:

/**

* 获取某个topic的当前消息数

* Java 8+ only

*

* @param topic

* @param brokerList

* @return

*/

public static long totalMessageCount(String topic, String brokerList) {

Properties props = new Properties();

props.put("bootstrap.servers", brokerList);

props.put("group.id", "test-group");

props.put("enable.auto.commit", "false");

props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

try (KafkaConsumer consumer = new KafkaConsumer<>(props)) {

List tps = Optional.ofNullable(consumer.partitionsFor(topic))

.orElse(Collections.emptyList())

.stream()

.map(info -> new TopicPartition(info.topic(), info.partition()))

.collect(Collectors.toList());

Map beginOffsets = consumer.beginningOffsets(tps);

Map endOffsets = consumer.endOffsets(tps);

return tps.stream().mapToLong(tp -> endOffsets.get(tp) - beginOffsets.get(tp)).sum();

}

}

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/花生_TL007/article/detail/712356
推荐阅读
相关标签
  

闽ICP备14008679号