当前位置:   article > 正文

kafka拦截器案例

kafka拦截器案例

需求: 实现一个简单的双 interceptor 组成的拦截链。第一个 interceptor 会在消息发送前将时间
戳信息加到消息 value 的最前部;第二个 interceptor 会在消息发送后更新成功发送消息数或
失败发送消息数。
在这里插入图片描述
案例实操
增加时间戳拦截器

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Map;

public class TimeInterceptor implements ProducerInterceptor<String,String> {

    public void configure(Map<String, ?> map) {

    }

    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> ProducerRecord) {

        //1.取出数据
        String value = ProducerRecord.value();

        //2.创建一个新的ProducerRecord对象,并返回
        return new ProducerRecord<String, String>(ProducerRecord.topic(),ProducerRecord.partition(),
                ProducerRecord.key(),System.currentTimeMillis()+"-"+value);
    }

    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {

    }

    public void close() {

    }


}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33

统计发送消息成功和发送失败消息数,并在 producer 关闭时打印这两个计数器

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Map;

public class CounterInterceptor implements ProducerInterceptor {

    int success;
    int error;

    public ProducerRecord onSend(ProducerRecord producerRecord) {
        return producerRecord;
    }

    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {

        if (recordMetadata !=null) {
            success++;
        }else {
            error++;
        }


    }

    public void close() {
        System.out.println("success"+success);
        System.out.println(" error"+ error);

    }

    public void configure(Map<String, ?> map) {

    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

)producer 主程序

import org.apache.kafka.clients.producer.*;

import java.util.ArrayList;
import java.util.Properties;


public class CallBackProducer {

    public static void main(String[] args) {

        //1.创建配置信息
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");

        //添加拦截器
        ArrayList<String> interceptors = new ArrayList<String>();
        interceptors.add("com.donglin.TimeInterceptor");
        interceptors.add("com.donglin.CounterInterceptor");
        properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,interceptors);


        //创建生产者对象
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);

        //发送数据
        for (int i = 0; i <10 ; i++) {
            producer.send(new ProducerRecord<String, String>("first", "ldl" + "---" + i));



        }
                    //关闭资源
                    producer.close();

        }
        }

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39

测试

bin/kafka-console-consumer.sh \ --bootstrap-server hadoop102:9092 --from-beginning --topic first
  • 1
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/菜鸟追梦旅行/article/detail/471584
推荐阅读
相关标签
  

闽ICP备14008679号