当前位置:   article > 正文

Redis实现延迟任务(过期取消订单)_redis延时队列取消订单

redis延时队列取消订单

1. 生产需求:

  • 用户下订单后,15分钟未支付自动取消;
  • 用户成功下单支付后确认收货, 15天默认好评

2. 实现思路

利用redis的排序列表,ZSet进行需求实现, 下面是我的流程图和思路导线

 

3. 思路说明

我们把Zset中的score当成时间戳, 这样我们就可以获得以时间戳排序的任务列表, 这我们通过score区间进行拉取任务,进行消费.

4.代码封装实现

  • 首先是封装延时队列的工厂(完美契合Spring框架), 如果想要创建自己的特色延时队列则需要继承这个抽象工厂

延时队列工厂

  1. package com.zjrcinfo.zjguahao.common.redis.delayqueue;
  2. import com.zjrcinfo.zjguahao.common.redis.cluster.JedisClusterCache;
  3. import com.zjrcinfo.zjguahao.common.utils.ThreadPoolUtil;
  4. import com.zjrcinfo.zjguahao.common.web.log.LoggerName;
  5. import org.apache.shiro.util.CollectionUtils;
  6. import org.slf4j.Logger;
  7. import org.slf4j.LoggerFactory;
  8. import org.springframework.beans.factory.annotation.Autowired;
  9. import redis.clients.jedis.Tuple;
  10. import javax.annotation.PostConstruct;
  11. import java.util.Calendar;
  12. import java.util.Set;
  13. import java.util.concurrent.TimeUnit;
  14. /**
  15. * Description: 延时队列工厂
  16. * User: zhouzhou
  17. * Date: 2019-09-26
  18. * Time: 14:32
  19. */
  20. public abstract class AbstractDelayQueueMachineFactory {
  21. protected Logger logger = LoggerFactory.getLogger(LoggerName.KAFKA);
  22. @Autowired
  23. protected JedisClusterCache jedisClusterCache;
  24. /**
  25. * 插入任务id
  26. * @param jobId 任务id(队列内唯一)
  27. * @param time 延时时间(单位 :秒)
  28. * @return 是否插入成功
  29. */
  30. public boolean addJobId(String jobId, Integer time) {
  31. Calendar instance = Calendar.getInstance();
  32. instance.add(Calendar.SECOND, time);
  33. long delaySeconds = instance.getTimeInMillis() / 1000;
  34. Long zadd = jedisClusterCache.zadd(setDelayQueueName(), delaySeconds, jobId);
  35. return zadd > 0;
  36. }
  37. private void startDelayQueueMachine() {
  38. logger.info(String.format("延时队列机器{%s}开始运作", setDelayQueueName()));
  39. // 发生异常捕获并且继续不能让战斗停下来
  40. while (true) {
  41. try {
  42. // 获取当前时间的时间戳
  43. long now = System.currentTimeMillis() / 1000;
  44. // 获取当前时间前的任务列表
  45. Set<Tuple> tuples = jedisClusterCache.zrangeByScoreWithScores(setDelayQueueName(), 0, now);
  46. // 如果不为空则遍历判断其是否满足取消要求
  47. if (!CollectionUtils.isEmpty(tuples)) {
  48. for (Tuple tuple : tuples) {
  49. String jobId = tuple.getElement();
  50. Long num = jedisClusterCache.zrem(setDelayQueueName(), jobId);
  51. // 如果移除成功, 则取消订单
  52. if (num > 0) {
  53. ThreadPoolUtil.execute(() ->invoke(jobId));
  54. }
  55. }
  56. }
  57. } catch (Exception e) {
  58. logger.warn(String.format("处理延时任务发生异常,异常原因为{%s}", e.getMessage()), e);
  59. } finally {
  60. // 间隔一秒钟搞一次
  61. try {
  62. TimeUnit.SECONDS.sleep(1L);
  63. } catch (InterruptedException e) {
  64. e.printStackTrace();
  65. }
  66. }
  67. }
  68. }
  69. /**
  70. * 最终执行的任务方法
  71. * @param jobId 任务id
  72. */
  73. public abstract void invoke(String jobId);
  74. /**
  75. * 要实现延时队列的名字
  76. *
  77. */
  78. public abstract String setDelayQueueName();
  79. @PostConstruct
  80. public void init(){
  81. new Thread(this::startDelayQueueMachine).start();
  82. }
  83. }

