赞
踩
- import kafka.api.PartitionOffsetRequestInfo;
- import kafka.common.TopicAndPartition;
- import kafka.javaapi.OffsetResponse;
- import kafka.javaapi.PartitionMetadata;
- import kafka.javaapi.TopicMetadata;
- import kafka.javaapi.TopicMetadataRequest;
- import kafka.javaapi.consumer.SimpleConsumer;
-
- import java.util.*;
- import java.util.Map.Entry;
-
- public class KafkaOffsetTools {
- public final static String KAFKA_TOPIC_NAME_ADAPTER = "sample";
- public final static String KAFKA_TOPIC_NAME_EXCEPTION = "exception";
- public final static String KAFKA_TOPIC_NAME_AUDIT = "audit";
- private static final String rawTopicTotal = "rawTopicTotalRecordCounter";
- private static final String avroTopicTotal = "avroTopicTotalRecordCounter";
- private static final String exceptionTopicTotal = "exceptionTopicTotalRecordCounter";
-
- public KafkaOffsetTools() {
- }
-
- public static long getLastOffset(SimpleConsumer consumer, String topic,
- int partition, long whichTime, String clientName) {
- TopicAndPartition topicAndPartition = new TopicAndPartition(topic,
- partition);
- Map, PartitionOffsetRequestInfo> requestInfo = new HashMap, PartitionOffsetRequestInfo>();
- requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(
- whichTime, 1));
- kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
- requestInfo, kafka.api.OffsetRequest.CurrentVersion(),
- clientName);
- OffsetResponse response = consumer.getOffsetsBefore(request);
-
- if (response.hasError()) {
- System.err.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition));
- return 0;
- }
- long[] offsets = response.offsets(topic, partition);
- return offsets[0];
- }
-
- private TreeMap, PartitionMetadata> findLeader(List a_seedBrokers, String a_topic) {
- TreeMap, PartitionMetadata> map = new TreeMap, PartitionMetadata>();
- loop:
- for (String seed : a_seedBrokers) {
- SimpleConsumer consumer = null;
- try {
- String[] hostAndPort;
- hostAndPort = seed.split(":");
- consumer = new SimpleConsumer(hostAndPort[0], Integer.valueOf(hostAndPort[1]), 100000, 64 * 1024,
- "leaderLookup" + new Date().getTime());
- List topics = Collections.singletonList(a_topic);
- TopicMetadataRequest req = new TopicMetadataRequest(topics);
- kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
-
- List metaData = resp.topicsMetadata();
- for (TopicMetadata item : metaData) {
- for (PartitionMetadata part : item.partitionsMetadata()) {
- map.put(part.partitionId(), part);
- }
- }
- } catch (Exception e) {
- System.out.println("Error communicating with Broker [" + seed
- + "] to find Leader for [" + a_topic + ", ] Reason: " + e);
- } finally {
- if (consumer != null)
- consumer.close();
- }
- }
- return map;
- }
-
- public static void main(String[] args) {
- String kafkaBrokerList = System.getenv("metadata.broker.list");
- if(kafkaBrokerList == null || kafkaBrokerList.length() == 0){
- System.err.println("No config kafka metadata.broker.list,it is null .");
- //for test
- kafkaBrokerList = "localhost:9092,localhost:9093";
- System.err.println("Use this broker list for test,metadata.broker.list="+kafkaBrokerList);
- }
- //init topic,logSize = 0
- Map,Integer> topics = new HashMap,Integer>();
- topics.put(KAFKA_TOPIC_NAME_ADAPTER,0);
- topics.put(KAFKA_TOPIC_NAME_EXCEPTION,0);
- topics.put(KAFKA_TOPIC_NAME_AUDIT,0);
- //init kafka broker list
- String[] kafkaHosts;
- kafkaHosts = kafkaBrokerList.split(",");
- if (kafkaHosts == null || kafkaHosts.length == 0) {
- System.err.println("No config kafka metadata.broker.list,it is null .");
- System.exit(1);
- }
- List seeds = new ArrayList();
- for (int i = 0; i < kafkaHosts.length; i++) {
- seeds.add(kafkaHosts[i]);
- }
-
- KafkaOffsetTools kot = new KafkaOffsetTools();
-
- for(String topicName : topics.keySet()){
- TreeMap, PartitionMetadata> metadatas = kot.findLeader(seeds, topicName);
- int logSize = 0;
- for (Entry, PartitionMetadata> entry : metadatas.entrySet()) {
- int partition = entry.getKey();
- String leadBroker = entry.getValue().leader().host();
- String clientName = "Client_" + topicName + "_" + partition;
- SimpleConsumer consumer = new SimpleConsumer(leadBroker, entry.getValue().leader().port(), 100000,
- 64 * 1024, clientName);
- long readOffset = getLastOffset(consumer, topicName, partition,
- kafka.api.OffsetRequest.LatestTime(), clientName);
- logSize += readOffset;
- if (consumer != null) consumer.close();
- }
- topics.put(topicName,logSize);
- }
- System.out.println(topics.toString());
- System.out.println(rawTopicTotal+"="+topics.get(KAFKA_TOPIC_NAME_ADAPTER)+" "+System.currentTimeMillis());
- System.out.println(avroTopicTotal+"="+topics.get(KAFKA_TOPIC_NAME_AUDIT)+" "+System.currentTimeMillis());
- System.out.println(exceptionTopicTotal+"="+topics.get(KAFKA_TOPIC_NAME_EXCEPTION)+" "+System.currentTimeMillis());
- }
- }

Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。