当前位置:   article > 正文

SpringBoot 整合:Redis延时队列_c.s.redis.delay.handler.delayjobhandler : 扫描delayb

c.s.redis.delay.handler.delayjobhandler : 扫描delaybucket出错:

业务流程

首先我们分析下这个流程

  1. 用户提交任务。首先将任务推送至延迟队列中。

  2. 延迟队列接收到任务后,首先将任务推送至job pool中,然后计算其执行时间。

  3. 然后生成延迟任务(仅仅包含任务id)放入某个桶中

  4. 时间组件时刻轮询各个桶,当时间到达的时候从job pool中获得任务元信息。

  5. 监测任务的合法性如果已经删除则pass。继续轮询。如果任务合法则再次计算时间

  6. 如果合法则计算时间,如果时间合法:根据topic将任务放入对应的ready queue,然后从bucket中移除。如果时间不合法,则重新计算时间再次放入bucket,并移除之前的bucket中的内容

  7. 消费端轮询对应topic的ready queue。获取job后做自己的业务逻辑。与此同时,服务端将已经被消费端获取的job按照其设定的TTR,重新计算执行时间,并将其放入bucket。

  8. 完成消费后,发送finish消息,服务端根据job id删除对应信息。

对象

我们现在可以了解到中间存在的几个组件

  • 延迟队列,为Redis延迟队列。实现消息传递

  • Job pool 任务池保存job元信息。根据文章描述使用K/V的数据结构,key为ID,value为job

  • Delay Bucket 用来保存业务的延迟任务。文章中描述使用轮询方式放入某一个Bucket可以知道其并没有使用topic来区分,个人这里默认使用顺序插入

  • Timer 时间组件,负责扫描各个Bucket。根据文章描述存在多个Timer,但是同一个Timer同一时间只能扫描一个Bucket

  • Ready Queue 负责存放需要被完成的任务,但是根据描述根据Topic的不同存在多个Ready Queue。

其中Timer负责轮询,Job pool、Delay Bucket、Ready Queue都是不同职责的集合。

任务状态

  • ready:可执行状态,

  • delay:不可执行状态,等待时钟周期。

  • reserved:已被消费者读取,但没有完成消费。

  • deleted:已被消费完成或者已被删除。

对外提供的接口

额外的内容

  1. 首先根据状态状态描述,finish和delete操作都是将任务设置成deleted状态。

  2. 根据文章描述的操作,在执行finish或者delete的操作的时候任务已经从元数据中移除,此时deleted状态可能只存在极短时间,所以实际实现中就直接删除了。

  3. 文章中并没有说明响应超时后如何处理,所以个人现在将其重新投入了待处理队列。

  4. 文章中因为使用了集群,所以使用redis的setnx锁来保证多个时间循环处理多个桶的时候不会出现重复循环。这里因为是简单的实现,所以就很简单的每个桶设置一个时间队列处理。也是为了方便简单处理。关于分布式锁可以看我之前的文章里面有描述。

实现

现在我们根据设计内容完成设计。这一块设计我们分四步完成

任务及相关对象

目前需要两个对象,一个是任务对象(job)一个负责保存任务引用的对象(delay job)

任务对象

  1. @Data
  2. @AllArgsConstructor
  3. @NoArgsConstructor
  4. public class Job implements Serializable {
  5.     /**
  6.      * 延迟任务的唯一标识,用于检索任务
  7.      */
  8.     @JsonSerialize(using = ToStringSerializer.class)
  9.     private Long id;
  10.     /**
  11.      * 任务类型(具体业务类型)
  12.      */
  13.     private String topic;
  14.     /**
  15.      * 任务的延迟时间
  16.      */
  17.     private long delayTime;
  18.     /**
  19.      * 任务的执行超时时间
  20.      */
  21.     private long ttrTime;
  22.     /**
  23.      * 任务具体的消息内容,用于处理具体业务逻辑用
  24.      */
  25.     private String message;
  26.     /**
  27.      * 重试次数
  28.      */
  29.     private int retryCount;
  30.     /**
  31.      * 任务状态
  32.      */
  33.     private JobStatus status;
  34. }

