当前位置:   article > 正文

Redis 消息发布订阅 与 Spring boot 使用快速入门_redis消息订阅与发布springboot

redis消息订阅与发布springboot

目录

Redis 消息发布订阅

Spring boot 实现消息发布订阅

发布消息

消息监听

主题订阅

Spring boot 监听 Key 过期事件

消息监听

主题订阅


Redis 消息发布订阅

Reis Pub/Sub:REDIS pubsub -- Redis中国用户组(CRUG)

Redis 发布订阅 命令:redis命令手册

1、Redis 中"pub/sub"的消息,为"即发即失",server 不会保存消息,如果 publish 的消息没有任何 client 处于 "subscribe" 状态,消息将会被丢弃;如果 client 在 subcribe 时,链接断开后重连,那在么此期间的消息也将丢失。

2、Redis server 将会"尽力"将消息发送给处于 subscribe 状态的 client,但是仍不会保证每条消息都能被正确接收。

命令描述
Redis Unsubscribe 命令指退订给定的频道。UNSUBSCRIBE channel [channel ...]
Redis Subscribe 命令订阅给定的一个或多个频道的信息。SUBSCRIBE channel [channel ...]
Redis Pubsub 命令查看订阅与发布系统状态。PUBSUB <subcommand> [argument [argument ...]]
Redis Punsubscribe 命令退订所有给定模式的频道。PUNSUBSCRIBE [pattern [pattern ...]]
Redis Publish 命令将信息发送到指定的频道。PUBLISH channel message
Redis Psubscribe 命令

订阅一个或多个符合给定模式的频道。PSUBSCRIBE pattern [pattern ...]

每个模式以 * 作为匹配符,比如 it* 匹配所有以 it 开头的频道( it.news 、 it.blog 、 it.tweets 等等)

Spring boot 实现消息发布订阅

1、引入 Redis 依赖:https://gitee.com/wangmx1993/thymeleafapp/blob/master/pom.xml

  1. <!--Spring Boot redis 启动器-->
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-data-redis</artifactId>
  5. </dependency>

 2、Redis 数据库配置:https://gitee.com/wangmx1993/thymeleafapp/blob/master/src/main/resources/application.yml

  1. spring:
  2. data:
  3. redis:
  4. database: 0
  5. host: localhost
  6. port: 6379
  7. password:

环境:Java Jdk 1.8 + Spring boot 2.1.3 + Redis 5.0 。

发布消息

  1. /**
  2. * redis 将信息发送到指定的频道
  3. * http://localhost:8080/redis/pub?topic=memoryCache&context=flushAll
  4. * http://localhost:8080/redis/pub?topic=basic-service&context=flushDb
  5. *
  6. * @param topic :消息所属的主题/频道
  7. * @param context :消息内容
  8. * @return
  9. */
  10. @GetMapping("redis/pub")
  11. @ResponseBody
  12. public Map<String, Object> retryable(@RequestParam String topic, @RequestParam Object context) {
  13. /**
  14. * void convertAndSend(String destination, Object message): 将给定消息发布到给定通道
  15. * 1、 destination:要发布到的通道不能为 null
  16. */
  17. redisTemplate.convertAndSend(topic, context);
  18. Map<String, Object> dataMap = new HashMap<>(4);
  19. dataMap.put(topic, context);
  20. return dataMap;
  21. }

https://gitee.com/wangmx1993/thymeleafapp/blob/master/src/main/java/com/wmx/thymeleafapp/controller/ExampleController.java

消息监听

1、 Redis 消息订阅-消息监听器,当收到阅订的消息时,会将消息交给这个类处理。

  1. /**
  2. * Redis 消息订阅-消息监听器,当收到阅订的消息时,会将消息交给这个类处理
  3. * <p>
  4. * 1、可以直接实现 MessageListener 接口,也可以继承它的实现类 MessageListenerAdapter.
  5. * 2、自动多线程处理,打印日志即可看出,即使手动延迟,也不会影响后面消息的接收。
  6. *
  7. * @author wangMaoXiong
  8. * @version 1.0
  9. * @date 2022/5/21 16:00
  10. */
  11. @Component
  12. public class RedisSubListener implements MessageListener {
  13. // 直接从容器中获取
  14. @Resource
  15. private RedisTemplate redisTemplate;
  16. /**
  17. * 监听到的消息必须进行与发送时相同的方式进行反序列
  18. * 1、订阅端与发布端 Redis 序列化的方式必须相同,否则会乱码。
  19. *
  20. * @param message :消息实体
  21. * @param pattern :匹配模式
  22. */
  23. @Override
  24. public void onMessage(Message message, byte[] pattern) {
  25. // 消息订阅的匹配规则,如 new PatternTopic("basic-*") 中的 basic-*
  26. String msgPattern = new String(pattern);
  27. // 消息所属的通道,可以根据不同的通道做不同的业务逻辑
  28. String channel = (String) redisTemplate.getStringSerializer().deserialize(message.getChannel());
  29. // 接收的消息内容,可以根据自己需要强转为自己需要的对象,但最好先使用 instanceof 判断一下
  30. Object body = redisTemplate.getValueSerializer().deserialize(message.getBody());
  31. // 收到 Redis 订阅消息: channel=basic-service body=flushDb pattern=basic-*
  32. // 收到 Redis 订阅消息: channel=memoryCache body=flushAll pattern=memoryCache
  33. log.info("收到 Redis 订阅消息: channel={} body={} pattern={} ", channel, body, msgPattern);
  34. // 手动延迟,模拟数据处理
  35. ThreadUtil.safeSleep(800 + new Random().nextInt(3000));
  36. log.info("------------数据处理完成.......");
  37. }
  38. }

