赞
踩
引入redission jar包:
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.17.4</version>
</dependency>
先配置项目中的redission 配置文件:
@Configuration public class RedissonConfig { @Value("${spring.redis.host}") private String redisHost; @Value("${spring.redis.port}") private String redisPort; @Value("${spring.redis.password}") private String password; @Bean(destroyMethod = "shutdown") public RedissonClient redissonClient() { Config config = new Config(); config.useSingleServer().setAddress("redis://" + redisHost + ":" + redisPort); if(StringUtils.isBlank(password)) { config.useSingleServer().setPassword(null); } else { config.useSingleServer().setPassword(password); } return Redisson.create(config); } }
也可以使用redission springboot 集成配置,引入redisson-spring-data-2x,这样就无需配置类了。详见官网:
https://github.com/redisson/redisson/tree/master/redisson-spring-boot-starter
增加封装操作api的方法,提供crud队列元素功能:
@Service public class RedissonDelayedQueueService { @Autowired private RedissonClient redissonClient; public <E> void addQueue(E e, long delay, TimeUnit timeUnit, String queueName) { RBlockingDeque<E> blockingDeque = redissonClient.getBlockingDeque(queueName); RDelayedQueue<E> delayedQueue = redissonClient.getDelayedQueue(blockingDeque); delayedQueue.offer(e, delay, timeUnit); } public <E> RBlockingDeque<E> getQueue(String queueName) { RBlockingDeque<E> blockingDeque = redissonClient.getBlockingDeque(queueName); redissonClient.getDelayedQueue(blockingDeque); return blockingDeque; } public <E> void removeQueueElement(E e, String queueName) { RBlockingDeque<E> blockingDeque = redissonClient.getBlockingDeque(queueName); RDelayedQueue<E> delayedQueue = redissonClient.getDelayedQueue(blockingDeque); delayedQueue.remove(e); } }
现在我们就可以实现我们一开始提到的核心思想,比如我们想实现一个订单的阻塞队列监听:
先定义一个键值:
public interface CacheKeyDefinition {
/**
* 订单延时到生效时间执行
*/
String REDIS_KEY_ORDER = "redis_key_publish_order";
}
@Slf4j @Component public class OrderDelayedQueueListener { @Resource private RedissonDelayedQueueService redissonDelayedQueueService; @Resource private OrderHandler orderHandler; private final ExecutorService singlePoolExecutor = Executors.newSingleThreadExecutor(); /** * 每次 */ @PostConstruct public void listener() { singlePoolExecutor.execute(()->{ while (true){ RBlockingDeque<String> blockingDeque = redissonDelayedQueueService.getQueue(CacheKeyDefinition.REDIS_KEY_ORDER); String priceListCode = null; try { id = blockingDeque.take(); } catch (InterruptedException e) { log.error("消费异常",e); } log.info("获取到订单队列:{}",id); orderHandler.handle(id); } }); } }
在实际业务中使用示例:
public class OrderserviceImpl{ @Autowired private RedissonDelayedQueueService delayedQueueService; void create(){ //创建订单 String id=; //将当前订单放入延时任务中: delayedQueueService.addQueue(id, 30, TimeUnit.MINUTES, CacheKeyDefinition.REDIS_KEY_ORDER); } //付款 void pay(String id){ //判定付款开始。。 //判定付款成功。。 //删除redis 延迟队列元素 delayedQueueService.removeQueueElement(id, CacheKeyDefinition.REDIS_KEY_ORDER); } }
订单的延时处理
class OrderHandler{
void handle(String id){
//关闭当前订单
close(id);
}
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。