当前位置:   article > 正文

Kafka 拦截器

kafka 拦截器

拦截器 : 在不修改业务逻辑下,动态实现一组可插拔的事件处理逻辑链

  • Kafka 拦截器自 0.10.0.0 引入

Kafka 拦截器能在消息处理的前后多个时点动态植入不同的处理逻辑

  • 如:在消息发送前或 在消息被消费后

在这里插入图片描述

Kafka 拦截器分 :

  • 生产者拦截器 :在发送消息前或 消息提交成功后植入拦截器逻辑
  • 消费者拦截器:在消费消息前或 提交位移后植入拦截器逻辑

指定 Producer 拦截器:

  • 定拦截器类时,指定全限定名
List<String> interceptors = new ArrayList<>();
interceptors.add("com.cpucode.kafkaproject.interceptors.AddTimestampInterceptor"); // 拦截器 1
interceptors.add("com.cpucode.kafkaproject.interceptors.UpdateCounterInterceptor"); // 拦截器 2

Properties props = new Properties();
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

所有 Producer 拦截器的实现类都要继承 org.apache.kafka.clients.producer.ProducerInterceptor 接口

  1. onSend:在消息发送前被调用
  2. onAcknowledgement:在消息成功提交或发送失败后被调用,onAcknowledgement 要早于 callback 的调用。注意线程安全,无重的逻辑,会导致 Producer TPS 下降

消费者拦截器的实现类都要继承org.apache.kafka.clients.consumer.ConsumerInterceptor 接口

  • onConsume:Consumer 处理信息前调用
  • onCommit:Consumer 提交位移后调用

案例

Kafka 拦截器可用于:客户端监控、端到端系统性能检测、消息审计

用拦截器统计消息端到端处理的延时

实现生产者拦截器:

public class AvgLatencyProducerInterceptor implements ProducerInterceptor<String, String> {
    private Jedis jedis; // 省略 Jedis 初始化
 
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
      // 更新总发送消息数
      jedis.incr("totalSentMessage");
      return record;
    }
 
    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
    }
 
    @Override
    public void close() {
    }
 
    @Override
    public void configure(Map<java.lang.String, ?> configs) {
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

消费者的拦截器实现:

public class AvgLatencyConsumerInterceptor implements ConsumerInterceptor<String, String> {
    private Jedis jedis; // 省略 Jedis 初始化

    @Override
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
        long lantency = 0L;
        // 用当前的时钟时间 - 在消息中的创建时间,累计这批消息时间
        for (ConsumerRecord<String, String> record : records) {
            lantency += (System.currentTimeMillis() - record.timestamp());
        }
        // 总的端到端处理延时并更新到 Redis
        jedis.incrBy("totalLatency", lantency);
      
        long totalLatency = Long.parseLong(jedis.get("totalLatency"));
        long totalSentMsgs = Long.parseLong(jedis.get("totalSentMessage"));
        // 总延时/总消息数 = 端到端消息的平均处理延时
        jedis.set("avgLatency", String.valueOf(totalLatency / totalSentMsgs));
        return records;
    }
 
    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
    }
 
    @Override
    public void close() {
    }
 
    @Override
    public void configure(Map<String, ?> configs) {
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/很楠不爱3/article/detail/471538
推荐阅读
相关标签
  

闽ICP备14008679号