https://gitee.com/wangmx1993/thymeleafapp/blob/master/src/main/java/com/wmx/thymeleafapp/listeners/RedisSubListener.java

主题订阅

1、自定义 RedisTemplate 序列化方式(发布者和订阅者必须相同)。

2、配置主题订阅 - Redis 消息监听器绑定监听指定通道。

  1. /**
  2. * 自定义 RedisTemplate 序列化方式
  3. * 配置主题订阅 - Redis 消息监听器绑定监听指定通道
  4. *
  5. * @author wangMaoXiong
  6. * @version 1.0
  7. * @date 2022/5/21 16:13
  8. */
  9. @Configuration
  10. public class RedisConfig {
  11. // 自定义的消息订阅监听器,当收到阅订的消息时,会将消息交给这个类处理
  12. @Resource
  13. private RedisSubListener redisSubListener;
  14. // 自定义 RedisTemplate 序列化方式
  15. @Bean
  16. public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
  17. RedisTemplate<Object, Object> redisTemplate = new RedisTemplate<>();//创建 RedisTemplate,key 和 value 都采用了 Object 类型
  18. redisTemplate.setConnectionFactory(redisConnectionFactory);//绑定 RedisConnectionFactory
  19. //创建 Jackson2JsonRedisSerializer 序列方式,对象类型使用 Object 类型,
  20. Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
  21. ObjectMapper objectMapper = new ObjectMapper();
  22. objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
  23. objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
  24. jackson2JsonRedisSerializer.setObjectMapper(objectMapper);//设置一下 jackJson 的 ObjectMapper 对象参数
  25. // 设置 RedisTemplate 序列化规则。因为 key 通常是普通的字符串,所以使用 StringRedisSerializer 即可。
  26. // 而 value 是对象时,才需要使用序列化与反序列化
  27. redisTemplate.setKeySerializer(new StringRedisSerializer());// key 序列化规则
  28. redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);// value 序列化规则
  29. redisTemplate.setHashKeySerializer(new StringRedisSerializer());// hash key 序列化规则
  30. redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);// hash value 序列化规则
  31. redisTemplate.afterPropertiesSet();//属性设置后操作
  32. return redisTemplate;//返回设置好的 RedisTemplate
  33. }
  34. /**
  35. * 配置主题订阅
  36. * RedisMessageListenerContainer - Redis 消息监听器绑定监听指定通道
  37. * 1、可以添加多个监听器,监听多个通道,只需要将消息监听器与订阅的通道/主题绑定即可。
  38. * 2、订阅的通道可以配置在全局配置文件中,也可以配置在数据库中,
  39. * <p>
  40. * addMessageListener(MessageListener listener, Collection<? extends Topic> topics):将消息监听器与多个订阅的通道/主题绑定
  41. * addMessageListener(MessageListener listener, Topic topic):将消息监听器与订阅的通道/主题绑定
  42. *
  43. * @param connectionFactory
  44. * @return
  45. */
  46. @Bean
  47. public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
  48. RedisMessageListenerContainer container = new RedisMessageListenerContainer();
  49. // 设置连接工厂,RedisConnectionFactory 可以直接从容器中取,也可以从 RedisTemplate 中取
  50. container.setConnectionFactory(connectionFactory);
  51. // 订阅名称叫 memoryCache 的通道, 类似 Redis 中的 subscribe 命令
  52. container.addMessageListener(redisSubListener, new ChannelTopic("memoryCache"));
  53. // 订阅名称以 'basic-' 开头的全部通道, 类似 Redis 的 pSubscribe 命令
  54. container.addMessageListener(redisSubListener, new PatternTopic("basic-*"));
  55. return container;
  56. }
  57. }

https://gitee.com/wangmx1993/thymeleafapp/blob/master/src/main/java/com/wmx/thymeleafapp/config/RedisConfig.java

Spring boot 监听 Key 过期事件

