赞
踩
工作也好多年了,头一次写博客,内容也比较简单,希望大家多多支持,多提建议。
本篇文章根据以下版本依赖进行说明
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.4.3.RELEASE</version>
</dependency>
KafkaProperties类为加载配置信息类,在初始化的时候,可以无需自己编写代码读取kafka相关配置,代码中可以直接注入KafkaProperties,在根据需要,覆盖ProducerFactory的属性。
其中consumer、producer 、admin、streams、listener分别对应不同的配置信息,如consumer对应spring.kafka.consumer开头的配置信息。
buildConsumerProperties方法则表示获取consumer的配置信息,返回值为Map。
server: port: 9006 servlet: context-path: /kafka spring: kafka: # Kafka服务端监听地址端口,集群用逗号分隔 bootstrap-servers: 192.168.31.249:9092 consumer: # 消费者组ID,在消费者实例没有指定消费者组的时候生效 group-id: test01 # 如果为真,consumer所fetch的消息的offset将会自动的同步到zookeeper。 enable-auto-commit: true # 每次自动提交offset的时间间隔,当enable-auto-commit设置为true时生效,默认值为5000,单位ms auto-commit-interval: 500 # kafka服务(实际是zookeeper)中没有初始化的offset时,如果offset是以下值的回应: # earliest:自动复位offset为smallest的offset # latest:自动复位offset为largest的offset # anything else:向consumer抛出异常 # none:如果整个消费者组中没有以往的offset,则抛出异常 auto-offset-reset: latest # message的key的解码类 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer # message的value的解码类 value-deserializer: org.apache.kafka.common.serialization.StringDeserializer # 单次消费获取数据的最大条数 max-poll-records: 500 # 每次fetch请求时,server应该返回的最小字节数。 # 如果没有足够的数据返回,请求会等待,直到足够的数据才会返回。默认值为1,单位bytes fetch-min-size: 1 # 如果没有足够的数据能够满足fetch.min.bytes(fetch-min-size), # 则此项配置是指在应答fetch请求之前,server会阻塞的最大时间,默认值为100,单位ms fetch-max-wait: 100 # 如果设置为read_committed,则consumer会缓存消息,直到收到消息对应的事务控制消息。 # 若事务commit,则对外发布这些消息;若事务abort,则丢弃这些消息 # 默认值为read_uncommitted isolation-level: read_uncommitted producer: # producer需要server接收到数据之后发出的确认接收的信号 # acks=0:设置为0表示producer不需要等待任何确认收到的信息。副本将立即加到socket buffer并认为已经发送。没有任何保障可以保证此种情况下server已经成功接收数据,同时重试配置不会发生作用(因为客户端不知道是否失败)回馈的offset会总是设置为-1; # acks=1: 这意味着至少要等待leader已经成功将数据写入本地log,但是并没有等待所有follower是否成功写入。这种情况下,如果follower没有成功备份数据,而此时leader又挂掉,则消息会丢失。 # acks=all: 这意味着leader需要等待所有备份都成功写入日志,这种策略会保证只要有一个备份存活就不会丢失数据。这是最强的保证。 acks: 1 # 设置大于0的值将使客户端重新发送任何数据。 # 注意,这些重试与客户端接收到发送错误时的重试没有什么不同。允许重试将潜在的改变数据的顺序,如果这两个消息记录都是发送到同一个partition,则第一个消息失败第二个发送成功,则第二条消息会比第一条消息出现要早。 retries: 4 # producer将试图批处理消息记录,以减少请求次数,这项配置控制默认的批量处理消息字节数,默认值16384,单位bytes batch-size: 16384 properties: # producer发送消息的延时,与batch-size配合使用,默认值0,单位ms linger: ms: 100 # producer可以用来缓存数据的内存大小。如果数据产生速度大于向broker发送的速度,producer会阻塞或者抛出异常, # 默认值33554432,单位bytes buffer-memory: 33554432 # key的序列化类 key-serializer: org.apache.kafka.common.serialization.StringSerializer # value的序列化类 value-serializer: org.apache.kafka.common.serialization.StringSerializer # 生产者生成的所有数据的压缩类型,此配置接受标准压缩编解码器('gzip','snappy','lz4','zstd') # 默认为none compression-type: none
@ConfigurationProperties(prefix = "spring.kafka") public class KafkaProperties { /** * Comma-delimited list of host:port pairs to use for establishing the initial * connections to the Kafka cluster. Applies to all components unless overridden. */ private List<String> bootstrapServers = new ArrayList<>(Collections.singletonList("localhost:9092")); /** * ID to pass to the server when making requests. Used for server-side logging. */ private String clientId; /** * Additional properties, common to producers and consumers, used to configure the * client. */ private final Map<String, String> properties = new HashMap<>(); private final Consumer consumer = new Consumer(); private final Producer producer = new Producer(); private final Admin admin = new Admin(); private final Streams streams = new Streams(); private final Listener listener = new Listener(); ... ... /** * Create an initial map of consumer properties from the state of this instance. * <p> * This allows you to add additional properties, if necessary, and override the * default kafkaConsumerFactory bean. * @return the consumer properties initialized with the customizations defined on this * instance */ public Map<String, Object> buildConsumerProperties() { Map<String, Object> properties = buildCommonProperties(); properties.putAll(this.consumer.buildProperties()); return properties; } /** * Create an initial map of producer properties from the state of this instance. * <p> * This allows you to add additional properties, if necessary, and override the * default kafkaProducerFactory bean. * @return the producer properties initialized with the customizations defined on this * instance */ public Map<String, Object> buildProducerProperties() { Map<String, Object> properties = buildCommonProperties(); properties.putAll(this.producer.buildProperties()); return properties; } /** * Create an initial map of admin properties from the state of this instance. * <p> * This allows you to add additional properties, if necessary, and override the * default kafkaAdmin bean. * @return the admin properties initialized with the customizations defined on this * instance */ public Map<String, Object> buildAdminProperties() { Map<String, Object> properties = buildCommonProperties(); properties.putAll(this.admin.buildProperties()); return properties; } /** * Create an initial map of streams properties from the state of this instance. * <p> * This allows you to add additional properties, if necessary. * @return the streams properties initialized with the customizations defined on this * instance */ public Map<String, Object> buildStreamsProperties() { Map<String, Object> properties = buildCommonProperties(); properties.putAll(this.streams.buildProperties()); return properties; } }
KafkaAutoConfiguration类,提供了Kafka常用Bean的默认实现,包括KafkaTemplate、ProducerListener、ConsumerFactory、ProducerFactory、KafkaAdmin 等,每个实现的Bean都使用了@ConditionalOnMissingBean注解,表示当开发人员没有自己单独实现的时候,使用默认实现,当开发人员单独实现的时候,默认实现不起作用,不会初始化默认的Bean实现。
@Configuration(proxyBeanMethods = false) @ConditionalOnClass(KafkaTemplate.class) @EnableConfigurationProperties(KafkaProperties.class) @Import({ KafkaAnnotationDrivenConfiguration.class, KafkaStreamsAnnotationDrivenConfiguration.class }) public class KafkaAutoConfiguration { private final KafkaProperties properties; public KafkaAutoConfiguration(KafkaProperties properties) { this.properties = properties; } @Bean @ConditionalOnMissingBean(KafkaTemplate.class) public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory, ProducerListener<Object, Object> kafkaProducerListener, ObjectProvider<RecordMessageConverter> messageConverter) { KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory); messageConverter.ifUnique(kafkaTemplate::setMessageConverter); kafkaTemplate.setProducerListener(kafkaProducerListener); kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic()); return kafkaTemplate; } @Bean @ConditionalOnMissingBean(ProducerListener.class) public ProducerListener<Object, Object> kafkaProducerListener() { return new LoggingProducerListener<>(); } @Bean @ConditionalOnMissingBean(ConsumerFactory.class) public ConsumerFactory<?, ?> kafkaConsumerFactory() { return new DefaultKafkaConsumerFactory<>(this.properties.buildConsumerProperties()); } @Bean @ConditionalOnMissingBean(ProducerFactory.class) public ProducerFactory<?, ?> kafkaProducerFactory() { DefaultKafkaProducerFactory<?, ?> factory = new DefaultKafkaProducerFactory<>( this.properties.buildProducerProperties()); String transactionIdPrefix = this.properties.getProducer().getTransactionIdPrefix(); if (transactionIdPrefix != null) { factory.setTransactionIdPrefix(transactionIdPrefix); } return factory; } @Bean @ConditionalOnProperty(name = "spring.kafka.producer.transaction-id-prefix") @ConditionalOnMissingBean public KafkaTransactionManager<?, ?> kafkaTransactionManager(ProducerFactory<?, ?> producerFactory) { return new KafkaTransactionManager<>(producerFactory); } @Bean @ConditionalOnProperty(name = "spring.kafka.jaas.enabled") @ConditionalOnMissingBean public KafkaJaasLoginModuleInitializer kafkaJaasInitializer() throws IOException { KafkaJaasLoginModuleInitializer jaas = new KafkaJaasLoginModuleInitializer(); Jaas jaasProperties = this.properties.getJaas(); if (jaasProperties.getControlFlag() != null) { jaas.setControlFlag(jaasProperties.getControlFlag()); } if (jaasProperties.getLoginModule() != null) { jaas.setLoginModule(jaasProperties.getLoginModule()); } jaas.setOptions(jaasProperties.getOptions()); return jaas; } @Bean @ConditionalOnMissingBean public KafkaAdmin kafkaAdmin() { KafkaAdmin kafkaAdmin = new KafkaAdmin(this.properties.buildAdminProperties()); kafkaAdmin.setFatalIfBrokerNotAvailable(this.properties.getAdmin().isFailFast()); return kafkaAdmin; } }
以KafkaTemplate为例,在某些使用场景向,可以在一个工程中,创建多个KafkaTemplate,比如当key的序列化类不同的时候,这个时候需要注意的是,由于Spring-Kafka的默认装配的Bean使用了@ConditionalOnMissingBean的注解,如果原工程使用的是默认的Bean实现,此处需要重新编写Bean的默认实现,并修改原有的KafkaTemplate的自动注入使用代码,使用@Qualifier注解,指定默认实现的Bean名称,新的代码使用新的Bean名称,就能做到同时存在多个KafkaTemplate了。
package com.example.kafka.producer.config; import org.apache.kafka.clients.producer.ProducerConfig; import org.springframework.beans.factory.ObjectProvider; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.autoconfigure.kafka.KafkaProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; import org.springframework.kafka.support.LoggingProducerListener; import org.springframework.kafka.support.ProducerListener; import org.springframework.kafka.support.converter.RecordMessageConverter; import java.util.Map; @EnableKafka @Configuration public class KafkaTemplateConfig { @Autowired private KafkaProperties kafkaProperties; @Bean(name="defaultKafkaTemplate") public KafkaTemplate<?, ?> kafkaTemplate(@Qualifier("defaultKafkaProducerFactory") ProducerFactory<Object, Object> kafkaProducerFactory, ProducerListener<Object, Object> kafkaProducerListener, ObjectProvider<RecordMessageConverter> messageConverter) { KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory); messageConverter.ifUnique(kafkaTemplate::setMessageConverter); kafkaTemplate.setProducerListener(kafkaProducerListener); kafkaTemplate.setDefaultTopic(kafkaProperties.getTemplate().getDefaultTopic()); return kafkaTemplate; } @Bean public ProducerListener<Object, Object> kafkaProducerListener() { return new LoggingProducerListener<>(); } @Bean(name="defaultKafkaProducerFactory") public ProducerFactory<Object, Object> kafkaProducerFactory() { DefaultKafkaProducerFactory<Object, Object> factory = new DefaultKafkaProducerFactory<>( kafkaProperties.buildProducerProperties()); String transactionIdPrefix = kafkaProperties.getProducer().getTransactionIdPrefix(); if (transactionIdPrefix != null) { factory.setTransactionIdPrefix(transactionIdPrefix); } return factory; } /** * 获取生产者工厂 */ @Bean(name="newKafkaProducerFactory") public ProducerFactory<Object, Object> newProducerFactory() { Map<String, Object> producerProperties = kafkaProperties.buildProducerProperties(); // 修改参数名称 producerProperties.put(ProducerConfig.ACKS_CONFIG,"all"); DefaultKafkaProducerFactory<Object, Object> factory = new DefaultKafkaProducerFactory<>( producerProperties); String transactionIdPrefix = kafkaProperties.getProducer().getTransactionIdPrefix(); if (transactionIdPrefix != null) { factory.setTransactionIdPrefix(transactionIdPrefix); } return factory; } @Bean(name="newKafkaTemplate") public KafkaTemplate<?, ?> newKafkaTemplate(@Qualifier("newKafkaProducerFactory") ProducerFactory<Object, Object> kafkaProducerFactory, ProducerListener<Object, Object> kafkaProducerListener, ObjectProvider<RecordMessageConverter> messageConverter) { KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory); messageConverter.ifUnique(kafkaTemplate::setMessageConverter); kafkaTemplate.setProducerListener(kafkaProducerListener); kafkaTemplate.setDefaultTopic(kafkaProperties.getTemplate().getDefaultTopic()); return kafkaTemplate; } }
import com.example.kafka.producer.entity.ProduceEntity; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RestController; /** * Kafka生产者服务 */ @RestController public class KafkaProducerController { @Autowired @Qualifier("defaultKafkaTemplate") private KafkaTemplate<String, String> kafkaTemplate; @Autowired @Qualifier("newKafkaTemplate") private KafkaTemplate<String, String> newKafkaTemplate; @PostMapping("produce") public void produce(@RequestBody ProduceEntity produceEntity) { for(int i = 0; i<12; i++) { kafkaTemplate.send(produceEntity.getTopic(), i+ " " + produceEntity.getMessage()); newKafkaTemplate.send(produceEntity.getTopic(), "new "+ i +" "+produceEntity.getMessage()); } } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。