赞
踩
发布订阅:消息发布者发布消息 和 消息订阅者接收消息,两者之间通过某种媒介联系起来
Redis 发布订阅(pub/sub)是一种 消息通信模式 :发送者(pub)发送消息,订阅者(sub)接收消息。
Redis 客户端可以订阅任意数量的频道。
订阅 / 发布消息图:
图中可以看出,所需:
- 消息发送者 、 2. 频道 、 3. 消息订阅者
PUBLISH
命令向订阅者发布消息的时候,称这个客户端为发布者publisher
subscribe
或者 PSUBSCRIBE
接收消息时,称这个客户端为 订阅者 subscriber
channel(频道)
作为两者之间的中介,发布者直接把消息发送给 channel,而 channel 负责把消息发送给订阅者,发布者和订阅者之间没有直接的联系,都不知道对方的存在订阅者 1,2,3 订阅了频道 channel,当有消息发布给频道时,这个消息就会被发送到三个订阅者客户端
引入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
添加配置文件
spring:
redis:
host: 127.0.0.1
database: 5
password:
port: 6379
新建一个监听类,来监听消息
package com.maoxs.listener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.MessageListener; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; /** * 监听发送的消息 */ @Component public class CatListener extends MessageListenerAdapter implements MessageListener { @Autowired RedisTemplate redisTemplate; @Override public void onMessage(Message message, byte[] bytes) { System.out.println("我是Cat监听" + message.toString()); } }
package com.maoxs.listener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.MessageListener; import org.springframework.data.redis.core.RedisTemplate; /** * 监听发送的消息 */ public class FishListener extends MessageListenerAdapter implements MessageListener { @Autowired RedisTemplate redisTemplate; @Override public void onMessage(Message message, byte[] bytes) { System.out.println("我是Fish监听" + message.toString()); } }
创建一个监听容器
package com.maoxs.redis; import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.PropertyAccessor; import com.fasterxml.jackson.databind.ObjectMapper; import com.maoxs.pojo.MessageReceiver; import org.springframework.cache.CacheManager; import org.springframework.cache.annotation.CachingConfigurerSupport; import org.springframework.cache.annotation.EnableCaching; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.cache.RedisCacheManager; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.listener.PatternTopic; import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.data.redis.listener.adapter.MessageListenerAdapter; import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer; import java.util.Arrays; @Configuration public class RedisMessageConfig { /** * redis消息监听器容器 * 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,该消息监听器 * 通过反射技术调用消息订阅处理器的相关方法进行一些业务处理 * @param connectionFactory * @param listenerAdapter * @return */ @Bean RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, CatListener catAdapter, FishListener fishAdapter) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); //订阅了一个叫chat 的通道 container.addMessageListener(catAdapter, new PatternTopic("cat")); container.addMessageListener(fishAdapter, new PatternTopic("fish")); //这个container 可以添加多个 messageListener return container; } /** * 消息监听器适配器,绑定消息处理器 * * @param receiver * @return */ // @Bean // MessageListenerAdapter CatAdapter() { // return new MessageListenerAdapter(new CatListener()); // } /** * 消息监听器适配器,绑定消息处理器 * * @param receiver * @return */ // @Bean // MessageListenerAdapter FishAdapter() { // return new MessageListenerAdapter(new FishListener()); // } /** * redis 读取内容的template */ @Bean StringRedisTemplate stringRedisTemplate(RedisConnectionFactory connectionFactory) { return new StringRedisTemplate(connectionFactory); } @Bean public RedisTemplate<String, String> redisTemplate(RedisConnectionFactory factory) { StringRedisTemplate template = new StringRedisTemplate(factory); //定义value的序列化方式 Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class); ObjectMapper om = new ObjectMapper(); om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); jackson2JsonRedisSerializer.setObjectMapper(om); template.setValueSerializer(jackson2JsonRedisSerializer); template.setHashKeySerializer(jackson2JsonRedisSerializer); template.afterPropertiesSet(); return template; } }
建测试类,测试发布监听
@RestController public class TestController { @Resource StringRedisTemplate stringRedisTemplate; @GetMapping("cat") public void sendCatMessage(){ stringRedisTemplate.convertAndSend("cat","猫"); } @GetMapping("fish") public void sendFishMessage(){ stringRedisTemplate.convertAndSend("fish","鱼"); }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。