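RDelayedQueue is the distributed delayed queue that Redisson provides, and using it directly is fairly simple.
This article covers the following parts: the Redisson configuration, producing delayed tasks, consuming them, and cancelling them.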
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.redisson.config.SingleServerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.data.redis.RedisProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Redisson configuration class
 *
 * @author xxx
 */
@Configuration
public class RedissonConfig {

    @Value("${spring.application.name}")
    private String serverName;

    @Bean
    public RedissonClient redissonClient(RedisProperties redisProperties) {
        Config config = new Config();
        // Single-node Redis; host, port, password and database come from spring.redis.*
        SingleServerConfig singleServerConfig = config.useSingleServer();
        singleServerConfig.setAddress("redis://" + redisProperties.getHost() + ":" + redisProperties.getPort());
        singleServerConfig.setPassword(redisProperties.getPassword());
        singleServerConfig.setKeepAlive(true);
        singleServerConfig.setDatabase(redisProperties.getDatabase());
        singleServerConfig.setConnectionMinimumIdleSize(2);
        singleServerConfig.setConnectionPoolSize(4);
        // The client name makes this service easy to identify in CLIENT LIST
        singleServerConfig.setClientName(serverName);
        return Redisson.create(config);
    }
}
spring:
  application:
    name: delay-task-service
  redis:
    host: 192.168.8.18
    port: 6379
    database: 0
    timeout: 3000
As the producer of delayed tasks, you need to compute the delay from the expected callback time.
The pseudocode is as follows:
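public static final String REDISSON_QUEUE_NAME = "DelayTaskQueue";
private final RedissonClient redissonClient;

// The delayed queue moves each element into blockingQueue once its delay has elapsed
RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue(REDISSON_QUEUE_NAME);
RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(blockingQueue);
// Hutool's three-argument DateUtil.between returns an absolute value, so pass isAbs = false
// to get a signed result: negative when the notify time is already in the past
long delay = DateUtil.between(new DateTime(), event.getNotifyDate(), DateUnit.SECOND, false);
// A notify time in the past is delivered after 1 second instead of being dropped
delayedQueue.offer(event.getTransNo(), delay < 0 ? 1 : delay, TimeUnit.SECONDS);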
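The delay calculation can also be sketched with only the JDK's java.time classes, which makes the sign handling explicit. This is a minimal sketch under assumptions, not the author's code: DelayCalculator, delaySeconds and minDelaySeconds are hypothetical names, and the notify time is assumed to be available as an Instant.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper, not part of Redisson or Hutool.
final class DelayCalculator {

    private DelayCalculator() {
    }

    /**
     * Returns the signed number of seconds from now until notifyAt,
     * raised to minDelaySeconds when the notify time is due or already past.
     */
    static long delaySeconds(Instant notifyAt, Instant now, long minDelaySeconds) {
        // Duration.between(start, end) is negative when end is before start
        long seconds = Duration.between(now, notifyAt).getSeconds();
        return Math.max(seconds, minDelaySeconds);
    }
}
```

For example, DelayCalculator.delaySeconds(notifyAt, Instant.now(), 1) could supply the second argument of delayedQueue.offer together with TimeUnit.SECONDS. Passing now as a parameter keeps the method easy to test.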
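public static final String REDISSON_QUEUE_NAME = "DelayTaskQueue";
private final RedissonClient redissonClient;

@PostConstruct
public void init() {
    // Use the same queue type and name as the producer
    RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue(REDISSON_QUEUE_NAME);
    // Obtaining the delayed queue here lets this instance move expired elements into blockingQueue
    redissonClient.getDelayedQueue(blockingQueue);
    new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>())
            .execute(() -> {
                while (!Thread.currentThread().isInterrupted()) {
                    try {
                        // take() blocks until a task is available; it never returns null
                        String transNo = blockingQueue.take();
                        log.info("Start executing task from the delayed queue, transNo={}", transNo);
                        // Run your own operation asynchronously
                        notifyTaskService.handleTask(transNo, null);
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag so the loop exits on shutdown
                        Thread.currentThread().interrupt();
                    } catch (Exception e) {
                        log.error("Exception while executing a delayed-queue task", e);
                    }
                }
            });
}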
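Because Redisson's RBlockingQueue extends java.util.concurrent.BlockingQueue, the consuming loop can be written against the JDK interface and tried out without a Redis server. The following is a minimal sketch under assumptions, not the author's implementation: QueueConsumerLoop and its handler parameter are hypothetical names, and the handler stands in for notifyTaskService.handleTask.

```java
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;

// Hypothetical consumer loop; works with any BlockingQueue<String>.
final class QueueConsumerLoop implements Runnable {

    private final BlockingQueue<String> queue;
    private final Consumer<String> handler;

    QueueConsumerLoop(BlockingQueue<String> queue, Consumer<String> handler) {
        this.queue = queue;
        this.handler = handler;
    }

    @Override
    public void run() {
        // Keep consuming until the thread is interrupted
        while (!Thread.currentThread().isInterrupted()) {
            try {
                // Blocks until an element is available
                String transNo = queue.take();
                handler.accept(transNo);
            } catch (InterruptedException e) {
                // Restore the flag so the loop condition ends the loop
                Thread.currentThread().interrupt();
            } catch (RuntimeException e) {
                // A single failed task must not stop the consumer thread
                System.err.println("Task failed: " + e);
            }
        }
    }
}
```

Separating InterruptedException from other exceptions is the design point here: a catch-all inside while (true) would swallow the interrupt and the thread could never be stopped cleanly.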
public static final String REDISSON_QUEUE_NAME = "DelayTaskQueue";
private final RedissonClient redissonClient;

RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue(REDISSON_QUEUE_NAME);
RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(blockingQueue);
// Cancel a task that is still waiting in the delayed queue
delayedQueue.remove(transNo);
This article mainly excerpts some source code and is for reference only.
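Design and implementation of a delayed-task notification service (2) -- Redisson's delayed queue RDelayedQueue
Design and implementation of a delayed-task notification service (3) -- the JDK's delayed queue DelayQueue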