当前位置:   article > 正文

Redisson实现延时队列_redisson 延时队列 只消费一次

redisson 延时队列 只消费一次
1、引入redisson
  1. <dependency>
  2. <groupId>org.redisson</groupId>
  3. <artifactId>redisson</artifactId>
  4. <version>3.16.6</version>
  5. </dependency>

2、设置RedissonConfig

  1. import org.redisson.Redisson;
  2. import org.redisson.api.RedissonClient;
  3. import org.redisson.config.Config;
  4. import org.springframework.context.annotation.Bean;
  5. import org.springframework.context.annotation.Configuration;
  6. @Configuration
  7. public class RedissonConfig {
  8. private String host = "127.0.0.1";
  9. private String port = "6379";
  10. /**
  11. * 获取redissonClient实例
  12. *
  13. * @return
  14. * @throws Exception
  15. */
  16. @Bean
  17. public RedissonClient getRedisson() {
  18. Config config = new Config();
  19. String address = "redis://" + host + ":" + port;
  20. config.useSingleServer().setAddress(address);
  21. return Redisson.create(config);
  22. }
  23. }

3、代码逻辑

  1. import org.springframework.web.bind.annotation.GetMapping;
  2. import org.springframework.web.bind.annotation.RestController;
  3. import javax.annotation.Resource;
  4. import java.util.concurrent.TimeUnit;
  5. @RestController
  6. public class DelayedQueueController {
  7. @Resource
  8. private RedisDelayedQueue redisDelayedQueue;
  9. @GetMapping("/index")
  10. public Object index() {
  11. //延时队列里放入 value,延时10秒
  12. redisDelayedQueue.addQueue("value", 10, TimeUnit.SECONDS, TaskUserListener.class.getName());
  13. return "ok";
  14. }
  15. }
  1. import lombok.extern.slf4j.Slf4j;
  2. import org.redisson.api.RBlockingQueue;
  3. import org.redisson.api.RDelayedQueue;
  4. import org.redisson.api.RedissonClient;
  5. import org.springframework.beans.factory.annotation.Autowired;
  6. import org.springframework.stereotype.Component;
  7. import java.util.concurrent.TimeUnit;
  8. @Slf4j
  9. @Component
  10. public class RedisDelayedQueue {
  11. @Autowired
  12. private RedissonClient redissonClient;
  13. /**
  14. * 添加队列
  15. *
  16. * @param t 队列里DTO传输类
  17. * @param delay 延时时间
  18. * @param timeUnit 时间单位
  19. * @param queueName 队列名称
  20. * @param <T> 泛型
  21. */
  22. public <T> void addQueue(T t, long delay, TimeUnit timeUnit, String queueName) {
  23. log.info("添加队列{},delay:{},timeUnit:{}", queueName, delay, timeUnit);
  24. RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(queueName);
  25. RDelayedQueue<T> delayedQueue = redissonClient.getDelayedQueue(blockingFairQueue);
  26. delayedQueue.offer(t, delay, timeUnit);
  27. }
  28. }
  1. import com.alibaba.fastjson.JSON;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.redisson.api.RBlockingQueue;
  4. import org.redisson.api.RDelayedQueue;
  5. import org.redisson.api.RedissonClient;
  6. import org.springframework.beans.BeansException;
  7. import org.springframework.beans.factory.annotation.Autowired;
  8. import org.springframework.context.ApplicationContext;
  9. import org.springframework.context.ApplicationContextAware;
  10. import org.springframework.stereotype.Component;
  11. import java.util.Map;
  12. @Slf4j
  13. @Component
  14. public class RedisDelayedQueueInit implements ApplicationContextAware {
  15. @Autowired
  16. private RedissonClient redissonClient;
  17. /**
  18. * 获取应用上下文并获取相应的接口实现类
  19. *
  20. * @param applicationContext
  21. * @throws BeansException
  22. */
  23. @Override
  24. public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
  25. Map<String, RedisDelayedQueueListener> map = applicationContext.getBeansOfType(RedisDelayedQueueListener.class);
  26. for (Map.Entry<String, RedisDelayedQueueListener> taskEventListenerEntry : map.entrySet()) {
  27. String listenerName = taskEventListenerEntry.getValue().getClass().getName();
  28. startThread(listenerName, taskEventListenerEntry.getValue());
  29. }
  30. }
  31. /**
  32. * 启动线程获取队列*
  33. *
  34. * @param queueName queueName
  35. * @param redisDelayedQueueListener 任务回调监听
  36. * @param <T> 泛型
  37. * @return
  38. */
  39. private <T> void startThread(String queueName, RedisDelayedQueueListener redisDelayedQueueListener) {
  40. RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(queueName);
  41. // 注意虽然delayedQueue在这个方法里面没有用到,但是这行代码也是必不可少的。
  42. RDelayedQueue<T> delayedQueue = redissonClient.getDelayedQueue(blockingFairQueue);
  43. //由于此线程需要常驻,可以新建线程,不用交给线程池管理
  44. Thread thread = new Thread(() -> {
  45. log.info("启动监听队列线程:" + queueName);
  46. while (true) {
  47. try {
  48. T t = blockingFairQueue.take();
  49. log.info("监听队列线程:{},获取到值:{}", queueName, JSON.toJSONString(t));
  50. new Thread(() -> {
  51. redisDelayedQueueListener.invoke(t);
  52. }).start();
  53. } catch (Exception e) {
  54. log.info("监听队列线程错误,", e);
  55. try {
  56. Thread.sleep(10000);
  57. } catch (InterruptedException ex) {
  58. }
  59. }
  60. }
  61. });
  62. thread.setName(queueName);
  63. thread.start();
  64. }
  65. }
  /**
   * Callback for consuming elements taken from a queue. Implement this interface to
   * process the data delivered for one queue.
   *
   * @param <T> type of the element handed to the callback
   */
  @FunctionalInterface
  public interface RedisDelayedQueueListener<T> {

      /**
       * Processes one element.
       *
       * @param t the element to process
       */
      void invoke(T t);
  }
  1. /**
  2. * redisson延时队列消费
  3. * 多个任务可以写多个Listener
  4. * @date 2023/1/14 9:48
  5. * @desc
  6. */
  7. @Component
  8. @Slf4j
  9. public class TaskUserListener implements RedisDelayedQueueListener<String> {
  10. @Override
  11. public void invoke(String userId) {
  12. log.info("==========>账号队列任务开始 用户Id{}", userId);
  13. //消费队列消息,处理相应的业务
  14. System.out.println("消费消息");
  15. }
  16. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/运维做开发/article/detail/823216
推荐阅读
相关标签
  

闽ICP备14008679号