赞
踩
技术方案 | 优点 | 弊端 | 应用场景 |
---|---|---|---|
时间轮定时任务 | 高效 | 数据在内存中、服务重启丢失 | dubbo底层自动连接zk,重试等功能 |
定时任务扫描表 | 稳定 | 频繁扫表、时间误差大 | |
redis订阅过期key | 简单 | 不稳定、推送不及时、丢消息、广播问题(不支持多实例) | |
rocketMq | 简单 | 开源版延迟时间固定(仅支持固定的延迟级别),商业版收费 | |
Redisson延迟队列 | 理论无延迟,无广播问题 | redis开销大 | |
在了解实现原理之前,需要先了解延时队列运行时所涉及的主要技术,共有以下3点:
内存版的定时任务。
io.netty.util.HashedWheelTimer
/**
 * Scheduling entry point of the timer named on the line above
 * ({@code io.netty.util.HashedWheelTimer}).
 *
 * @param task  the {@code TimerTask} to schedule
 * @param delay how long to wait, expressed in {@code unit}
 * @param unit  the time unit of {@code delay}
 * @return a {@code Timeout} handle for the scheduled task
 *         (presumably usable to inspect or cancel it; confirm against the Netty API)
 */
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit);
Redis 发布订阅 (pub/sub) 是一种消息通信模式
/**
 * Builds the topic bound to {@code channelName}.
 *
 * <p>The topic is created through {@code RedissonTopic.createRaw} with
 * {@code LongCodec.INSTANCE} and this object's {@code commandExecutor}.
 *
 * @return the topic created for {@code channelName}
 */
protected RTopic getTopic() {
    RTopic channelTopic = RedissonTopic.createRaw(LongCodec.INSTANCE, commandExecutor, channelName);
    return channelTopic;
}
redisson继承并实现了jdk的BlockingQueue
// Redisson's blocking-queue contract: it extends the JDK BlockingQueue<V> together with
// the RQueue<V> and RBlockingQueueAsync<V> interfaces (body omitted in this excerpt).
public interface RBlockingQueue<V> extends BlockingQueue<V>, RQueue<V>, RBlockingQueueAsync<V>{}
// Concrete blocking queue: extends RedissonQueue<V> and implements RBlockingQueue<V>
// (body omitted in this excerpt).
public class RedissonBlockingQueue<V> extends RedissonQueue<V> implements RBlockingQueue<V>{}
key | value | 类型 | 作用特性 |
---|---|---|---|
redisson_delay_queue_timeout:{target_queue} | 分数和值 | zset | 时间排序 |
redisson_delay_queue:{target_queue} | 值 | list | 删除时用 |
redisson_delay_queue_channel:{target_queue} | 分数 | 发布订阅 | 通知客户端 |
target_queue | 值 | list | 实际取的数据 |
/**
 * Runs a Lua script that moves due entries from the delay structures into the target queue.
 *
 * <p>Keys passed to the script: KEYS[1] = {@code getRawName()} (the target queue),
 * KEYS[2] = {@code timeoutSetName} (sorted set, redisson_delay_queue_timeout:{target_queue}),
 * KEYS[3] = {@code queueName} (list, redisson_delay_queue:{target_queue}).
 * Arguments: ARGV[1] = {@code System.currentTimeMillis()}, ARGV[2] = {@code 100} (batch limit).
 *
 * @return a future holding the score of the head entry that remains in the sorted set,
 *         or nil from the script when the sorted set is empty
 */
