赞
踩
Producer拦截器(interceptor)和Consumer端Interceptor是在Kafka 0.10版本被引入的,主要用于实现Client端的定制化控制逻辑。
对于Producer而言,Interceptor使得用户在消息发送前以及Producer回调逻辑前有机会对消息做一些定制化需求,比如修改消息等
可指定多个Interceptor按序作用于同一条消息从而形成一个拦截链(interceptor chain)
Intercetpor的实现接口是org.apache.kafka.clients.producer.ProducerInterceptor,其定义的方法包括:
onSend(ProducerRecord):该方法封装进KafkaProducer.send方法中,即运行在用户主线程中。在序列化前调用该方法。用户可以在该方法中对消息做任何操作
onAcknowledgement:该方法在消息被应答前或消息发送失败时调用,且通常都是在Producer回调逻辑触发前。运行在Producer的IO线程中,不要在该方法中放入很重的逻辑,否则会拖慢Producer的消息发送效率。
close:关闭Interceptor,主要用于执行一些资源清理工作。
Interceptor可能被运行在多个线程中,因此在具体实现时用户需要自行确保线程安全。
另外倘若指定了多个Interceptor,则Producer将按照指定顺序调用它们,并仅仅是捕获每个Interceptor可能抛出的异常记录到错误日志中而非在向上传递。这在使用过程中要特别留意。
02 测试
拦截器1
public class InterceptorOne implements ProducerInterceptor<Integer, String> { private static final Logger LOGGER = LoggerFactory.getLogger(InterceptorOne.class); @Override public ProducerRecord<Integer, String> onSend(ProducerRecord<Integer, String> record) { System.out.println("拦截器1 -- go"); // 消息内容 final String topic = record.topic(); final Integer partition = record.partition(); final Integer key = record.key(); final String value = record.value(); final Long timestamp = record.timestamp(); final Headers headers = record.headers(); // 拦截器拦下来之后改变原来的消息内容 ProducerRecord<Integer, String> newRecord = new ProducerRecord<Integer, String>(topic, partition, timestamp, key, value+"1", headers); // 传递新的消息 return newRecord; } @Override public void onAcknowledgement(RecordMetadata metadata, Exception exception) { System.out.println("拦截器1 -- back"); } @Override public void close() {} @Override public void configure(Map<String, ?> configs) { // 这里的config就是消息发送方的config配置 final Object classContent = configs.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG); System.out.println(classContent); } }
拦截器2
public class InterceptorTwo implements ProducerInterceptor<Integer, String> { private static final Logger LOGGER = LoggerFactory.getLogger(InterceptorTwo.class); @Override public ProducerRecord<Integer, String> onSend(ProducerRecord<Integer, String> record) { System.out.println("拦截器2 -- go"); // 消息内容 final String topic = record.topic(); final Integer partition = record.partition(); final Integer key = record.key(); final String value = record.value(); final Long timestamp = record.timestamp(); final Headers headers = record.headers(); // 拦截器拦下来之后改变原来的消息内容 ProducerRecord<Integer, String> newRecord = new ProducerRecord<Integer, String>(topic, partition, timestamp, key, value+"2", headers); // 传递新消息 return newRecord; } @Override public void onAcknowledgement(RecordMetadata metadata, Exception exception) { System.out.println("拦截器2 -- back"); } @Override public void close() {} @Override public void configure(Map<String, ?> configs) { // 这里的config就是消息发送方的config配置 final Object classContent = configs.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG); System.out.println(classContent); } }
发送方
public static void main(String[] args) { Map<String, Object> configs = new HashMap<>(); configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "120.77.206.207:9092"); configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); // 保证等待确认的消息只有设置的这几个。如果设置为1,则只有一个请求在等待响应 // 此时可以保证发送消息即使在重试的情况下也是有序的。 configs.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1); // 配置拦截器 configs.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, "com.test.inter.interceptor.InterceptorOne,com.test.inter.interceptor.InterceptorTwo"); KafkaProducer<Integer, String> producer = new KafkaProducer<Integer, String>(configs); ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>("topicName", 0, 1001, "this is message"); producer.send(record, new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { if (exception == null) { System.out.println(metadata.offset()); } } }); producer.close(); }
输出
class org.apache.kafka.common.serialization.StringSerializer
class org.apache.kafka.common.serialization.StringSerializer
拦截器1 – go
拦截器2 – go
拦截器1 – back
拦截器2 – back
偏移量2
接收方收到的信息
this is message12
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。