赞
踩
问题场景:我的一个微服务A会在本地缓存一些业务配置数据,配置更新时由相关服务B发送
问题场景:
我的一个微服务A会在本地缓存一些业务配置数据,配置更新时由相关服务B发送一个变动消息,A收到消息后更新本地缓存。那么问题来了:同一个服务A的多个实例a1、a2,如何让每个实例都各自消费到同一个topic的同一条消息?
解决方案:
通过 Redis 的 setIfAbsent 配合代码动态配置 groupId,使不同实例获取到不同的 groupId。服务启动时会配置 Kafka 消费工厂 ConsumerFactory,此时生成 groupId:
对 Redis 执行 setIfAbsent(),如果失败(该 groupId 已被其他实例占用)就继续生成下一个,直到成功。
配置代码如下:
- package com.jieshun.open.config;
-
- import com.jieshun.open.util.LocalMacUtils;
- import com.jieshun.open.validate.StringUtils;
- import javafx.application.Application;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.kafka.clients.consumer.ConsumerConfig;
- import org.apache.kafka.common.serialization.StringDeserializer;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.boot.SpringApplication;
- import org.springframework.context.ConfigurableApplicationContext;
- import org.springframework.context.annotation.Bean;
- import org.springframework.core.env.Environment;
- import org.springframework.data.redis.core.StringRedisTemplate;
- import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
- import org.springframework.kafka.config.KafkaListenerContainerFactory;
- import org.springframework.kafka.core.ConsumerFactory;
- import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
- import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
- import org.springframework.stereotype.Component;
-
- import java.net.UnknownHostException;
- import java.util.HashMap;
- import java.util.List;
- import java.util.Map;
- import java.util.concurrent.Executors;
- import java.util.concurrent.ScheduledExecutorService;
- import java.util.concurrent.TimeUnit;
-
- /**
- * @Description
- * @Date 2021-7-23 16:04
- * @Created by yyk
- */
- @Slf4j
- @Component
- public class KafkaConsumerConfig {
-
- @Value("${spring.kafka.bootstrap-servers}")
- private String BROKERS;
- @Value("${spring.kafka.consumer.enable-auto-commit}")
- private Boolean ENABLE_AUTO_COMMIT;
- @Value("${spring.kafka.consumer.auto-commit-interval-ms}")
- private String AUTO_COMMIT_INTERVAL_MS;
- @Value("${spring.kafka.consumer.session-timeout-ms}")
- private Integer SESSION_TIMEOUT_MS;
- @Value("${spring.kafka.consumer.auto-offset-reset}")
- private String AUTO_OFFSET_RESET;
- @Value("${spring.kafka.consumer.group-id}")
- private String GROUP_ID;
- @Value("${spring.kafka.consumer.max-poll-records}")
- private String MAX_POLL_RECORDS;
- @Value("${server.port}")
- private int serverPort;
- /**缓存名称前缀*/
- private final String CACHE_GROUP_NAME_PREFIX = "jop_gateway_group:";
- private final String KAFKA_GROUP="gateway_kafka_group";
- private String CURRENT_INSTANCE_GROUP_ID;
-
- @Autowired
- private StringRedisTemplate redisTemplate;
-
-
- /**构建kafka监听工厂*/
- @Bean
- public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() throws Exception {
- ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
- factory.setConsumerFactory(consumerFactory());
- return factory;
- }
-
-
- /**通过redis限制获取的分组名称*/
- public String getSerializeGroupId(Integer currValue) throws Exception {
- String ip = LocalMacUtils.getLocalIp();
- String mac = LocalMacUtils.getLocalMac();
- String key=ip+"&"+serverPort+"&"+mac;
- String value = CACHE_GROUP_NAME_PREFIX.concat(currValue.toString());
- List<Object> values = redisTemplate.opsForHash().values(KAFKA_GROUP);
- if(values.stream().filter(p->p.toString().equals(value)).count()>0){
- currValue++;
- return getSerializeGroupId(currValue);
- }
- Object group = redisTemplate.opsForHash().get(KAFKA_GROUP, key);
- if(group==null|| StringUtils.isEmpty(group.toString())){
- redisTemplate.opsForHash().put(KAFKA_GROUP,key,value);
- return value;
- }
- return group.toString();
- }
-
-
- /**初始化消费工厂配置 其中会动态指定消费分组*/
- private ConsumerFactory<String, String> consumerFactory() throws Exception {
- Map<String, Object> properties = new HashMap<String, Object>();
- properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERS);
- properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, ENABLE_AUTO_COMMIT);
- properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, AUTO_COMMIT_INTERVAL_MS);
- properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, MAX_POLL_RECORDS);
- properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- /**多实例部署每个实例设置不同groupId 实现发布订阅*/
- CURRENT_INSTANCE_GROUP_ID = getSerializeGroupId(0);
- log.info("当前实例kafka分组id---{}",CURRENT_INSTANCE_GROUP_ID);
- properties.put(ConsumerConfig.GROUP_ID_CONFIG, CURRENT_INSTANCE_GROUP_ID);
- properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AUTO_OFFSET_RESET);
- return new DefaultKafkaConsumerFactory<String, String>(properties);
- }
-
-
-
- }
-
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。