赞
踩
1.pom引入redisson
<!--
  Keep the Redisson core and the Spring Boot starter on the SAME version.
  A directly declared dependency takes precedence over the starter's
  transitive one, so pinning an older core here would make the starter run
  against a core library it was not built for.
-->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.27.0</version>
</dependency>
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.27.0</version>
</dependency>
3.配置redis的队列
import org.redisson.api.RBlockingQueue; import org.redisson.api.RDelayedQueue; import org.redisson.api.RedissonClient; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @email 1965654634@qq.com */ @Configuration public class RedisQueueConfig { @Bean public RBlockingQueue<String> blockingQueue(RedissonClient redissonClient) { //队列名称可以自己定义 return redissonClient.getBlockingQueue("delayedQueue"); } @Bean public RDelayedQueue<String> delayedQueue(RBlockingQueue<String> blockingQueue, RedissonClient redissonClient) { return redissonClient.getDelayedQueue(blockingQueue); } }
4.创建redis的队列类
import cn.hutool.core.date.LocalDateTimeUtil; import lombok.extern.slf4j.Slf4j; import org.redisson.api.RBlockingQueue; import org.redisson.api.RDelayedQueue; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; import java.time.LocalDateTime; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; /** * @email 1965654634@qq.com */ @Slf4j @Service public class DelayedQueueService{ @Autowired private RDelayedQueue<String> delayedQueue; @Autowired private RBlockingQueue<String> blockingQueue; @PostConstruct public void init() { // 创建延迟队列 ExecutorService executorService = Executors.newFixedThreadPool(1); executorService.submit(() -> { while (true) { try { // 从延迟队列中取出任务 String task = blockingQueue.take(); // 处理延迟任务,例如执行某个操作 log.info("redis的延迟队列:{},当前时间:{}", task, LocalDateTimeUtil.formatNormal(LocalDateTime.now())); } catch (Exception e) { log.error("redis的延迟队列抛出异常", e); } } }); } public String addDelayedTask(String task, long delay) { log.info("redis的延迟队列(添加)的key:{},time:{},当前时间:{}",task,delay, LocalDateTimeUtil.formatNormal(LocalDateTime.now())); // 将任务加入延迟队列 delayedQueue.offerAsync(task, delay, TimeUnit.SECONDS); // 返回任务的唯一标识 return task; } public void cancelDelayedTask(String task) { log.info("redis的延迟队列(移除)的key:{},当前时间:{}",task, LocalDateTimeUtil.formatNormal(LocalDateTime.now())); // 从延迟队列中移除任务 delayedQueue.remove(task); } }
5.测试controller
/**
 * Test endpoint that schedules a delayed task.
 *
 * @param task task payload to enqueue
 * @param time delay passed to {@code addDelayedTask}
 * @return an OK response
 * @throws IllegalArgumentException if {@code task} or {@code time} is missing
 */
@PostMapping("/queueAdd")
public ResponseResult<Boolean> queueAdd(String task,Long time){
log.info("测试添加的key:{},time:{},当前时间:{}",task,time, LocalDateTimeUtil.formatNormal(LocalDateTime.now()));
// Fail with a clear message when a parameter is absent. A null Long passed to
// addDelayedTask(String, long) would otherwise fail on auto-unboxing with a
// bare NullPointerException.
if (task == null || time == null) {
throw new IllegalArgumentException("task and time are required");
}
delayedQueueService.addDelayedTask(task,time);
return ResponseResult.ok();
}
/**
 * Test endpoint that cancels a previously scheduled delayed task.
 *
 * @param task task payload to remove from the delayed queue
 * @param time only logged; not used for the removal
 * @return an OK response
 */
@PostMapping("/queueDel")
public ResponseResult<Boolean> dianTiAlarm(String task,Long time){
final String now = LocalDateTimeUtil.formatNormal(LocalDateTime.now());
log.info("测试删除的key:{},time:{},当前时间:{}", task, time, now);
delayedQueueService.cancelDelayedTask(task);
return ResponseResult.ok();
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。