当前位置:   article > 正文

【教程】Redis实现延迟队列 Redisson_redisson延迟队列数据在redis中看不到

redisson延迟队列数据在redis中看不到

【教程】Redis实现延迟队列 Redisson

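前提:项目已集成 Redis,并已在配置文件中配置 spring.redis.host、spring.redis.port、spring.redis.database、spring.redis.password(下文的配置类会读取这些属性)。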

1. 添加依赖

  <!-- Redisson client library; provides RedissonClient, RBlockingQueue and RDelayedQueue. -->
  <dependency>
      <groupId>org.redisson</groupId>
      <artifactId>redisson</artifactId>
      <version>3.19.3</version>
  </dependency>

2. 创建Redisson 配置类

  1. import org.redisson.Redisson;
  2. import org.redisson.api.RedissonClient;
  3. import org.redisson.config.Config;
  4. import org.redisson.config.SingleServerConfig;
  5. import org.springframework.beans.factory.annotation.Value;
  6. import org.springframework.context.annotation.Bean;
  7. import org.springframework.context.annotation.Configuration;
  8. /**
  9. * @description Redisson 配置类
  10. */
  11. @Configuration
  12. public class MyRedissonConfig {
  13. @Value(value = "${spring.redis.host}")
  14. private String host;
  15. @Value(value = "${spring.redis.port}")
  16. private int port;
  17. @Value(value = "${spring.redis.database}")
  18. private int database;
  19. @Value(value = "${spring.redis.password}")
  20. private String password;
  21. /**
  22. * 单Redis节点模式配置方法
  23. * 其他配置參數,看:
  24. * <a href = "https://github.com/redisson/redisson/wiki/2.-%E9%85%8D%E7%BD%AE%E6%96%B9%E6%B3%95#26-%E5%8D%95redis%E8%8A%82%E7%82%B9%E6%A8%A1%E5%BC%8F">
  25. * 单Redis节点模式配置方法
  26. * </a>
  27. *
  28. * @return {@link RedissonClient}
  29. */
  30. @Bean(destroyMethod = "shutdown")
  31. RedissonClient redisson() {
  32. Config config = new Config();
  33. //Redis多节点
  34. // config.useClusterServers()
  35. // .addNodeAddress("redis://127.0.0.1:6379", "redis://127.0.0.1:7001");
  36. //Redis单节点
  37. SingleServerConfig singleServerConfig = config.useSingleServer();
  38. //可以用"rediss://"来启用SSL连接
  39. String address = "redis://" + host + ":" + port;
  40. singleServerConfig.setAddress(address);
  41. //设置 数据库编号
  42. singleServerConfig.setDatabase(database);
  43. singleServerConfig.setPassword(password);
  44. //连接池大小:默认值:64
  45. // singleServerConfig.setConnectionPoolSize()
  46. return Redisson.create(config);
  47. }
  48. }

3. 创建 RedissonDelayQueue

  1. import lombok.extern.slf4j.Slf4j;
  2. import org.redisson.Redisson;
  3. import org.redisson.api.RBlockingQueue;
  4. import org.redisson.api.RDelayedQueue;
  5. import org.redisson.api.RedissonClient;
  6. import org.redisson.config.Config;
  7. import org.redisson.config.SingleServerConfig;
  8. import org.springframework.stereotype.Component;
  9. import javax.annotation.PostConstruct;
  10. import javax.annotation.Resource;
  11. import java.util.concurrent.TimeUnit;
  12. @Component
  13. @Slf4j
  14. public class RedissonDelayQueue {
  15. @Resource
  16. private RedissonClient redissonClient;
  17. private RDelayedQueue<String> delayQueue;
  18. private RBlockingQueue<String> blockingQueue;
  19. @PostConstruct
  20. public void init() {
  21. initDelayQueue();
  22. startDelayQueueConsumer();
  23. }
  24. /**
  25. * 初始化
  26. */
  27. private void initDelayQueue() {
  28. // Config config = new Config();
  29. // SingleServerConfig serverConfig = config.useSingleServer();
  30. // serverConfig.setAddress("redis://localhost:6379");
  31. // redissonClient = Redisson.create(config);
  32. blockingQueue = redissonClient.getBlockingQueue("SANYOU");
  33. delayQueue = redissonClient.getDelayedQueue(blockingQueue);
  34. }
  35. /**
  36. * 监听延时任务
  37. */
  38. private void startDelayQueueConsumer() {
  39. new Thread(() -> {
  40. while (true) {
  41. try {
  42. String task = blockingQueue.take();
  43. log.info("接收到延迟任务:{}", task);
  44. } catch (Exception e) {
  45. e.printStackTrace();
  46. }
  47. }
  48. }, "SANYOU-Consumer").start();
  49. }
  50. /**
  51. * 添加延迟任务
  52. * @param task
  53. * @param seconds
  54. */
  55. public void offerTask(String task, long seconds) {
  56. log.info("添加延迟任务:{} 延迟时间:{}s", task, seconds);
  57. delayQueue.offer(task, seconds, TimeUnit.SECONDS);
  58. }
  59. }

4. 使用测试

  1. /**
  2. * redis延迟队列
  3. */
  4. @Resource
  5. private RedissonDelayQueue redissonDelayQueue;
  6. /**
  7. * 添加延迟队列
  8. * @param task
  9. */
  10. @GetMapping("/add")
  11. public void addTask(@RequestParam("task") String task, int time) {
  12. redissonDelayQueue.offerTask(task, time);
  13. }

请求

http://127.0.0.1:8080/add?task=你好&time=7

结果

请求发出后,日志会立即输出“添加延迟任务:你好 延迟时间:7s”;约 7 秒后,消费线程 SANYOU-Consumer 输出“接收到延迟任务:你好”。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/喵喵爱编程/article/detail/823250
推荐阅读
相关标签
  

闽ICP备14008679号