任务引用对象

  1. @Data
  2. @AllArgsConstructor
  3. public class DelayJob implements Serializable {
  4.     /**
  5.      * 延迟任务的唯一标识
  6.      */
  7.     private long jodId;
  8.     
  9.     /**
  10.      * 任务的执行时间
  11.      */
  12.     private long delayDate;
  13.     /**
  14.      * 任务类型(具体业务类型)
  15.      */
  16.     private String topic;
  17.     public DelayJob(Job job) {
  18.         this.jodId = job.getId();
  19.         this.delayDate = System.currentTimeMillis() + job.getDelayTime();
  20.         this.topic = job.getTopic();
  21.     }
  22.     public DelayJob(Object value, Double score) {
  23.         this.jodId = Long.parseLong(String.valueOf(value));
  24.         this.delayDate = System.currentTimeMillis() + score.longValue();
  25.     }
  26. }

容器

目前我们需要完成三个容器的创建,Job任务池、延迟任务容器、待完成任务容器

job任务池,为普通的K/V结构,提供基础的操作

  1. @Component
  2. @Slf4j
  3. public class JobPool {
  4.     
  5.     @Autowired
  6.     private RedisTemplate redisTemplate;
  7.     private String NAME = "job.pool";
  8.     
  9.     private BoundHashOperations getPool () {
  10.         BoundHashOperations ops = redisTemplate.boundHashOps(NAME);
  11.         return ops;
  12.     }
  13.     /**
  14.      * 添加任务
  15.      * @param job
  16.      */
  17.     public void addJob (Job job) {
  18.         log.info("任务池添加任务:{}", JSON.toJSONString(job));
  19.         getPool().put(job.getId(),job);
  20.         return ;
  21.     }
  22.     /**
  23.      * 获得任务
  24.      * @param jobId
  25.      * @return
  26.      */
  27.     public Job getJob(Long jobId) {
  28.         Object o = getPool().get(jobId);
  29.         if (o instanceof Job) {
  30.             return (Job) o;
  31.         }
  32.         return null;
  33.     }
  34.     /**
  35.      * 移除任务
  36.      * @param jobId
  37.      */
  38.     public void removeDelayJob (Long jobId) {
  39.         log.info("任务池移除任务:{}",jobId);
  40.         // 移除任务
  41.         getPool().delete(jobId);
  42.     }
  43. }

延迟任务,使用可排序的ZSet保存数据,提供取出最小值等操作

  1. @Slf4j
  2. @Component
  3. public class DelayBucket {
  4.     @Autowired
  5.     private RedisTemplate redisTemplate;
  6.     private static AtomicInteger index = new AtomicInteger(0);
  7.     @Value("${thread.size}")
  8.     private int bucketsSize;
  9.     private List <String> bucketNames = new ArrayList <>();
  10.     @Bean
  11.     public List <String> createBuckets() {
  12.         for (int i = 0; i < bucketsSize; i++) {
  13.             bucketNames.add("bucket" + i);
  14.         }
  15.         return bucketNames;
  16.     }
  17.     /**
  18.      * 获得桶的名称
  19.      * @return
  20.      */
  21.     private String getThisBucketName() {
  22.         int thisIndex = index.addAndGet(1);
  23.         int i1 = thisIndex % bucketsSize;
  24.         return bucketNames.get(i1);
  25.     }
  26.     /**
  27.      * 获得桶集合
  28.      * @param bucketName
  29.      * @return
  30.      */
  31.     private BoundZSetOperations getBucket(String bucketName) {
  32.         return redisTemplate.boundZSetOps(bucketName);
  33.     }
  34.     /**
  35.      * 放入延时任务
  36.      * @param job
  37.      */
  38.     public void addDelayJob(DelayJob job) {
  39.         log.info("添加延迟任务:{}", JSON.toJSONString(job));
  40.         String thisBucketName = getThisBucketName();
  41.         BoundZSetOperations bucket = getBucket(thisBucketName);
  42.         bucket.add(job,job.getDelayDate());
  43.     }
  44.     /**
  45.      * 获得最新的延期任务
  46.      * @return
  47.      */
  48.     public DelayJob getFirstDelayTime(Integer index) {
  49.         String name = bucketNames.get(index);
  50.         BoundZSetOperations bucket = getBucket(name);
  51.         Set<ZSetOperations.TypedTuple> set = bucket.rangeWithScores(01);
  52.         if (CollectionUtils.isEmpty(set)) {
  53.             return null;
  54.         }
  55.         ZSetOperations.TypedTuple typedTuple = (ZSetOperations.TypedTuple) set.toArray()[0];
  56.         Object value = typedTuple.getValue();
  57.         if (value instanceof DelayJob) {
  58.             return (DelayJob) value;
  59.         }
  60.         return null;
  61.     }
  62.     /**
  63.      * 移除延时任务
  64.      * @param index
  65.      * @param delayJob
  66.      */
  67.     public void removeDelayTime(Integer index,DelayJob delayJob) {
  68.         String name = bucketNames.get(index);
  69.         BoundZSetOperations bucket = getBucket(name);
  70.         bucket.remove(delayJob);
  71.     }
  72. }

