当前位置:   article > 正文

kafka生产者拦截器详细讲解,附带代码_kafka producer 拦截器

kafka producer 拦截器

拦截器

Producer拦截器(interceptor)和Consumer端Interceptor是在Kafka 0.10版本被引入的,主要用于实现Client端的定制化控制逻辑。

对于Producer而言,Interceptor使得用户在消息发送前以及Producer回调逻辑前有机会对消息做一些定制化需求,比如修改消息等
可指定多个Interceptor按序作用于同一条消息从而形成一个拦截链(interceptor chain)

Intercetpor的实现接口是org.apache.kafka.clients.producer.ProducerInterceptor,其定义的方法包括:
onSend(ProducerRecord):该方法封装进KafkaProducer.send方法中,即运行在用户主线程中。在序列化前调用该方法。用户可以在该方法中对消息做任何操作
onAcknowledgement:该方法在消息被应答前或消息发送失败时调用,且通常都是在Producer回调逻辑触发前。运行在Producer的IO线程中,不要在该方法中放入很重的逻辑,否则会拖慢Producer的消息发送效率。
close:关闭Interceptor,主要用于执行一些资源清理工作。

Interceptor可能被运行在多个线程中,因此在具体实现时用户需要自行确保线程安全。
另外倘若指定了多个Interceptor,则Producer将按照指定顺序调用它们,并仅仅是捕获每个Interceptor可能抛出的异常记录到错误日志中而非在向上传递。这在使用过程中要特别留意。


02 测试

拦截器1

public class InterceptorOne implements ProducerInterceptor<Integer, String> {

    private static final Logger LOGGER = LoggerFactory.getLogger(InterceptorOne.class);

    @Override
    public ProducerRecord<Integer, String> onSend(ProducerRecord<Integer, String> record) {
        System.out.println("拦截器1 -- go");

        // 消息内容
        final String topic = record.topic();
        final Integer partition = record.partition();
        final Integer key = record.key();
        final String value = record.value();
        final Long timestamp = record.timestamp();
        final Headers headers = record.headers();

        // 拦截器拦下来之后改变原来的消息内容
        ProducerRecord<Integer, String> newRecord = new ProducerRecord<Integer, String>(topic, partition, timestamp, key, value+"1", headers);
        // 传递新的消息
        return newRecord;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        System.out.println("拦截器1 -- back");
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {
        // 这里的config就是消息发送方的config配置
        final Object classContent = configs.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
        System.out.println(classContent);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37

拦截器2

public class InterceptorTwo implements ProducerInterceptor<Integer, String> {

    private static final Logger LOGGER = LoggerFactory.getLogger(InterceptorTwo.class);

    @Override
    public ProducerRecord<Integer, String> onSend(ProducerRecord<Integer, String> record) {
        System.out.println("拦截器2 -- go");

        // 消息内容
        final String topic = record.topic();
        final Integer partition = record.partition();
        final Integer key = record.key();
        final String value = record.value();
        final Long timestamp = record.timestamp();
        final Headers headers = record.headers();

        // 拦截器拦下来之后改变原来的消息内容
        ProducerRecord<Integer, String> newRecord = new ProducerRecord<Integer, String>(topic, partition, timestamp, key, value+"2", headers);
        // 传递新消息
        return newRecord;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        System.out.println("拦截器2 -- back");
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {
        // 这里的config就是消息发送方的config配置
        final Object classContent = configs.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
        System.out.println(classContent);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37

发送方

public static void main(String[] args) {
    Map<String, Object> configs = new HashMap<>();
    configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "120.77.206.207:9092");
    configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
    configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

    // 保证等待确认的消息只有设置的这几个。如果设置为1,则只有一个请求在等待响应
    // 此时可以保证发送消息即使在重试的情况下也是有序的。
    configs.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);

    // 配置拦截器
    configs.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, "com.test.inter.interceptor.InterceptorOne,com.test.inter.interceptor.InterceptorTwo");

    KafkaProducer<Integer, String> producer = new KafkaProducer<Integer, String>(configs);
    ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>("topicName", 0, 1001, "this is message");
    producer.send(record, new Callback() {
        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (exception == null) {
                System.out.println(metadata.offset());
            }
        }
    });
    producer.close();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

输出
class org.apache.kafka.common.serialization.StringSerializer
class org.apache.kafka.common.serialization.StringSerializer
拦截器1 – go
拦截器2 – go
拦截器1 – back
拦截器2 – back
偏移量2

接收方收到的信息
this is message12

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/471542?site
推荐阅读
相关标签
  

闽ICP备14008679号