1、Redis 数据库可以通过命令设置 Key 的有效时间,当一个 Key 过期后会自动从数据库中删除,释放空间。得益于于这个特性,可以很轻松地实现诸多类似于 “Session” 管理、数据缓存等功能。它们都有一个共同点就是,数据不会永久保存!

2、在有些场景中,可能希望在某些 Key 过期的时候获取到通知,进行一些业务处理。或者是干脆用于 “定时通知/任务” 功能,例如:下单 30 分钟后未支付,则取消订单。那么可以在用户下单的时候使用订单号作为 key 设置到 Redis 数据库中,并且设置过期时间为 30 分钟。当超时后,可以在 “key 过期通知” 中获取到 key 也就是订单号,判断用户是否已经支付从而是否取消订单。

3、Redis 的 Key 过期通知功能本质上是通过 发布/订阅 功能实现的,所以它「不能保证通知消息的交付」,当 Key 过期时如果服务器停机、重启后则该通知消息会永久丢失。

消息监听

1、Spring Data Redis 专门提供了一个密钥过期事件消息侦听器:KeyExpirationEventMessageListener,自定义监听器类继承它,然后覆写 doHandleMessage(Message message) 方法即可。

2、doHandleMessage 方法用于处理 Redis Key 过期通知事件,其中 Message 参数表示通知消息,只有 2 属性,分别表示消息正文(在这里就是过期的 Key 名称)以及来自于哪个 channel。

3、在 Redis Key 过期事件中,「只能获取到已过期的 Key 的名称,不能获取到值。」

  1. import org.slf4j.Logger;
  2. import org.slf4j.LoggerFactory;
  3. import org.springframework.data.redis.connection.Message;
  4. import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;
  5. import org.springframework.data.redis.listener.RedisMessageListenerContainer;
  6. import org.springframework.stereotype.Component;
  7. /**
  8. * Redis 缓存 Key 过期监听器
  9. * Spring Data Redis 专门提供了一个密钥过期事件消息侦听器:KeyExpirationEventMessageListener,
  10. * 自定义监听器类继承它,然后覆写 doHandleMessage(Message message) 方法即可。
  11. *
  12. * @author wangMaoXiong
  13. * @version 1.0
  14. * @date 2023/11/17 17:37
  15. */
  16. @Component
  17. public class KeyExpireListener extends KeyExpirationEventMessageListener {
  18. private static final Logger logger = LoggerFactory.getLogger(KeyExpireListener.class);
  19. /**
  20. * 通过构造函数注入 RedisMessageListenerContainer 给 KeyExpirationEventMessageListener
  21. *
  22. * @param listenerContainer : Redis消息侦听器容器
  23. */
  24. public KeyExpireListener(RedisMessageListenerContainer listenerContainer) {
  25. super(listenerContainer);
  26. }
  27. /**
  28. * doHandleMessage 方法用于处理 Redis Key 过期通知事件,
  29. * 在 Redis Key 过期事件中,「只能获取到已过期的 Key 的名称,不能获取到值。」
  30. *
  31. * @param message:通知消息,只有 2 属性,分别表示消息正文(在这里就是过期的 Key 名称)以及来自于哪个 channel。
  32. */
  33. @Override
  34. public void doHandleMessage(Message message) {
  35. // 过期的 key
  36. String key = new String(message.getBody());
  37. // 消息通道
  38. String channel = new String(message.getChannel());
  39. logger.info("过期key={} 消息通道(channel)={}", key, channel);
  40. }
  41. }

src/main/java/com/wmx/thymeleafapp/listeners/KeyExpireListener.java · 汪少棠/thymeleafapp - Gitee.com

主题订阅

1、与上面稍微有点不同,因为 key 过期事件属于 Redis 内部消息,内部频道/通道,所以只需要往容器中注入 RedisMessageListenerContainer 就行,不需要 addMessageListener 手动设置监听器 监听指定的通道/频道(topic 表达式)。

  1. import org.springframework.context.annotation.Bean;
  2. import org.springframework.context.annotation.Configuration;
  3. import org.springframework.data.redis.connection.RedisConnectionFactory;
  4. import org.springframework.data.redis.listener.RedisMessageListenerContainer;
  5. @Configuration
  6. public class RedisConfig {
  7. @Bean
  8. public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory factory) {
  9. RedisMessageListenerContainer container = new RedisMessageListenerContainer();
  10. container.setConnectionFactory(factory);
  11. // container.setTaskExecutor(null); // 设置用于执行监听器方法的 Executor
  12. // container.setErrorHandler(null); // 设置监听器方法执行过程中出现异常的处理器
  13. // container.addMessageListener(null, null); // 手动设置监听器 & 监听的 topic 表达式
  14. return container;
  15. }
  16. }

src/main/java/com/wmx/thymeleafapp/config/RedisConfig.java · 汪少棠/thymeleafapp - Gitee.com

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/正经夜光杯/article/detail/754493
推荐阅读
相关标签
  

闽ICP备14008679号