当前位置:   article > 正文

集群定时任务单点控制解决方案(分布式锁+续期锁解决方案)_excutor分布式锁集群定时任务

excutor分布式锁集群定时任务

1. 业务需求描述        

        开发过程中有一个这样的需求: 集群环境中有一些Spring定时任务, 这些定时任务大概每分钟汇总生成一些日志数据, 但是我们只想让集群中其中一台服务器执行, 设计一个高性能,并且高稳定性的执行框架. (具体业务涉及到一些公司内部的商业问题, 因此这里不做过多的详细描述)

2. 设计方案思考

思考问题1:首先集群单点控制, 这显然是一个资源的竞争问题. 集群中控制资源竞争通常使用分布式锁,借助的中间件往往是redis或者是zookeeper。本次开发中我们使用的是redis。

思考问题2:既然是分布式锁,并且是高可靠性,这时候就不得不考虑一些问题:假设集群中某一台机器通过加锁抢到了任务,执行过程中这台机器突然断电宕机了,导致这个锁没能释放,那是不是其它的机器就永远不可能抢到任务了?

思考问题3:开发一个低耦合的执行框架,解耦其他开发人员的代码,首先我们想到的就是自定义注解+Spring AOP编程,即使用AspectJ

思考问题4:使用分布式锁解决方案,加锁肯定要用到时间戳,如果集群环境中多台服务器事件有细微的差别,又该如何控制?

3. 方案实现

3.1 首先Spring定时器已经存在一个注解:@Scheduled, 使用cron表达式可以实现全部机器上的定时任务,现在可考虑在此基础上新定义一个注解:@ScheduledOne 使用AOP编程切入,代码如下:

注解定义:

  1. @Target(ElementType.METHOD)
  2. @Retention(RetentionPolicy.RUNTIME)
  3. public @interface ScheduledOne {
  4. String name() default "";//分布式锁的名字
  5. }

AOP切面:

  1. @Around("@annotation(org.springframework.scheduling.annotation.Scheduled) &&" +
  2. "@annotation(com.xxx.scheduled.annotation.ScheduledOne)")
  3. public Object scheduledOneAround(ProceedingJoinPoint joinPoint) throws Throwable {
  4. //锁的名字要根据注解的方法名生成
  5. String lockName = getLockName(joinPoint);
  6. RedisLockBean lockBean = new RedisLockBean(lockName);
  7. if (redisLockService.tryLock(lockBean)) {
  8. // 第一层加锁:首先根据className+methodName作为key进行加锁,防止当前定时任务还没有执行完毕,下一次开始执行的时间又到来了。
  9. try {
  10. boolean acquireLock = isExec(joinPoint);
  11. //第二层加锁:获取到锁,检查此任务是否已被时间较快的机器执行, 默认设置超时时间为2h, 即:集群中时间最快的机器和时间最慢的机器相差时间不能超过两小时
  12. if (acquireLock) {
  13. //设置锁成功,说明没有被执行过,开始执行,注意这里程序执行完并不释放上面的lock锁,而是设置超时时间24小时,以防其他时间慢的机器再次执行
  14. Object result;
  15. result = joinPoint.proceed();
  16. return result;
  17. }
  18. } finally {
  19. redisLockService.unlock(lockBean);
  20. }
  21. }
  22. return null;
  23. }

tryLock()加锁是根据方法名+类名在redis中进行加锁,这一步可以保证当前时间只有一台机器能够抢到定时任务。回到思考问题4,假设我们的定时任务每分钟执行一次,cron表达式为:"0 * * * * * ?",那么如果有台机器事件较慢,具体例如:第一台机器2021-08-20 14:10:00秒抢到了任务执行了一次,第二台机器事件满了5秒,等第一台机器到了2021-08-20 14:10:05秒时由于没有机器和第二台去竞争,第二台机器很自然的就抢到了任务,并开始执行,这显然是不应该的。因此为了解决这个问题,再代码中isExec()方法,可以将其看成是第二次加锁,代码如下:

  1. //判断是否有时间快的机器已经执行过了,执行过了
  2. private boolean isExec(ProceedingJoinPoint joinPoint) {
  3. String lock = jedisConnectionFactory.jedisExecAndClose(jedis -> jedis.set(getLockKey(joinPoint), getLockValue(), "NX", "PX", timeUnit.toMillis(2)));
  4. return StringUtil.isNotBlank(lock) && lock.equalsIgnoreCase("OK");
  5. }

用于判断是否有事件快的机器已经执行过了,并且这个锁超时事件默认设置成2小时,意思是允许集群中全部机器时间相差最大2小时,这时集群中的任务还能够继续执行。上面还有一个思考问题2没有解决:即如果抢到任务的这台机器宕机了怎么办?

这里我们设计的解决方案是tryLock()加的锁默认超时时间是5秒中,并且获得锁之后将这个锁封装成一个bean, 即LockBean, 加入到本机的redis锁管理类中,这个类会维护一个线程,不断的扫描本机已获得的锁,如果超时时间即将超时(例如离过期仅剩5秒了,当然这个时间可以自定义),那么这个线程就会自动的再为这个锁续期一段时间,具体的事件封装咋LockBean中。当本机的任务正常结束就会将这个lockBean从管理类中移除。如果机器宕机的话,那么续期锁线程自然也就不工作了,redis数据库中这个锁到了一定的超时时间就会自动的释放。

通过上面的案例可以发现有几个注意事项:

1. 首先锁的默认超时时间不要设置太长;

2. @Scheduled注解中只支持cron表达式,精确到某一时间单位的写法,不能使用fixRate来控制执行频率。因为每台机器上web服务启动的事件不一致。

如果大家有什么好的其他的解决方案可以一期共同讨论!(将定时任务抽取出来为一个单独的工程然后单独部署的想法除外,因为这和web业务解耦了)

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Cpp五条/article/detail/331557
推荐阅读
相关标签
  

闽ICP备14008679号