当前位置:   article > 正文

java + redis zset实现延迟队列(定时到期执行任务)_java redis zset

java redis zset

在Redis中,zet作为有序集合,可以利用其有序的特性,将任务添加到zset中,将任务的到期时间作为score,利用zset的默认有序特性,zrangewithscores可以获取score值最小的元素(也就是最近到期的任务),判断系统时间与该任务的到期时间大小,如果达到到期时间,就执行业务,并删除该到期任务,继续判断下一个元素,如果没有到期,就sleep一段时间(比如1秒),如果集合为空,也sleep一段时间。

1. 添加依赖

  1. <dependency>
  2. <groupId>redis.clients</groupId>
  3. <artifactId>jedis</artifactId>
  4. <version>3.3.0</version>
  5. </dependency>

2. 测试代码

  1. package com.demo;
  2. import redis.clients.jedis.Jedis;
  3. import redis.clients.jedis.Tuple;
  4. import java.text.SimpleDateFormat;
  5. import java.time.LocalDateTime;
  6. import java.time.format.DateTimeFormatter;
  7. import java.util.Iterator;
  8. import java.util.Random;
  9. import java.util.Set;
  10. /**
  11. * 基于redis的延迟队列
  12. */
  13. public class RedisDelayQueue {
  14. public static void main(String[] args) {
  15. System.out.println("begin time:" + LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME));
  16. RedisProduceThread produceThread=new RedisProduceThread();
  17. produceThread.start();
  18. RedisConsumeThread consumeThread=new RedisConsumeThread();
  19. consumeThread.start();
  20. }
  21. public static class DelayTask {
  22. /* 触发时间*/
  23. private long time;
  24. private String name;
  25. public long getTime() {
  26. return time;
  27. }
  28. public void setTime(long time) {
  29. this.time = time;
  30. }
  31. public String getName() {
  32. return name;
  33. }
  34. public void setName(String name) {
  35. this.name = name;
  36. }
  37. }
  38. // 添加任务线程
  39. public static class RedisProduceThread extends Thread {
  40. public RedisProduceThread() {
  41. }
  42. @Override
  43. public void run() {
  44. Jedis jedis = new Jedis("127.0.0.1",6379);
  45. while (true)
  46. {
  47. long timeMillis = System.currentTimeMillis();
  48. Random rnd = new Random();
  49. int i = rnd.nextInt(30);
  50. double delay = timeMillis / 1000 + i;
  51. jedis.zadd("myzset", delay, "item-" + i);
  52. Double doubleDelay = delay;
  53. long longDelay = doubleDelay.longValue();
  54. System.out.println("添加业务:item-" + i + ",添加时间:" + timeMillis / 1000 + " ,到期时间:" + longDelay + ",延迟时间:" + i + " 秒");
  55. try {
  56. Thread.sleep(3000);
  57. } catch (InterruptedException e) {
  58. e.printStackTrace();
  59. }
  60. }
  61. }
  62. }
  63. // 读取到期任务线程
  64. public static class RedisConsumeThread extends Thread {
  65. public RedisConsumeThread() {
  66. }
  67. @Override
  68. public void run() {
  69. Jedis jedis = new Jedis("127.0.0.1",6379);
  70. while (true) {
  71. SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  72. // 从redis读取时间最小的数据
  73. long timestamp = System.currentTimeMillis() / 1000;
  74. Set<Tuple> myzset = jedis.zrangeWithScores("myzset", 0, 1);
  75. // 如果读取记录为空
  76. if(myzset.isEmpty())
  77. {
  78. // 延时1
  79. try {
  80. Thread.sleep(1000);
  81. } catch (InterruptedException e) {
  82. e.printStackTrace();
  83. }
  84. continue;
  85. }
  86. Iterator<Tuple> iterator = myzset.iterator();
  87. while (iterator.hasNext())
  88. {
  89. Tuple tuple = iterator.next();
  90. String item = tuple.getElement();
  91. Double score = tuple.getScore();
  92. // 如果当前记录到期
  93. if(timestamp >= score)
  94. {
  95. long lscore = score.longValue();
  96. // 执行业务处理
  97. System.out.println("到期业务:" + item + " ,到期时间:" + lscore + ",系统时间:" + timestamp);
  98. // 处理完成后,删除当前记录
  99. jedis.zrem("myzset", item);
  100. // 继续循环读取下一条
  101. }
  102. else
  103. {
  104. // 最小记录未到期,延时1
  105. try {
  106. Thread.sleep(1000);
  107. } catch (InterruptedException e) {
  108. e.printStackTrace();
  109. }
  110. }
  111. }
  112. }
  113. }
  114. }
  115. }

3. 执行测试

  1. 添加业务:item-1,添加时间:1645515070 ,到期时间:1645515071,延迟时间:1
  2. 到期业务:item-5 ,到期时间:1645515069,系统时间:1645515070
  3. 到期业务:item-1 ,到期时间:1645515071,系统时间:1645515071
  4. 添加业务:item-5,添加时间:1645515073 ,到期时间:1645515078,延迟时间:5
  5. 到期业务:item-15 ,到期时间:1645515073,系统时间:1645515074
  6. 添加业务:item-23,添加时间:1645515076 ,到期时间:1645515099,延迟时间:23
  7. 添加业务:item-11,添加时间:1645515079 ,到期时间:1645515090,延迟时间:11
  8. 到期业务:item-5 ,到期时间:1645515078,系统时间:1645515079
  9. 添加业务:item-5,添加时间:1645515082 ,到期时间:1645515087,延迟时间:5
  10. 添加业务:item-7,添加时间:1645515085 ,到期时间:1645515092,延迟时间:7
  11. 添加业务:item-29,添加时间:1645515088 ,到期时间:1645515117,延迟时间:29
  12. 到期业务:item-20 ,到期时间:1645515087,系统时间:1645515088
  13. 到期业务:item-5 ,到期时间:1645515087,系统时间:1645515088
  14. 到期业务:item-11 ,到期时间:1645515090,系统时间:1645515090

可以看到添加业务的时间加上延迟时间就是业务到期时间,在业务到期的下一秒,就输出了到期提示。

可以根据业务量的大小,每次读取的数据可以是一条数据,也可以是多条数据。一般情况下,每秒做一次检查可以满足大多数的业务需要,特殊情况下,可以将sleep的时间缩小(比如500ms或者300ms),这样可以做到更大的精确性。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Cpp五条/article/detail/649472
推荐阅读
相关标签
  

闽ICP备14008679号