当前位置:   article > 正文

Redisson实现延迟队列_redisson延时队列

redisson延时队列
  1. import org.springframework.stereotype.Component;
  2. import java.lang.annotation.*;
  3. @Component
  4. @Documented
  5. @Target({ ElementType.TYPE })
  6. @Retention(RetentionPolicy.RUNTIME)
  7. public @interface RedisDelay {
  8. /**
  9. * 延迟队列 queueName
  10. */
  11. String queueName();
  12. /**
  13. * 延迟队列描述
  14. */
  15. String desc() default "";
  16. }
  /**
   * Callback that receives one element taken from a Redis delayed queue.
   *
   * <p>Declared as a functional interface so that it can be implemented with a
   * lambda or method reference, and so that the compiler rejects the accidental
   * addition of a second abstract method.
   *
   * @param <T> type of the queue element
   */
  @FunctionalInterface
  public interface RedisDelayQueueListener<T> {

      /**
       * Processes a single queue element.
       *
       * @param t the element to process
       */
      void execute(T t);
  }
  1. import com.tizo.biz.base.exception.type.ServerException;
  2. import com.tizo.biz.base.utils.AssertLog;
  3. import jakarta.annotation.Resource;
  4. import org.redisson.api.RBlockingDeque;
  5. import org.redisson.api.RDelayedQueue;
  6. import org.redisson.api.RedissonClient;
  7. import org.springframework.stereotype.Component;
  8. import java.util.concurrent.TimeUnit;
  9. /**
  10. * redis延迟队列工具
  11. */
  12. @Component
  13. public class RedisDelayQueueService {
  14. @Resource
  15. private RedissonClient redissonClient;
  16. /**
  17. * 添加队列-秒
  18. *
  19. * @param t DTO传输类
  20. * @param delay 时间数量
  21. * @param <T> 泛型
  22. */
  23. public <T> void pushQueueSeconds(T t, long delay, String queueName) {
  24. pushQueue(t, delay, TimeUnit.SECONDS, queueName);
  25. }
  26. /**
  27. * 添加队列-分
  28. *
  29. * @param t DTO传输类
  30. * @param delay 时间数量
  31. * @param <T> 泛型
  32. */
  33. public <T> void pushQueueMinutes(T t, long delay, String queueName) {
  34. pushQueue(t, delay, TimeUnit.MINUTES, queueName);
  35. }
  36. /**
  37. * 添加队列-时
  38. *
  39. * @param t DTO传输类
  40. * @param delay 时间数量
  41. * @param <T> 泛型
  42. */
  43. public <T> void pushQueueHours(T t, long delay, String queueName) {
  44. pushQueue(t, delay, TimeUnit.HOURS, queueName);
  45. }
  46. /**
  47. * 添加队列-天
  48. *
  49. * @param t DTO传输类
  50. * @param delay 时间数量
  51. * @param <T> 泛型
  52. */
  53. public <T> void pushQueueDays(T t, long delay, String queueName) {
  54. pushQueue(t, delay, TimeUnit.DAYS, queueName);
  55. }
  56. /**
  57. * 创建延迟队列
  58. *
  59. * @param value 队列值
  60. * @param delay 延迟时间
  61. * @param timeUnit 时间单位
  62. * @param queueName 队列键
  63. * @param <T>
  64. */
  65. public <T> void pushQueue(T value, long delay, TimeUnit timeUnit, String queueName) {
  66. try {
  67. RBlockingDeque<T> blockingDeque = redissonClient.getBlockingDeque(queueName);
  68. RDelayedQueue<T> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);
  69. delayedQueue.offer(value, delay, timeUnit);
  70. AssertLog.info("创建延时队列成功 queueName:{},value值:{},延迟时间:{}", queueName, value, timeUnit.toSeconds(delay) + "-" + timeUnit.name());
  71. delayedQueue.destroy(); // 释放队列
  72. } catch (Exception e) {
  73. AssertLog.error("创建延时队列失败 {}", e.getMessage());
  74. throw new ServerException("创建延时队列失败");
  75. }
  76. }
  77. /**
  78. * 获取延迟队列
  79. *
  80. * @param queueName
  81. * @param <T>
  82. * @return
  83. * @throws InterruptedException
  84. */
  85. public <T> T pollQueue(String queueName) throws InterruptedException {
  86. RBlockingDeque<T> blockingDeque = redissonClient.getBlockingDeque(queueName);
  87. redissonClient.getDelayedQueue(blockingDeque);// 避免消息伪丢失(应用重启未消费),官网推荐
  88. T value = (T) blockingDeque.take();
  89. return value;
  90. }
  91. }
  1. import com.tizo.biz.base.utils.AssertLog;
  2. import com.tizo.biz.redis.core.delay.RedisDelayQueueService;
  3. import com.tizo.biz.redis.core.delay.annoation.RedisDelay;
  4. import jakarta.annotation.Resource;
  5. import org.apache.commons.lang3.ObjectUtils;
  6. import org.apache.commons.lang3.concurrent.BasicThreadFactory;
  7. import org.springframework.beans.BeansException;
  8. import org.springframework.context.ApplicationContext;
  9. import org.springframework.context.ApplicationContextAware;
  10. import org.springframework.stereotype.Component;
  11. import org.springframework.util.CollectionUtils;
  12. import java.util.Map;
  13. import java.util.TimerTask;
  14. import java.util.concurrent.ScheduledExecutorService;
  15. import java.util.concurrent.ScheduledThreadPoolExecutor;
  16. import java.util.concurrent.TimeUnit;
  17. @Component
  18. public class RedisDelayedQueueInit implements ApplicationContextAware {
  19. @Resource
  20. private RedisDelayQueueService redisDelayQueueService;
  21. @Override
  22. @SuppressWarnings("unchecked")
  23. public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
  24. Map<String, Object> handlers = applicationContext.getBeansWithAnnotation(RedisDelay.class);
  25. if (!CollectionUtils.isEmpty(handlers)) {
  26. ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), new BasicThreadFactory.Builder().namingPattern("redisDelay-schedule-pool-%d").daemon(true).build());
  27. handlers.values().forEach(tempHandler -> {
  28. boolean result = tempHandler.getClass().isAnnotationPresent(RedisDelay.class);
  29. if (result) {
  30. RedisDelay annotation = tempHandler.getClass().getAnnotation(RedisDelay.class);
  31. AssertLog.info("Redis延迟队列:{} 监听中>>>>>>>>>>>>>", annotation.queueName());
  32. executorService.scheduleAtFixedRate(new TimerTask() {
  33. @Override
  34. public void run() {
  35. while (true) {
  36. try {
  37. Object value = redisDelayQueueService.pollQueue(annotation.queueName());
  38. if (ObjectUtils.isNotEmpty(value)) {
  39. RedisDelayQueueListener<Object> redisDelayQueueHandle = (RedisDelayQueueListener<Object>) tempHandler;
  40. redisDelayQueueHandle.execute(value);
  41. }
  42. } catch (InterruptedException e) {
  43. AssertLog.error("Redis延迟队列异常中断{}", e.getMessage());
  44. }
  45. }
  46. }
  47. }, 0, 1, TimeUnit.SECONDS);// 每秒检测一次
  48. }
  49. });
  50. }
  51. }
  52. }
  1. import com.tizo.biz.base.utils.AssertLog;
  2. import com.tizo.biz.redis.core.delay.RedisDelayQueueService;
  3. import com.tizo.biz.redis.core.delay.annoation.RedisDelay;
  4. import com.tizo.biz.redis.core.delay.handle.RedisDelayQueueListener;
  5. import jakarta.annotation.Resource;
  6. import org.springframework.stereotype.Component;
  7. import org.springframework.web.bind.annotation.PostMapping;
  8. import java.util.HashMap;
  9. import java.util.Map;
  10. @Component
  11. public class RedisDelayTest {
  12. @Resource
  13. private RedisDelayQueueService redisDelayQueueService;
  14. //生产
  15. @ApiOperationSupport(order = 1)
  16. @Operation(summary = "测试Queue")
  17. @PostMapping("/addQueue")
  18. public void addQueue() {
  19. redisDelayQueueService.pushQueue("6666", 10, TimeUnit.SECONDS, "TTS-10");
  20. Map<String, String> map2 = new HashMap<>();
  21. map2.put("orderId", "200");
  22. map2.put("remark", "订单超时未评价,系统默认好评");
  23. // 订单超时未评价,系统默认好评。为了测试效果,延迟20秒钟
  24. redisDelayQueueService.pushQueue(map2, 20, TimeUnit.SECONDS, "TTS-20");
  25. }
  26. //消费
  27. @RedisDelay(queueName = "TTS-10")
  28. public class Test1 implements RedisDelayQueueListener<String> {
  29. @Override
  30. public void execute(String t) {
  31. AssertLog.info("AAA=====>{}", JSON.toJSONString(t));
  32. }
  33. }
  34. @RedisDelay(queueName = "TTS-20")
  35. public class Test2 implements RedisDelayQueueListener<Map<String, String>> {
  36. @Override
  37. public void execute(Map<String, String> t) {
  38. AssertLog.info("BBB=====>{}", JSON.toJSONString(t));
  39. }
  40. }
  41. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/空白诗007/article/detail/823256
推荐阅读
  

闽ICP备14008679号