赞
踩
<!-- redis -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.20.0</version>
</dependency>
redis: database: 0 # Redis服务器地址 host: 127.0.0.1 # Redis服务器连接端口 port: 8973 # Redis服务器连接密码(默认为空) password: xxxx@2023 # 连接超时时间(毫秒) timeout: 1000 listen: # 监听新增或者修改事件(这里是两个短下划线) keyspace: __keyspace@${spring.redis.database}__:house:floor:*
用于消息监听,需要将 Topic和MessageListener 注册到 RedisMessageListenerContainer 中。
当 Topic 上有消息时,由 RedisMessageListenerContainer 通知 MessageListener,MessageListener通过 onMessage 方法拿到消息后,自行处理。
package com.example.demo.mqttRedisListener.listener; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.listener.PatternTopic; import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.data.redis.listener.Topic; /** * redis监听器:监听redis的key的变化 */ @Configuration @Slf4j public class RedisMessageListener { @Value("${spring.redis.listen.keyspace}") public String patternKeyspace; @Value("${spring.redis.host}") public String host; /** * redis消息监听器容器 * 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定 * * @param redisConnectionFactory Redis 连接的线程安全工厂 * @param topicMessageListener 实际处理的监听器 * @return */ @Bean public RedisMessageListenerContainer listenerContainer(RedisConnectionFactory redisConnectionFactory, MyTopicMessageListener topicMessageListener) { log.info("pattern_set:" + patternKeyspace); log.info("当前redis:" + host); /* 模式 __keyspace@0__:xx:xxx_xxs:* 在 Redis 中 __keyspace@0__ 是一个特殊的键空间,用于监听键的事件。 0 是 Redis 数据库的索引,表示监听的是索引为 0 的数据库。 xx:xxx_xxs:* 是一个模式匹配的键,* 表示匹配任意字符。 监听 Redis 数据库索引为 0 的数据库中以 xx:xxx_xxs: 开头的键的新增或修改事件。当有键被新增或修改时,该监听器会触发相应的事件处理逻辑。 */ RedisMessageListenerContainer listenerContainer = new RedisMessageListenerContainer(); listenerContainer.setConnectionFactory(redisConnectionFactory); Topic topic_set = new PatternTopic(patternKeyspace); /*RedisMessageListenerContainer 用于消息监听,需要将 Topic和MessageListener 注册到 RedisMessageListenerContainer 中。 当 Topic 上有消息时,由 RedisMessageListenerContainer 通知 MessageListener, MessageListener通过 onMessage 方法拿到消息后,自行处理。*/ listenerContainer.addMessageListener(topicMessageListener, topic_set); return listenerContainer; } }
模式 (这里是两个短下划线) __keyspace@0__:rx:xxx_xxs:*
作用:监听 Redis 数据库索引为 0 的数据库中以 rx:xxx_xxs:开头的键的事件。键发生新增、修改、过期等redis操作,该监听器会触发相应的事件处理逻辑。
通过 onMessage 方法拿到消息后,自行处理,写入我们的处理逻辑。
@Component @Slf4j public class MyTopicMessageListener implements MessageListener { @Autowired private MqttSendService mqttSendService; @Autowired private RedissonClient redissonClient; /** * 发布 事件 * * @param message 监听key的操作类型 主要是设值和过期。set ,expired ,del * @param bytes 监听的主题 */ @Override public void onMessage(Message message, byte[] bytes) { String key = new String(message.getBody());//set,expired,del String type = new String(message.getChannel());//__keyspace@0__:house:floor:cc String redisKey = type.substring(15);//house:floor:cc if (key.contains(RedisCommndEnum.SET.getCommand())) { log.info("收到key设值(更新)的消息:操作{},redisKey={}", key, redisKey); //查询 String content = (String) redissonClient.getBucket(redisKey).get(); //把redis的分隔符:替换成 mqtt的分隔符/ String topic = redisKey.replace(":", "/"); //发送到mqtt mqttSendService.sendToMqtt(topic, 0, content); log.info("发送到mqtt,topic={},content={}", topic, content); } else if (key.contains(RedisCommndEnum.EXPIRED.getCommand())) { log.info("收到key过期的消息:操作{},redisKey={}", key, redisKey); //...todo } } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。