当前位置:   article > 正文

springboot使用@KafkaListener监听多个kafka配置

kafkalistener

        背景: 使用 Spring Boot 整合 Kafka 时, Spring Boot 默认读取配置文件中 spring.kafka.* 的配置来初始化 Kafka, 使用 @KafkaListener 时只需指定 topic 即可。但这组默认配置只描述一个 Kafka 集群, 当服务需要同时监听多个 Kafka 集群时, 就必须配置多套 Kafka 连接信息, 默认方式不再适用。

        方案: 手动读取各个 Kafka 集群的配置信息, 为每个集群创建独立的 Kafka 监听容器工厂, 使用 @KafkaListener 时通过 containerFactory 属性指定对应的容器工厂, 代码如下:

1. 导入依赖

  <!-- Spring for Apache Kafka. No <version> is given here; presumably it is managed by the
       Spring Boot dependency management of the project (verify against your parent POM/BOM). -->
  <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
  </dependency>

2. yml配置

  kafka:
    # Default consumer settings shared by every Kafka cluster below
    default-consumer:
      # Automatically commit offsets of consumed records
      enable-auto-commit: true
      # Interval between automatic offset commits
      auto-commit-interval: 1000
      # Timeout used when polling for records
      poll-timeout: 1500
      # When there is no initial offset, or the current offset no longer exists on the server
      # (e.g. because the data was deleted), reset the offset to the latest one
      auto.offset.reset: latest
      # Consumer session timeout (if the consumer sends no heartbeat within this time,
      # a rebalance is triggered)
      session.timeout.ms: 120000
      # Consumer request timeout
      request.timeout.ms: 180000
    # Kafka cluster #1
    test1:
      bootstrap-servers: xxxx:xxxx,xxxx:xxxx,xxxx:xxxx
      consumer:
        group-id: xxx
        sasl.mechanism: xxxx
        security.protocol: xxxx
        sasl.jaas.config: xxxx
    # Kafka cluster #2
    test2:
      bootstrap-servers: xxxx:xxxx,xxxx:xxxx,xxxx:xxxx
      consumer:
        group-id: xxx
        sasl.mechanism: xxxx
        security.protocol: xxxx
        sasl.jaas.config: xxxx