待完成任务,内部使用topic进行细分,每个topic对应一个list集合

  1. @Component
  2. @Slf4j
  3. public class ReadyQueue {
  4.     @Autowired
  5.     private RedisTemplate redisTemplate;
  6.     private String NAME = "process.queue";
  7.     private String getKey(String topic) {
  8.         return NAME + topic;
  9.     }
  10.     /**
  11.      * 获得队列
  12.      * @param topic
  13.      * @return
  14.      */
  15.     private BoundListOperations getQueue (String topic) {
  16.         BoundListOperations ops = redisTemplate.boundListOps(getKey(topic));
  17.         return ops;
  18.     }
  19.     /**
  20.      * 设置任务
  21.      * @param delayJob
  22.      */
  23.     public void pushJob(DelayJob delayJob) {
  24.         log.info("执行队列添加任务:{}",delayJob);
  25.         BoundListOperations listOperations = getQueue(delayJob.getTopic());
  26.         listOperations.leftPush(delayJob);
  27.     }
  28.     /**
  29.      * 移除并获得任务
  30.      * @param topic
  31.      * @return
  32.      */
  33.     public DelayJob popJob(String topic) {
  34.         BoundListOperations listOperations = getQueue(topic);
  35.         Object o = listOperations.leftPop();
  36.         if (o instanceof DelayJob) {
  37.             log.info("执行队列取出任务:{}", JSON.toJSONString((DelayJob) o));
  38.             return (DelayJob) o;
  39.         }
  40.         return null;
  41.     }
  42.     
  43. }

轮询处理

设置了线程池为每个bucket设置一个轮询操作

  1. @Component
  2. public class DelayTimer implements ApplicationListener <ContextRefreshedEvent> {
  3.     @Autowired
  4.     private DelayBucket delayBucket;
  5.     @Autowired
  6.     private JobPool     jobPool;
  7.     @Autowired
  8.     private ReadyQueue  readyQueue;
  9.     
  10.     @Value("${thread.size}")
  11.     private int length;
  12.     
  13.     @Override 
  14.     public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
  15.         ExecutorService executorService = new ThreadPoolExecutor(
  16.                 length, 
  17.                 length,
  18.                 0L, TimeUnit.MILLISECONDS,
  19.                 new LinkedBlockingQueue <Runnable>());
  20.         for (int i = 0; i < length; i++) {
  21.             executorService.execute(
  22.                     new DelayJobHandler(
  23.                             delayBucket,
  24.                             jobPool,
  25.                             readyQueue,
  26.                             i));
  27.         }
  28.         
  29.     }
  30. }

