当前位置:   article > 正文

kafka拦截器(示例)

kafka拦截器实例
  1. 拦截消息,在消息内容的最前面加上时间戳
  1. import java.util.Map;
  2. import org.apache.kafka.clients.producer.ProducerInterceptor;
  3. import org.apache.kafka.clients.producer.ProducerRecord;
  4. import org.apache.kafka.clients.producer.RecordMetadata;
  5. public class TimeInterceptor implements ProducerInterceptor<String, String> {
  6. @Override
  7. public void configure(Map<String, ?> configs) {
  8. }
  9. @Override
  10. public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
  11. // 创建一个新的record,把时间戳写入消息体的最前部
  12. return new ProducerRecord(record.topic(), record.partition(), record.timestamp(), record.key(),
  13. System.currentTimeMillis() + "," + record.value().toString());
  14. }
  15. @Override
  16. public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
  17. }
  18. @Override
  19. public void close() {
  20. }
  21. }
  1. 拦截消息,统计发送成功和失败的次数
  1. import java.util.Map;
  2. import org.apache.kafka.clients.producer.ProducerInterceptor;
  3. import org.apache.kafka.clients.producer.ProducerRecord;
  4. import org.apache.kafka.clients.producer.RecordMetadata;
  5. public class CounterInterceptor implements ProducerInterceptor<String, String>{
  6. private int errorCounter = 0;
  7. private int successCounter = 0;
  8. @Override
  9. public void configure(Map<String, ?> configs) {
  10. }
  11. @Override
  12. public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
  13. return record;
  14. }
  15. @Override
  16. public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
  17. // 统计成功和失败的次数
  18. if (exception == null) {
  19. successCounter++;
  20. } else {
  21. errorCounter++;
  22. }
  23. }
  24. @Override
  25. public void close() {
  26. // 保存结果
  27. System.out.println("Successful sent: " + successCounter);
  28. System.out.println("Failed sent: " + errorCounter);
  29. }
  30. }
  1. 主程序
  1. import java.util.ArrayList;
  2. import java.util.List;
  3. import java.util.Properties;
  4. import org.apache.kafka.clients.producer.KafkaProducer;
  5. import org.apache.kafka.clients.producer.Producer;
  6. import org.apache.kafka.clients.producer.ProducerConfig;
  7. import org.apache.kafka.clients.producer.ProducerRecord;
  8. public class InterceptorProducer {
  9. public static void main(String[] args) throws Exception {
  10. // 1 设置配置信息
  11. Properties props = new Properties();
  12. props.put("bootstrap.servers", "hadoop01:9092");
  13. props.put("acks", "all");
  14. props.put("retries", 0);
  15. props.put("batch.size", 16384);
  16. props.put("linger.ms", 1);
  17. props.put("buffer.memory", 33554432);
  18. props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  19. props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  20. // 2 构建拦截链
  21. List<String> interceptors = new ArrayList<>();
  22. interceptors.add("TimeInterceptor"); interceptors.add("CounterInterceptor");
  23. props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
  24. String topic = "first";
  25. Producer<String, String> producer = new KafkaProducer<>(props);
  26. // 3 发送消息
  27. for (int i = 0; i < 10; i++) {
  28. ProducerRecord<String, String> record = new ProducerRecord<>(topic, "message" + i);
  29. producer.send(record);
  30. }
  31. // 4 一定要关闭producer,这样才会调用interceptor的close方法
  32. producer.close();
  33. }
  34. }

运行结果
API端
在这里插入图片描述
Linux端
在这里插入图片描述

转载于:https://www.cnblogs.com/drl-blogs/p/11086893.html

声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号