赞
踩
版本说明:
<!-- Redisson Spring Boot starter; 3.28.0 is the version this walkthrough was written against. -->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.28.0</version>
</dependency>
application.properties
# Name of the demo application.
spring.application.name=springboot-redis-delayed-queue-demo
# Redis connection settings: logical database index, host, password and port.
# NOTE(review): the `spring.redis.*` prefix is the Spring Boot 2.x form; Spring Boot 3.x
# reads `spring.data.redis.*` instead - confirm against the Boot version actually in use.
# NOTE(review): plaintext demo password; do not commit real credentials.
spring.redis.database=2
spring.redis.host=localhost
spring.redis.password=123456
spring.redis.port=6379
package cn.aohan.delayedqueue.provider;

import cn.aohan.delayedqueue.model.DelayedTaskInfo;
import cn.aohan.delayedqueue.model.TaskData;
import org.redisson.api.RBlockingDeque;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.redisson.codec.JsonJacksonCodec;
import org.springframework.stereotype.Component;

import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

/**
 * Entry point for scheduling, removing and consuming delayed tasks backed by Redisson.
 *
 * <p>Each logical queue name maps to a Redisson blocking deque (where due tasks become
 * visible to consumers) and to an {@link RDelayedQueue} wrapped around that deque (where
 * tasks wait until their delay elapses).
 *
 * <p>Exactly one {@link RDelayedQueue} is kept per queue name. Redisson creates the task that
 * transfers due elements into the destination deque when the delayed-queue object is
 * constructed, and expects {@code destroy()} to be called on instances that are no longer
 * used; constructing a new instance on every call therefore accumulates registrations that
 * are never released.
 *
 * @author 傲寒
 * @date 2024/4/19
 */
@Component
public class DelayedQueueProvider {

    private final RedissonClient redissonClient;

    /**
     * One delayed queue per queue name, created lazily on first use and then reused.
     */
    private final Map<String, RDelayedQueue<DelayedTaskInfo>> delayedQueues = new ConcurrentHashMap<>();

    public DelayedQueueProvider(RedissonClient redissonClient) {
        this.redissonClient = redissonClient;
    }

    /**
     * Schedules a task that becomes available to consumers once the delay has elapsed.
     *
     * @param delayedName name of the queue the task is delivered to
     * @param val         task payload
     * @param delayTime   length of the delay
     * @param timeUnit    unit of {@code delayTime}
     */
    public void addDelayedTask(String delayedName, TaskData val, long delayTime, TimeUnit timeUnit) {
        final DelayedTaskInfo task = new DelayedTaskInfo();
        task.setCreateAt(System.currentTimeMillis());
        task.setDelayTime(delayTime);
        task.setTimeUnit(timeUnit);
        task.setVal(val);
        task.setDelayedName(delayedName);
        getDelayedQueue(delayedName).offer(task, delayTime, timeUnit);
    }

    /**
     * Removes every task whose payload carries the given id, whether it is already due
     * (sitting in the blocking deque) or still waiting in the delayed queue.
     *
     * @param queueName name of the queue
     * @param taskId    id of the task to remove
     */
    public void removeTask(String queueName, String taskId) {
        final Predicate<DelayedTaskInfo> matchesTaskId = item -> {
            final TaskData val = item.getVal();
            return Objects.nonNull(val) && Objects.equals(taskId, val.taskId);
        };
        // Same order as before: due tasks first, then tasks that are still waiting.
        getBlockingDeque(queueName).removeIf(matchesTaskId);
        getDelayedQueue(queueName).removeIf(matchesTaskId);
    }

    /**
     * Returns the blocking deque that consumers poll for due tasks.
     *
     * <p>Also makes sure the delayed queue for {@code queueName} exists. Without it, a process
     * that only consumes would never start the transfer of due elements, and tasks scheduled
     * before a restart would stay invisible until the first {@link #addDelayedTask} call.
     *
     * @param queueName name of the queue
     * @return {@link RBlockingDeque}<{@link DelayedTaskInfo}>
     */
    public RBlockingDeque<DelayedTaskInfo> getBlockingDeque(String queueName) {
        getDelayedQueue(queueName);
        return newBlockingDeque(queueName);
    }

