当前位置:   article > 正文

Redis实现不可靠发布/订阅功能

redis消费者订阅者是否安全

Redis的发布/订阅模型

Redis 通过 PUBLISH 、 SUBSCRIBE 等命令实现了订阅与发布模式, 这个功能提供两种信息机制, 分别是订阅/发布到频道和订阅/发布到模式, 本文讨论订阅/发布到频道的实现

该种模型类似于RocketMQ中广播模式,消费者订阅topic

如图展示了发布消息到channel1后,各个client都会接收到message

2786935-c4b79f58074f442c.png
image.png

虽然Redis能够实现发布/订阅的功能,但是有如下缺点,所以选用前需谨慎考虑

  • 1.消息无法持久化,存在丢失风险

和常规的MQ不同,redis实现的发布/订阅模型消息无法持久化,一经发布,即使没有任何订阅方处理,该条消息就会丢失

  • 2.没有类似ACK的机制

即发布方不会确保订阅方成功接收

  • 3.广播机制,下游消费能力取决于消费方本身

广播机制无法通过添加多个消费方增强消费能力,因为这和发布/订阅模型本身的目的是不符的.广播机制的目的是一个一个发布者被多个订阅进行不同的处理

Redis发布/订阅应用场景

由于Redis发布/订阅模型存在的缺陷,所以使用前需要考虑如下几点

  • 1.对于消息处理可靠性要求不强
  • 2.消费能力无需通过增加消费方进行增强
    考虑如上两点后,可以想到的场景有如下
  • 1.用户注册后,发送相关优惠信息
  • 2.用户修改名称,由于有业务表对用户名称进行了字段冗余,通过订阅修改名称的channel,触发各个业务表的字段修改

具体使用还是需要考虑业务场景需求

SpringBoot使用Redis的发布/订阅功能

在目前SpringBoot使用Redis的操作中,官方推荐使用SpringData模块中的spring-data-redis,所以下文会以spring-data-redis进行

下文需要对Springboot工程有一定的基础认识

1.Maven依赖redis组件

  1. <dependency>
  2. <groupId>org.springframework.data</groupId>
  3. <artifactId>spring-data-redis</artifactId>
  4. </dependency>

2.redis序列化相关配置

序列化使用的是GenericJackson2JsonRedisSerializer,使用这个类可以正确序列化Null的对象.如果使用Jackson2JsonRedisSerializer,会将对象序列号成空数组.

  1. @Configuration
  2. public class RedisConfig {
  3. @Bean
  4. public RedisTemplate<Object, Object> redisTemplate(RedisSerializer<Object> redisSerializer, RedisConnectionFactory redisConnectionFactory) {
  5. RedisTemplate<Object, Object> template = new RedisTemplate<>();
  6. template.setConnectionFactory(redisConnectionFactory);
  7. template.setKeySerializer(RedisSerializer.string());
  8. template.setDefaultSerializer(redisSerializer);
  9. return template;
  10. }
  11. @Bean
  12. public RedisSerializer<Object> redisSerializer(){
  13. GenericJackson2JsonRedisSerializer redisSerializer = new GenericJackson2JsonRedisSerializer();
  14. return redisSerializer;
  15. }
  16. }

3.配置发布方法

简单起见,在这里使用SpringSchedule,周期性发布消息

  1. @EnableScheduling
  2. @Component
  3. public class RedisPublisher {
  4. private static final Logger log = LoggerFactory.getLogger(RedisPublisher.class);
  5. @Autowired
  6. private RedisTemplate<Object, Object> redisTemplate;
  7. private AtomicInteger incrInteger = new AtomicInteger();
  8. @Scheduled(initialDelay = 500, fixedDelay = 10000)
  9. public void publish() {
  10. int incrementAndGet = incrInteger.incrementAndGet();
  11. String topic = "redis/test";
  12. String message = "current num : " + incrementAndGet;
  13. log.info("发布消息..topic:{},内容:{}", topic, message);
  14. redisTemplate.convertAndSend(topic, message);
  15. }
  16. }

4.订阅配置

在订阅程序中,有两个比较重要的类,分别是MessageListenerAdapterRedisMessageListenerContainer
其中MessageListenerAdapter实现MessageListener作用是将自定义的消费类进行适配.这个类必须有一个public的消费方法,并且方法需要有两个参数,arg1为channel,arg2是Message.原因可以在MessageListenerAdapter源码中发现

4.1MessageListenerAdapter和一个自定义消费类

MessageListenerAdapter.onMessage方法中,通过反射对消费类进行了方法调用,并且方法的参数和顺序进行了硬编码,所以必须在消费类中提供一个public方法

2786935-a190e872442af910.png
image.png
4.2RedisMessageListenerContainer

从官方文档中,可以得知RedisMessageListenerContainer的作用是用于接收消息后进行分发,并且通过内部的线程池进行异步分发,(也可以使用自定义的线程池和相关失败策略)

4.3完整订阅配置
  1. @Configuration
  2. public class ConsumerConfig {
  3. @Bean
  4. public MessageListenerAdapter processorOne(RedisSerializer<Object> serializer, RedisConsumer redisConsumer) {
  5. MessageListenerAdapter adapter = new MessageListenerAdapter(redisConsumer, "onMessage");
  6. adapter.setSerializer(serializer);
  7. return adapter;
  8. }
  9. /**
  10. *
  11. * 支持动态添加监听
  12. *
  13. * @param adapter
  14. * @return
  15. */
  16. @Bean
  17. public RedisMessageListenerContainer messageListenerContainer(RedisConnectionFactory redisConnectionFactory,
  18. MessageListenerAdapter adapter) {
  19. RedisMessageListenerContainer container = new RedisMessageListenerContainer();
  20. container.setConnectionFactory(redisConnectionFactory);
  21. //制定topic的序列化方式,String
  22. container.setTopicSerializer(RedisSerializer.string());
  23. //添加监听
  24. container.addMessageListener(adapter, new PatternTopic("redis/**"));
  25. return container;
  26. }
  27. }

启动发布者和订阅者,查看日志

启动一个发布者,两个订阅者

由于先启动的发布者,所以部分已经发布的消息,会直接被丢弃,这也是Redis发布订阅模型的一个缺点

发布者日志
2786935-5080e1fc5c804d1b.png
image.png
订阅者1日志
2786935-d764370e17d7d9fd.png
image.png
订阅者2日志
2786935-d94c3790ded08c5d.png
image.png
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/笔触狂放9/article/detail/472489
推荐阅读
相关标签
  

闽ICP备14008679号