赞
踩
在java业务开发过程中,经常可能会有这样的需求,我需要在未来的某个时间点执行一个任务,而这个任务是一次性的。又或者是需要动态的创建一个时间线,在某个时间点对应的做某一件事情。而通过定时任务来做的话,很难达到这样的功能,只能通过一个短间隔的定时任务去一直判断当前时间,从而执行某个任务。而如果是需要在未来某一时间点执行某任务的时候,如果仅仅只是使用内存来存这个未来的时间点,则会有进程重启后丢失的风险。这里提供一种通用的时间线任务的实现方案供大家参考
最核心的一个实现还是需要依赖一个定时器。这个定时器可以是1秒一次,也可以是100毫秒一次。全局唯一的用于监测时间的一个定时器。然后时间线存储在redis里,通过redis zset结构来存储,key为以服务名或者id生成的固定key,member为指定时间需要执行任务的一些信息,score为时间线的时间戳。这样设计,就可以通过对score范围来拉取需要执行的任务。此外,还需要一个redis Set结构来存储正在执行的任务,并在执行完成后remove成员。这个set可以用于监测是否有执行异常的任务,是否需要自动或者是手动重试。
此外,因为要做成通用的,所以所执行的方法名和参数也不能是固定,由于涉及一些方法,需要在指定实例里运行,比如spring里面的service bean,所以这里设计有两种方案:
1、实例获取使用实例注册的方式,在spring启动后创建bean的时候,通过把bean本身注册到一个自定义的bean中,并在bean里面用map存储实例,需要执行的时候只需要用key去把实例取出来。由于是Object类型的示例,所以需要用反射来获取方法。获取方法这个步骤,也可以在初始化的时候完成并缓存到一个map中,可以提升一些性能。
2、实例不需要注册,在创建时间任务的时候把this参数传入,通过反射获取Method(反射内容其实可以做缓存),并缓存实例和Method。在执行的时候再拿出来。
说了这么多,可能理解的不是很明白,下面直接上伪代码 :
首先我们把这个实现时间线任务的类定义为Timer,下面是Timer的接口
- public interface Timer {
- /**
- * 业务调用的方法,用于创建时间任务
- *
- * @param obj 执行任务所在方法的示例
- * @param method 方法名字
- * @param futureTime 需要执行任务的时间点
- * @param args 方法的参数,按顺序
- */
- void runAtFuture(Object obj, String method, Date futureTime, Object... args);
-
- }
接口的实现方法,这里的逻辑,是通过示例和methord还有args,获得并缓存执行任务的示例和方法。并把任务信息和任务执行时间节点插入到redis的zset。
- @Override
- public void runAt(Object obj, String method, Date at, Object... args) {
- log.info("runAt: method {}, at {}, args {}, {}", method, at, args, args.length);
- method = getMethodName(obj, method,args);
- runAtTypeCheck(method, args);
- var timerTask = new TimerTask();
- timerTask .setAt(at);
- timerTask .setMethod(method);
- timerTask .setArgs(args);
- timerTask .setUniqueKey(RandomStringUtils.randomAlphanumeric(8));
- timerRedisSortedList.zadd(timerDto);
- }
脉冲定时任务的时间
- @Scheduled(initialDelay = 5000, fixedRate = 100)
- public void schedule() {
- if (this.handlerMethods.size() == 0) {
- log.warn("this.handlerMethods.size() == 0");
- return;
- }
-
- var now = new Date();
- for (int i = 0; i < 10; i++) {
- //这里为lua脚本
- var timerTasks= timerRedisSortedList.zpop(now, 100);
- if (timerTask.size() <= 0) {
- return;
- }
- timerTasks.forEach(timerTask-> {
- var uniqueKey = timerTask.getUniqueKey();
- var methodOnceKey = String.format(TIMER_UNIQUE_FMT, Context.ActID, uniqueKey);
- // 确保不会重复运行
- if (redisWrap.done(methodOnceKey, 60)) {
- return;
- }
-
- // invoke first, then zrem, do not throw exception
- try {
- //这个invoke里面的逻辑实现可以是通过缓存的实例和方法,使用Method.invoke来执行任务
- this.invoke(timerTask.getMethod(),timerTask.getTaskName, timerTask.getArgs());
- } catch (Exception ex) {
- log.error("timer: {}, catch exception", timerTask, ex);
- }
- });
- }
- }
lua脚本参考
- local key = KEYS[1]
- local items = redis.call('ZRANGEBYSCORE', key, 0, ARGV[1], 'LIMIT', 0, ARGV[2])
- for i = 1, table.getn(items) do
- redis.call('ZREM', key, items[i])
- end
- return items
业务调用方式
- @Autowired
- private Timer timer;
-
- public void xxxxx(){
- xxxxxx;
- xxxxx;
- xxxxx;
- timer.runAtFuture(this,"doSomeThing",new Date(xxxxx),new XXX(),new AAA(),new XXXX());
- }
- pubic void doSomeThing(XXX param1,AAA param2,XXXX param3){
-
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。