当前位置:   article > 正文

kafka消费者客户端自定义拦截器_list>

list>

与生产者客户端拦截器机制一样,kafka消费者客户端中也定义了拦截器逻辑,通过实现ConsumerInterceptor来实现自定义拦截器逻辑,ConsumerInterceptor主要有三个方法:

public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records)
consumer会在poll方法返回之前调用此方法,来对消息进行定制化的操作,比如修改消息内容,按照一定规则过滤消息等。

public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets)
consumer会在提交消费位移之后调用此方法,可以在此方法中跟踪提交位移的相关信息。

实现消费者客户端自定义拦截器示例代码:

public class MyConsumerInterceptor implements ConsumerInterceptor<String,String> {
    @Override
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
        Map<TopicPartition, List<ConsumerRecord<String, String>>> newRecords = new HashMap<>();
        for(TopicPartition partition:records.partitions()){
            List<ConsumerRecord<String, String>> recs = records.records(partition);
            List<ConsumerRecord<String, String>> newRecs = new ArrayList<>();
            for(ConsumerRecord<String,String> rec:recs){
                String newValue = "interceptor-"+rec.value();
                ConsumerRecord<String,String> newRec = new ConsumerRecord<>(rec.topic(),
                        rec.partition(),rec.offset(),rec.key(),newValue);
                newRecs.add(newRec);
            }
            newRecords.put(partition,newRecs);
        }
        return new ConsumerRecords<>(newRecords);
    }
    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
       offsets.forEach((tp,offsetAndMetadata) -> {
           System.out.println(tp+" : "+offsetAndMetadata.offset());
        });
    }
    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

另外在消费者客户端配置中增加如下配置:

properties.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,MyConsumerInterceptor.class.getName())
  • 1

运行后结果如下:
在这里插入图片描述

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/凡人多烦事01/article/detail/471628
推荐阅读
相关标签
  

闽ICP备14008679号