赞
踩
Redis 通过 PUBLISH 、 SUBSCRIBE 等命令实现了订阅与发布模式, 这个功能提供两种信息机制, 分别是订阅/发布到频道和订阅/发布到模式, 本文讨论订阅/发布到频道的实现
该种模型类似于RocketMQ中广播模式,消费者订阅topic
如图展示了发布消息到channel1
后,各个client都会接收到message
虽然Redis能够实现发布/订阅的功能,但是有如下缺点,所以选用前需谨慎考虑
和常规的MQ不同,redis实现的发布/订阅模型消息无法持久化,一经发布,即使没有任何订阅方处理,该条消息就会丢失
即发布方不会确保订阅方成功接收
广播机制无法通过添加多个消费方增强消费能力,因为这和发布/订阅模型本身的目的是不符的.广播机制的目的是一个一个发布者被多个订阅进行不同的处理
由于Redis发布/订阅模型存在的缺陷,所以使用前需要考虑如下几点
具体使用还是需要考虑业务场景需求
在目前SpringBoot使用Redis的操作中,官方推荐使用SpringData模块中的spring-data-redis
,所以下文会以spring-data-redis
进行
下文需要对Springboot工程有一定的基础认识
- <dependency>
- <groupId>org.springframework.data</groupId>
- <artifactId>spring-data-redis</artifactId>
- </dependency>
序列化使用的是GenericJackson2JsonRedisSerializer
,使用这个类可以正确序列化Null的对象.如果使用Jackson2JsonRedisSerializer
,会将对象序列号成空数组.
- @Configuration
- public class RedisConfig {
-
- @Bean
- public RedisTemplate<Object, Object> redisTemplate(RedisSerializer<Object> redisSerializer, RedisConnectionFactory redisConnectionFactory) {
- RedisTemplate<Object, Object> template = new RedisTemplate<>();
- template.setConnectionFactory(redisConnectionFactory);
- template.setKeySerializer(RedisSerializer.string());
- template.setDefaultSerializer(redisSerializer);
- return template;
- }
-
- @Bean
- public RedisSerializer<Object> redisSerializer(){
- GenericJackson2JsonRedisSerializer redisSerializer = new GenericJackson2JsonRedisSerializer();
- return redisSerializer;
- }
- }
-
简单起见,在这里使用SpringSchedule,周期性发布消息
- @EnableScheduling
- @Component
- public class RedisPublisher {
-
- private static final Logger log = LoggerFactory.getLogger(RedisPublisher.class);
-
- @Autowired
- private RedisTemplate<Object, Object> redisTemplate;
-
- private AtomicInteger incrInteger = new AtomicInteger();
-
-
- @Scheduled(initialDelay = 500, fixedDelay = 10000)
- public void publish() {
- int incrementAndGet = incrInteger.incrementAndGet();
- String topic = "redis/test";
- String message = "current num : " + incrementAndGet;
- log.info("发布消息..topic:{},内容:{}", topic, message);
- redisTemplate.convertAndSend(topic, message);
- }
- }
在订阅程序中,有两个比较重要的类,分别是MessageListenerAdapter
和RedisMessageListenerContainer
其中MessageListenerAdapter
实现MessageListener
作用是将自定义的消费类进行适配.这个类必须有一个public
的消费方法,并且方法需要有两个参数,arg1为channel
,arg2是Message
.原因可以在MessageListenerAdapter
源码中发现
在MessageListenerAdapter.onMessage
方法中,通过反射对消费类进行了方法调用,并且方法的参数和顺序进行了硬编码,所以必须在消费类中提供一个public方法
从官方文档中,可以得知RedisMessageListenerContainer
的作用是用于接收消息后进行分发,并且通过内部的线程池进行异步分发,(也可以使用自定义的线程池和相关失败策略)
- @Configuration
- public class ConsumerConfig {
-
- @Bean
- public MessageListenerAdapter processorOne(RedisSerializer<Object> serializer, RedisConsumer redisConsumer) {
-
- MessageListenerAdapter adapter = new MessageListenerAdapter(redisConsumer, "onMessage");
- adapter.setSerializer(serializer);
- return adapter;
- }
-
- /**
- *
- * 支持动态添加监听
- *
- * @param adapter
- * @return
- */
- @Bean
- public RedisMessageListenerContainer messageListenerContainer(RedisConnectionFactory redisConnectionFactory,
- MessageListenerAdapter adapter) {
- RedisMessageListenerContainer container = new RedisMessageListenerContainer();
- container.setConnectionFactory(redisConnectionFactory);
- //制定topic的序列化方式,String
- container.setTopicSerializer(RedisSerializer.string());
- //添加监听
- container.addMessageListener(adapter, new PatternTopic("redis/**"));
- return container;
- }
-
- }
启动一个发布者,两个订阅者
由于先启动的发布者,所以部分已经发布的消息,会直接被丢弃,这也是Redis发布订阅模型的一个缺点
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。