赞
踩
Kafka中的拦截器(Interceptor)是0.10.x.x版本引入的一个功能,一共有两种:Kafka Producer端的拦截器和Kafka Consumer端的拦截器。本篇主要讲述的是Kafka Producer端的拦截器,它主要用来对消息进行拦截或者修改,也可以用于Producer的Callback回调之前进行相应的预处理。
使用Kafka Producer端的拦截器非常简单,主要是实现ProducerInterceptor接口,此接口包含4个方法:
一般情况下只需要关注并实现onSend或者onAcknowledgement方法即可。下面我们来举个案例,通过onSend方法来过滤消息体为空的消息以及通过onAcknowledgement方法来计算发送消息的成功率。
- public class ProducerInterceptorDemo implements ProducerInterceptor<String,String> {
- private volatile long sendSuccess = 0;
- private volatile long sendFailure = 0;
-
- @Override
- public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
- if(record.value().length()<=0)
- return null;
- return record;
- }
-
- @Override
- public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
- if (exception == null) {
- sendSuccess++;
- } else {
- sendFailure ++;
- }
- }
-
- @Override
- public void close() {
- double successRatio = (double)sendSuccess / (sendFailure + sendSuccess);
- System.out.println("[INFO] 发送成功率="+String.format("%f", successRatio * 100)+"%");
- }
-
- @Override
- public void configure(Map<String, ?> configs) {}
- }

自定义的ProducerInterceptorDemo类实现之后就可以在Kafka Producer的主程序中指定,示例代码如下:
- public class ProducerMain {
- public static final String brokerList = "localhost:9092";
- public static final String topic = "hidden-topic";
-
- public static void main(String[] args) throws ExecutionException, InterruptedException {
- Properties properties = new Properties();
- properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- properties.put("bootstrap.servers", brokerList);
- properties.put("interceptor.classes", "com.hidden.producer.ProducerInterceptorDemo");
-
- Producer<String, String> producer = new KafkaProducer<String, String>(properties);
-
- for(int i=0;i<100;i++) {
- ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topic, "msg-" + i);
- producer.send(producerRecord).get();
- }
- producer.close();
- }
- }

Kafka Producer不仅可以指定一个拦截器,还可以指定多个拦截器以形成拦截链,这个拦截链会按照其中的拦截器的加入顺序一一执行。比如上面的程序多添加一个拦截器,示例如下:
properties.put("interceptor.classes", "com.hidden.producer.ProducerInterceptorDemo,com.hidden.producer.ProducerInterceptorDemoPlus");
这样Kafka Producer会先执行拦截器ProducerInterceptorDemo,之后再执行ProducerInterceptorDemoPlus。
有关interceptor.classes参数,在kafka 1.0.0版本中的定义如下:
NAME | DESCRIPTION | TYPE | DEFAULT | VALID VALUES | IMPORTANCE |
---|---|---|---|---|---|
interceptor.calssses | A list of classes to use as interceptors. Implementing the org.apache.kafka.clients.producer.ProducerInterceptor interface allows you to intercept (and possibly mutate) the records received by the producer before they are published to the Kafka cluster. By default, there no interceptors. | list | null | low |
PS:消息中间件(Kafka、RabbitMQ)交流可加微信:hiddenzzh
欢迎支持笔者新书:《RabbitMQ实战指南》以及关注微信公众号:Kafka技术专栏。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。