赞
踩
- <dependency>
- <groupId>org.redisson</groupId>
- <artifactId>redisson</artifactId>
- <version>3.16.6</version>
- </dependency>
2、设置RedissonConfig
- import org.redisson.Redisson;
- import org.redisson.api.RedissonClient;
- import org.redisson.config.Config;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- @Configuration
- public class RedissonConfig {
-
-
- private String host = "127.0.0.1";
-
- private String port = "6379";
- /**
- * 获取redissonClient实例
- *
- * @return
- * @throws Exception
- */
- @Bean
- public RedissonClient getRedisson() {
- Config config = new Config();
- String address = "redis://" + host + ":" + port;
- config.useSingleServer().setAddress(address);
- return Redisson.create(config);
- }
- }
3、代码逻辑
- import org.springframework.web.bind.annotation.GetMapping;
- import org.springframework.web.bind.annotation.RestController;
-
- import javax.annotation.Resource;
- import java.util.concurrent.TimeUnit;
-
- @RestController
- public class DelayedQueueController {
-
- @Resource
- private RedisDelayedQueue redisDelayedQueue;
-
- @GetMapping("/index")
- public Object index() {
- //延时队列里放入 value,延时10秒
- redisDelayedQueue.addQueue("value", 10, TimeUnit.SECONDS, TaskUserListener.class.getName());
- return "ok";
- }
-
- }
- import lombok.extern.slf4j.Slf4j;
- import org.redisson.api.RBlockingQueue;
- import org.redisson.api.RDelayedQueue;
- import org.redisson.api.RedissonClient;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Component;
-
- import java.util.concurrent.TimeUnit;
-
- @Slf4j
- @Component
- public class RedisDelayedQueue {
-
- @Autowired
- private RedissonClient redissonClient;
-
- /**
- * 添加队列
- *
- * @param t 队列里DTO传输类
- * @param delay 延时时间
- * @param timeUnit 时间单位
- * @param queueName 队列名称
- * @param <T> 泛型
- */
- public <T> void addQueue(T t, long delay, TimeUnit timeUnit, String queueName) {
- log.info("添加队列{},delay:{},timeUnit:{}", queueName, delay, timeUnit);
- RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(queueName);
- RDelayedQueue<T> delayedQueue = redissonClient.getDelayedQueue(blockingFairQueue);
- delayedQueue.offer(t, delay, timeUnit);
- }
-
- }
- import com.alibaba.fastjson.JSON;
- import lombok.extern.slf4j.Slf4j;
- import org.redisson.api.RBlockingQueue;
- import org.redisson.api.RDelayedQueue;
- import org.redisson.api.RedissonClient;
- import org.springframework.beans.BeansException;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.context.ApplicationContext;
- import org.springframework.context.ApplicationContextAware;
- import org.springframework.stereotype.Component;
-
- import java.util.Map;
-
-
- @Slf4j
- @Component
- public class RedisDelayedQueueInit implements ApplicationContextAware {
-
- @Autowired
- private RedissonClient redissonClient;
-
- /**
- * 获取应用上下文并获取相应的接口实现类
- *
- * @param applicationContext
- * @throws BeansException
- */
- @Override
- public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
- Map<String, RedisDelayedQueueListener> map = applicationContext.getBeansOfType(RedisDelayedQueueListener.class);
- for (Map.Entry<String, RedisDelayedQueueListener> taskEventListenerEntry : map.entrySet()) {
- String listenerName = taskEventListenerEntry.getValue().getClass().getName();
- startThread(listenerName, taskEventListenerEntry.getValue());
- }
- }
-
- /**
- * 启动线程获取队列*
- *
- * @param queueName queueName
- * @param redisDelayedQueueListener 任务回调监听
- * @param <T> 泛型
- * @return
- */
- private <T> void startThread(String queueName, RedisDelayedQueueListener redisDelayedQueueListener) {
- RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(queueName);
- // 注意虽然delayedQueue在这个方法里面没有用到,但是这行代码也是必不可少的。
- RDelayedQueue<T> delayedQueue = redissonClient.getDelayedQueue(blockingFairQueue);
- //由于此线程需要常驻,可以新建线程,不用交给线程池管理
- Thread thread = new Thread(() -> {
- log.info("启动监听队列线程:" + queueName);
- while (true) {
- try {
- T t = blockingFairQueue.take();
- log.info("监听队列线程:{},获取到值:{}", queueName, JSON.toJSONString(t));
- new Thread(() -> {
- redisDelayedQueueListener.invoke(t);
- }).start();
- } catch (Exception e) {
- log.info("监听队列线程错误,", e);
- try {
- Thread.sleep(10000);
- } catch (InterruptedException ex) {
- }
- }
- }
- });
- thread.setName(queueName);
- thread.start();
- }
- }
/**
 * Callback contract for consuming elements taken from a delayed queue.
 * Implement this interface to process the data of one queue.
 *
 * @param <T> type of the queued element
 */
public interface RedisDelayedQueueListener<T> {

    /**
     * Handles one element received from the queue.
     *
     * @param payload the element to process
     */
    void invoke(T payload);
}
- /**
- * redisson延时队列消费
- * 多个任务可以写多个Listener
- * @date 2023/1/14 9:48
- * @desc
- */
- @Component
- @Slf4j
- public class TaskUserListener implements RedisDelayedQueueListener<String> {
-
-
- @Override
- public void invoke(String userId) {
- log.info("==========>账号队列任务开始 用户Id{}", userId);
- //消费队列消息,处理相应的业务
- System.out.println("消费消息");
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。