    /**
     * Returns the cached delayed queue for the given name, creating it on first use.
     *
     * @param queueName name of the queue
     * @return {@link RDelayedQueue}<{@link DelayedTaskInfo}>
     */
    private RDelayedQueue<DelayedTaskInfo> getDelayedQueue(String queueName) {
        return delayedQueues.computeIfAbsent(
                queueName, name -> redissonClient.getDelayedQueue(newBlockingDeque(name)));
    }

    /**
     * Obtains the JSON-encoded blocking deque for the given name from the Redisson client.
     *
     * @param queueName name of the queue
     * @return {@link RBlockingDeque}<{@link DelayedTaskInfo}>
     */
    private RBlockingDeque<DelayedTaskInfo> newBlockingDeque(String queueName) {
        return redissonClient.getBlockingDeque(queueName, JsonJacksonCodec.INSTANCE);
    }
}
延时队列名称常量
/**
 * Names of the delayed queues used by the application.
 *
 * <p>Pure constants holder: it is final and cannot be instantiated.
 *
 * @author 傲寒
 * @date 2024/4/19
 */
public final class QueueConstant {

    /**
     * Name of the delayed-task queue used for testing.
     */
    public static final String TEST_DELAYED_TASK_QUEUE = "test_delayed_task_queue";

    private QueueConstant() {
        // Constants holder; never instantiated.
    }
}
listener监听
package cn.aohan.delayedqueue.listener;

import cn.aohan.delayedqueue.constant.QueueConstant;
import cn.aohan.delayedqueue.model.DelayedTaskInfo;
import cn.aohan.delayedqueue.provider.DelayedQueueProvider;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBlockingDeque;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

import java.util.Objects;
import java.util.concurrent.TimeUnit;

/**
 * Delayed-task listener: once the application has started, a background thread polls the
 * blocking deque of each registered queue and handles the tasks that have become due.
 *
 * @author 傲寒
 * @date 2024/4/19
 */
@RequiredArgsConstructor
@Slf4j
@Component
public class DelayedTaskListener implements ApplicationRunner {

    /**
     * Pause after a failed poll so a persistent error does not turn into a hot loop.
     */
    private static final long ERROR_BACKOFF_MILLIS = 1_000L;

    private final DelayedQueueProvider delayedQueueProvider;

    /**
     * Starts the consumer for the test queue when the application is ready.
     *
     * @param args application arguments (unused)
     */
    @Override
    public void run(ApplicationArguments args) throws Exception {
        delayedTaskHandle(QueueConstant.TEST_DELAYED_TASK_QUEUE);
    }

    /**
     * Starts a daemon thread that consumes due tasks from the given queue.
     *
     * @param delayedQueueName name of the queue to consume
     */
    public void delayedTaskHandle(String delayedQueueName) {
        final Thread thread = new Thread(
                () -> consume(delayedQueueName), "delayed-task-listener-" + delayedQueueName);
        thread.setDaemon(true);
        thread.start();
    }

    /**
     * Polls the queue until the current thread is interrupted.
     *
     * @param delayedQueueName name of the queue to consume
     */
    private void consume(String delayedQueueName) {
        final RBlockingDeque<DelayedTaskInfo> blockingDeque = delayedQueueProvider.getBlockingDeque(delayedQueueName);
        while (!Thread.currentThread().isInterrupted()) {
            try {
                // Take the next due task, waiting up to the timeout for one to arrive.
                final DelayedTaskInfo delayedTaskInfo = blockingDeque.poll(2, TimeUnit.MINUTES);
                if (Objects.isNull(delayedTaskInfo)) {
                    continue;
                }
                log.info("DelayedTask task :[{}]", delayedTaskInfo);
            } catch (InterruptedException e) {
                // Restore the flag so the loop condition ends the worker instead of swallowing the stop request.
                Thread.currentThread().interrupt();
                log.info("DelayedTaskListener#delayedTaskHandle interrupted delayedQueueName:[{}]", delayedQueueName);
            } catch (Exception e) {
                log.error("DelayedTaskListener#delayedTaskHandle error delayedQueueName:[{}]", delayedQueueName, e);
                pauseAfterError();
            }
        }
    }