测试请求

  1. /**
  2.  * 测试用请求
  3.  * @author daify
  4.  **/
  5. @RestController
  6. @RequestMapping("delay")
  7. public class DelayController {
  8.     
  9.     @Autowired
  10.     private JobService jobService;
  11.     /**
  12.      * 添加
  13.      * @param request
  14.      * @return
  15.      */
  16.     @RequestMapping(value = "add",method = RequestMethod.POST)
  17.     public String addDefJob(Job request) {
  18.         DelayJob delayJob = jobService.addDefJob(request);
  19.         return JSON.toJSONString(delayJob);
  20.     }
  21.     /**
  22.      * 获取
  23.      * @return
  24.      */
  25.     @RequestMapping(value = "pop",method = RequestMethod.GET)
  26.     public String getProcessJob(String topic) {
  27.         Job process = jobService.getProcessJob(topic);
  28.         return JSON.toJSONString(process);
  29.     }
  30.     /**
  31.      * 完成一个执行的任务
  32.      * @param jobId
  33.      * @return
  34.      */
  35.     @RequestMapping(value = "finish",method = RequestMethod.DELETE)
  36.     public String finishJob(Long jobId) {
  37.         jobService.finishJob(jobId);
  38.         return "success";
  39.     }
  40.     @RequestMapping(value = "delete",method = RequestMethod.DELETE)
  41.     public String deleteJob(Long jobId) {
  42.         jobService.deleteJob(jobId);
  43.         return "success";
  44.     }
  45.     
  46. }

测试

添加延迟任务

通过postman请求:localhost:8000/delay/add

此时这条延时任务被添加进了线程池中

  1. 2019-08-12 21:21:36.589 INFO 21444 --- [nio-8000-exec-6] d.samples.redis.delay.container.JobPool  : 任务池添加任务:{"delayTime":10000,"id":3,"message":"tag:testid:3","retryCount":0,"status":"DELAY","topic":"test","ttrTime":10000}
  2. 2019-08-12 21:21:36.609  INFO 21444 --- [nio-8000-exec-6] d.s.redis.delay.container.DelayBucket    : 添加延迟任务:{"delayDate":1565616106609,"jodId":3,"topic":"test"}

根据设置10秒钟之后任务会被添加至ReadyQueue中

2019-08-12 21:21:46.744  INFO 21444 --- [pool-1-thread-4] d.s.redis.delay.container.ReadyQueue     : 执行队列添加任务:DelayJob(jodId=3, delayDate=1565616106609, topic=test)

获得任务

这时候我们请求localhost:8000/delay/pop

这个时候任务被响应,修改状态的同时设置其超时时间,然后放置在DelayBucket中

  1. 2019-08-09 19:36:02.342  INFO 58456 --- [nio-8000-exec-3] d.s.redis.delay.container.ReadyQueue     : 执行队列取出任务:{"delayDate":1565321728704,"jodId":1,"topic":"测试"}
  2. 2019-08-09 19:36:02.364 INFO 58456 --- [nio-8000-exec-3] d.samples.redis.delay.container.JobPool  : 任务池添加任务:{"delayTime":10000,"id":1,"message":"延迟10秒,超时30秒","retryCount":0,"status":"RESERVED","topic":"测试","ttrTime":30000}
  3. 2019-08-09 19:36:02.384  INFO 58456 --- [nio-8000-exec-3] d.s.redis.delay.container.DelayBucket    : 添加延迟任务:{"delayDate":1565321792364,"jodId":1,"topic":"测试"}

按照设计在30秒后,任务假如没有被消费将会重新放置在ReadyQueue中

  1. 2019-08-12 21:21:48.239  INFO 21444 --- [nio-8000-exec-7] d.s.redis.delay.container.ReadyQueue     : 执行队列取出任务:{"delayDate":1565616106609,"jodId":3,"topic":"test"}
  2. 2019-08-12 21:21:48.261 INFO 21444 --- [nio-8000-exec-7] d.samples.redis.delay.container.JobPool  : 任务池添加任务:{"delayTime":10000,"id":3,"message":"tag:testid:3","retryCount":0,"status":"RESERVED","topic":"test","ttrTime":10000}

任务的删除/消费

现在我们请求:localhost:8000/delay/delete

此时在Job pool中此任务将会被移除,此时元数据已经不存在,但任务还在DelayBucket中循环,然而在循环中当检测到元数据已经不存的话此延时任务会被移除。

  1. 2019-08-12 21:21:54.880  INFO 21444 --- [nio-8000-exec-8] d.samples.redis.delay.container.JobPool  : 任务池移除任务:3
  2. 2019-08-12 21:21:59.104  INFO 21444 --- [pool-1-thread-5] d.s.redis.delay.handler.DelayJobHandler  : 移除不存在任务:{"delayDate":1565616118261,"jodId":3,"topic":"test"}
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Gausst松鼠会/article/detail/659404
推荐阅读
相关标签
  

闽ICP备14008679号