赞
踩
废话不多说,直接上代码
代码结构
1.导包
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<exclusions>
<exclusion>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
</dependency>
2.在application.properties中配置redis
#配置Redis
spring.redis.database=0
spring.redis.host=192.168.57.155
spring.redis.port=6379
spring.redis.password=
#最大连接数
spring.redis.pool.max-active=8
#连接池最大阻塞等待时间(毫秒),负值表示没有限制
spring.redis.pool.max-wait=-1
#最小空闲连接数
spring.redis.pool.min-idle=0
#最大空闲连接数
spring.redis.pool.max-idle=8
#连接超时时间(毫秒)
spring.redis.timeout=10000
3.创建redis配置类
import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.PropertyAccessor; import com.fasterxml.jackson.databind.ObjectMapper; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.connection.RedisPassword; import org.springframework.data.redis.connection.RedisStandaloneConfiguration; import org.springframework.data.redis.connection.jedis.JedisClientConfiguration; import org.springframework.data.redis.connection.jedis.JedisConnectionFactory; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer; import org.springframework.data.redis.serializer.StringRedisSerializer; import redis.clients.jedis.JedisPoolConfig; /** * 描述 * * @author wangYaNan * @version 1.0 * @date 2021/04/14 11:26:08 */ @Configuration public class RedisCacheConfig { @Value("${spring.redis.database}") private int database; @Value("${spring.redis.host}") private String host; @Value("${spring.redis.port}") private int port; @Value("${spring.redis.password}") private String password; @Value("${spring.redis.pool.max-active}") private int maxActive; @Value("${spring.redis.pool.max-idle}") private int maxIdle; @Value("${spring.redis.pool.min-idle}") private int minIdle; @Value("${spring.redis.pool.max-wait}") private int maxWait; /** * 配置JedisConnectionFactory * spring 2.*版本 * * @return */ @Bean public JedisConnectionFactory jedisConnectionFactory() { //spring 2.*版本 RedisStandaloneConfiguration redisStandaloneConfiguration = new RedisStandaloneConfiguration(); redisStandaloneConfiguration.setDatabase(database); redisStandaloneConfiguration.setHostName(host); redisStandaloneConfiguration.setPassword(RedisPassword.of(password)); 
redisStandaloneConfiguration.setPort(port); JedisClientConfiguration.JedisClientConfigurationBuilder jedisClientConfigurationBuilder = JedisClientConfiguration.builder(); JedisClientConfiguration jedisClientConfiguration = jedisClientConfigurationBuilder .usePooling().poolConfig(this.poolConfig(minIdle, maxIdle, maxActive, maxWait)).build(); return new JedisConnectionFactory(redisStandaloneConfiguration, jedisClientConfiguration); } //连接池配置 public JedisPoolConfig poolConfig(int minIdle, int maxIdle, int maxActive, int maxWait) { JedisPoolConfig poolConfig = new JedisPoolConfig(); poolConfig.setMaxTotal(maxActive); poolConfig.setMaxIdle(maxIdle); poolConfig.setMinIdle(minIdle); poolConfig.setMaxWaitMillis(maxWait); return poolConfig; } /** * SpringBoot自定义配置RedisTemplate */ @Bean @SuppressWarnings({"rawtypes", "unchecked"}) public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) { RedisTemplate<Object, Object> template = new RedisTemplate<>(); template.setConnectionFactory(redisConnectionFactory); // 使用Jackson2JsonRedisSerialize 替换默认序列化 Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class); ObjectMapper mapper = new ObjectMapper(); mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); mapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); jackson2JsonRedisSerializer.setObjectMapper(mapper); // 设置value的序列化规则和 key的序列化规则 template.setValueSerializer(jackson2JsonRedisSerializer); template.setKeySerializer(new StringRedisSerializer()); template.setHashKeySerializer(new StringRedisSerializer()); template.afterPropertiesSet(); return template; } }
4.创建redis监听类(在里面可以自定义监听频道也就是订阅)
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.listener.PatternTopic; import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.data.redis.listener.adapter.MessageListenerAdapter; import java.util.concurrent.CountDownLatch; /** * 描述 * * @author wangYaNan * @version 1.0 * @date 2021/04/14 13:40:22 */ @Configuration public class RedisMessageListener { /** * 创建连接工厂 * * @param connectionFactory * @param listenerAdapter * @return */ @Bean public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter, MessageListenerAdapter listenerAdapterWang, MessageListenerAdapter listenerAdapterTest2) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); //(不同的监听器可以收到同一个频道的信息)接受消息的频道 container.addMessageListener(listenerAdapter, new PatternTopic("phone")); container.addMessageListener(listenerAdapterWang, new PatternTopic("phone")); container.addMessageListener(listenerAdapterTest2, new PatternTopic("phoneTest2")); return container; } /** * 绑定消息监听者和接收监听的方法 * * @param receiver * @return */ @Bean public MessageListenerAdapter listenerAdapter(ReceiverRedisMessage receiver) { return new MessageListenerAdapter(receiver, "receiveMessage"); } @Bean public MessageListenerAdapter listenerAdapterWang(ReceiverRedisMessage receiver) { return new MessageListenerAdapter(receiver, "receiveMessageWang"); } /** * 绑定消息监听者和接收监听的方法 * * @param receiver * @return */ @Bean public MessageListenerAdapter listenerAdapterTest2(ReceiverRedisMessage receiver) { return new MessageListenerAdapter(receiver, "receiveMessage2"); } /** * 注册订阅者 * * @param latch * @return */ @Bean ReceiverRedisMessage receiver(CountDownLatch latch) { return new 
ReceiverRedisMessage(latch); } /** * 计数器,用来控制线程 * * @return */ @Bean public CountDownLatch latch() { return new CountDownLatch(1);//指定了计数的次数 1 } }
5.创建redis消息接收类,也就是在这里可以处理消费信息的逻辑
import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import java.util.concurrent.CountDownLatch; /** * 描述 * * @author wangYaNan * @version 1.0 * @date 2021/04/14 13:41:39 */ @Slf4j public class ReceiverRedisMessage { private CountDownLatch latch; @Autowired public ReceiverRedisMessage(CountDownLatch latch) { this.latch = latch; } /** * 队列消息接收方法 * * @param jsonMsg */ public void receiveMessage(String jsonMsg) { log.info("[开始消费REDIS消息队列phone数据...]"); try { System.out.println(jsonMsg); log.info("[消费REDIS消息队列phone数据成功.]"); } catch (Exception e) { log.error("[消费REDIS消息队列phone数据失败,失败信息:{}]", e.getMessage()); } latch.countDown(); } public void receiveMessageWang(String jsonMsg) { log.info("[王亚南------开始消费REDIS消息队列phone数据...]"); try { System.out.println(jsonMsg); log.info("[王亚南------消费REDIS消息队列phone数据成功.]"); } catch (Exception e) { log.error("[消费REDIS消息队列phone数据失败,失败信息:{}]", e.getMessage()); } latch.countDown(); } /** * 队列消息接收方法 * * @param jsonMsg */ public void receiveMessage2(String jsonMsg) { log.info("[开始消费REDIS消息队列phoneTest2数据...]"); try { System.out.println(jsonMsg); /** * 此处执行自己代码逻辑 例如 插入 删除操作数据库等 */ log.info("[消费REDIS消息队列phoneTest2数据成功.]"); } catch (Exception e) { log.error("[消费REDIS消息队列phoneTest2数据失败,失败信息:{}]", e.getMessage()); } latch.countDown(); } }
6.写个单元测试试一下
/**
 * Publishes one message on the "phone" channel, to which two listeners are
 * subscribed, so that both receive the same message.
 */
@Test
public void redisTest0() {
    // "phone" is the topic (channel) name shared by the two subscribers.
    final String channel = "phone";
    final String payload = "223333";
    redisTemplate.convertAndSend(channel, payload);
    // "phoneTest2" is a separate channel; uncomment the line below to publish
    // to both topics and compare the output.
    // redisTemplate.convertAndSend("phoneTest2", "34555665");
}
7.效果
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。