赞
踩
1. 前置条件：已配置好 RedissonConfig，能够正常加载 RedissonClient。详见另一篇博客《搭建 Redis 和 Redisson 项目通用组件》。
2. 在 config 目录里分别创建 RedisDelayedQueue、RedisDelayedQueueInit 和 RedisDelayedQueueListener。
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.concurrent.TimeUnit;

/**
 * Producer-side helper that places elements on a Redisson delayed queue.
 *
 * <p>Each call resolves the blocking queue registered under the given name,
 * wraps it in a delayed queue and offers the element with the requested delay.
 */
@Slf4j
@Component
public class RedisDelayedQueue {

    @Autowired
    private RedissonClient redissonClient;

    /**
     * Offers an element to the delayed queue backed by the named blocking queue.
     *
     * @param t         payload to enqueue
     * @param delay     how long the element should be delayed
     * @param timeUnit  unit of {@code delay}
     * @param queueName name of the destination blocking queue
     * @param <T>       payload type
     */
    public <T> void addQueue(T t, long delay, TimeUnit timeUnit, String queueName) {
        log.info("添加队列{},delay:{},timeUnit:{}", queueName, delay, timeUnit);

        // The delayed queue is a view over the destination blocking queue.
        final RBlockingQueue<T> destination = redissonClient.getBlockingQueue(queueName);
        redissonClient.getDelayedQueue(destination).offer(t, delay, timeUnit);
    }
}
import com.alibaba.fastjson2.JSON; import lombok.extern.slf4j.Slf4j; import org.redisson.api.RBlockingQueue; import org.redisson.api.RedissonClient; import org.springframework.beans.BeansException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.stereotype.Component; import java.util.Map; @Slf4j @Component public class RedisDelayedQueueInit implements ApplicationContextAware { @Autowired private RedissonClient redissonClient; /** * 获取应用上下文并获取相应的接口实现类 * * @param applicationContext * @throws BeansException */ @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { Map<String, RedisDelayedQueueListener> map = applicationContext.getBeansOfType(RedisDelayedQueueListener.class); for (Map.Entry<String, RedisDelayedQueueListener> taskEventListenerEntry : map.entrySet()) { String listenerName = taskEventListenerEntry.getValue().getClass().getName(); startThread(listenerName, taskEventListenerEntry.getValue()); } } /** * 启动线程获取队列* * * @param queueName queueName * @param redisDelayedQueueListener 任务回调监听 * @param <T> 泛型 * @return */ private <T> void startThread(String queueName, RedisDelayedQueueListener redisDelayedQueueListener) { RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(queueName); //由于此线程需要常驻,可以新建线程,不用交给线程池管理 Thread thread = new Thread(() -> { log.info("启动监听队列线程:" + queueName); while (true) { try { T t = blockingFairQueue.take(); log.info("监听队列线程:{},获取到值:{}", queueName, JSON.toJSONString(t)); new Thread(() -> { redisDelayedQueueListener.invoke(t); }).start(); } catch (Exception e) { log.info("监听队列线程错误,", e); try { Thread.sleep(10000); } catch (InterruptedException ex) { } } } }); thread.setName(queueName); thread.start(); } }
/**
 * Callback contract for consuming elements taken from a delayed queue.
 *
 * <p>Implement this interface to process the data delivered by a queue.
 *
 * @param <T> type of the element handed to the listener
 */
public interface RedisDelayedQueueListener<T> {

    /**
     * Handles a single element taken from the queue.
     *
     * @param t the element to process
     */
    void invoke(T t);
}
3.放入延时队列消息
// Delayed-queue producer component, injected via @Resource; index() below uses it to enqueue.
@Resource
private RedisDelayedQueue redisDelayedQueue;
/**
 * Demo endpoint: enqueues the fixed payload {@code "value"} with a 10-second delay.
 *
 * @return the literal string {@code "ok"}
 */
@GetMapping("/index")
public Object index() {
    // The queue is addressed by the consuming listener's fully qualified class name.
    final String queueName = TaskJinyiUserListener.class.getName();
    redisDelayedQueue.addQueue("value", 10, TimeUnit.SECONDS, queueName);
    return "ok";
}
4. 消费延时队列消息
import com.jinyi.up.common.redis.config.RedisDelayedQueueListener; import com.jinyi.up.security.service.IJinyiUserService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * redisson延时队列消费 * * @author huangchong * @date 2023/1/14 9:48 * @desc */ @Component @Slf4j public class TaskJinyiUserListener implements RedisDelayedQueueListener<String> { @Autowired private IJinyiUserService jinyiUserService; @Override public void invoke(String userId) { log.info("==========>账号队列任务开始 用户Id{}", userId); //消费队列消息,处理相应的业务 jinyiUserService.getById(userId); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。