赞
踩
一、定义消费者拦截器(只消费含"sister"的消息)
package com.cisdi.dsp.modules.metaAnalysis.rest; import org.apache.kafka.clients.consumer.ConsumerInterceptor; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.TopicPartition; import java.util.*; public class MyConsumerInterceptor implements ConsumerInterceptor<String,String> { @Override public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) { Map<TopicPartition,List<ConsumerRecord<String,String>>> finalResult=new HashMap<>(); Set<TopicPartition> partitionSet = records.partitions(); for(TopicPartition topicPartition: partitionSet){ List<ConsumerRecord<String,String>> partitionRecordList=records.records(topicPartition); List<ConsumerRecord<String,String>> newPartitionRecordList=new LinkedList<>(); for(ConsumerRecord<String,String> record: partitionRecordList){ if(record.value().contains("sister")){ newPartitionRecordList.add(record); } } finalResult.put(topicPartition,newPartitionRecordList); } return new ConsumerRecords<>(finalResult); } @Override public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) { offsets.forEach((tp,meta) -> { System.out.println("消费者拦截器:"+tp.topic()+":"+meta.offset()); }); } @Override public void close() { } @Override public void configure(Map<String, ?> configs) { } }
二、定义消费者,配置消费者拦截器
package com.cisdi.dsp.modules.metaAnalysis.rest; import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.serialization.StringDeserializer; import java.time.Duration; import java.util.Arrays; import java.util.Properties; public class ConsumerInterceptorTest { public static void main(String[] args) { String topic="testTopic2"; String server="xx.xx.xx.xx:9092"; Properties properties=new Properties(); properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"consumerGroupTest4"); properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,server); properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true); properties.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,MyConsumerInterceptor.class.getName()); KafkaConsumer<String,String> myConsumer=new KafkaConsumer<String, String>(properties); myConsumer.subscribe(Arrays.asList(topic)); while(true){ ConsumerRecords<String,String> records=myConsumer.poll(Duration.ofMillis(2000)); for(ConsumerRecord consumerRecord: records){ System.out.println(consumerRecord.value()); } //myConsumer.commitSync(); } } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。