当前位置:   article > 正文

【Spring Boot 使用记录】kafka自动配置和自定义配置_spring.kafka.consumer.auto-offset-reset

spring.kafka.consumer.auto-offset-reset

1 自动配置

自动配置实现在 org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration

配置类为:

  1. @ConfigurationProperties(prefix = "spring.kafka")
  2. public class KafkaProperties {
  3. ......
  4. }

所以使用spring boot 默认的自动配置,kafka的配置都已spring.kafka开头,如:

  1. #kafka默认消费者配置
  2. spring.kafka.consumer.bootstrap-servers=192.168.0.15:9092
  3. spring.kafka.consumer.enable-auto-commit=false
  4. spring.kafka.consumer.auto-offset-reset=earliest
  5. #kafka默认生产者配置
  6. spring.kafka.producer.bootstrap-servers=192.168.0.15:9092
  7. spring.kafka.producer.acks=-1
  8. spring.kafka.client-id=kafka-producer
  9. spring.kafka.producer.batch-size=5

 

2 自定义配置

配置类org.springframework.boot.autoconfigure.kafka.KafkaProperties中并没有涵盖所有的org.apache.kafka.clients.producer.ProducerConfigorg.apache.kafka.clients.consumer.ConsumerConfig中的配置,这就导致某些特殊配置不能依赖spring boot自动创建,需要我们手动创建Poducer和comsumer。

生产端自定义配置例子:

  1. package cn.ztuo.bitrade.config;
  2. import java.util.HashMap;
  3. import java.util.Map;
  4. import org.apache.kafka.clients.producer.ProducerConfig;
  5. import org.apache.kafka.common.serialization.StringSerializer;
  6. import org.springframework.beans.factory.annotation.Value;
  7. import org.springframework.context.annotation.Bean;
  8. import org.springframework.context.annotation.Configuration;
  9. import org.springframework.kafka.annotation.EnableKafka;
  10. import org.springframework.kafka.core.DefaultKafkaProducerFactory;
  11. import org.springframework.kafka.core.KafkaTemplate;
  12. import org.springframework.kafka.core.ProducerFactory;
  13. @Configuration
  14. @EnableKafka
  15. public class KafkaProducerConfiguration {
  16. @Value("${spring.kafka.bootstrap-servers}")
  17. private String servers;
  18. @Value("${spring.kafka.producer.retries}")
  19. private int retries;
  20. @Value("${spring.kafka.producer.batch.size}")
  21. private int batchSize;
  22. @Value("${spring.kafka.producer.linger}")
  23. private int linger;
  24. @Value("${spring.kafka.producer.buffer.memory}")
  25. private int bufferMemory;
  26. // 创建生产者配置map,ProducerConfig中的可配置属性比spring boot自动配置要多
  27. public Map<String, Object> producerConfigs() {
  28. Map<String, Object> props = new HashMap<>();
  29. props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
  30. props.put(ProducerConfig.RETRIES_CONFIG, retries);
  31. props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
  32. props.put(ProducerConfig.LINGER_MS_CONFIG, linger);
  33. props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
  34. // props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "cn.ztuo.bitrade.kafka.kafkaPartitioner");
  35. props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  36. props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  37. return props;
  38. }
  39. /**
  40. * 不使用spring boot的KafkaAutoConfiguration默认方式创建的DefaultKafkaProducerFactory,重新定义
  41. * @return
  42. */
  43. public ProducerFactory<String, String> producerFactory() {
  44. return new DefaultKafkaProducerFactory<>(producerConfigs());
  45. }
  46. /**
  47. * 不使用spring boot的KafkaAutoConfiguration默认方式创建的KafkaTemplate,重新定义
  48. * @return
  49. */
  50. @Bean
  51. public KafkaTemplate<String, String> kafkaTemplate() {
  52. return new KafkaTemplate<String, String>(producerFactory());
  53. }
  54. }

消费端自定义配置例子:

  1. package cn.ztuo.bitrade.config;
  2. import java.util.HashMap;
  3. import java.util.Map;
  4. import org.apache.kafka.clients.consumer.ConsumerConfig;
  5. import org.apache.kafka.common.serialization.StringDeserializer;
  6. import org.springframework.beans.factory.annotation.Value;
  7. import org.springframework.context.annotation.Bean;
  8. import org.springframework.context.annotation.Configuration;
  9. import org.springframework.kafka.annotation.EnableKafka;
  10. import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
  11. import org.springframework.kafka.config.KafkaListenerContainerFactory;
  12. import org.springframework.kafka.core.ConsumerFactory;
  13. import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
  14. import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
  15. //这里创建了对应类型的bean之后,org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration中的对应Bean定义将不起作用。
  16. @Configuration
  17. @EnableKafka
  18. public class KafkaConsumerConfiguration {
  19. @Value("${spring.kafka.bootstrap-servers}")
  20. private String servers;
  21. @Value("${spring.kafka.consumer.enable.auto.commit}")
  22. private boolean enableAutoCommit;
  23. @Value("${spring.kafka.consumer.session.timeout}")
  24. private String sessionTimeout;
  25. @Value("${spring.kafka.consumer.auto.commit.interval}")
  26. private String autoCommitInterval;
  27. @Value("${spring.kafka.consumer.group.id}")
  28. private String groupId;
  29. @Value("${spring.kafka.consumer.auto.offset.reset}")
  30. private String autoOffsetReset;
  31. @Value("${spring.kafka.consumer.concurrency}")
  32. private int concurrency;
  33. @Value("${spring.kafka.consumer.maxPollRecordsConfig}")
  34. private int maxPollRecordsConfig;
  35. //构造消费者属性map,ConsumerConfig中的可配置属性比spring boot自动配置要多
  36. public Map<String, Object> consumerConfigs() {
  37. Map<String, Object> propsMap = new HashMap<>();
  38. propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
  39. propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
  40. propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
  41. propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
  42. propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  43. propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  44. propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
  45. propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
  46. propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecordsConfig);// 每个批次获取数
  47. return propsMap;
  48. }
  49. /**
  50. * 不使用spring boot默认方式创建的DefaultKafkaConsumerFactory,重新定义创建方式
  51. * @return
  52. */
  53. public ConsumerFactory<String, String> consumerFactory() {
  54. return new DefaultKafkaConsumerFactory<>(consumerConfigs());
  55. }
  56. @Bean
  57. public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
  58. ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
  59. factory.setConsumerFactory(consumerFactory());
  60. factory.setConcurrency(concurrency);
  61. factory.setMissingTopicsFatal(false);
  62. factory.getContainerProperties().setPollTimeout(1500);
  63. factory.setBatchListener(true);
  64. return factory;
  65. }
  66. }

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/繁依Fanyi0/article/detail/870305
推荐阅读
相关标签
  

闽ICP备14008679号