赞
踩
Java使用kafka的API来监控kafka的某些topic的数据量增量,offset,定时查总量之后,然后计算差值,然后就可以算单位间隔的每个topic的增量,kafka监控一般都是监控的吞吐量,即数据量的大小,而不在意这个count,数量。额,这个就是在意count。统计一下count。总结最近的偏移量---Summed Recent Offsets.
使用的jar依赖
compile group: 'org.apache.kafka', name: 'kafka_2.10', version: '0.8.0'
Java代码
-
- import com.google.common.collect.Lists;
- import com.google.common.collect.Maps;
- import kafka.api.PartitionOffsetRequestInfo;
- import kafka.common.TopicAndPartition;
- import kafka.javaapi.OffsetResponse;
- import kafka.javaapi.PartitionMetadata;
- import kafka.javaapi.TopicMetadata;
- import kafka.javaapi.TopicMetadataRequest;
- import kafka.javaapi.consumer.SimpleConsumer;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
-
- import java.util.Date;
- import java.util.List;
- import java.util.Map;
-
- /**
- * kafka监控 topic的数据消费情况
- *
- * @author LiXuekai on 2020/9/16
- */
- public class KafkaMonitorTools {
- private static final Logger LOGGER = LoggerFactory.getLogger(KafkaMonitorTools.class);
-
-
- public static long getLastOffset(SimpleConsumer consumer, String topic, int partition, long whichTime, String clientName) {
- TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
- Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = Maps.newHashMap();
- requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
- kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
- OffsetResponse response = consumer.getOffsetsBefore(request);
-
- if (response.hasError()) {
- LOGGER.error("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition));
- return 0;
- }
- long[] offsets = response.offsets(topic, partition);
- return offsets[0];
- }
-
- /**
- * @param brokers broker 地址
- * @param topic topic
- * @return map<分区, 分区count信息>
- */
- public static Map<Integer, PartitionMetadata> findLeader(List<String> brokers, String topic) {
- Map<Integer, PartitionMetadata> map = Maps.newHashMap();
- for (String broker : brokers) {
- SimpleConsumer consumer = null;
- try {
- String[] hostAndPort = broker.split(":");
- consumer = new SimpleConsumer(hostAndPort[0], Integer.parseInt(hostAndPort[1]), 100000, 64 * 1024, "leaderLookup" + new Date().getTime());
- List<String> topics = Lists.newArrayList(topic);
- TopicMetadataRequest req = new TopicMetadataRequest(topics);
- kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
-
- List<TopicMetadata> metaData = resp.topicsMetadata();
- for (TopicMetadata item : metaData) {
- for (PartitionMetadata part : item.partitionsMetadata()) {
- map.put(part.partitionId(), part);
- }
- }
- } catch (Exception e) {
- LOGGER.error("Error communicating with Broker [" + broker + "] to find Leader for [" + topic + ", ] Reason: " + e);
- } finally {
- if (consumer != null)
- consumer.close();
- }
- }
- return map;
- }
-
- public static Map<String, Long> monitor(List<String> brokers, List<String> topics) {
- if (brokers == null || brokers.isEmpty()) {
- return null;
- }
- if (topics == null || topics.isEmpty()) {
- return null;
- }
- Map<String, Long> map = Maps.newTreeMap();
- for (String topicName : topics) {
- Map<Integer, PartitionMetadata> metadata = findLeader(brokers, topicName);
- long size = 0L;
- for (Map.Entry<Integer, PartitionMetadata> entry : metadata.entrySet()) {
- int partition = entry.getKey();
- String leadBroker = entry.getValue().leader().host();
- String clientName = "Client_" + topicName + "_" + partition;
- SimpleConsumer consumer = new SimpleConsumer(leadBroker, entry.getValue().leader().port(), 100000, 64 * 1024, clientName);
- long readOffset = getLastOffset(consumer, topicName, partition, kafka.api.OffsetRequest.LatestTime(), clientName);
- size += readOffset;
- consumer.close();
- }
- map.put(topicName, size);
- }
- return map;
- }
- }
测试代码:
- private final String topics = "a,b,c,d,e,f";
- private final String server = "1.1.1.11:92";
-
- @Test
- public void monitor() {
- Map<String, Long> monitor = KafkaMonitorTools.monitor(Lists.newArrayList(server), Lists.newArrayList(topics.split(",")));
- monitor.forEach((k, v)-> System.out.println("topic:" + k + " \tSummed Recent Offsets:" + v));
- }
使用的卡夫卡版本的截图
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。