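Requirement: implement a simple interceptor chain made up of two interceptors. The first interceptor prepends a timestamp to the message value before the message is sent; the second interceptor updates the count of successfully sent messages or of failed messages after each send completes.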
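To make the intended behavior concrete before involving Kafka, here is a minimal, Kafka-free sketch of such a two-stage chain. The names `Interceptor`, `Timestamping`, `Counting`, `applyOnSend` and `applyOnAcknowledgement` are hypothetical and only model the idea; they are not Kafka classes, and the clock is fixed so the output is repeatable.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.LongSupplier;

// Simplified model of an interceptor chain; none of these types come from Kafka.
public class InterceptorChainSketch {

    // Hypothetical interface with the two hooks this article relies on.
    interface Interceptor {
        String onSend(String value);                // may return a modified value
        void onAcknowledgement(boolean succeeded);  // called once the send has completed
    }

    // Stage 1: prepend a timestamp to the value.
    static class Timestamping implements Interceptor {
        private final LongSupplier clock;

        Timestamping(LongSupplier clock) {
            this.clock = clock;
        }

        @Override
        public String onSend(String value) {
            return clock.getAsLong() + "-" + value;
        }

        @Override
        public void onAcknowledgement(boolean succeeded) {
            // Nothing to do after the send.
        }
    }

    // Stage 2: leave the value alone and count outcomes.
    static class Counting implements Interceptor {
        int success;
        int error;

        @Override
        public String onSend(String value) {
            return value;
        }

        @Override
        public void onAcknowledgement(boolean succeeded) {
            if (succeeded) {
                success++;
            } else {
                error++;
            }
        }
    }

    // Each interceptor sees the value returned by the previous one.
    static String applyOnSend(List<Interceptor> chain, String value) {
        String current = value;
        for (Interceptor interceptor : chain) {
            current = interceptor.onSend(current);
        }
        return current;
    }

    static void applyOnAcknowledgement(List<Interceptor> chain, boolean succeeded) {
        for (Interceptor interceptor : chain) {
            interceptor.onAcknowledgement(succeeded);
        }
    }

    public static void main(String[] args) {
        Counting counter = new Counting();
        // Fixed clock (1000) instead of System.currentTimeMillis(), for a stable demo.
        List<Interceptor> chain = Arrays.asList(new Timestamping(() -> 1000L), counter);
        for (int i = 0; i < 3; i++) {
            System.out.println(applyOnSend(chain, "ldl---" + i));
            applyOnAcknowledgement(chain, i != 2); // pretend the third send failed
        }
        System.out.println("success: " + counter.success + ", error: " + counter.error);
    }
}
```

The order of the list is the order in which the stages run, which is why the timestamp stage is registered first.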
Hands-on example
Add the timestamp interceptor
package com.donglin; // matches the class name registered in the producer main program

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Map;

public class TimeInterceptor implements ProducerInterceptor<String, String> {

    @Override
    public void configure(Map<String, ?> configs) {
        // No configuration needed.
    }

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        // 1. Read the original value.
        String value = record.value();
        // 2. Build and return a new ProducerRecord whose value starts with the current timestamp.
        return new ProducerRecord<String, String>(record.topic(), record.partition(),
                record.key(), System.currentTimeMillis() + "-" + value);
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // Nothing to do after the send.
    }

    @Override
    public void close() {
        // Nothing to release.
    }
}
Count the messages that were sent successfully and those that failed, and print both counters when the producer is closed
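package com.donglin; // matches the class name registered in the producer main program

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Map;

public class CounterInterceptor implements ProducerInterceptor<String, String> {

    private int success;
    private int error;

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        // Pass the record through unchanged.
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // A null exception means the send succeeded. The metadata object can be
        // non-null even on failure, so the exception is the more reliable signal.
        if (exception == null) {
            success++;
        } else {
            error++;
        }
    }

    @Override
    public void close() {
        // Called when the producer is closed: print both counters.
        System.out.println("success: " + success);
        System.out.println("error: " + error);
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // No configuration needed.
    }
}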
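Kafka generally invokes `onAcknowledgement` on the producer's background I/O thread. If the counters might also be read from other threads (for example by a metrics reporter, which is an assumption and not part of this example), one possible hardening is to keep them in `AtomicLong` fields. The sketch below is Kafka-free; `AckCounterSketch` and its methods are hypothetical names, and an interceptor could delegate to such a helper from `onAcknowledgement` and `close`.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper: thread-safe success/error counters keyed on the exception.
public class AckCounterSketch {

    private final AtomicLong success = new AtomicLong();
    private final AtomicLong error = new AtomicLong();

    // Mirrors the decision made in onAcknowledgement: null exception means success.
    public void record(Exception exception) {
        if (exception == null) {
            success.incrementAndGet();
        } else {
            error.incrementAndGet();
        }
    }

    public long successCount() {
        return success.get();
    }

    public long errorCount() {
        return error.get();
    }

    public String summary() {
        return "success: " + successCount() + ", error: " + errorCount();
    }

    public static void main(String[] args) throws InterruptedException {
        AckCounterSketch counter = new AckCounterSketch();
        // Two threads report outcomes concurrently; no increments are lost.
        Runnable work = () -> {
            for (int i = 0; i < 1000; i++) {
                counter.record(i % 10 == 0 ? new RuntimeException("simulated failure") : null);
            }
        };
        Thread first = new Thread(work);
        Thread second = new Thread(work);
        first.start();
        second.start();
        first.join();
        second.join();
        System.out.println(counter.summary());
    }
}
```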
Producer main program
import org.apache.kafka.clients.producer.*;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class CallBackProducer {

    public static void main(String[] args) {
        // 1. Create the configuration.
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");

        // 2. Register the interceptors; they run in the order they are listed.
        List<String> interceptors = new ArrayList<String>();
        interceptors.add("com.donglin.TimeInterceptor");
        interceptors.add("com.donglin.CounterInterceptor");
        properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);

        // 3. Create the producer.
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);

        // 4. Send the data.
        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<String, String>("first", "ldl" + "---" + i));
        }

        // 5. Release resources; closing the producer also calls close() on each interceptor.
        producer.close();
    }
}
Test
bin/kafka-console-consumer.sh \
  --bootstrap-server hadoop102:9092 --from-beginning --topic first
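With both interceptors registered, each value printed by the console consumer should have the form `<epoch millis>-ldl---<i>`, because TimeInterceptor prepends `System.currentTimeMillis()` and a `-`. As a rough sketch of how a downstream reader might separate the two parts, the helper below splits at the first `-`. `StampedValueSketch` and the sample value are hypothetical, and the approach assumes the timestamp is a non-negative number, so it contains no `-` itself.

```java
// Hypothetical helper for values produced by the timestamp interceptor.
public class StampedValueSketch {

    // Index of the separator between timestamp and payload.
    private static int separatorIndex(String stamped) {
        int index = stamped.indexOf('-');
        if (index < 0) {
            throw new IllegalArgumentException("no timestamp prefix in: " + stamped);
        }
        return index;
    }

    // Everything before the first '-' is the timestamp in milliseconds.
    static long timestampOf(String stamped) {
        return Long.parseLong(stamped.substring(0, separatorIndex(stamped)));
    }

    // Everything after the first '-' is the original message value.
    static String payloadOf(String stamped) {
        return stamped.substring(separatorIndex(stamped) + 1);
    }

    public static void main(String[] args) {
        String sample = "1700000000000-ldl---3"; // made-up value in the expected shape
        System.out.println(timestampOf(sample));
        System.out.println(payloadOf(sample));
    }
}
```

Splitting at the first separator matters here because the payload itself (`ldl---3`) contains further `-` characters.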