
Spring-Kafka Configuration Loading and Multiple KafkaTemplate Instances


Spring-Kafka Version Information

I have been working in this field for quite a few years, and this is my first blog post. The content is fairly simple; I hope you find it useful, and suggestions are always welcome.
This article is based on the following dependency version:

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.4.3.RELEASE</version>
        </dependency>

The Spring-Kafka Configuration Class

KafkaProperties is the class that loads the Kafka configuration. Spring Boot binds all spring.kafka.* settings to it at startup, so you do not need to write any code to read the Kafka configuration yourself: simply inject KafkaProperties where needed and override individual ProducerFactory (or ConsumerFactory) properties as required; short sketches of both follow below.
Its consumer, producer, admin, streams, and listener fields each bind a different group of settings; for example, consumer corresponds to all properties starting with spring.kafka.consumer.
The buildConsumerProperties method returns the consumer configuration as a Map<String, Object>.

server:
  port: 9006
  servlet:
    context-path: /kafka
spring:
  kafka:
    # Kafka broker address(es); separate cluster nodes with commas
    bootstrap-servers: 192.168.31.249:9092
    consumer:
      # Consumer group ID, used when a consumer instance does not specify one itself
      group-id: test01
      # If true, the consumer's offsets are periodically committed in the background
      enable-auto-commit: true
      # Interval between automatic offset commits; effective when enable-auto-commit
      # is true. Default 5000, in ms
      auto-commit-interval: 500
      # What to do when there is no initial offset for this group on the broker:
      # earliest: reset the offset to the earliest available offset
      # latest: reset the offset to the latest offset
      # none: throw an exception if no previous offset exists for the consumer group
      # anything else: throw an exception to the consumer
      auto-offset-reset: latest
      # Deserializer class for message keys
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # Deserializer class for message values
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # Maximum number of records returned by a single poll
      max-poll-records: 500
      # Minimum amount of data the server should return for a fetch request.
      # If not enough data is available, the request waits until there is.
      # Default 1, in bytes
      fetch-min-size: 1
      # If there is not enough data to satisfy fetch.min.bytes (fetch-min-size),
      # the maximum time the server blocks before answering the fetch request.
      # Default 500, in ms
      fetch-max-wait: 100
      # If set to read_committed, the consumer buffers messages until it receives the
      # corresponding transaction control message: if the transaction commits, the
      # messages are delivered; if it aborts, they are discarded.
      # Default read_uncommitted
      isolation-level: read_uncommitted
    producer:
      # The acknowledgment the producer requires the broker to send after receiving data
      # acks=0: the producer does not wait for any acknowledgment. The record is added to
      #   the socket buffer and considered sent; there is no guarantee the broker received
      #   it, the retries setting has no effect (the client never learns of failures), and
      #   the offset returned for each record is always -1.
      # acks=1: the leader writes the record to its local log and responds without waiting
      #   for the followers. If the leader fails before the followers have replicated the
      #   record, the record is lost.
      # acks=all: the leader waits until all in-sync replicas have acknowledged the record.
      #   The record is not lost as long as at least one replica remains alive. This is
      #   the strongest available guarantee.
      acks: 1
      # A value greater than 0 makes the client resend any record whose send failed.
      # Note that these retries are no different from the client resending the record
      # itself after an error. Retries can reorder records: if two records are sent to
      # the same partition and the first fails and is retried while the second succeeds,
      # the second record may appear before the first.
      retries: 4
      # The producer batches records together to reduce the number of requests;
      # this setting controls the default batch size. Default 16384, in bytes
      batch-size: 16384
      properties:
        # How long the producer delays sending to allow batching; used together with
        # batch-size. Default 0, in ms
        linger:
          ms: 100
      # Total memory the producer may use to buffer records. If records are produced
      # faster than they can be delivered to the broker, the producer blocks or throws
      # an exception. Default 33554432, in bytes
      buffer-memory: 33554432
      # Serializer class for message keys
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # Serializer class for message values
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # Compression type for all data generated by the producer; accepts the standard
      # codecs ('gzip', 'snappy', 'lz4', 'zstd'). Default none
      compression-type: none
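
As a quick illustration of buildConsumerProperties, KafkaProperties can be injected like any other bean and the resolved configuration inspected. This is a minimal sketch, not part of the original post; the class name and the logging runner are my own, purely for demonstration:

import java.util.Map;

import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class KafkaPropertiesDemo {

    // Print the consumer configuration resolved from application.yml,
    // e.g. bootstrap.servers=[192.168.31.249:9092], group.id=test01, ...
    @Bean
    public CommandLineRunner printConsumerProperties(KafkaProperties kafkaProperties) {
        return args -> {
            Map<String, Object> consumerProps = kafkaProperties.buildConsumerProperties();
            consumerProps.forEach((key, value) -> System.out.println(key + " = " + value));
        };
    }
}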
@ConfigurationProperties(prefix = "spring.kafka")
public class KafkaProperties {

	/**
	 * Comma-delimited list of host:port pairs to use for establishing the initial
	 * connections to the Kafka cluster. Applies to all components unless overridden.
	 */
	private List<String> bootstrapServers = new ArrayList<>(Collections.singletonList("localhost:9092"));

	/**
	 * ID to pass to the server when making requests. Used for server-side logging.
	 */
	private String clientId;

	/**
	 * Additional properties, common to producers and consumers, used to configure the
	 * client.
	 */
	private final Map<String, String> properties = new HashMap<>();

	private final Consumer consumer = new Consumer();

	private final Producer producer = new Producer();

	private final Admin admin = new Admin();

	private final Streams streams = new Streams();

	private final Listener listener = new Listener();
	
	// ... (remaining fields, getters, and nested classes omitted)

	/**
	 * Create an initial map of consumer properties from the state of this instance.
	 * <p>
	 * This allows you to add additional properties, if necessary, and override the
	 * default kafkaConsumerFactory bean.
	 * @return the consumer properties initialized with the customizations defined on this
	 * instance
	 */
	public Map<String, Object> buildConsumerProperties() {
		Map<String, Object> properties = buildCommonProperties();
		properties.putAll(this.consumer.buildProperties());
		return properties;
	}

	/**
	 * Create an initial map of producer properties from the state of this instance.
	 * <p>
	 * This allows you to add additional properties, if necessary, and override the
	 * default kafkaProducerFactory bean.
	 * @return the producer properties initialized with the customizations defined on this
	 * instance
	 */
	public Map<String, Object> buildProducerProperties() {
		Map<String, Object> properties = buildCommonProperties();
		properties.putAll(this.producer.buildProperties());
		return properties;
	}

	/**
	 * Create an initial map of admin properties from the state of this instance.
	 * <p>
	 * This allows you to add additional properties, if necessary, and override the
	 * default kafkaAdmin bean.
	 * @return the admin properties initialized with the customizations defined on this
	 * instance
	 */
	public Map<String, Object> buildAdminProperties() {
		Map<String, Object> properties = buildCommonProperties();
		properties.putAll(this.admin.buildProperties());
		return properties;
	}

	/**
	 * Create an initial map of streams properties from the state of this instance.
	 * <p>
	 * This allows you to add additional properties, if necessary.
	 * @return the streams properties initialized with the customizations defined on this
	 * instance
	 */
	public Map<String, Object> buildStreamsProperties() {
		Map<String, Object> properties = buildCommonProperties();
		properties.putAll(this.streams.buildProperties());
		return properties;
	}
}
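
The same mechanism makes it easy to override individual settings while keeping everything else from application.yml. Below is a minimal sketch (the configuration class and the choice of max.poll.records are my own) of a custom ConsumerFactory built on top of buildConsumerProperties(); because of @ConditionalOnMissingBean (see the next section), declaring it replaces the default kafkaConsumerFactory:

import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@Configuration
public class CustomConsumerFactoryConfig {

    // Start from the properties bound from application.yml,
    // then override max.poll.records for this factory only
    @Bean
    public ConsumerFactory<Object, Object> kafkaConsumerFactory(KafkaProperties kafkaProperties) {
        Map<String, Object> consumerProperties = kafkaProperties.buildConsumerProperties();
        consumerProperties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
        return new DefaultKafkaConsumerFactory<>(consumerProperties);
    }
}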

Default Bean Implementations

The KafkaAutoConfiguration class provides default implementations of the commonly used Kafka beans, including KafkaTemplate, ProducerListener, ConsumerFactory, ProducerFactory, and KafkaAdmin. Every bean it defines is annotated with @ConditionalOnMissingBean, which means the default implementation is used only when you have not defined your own: as soon as you declare a bean of the same type yourself, the corresponding default bean is not initialized.

@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(KafkaTemplate.class)
@EnableConfigurationProperties(KafkaProperties.class)
@Import({ KafkaAnnotationDrivenConfiguration.class, KafkaStreamsAnnotationDrivenConfiguration.class })
public class KafkaAutoConfiguration {

	private final KafkaProperties properties;

	public KafkaAutoConfiguration(KafkaProperties properties) {
		this.properties = properties;
	}

	@Bean
	@ConditionalOnMissingBean(KafkaTemplate.class)
	public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory,
			ProducerListener<Object, Object> kafkaProducerListener,
			ObjectProvider<RecordMessageConverter> messageConverter) {
		KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);
		messageConverter.ifUnique(kafkaTemplate::setMessageConverter);
		kafkaTemplate.setProducerListener(kafkaProducerListener);
		kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());
		return kafkaTemplate;
	}

	@Bean
	@ConditionalOnMissingBean(ProducerListener.class)
	public ProducerListener<Object, Object> kafkaProducerListener() {
		return new LoggingProducerListener<>();
	}

	@Bean
	@ConditionalOnMissingBean(ConsumerFactory.class)
	public ConsumerFactory<?, ?> kafkaConsumerFactory() {
		return new DefaultKafkaConsumerFactory<>(this.properties.buildConsumerProperties());
	}

	@Bean
	@ConditionalOnMissingBean(ProducerFactory.class)
	public ProducerFactory<?, ?> kafkaProducerFactory() {
		DefaultKafkaProducerFactory<?, ?> factory = new DefaultKafkaProducerFactory<>(
				this.properties.buildProducerProperties());
		String transactionIdPrefix = this.properties.getProducer().getTransactionIdPrefix();
		if (transactionIdPrefix != null) {
			factory.setTransactionIdPrefix(transactionIdPrefix);
		}
		return factory;
	}

	@Bean
	@ConditionalOnProperty(name = "spring.kafka.producer.transaction-id-prefix")
	@ConditionalOnMissingBean
	public KafkaTransactionManager<?, ?> kafkaTransactionManager(ProducerFactory<?, ?> producerFactory) {
		return new KafkaTransactionManager<>(producerFactory);
	}

	@Bean
	@ConditionalOnProperty(name = "spring.kafka.jaas.enabled")
	@ConditionalOnMissingBean
	public KafkaJaasLoginModuleInitializer kafkaJaasInitializer() throws IOException {
		KafkaJaasLoginModuleInitializer jaas = new KafkaJaasLoginModuleInitializer();
		Jaas jaasProperties = this.properties.getJaas();
		if (jaasProperties.getControlFlag() != null) {
			jaas.setControlFlag(jaasProperties.getControlFlag());
		}
		if (jaasProperties.getLoginModule() != null) {
			jaas.setLoginModule(jaasProperties.getLoginModule());
		}
		jaas.setOptions(jaasProperties.getOptions());
		return jaas;
	}

	@Bean
	@ConditionalOnMissingBean
	public KafkaAdmin kafkaAdmin() {
		KafkaAdmin kafkaAdmin = new KafkaAdmin(this.properties.buildAdminProperties());
		kafkaAdmin.setFatalIfBrokerNotAvailable(this.properties.getAdmin().isFailFast());
		return kafkaAdmin;
	}

}
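
With the auto-configuration in place, using the default KafkaTemplate requires nothing more than injecting it by type. A minimal sketch; the class name and topic below are placeholders of my own:

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class DefaultTemplateDemo {

    private final KafkaTemplate<Object, Object> kafkaTemplate;

    // As long as no custom KafkaTemplate bean is declared,
    // this injects the single auto-configured instance
    public DefaultTemplateDemo(KafkaTemplate<Object, Object> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void send(String message) {
        kafkaTemplate.send("demo-topic", message);
    }
}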

Using Multiple KafkaTemplate Instances

Taking KafkaTemplate as an example, some scenarios call for several KafkaTemplate instances in one project, for instance when different producers need different key serializers. The caveat is that the auto-configured beans are guarded by @ConditionalOnMissingBean: as soon as you define your own ProducerFactory or KafkaTemplate, the defaults are no longer created. So if the project previously relied on the default beans, you must re-declare equivalents of those defaults yourself, give each bean an explicit name, and update the existing injection points to use @Qualifier with the re-declared "default" bean name, while new code uses the new bean name. With that in place, multiple KafkaTemplate instances can coexist.

package com.example.kafka.producer.config;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.LoggingProducerListener;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.kafka.support.converter.RecordMessageConverter;

import java.util.Map;

@EnableKafka
@Configuration
public class KafkaTemplateConfig {

    @Autowired
    private KafkaProperties kafkaProperties;

    @Bean(name="defaultKafkaTemplate")
    public KafkaTemplate<?, ?> kafkaTemplate(@Qualifier("defaultKafkaProducerFactory") ProducerFactory<Object, Object> kafkaProducerFactory,
                                             ProducerListener<Object, Object> kafkaProducerListener,
                                             ObjectProvider<RecordMessageConverter> messageConverter) {
        KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);
        messageConverter.ifUnique(kafkaTemplate::setMessageConverter);
        kafkaTemplate.setProducerListener(kafkaProducerListener);
        kafkaTemplate.setDefaultTopic(kafkaProperties.getTemplate().getDefaultTopic());
        return kafkaTemplate;
    }

    @Bean
    public ProducerListener<Object, Object> kafkaProducerListener() {
        return new LoggingProducerListener<>();
    }

    @Bean(name="defaultKafkaProducerFactory")
    public ProducerFactory<Object, Object> kafkaProducerFactory() {
        DefaultKafkaProducerFactory<Object, Object> factory = new DefaultKafkaProducerFactory<>(
                kafkaProperties.buildProducerProperties());
        String transactionIdPrefix = kafkaProperties.getProducer().getTransactionIdPrefix();
        if (transactionIdPrefix != null) {
            factory.setTransactionIdPrefix(transactionIdPrefix);
        }
        return factory;
    }

    /**
     * A second producer factory with different producer settings
     */
    @Bean(name="newKafkaProducerFactory")
    public ProducerFactory<Object, Object> newProducerFactory() {
        Map<String, Object> producerProperties = kafkaProperties.buildProducerProperties();
        // Override the acks setting for this producer factory
        producerProperties.put(ProducerConfig.ACKS_CONFIG,"all");
        DefaultKafkaProducerFactory<Object, Object> factory = new DefaultKafkaProducerFactory<>(
                producerProperties);
        String transactionIdPrefix = kafkaProperties.getProducer().getTransactionIdPrefix();
        if (transactionIdPrefix != null) {
            factory.setTransactionIdPrefix(transactionIdPrefix);
        }

        return factory;
    }

    @Bean(name="newKafkaTemplate")
    public KafkaTemplate<?, ?> newKafkaTemplate(@Qualifier("newKafkaProducerFactory") ProducerFactory<Object, Object> kafkaProducerFactory,
                                             ProducerListener<Object, Object> kafkaProducerListener,
                                             ObjectProvider<RecordMessageConverter> messageConverter) {
        KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);
        messageConverter.ifUnique(kafkaTemplate::setMessageConverter);
        kafkaTemplate.setProducerListener(kafkaProducerListener);
        kafkaTemplate.setDefaultTopic(kafkaProperties.getTemplate().getDefaultTopic());
        return kafkaTemplate;
    }
}

import com.example.kafka.producer.entity.ProduceEntity;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

/**
 * Kafka producer REST controller
 */
@RestController
public class KafkaProducerController {

    @Autowired
    @Qualifier("defaultKafkaTemplate")
    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    @Qualifier("newKafkaTemplate")
    private KafkaTemplate<String, String> newKafkaTemplate;

    @PostMapping("produce")
    public void produce(@RequestBody ProduceEntity produceEntity) {
        for (int i = 0; i < 12; i++) {
            kafkaTemplate.send(produceEntity.getTopic(), i + " " + produceEntity.getMessage());
            newKafkaTemplate.send(produceEntity.getTopic(), "new " + i + " " + produceEntity.getMessage());
        }
    }

}
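
ProduceEntity is not shown in the original code; judging from the getTopic() and getMessage() calls above, a minimal assumed shape would be:

package com.example.kafka.producer.entity;

// Minimal assumed request body; the real class may carry additional fields
public class ProduceEntity {

    private String topic;
    private String message;

    public String getTopic() {
        return topic;
    }

    public void setTopic(String topic) {
        this.topic = topic;
    }

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }
}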