赞
踩
- import org.springframework.stereotype.Component;
-
- import java.lang.annotation.*;
-
- @Component
- @Documented
- @Target({ ElementType.TYPE })
- @Retention(RetentionPolicy.RUNTIME)
- public @interface RedisDelay {
-
- /**
- * 延迟队列 queueName
- */
- String queueName();
-
- /**
- * 延迟队列描述
- */
- String desc() default "";
-
- }
/**
 * Callback invoked with each element taken from a Redis delay queue.
 *
 * <p>Declared as a functional interface so the compiler enforces the
 * single-abstract-method contract and implementations may be lambdas.
 *
 * @param <T> type of the queued payload
 */
@FunctionalInterface
public interface RedisDelayQueueListener<T> {

    /**
     * Handles one payload taken from the queue.
     *
     * @param t the payload to process
     */
    void execute(T t);

}
-
- import com.tizo.biz.base.exception.type.ServerException;
- import com.tizo.biz.base.utils.AssertLog;
- import jakarta.annotation.Resource;
- import org.redisson.api.RBlockingDeque;
- import org.redisson.api.RDelayedQueue;
- import org.redisson.api.RedissonClient;
- import org.springframework.stereotype.Component;
-
- import java.util.concurrent.TimeUnit;
-
- /**
- * redis延迟队列工具
- */
- @Component
- public class RedisDelayQueueService {
-
- @Resource
- private RedissonClient redissonClient;
-
- /**
- * 添加队列-秒
- *
- * @param t DTO传输类
- * @param delay 时间数量
- * @param <T> 泛型
- */
- public <T> void pushQueueSeconds(T t, long delay, String queueName) {
- pushQueue(t, delay, TimeUnit.SECONDS, queueName);
- }
-
- /**
- * 添加队列-分
- *
- * @param t DTO传输类
- * @param delay 时间数量
- * @param <T> 泛型
- */
- public <T> void pushQueueMinutes(T t, long delay, String queueName) {
- pushQueue(t, delay, TimeUnit.MINUTES, queueName);
- }
-
- /**
- * 添加队列-时
- *
- * @param t DTO传输类
- * @param delay 时间数量
- * @param <T> 泛型
- */
- public <T> void pushQueueHours(T t, long delay, String queueName) {
- pushQueue(t, delay, TimeUnit.HOURS, queueName);
- }
-
- /**
- * 添加队列-天
- *
- * @param t DTO传输类
- * @param delay 时间数量
- * @param <T> 泛型
- */
- public <T> void pushQueueDays(T t, long delay, String queueName) {
- pushQueue(t, delay, TimeUnit.DAYS, queueName);
- }
-
- /**
- * 创建延迟队列
- *
- * @param value 队列值
- * @param delay 延迟时间
- * @param timeUnit 时间单位
- * @param queueName 队列键
- * @param <T>
- */
- public <T> void pushQueue(T value, long delay, TimeUnit timeUnit, String queueName) {
- try {
- RBlockingDeque<T> blockingDeque = redissonClient.getBlockingDeque(queueName);
- RDelayedQueue<T> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);
- delayedQueue.offer(value, delay, timeUnit);
- AssertLog.info("创建延时队列成功 queueName:{},value值:{},延迟时间:{}", queueName, value, timeUnit.toSeconds(delay) + "-" + timeUnit.name());
- delayedQueue.destroy(); // 释放队列
- } catch (Exception e) {
- AssertLog.error("创建延时队列失败 {}", e.getMessage());
- throw new ServerException("创建延时队列失败");
- }
- }
-
- /**
- * 获取延迟队列
- *
- * @param queueName
- * @param <T>
- * @return
- * @throws InterruptedException
- */
- public <T> T pollQueue(String queueName) throws InterruptedException {
- RBlockingDeque<T> blockingDeque = redissonClient.getBlockingDeque(queueName);
- redissonClient.getDelayedQueue(blockingDeque);// 避免消息伪丢失(应用重启未消费),官网推荐
- T value = (T) blockingDeque.take();
- return value;
- }
- }
- import com.tizo.biz.base.utils.AssertLog;
- import com.tizo.biz.redis.core.delay.RedisDelayQueueService;
- import com.tizo.biz.redis.core.delay.annoation.RedisDelay;
- import jakarta.annotation.Resource;
- import org.apache.commons.lang3.ObjectUtils;
- import org.apache.commons.lang3.concurrent.BasicThreadFactory;
- import org.springframework.beans.BeansException;
- import org.springframework.context.ApplicationContext;
- import org.springframework.context.ApplicationContextAware;
- import org.springframework.stereotype.Component;
- import org.springframework.util.CollectionUtils;
-
- import java.util.Map;
- import java.util.TimerTask;
- import java.util.concurrent.ScheduledExecutorService;
- import java.util.concurrent.ScheduledThreadPoolExecutor;
- import java.util.concurrent.TimeUnit;
-
- @Component
- public class RedisDelayedQueueInit implements ApplicationContextAware {
-
- @Resource
- private RedisDelayQueueService redisDelayQueueService;
-
- @Override
- @SuppressWarnings("unchecked")
- public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
- Map<String, Object> handlers = applicationContext.getBeansWithAnnotation(RedisDelay.class);
- if (!CollectionUtils.isEmpty(handlers)) {
- ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), new BasicThreadFactory.Builder().namingPattern("redisDelay-schedule-pool-%d").daemon(true).build());
- handlers.values().forEach(tempHandler -> {
- boolean result = tempHandler.getClass().isAnnotationPresent(RedisDelay.class);
- if (result) {
- RedisDelay annotation = tempHandler.getClass().getAnnotation(RedisDelay.class);
- AssertLog.info("Redis延迟队列:{} 监听中>>>>>>>>>>>>>", annotation.queueName());
- executorService.scheduleAtFixedRate(new TimerTask() {
- @Override
- public void run() {
- while (true) {
- try {
- Object value = redisDelayQueueService.pollQueue(annotation.queueName());
- if (ObjectUtils.isNotEmpty(value)) {
- RedisDelayQueueListener<Object> redisDelayQueueHandle = (RedisDelayQueueListener<Object>) tempHandler;
- redisDelayQueueHandle.execute(value);
- }
- } catch (InterruptedException e) {
- AssertLog.error("Redis延迟队列异常中断{}", e.getMessage());
- }
- }
- }
- }, 0, 1, TimeUnit.SECONDS);// 每秒检测一次
- }
- });
- }
- }
- }
- import com.tizo.biz.base.utils.AssertLog;
- import com.tizo.biz.redis.core.delay.RedisDelayQueueService;
- import com.tizo.biz.redis.core.delay.annoation.RedisDelay;
- import com.tizo.biz.redis.core.delay.handle.RedisDelayQueueListener;
- import jakarta.annotation.Resource;
- import org.springframework.stereotype.Component;
- import org.springframework.web.bind.annotation.PostMapping;
-
- import java.util.HashMap;
- import java.util.Map;
-
- @Component
- public class RedisDelayTest {
-
- @Resource
- private RedisDelayQueueService redisDelayQueueService;
-
- //生产
- @ApiOperationSupport(order = 1)
- @Operation(summary = "测试Queue")
- @PostMapping("/addQueue")
- public void addQueue() {
-
- redisDelayQueueService.pushQueue("6666", 10, TimeUnit.SECONDS, "TTS-10");
-
- Map<String, String> map2 = new HashMap<>();
- map2.put("orderId", "200");
- map2.put("remark", "订单超时未评价,系统默认好评");
-
- // 订单超时未评价,系统默认好评。为了测试效果,延迟20秒钟
- redisDelayQueueService.pushQueue(map2, 20, TimeUnit.SECONDS, "TTS-20");
- }
-
- //消费
- @RedisDelay(queueName = "TTS-10")
- public class Test1 implements RedisDelayQueueListener<String> {
- @Override
- public void execute(String t) {
- AssertLog.info("AAA=====>{}", JSON.toJSONString(t));
- }
- }
-
- @RedisDelay(queueName = "TTS-20")
- public class Test2 implements RedisDelayQueueListener<Map<String, String>> {
- @Override
- public void execute(Map<String, String> t) {
- AssertLog.info("BBB=====>{}", JSON.toJSONString(t));
- }
- }
-
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。