当前位置:   article > 正文

redis实现延时队列_redis实现延迟队列 java

redis实现延迟队列 java
  1. import com.greenutility.run.util.DateUtils;
  2. import com.greenutility.run.util.RedisUtils;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.springframework.beans.factory.InitializingBean;
  5. import org.springframework.data.redis.core.ZSetOperations;
  6. import org.springframework.scheduling.annotation.Async;
  7. import org.springframework.stereotype.Component;
  8. import org.springframework.util.CollectionUtils;
  9. import javax.annotation.Resource;
  10. import java.util.Date;
  11. import java.util.Set;
  12. /**
  13. * redis zset 实现延时队列demo
  14. */
  15. @Slf4j
  16. @Component
  17. public class OrderDelayService implements InitializingBean {
  18. public static final String DELAY_TASK_KEY = "delayTask:";
  19. @Resource
  20. private RedisUtils redisUtils;
  21. /**
  22. * 加入延时队列
  23. * @param id 任务id
  24. * @param time 延时时间(单位:分钟)
  25. */
  26. public void produce(String id, int time) {
  27. redisUtils.zsetAdd(
  28. DELAY_TASK_KEY,
  29. id,
  30. System.currentTimeMillis() + (time * 60 * 1000)
  31. );
  32. log.info("任务 " + id + " 加入延时队列成功,当前时间 = " + DateUtils.dateToString(new Date(), DateUtils.FULL));
  33. }
  34. /**
  35. * 从延时队列移除
  36. * @param id 任务id
  37. */
  38. public Boolean remove(String id) {
  39. log.info("任务 " + id + " 移除延时队列");
  40. return redisUtils.zsetRemove(DELAY_TASK_KEY, id);
  41. }
  42. //延时任务,也是异步任务,延时任务达到时效之后关闭订单,并将延时任务从redis zset删除
  43. @Async("test")
  44. public void consuming() {
  45. Set<ZSetOperations.TypedTuple<Object>> ids = redisUtils.zsetRangeList(
  46. DELAY_TASK_KEY,
  47. 0, //延时任务score最小值
  48. System.currentTimeMillis() //延时任务score最大值(当前时间)
  49. );
  50. if (!CollectionUtils.isEmpty(ids)) {
  51. for (ZSetOperations.TypedTuple<Object> id : ids) {
  52. log.info("任务 " + id.getValue() + " 超时被自动关闭, 关闭时间 = " + DateUtils.dateToString(new Date(), DateUtils.FULL));
  53. //todo 处理业务逻辑
  54. redisUtils.zsetRemove(DELAY_TASK_KEY, id.getValue());
  55. }
  56. }
  57. }
  58. @Override
  59. public void afterPropertiesSet() {
  60. new Thread(() -> {
  61. while (true) {
  62. try {
  63. //5秒轮询一次,延时任务的时间误差在5秒以内
  64. Thread.sleep(5 * 1000);
  65. } catch (InterruptedException e) {
  66. e.printStackTrace();
  67. }
  68. consuming();
  69. }
  70. }).start();
  71. }
  72. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/weixin_40725706/article/detail/484325
推荐阅读
相关标签
  

闽ICP备14008679号