赞
踩
实际项目中,我们会遇到kafka消费不及时,系统发现最新的数据一致无法出现,这时候通过其他kafka工具发现原来指定的消费组lag太大,也就是我们系统要么出问题,要么需要启动更多的实例加快消费消息。每次通过kafka工具去查询lag基本都是手工而且耗时且慢,能否在自己的系统中集成查询指定消费组lag的功能,然后出现问题是可以管理界面中迅速查看lag呢?答案是当然可以。 kafka提供了这些API。
备注:这里的kafka工具是指kafka自带的命令行工具,或者其他第三方提供的kafka工具。
注意事项:本博客所有代码是为了介绍相关内容而编写或者引用的,示例代码并非可直接用于生产的代码。仅供参考而已。
首先我们需要引入的依赖如下。 其中kafka-clients是作为kafka客户端访问kakfa需要的依赖包,kafka_2.12是管理端需要的依赖包。
备注:我的kafka版本是kafka_2.12-1.0.0。
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.0.0</version>
</dependency>
kafka管理端的AdminClient可以查询消费组的详情。
AdminClient.ConsumerGroupSummary consumerGroupSummary = adminClient.describeConsumerGroup(GROUP_ID, 5000);
scala.collection.immutable.Map<TopicPartition, Object> maps = adminClient.listGroupOffsets(GROUP_ID);
listGroupOffsets可以查询消费组的在指定topic的指定分区中的offset. 然后我们可以通过查询该TopicPartition最新的postion得到endOffset。
两者之差就是指定消费组在指定topic的某个分区上的lag信息。
KafkaConsumer<String, String> consumer = getNewConsumer();
consumer.assign(Arrays.asList(topicPartition));
consumer.seekToEnd(Arrays.asList(topicPartition));
long endOffset = consumer.position(topicPartition);
完整的代码在这里,欢迎加星和fork。 谢谢!
package com.yq; import kafka.admin.AdminClient; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.TopicPartition; import scala.Option; import scala.collection.immutable.List; import java.util.Arrays; import java.util.Properties; /** * Simple to Introduction * className: SendMessageMain * * @author EricYang * @version 2019/01/10 11:30 */ @Slf4j public class AdminMain { private static final String SERVERS = "ubuntu:9092"; private static final String GROUP_ID = "yq-consumer09"; public static long getLogEndOffset(TopicPartition topicPartition){ KafkaConsumer<String, String> consumer = getNewConsumer(); consumer.assign(Arrays.asList(topicPartition)); consumer.seekToEnd(Arrays.asList(topicPartition)); long endOffset = consumer.position(topicPartition); consumer.close(); return endOffset; } public static KafkaConsumer getNewConsumer(){ Properties props = new Properties(); props.put("bootstrap.servers", SERVERS); props.put("group.id", "test001"); props.put("enable.auto.commit", "true"); props.put("auto.offset.reset", "earliest"); props.put("auto.commit.interval.ms", "1000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); return consumer; } public static void main(String[] args) throws InstantiationException, IllegalAccessException { Properties props = new Properties(); props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, SERVERS); AdminClient adminClient = AdminClient.create(props); AdminClient.ConsumerGroupSummary consumerGroupSummary = adminClient.describeConsumerGroup(GROUP_ID, 5000); if(consumerGroupSummary.state().equals("Empty")){ System.out.println("No grp summary"); } System.out.println("consumerGrpSummary State " + consumerGroupSummary.state()); Option<List<AdminClient.ConsumerSummary>> consumerSummaryOption = consumerGroupSummary.consumers(); scala.collection.immutable.Map<TopicPartition, Object> maps = adminClient.listGroupOffsets(GROUP_ID); scala.collection.Set<TopicPartition> topicPartitions = maps.keySet(); scala.collection.immutable.List<TopicPartition> topicPartitionList = topicPartitions.reversed(); for(int j =0; j< topicPartitionList.size(); j++){ TopicPartition topicPartition = topicPartitionList.apply(j); String currentOffset = maps.get(topicPartition).get().toString(); long groupLastEndOffset = getLogEndOffset(topicPartition); long lag = groupLastEndOffset -Long.valueOf(currentOffset); System.out.println("topic:"+topicPartition.topic()+", partition:" + topicPartition.partition() + ", offset:" + currentOffset + ", groupLastEndOffset:"+ groupLastEndOffset + ", lag:"+ lag); } adminClient.close(); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。