赞
踩
Kafka 消费者监控消费的滞后程度,这个滞后程度被称为:消费者 Lag 或 Consumer Lag。
滞后程度就是指消费者当前落后于生产者的程度。Kafka 生产者向某主题成功生产了 100 万条消息,你的消费者当前消费了 80 万条消息,那么我们就说你的消费者滞后了 20 万条消息,即 Lag 等于 20 万。
通常来说,Lag 的单位是消息数,一般是在主题这个级别上讨论 Lag 的,但实际上,Kafka 监控 Lag 的层级是在分区上的。如果要计算主题级别的,你需要手动汇总所有主题分区的 Lag,将它们累加起来,合并成最终的 Lag 值。
对消费者而言,Lag 应该算是最最重要的监控指标了。它直接反映了一个消费者的运行情况。一个正常工作的消费者,它的 Lag 值应该很小,甚至是接近于 0 的,这表示该消费者能够及时地消费生产者生产出来的消息,滞后程度很小。反之,如果一个消费者 Lag 值很大,通常就表明它无法跟上生产者的速度,最终 Lag 会越来越大,从而拖慢下游消息的处理速度。
由于消费者的速度无法匹及生产者的速度,导致它消费的数据已经不在操作系统的页缓存中了,那么这些数据就会失去享有 Zero Copy 技术的资格。消费者就不得不从磁盘上读取它们,这就进一步拉大了与生产者的差距。
如何监控:
接下来,我们分别来讨论下这 3 种方法。
kafka-consumer-groups 脚本是 Kafka 最直接的监控消费者消费进度的工具。
脚本位于 Kafka 安装目录的 bin 子目录下,我们可以通过下面的命令来查看某个给定消费者的 Lag 值:
$ bin/kafka-consumer-groups.sh --bootstrap-server <Kafka broker 连接信息 > --describe --group <group 名称 >
- public static Map<TopicPartition, Long> lagOf(String groupID, String bootstrapServers) throws TimeoutException {
- Properties props = new Properties();
- props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
- try (AdminClient client = AdminClient.create(props)) {
- ListConsumerGroupOffsetsResult result = client.listConsumerGroupOffsets(groupID);
- try {
- Map<TopicPartition, OffsetAndMetadata> consumedOffsets = result.partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS);
- props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 禁止自动提交位移
- props.put(ConsumerConfig.GROUP_ID_CONFIG, groupID);
- props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
- props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
- try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
- Map<TopicPartition, Long> endOffsets = consumer.endOffsets(consumedOffsets.keySet());
- return endOffsets.entrySet().stream().collect(Collectors.toMap(entry -> entry.getKey(),
- entry -> entry.getValue() - consumedOffsets.get(entry.getKey()).offset()));
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- // 处理中断异常
- // ...
- return Collections.emptyMap();
- } catch (ExecutionException e) {
- // 处理 ExecutionException
- // ...
- return Collections.emptyMap();
- } catch (TimeoutException e) {
- throw new TimeoutException("Timed out when getting lag for consumer group " + groupID);
- }
- }
- }
关注点:第 1 处是调用 AdminClient.listConsumerGroupOffsets 方法获取给定消费者组的最新消费消息的位移;第 2 处则是获取订阅分区的最新消息位移;最后 1 处就是执行相应的减法操作,获取 Lag 值并封装进一个 Map 对象。
这段代码只适用于 Kafka 2.0.0 及以上的版本,2.0.0 之前的版本中没有 AdminClient.listConsumerGroupOffsets 方法。
kafka已有的监控框架 Zabbix 或 Grafana。
当前,Kafka 消费者提供了一个名为 kafka.consumer:type=consumer-fetch-manager-metrics,client-id=“{client-id}”的 JMX 指标,records-lag-max 和 records-lead-min,它们分别表示此消费者在测试窗口时间内曾经达到的最大的 Lag 值和最小的 Lead 值。
这里的 Lead 值是指消费者最新消费消息的位移与分区当前第一条消息位移的差值。很显然,Lag 和 Lead 是一体的两个方面:Lag 越大的话,Lead 就越小,反之也是同理。
监控到 Lag 越来越大,那就是消费者程序变得越来越慢了,至少是追不上生产者程序了。一旦监测到 Lead 越来越小,甚至是快接近于 0 了,你就一定要小心了,这可能预示着消费者端要丢消息了。
Kafka 的消息是有留存时间设置的,默认是 1 周,也就是说 Kafka 默认删除 1 周前的数据。倘若消费者程序足够慢,慢到它要消费的数据快被 Kafka 删除了,这时你就必须立即处理,否则一定会出现消息被删除,从而导致消费者程序重新调整位移值的情形。这可能产生两个后果:一个是消费者从头消费一遍数据,另一个是消费者从最新的消息位移处开始消费,之前没来得及消费的消息全部被跳过了,从而造成丢消息的假象。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。