赞
踩
上代码:
public class ConsumerLagApply { public static void main(String[] args) { String ipPort="kafkaClusterIpPort"; String consumerGroup="consumerGroup"; String topic="topic"; int partitionCount=0 ; long totalLag=0; long maxLag=0; int lagIsNotNull=0; BasicInfo basicInfo = new BasicInfo(); KafkaConsumer consumer = basicInfo.kafkaConsumer(ipPort, consumerGroup); List<PartitionInfo> partitions = consumer.partitionsFor(topic); for (PartitionInfo partition : partitions) { TopicPartition topicPartition = new TopicPartition(partition.topic(), partition.partition()); // 计算单个分区的lag consumer.assign(Collections.singletonList(topicPartition)); consumer.seekToEnd(Collections.singletonList(topicPartition)); long endOffset = consumer.position(topicPartition); if (endOffset!=0){ lagIsNotNull++; } consumer.seek(topicPartition,consumer.position(topicPartition)); long currentOffset = consumer.position(topicPartition); long lag = endOffset - currentOffset; maxLag = Math.max(maxLag, lag); totalLag += lag; partitionCount++; } double averageLag = totalLag / (double) partitionCount; double ratioConsumer = lagIsNotNull / (double) partitionCount; DecimalFormat decimalFormat = new DecimalFormat("0.0%"); String ConsumerRatio = decimalFormat.format(ratioConsumer); System.out.println("Consumer coverage ratio: " + ConsumerRatio+",maxLag: "+maxLag+",avgLag: "+averageLag); consumer.close(); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。