@Override
protected RFuture<Long> pushTaskAsync() {
    return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG,
            // Fetch up to ARGV[2] (100) entries from the timeout sorted set whose score lies
            // between 0 and the current time.
            "local expiredValues = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); "
            // If anything is due, for each entry:
            //   1. unpack the stored value
            //   2. push the payload onto the target queue
            //   3. remove the packed entry from the redisson_delay_queue:{target_queue} list
          + "if #expiredValues > 0 then "
              + "for i, v in ipairs(expiredValues) do "
                  + "local randomId, value = struct.unpack('Bc0Lc0', v);"
                  + "redis.call('rpush', KEYS[1], value);"
                  + "redis.call('lrem', KEYS[3], 1, v);"
              + "end; "
              // Remove the entries just fetched from the timeout sorted set.
              + "redis.call('zrem', KEYS[2], unpack(expiredValues));"
          + "end; "
            // Read the head entry of the timeout sorted set together with its score.
            // get startTime from scheduler queue head task
          + "local v = redis.call('zrange', KEYS[2], 0, 0, 'WITHSCORES'); "
          + "if v[1] ~= nil then "
              // Return the score of that head entry.
             + "return v[2]; "
          + "end "
          + "return nil;",
          Arrays.asList(getRawName(), timeoutSetName, queueName),
          System.currentTimeMillis(), 100);
}
/**
 * Adds an element that becomes due after the given delay.
 *
 * <p>Keys passed to the script: KEYS[1] = {@code getRawName()}, KEYS[2] = {@code timeoutSetName},
 * KEYS[3] = {@code queueName}, KEYS[4] = {@code channelName}.
 * Arguments: ARGV[1] = due timestamp in milliseconds, ARGV[2] = 8-element generated id,
 * ARGV[3] = the encoded element.
 *
 * @param e        the element to enqueue
 * @param delay    the delay, must not be negative
 * @param timeUnit the unit of {@code delay}
 * @return the future of the script execution
 * @throws IllegalArgumentException if {@code delay} is negative
 */
@Override
public RFuture<Void> offerAsync(V e, long delay, TimeUnit timeUnit) {
    if (delay < 0) {
        throw new IllegalArgumentException("Delay can't be negative");
    }

    // Absolute due time: now plus the delay converted to milliseconds.
    long delayInMs = timeUnit.toMillis(delay);
    long timeout = System.currentTimeMillis() + delayInMs;

    byte[] random = getServiceManager().generateIdArray(8);
    return commandExecutor.evalWriteNoRetryAsync(getRawName(), codec, RedisCommands.EVAL_VOID,
            // Pack the generated id and the encoded element, each prefixed with its length.
            "local value = struct.pack('Bc0Lc0', string.len(ARGV[2]), ARGV[2], string.len(ARGV[3]), ARGV[3]);"
            // Add the packed value to the timeout sorted set, scored by its due time.
          + "redis.call('zadd', KEYS[2], ARGV[1], value);"
            // Append the packed value to the redisson_delay_queue:{target_queue} list.
          + "redis.call('rpush', KEYS[3], value);"
          // if new object added to queue head when publish its startTime
          // to all scheduler workers
          // i.e. when the new value is now the first entry of the timeout sorted set,
          // publish its due time on the channel (the message body is the timeout).
          + "local v = redis.call('zrange', KEYS[2], 0, 0); "
          + "if v[1] == value then "
             + "redis.call('publish', KEYS[4], ARGV[1]); "
          + "end;",
          Arrays.asList(getRawName(), timeoutSetName, queueName, channelName),
          timeout, random, encode(e));
}
/**
 * Removes the first entry whose payload equals the encoded form of {@code o}.
 *
 * <p>Keys passed to the script: KEYS[1] = {@code queueName} (list),
 * KEYS[2] = {@code timeoutSetName} (sorted set). ARGV[1] = the encoded object.
 *
 * @param o     the object to remove
 * @param count not referenced anywhere in this method body
 * @return a future of the script result: 1 when a matching entry was removed, otherwise 0
 */
protected RFuture<Boolean> removeAsync(Object o, int count) {
    return commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_BOOLEAN,
            // Read the length of the redisson_delay_queue:{target_queue} list.
            "local s = redis.call('llen', KEYS[1]);" +
            // Walk the list by index.
            "for i = 0, s-1, 1 do "
                + "local v = redis.call('lindex', KEYS[1], i);"
                + "local randomId, value = struct.unpack('Bc0Lc0', v);"
                + "if ARGV[1] == value then "
                    // Remove the packed entry from the timeout sorted set.
                    + "redis.call('zrem', KEYS[2], v);"
                    // Remove one occurrence of the packed entry from the list.
                    + "redis.call('lrem', KEYS[1], 1, v);"
                    + "return 1;"
                + "end; "
           + "end;"
           + "return 0;",
    Arrays.<Object>asList(queueName, timeoutSetName), encode(o));
}
/**
 * Removes every queued entry whose payload equals one of the encoded elements of {@code c}.
 *
 * <p>Keys passed to the script: KEYS[1] = {@code queueName} (list),
 * KEYS[2] = {@code timeoutSetName} (sorted set). ARGV = the encoded elements of {@code c}.
 *
 * @param c the elements to remove; an empty collection short-circuits without calling Redis
 * @return a future of the script result: 1 when at least one entry was removed, otherwise 0;
 *         a {@code CompletableFutureWrapper} built from {@code false} when {@code c} is empty
 */
