当前位置:   article > 正文

Redisson延迟队列_redissonclient.getblockingqueue

redissonclient.getblockingqueue

场景:

需求:

支付的二维码,超过两个小时以后,如果还未支付,则自动转为取消支付,或者支付超时的状态

需求分析:

1,动态定时任务:

每个支付的二维码创建的时候,创建一个动态的定时任务,两个小时候自动执行,更新支付状态,可以解决这个问题。

(1)持久化:

如果服务重启了,动态定时任务会丢失,导致部分数据没办法更新状态。

(2)分布式:

如果当服务重启时,自动扫描数据,重新计算时间,再次创建动态定时任务。可以解决(1)的问题,但是当分布式,多个节点的时候,都会重新加载所有的任务,这样性能上不是最优解,只能在数据源上加上节点名称,不同的服务节点,加载属于自己的定时任务,可以解决这个问题。总的想想,太麻烦了,还是算了。

2,Redisson延迟队列

(1)持久化:队列信息放在Redis上,服务重启不影响。

(2)分布式:多节点去Redis拿去数据,谁抢到算谁的,不会存在同一个任务,多个节点支持。唯一不足就是过度依赖Redis,万一Redis崩了,那就凉凉了(那就是要把Redis配置高可用,当前业务就不用管了)。总体来说还是比较好用的。

实现

1,创建延迟队列的监听任务【RedisDelayedQueueListener】,消费延迟队列

2,创建新增延迟队列的类,用于创建延迟队列

3,整体初始化,把监听任务与spring绑定,扫描各个监听延迟队列的实现类,并开启单独线程,监听任务。

4,创建延迟任务(开始测试使用)

连接Redis

不贴代码了,自己在网上搜

监听延迟队列

接口:

  1. /**
  2. * 队列事件监听接口,需要实现这个方法
  3. *
  4. * @module
  5. * @author frank
  6. * @date 2021/8/19 10:50
  7. */
  8. public interface RedisDelayedQueueListener<T> {
  9. /**
  10. * 执行方法
  11. *
  12. * @param t
  13. */
  14. void invoke(T t);
  15. }

实现:

  1. import com.sxmaps.netschool.common.redisson.RedisDelayedQueueListener;
  2. import com.sxmaps.netschool.service.vo.school.SchoolAccountPayStateReqVO;
  3. import org.slf4j.Logger;
  4. import org.slf4j.LoggerFactory;
  5. import org.springframework.beans.factory.annotation.Autowired;
  6. import org.springframework.stereotype.Component;
  7. /**
  8. * 支付二维码监听器
  9. *
  10. * @module
  11. * @author frank
  12. * @date 2021/8/19 10:49
  13. */
  14. @Component
  15. public class PayQCordListener implements RedisDelayedQueueListener<SchoolAccountPayStateReqVO> {
  16. private final Logger logger = LoggerFactory.getLogger(PayQCordListener.class);
  17. @Autowired
  18. private SchoolAccountService schoolAccountService;
  19. @Override
  20. public void invoke(SchoolAccountPayStateReqVO payStateReqVO) {
  21. logger.info("支付二维码-延迟失效,内容:{}", payStateReqVO);
  22. //处理业务,更新二维码状态
  23. logger.info("支付二维码-延迟失效,内容:{},处理结果:{}", payStateReqVO,respDTO);
  24. }
  25. }

