赞
踩
监控消费进度 : 看滞后程度:消费者 Lag , Consumer Lag
滞后程度 : 消费者落后于生产者的程度
Kafka 监控 Lag 是在分区上的层级 :
Lag : 反映消费者的运行情况
当消费者的速度无法匹及生产者的速度 :
生产环境中要时刻关注消费者的消费进度 :
消费进度的监控方法 :
kafka-consumer-groups
Kafka 自带的命令行工具 : bin/kafka-consumer-groups.sh
kafka-consumer-groups
: 监控消费者消费进度的工具该脚本在 Kafka bin 目录下,查看某个给定消费者的 Lag 值:
bin/kafka-consumer-groups.sh \
--bootstrap-server <Kafka broker 连接信息> \
--describe --group <group 名称>
Kafka 连接信息 = < 主机名:端口 >
对
例子:
Java Consumer API :
用 Consumer API 监控消费者组的 Lag 值:
public static Map<TopicPartition, Long> lagOf(String groupID, String bootstrapServers) throws TimeoutException { Properties props = new Properties(); props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); try (AdminClient client = AdminClient.create(props)) { // 获取给定消费者组的最新消费消息的位移 ListConsumerGroupOffsetsResult result = client.listConsumerGroupOffsets(groupID); try { Map<TopicPartition, OffsetAndMetadata> consumedOffsets = result.partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 禁止自动提交位移 props.put(ConsumerConfig.GROUP_ID_CONFIG, groupID); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) { // 获取订阅分区的最新消息位移 Map<TopicPartition, Long> endOffsets = consumer.endOffsets(consumedOffsets.keySet()); return endOffsets.entrySet().stream().collect(Collectors.toMap(entry -> entry.getKey(), // 执行减法操作,获取 Lag 值并封装进一个 Map 对象 entry -> entry.getValue() - consumedOffsets.get(entry.getKey()).offset())); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); // 处理中断异常 // ... return Collections.emptyMap(); } catch (ExecutionException e) { // 处理 ExecutionException // ... return Collections.emptyMap(); } catch (TimeoutException e) { throw new TimeoutException("Timed out when getting lag for consumer group " + groupID); } } }
Kafka 消费者的 JMX 指标 : kafka.consumer:type=consumer-fetch-manager-metrics,client-id="{client-id}"
Lead : 消费者最新消费消息的位移与分区当前第一条消息位移的差值
kafka.consumer:type=consumer-fetch-manager-metrics,partition="{partition}",topic="{topic}",client-id="{client-id}"
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。