当前位置:   article > 正文

springboot中使用redis的发布与订阅_springboot redis发布订阅

springboot redis发布订阅

1、Configuration配置

@Configuration
@ComponentScan({"xxxxx.core.service.redis"})
public class xxxxRedisConfig {
    @Bean
    @SuppressWarnings("all")
    public RedisTemplate<String, Object> xxxRedisTemplate(
            RedisConnectionFactory factory
    ) {
        //由于源码autoConfig中是<Object, Object>,开发中一般直接使用<String,Object>
        RedisTemplate<String, Object> template = new RedisTemplate();
        template.setConnectionFactory(factory);

        //Json序列化配置
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(om);

        //String的序列化
        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();

        //key采用string的序列化
        template.setKeySerializer(stringRedisSerializer);
        //hash的key采用string的序列化
        template.setHashKeySerializer(stringRedisSerializer);
        //value序列化采用jackson
        template.setValueSerializer(jackson2JsonRedisSerializer);
        //hash的value序列化方式采用jackson
        template.setHashValueSerializer(jackson2JsonRedisSerializer);
        template.afterPropertiesSet();
        return template;
    }

    /**
     * Redis消息监听器容器
     * 这个容器加载了RedisConnectionFactory和消息监听器
     * 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,该消息监听器
     * 通过反射技术调用消息订阅处理器的相关方法进行一些业务处理
     *
     * @param redisConnectionFactory 连接工厂
     * @param adapter                适配器
     * @return redis消息监听容器
     */
    @Bean
    @SuppressWarnings("all")
    public RedisMessageListenerContainer xxxRedisMessageListenercontainer(
            RedisConnectionFactory redisConnectionFactory,
            xxxRedisMessageListener listener,
            xxxPrintMessageReceiver printMessageReceiver
            //MessageListenerAdapter adapter
    ) {

        final String xxxWebsocket = "xxxWebsocket"; // 订阅主题
        final String TOPIC_NAME2 = "TEST_TOPIC2"; // 订阅主题
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        // 监听所有库的key过期事件
        container.setConnectionFactory(redisConnectionFactory);
        // 所有的订阅消息,都需要在这里进行注册绑定,new PatternTopic(TOPIC_NAME1)表示发布的主题信息
        // 可以添加多个 messageListener,配置不同的通道
        container.addMessageListener(listener, new PatternTopic(xxxWebsocket));
        container.addMessageListener(xxxMessageListenerAdapter( printMessageReceiver), new PatternTopic(TOPIC_NAME2));

        /**
         * 设置序列化对象
         * 特别注意:1. 发布的时候需要设置序列化;订阅方也需要设置序列化
         *         2. 设置序列化对象必须放在[加入消息监听器]这一步后面,否则会导致接收器接收不到消息
         */
        Jackson2JsonRedisSerializer seria = new Jackson2JsonRedisSerializer(Object.class);
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        seria.setObjectMapper(objectMapper);
        container.setTopicSerializer(seria);

        return container;
    }

    /**
     * 这个地方是给messageListenerAdapter 传入一个消息接受的处理器,利用反射的方法调用“receiveMessage”
     * 也有好几个重载方法,这边默认调用处理器的方法 叫OnMessage
     *
     * @param printMessageReceiver
     * @return
     */
    @Bean
    public MessageListenerAdapter xxxMessageListenerAdapter(xxxPrintMessageReceiver printMessageReceiver) {
        MessageListenerAdapter receiveMessage = new MessageListenerAdapter(printMessageReceiver, "xxxReceiveMessage");

        Jackson2JsonRedisSerializer seria = new Jackson2JsonRedisSerializer(Object.class);
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        seria.setObjectMapper(objectMapper);
        receiveMessage.setSerializer(seria);
        return receiveMessage;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
@Slf4j
@Component
public class xxxPrintMessageReceiver{


    public void xxxReceiveMessage(xxxRedisMessageDto messageDto , String channel) {

        // 接收的topic
        log.info("channel:" + channel);

        log.info("message:" + messageDto.getTitle());
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

2、 消息接收类

@Slf4j
@Component
public class xxxRedisMessageListener implements MessageListener {
    @Resource
    private RedisTemplate xxxRedisTemplate;

    @Override
    public void onMessage(Message message, byte[] bytes) {
        // 接收的topic
        //log.info("channel:" + new String(bytes));

        //序列化对象(特别注意:发布的时候需要设置序列化;订阅方也需要设置序列化)
        xxxRedisMessageDto messageDto = (xxxRedisMessageDto) xxxRedisTemplate.getValueSerializer().deserialize(message.getBody());
        //log.info(messageDto.getData()+","+messageDto.getContent());
        xxxWebsocket xxxWebsocket = new xxxWebsocket();
        String msgtxt = JSONSerializer.serialize(messageDto.getMepeprobleminfo());
        xxxWebsocket.sendMessageByUser(messageDto.getMepeprobleminfo().getWorkcenterid(),msgtxt);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

3、 消息发布类

 private final String TOPIC_NAME1 = "xxxWebsocket"; // 订阅主题
public void xxxRedisConvertAndSend(Object data) {
        try {
            // 发布消息
            xxxRedisMessageDto dto = new xxxRedisMessageDto();
            dto.setXxx((xxxXXXxxxEntity) data);
            dto.setTitle("异常信息");
            dto.setContent("异常信息");
            xxxRedisTemplate.convertAndSend(TOPIC_NAME1, dto);
        } catch (Throwable e) {
            log.error("消息发布", e);
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

4、Dto

@AllArgsConstructor
@NoArgsConstructor
@Data
public class xxxRedisMessageDto implements Serializable {

    private String title;
    private String content;
    private Object data;
    private xxxXXXxxxEntity xxx;

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/爱喝兽奶帝天荒/article/detail/798412
推荐阅读
相关标签
  

闽ICP备14008679号