当前位置:   article > 正文

kafka监控获取指定topic的消息总量_spring-kafka api获取topic数据量

spring-kafka api获取topic数据量
  import java.util.*;
  import java.util.Map.Entry;
  import kafka.api.PartitionOffsetRequestInfo;
  import kafka.common.TopicAndPartition;
  import kafka.javaapi.OffsetResponse;
  import kafka.javaapi.PartitionMetadata;
  import kafka.javaapi.TopicMetadata;
  import kafka.javaapi.TopicMetadataRequest;
  import kafka.javaapi.consumer.SimpleConsumer;
  10. public class KafkaOffsetTools {
  11. public final static String KAFKA_TOPIC_NAME_ADAPTER = "sample";
  12. public final static String KAFKA_TOPIC_NAME_EXCEPTION = "exception";
  13. public final static String KAFKA_TOPIC_NAME_AUDIT = "audit";
  14. private static final String rawTopicTotal = "rawTopicTotalRecordCounter";
  15. private static final String avroTopicTotal = "avroTopicTotalRecordCounter";
  16. private static final String exceptionTopicTotal = "exceptionTopicTotalRecordCounter";
  17. public KafkaOffsetTools() {
  18. }
  19. public static long getLastOffset(SimpleConsumer consumer, String topic,
  20. int partition, long whichTime, String clientName) {
  21. TopicAndPartition topicAndPartition = new TopicAndPartition(topic,
  22. partition);
  23. Map, PartitionOffsetRequestInfo> requestInfo = new HashMap, PartitionOffsetRequestInfo>();
  24. requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(
  25. whichTime, 1));
  26. kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
  27. requestInfo, kafka.api.OffsetRequest.CurrentVersion(),
  28. clientName);
  29. OffsetResponse response = consumer.getOffsetsBefore(request);
  30. if (response.hasError()) {
  31. System.err.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition));
  32. return 0;
  33. }
  34. long[] offsets = response.offsets(topic, partition);
  35. return offsets[0];
  36. }
  37. private TreeMap, PartitionMetadata> findLeader(List a_seedBrokers, String a_topic) {
  38. TreeMap, PartitionMetadata> map = new TreeMap, PartitionMetadata>();
  39. loop:
  40. for (String seed : a_seedBrokers) {
  41. SimpleConsumer consumer = null;
  42. try {
  43. String[] hostAndPort;
  44. hostAndPort = seed.split(":");
  45. consumer = new SimpleConsumer(hostAndPort[0], Integer.valueOf(hostAndPort[1]), 100000, 64 * 1024,
  46. "leaderLookup" + new Date().getTime());
  47. List topics = Collections.singletonList(a_topic);
  48. TopicMetadataRequest req = new TopicMetadataRequest(topics);
  49. kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
  50. List metaData = resp.topicsMetadata();
  51. for (TopicMetadata item : metaData) {
  52. for (PartitionMetadata part : item.partitionsMetadata()) {
  53. map.put(part.partitionId(), part);
  54. }
  55. }
  56. } catch (Exception e) {
  57. System.out.println("Error communicating with Broker [" + seed
  58. + "] to find Leader for [" + a_topic + ", ] Reason: " + e);
  59. } finally {
  60. if (consumer != null)
  61. consumer.close();
  62. }
  63. }
  64. return map;
  65. }
  66. public static void main(String[] args) {
  67. String kafkaBrokerList = System.getenv("metadata.broker.list");
  68. if(kafkaBrokerList == null || kafkaBrokerList.length() == 0){
  69. System.err.println("No config kafka metadata.broker.list,it is null .");
  70. //for test
  71. kafkaBrokerList = "localhost:9092,localhost:9093";
  72. System.err.println("Use this broker list for test,metadata.broker.list="+kafkaBrokerList);
  73. }
  74. //init topic,logSize = 0
  75. Map,Integer> topics = new HashMap,Integer>();
  76. topics.put(KAFKA_TOPIC_NAME_ADAPTER,0);
  77. topics.put(KAFKA_TOPIC_NAME_EXCEPTION,0);
  78. topics.put(KAFKA_TOPIC_NAME_AUDIT,0);
  79. //init kafka broker list
  80. String[] kafkaHosts;
  81. kafkaHosts = kafkaBrokerList.split(",");
  82. if (kafkaHosts == null || kafkaHosts.length == 0) {
  83. System.err.println("No config kafka metadata.broker.list,it is null .");
  84. System.exit(1);
  85. }
  86. List seeds = new ArrayList();
  87. for (int i = 0; i < kafkaHosts.length; i++) {
  88. seeds.add(kafkaHosts[i]);
  89. }
  90. KafkaOffsetTools kot = new KafkaOffsetTools();
  91. for(String topicName : topics.keySet()){
  92. TreeMap, PartitionMetadata> metadatas = kot.findLeader(seeds, topicName);
  93. int logSize = 0;
  94. for (Entry, PartitionMetadata> entry : metadatas.entrySet()) {
  95. int partition = entry.getKey();
  96. String leadBroker = entry.getValue().leader().host();
  97. String clientName = "Client_" + topicName + "_" + partition;
  98. SimpleConsumer consumer = new SimpleConsumer(leadBroker, entry.getValue().leader().port(), 100000,
  99. 64 * 1024, clientName);
  100. long readOffset = getLastOffset(consumer, topicName, partition,
  101. kafka.api.OffsetRequest.LatestTime(), clientName);
  102. logSize += readOffset;
  103. if (consumer != null) consumer.close();
  104. }
  105. topics.put(topicName,logSize);
  106. }
  107. System.out.println(topics.toString());
  108. System.out.println(rawTopicTotal+"="+topics.get(KAFKA_TOPIC_NAME_ADAPTER)+" "+System.currentTimeMillis());
  109. System.out.println(avroTopicTotal+"="+topics.get(KAFKA_TOPIC_NAME_AUDIT)+" "+System.currentTimeMillis());
  110. System.out.println(exceptionTopicTotal+"="+topics.get(KAFKA_TOPIC_NAME_EXCEPTION)+" "+System.currentTimeMillis());
  111. }
  112. }

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Cpp五条/article/detail/712354
推荐阅读
相关标签
  

闽ICP备14008679号