赞
踩
Kafka 拦截器分为生产者拦截器和消费者拦截器。生产者拦截器允许你在发送消息前以及消息提交成功后植入你的拦截器逻辑;而消费者拦截器支持在消费消息前以及提交位移后编写特定逻辑。值得一提的是,这两种拦截器都支持链的方式,即你可以将一组拦截器串连成一个大的拦截器,Kafka 会按照添加顺序依次执行拦截器逻辑。
如果需要加AddTimestampInterceptor和UpdateCounterInterceptor两个拦截器,可以这样配置
Properties props = new Properties();List<String> interceptors = new ArrayList<>();
interceptors.add("com.yourcompany.kafkaproject.interceptors.AddTimestampInterceptor");
interceptors.add("com.yourcompany.kafkaproject.interceptors.UpdateCounterInterceptor");
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
那么如何编码自己的拦截器呢?
Producer 端拦截器实现类都要继承org.apache.kafka.clients.producer.ProducerInterceptor 接口。该接口是 Kafka 提供的,里面有两个核心的方法。
同理,指定消费者拦截器也是同样的方法,只是具体的实现类要实现org.apache.kafka.clients.consumer.ConsumerInterceptor 接口,这里面也有两个核心方法。
Kafka 拦截器可以应用于包括客户端监控、端到端系统性能检测、消息审计等多种功能在内的场景。
引用:极客时间读书笔记
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。