当前位置:   article > 正文

RedisMessageListenerContainer 监听redis的key的变化

redismessagelistenercontainer

RedisMessageListenerContainer 监听redis的key的变化

maven依赖

		<!-- redis -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-data-redis</artifactId>
		</dependency>
		<dependency>
			<groupId>org.redisson</groupId>
			<artifactId>redisson</artifactId>
			<version>3.20.0</version>
		</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

配置文件


  redis:
    database: 0
    # Redis服务器地址
    host: 127.0.0.1
    # Redis服务器连接端口
    port: 8973
    # Redis服务器连接密码(默认为空)
    password: xxxx@2023
    # 连接超时时间(毫秒)
    timeout: 1000
    listen:
      # 监听新增或者修改事件(这里是两个短下划线)
      keyspace: __keyspace@${spring.redis.database}__:house:floor:*


  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

1.RedisMessageListenerContainer的监听流程:

用于消息监听,需要将 Topic和MessageListener 注册到 RedisMessageListenerContainer 中。
当 Topic 上有消息时,由 RedisMessageListenerContainer 通知 MessageListener,MessageListener通过 onMessage 方法拿到消息后,自行处理。

package com.example.demo.mqttRedisListener.listener;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.Topic;

/**
 * redis监听器:监听redis的key的变化
 */
@Configuration
@Slf4j
public class RedisMessageListener {
    @Value("${spring.redis.listen.keyspace}")
    public String patternKeyspace;

    @Value("${spring.redis.host}")
    public String host;


    /**
     * redis消息监听器容器
     * 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定
     *
     * @param redisConnectionFactory Redis 连接的线程安全工厂
     * @param topicMessageListener 实际处理的监听器
     * @return
     */
    @Bean
    public RedisMessageListenerContainer listenerContainer(RedisConnectionFactory redisConnectionFactory,
                                                           MyTopicMessageListener topicMessageListener) {
        log.info("pattern_set:" + patternKeyspace);
        log.info("当前redis:" + host);
        /*
        模式   __keyspace@0__:xx:xxx_xxs:*

        在 Redis 中 __keyspace@0__ 是一个特殊的键空间,用于监听键的事件。
        0 是 Redis 数据库的索引,表示监听的是索引为 0 的数据库。
        xx:xxx_xxs:* 是一个模式匹配的键,* 表示匹配任意字符。

        监听 Redis 数据库索引为 0 的数据库中以 xx:xxx_xxs: 开头的键的新增或修改事件。当有键被新增或修改时,该监听器会触发相应的事件处理逻辑。
         */

        RedisMessageListenerContainer listenerContainer = new RedisMessageListenerContainer();
        listenerContainer.setConnectionFactory(redisConnectionFactory);
        Topic topic_set = new PatternTopic(patternKeyspace);
        /*RedisMessageListenerContainer 用于消息监听,需要将 Topic和MessageListener 注册到 RedisMessageListenerContainer 中。
        当 Topic 上有消息时,由 RedisMessageListenerContainer 通知 MessageListener,
        MessageListener通过 onMessage 方法拿到消息后,自行处理。*/
        listenerContainer.addMessageListener(topicMessageListener, topic_set);
        return listenerContainer;
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58

2.topic模式匹配

模式 (这里是两个短下划线)  __keyspace@0__:rx:xxx_xxs:*
  • 1
  1. “——keyspace@0——” : 在 Redis 中是一个特殊的键空间,用于监听键的事件。
  2. 0 是 Redis 数据库的索引,表示监听的是索引为 0 的数据库。
  3. rx:xxx_xxs:* 是一个模式匹配的键,* 表示匹配任意字符。

作用:监听 Redis 数据库索引为 0 的数据库中以 rx:xxx_xxs:开头的键的事件。键发生新增、修改、过期等redis操作,该监听器会触发相应的事件处理逻辑。

3.自定义监听器MessageListener

通过 onMessage 方法拿到消息后,自行处理,写入我们的处理逻辑。

@Component
@Slf4j
public class MyTopicMessageListener implements MessageListener {

    @Autowired
    private MqttSendService mqttSendService;
    @Autowired
    private RedissonClient redissonClient;

    /**
     * 发布 事件
     *
     * @param message 监听key的操作类型  主要是设值和过期。set ,expired ,del
     * @param bytes   监听的主题
     */
    @Override
    public void onMessage(Message message, byte[] bytes) {
        String key = new String(message.getBody());//set,expired,del
        String type = new String(message.getChannel());//__keyspace@0__:house:floor:cc
        String redisKey = type.substring(15);//house:floor:cc
        if (key.contains(RedisCommndEnum.SET.getCommand())) {
            log.info("收到key设值(更新)的消息:操作{},redisKey={}", key, redisKey);
            //查询
            String content = (String) redissonClient.getBucket(redisKey).get();
            //把redis的分隔符:替换成 mqtt的分隔符/
            String topic = redisKey.replace(":", "/");
            //发送到mqtt
            mqttSendService.sendToMqtt(topic, 0, content);
            log.info("发送到mqtt,topic={},content={}", topic, content);
        } else if (key.contains(RedisCommndEnum.EXPIRED.getCommand())) {
            log.info("收到key过期的消息:操作{},redisKey={}", key, redisKey);
            //...todo
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/盐析白兔/article/detail/791496
推荐阅读
相关标签
  

闽ICP备14008679号