    /**
     * Sleeps briefly after a failure; an interrupt during the pause is preserved for the caller's loop.
     */
    private void pauseAfterError() {
        try {
            TimeUnit.MILLISECONDS.sleep(ERROR_BACKOFF_MILLIS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
package cn.aohan.delayedqueue.controller; import cn.aohan.common.dto.Result; import cn.aohan.delayedqueue.constant.QueueConstant; import cn.aohan.delayedqueue.model.dto.TestDelayedDTO; import cn.aohan.delayedqueue.provider.DelayedQueueProvider; import lombok.AllArgsConstructor; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; /** * @author 傲寒 * @date 2024/4/19 */ @AllArgsConstructor @RestController @RequestMapping("/api/test/delayed") public class DelayedQueueTestController { private final DelayedQueueProvider delayedQueueProvider; /** * 添加延迟任务 * * @param delayedTask 延迟任务 * @return {@link Result}<{@link Void}> */ @PostMapping public Result<Void> addDelayedTask(@RequestBody TestDelayedDTO delayedTask) { delayedQueueProvider.addDelayedTask( QueueConstant.TEST_DELAYED_TASK_QUEUE, delayedTask.getVal(), delayedTask.getDelayTime(), delayedTask.getTimeUnit() ); return Result.success(); } }
在一开始创建延时队列的时候会创建一个QueueTransferTask
org.redisson.RedissonDelayedQueue#RedissonDelayedQueue
channelName = prefixName("redisson_delay_queue_channel", getRawName());
pushTaskAsync
去操作Lua脚本移除Redis中LIST和ZSET的元素。根据延迟时间插入到队列中合适的位置,主要是
org.redisson.RedissonDelayedQueue#offerAsync
方法中的一段lua脚本
-- Pack ARGV[2] and ARGV[3] into one binary string, each preceded by its own length.
-- NOTE(review): presumably ARGV[2] is a per-task id and ARGV[3] the encoded payload - confirm against RedissonDelayedQueue#offerAsync.
local value = struct.pack('Bc0Lc0', string.len(ARGV[2]), ARGV[2], string.len(ARGV[3]), ARGV[3]);
-- Add the packed value to the sorted set KEYS[2] with score ARGV[1] (presumably the expiry timestamp - confirm).
redis.call('zadd', KEYS[2], ARGV[1], value);
-- Append the same packed value to the list KEYS[3].
redis.call('rpush', KEYS[3], value);
-- Read the lowest-scored member of the sorted set.
local v = redis.call('zrange', KEYS[2], 0, 0);
-- If the value just added is now that first member, publish ARGV[1] on channel KEYS[4].
if v[1] == value then
redis.call('publish', KEYS[4], ARGV[1]);
end;
总而言之,这段代码的功能是:将任务打包后以 ARGV[1] 为分值写入ZSET,同时追加到LIST的末尾;如果新写入的任务成为ZSET中排在最前面的元素,就向通道发布 ARGV[1]。
然后使用BLPOP阻塞地获取LIST中的元素
Redisson实现延迟队列的原理,简单来说:将数据插入到延迟队列时,数据会存入延迟队列的LIST和ZSET结构中;通过任务调度的方式将延迟队列中到期的数据取出,然后放入阻塞队列;客户端通过BLPOP命令阻塞地拉取阻塞队列中的数据,拉取到数据后即可进行业务逻辑的处理。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。