@Override
public RFuture<Boolean> removeAllAsync(Collection<?> c) {
    if (c.isEmpty()) {
        return new CompletableFutureWrapper<>(false);
    }

    return commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_BOOLEAN,
            "local result = 0;" +
            // Read the length of the redisson_delay_queue:{target_queue} list.
            "local s = redis.call('llen', KEYS[1]);"
          + "local i = 0;" +
            // Walk the list by index.
            "while i < s do "
              + "local v = redis.call('lindex', KEYS[1], i);"
              + "local randomId, value = struct.unpack('Bc0Lc0', v);"
              // Compare the payload against every element passed in ARGV.
              + "for j = 1, #ARGV, 1 do "
                  + "if value == ARGV[j] then "
                      + "result = 1; "
                      // Step the index back and shrink the bound by one for the removed entry.
                      + "i = i - 1; "
                      + "s = s - 1; "
                      // Remove the packed entry from the timeout sorted set.
                      + "redis.call('zrem', KEYS[2], v);"
                      // Remove the packed entry from the list (lrem with count 0).
                      + "redis.call('lrem', KEYS[1], 0, v); "
                      + "break; "
                  + "end; "
              + "end; "
              + "i = i + 1;"
          + "end; "
          + "return result;",
    Arrays.asList(queueName, timeoutSetName), encode(c).toArray());
}
@Slf4j @Component public class RedisDelayedQueueRunner implements ApplicationRunner { @Resource private RedissonClient redissonClient; private final int threadCount = RedisDelayQueueEnum.values().length; private <T> void startThread(RedisDelayQueueEnum queueEnum) { RedisDelayedQueueListener<T> redisDelayedQueueListener = SpringUtil.getBean(queueEnum.getBeanId()); RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(queueEnum.getCode()); //服务重启后,无offer,take不到信息。 redissonClient.getDelayedQueue(blockingFairQueue); log.info("启动监听队列线程" + queueEnum.getCode()); while (true) { try { T t = blockingFairQueue.take(); log.info("监听队列线程,监听名称:{},内容:{}", queueEnum.getBeanId(), t); redisDelayedQueueListener.invoke(t); } catch (InterruptedException e) { log.error("take线程 执行异常", e); Thread.currentThread().interrupt(); break; } } } @Override public void run(ApplicationArguments args) { ExecutorService executor = Executors.newFixedThreadPool(threadCount); for (RedisDelayQueueEnum queueEnum : RedisDelayQueueEnum.values()) { executor.execute(() -> startThread(queueEnum)); } } }
/**
 * Puts an element into the delayed queue that feeds the named blocking queue.
 *
 * @param putInData the element to enqueue
 * @param delay     how long to wait before the element is due
 * @param timeUnit  the unit of {@code delay}
 * @param queueName the name of the blocking queue the element is destined for
 * @param <T>       the element type
 */
public <T> void addQueue(T putInData, long delay, TimeUnit timeUnit, String queueName) {
    log.info("添加延迟队列,监听名称:{},时间:{},时间单位:{},内容:{}", queueName, delay, timeUnit, putInData);
    RBlockingQueue<T> targetQueue = redissonClient.getBlockingQueue(queueName);
    redissonClient.getDelayedQueue(targetQueue).offer(putInData, delay, timeUnit);
}
/**
 * Removes the given elements from the delayed queue that feeds the named blocking queue.
 *
 * @param putInData the elements to remove; a null or empty collection removes nothing
 * @param queueName the name of the blocking queue
 * @param <T>       the element type
 * @return the result of {@code removeAll} on the delayed queue,
 *         or {@code false} when there was nothing to remove
 */
private <T> boolean removeData(Collection<T> putInData, String queueName) {
    log.info("删除延迟队列数据,队列名称:{},内容:{}", queueName, putInData);
    if (!CollUtil.isEmpty(putInData)) {
        RBlockingQueue<T> targetQueue = redissonClient.getBlockingQueue(queueName);
        return redissonClient.getDelayedQueue(targetQueue).removeAll(putInData);
    }
    return false;
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。