赞
踩
@Configuration @ComponentScan({"xxxxx.core.service.redis"}) public class xxxxRedisConfig { @Bean @SuppressWarnings("all") public RedisTemplate<String, Object> xxxRedisTemplate( RedisConnectionFactory factory ) { //由于源码autoConfig中是<Object, Object>,开发中一般直接使用<String,Object> RedisTemplate<String, Object> template = new RedisTemplate(); template.setConnectionFactory(factory); //Json序列化配置 Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class); ObjectMapper om = new ObjectMapper(); om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); jackson2JsonRedisSerializer.setObjectMapper(om); //String的序列化 StringRedisSerializer stringRedisSerializer = new StringRedisSerializer(); //key采用string的序列化 template.setKeySerializer(stringRedisSerializer); //hash的key采用string的序列化 template.setHashKeySerializer(stringRedisSerializer); //value序列化采用jackson template.setValueSerializer(jackson2JsonRedisSerializer); //hash的value序列化方式采用jackson template.setHashValueSerializer(jackson2JsonRedisSerializer); template.afterPropertiesSet(); return template; } /** * Redis消息监听器容器 * 这个容器加载了RedisConnectionFactory和消息监听器 * 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,该消息监听器 * 通过反射技术调用消息订阅处理器的相关方法进行一些业务处理 * * @param redisConnectionFactory 连接工厂 * @param adapter 适配器 * @return redis消息监听容器 */ @Bean @SuppressWarnings("all") public RedisMessageListenerContainer xxxRedisMessageListenercontainer( RedisConnectionFactory redisConnectionFactory, xxxRedisMessageListener listener, xxxPrintMessageReceiver printMessageReceiver //MessageListenerAdapter adapter ) { final String xxxWebsocket = "xxxWebsocket"; // 订阅主题 final String TOPIC_NAME2 = "TEST_TOPIC2"; // 订阅主题 RedisMessageListenerContainer container = new RedisMessageListenerContainer(); // 监听所有库的key过期事件 container.setConnectionFactory(redisConnectionFactory); // 所有的订阅消息,都需要在这里进行注册绑定,new PatternTopic(TOPIC_NAME1)表示发布的主题信息 // 可以添加多个 
messageListener,配置不同的通道 container.addMessageListener(listener, new PatternTopic(xxxWebsocket)); container.addMessageListener(xxxMessageListenerAdapter( printMessageReceiver), new PatternTopic(TOPIC_NAME2)); /** * 设置序列化对象 * 特别注意:1. 发布的时候需要设置序列化;订阅方也需要设置序列化 * 2. 设置序列化对象必须放在[加入消息监听器]这一步后面,否则会导致接收器接收不到消息 */ Jackson2JsonRedisSerializer seria = new Jackson2JsonRedisSerializer(Object.class); ObjectMapper objectMapper = new ObjectMapper(); objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); seria.setObjectMapper(objectMapper); container.setTopicSerializer(seria); return container; } /** * 这个地方是给messageListenerAdapter 传入一个消息接受的处理器,利用反射的方法调用“receiveMessage” * 也有好几个重载方法,这边默认调用处理器的方法 叫OnMessage * * @param printMessageReceiver * @return */ @Bean public MessageListenerAdapter xxxMessageListenerAdapter(xxxPrintMessageReceiver printMessageReceiver) { MessageListenerAdapter receiveMessage = new MessageListenerAdapter(printMessageReceiver, "xxxReceiveMessage"); Jackson2JsonRedisSerializer seria = new Jackson2JsonRedisSerializer(Object.class); ObjectMapper objectMapper = new ObjectMapper(); objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); seria.setObjectMapper(objectMapper); receiveMessage.setSerializer(seria); return receiveMessage; } }
/**
 * Receiver whose {@code xxxReceiveMessage} method is invoked reflectively by the
 * {@code MessageListenerAdapter} configured in {@code xxxxRedisConfig}.
 */
@Slf4j
@Component
public class xxxPrintMessageReceiver {

    /**
     * Logs the channel and the title of a received message.
     *
     * @param messageDto deserialized message payload; must not be {@code null}
     * @param channel    topic the message was received on
     */
    public void xxxReceiveMessage(xxxRedisMessageDto messageDto, String channel) {
        // Parameterized logging: same output as before, without eager string concatenation.
        log.info("channel:{}", channel);
        log.info("message:{}", messageDto.getTitle());
    }
}
@Slf4j @Component public class xxxRedisMessageListener implements MessageListener { @Resource private RedisTemplate xxxRedisTemplate; @Override public void onMessage(Message message, byte[] bytes) { // 接收的topic //log.info("channel:" + new String(bytes)); //序列化对象(特别注意:发布的时候需要设置序列化;订阅方也需要设置序列化) xxxRedisMessageDto messageDto = (xxxRedisMessageDto) xxxRedisTemplate.getValueSerializer().deserialize(message.getBody()); //log.info(messageDto.getData()+","+messageDto.getContent()); xxxWebsocket xxxWebsocket = new xxxWebsocket(); String msgtxt = JSONSerializer.serialize(messageDto.getMepeprobleminfo()); xxxWebsocket.sendMessageByUser(messageDto.getMepeprobleminfo().getWorkcenterid(),msgtxt); } }
/** Redis topic that messages are published to. */
private static final String TOPIC_NAME1 = "xxxWebsocket";
/**
 * Wraps {@code data} in a {@code xxxRedisMessageDto} and publishes it on {@code TOPIC_NAME1}.
 *
 * <p>Publishing is best-effort: any exception (including a {@code ClassCastException} when
 * {@code data} is not a {@code xxxXXXxxxEntity}) is logged and not rethrown. JVM
 * {@code Error}s are no longer swallowed and propagate to the caller.
 *
 * @param data entity to publish; cast to {@code xxxXXXxxxEntity}
 */
public void xxxRedisConvertAndSend(Object data) {
try {
xxxRedisMessageDto dto = new xxxRedisMessageDto();
dto.setXxx((xxxXXXxxxEntity) data);
dto.setTitle("异常信息");
dto.setContent("异常信息");
// Publish the message.
xxxRedisTemplate.convertAndSend(TOPIC_NAME1, dto);
} catch (Exception e) {
// Catch Exception rather than Throwable so that Errors are not hidden behind a log line.
log.error("消息发布", e);
}
}
4、Dto
/**
 * Message payload exchanged over Redis pub/sub.
 *
 * <p>Lombok generates the accessors and both a no-args and an all-args constructor; the
 * all-args constructor takes the fields in declaration order, so do not reorder them.
 */
@AllArgsConstructor
@NoArgsConstructor
@Data
public class xxxRedisMessageDto implements Serializable {
// Message title; the publisher in this file sets it to "异常信息".
private String title;
// Message content; the publisher in this file sets it to "异常信息".
private String content;
// Generic payload; not populated by the code visible in this file.
private Object data;
// Entity payload; the publisher sets it from its argument cast to xxxXXXxxxEntity.
private xxxXXXxxxEntity xxx;
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。