当前位置:   article > 正文

Redisson 延迟队列_redisson延迟队列

redisson延迟队列

目录

1、延迟队列枚举

2、延迟队列工具类

3、延迟队列启动类

4、延迟队列执行器接口

5、延迟队列执行类

6、添加延迟消息


1、延迟队列枚举

  1. @Getter
  2. @AllArgsConstructor
  3. public enum RedisDelayQueueEnum {
  4. /**
  5. * 运营通知
  6. */
  7. OPERATING_NOTIFY("OPERATING_NOTIFY", "运营通知", "operatingNotifyListener");
  8. private final String queueCode;
  9. private final String desc;
  10. private final String beanId;
  11. }

2、延迟队列工具类

  1. @Slf4j
  2. @Component
  3. @RequiredArgsConstructor
  4. public class RedisDelayQueueUtil {
  5. private final RedissonClient redissonClient;
  6. /**
  7. * 添加延迟队列
  8. *
  9. * @param message 消息
  10. * @param delay 时长
  11. * @param timeUnit 单位
  12. * @param queueCode 队列键
  13. */
  14. public <T> void addDelayQueue(T message, long delay, TimeUnit timeUnit, String queueCode) {
  15. try {
  16. RBlockingDeque<Object> blockingDeque = redissonClient.getBlockingDeque(queueCode);
  17. RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);
  18. delayedQueue.offer(message, delay, timeUnit);
  19. log.info("添加延时队列成功,队列键:{},队列值:{},延迟时间:{}", queueCode, message, timeUnit.toSeconds(delay) + "秒");
  20. } catch (Exception e) {
  21. log.error("添加延时队列失败:{}", e.getMessage());
  22. throw new RuntimeException("添加延时队列失败");
  23. }
  24. }
  25. /**
  26. * 获取延迟队列
  27. *
  28. * @param queueCode 队列键
  29. * @return 队列值
  30. */
  31. public <T> T getDelayQueue(String queueCode) throws InterruptedException {
  32. RBlockingDeque blockingDeque = redissonClient.getBlockingDeque(queueCode);
  33. redissonClient.getDelayedQueue(blockingDeque);
  34. T value = (T) blockingDeque.take();
  35. return value;
  36. }
  37. }

3、延迟队列启动类

  1. @Slf4j
  2. @Component
  3. @RequiredArgsConstructor
  4. public class RedisDelayQueueRunner implements CommandLineRunner {
  5. private final RedisDelayQueueUtil redisDelayQueueUtil;
  6. @Override
  7. public void run(String... args) throws Exception {
  8. ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
  9. .setNameFormat("redis-delay-%d").build();
  10. ExecutorService singleThreadPool = new ThreadPoolExecutor(1, 1,
  11. 0L, TimeUnit.MILLISECONDS,
  12. new LinkedBlockingQueue<Runnable>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
  13. singleThreadPool.execute(() -> {
  14. while (true) {
  15. RedisDelayQueueEnum[] queueArray = RedisDelayQueueEnum.values();
  16. for (RedisDelayQueueEnum queue : queueArray) {
  17. try {
  18. Object o = redisDelayQueueUtil.getDelayQueue(queue.getQueueCode());
  19. if (null != o) {
  20. RedisDelayQueueExecutor redisDelayQueueHandle = SpringContextHolder.getBean(queue.getBeanId());
  21. redisDelayQueueHandle.execute(o);
  22. }
  23. } catch (Exception e) {
  24. log.error("Redis延迟队列{}异常中断:{}", queue.getDesc(), e.getMessage(), e);
  25. }
  26. }
  27. }
  28. });
  29. log.info("Redis延迟队列启动成功");
  30. }
  31. }

4、延迟队列执行器接口

  1. public interface RedisDelayQueueExecutor<T> {
  2. /**
  3. * 执行
  4. * @param t 消息
  5. * @author shiweijia
  6. * @date 2021/8/6 16:46
  7. */
  8. void execute(T t);
  9. }

5、延迟队列执行类

  1. @Slf4j
  2. @Component
  3. @RequiredArgsConstructor
  4. public class OperatingNotifyListener implements RedisDelayQueueExecutor<String> {
  5. @Override
  6. public void execute(String message) {
  7. log.info("运营通知消息:{}", message);
  8. }
  9. }

6、添加延迟消息

redisDelayQueueUtil.addDelayQueue("运营通知消息", 60, TimeUnit.SECONDS, RedisDelayQueueEnum.OPERATING_NOTIFY.getQueueCode());

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/正经夜光杯/article/detail/823226
推荐阅读
  

闽ICP备14008679号