赞
踩
目录
/**
 * Catalogue of the Redis delay queues used by the application.
 *
 * <p>Every constant bundles three values: the queue key, a human-readable
 * description, and the id of the bean that handles messages from that queue.
 */
public enum RedisDelayQueueEnum {

    /** Operations notification queue. */
    OPERATING_NOTIFY("OPERATING_NOTIFY", "运营通知", "operatingNotifyListener");

    /** Key identifying the queue. */
    private final String queueCode;

    /** Human-readable description of the queue. */
    private final String desc;

    /** Id of the bean that processes messages of this queue. */
    private final String beanId;

    RedisDelayQueueEnum(String queueCode, String desc, String beanId) {
        this.queueCode = queueCode;
        this.desc = desc;
        this.beanId = beanId;
    }

    /**
     * @return the queue key
     */
    public String getQueueCode() {
        return queueCode;
    }

    /**
     * @return the human-readable description
     */
    public String getDesc() {
        return desc;
    }

    /**
     * @return the id of the handling bean
     */
    public String getBeanId() {
        return beanId;
    }
}
- @Slf4j
- @Component
- @RequiredArgsConstructor
- public class RedisDelayQueueUtil {
- private final RedissonClient redissonClient;
-
- /**
- * 添加延迟队列
- *
- * @param message 消息
- * @param delay 时长
- * @param timeUnit 单位
- * @param queueCode 队列键
- */
- public <T> void addDelayQueue(T message, long delay, TimeUnit timeUnit, String queueCode) {
- try {
- RBlockingDeque<Object> blockingDeque = redissonClient.getBlockingDeque(queueCode);
- RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);
- delayedQueue.offer(message, delay, timeUnit);
- log.info("添加延时队列成功,队列键:{},队列值:{},延迟时间:{}", queueCode, message, timeUnit.toSeconds(delay) + "秒");
- } catch (Exception e) {
- log.error("添加延时队列失败:{}", e.getMessage());
- throw new RuntimeException("添加延时队列失败");
- }
- }
-
- /**
- * 获取延迟队列
- *
- * @param queueCode 队列键
- * @return 队列值
- */
- public <T> T getDelayQueue(String queueCode) throws InterruptedException {
- RBlockingDeque blockingDeque = redissonClient.getBlockingDeque(queueCode);
- redissonClient.getDelayedQueue(blockingDeque);
- T value = (T) blockingDeque.take();
- return value;
- }
- }
- @Slf4j
- @Component
- @RequiredArgsConstructor
- public class RedisDelayQueueRunner implements CommandLineRunner {
- private final RedisDelayQueueUtil redisDelayQueueUtil;
-
- @Override
- public void run(String... args) throws Exception {
- ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
- .setNameFormat("redis-delay-%d").build();
- ExecutorService singleThreadPool = new ThreadPoolExecutor(1, 1,
- 0L, TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue<Runnable>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
-
- singleThreadPool.execute(() -> {
- while (true) {
- RedisDelayQueueEnum[] queueArray = RedisDelayQueueEnum.values();
- for (RedisDelayQueueEnum queue : queueArray) {
- try {
- Object o = redisDelayQueueUtil.getDelayQueue(queue.getQueueCode());
- if (null != o) {
- RedisDelayQueueExecutor redisDelayQueueHandle = SpringContextHolder.getBean(queue.getBeanId());
- redisDelayQueueHandle.execute(o);
- }
- } catch (Exception e) {
- log.error("Redis延迟队列{}异常中断:{}", queue.getDesc(), e.getMessage(), e);
- }
- }
- }
- });
-
- log.info("Redis延迟队列启动成功");
- }
-
- }
/**
 * Callback contract for processing a message taken from a Redis delay queue.
 *
 * @param <T> type of message the implementation accepts
 */
public interface RedisDelayQueueExecutor<T> {

    /**
     * Processes a single message.
     *
     * @param t the message to process
     * @author shiweijia
     * @date 2021/8/6 16:46
     */
    void execute(T t);
}
- @Slf4j
- @Component
- @RequiredArgsConstructor
- public class OperatingNotifyListener implements RedisDelayQueueExecutor<String> {
-
- @Override
- public void execute(String message) {
- log.info("运营通知消息:{}", message);
- }
- }
// Usage example: enqueue a message on the OPERATING_NOTIFY queue with a delay of 60 seconds.
redisDelayQueueUtil.addDelayQueue("运营通知消息", 60, TimeUnit.SECONDS, RedisDelayQueueEnum.OPERATING_NOTIFY.getQueueCode());
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。