赞
踩
一、拦截器原理
Producer拦截器(interceptor)是在Kafka 0.10版本被引入的,主要用于实现clients端的定制化控制逻辑。
对于producer而言,interceptor使得用户在消息发送前以及producer回调逻辑前有机会对消息做一些定制化需求,比如修改消息等。
同时,producer允许用户指定多个interceptor按序作用于同一条消息从而形成一个拦截链(interceptor chain)。
Intercetpor的实现接口是 org.apache.kafka.clients.producer.ProducerInterceptor ,其定义的方法包括:
(1)configure(Map<String, ?> map)
获取配置信息和初始化数据时调用。
(2)onSend(ProducerRecord producerRecord):
该方法封装进KafkaProducer.send方法中,即它运行在用户主线程中。Producer确保在消息被序列化以及计算分区前调用该方法。用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的topic和分区,否则会影响目标分区的计算。
(3)onAcknowledgement(RecordMetadata recordMetadata, Exception e):
该方法会在消息从RecordAccumulator成功发送到Kafka Broker之后,或者在发送过程中失败时调用。并且通常都是在producer回调逻辑触发之前。onAcknowledgement运行在producer的IO线程中,因此不要在该方法中放入很重的逻辑,否则会拖慢producer的消息发送效率。
(4)close()::
关闭interceptor,主要用于执行一些资源清理工作
如前所述,interceptor可能被运行在多个线程中,因此在具体实现时用户需要自行确保线程安全。另外倘若指定了多个interceptor,则producer将按照指定顺序调用它们,并仅仅是捕获每个interceptor可能抛出的异常记录到错误日志中而非在向上传递。这在使用过程中要特别留意。
二、案例实操
下面我们以一个具体的案例来介绍一下kafka中的自定义拦截器的使用:
1、需求:
实现一个简单的双interceptor组成的拦截链。
第一个interceptor会在消息发送前将时间戳信息加到消息value的最前部;
第二个interceptor会在消息发送后更新成功发送消息数或失败发送消息数。
效果图如下:
2、编写Java代码
1)创建一个Maven工程,导入以下依赖:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.10.0.0</version>
</dependency>
2)编写第一个拦截器:TimeInterceptor
package com.xsluo.kafka; import org.apache.kafka.clients.producer.ProducerInterceptor; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import java.util.Map; /** * 第一个interceptor:在消息发送前将时间戳信息加到消息value的最前部。 */ public class TimeInterceptor implements ProducerInterceptor { /** * 该方法封装进KafkaProducer.send方法中,即它运行在用户主线程中。Producer确保在消息被序列化以计算分区前调用该方法。 * 用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的topic和分区,否则会影响目标分区的计算 * @param producerRecord * @return */ public ProducerRecord onSend(ProducerRecord producerRecord) { return new ProducerRecord( producerRecord.topic(), producerRecord.partition(), producerRecord.timestamp(), producerRecord.key(), System.currentTimeMillis() + "," + producerRecord.value().toString()); } /** * 该方法会在消息被应答之前或消息发送失败时调用,并且通常都是在producer回调逻辑触发之前。 * onAcknowledgement运行在producer的IO线程中,因此不要在该方法中放入很重的逻辑,否则会拖慢producer的消息发送效率. * @param recordMetadata * @param e */ public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) { } /** * 关闭interceptor,主要用于执行一些资源清理工作 */ public void close() { } /** * 获取配置信息和初始化数据时调用 * @param map */ public void configure(Map<String, ?> map) { } }
3)编写第二个拦截器:CounterInterceptor
package com.xsluo.kafka; import org.apache.kafka.clients.producer.ProducerInterceptor; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import java.util.Map; /** * 第二个interceptor:在消息发送后更新成功发送消息数或失败发送消息数。 */ public class CounterInterceptor implements ProducerInterceptor { private int successCounter = 0; private int errorCounter = 0; public ProducerRecord onSend(ProducerRecord producerRecord) { return producerRecord; } /** * 该方法会在消息被应答之前或消息发送失败时调用,并且通常都是在producer回调逻辑触发之前。 * onAcknowledgement运行在producer的IO线程中,因此不要在该方法中放入很重的逻辑,否则会拖慢producer的消息发送效率. * @param recordMetadata * @param e */ public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) { //统计成功和失败的次数 if (e == null) { successCounter++; } else { errorCounter++; } } /** * 关闭interceptor,主要用于执行一些资源清理工作 */ public void close() { //保存结果 System.out.println("Successfully sent:" + successCounter); System.out.println("Failed sent:" + errorCounter); } public void configure(Map<String, ?> map) { } }
4)编写生产者类来测试自定义的拦截器:
package com.xsluo.kafka; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.ArrayList; import java.util.Properties; public class MyProducer { public static void main(String[] args) { // 1.设置配置信息 Properties props = new Properties(); props.put("bootstrap.servers", "weekend110:9092,weekend01:9092,weekend02:9092"); props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 2.构建拦截器链 ArrayList<String> interceptors = new ArrayList<String>(); interceptors.add("com.xsluo.kafka.TimeInterceptor"); interceptors.add("com.xsluo.kafka.CounterInterceptor"); props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,interceptors); String topic = "topic-Test"; Producer<Object, String> producer = new KafkaProducer<>(props); // 3.发送消息 for (int i = 0; i < 20; i++) { ProducerRecord<Object, String> record = new ProducerRecord<>(topic, "This is a message: " + i); producer.send(record); } // 4.关闭producer,这样才会调用interceptor的close方法 producer.close(); } }
5)在Linux服务器上启动已部署好的zookeeper服务、kafka进程以及消费者进程:
bin/kafka-console-consumer.sh --bootstrap-server weekend110:9092 --from-beginning --topic topic-Test
6)执行编写好的生产者类代码:
7)查看消费的结果:
OK!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。