赞
踩
与生产者客户端拦截器机制一样,kafka消费者客户端中也定义了拦截器逻辑,通过实现ConsumerInterceptor
来实现自定义拦截器逻辑,ConsumerInterceptor
主要有三个方法:
public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records)
consumer会在poll方法返回之前调用此方法,来对消息进行定制化的操作,比如修改消息内容,按照一定规则过滤消息等。
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets)
consumer会在提交消费位移之后调用此方法,可以在此方法中跟踪提交位移的相关信息。
实现消费者客户端自定义拦截器示例代码:
public class MyConsumerInterceptor implements ConsumerInterceptor<String,String> { @Override public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) { Map<TopicPartition, List<ConsumerRecord<String, String>>> newRecords = new HashMap<>(); for(TopicPartition partition:records.partitions()){ List<ConsumerRecord<String, String>> recs = records.records(partition); List<ConsumerRecord<String, String>> newRecs = new ArrayList<>(); for(ConsumerRecord<String,String> rec:recs){ String newValue = "interceptor-"+rec.value(); ConsumerRecord<String,String> newRec = new ConsumerRecord<>(rec.topic(), rec.partition(),rec.offset(),rec.key(),newValue); newRecs.add(newRec); } newRecords.put(partition,newRecs); } return new ConsumerRecords<>(newRecords); } @Override public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) { offsets.forEach((tp,offsetAndMetadata) -> { System.out.println(tp+" : "+offsetAndMetadata.offset()); }); } @Override public void close() { } @Override public void configure(Map<String, ?> configs) { }
另外在消费者客户端配置中增加如下配置:
properties.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,MyConsumerInterceptor.class.getName())
运行后结果如下:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。