当前位置:   article > 正文

定时任务异常终止 spring VS JDK_异常结束的任务

异常结束的任务

之前遇到过定时任务异常终止的问题,此次对 jdk 的 ScheduledThreadPoolExecutor 与 spring 的 @Scheduled 进行了测试以及源码的分析。

测试

每秒执行一次,当 count == 3 时抛出异常。

JdkTest

public class JdkTest {

    private static final ScheduledExecutorService EXECUTOR = new ScheduledThreadPoolExecutor(1);

    private static final AtomicInteger COUNT = new AtomicInteger();

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ScheduledFuture<?> scheduledFuture = EXECUTOR.scheduleAtFixedRate(task(), 0, 1, TimeUnit.SECONDS);
        scheduledFuture.get();
    }

    public static Runnable task() {
        return () -> {
            System.out.println(COUNT.get());
            if (COUNT.get() == 3) {
                throw new RuntimeException();
            }
            COUNT.incrementAndGet();
        };
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

JdkTest 结果

SpringTest

@Component
public class SpringTest {

    private final AtomicInteger count = new AtomicInteger();

    @Scheduled(fixedRate = 1000)
    public void test() {
        System.out.println(count.get());
        if (count.get() == 3) {
            throw new RuntimeException();
        }
        count.incrementAndGet();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

SpringTest 结果

测试结果

  • JdkTest:在抛出异常后任务就终止了
  • SpringTest:抛出异常后会继续执行,并且是立即执行重试,而不是 1 秒后

源码解析

为什么 JdkTest 抛出异常后就终止了?

提交任务,任务入队

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                              long initialDelay,
                                              long period,
                                              TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (period <= 0)
        throw new IllegalArgumentException();
    // 用于表示任务
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command,
                                      null,
                                      // 计算首次的执行时间
                                      triggerTime(initialDelay, unit),
                                      unit.toNanos(period));
    // 装饰 task,默认实现为空
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    // 执行
    delayedExecute(t);
    return t;
}

private void delayedExecute(RunnableScheduledFuture<?> task) {
    // 如果线程池正在关闭或已经关闭,则拒绝任务
    if (isShutdown())
        reject(task);
    else {
        // 将任务放入队列
        super.getQueue().add(task);
        // 若加入队列后线程池关闭了,根据设置删除并取消任务
        if (isShutdown() &&
            !canRunInCurrentRunState(task.isPeriodic()) &&
            remove(task))
            task.cancel(false);
        // 启动线程
        else
            ensurePrestart();
    }
}

void ensurePrestart() {
    int wc = workerCountOf(ctl.get());
    // 若线程数小于 corePoolSize 则新建核心线程
    if (wc < corePoolSize)
        addWorker(null, true);
    // 当线程数为 0 时,新建普通线程
    else if (wc == 0)
        addWorker(null, false);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50

工作线程从队列中获取任务,进而执行任务

执行任务便是调用 ScheduledFutureTask#run 方法。

// ScheduledFutureTask 继承了 FutureTask 并实现了 RunnableScheduledFuture 接口
private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> {...}

// 继承了接口 RunnableFuture、ScheduledFuture,并添加了方法 isPeriodic
public interface RunnableScheduledFuture<V> extends RunnableFuture<V>, ScheduledFuture<V> {
    // 是否是周期性任务,周期性任务会根据某个计划多次运行。非周期性任务只能运行一次。
    boolean isPeriodic();
}

public interface ScheduledFuture<V> extends Delayed, Future<V> {
}

// 以延迟相关的核心接口,实现此接口必须同时实现 Comparable,且 compareTo 方法提供与其 getDelay 方法一致的顺序。
// ScheduledThreadPoolExecutor 的延迟队列是一个最小堆,需要依赖于 compareTo 方法进行比较
public interface Delayed extends Comparable<Delayed> {
	// 以给定的时间单位返回与此对象关联的剩余延迟。
    long getDelay(TimeUnit unit);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
private class ScheduledFutureTask<V>
        extends FutureTask<V> implements RunnableScheduledFuture<V> {

    /** Sequence number to break ties FIFO */
    private final long sequenceNumber;

    // 下次执行的时间
    private long time;

	// 重复任务的周期(纳秒)。正值表示固定频率执行。负值表示固定延迟执行。0表示为非重复任务。
    private final long period;

    // 被 reExecutePeriodic 方法重新入队的实际任务
    RunnableScheduledFuture<V> outerTask = this;

    // 当前任务在延迟队列中的索引,能够更加方便的取消当前任务
    int heapIndex;

    ScheduledFutureTask(Runnable r, V result, long ns) {
        super(r, result);
        this.time = ns;
        this.period = 0;
        this.sequenceNumber = sequencer.getAndIncrement();
    }

    ScheduledFutureTask(Runnable r, V result, long ns, long period) {
        super(r, result);
        this.time = ns;
        this.period = period;
        this.sequenceNumber = sequencer.getAndIncrement();
    }

    ScheduledFutureTask(Callable<V> callable, long ns) {
        super(callable);
        this.time = ns;
        this.period = 0;
        this.sequenceNumber = sequencer.getAndIncrement();
    }

    // 用下次执行时间减去当前时间,计算出剩余的延迟时间
    public long getDelay(TimeUnit unit) {
        return unit.convert(time - now(), NANOSECONDS);
    }

    // 用于延迟队列的入队、出队,将最小的元素(下一个要执行的任务)放在前面
    public int compareTo(Delayed other) {
        if (other == this) // compare zero if same object
            return 0;
        // 如果是 ScheduledFutureTask 的实例,则比较 time,即下次执行时间,若 time 相同再比较序列号
        if (other instanceof ScheduledFutureTask) {
            ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
            long diff = time - x.time;
            if (diff < 0)
                return -1;
            else if (diff > 0)
                return 1;
            else if (sequenceNumber < x.sequenceNumber)
                return -1;
            else
                return 1;
        }
        long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
        return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
    }


    // 是否是周期性任务,即固定频率的,或固定延迟的
    // 只有通过 scheduleAtFixedRate、scheduleWithFixedDelay 方法提交的任务 period 才不是 0 
    // 两个 schedule 方法提交的都是单次执行的任务
    public boolean isPeriodic() {
        return period != 0;
    }

    // 设置周期性任务的下次执行时间
    private void setNextRunTime() {
        long p = period;
        // 固定频率,上次任务的开始时间加上任务的执行周期
        if (p > 0)
            time += p;
        // 固定延迟,上次任务的结束时间加上任务的执行周期
        else
            time = triggerTime(-p);
    }

    // 取消执行
    public boolean cancel(boolean mayInterruptIfRunning) {
        // 先取消任务
        boolean cancelled = super.cancel(mayInterruptIfRunning);
        // 从队列中删除
        if (cancelled && removeOnCancel && heapIndex >= 0)
            remove(this);
        return cancelled;
    }

    // 重写了 FutureTask,如果是周期性任务会重新入队
    public void run() {
        boolean periodic = isPeriodic();
        // 若当前线程池状态不能运行任务,则取消
        if (!canRunInCurrentRunState(periodic))
            cancel(false);
        // 不是周期性任务,直接执行 FutureTask#run
        else if (!periodic)
            ScheduledFutureTask.super.run();
        // 周期性任务执行 FutureTask#runAndReset
        // 且如果 runAndReset 返回 true 的话才设置任务的下次执行时间,并重新入队
        // 若返回 false 则什么都不做
        else if (ScheduledFutureTask.super.runAndReset()) {
            // 设置任务的下次执行时间
            setNextRunTime();
            // 重新入队
            reExecutePeriodic(outerTask);
        }
    }
}

// 重新入队
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
    // 若当前线程池状态可以运行周期性任务,则重新入队
    if (canRunInCurrentRunState(true)) {
        super.getQueue().add(task);
        // 入队后再次检测状态,若状态不符合,则删除并取消任务
        if (!canRunInCurrentRunState(true) && remove(task))
            task.cancel(false);
        else
            ensurePrestart();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127

任务异常终止的罪魁祸首:FutureTask#runAndReset

执行任务单不设置结果,然后将 Future 重置为初始状态,若任务异常或被取消,则不会重置。本方法被设计为实际执行不止一次的任务。

protected boolean runAndReset() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return false;
    // 默认为 false
    boolean ran = false;
    int s = state;
    try {
        Callable<V> c = callable;
        if (c != null && s == NEW) {
            try {
                // 执行任务
                c.call(); // don't set result
                // 状态改为已执行
                ran = true;
            } catch (Throwable ex) {
                // 若抛出异常则设置异常
                setException(ex);
            }
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
    // 任务是否已执行 && 当前状态为 NEW
    return ran && s == NEW;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34

从上面可以看出,若抛出异常,那 ran 最终为 false,runAndReset 方法的返回值也是 false,所以不会将任务重新入队,即任务异常终止了。

为什么 SpringTest 抛出异常后没有终止?

其实原因很简单,spring 将任务进行了包装,添加了错误处理器,从日志中可以看出是最终日志是由 TaskUtils$LoggingErrorHandler 打印的,从异常堆栈可以找到抛异常的地方是 DelegatingErrorHandlingRunnable#run 方法:

// 类很简单,只有两个字段,delegate 是任务本身,errorHandler 是错误处理器,重写了 run 方法进行错误处理
public class DelegatingErrorHandlingRunnable implements Runnable {

   private final Runnable delegate;

   private final ErrorHandler errorHandler;

   public DelegatingErrorHandlingRunnable(Runnable delegate, ErrorHandler errorHandler) {
      Assert.notNull(delegate, "Delegate must not be null");
      Assert.notNull(errorHandler, "ErrorHandler must not be null");
      this.delegate = delegate;
      this.errorHandler = errorHandler;
   }

   @Override
   public void run() {
      // 执行任务
      try {
         this.delegate.run();
      }
      // 若抛出异常由 ErrorHandler 处理
      catch (UndeclaredThrowableException ex) {
         this.errorHandler.handleError(ex.getUndeclaredThrowable());
      }
      catch (Throwable ex) {
         this.errorHandler.handleError(ex);
      }
   }

   @Override
   public String toString() {
      return "DelegatingErrorHandlingRunnable for " + this.delegate;
   }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34

TaskUtilsErrorHandler 有两种实现:LoggingErrorHandler、PropagatingErrorHandler,重复执行的任务使用 LoggingErrorHandler,只执行一次的任务使用 PropagatingErrorHandler

public abstract class TaskUtils {

    // 只打印 error 日志,但不执行进一步的处理。这将抑制错误,从而不会阻止任务的后续执行。
	public static final ErrorHandler LOG_AND_SUPPRESS_ERROR_HANDLER = new LoggingErrorHandler();

    // 打印 error 日志,然后重新抛出异常。注意:这通常会阻止计划任务的后续执行。
	public static final ErrorHandler LOG_AND_PROPAGATE_ERROR_HANDLER = new PropagatingErrorHandler();

	private static class LoggingErrorHandler implements ErrorHandler {
		private final Log logger = LogFactory.getLog(LoggingErrorHandler.class);

		@Override
		public void handleError(Throwable t) {
            // 只打印日志
			logger.error("Unexpected error occurred in scheduled task", t);
		}
	}

	private static class PropagatingErrorHandler extends LoggingErrorHandler {
		@Override
		public void handleError(Throwable t) {
            // 打印日志
			super.handleError(t);
            // 重写抛出异常
			ReflectionUtils.rethrowRuntimeException(t);
		}
	}
    
    // 使用 ErrorHandler 装饰 Runnable
	public static DelegatingErrorHandlingRunnable decorateTaskWithErrorHandler(
			Runnable task, @Nullable ErrorHandler errorHandler, boolean isRepeatingTask) {

		if (task instanceof DelegatingErrorHandlingRunnable) {
			return (DelegatingErrorHandlingRunnable) task;
		}
        // 根据是否是重复性任务获取 ErrorHandler
		ErrorHandler eh = (errorHandler != null ? errorHandler : getDefaultErrorHandler(isRepeatingTask));
		return new DelegatingErrorHandlingRunnable(task, eh);
	}
	
    // 根据是否是重复性任务获取 ErrorHandler,对于重复任务,它将抑制错误,对于一次性任务,它将传播。在这两种情况下,都记录 error 日志。
	public static ErrorHandler getDefaultErrorHandler(boolean isRepeatingTask) {
		return (isRepeatingTask ? LOG_AND_SUPPRESS_ERROR_HANDLER : LOG_AND_PROPAGATE_ERROR_HANDLER);
	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家自动化/article/detail/83595
推荐阅读
相关标签
  

闽ICP备14008679号