测试延时队列的实现

  1. /**
  2. * Description: 测试订单延时队列
  3. * User: zhouzhou
  4. * Date: 2019-09-26
  5. * Time: 15:14
  6. */
  7. @Component
  8. public class TestOrderDelayQueue extends AbstractDelayQueueMachineFactory {
  9. @Autowired
  10. private TestOrderDelayQueueService testOrderDelayQueueService;
  11. @Override
  12. public void invoke(String jobId) {
  13. testOrderDelayQueueService.cancelOrder(jobId);
  14. }
  15. @Override
  16. public String setDelayQueueName() {
  17. return "TestOrder";
  18. }
  19. }

具体延时消费的Service

  1. package com.zjrcinfo.zjguahao.product.service.impl;
  2. import org.springframework.stereotype.Service;
  3. /**
  4. * Description:
  5. * User: zhouzhou
  6. * Date: 2019-09-26
  7. * Time: 15:21
  8. */
  9. @Service
  10. public class TestOrderDelayQueueService {
  11. public void cancelOrder(String orderNumber) {
  12. System.out.println(System.currentTimeMillis() + "ms:redis消费了一个任务:消费的订单OrderId为" + orderNumber);
  13. }
  14. public void cancelReg(String orderNumber) {
  15. System.out.println(System.currentTimeMillis() + "ms:redis消费了一个任务:消费的挂号订单OrderId为" + orderNumber);
  16. }
  17. }

测试用的Controller

  1. package com.zjrcinfo.zjguahao.product.controller;
  2. import com.zjrcinfo.zjguahao.common.web.log.LoggerName;
  3. import com.zjrcinfo.zjguahao.product.service.delay.TestOrderDelayQueue;
  4. import io.swagger.annotations.Api;
  5. import io.swagger.annotations.ApiOperation;
  6. import org.slf4j.Logger;
  7. import org.slf4j.LoggerFactory;
  8. import org.springframework.beans.factory.annotation.Autowired;
  9. import org.springframework.web.bind.annotation.PathVariable;
  10. import org.springframework.web.bind.annotation.RequestMapping;
  11. import org.springframework.web.bind.annotation.RequestMethod;
  12. import org.springframework.web.bind.annotation.RestController;
  13. import java.util.Random;
  14. /**
  15. * Description:
  16. * User: zhouzhou
  17. * Date: 2019-06-03
  18. * Time: 17:47
  19. */
  20. @RestController
  21. @Api("缓存测试")
  22. @RequestMapping("/redis")
  23. public class RedisTestController {
  24. private Logger logger = LoggerFactory.getLogger(LoggerName.REMOTE);
  25. @Autowired
  26. private TestOrderDelayQueue testOrderDelayQueue;
  27. // ------------------------ 延时队列 -------------------
  28. @ApiOperation("添加定时orderId")
  29. @RequestMapping(value = "/addDelayOrder/{orderId}/{time}", method = RequestMethod.POST)
  30. public Object addZset(@PathVariable String orderId, @PathVariable Integer time) {
  31. boolean flag = testOrderDelayQueue.addJobId(orderId, time);
  32. return String.format("已经存入了订单id{%s},延时{%s}秒", orderId, time);
  33. }
  34. }

5.启动测试

  • 项目启动:日志打印延时机器启动成功

  • 通过swagger向队列插入任务

  • 等待五秒,日志打印,测试成功

 

最后奉上, github代码示例: https://github.com/zjhzzhouzhou/redis-project  希望帮助到大家.

声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop】
推荐阅读
  

闽ICP备14008679号