当前位置:   article > 正文

Java通过redis依赖实现延迟队列任务/指定时刻执行任务_java redis zpop脚本

java redis zpop脚本

在java业务开发过程中,经常可能会有这样的需求,我需要在未来的某个时间点执行一个任务,而这个任务是一次性的。又或者是需要动态的创建一个时间线,在某个时间点对应的做某一件事情。而通过定时任务来做的话,很难达到这样的功能,只能通过一个短间隔的定时任务去一直判断当前时间,从而执行某个任务。而如果是需要在未来某一时间点执行某任务的时候,如果仅仅只是使用内存来存这个未来的时间点,则会有进程重启后丢失的风险。这里提供一种通用的时间线任务的实现方案供大家参考

实现思路

最核心的一个实现还是需要依赖一个定时器。这个定时器可以是1秒一次,也可以是100毫秒一次。全局唯一的用于监测时间的一个定时器。然后时间线存储在redis里,通过redis zset结构来存储,key为以服务名或者id生成的固定key,member为指定时间需要执行任务的一些信息,score为时间线的时间戳。这样设计,就可以通过对score范围来拉取需要执行的任务。此外,还需要一个redis Set结构来存储正在执行的任务,并在执行完成后remove成员。这个set可以用于监测是否有执行异常的任务,是否需要自动或者是手动重试。
此外,因为要做成通用的,所以所执行的方法名和参数也不能是固定,由于涉及一些方法,需要在指定实例里运行,比如spring里面的service bean,所以这里设计有两种方案:
1、实例获取使用实例注册的方式,在spring启动后创建bean的时候,通过把bean本身注册到一个自定义的bean中,并在bean里面用map存储实例,需要执行的时候只需要用key去把实例取出来。由于是Object类型的示例,所以需要用反射来获取方法。获取方法这个步骤,也可以在初始化的时候完成并缓存到一个map中,可以提升一些性能。
2、实例不需要注册,在创建时间任务的时候把this参数传入,通过反射获取Method(反射内容其实可以做缓存),并缓存实例和Method。在执行的时候再拿出来。
说了这么多,可能理解的不是很明白,下面直接上伪代码 :

代码实现(上述方案二)

首先我们把这个实现时间线任务的类定义为Timer,下面是Timer的接口

  1. public interface Timer {
  2. /**
  3. * 业务调用的方法,用于创建时间任务
  4. *
  5. * @param obj 执行任务所在方法的示例
  6. * @param method 方法名字
  7. * @param futureTime 需要执行任务的时间点
  8. * @param args 方法的参数,按顺序
  9. */
  10. void runAtFuture(Object obj, String method, Date futureTime, Object... args);
  11. }

接口的实现方法,这里的逻辑,是通过示例和methord还有args,获得并缓存执行任务的示例和方法。并把任务信息和任务执行时间节点插入到redis的zset。

  1. @Override
  2. public void runAt(Object obj, String method, Date at, Object... args) {
  3. log.info("runAt: method {}, at {}, args {}, {}", method, at, args, args.length);
  4. method = getMethodName(obj, method,args);
  5. runAtTypeCheck(method, args);
  6. var timerTask = new TimerTask();
  7. timerTask .setAt(at);
  8. timerTask .setMethod(method);
  9. timerTask .setArgs(args);
  10. timerTask .setUniqueKey(RandomStringUtils.randomAlphanumeric(8));
  11. timerRedisSortedList.zadd(timerDto);
  12. }

脉冲定时任务的时间

  1. @Scheduled(initialDelay = 5000, fixedRate = 100)
  2. public void schedule() {
  3. if (this.handlerMethods.size() == 0) {
  4. log.warn("this.handlerMethods.size() == 0");
  5. return;
  6. }
  7. var now = new Date();
  8. for (int i = 0; i < 10; i++) {
  9. //这里为lua脚本
  10. var timerTasks= timerRedisSortedList.zpop(now, 100);
  11. if (timerTask.size() <= 0) {
  12. return;
  13. }
  14. timerTasks.forEach(timerTask-> {
  15. var uniqueKey = timerTask.getUniqueKey();
  16. var methodOnceKey = String.format(TIMER_UNIQUE_FMT, Context.ActID, uniqueKey);
  17. // 确保不会重复运行
  18. if (redisWrap.done(methodOnceKey, 60)) {
  19. return;
  20. }
  21. // invoke first, then zrem, do not throw exception
  22. try {
  23. //这个invoke里面的逻辑实现可以是通过缓存的实例和方法,使用Method.invoke来执行任务
  24. this.invoke(timerTask.getMethod(),timerTask.getTaskName, timerTask.getArgs());
  25. } catch (Exception ex) {
  26. log.error("timer: {}, catch exception", timerTask, ex);
  27. }
  28. });
  29. }
  30. }

lua脚本参考

  1. local key = KEYS[1]
  2. local items = redis.call('ZRANGEBYSCORE', key, 0, ARGV[1], 'LIMIT', 0, ARGV[2])
  3. for i = 1, table.getn(items) do
  4. redis.call('ZREM', key, items[i])
  5. end
  6. return items

业务调用方式

  1. @Autowired
  2. private Timer timer;
  3. public void xxxxx(){
  4. xxxxxx;
  5. xxxxx;
  6. xxxxx;
  7. timer.runAtFuture(this,"doSomeThing",new Date(xxxxx),new XXX(),new AAA(),new XXXX());
  8. }
  9. pubic void doSomeThing(XXX param1,AAA param2,XXXX param3){
  10. }
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/484317
推荐阅读
相关标签
  

闽ICP备14008679号