当前位置:   article > 正文

Spring Boot集成Redisson实现延迟队列_redisson延迟队列

redisson延迟队列

项目场景:

   在电商、支付等领域,往往会有这样的场景,用户下单后放弃支付了,那这笔订单会在指定的时间段后进行关闭操作,细心的你一定发现了像某宝、某东都有这样的逻辑,而且时间很准确,误差在1s内;那他们是怎么实现的呢?

   一般实现的方法有几种:使用 redisson、rocketmq、rabbitmq等消息队列的延时投递功能。


解决方案:

   一般项目集成redis的比较多,所以我这篇文章就说下redisson延迟队列,如果使用rocketmq或rabbitmq需要额外集成中间件,比较麻烦一点。

1.集成redisson

maven依赖

  1. <dependency>
  2. <groupId>org.redisson</groupId>
  3. <artifactId>redisson-spring-boot-starter</artifactId>
  4. <version>3.21.1</version>
  5. </dependency>

yml配置,单节点配置可以兼容redis的配置方式

  1. # redis配置
  2. spring:
  3. redis:
  4. database: 0
  5. host: 127.0.0.1
  6. password: redis@pass
  7. port: 6001

 更详细的配置参考:Spring Boot整合Redisson的两种方式-CSDN博客

2.配置多线程

因为延迟队列可能会多个任务同时执行,所以需要多线程处理。

  1. import org.springframework.context.annotation.Bean;
  2. import org.springframework.context.annotation.Configuration;
  3. import org.springframework.scheduling.annotation.EnableAsync;
  4. import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
  5. import java.util.concurrent.ThreadPoolExecutor;
  6. @Configuration
  7. @EnableAsync
  8. public class ExecutorConfig {
  9. /**
  10. * 异步任务自定义线程池
  11. */
  12. @Bean(name = "taskExecutor")
  13. public ThreadPoolTaskExecutor asyncServiceExecutor() {
  14. ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
  15. //配置核心线程数
  16. executor.setCorePoolSize(50);
  17. //配置最大线程数
  18. executor.setMaxPoolSize(500);
  19. //配置队列大小
  20. executor.setQueueCapacity(300);
  21. //允许线程空闲时间
  22. executor.setKeepAliveSeconds(60);
  23. //配置线程池中的线程的名称前缀
  24. executor.setThreadNamePrefix("taskExecutor-");
  25. // rejection-policy:当pool已经达到max size的时候,如何处理新任务
  26. // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
  27. executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
  28. //调用shutdown()方法时等待所有的任务完成后再关闭
  29. executor.setWaitForTasksToCompleteOnShutdown(true);
  30. //等待所有任务完成后的最大等待时间
  31. executor.setAwaitTerminationSeconds(60);
  32. return executor;
  33. }
  34. }

3.具体业务

比如消息通知、关闭订单等 ,这里加上了@Async注解,可以异步执行

  1. import org.springframework.scheduling.annotation.Async;
  2. import org.springframework.stereotype.Service;
  3. import java.text.SimpleDateFormat;
  4. import java.util.Date;
  5. @Service
  6. public class AsyncService {
  7. @Async
  8. public void executeQueue(Object value) {
  9. System.out.println();
  10. System.out.println("当前线程:"+Thread.currentThread().getName());
  11. System.out.println("执行任务:"+value);
  12. //打印时间方便查看
  13. SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  14. System.out.println("执行任务的时间:"+sdf.format(new Date()));
  15. //自己的业务逻辑,可以根据id发送通知消息等
  16. //......
  17. }
  18. }

4.延迟队列(关键代码)

这里包括添加延迟队列,和消费延迟队列,@PostConstruct注解的意思是服务启动加载一次,参考

Spring Boot项目启动时执行指定的方法-CSDN博客Spring Boot中多个PostConstruct注解执行顺序控制_多个postconstruct执行顺序-CSDN博客

  1. import org.redisson.api.RBlockingQueue;
  2. import org.redisson.api.RDelayedQueue;
  3. import org.redisson.api.RedissonClient;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
  6. import org.springframework.stereotype.Service;
  7. import javax.annotation.PostConstruct;
  8. import javax.annotation.Resource;
  9. import java.text.SimpleDateFormat;
  10. import java.util.Date;
  11. import java.util.concurrent.TimeUnit;
  12. @Service
  13. public class TestService {
  14. @Resource
  15. private AsyncService asyncService;
  16. @Resource
  17. private ThreadPoolTaskExecutor executor;
  18. @Autowired
  19. private RedissonClient redissonClient;
  20. /**
  21. * 添加延迟任务
  22. */
  23. public void addQueue() {
  24. //获取延迟队列
  25. RBlockingQueue<Object> blockingQueue = redissonClient.getBlockingQueue("delayedQueue");
  26. RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(blockingQueue);
  27. for (int i = 1; i <= 10; i++) {
  28. long delayTime = 5+i; //延迟时间(秒)
  29. // long delayTime = 5; //这里时间统一,可以测试并发执行
  30. delayedQueue.offer("延迟任务"+i, delayTime, TimeUnit.SECONDS);
  31. }
  32. //打印时间方便查看
  33. SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  34. System.out.println("添加任务的时间:"+sdf.format(new Date()));
  35. }
  36. /**
  37. * 服务启动时加载,开始消费延迟队列
  38. */
  39. @PostConstruct
  40. public void consumer() {
  41. System.out.println("服务启动时加载>>>>>>");
  42. //获取延迟队列
  43. RBlockingQueue<Object> delayedQueue = redissonClient.getBlockingQueue("delayedQueue");
  44. //启用一个线程来消费这个延迟队列
  45. executor.execute(() ->{
  46. while (true){
  47. try {
  48. // System.out.println("while中的线程:"+Thread.currentThread().getName());
  49. //获取延迟队列中的任务
  50. Object value = delayedQueue.poll();
  51. if(value == null){
  52. //如果没有任务就休眠1秒,休眠时间根据业务自己定义
  53. Thread.sleep(1000); //这里休眠时间越短,误差就越小
  54. continue;
  55. }
  56. //异步处理延迟队列中的消息
  57. asyncService.executeQueue(value);
  58. } catch (Exception e) {
  59. e.printStackTrace();
  60. }
  61. }
  62. });
  63. }
  64. }

5.测试接口 

  1. import com.test.service.TestService;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.web.bind.annotation.GetMapping;
  4. import org.springframework.web.bind.annotation.RequestMapping;
  5. import org.springframework.web.bind.annotation.RestController;
  6. @RestController
  7. @RequestMapping("/test")
  8. public class TestController {
  9. @Autowired
  10. private TestService testService;
  11. /*
  12. * 添加延迟任务
  13. */
  14. @GetMapping(value = "/addQueue")
  15. public String addQueue() {
  16. testService.addQueue();
  17. return "success";
  18. }
  19. }

6.测试结果


 总结:

  1. Redisson的的RDelayedQueue是基于Redis实现的,而Redis本身并不保证数据的持久性。如果Redis服务器宕机,那么所有在RDelayedQueue中的数据都会丢失。因此,我们需要在应用层面进行持久化设计,例如定期将RDelayedQueue中的数据持久化到数据库。
  2. 在设计延迟任务时,我们应该根据实际需求来合理设置延迟时间,避免设置过长的延迟时间导致内存占用过高。

源码:https://download.csdn.net/download/u011974797/89225515 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/在线问答5/article/detail/823186
推荐阅读
相关标签
  

闽ICP备14008679号