3. 容器工厂配置

  1. package com.zhdx.modules.backstage.config;
  2. import com.google.common.collect.Maps;
  3. import org.apache.kafka.clients.consumer.ConsumerConfig;
  4. import org.apache.kafka.common.config.SaslConfigs;
  5. import org.apache.kafka.common.serialization.StringDeserializer;
  6. import org.springframework.beans.factory.annotation.Value;
  7. import org.springframework.cloud.context.config.annotation.RefreshScope;
  8. import org.springframework.context.annotation.Bean;
  9. import org.springframework.context.annotation.Configuration;
  10. import org.springframework.kafka.annotation.EnableKafka;
  11. import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
  12. import org.springframework.kafka.config.KafkaListenerContainerFactory;
  13. import org.springframework.kafka.core.ConsumerFactory;
  14. import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
  15. import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
  16. import java.util.Map;
  17. /**
  18. * kafka监听容器工厂配置
  19. * <p>
  20. * 拓展其他消费者配置只需配置指定的属性和bean即可
  21. */
  22. @EnableKafka
  23. @Configuration
  24. @RefreshScope
  25. public class KafkaListenerContainerFactoryConfig {
  26. /**
  27. * test1 kafka配置
  28. */
  29. @Value("${kafka.test1.bootstrap-servers}")
  30. private String test1KafkaServerUrls;
  31. @Value("${kafka.test1.consumer.group-id}")
  32. private String test1GroupId;
  33. @Value("${kafka.test1.consumer.sasl.mechanism}")
  34. private String test1SaslMechanism;
  35. @Value("${kafka.test1.consumer.security.protocol}")
  36. private String test1SecurityProtocol;
  37. @Value("${kafka.test1.consumer.sasl.jaas.config}")
  38. private String test1SaslJaasConfig;
  39. /**
  40. * test2 kafka配置
  41. */
  42. @Value("${kafka.test2.bootstrap-servers}")
  43. private String test2KafkaServerUrls;
  44. @Value("${kafka.test2.consumer.group-id}")
  45. private String test2GroupId;
  46. @Value("${kafka.test2.consumer.sasl.mechanism}")
  47. private String test2SaslMechanism;
  48. @Value("${kafka.test2.consumer.security.protocol}")
  49. private String test2SecurityProtocol;
  50. @Value("${kafka.test2.consumer.sasl.jaas.config}")
  51. private String test2SaslJaasConfig;
  52. /**
  53. * 默认消费者配置
  54. */
  55. @Value("${kafka.default-consumer.enable-auto-commit}")
  56. private boolean enableAutoCommit;
  57. @Value("${kafka.default-consumer.poll-timeout}")
  58. private int pollTimeout;
  59. @Value("${kafka.default-consumer.auto.offset.reset}")
  60. private String autoOffsetReset;
  61. @Value("${kafka.default-consumer.session.timeout.ms}")
  62. private int sessionTimeoutMs;
  63. @Value("${kafka.default-consumer.request.timeout.ms}")
  64. private int requestTimeoutMs;
  65. /**
  66. * test1消费者配置
  67. */
  68. public Map<String, Object> test1ConsumerConfigs() {
  69. Map<String, Object> props = getDefaultConsumerConfigs();
  70. // broker server地址
  71. props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, test1KafkaServerUrls);
  72. // 消费者组
  73. props.put(ConsumerConfig.GROUP_ID_CONFIG, test1GroupId);
  74. // 加密
  75. props.put(SaslConfigs.SASL_MECHANISM, test1SaslMechanism);
  76. props.put("security.protocol", test1SecurityProtocol);
  77. // 账号密码
  78. props.put(SaslConfigs.SASL_JAAS_CONFIG, test1SaslJaasConfig);
  79. return props;
  80. }
  81. /**
  82. * test2消费者配置
  83. */
  84. public Map<String, Object> test2ConsumerConfigs() {
  85. Map<String, Object> props = getDefaultConsumerConfigs();
  86. // broker server地址
  87. props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, test2KafkaServerUrls);
  88. // 消费者组
  89. props.put(ConsumerConfig.GROUP_ID_CONFIG, test2GroupId);
  90. // 加密
  91. props.put(SaslConfigs.SASL_MECHANISM, test2SaslMechanism);
  92. props.put("security.protocol", test2SecurityProtocol);
  93. // 账号密码
  94. props.put(SaslConfigs.SASL_JAAS_CONFIG, test2SaslJaasConfig);
  95. return props;
  96. }
  97. /**
  98. * 默认消费者配置
  99. */
  100. private Map<String, Object> getDefaultConsumerConfigs() {
  101. Map<String, Object> props = Maps.newHashMap();
  102. // 自动提交(按周期)已消费offset 批量消费下设置false
  103. props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
  104. // 消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作)
  105. props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeoutMs);
  106. // 消费请求超时时间
  107. props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeoutMs);
  108. // 序列化
  109. props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  110. props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  111. // 如果Kafka中没有初始偏移量,或者服务器上不再存在当前偏移量(例如,因为该数据已被删除)自动将该偏移量重置成最新偏移量
  112. props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
  113. return props;
  114. }
  115. /**
  116. * 消费者工厂类
  117. */
  118. public ConsumerFactory<String, String> initConsumerFactory(Map<String, Object> consumerConfigs) {
  119. return new DefaultKafkaConsumerFactory<>(consumerConfigs);
  120. }
  121. public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> initKafkaListenerContainerFactory(
  122. Map<String, Object> consumerConfigs) {
  123. ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
  124. factory.setConsumerFactory(initConsumerFactory(consumerConfigs));
  125. // 是否开启批量消费
  126. factory.setBatchListener(false);
  127. // 消费的超时时间
  128. factory.getContainerProperties().setPollTimeout(pollTimeout);
  129. return factory;
  130. }
  131. /**
  132. * 创建test1 Kafka 监听容器工厂。
  133. *
  134. * @return KafkaListenerContainerFactory<ConcurrentMessageListenerContainer < String, String>> 返回的 KafkaListenerContainerFactory 对象
  135. */
  136. @Bean(name = "test1KafkaListenerContainerFactory")
  137. public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> test1KafkaListenerContainerFactory() {
  138. Map<String, Object> consumerConfigs = this.test1ConsumerConfigs();
  139. return initKafkaListenerContainerFactory(consumerConfigs);
  140. }
  141. /**
  142. * 创建test2 Kafka 监听容器工厂。
  143. *
  144. * @return KafkaListenerContainerFactory<ConcurrentMessageListenerContainer < String, String>> 返回的 KafkaListenerContainerFactory 对象
  145. */
  146. @Bean(name = "test2KafkaListenerContainerFactory")
  147. public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> test2KafkaListenerContainerFactory() {
  148. Map<String, Object> consumerConfigs = this.test2ConsumerConfigs();
  149. return initKafkaListenerContainerFactory(consumerConfigs);
  150. }
  151. }
4. @KafkaListener使用
  1. package com.zhdx.modules.backstage.kafka;
  2. import com.alibaba.fastjson.JSON;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.apache.kafka.clients.consumer.ConsumerRecord;
  5. import org.springframework.kafka.annotation.KafkaListener;
  6. import org.springframework.stereotype.Component;
  7. /**
  8. * kafka监听器
  9. */
  10. @Slf4j
  11. @Component
  12. public class test1KafkaListener {
  13. @KafkaListener(containerFactory = "test1KafkaListenerContainerFactory", topics = "xxx")
  14. public void handleHyPm(ConsumerRecord<String, String> record) {
  15. log.info("消费到topic xxx消息:{}", JSON.toJSONString(record.value()));
  16. }
  17. }
 

        

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/600581
推荐阅读
相关标签
  

闽ICP备14008679号