当前位置:   article > 正文

springboot kafka 同一服务的多个实例,如何设置成不同的消费组group. 动态group_@kafkalistener 多实例如何设置多个groupid

@kafkalistener 多实例如何设置多个groupid

问题场景:我的一个微服务A会在本地缓存一些业务配置数据,配置更新时由相关服务B发送

问题场景:

我的一个微服务A会在本地缓存一些业务配置数据,配置更新时由相关服务B发送一个变动消息,A收到消息后更新本地缓存。那么问题来了:同一个服务A的多个实例a1、a2在默认情况下属于同一个消费组,一条消息只会被其中一个实例消费;如何让每个实例都能消费到同一个topic的同一条消息?

解决方案:

通过 Redis 的 setIfAbsent(即 SETNX)配合代码动态配置 groupId,使不同实例获取到不同的 groupId。服务启动时会配置 Kafka 消费工厂 ConsumerFactory,此时生成 groupId,
并用 setIfAbsent() 写入 Redis;如果写入失败(说明该 groupId 已被其他实例占用),就继续生成下一个,直到成功为止。

配置代码如下:

  1. package com.jieshun.open.config;
  2. import com.jieshun.open.util.LocalMacUtils;
  3. import com.jieshun.open.validate.StringUtils;
  4. import javafx.application.Application;
  5. import lombok.extern.slf4j.Slf4j;
  6. import org.apache.kafka.clients.consumer.ConsumerConfig;
  7. import org.apache.kafka.common.serialization.StringDeserializer;
  8. import org.springframework.beans.factory.annotation.Autowired;
  9. import org.springframework.beans.factory.annotation.Value;
  10. import org.springframework.boot.SpringApplication;
  11. import org.springframework.context.ConfigurableApplicationContext;
  12. import org.springframework.context.annotation.Bean;
  13. import org.springframework.core.env.Environment;
  14. import org.springframework.data.redis.core.StringRedisTemplate;
  15. import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
  16. import org.springframework.kafka.config.KafkaListenerContainerFactory;
  17. import org.springframework.kafka.core.ConsumerFactory;
  18. import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
  19. import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
  20. import org.springframework.stereotype.Component;
  21. import java.net.UnknownHostException;
  22. import java.util.HashMap;
  23. import java.util.List;
  24. import java.util.Map;
  25. import java.util.concurrent.Executors;
  26. import java.util.concurrent.ScheduledExecutorService;
  27. import java.util.concurrent.TimeUnit;
  28. /**
  29.  * Kafka consumer configuration that gives every running instance of this service its own
  30.  * consumer group, so that each instance receives every message of a topic (publish/subscribe).
  31.  *
  32.  * <p>Group ids have the form {@code jop_gateway_group:N}. The mapping from the instance key
  33.  * ({@code ip&port&mac}) to its group id is stored in the Redis hash {@code gateway_kafka_group},
  34.  * so an instance that restarts with the same key gets the same group id back.
  35.  *
  36.  * <p>NOTE(review): nothing in this class removes entries from Redis; if instance keys change
  37.  * often (for example containers with fresh IPs) group ids will accumulate. Confirm whether a
  38.  * cleanup job is needed.
  39.  *
  40.  * @Date 2021-7-23 16:04
  41.  * @Created by yyk
  42.  */
  43. @Slf4j
  44. @Component
  45. public class KafkaConsumerConfig {
  46.     /** Prefix of every generated group id; the full id also serves as a Redis reservation key. */
  47.     private static final String CACHE_GROUP_NAME_PREFIX = "jop_gateway_group:";
  48.     /** Redis hash mapping an instance key ({@code ip&port&mac}) to its assigned group id. */
  49.     private static final String KAFKA_GROUP = "gateway_kafka_group";
  50.     @Value("${spring.kafka.bootstrap-servers}")
  51.     private String brokers;
  52.     @Value("${spring.kafka.consumer.enable-auto-commit}")
  53.     private Boolean enableAutoCommit;
  54.     @Value("${spring.kafka.consumer.auto-commit-interval-ms}")
  55.     private String autoCommitIntervalMs;
  56.     @Value("${spring.kafka.consumer.session-timeout-ms}")
  57.     private Integer sessionTimeoutMs;
  58.     @Value("${spring.kafka.consumer.auto-offset-reset}")
  59.     private String autoOffsetReset;
  60.     /** Still injected so the property stays mandatory; the effective group id is generated per instance. */
  61.     @Value("${spring.kafka.consumer.group-id}")
  62.     private String groupId;
  63.     @Value("${spring.kafka.consumer.max-poll-records}")
  64.     private String maxPollRecords;
  65.     @Value("${server.port}")
  66.     private int serverPort;
  67.     /** Group id resolved for this instance when the consumer factory was built. */
  68.     private String currentInstanceGroupId;
  69.     @Autowired
  70.     private StringRedisTemplate redisTemplate;
  71.     /**
  72.      * Builds the Kafka listener container factory backed by the per-instance consumer factory.
  73.      *
  74.      * @return the container factory used by {@code @KafkaListener} methods
  75.      * @throws Exception if the instance key cannot be determined or Redis access fails
  76.      */
  77.     @Bean
  78.     public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() throws Exception {
  79.         ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
  80.         factory.setConsumerFactory(consumerFactory());
  81.         return factory;
  82.     }
  83.     /**
  84.      * Resolves the consumer group id for this instance, allocating a new one through Redis if the
  85.      * instance has none yet.
  86.      *
  87.      * <p>A previously stored mapping for this instance key is returned unchanged. Otherwise the
  88.      * candidates {@code prefix + currValue}, {@code prefix + (currValue + 1)}, ... are tried in
  89.      * order. A candidate is skipped when it is already recorded in the hash, and it is claimed
  90.      * with {@code setIfAbsent} so that two instances starting at the same time cannot both obtain
  91.      * the same id.
  92.      *
  93.      * @param currValue first sequence number to try; {@code null} is treated as 0
  94.      * @return the group id assigned to this instance
  95.      * @throws Exception if the local IP/MAC lookup fails
  96.      */
  97.     public String getSerializeGroupId(Integer currValue) throws Exception {
  98.         String instanceKey = LocalMacUtils.getLocalIp() + "&" + serverPort + "&" + LocalMacUtils.getLocalMac();
  99.         // An instance that registered before keeps its group id across restarts.
  100.         Object existing = redisTemplate.opsForHash().get(KAFKA_GROUP, instanceKey);
  101.         if (existing != null && !StringUtils.isEmpty(existing.toString())) {
  102.             return existing.toString();
  103.         }
  104.         // Read the recorded ids once; ids written by earlier versions exist only in this hash.
  105.         List<Object> recorded = redisTemplate.opsForHash().values(KAFKA_GROUP);
  106.         for (int sequence = currValue == null ? 0 : currValue; ; sequence++) {
  107.             String candidate = CACHE_GROUP_NAME_PREFIX + sequence;
  108.             boolean alreadyRecorded = recorded.stream().anyMatch(v -> candidate.equals(String.valueOf(v)));
  109.             if (alreadyRecorded) {
  110.                 continue;
  111.             }
  112.             // Atomic claim: only one instance can create the key, so the losers move on to the next id.
  113.             Boolean claimed = redisTemplate.opsForValue().setIfAbsent(candidate, instanceKey);
  114.             if (Boolean.TRUE.equals(claimed)) {
  115.                 redisTemplate.opsForHash().put(KAFKA_GROUP, instanceKey, candidate);
  116.                 return candidate;
  117.             }
  118.         }
  119.     }
  120.     /**
  121.      * Creates the consumer factory from the injected settings and assigns the per-instance
  122.      * group id.
  123.      *
  124.      * @return a consumer factory with String key/value deserializers
  125.      * @throws Exception if the group id cannot be resolved
  126.      */
  127.     private ConsumerFactory<String, String> consumerFactory() throws Exception {
  128.         Map<String, Object> properties = new HashMap<>();
  129.         properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
  130.         properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
  131.         properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitIntervalMs);
  132.         // The configured session timeout was injected but never handed to the consumer before.
  133.         properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeoutMs);
  134.         properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
  135.         properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  136.         properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  137.         // Each deployed instance gets a different group id so that all of them see every message.
  138.         currentInstanceGroupId = getSerializeGroupId(0);
  139.         log.info("当前实例kafka分组id---{}",currentInstanceGroupId);
  140.         properties.put(ConsumerConfig.GROUP_ID_CONFIG, currentInstanceGroupId);
  141.         properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
  142.         return new DefaultKafkaConsumerFactory<>(properties);
  143.     }
  144. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/2023面试高手/article/detail/419840
推荐阅读
相关标签
  

闽ICP备14008679号