当前位置:   article > 正文

kafka中的自定义拦截器(interceptor)使用详解_kafka batchinterceptor

kafka batchinterceptor

一、拦截器原理

Producer拦截器(interceptor)是在Kafka 0.10版本被引入的,主要用于实现clients端的定制化控制逻辑。
对于producer而言,interceptor使得用户在消息发送前以及producer回调逻辑前有机会对消息做一些定制化需求,比如修改消息等。
同时,producer允许用户指定多个interceptor按序作用于同一条消息从而形成一个拦截链(interceptor chain)。

Intercetpor的实现接口是 org.apache.kafka.clients.producer.ProducerInterceptor ,其定义的方法包括:

1configure(Map<String, ?> map)
获取配置信息和初始化数据时调用。
  • 1
  • 2
2onSend(ProducerRecord producerRecord):
该方法封装进KafkaProducer.send方法中,即它运行在用户主线程中。Producer确保在消息被序列化以及计算分区前调用该方法。用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的topic和分区,否则会影响目标分区的计算。
  • 1
  • 2
3onAcknowledgement(RecordMetadata recordMetadata, Exception e):
该方法会在消息从RecordAccumulator成功发送到Kafka Broker之后,或者在发送过程中失败时调用。并且通常都是在producer回调逻辑触发之前。onAcknowledgement运行在producer的IO线程中,因此不要在该方法中放入很重的逻辑,否则会拖慢producer的消息发送效率。
  • 1
  • 2
4close()::
关闭interceptor,主要用于执行一些资源清理工作
  • 1
  • 2

如前所述,interceptor可能被运行在多个线程中,因此在具体实现时用户需要自行确保线程安全。另外倘若指定了多个interceptor,则producer将按照指定顺序调用它们,并仅仅是捕获每个interceptor可能抛出的异常记录到错误日志中而非在向上传递。这在使用过程中要特别留意。

二、案例实操

下面我们以一个具体的案例来介绍一下kafka中的自定义拦截器的使用:

1、需求:
实现一个简单的双interceptor组成的拦截链。
第一个interceptor会在消息发送前将时间戳信息加到消息value的最前部;
第二个interceptor会在消息发送后更新成功发送消息数或失败发送消息数。

效果图如下:
在这里插入图片描述
2、编写Java代码

1)创建一个Maven工程,导入以下依赖:

		<dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.10.0.0</version>
        </dependency>
  • 1
  • 2
  • 3
  • 4
  • 5

2)编写第一个拦截器:TimeInterceptor

package com.xsluo.kafka;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Map;

/**
 * 第一个interceptor:在消息发送前将时间戳信息加到消息value的最前部。
 */
public class TimeInterceptor implements ProducerInterceptor {
    /**
     * 该方法封装进KafkaProducer.send方法中,即它运行在用户主线程中。Producer确保在消息被序列化以计算分区前调用该方法。
     * 用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的topic和分区,否则会影响目标分区的计算
     * @param producerRecord
     * @return
     */
    public ProducerRecord onSend(ProducerRecord producerRecord) {
        return new ProducerRecord(
                producerRecord.topic(),
                producerRecord.partition(),
                producerRecord.timestamp(),
                producerRecord.key(),
                System.currentTimeMillis() + "," + producerRecord.value().toString());
    }

    /**
     * 该方法会在消息被应答之前或消息发送失败时调用,并且通常都是在producer回调逻辑触发之前。
     * onAcknowledgement运行在producer的IO线程中,因此不要在该方法中放入很重的逻辑,否则会拖慢producer的消息发送效率.
     * @param recordMetadata
     * @param e
     */
    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {

    }

    /**
     * 关闭interceptor,主要用于执行一些资源清理工作
     */
    public void close() {

    }

    /**
     * 获取配置信息和初始化数据时调用
     * @param map
     */
    public void configure(Map<String, ?> map) {

    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51

3)编写第二个拦截器:CounterInterceptor

package com.xsluo.kafka;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Map;

/**
 * 第二个interceptor:在消息发送后更新成功发送消息数或失败发送消息数。
 */
public class CounterInterceptor implements ProducerInterceptor {
    private int successCounter = 0;
    private int errorCounter = 0;
    public ProducerRecord onSend(ProducerRecord producerRecord) {
        return producerRecord;
    }

    /**
     * 该方法会在消息被应答之前或消息发送失败时调用,并且通常都是在producer回调逻辑触发之前。
     * onAcknowledgement运行在producer的IO线程中,因此不要在该方法中放入很重的逻辑,否则会拖慢producer的消息发送效率.
     * @param recordMetadata
     * @param e
     */
    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
        //统计成功和失败的次数
        if (e == null) {
            successCounter++;
        } else {
            errorCounter++;
        }

    }

    /**
     * 关闭interceptor,主要用于执行一些资源清理工作
     */
    public void close() {
        //保存结果
        System.out.println("Successfully sent:" + successCounter);
        System.out.println("Failed sent:" + errorCounter);
    }

    public void configure(Map<String, ?> map) {

    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46

4)编写生产者类来测试自定义的拦截器:

package com.xsluo.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.ArrayList;
import java.util.Properties;

public class MyProducer {
    public static void main(String[] args) {
        // 1.设置配置信息
        Properties props = new Properties();
        props.put("bootstrap.servers", "weekend110:9092,weekend01:9092,weekend02:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // 2.构建拦截器链
        ArrayList<String> interceptors = new ArrayList<String>();
        interceptors.add("com.xsluo.kafka.TimeInterceptor");
        interceptors.add("com.xsluo.kafka.CounterInterceptor");

        props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,interceptors);

        String topic = "topic-Test";
        Producer<Object, String> producer = new KafkaProducer<>(props);

        // 3.发送消息
        for (int i = 0; i < 20; i++) {
            ProducerRecord<Object, String> record = new ProducerRecord<>(topic, "This is a message: " + i);
            producer.send(record);
        }

        // 4.关闭producer,这样才会调用interceptor的close方法
        producer.close();

    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43

5)在Linux服务器上启动已部署好的zookeeper服务、kafka进程以及消费者进程:

bin/kafka-console-consumer.sh --bootstrap-server weekend110:9092 --from-beginning --topic topic-Test
  • 1

在这里插入图片描述
6)执行编写好的生产者类代码:
在这里插入图片描述
7)查看消费的结果:
在这里插入图片描述
OK!

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/花生_TL007/article/detail/471562
推荐阅读
相关标签
  

闽ICP备14008679号