增加延迟队列

  1. import org.redisson.api.RBlockingQueue;
  2. import org.redisson.api.RDelayedQueue;
  3. import org.redisson.api.RedissonClient;
  4. import org.slf4j.Logger;
  5. import org.slf4j.LoggerFactory;
  6. import org.springframework.beans.factory.annotation.Autowired;
  7. import org.springframework.stereotype.Component;
  8. import java.util.concurrent.TimeUnit;
  9. /**
  10. * 增加延迟信息
  11. *
  12. * @author frank
  13. * @module
  14. * @date 2021/8/19 10:49
  15. */
  16. @Component
  17. public class RedisDelayedQueue {
  18. private final Logger logger = LoggerFactory.getLogger(RedisDelayedQueue.class);
  19. @Autowired
  20. RedissonClient redissonClient;
  21. /**
  22. * 添加队列
  23. *
  24. * @param t DTO传输类
  25. * @param delay 时间数量
  26. * @param timeUnit 时间单位
  27. * @param <T> 泛型
  28. */
  29. private <T> void addQueue(T t, long delay, TimeUnit timeUnit, String queueName) {
  30. logger.info("添加延迟队列,监听名称:{},时间:{},时间单位:{},内容:{}" , queueName, delay, timeUnit,t);
  31. RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(queueName);
  32. RDelayedQueue<T> delayedQueue = redissonClient.getDelayedQueue(blockingFairQueue);
  33. delayedQueue.offer(t, delay, timeUnit);
  34. }
  35. /**
  36. * 添加队列-秒
  37. *
  38. * @param t DTO传输类
  39. * @param delay 时间数量
  40. * @param <T> 泛型
  41. */
  42. public <T> void addQueueSeconds(T t, long delay, Class<? extends RedisDelayedQueueListener> clazz) {
  43. addQueue(t, delay, TimeUnit.SECONDS, clazz.getName());
  44. }
  45. /**
  46. * 添加队列-分
  47. *
  48. * @param t DTO传输类
  49. * @param delay 时间数量
  50. * @param <T> 泛型
  51. */
  52. public <T> void addQueueMinutes(T t, long delay, Class<? extends RedisDelayedQueueListener> clazz) {
  53. addQueue(t, delay, TimeUnit.MINUTES, clazz.getName());
  54. }
  55. /**
  56. * 添加队列-时
  57. *
  58. * @param t DTO传输类
  59. * @param delay 时间数量
  60. * @param <T> 泛型
  61. */
  62. public <T> void addQueueHours(T t, long delay, Class<? extends RedisDelayedQueueListener> clazz) {
  63. addQueue(t, delay, TimeUnit.HOURS, clazz.getName());
  64. }
  65. /**
  66. * 添加队列-天
  67. *
  68. * @param t DTO传输类
  69. * @param delay 时间数量
  70. * @param <T> 泛型
  71. */
  72. public <T> void addQueueDays(T t, long delay, Class<? extends RedisDelayedQueueListener> clazz) {
  73. addQueue(t, delay, TimeUnit.DAYS, clazz.getName());
  74. }
  75. }

整体初始化

  1. import org.redisson.api.RBlockingQueue;
  2. import org.redisson.api.RedissonClient;
  3. import org.slf4j.Logger;
  4. import org.slf4j.LoggerFactory;
  5. import org.springframework.beans.BeansException;
  6. import org.springframework.beans.factory.annotation.Autowired;
  7. import org.springframework.context.ApplicationContext;
  8. import org.springframework.context.ApplicationContextAware;
  9. import org.springframework.stereotype.Component;
  10. import java.util.Map;
  11. /**
  12. * 初始化队列监听
  13. *
  14. * @module
  15. * @author frank
  16. * @date 2021/8/19 10:49
  17. */
  18. @Component
  19. public class RedisDelayedQueueInit implements ApplicationContextAware {
  20. private final Logger logger = LoggerFactory.getLogger(RedisDelayedQueueInit.class);
  21. @Autowired
  22. RedissonClient redissonClient;
  23. /**
  24. * 获取应用上下文并获取相应的接口实现类
  25. *
  26. * @param applicationContext
  27. * @throws BeansException
  28. */
  29. @Override
  30. public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
  31. Map<String, RedisDelayedQueueListener> map = applicationContext.getBeansOfType(RedisDelayedQueueListener.class);
  32. for (Map.Entry<String, RedisDelayedQueueListener> taskEventListenerEntry : map.entrySet()) {
  33. String listenerName = taskEventListenerEntry.getValue().getClass().getName();
  34. startThread(listenerName, taskEventListenerEntry.getValue());
  35. }
  36. }
  37. /**
  38. * 启动线程获取队列*
  39. *
  40. * @param queueName queueName
  41. * @param redisDelayedQueueListener 任务回调监听
  42. * @param <T> 泛型
  43. * @return
  44. */
  45. private <T> void startThread(String queueName, RedisDelayedQueueListener redisDelayedQueueListener) {
  46. RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(queueName);
  47. //服务重启后,无offer,take不到信息。
  48. redissonClient.getDelayedQueue(blockingFairQueue);
  49. //由于此线程需要常驻,可以新建线程,不用交给线程池管理
  50. Thread thread = new Thread(() -> {
  51. logger.info("启动监听队列线程" + queueName);
  52. while (true) {
  53. try {
  54. T t = blockingFairQueue.take();
  55. logger.info("监听队列线程,监听名称:{},内容:{}", queueName, t);
  56. redisDelayedQueueListener.invoke(t);
  57. } catch (Exception e) {
  58. logger.info("监听队列线程错误,", e);
  59. }
  60. }
  61. });
  62. thread.setName(queueName);
  63. thread.start();
  64. }
  65. }

创建延迟任务

  1. @Autowired
  2. RedisDelayedQueue queue;
  3. .................
  4. queue.addQueueHours(new SchoolAccountPayStateReqVO(dto.getPayNo()),2, PayQCordListener.class);

本文参考了:https://my.oschina.net/wangnian/blog/3167316王念博客

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/知新_RL/article/detail/823203
推荐阅读
  

闽ICP备14008679号