赞
踩
之前遇到过定时任务异常终止的问题,此次对 jdk 的 ScheduledThreadPoolExecutor
与 spring 的 @Scheduled
进行了测试以及源码的分析。
每秒执行一次,当 count == 3
时抛出异常。
public class JdkTest { private static final ScheduledExecutorService EXECUTOR = new ScheduledThreadPoolExecutor(1); private static final AtomicInteger COUNT = new AtomicInteger(); public static void main(String[] args) throws ExecutionException, InterruptedException { ScheduledFuture<?> scheduledFuture = EXECUTOR.scheduleAtFixedRate(task(), 0, 1, TimeUnit.SECONDS); scheduledFuture.get(); } public static Runnable task() { return () -> { System.out.println(COUNT.get()); if (COUNT.get() == 3) { throw new RuntimeException(); } COUNT.incrementAndGet(); }; } }
@Component
public class SpringTest {
private final AtomicInteger count = new AtomicInteger();
@Scheduled(fixedRate = 1000)
public void test() {
System.out.println(count.get());
if (count.get() == 3) {
throw new RuntimeException();
}
count.incrementAndGet();
}
}
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (period <= 0) throw new IllegalArgumentException(); // 用于表示任务 ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, // 计算首次的执行时间 triggerTime(initialDelay, unit), unit.toNanos(period)); // 装饰 task,默认实现为空 RunnableScheduledFuture<Void> t = decorateTask(command, sft); sft.outerTask = t; // 执行 delayedExecute(t); return t; } private void delayedExecute(RunnableScheduledFuture<?> task) { // 如果线程池正在关闭或已经关闭,则拒绝任务 if (isShutdown()) reject(task); else { // 将任务放入队列 super.getQueue().add(task); // 若加入队列后线程池关闭了,根据设置删除并取消任务 if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task)) task.cancel(false); // 启动线程 else ensurePrestart(); } } void ensurePrestart() { int wc = workerCountOf(ctl.get()); // 若线程数小于 corePoolSize 则新建核心线程 if (wc < corePoolSize) addWorker(null, true); // 当线程数为 0 时,新建普通线程 else if (wc == 0) addWorker(null, false); }
执行任务便是调用 ScheduledFutureTask#run
方法。
// ScheduledFutureTask 继承了 FutureTask 并实现了 RunnableScheduledFuture 接口 private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> {...} // 继承了接口 RunnableFuture、ScheduledFuture,并添加了方法 isPeriodic public interface RunnableScheduledFuture<V> extends RunnableFuture<V>, ScheduledFuture<V> { // 是否是周期性任务,周期性任务会根据某个计划多次运行。非周期性任务只能运行一次。 boolean isPeriodic(); } public interface ScheduledFuture<V> extends Delayed, Future<V> { } // 以延迟相关的核心接口,实现此接口必须同时实现 Comparable,且 compareTo 方法提供与其 getDelay 方法一致的顺序。 // ScheduledThreadPoolExecutor 的延迟队列是一个最小堆,需要依赖于 compareTo 方法进行比较 public interface Delayed extends Comparable<Delayed> { // 以给定的时间单位返回与此对象关联的剩余延迟。 long getDelay(TimeUnit unit); }
private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> { /** Sequence number to break ties FIFO */ private final long sequenceNumber; // 下次执行的时间 private long time; // 重复任务的周期(纳秒)。正值表示固定频率执行。负值表示固定延迟执行。0表示为非重复任务。 private final long period; // 被 reExecutePeriodic 方法重新入队的实际任务 RunnableScheduledFuture<V> outerTask = this; // 当前任务在延迟队列中的索引,能够更加方便的取消当前任务 int heapIndex; ScheduledFutureTask(Runnable r, V result, long ns) { super(r, result); this.time = ns; this.period = 0; this.sequenceNumber = sequencer.getAndIncrement(); } ScheduledFutureTask(Runnable r, V result, long ns, long period) { super(r, result); this.time = ns; this.period = period; this.sequenceNumber = sequencer.getAndIncrement(); } ScheduledFutureTask(Callable<V> callable, long ns) { super(callable); this.time = ns; this.period = 0; this.sequenceNumber = sequencer.getAndIncrement(); } // 用下次执行时间减去当前时间,计算出剩余的延迟时间 public long getDelay(TimeUnit unit) { return unit.convert(time - now(), NANOSECONDS); } // 用于延迟队列的入队、出队,将最小的元素(下一个要执行的任务)放在前面 public int compareTo(Delayed other) { if (other == this) // compare zero if same object return 0; // 如果是 ScheduledFutureTask 的实例,则比较 time,即下次执行时间,若 time 相同再比较序列号 if (other instanceof ScheduledFutureTask) { ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other; long diff = time - x.time; if (diff < 0) return -1; else if (diff > 0) return 1; else if (sequenceNumber < x.sequenceNumber) return -1; else return 1; } long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS); return (diff < 0) ? -1 : (diff > 0) ? 1 : 0; } // 是否是周期性任务,即固定频率的,或固定延迟的 // 只有通过 scheduleAtFixedRate、scheduleWithFixedDelay 方法提交的任务 period 才不是 0 // 两个 schedule 方法提交的都是单次执行的任务 public boolean isPeriodic() { return period != 0; } // 设置周期性任务的下次执行时间 private void setNextRunTime() { long p = period; // 固定频率,上次任务的开始时间加上任务的执行周期 if (p > 0) time += p; // 固定延迟,上次任务的结束时间加上任务的执行周期 else time = triggerTime(-p); } // 取消执行 public boolean cancel(boolean mayInterruptIfRunning) { // 先取消任务 boolean cancelled = super.cancel(mayInterruptIfRunning); // 从队列中删除 if (cancelled && removeOnCancel && heapIndex >= 0) remove(this); return cancelled; } // 重写了 FutureTask,如果是周期性任务会重新入队 public void run() { boolean periodic = isPeriodic(); // 若当前线程池状态不能运行任务,则取消 if (!canRunInCurrentRunState(periodic)) cancel(false); // 不是周期性任务,直接执行 FutureTask#run else if (!periodic) ScheduledFutureTask.super.run(); // 周期性任务执行 FutureTask#runAndReset // 且如果 runAndReset 返回 true 的话才设置任务的下次执行时间,并重新入队 // 若返回 false 则什么都不做 else if (ScheduledFutureTask.super.runAndReset()) { // 设置任务的下次执行时间 setNextRunTime(); // 重新入队 reExecutePeriodic(outerTask); } } } // 重新入队 void reExecutePeriodic(RunnableScheduledFuture<?> task) { // 若当前线程池状态可以运行周期性任务,则重新入队 if (canRunInCurrentRunState(true)) { super.getQueue().add(task); // 入队后再次检测状态,若状态不符合,则删除并取消任务 if (!canRunInCurrentRunState(true) && remove(task)) task.cancel(false); else ensurePrestart(); } }
执行任务单不设置结果,然后将 Future 重置为初始状态,若任务异常或被取消,则不会重置。本方法被设计为实际执行不止一次的任务。
protected boolean runAndReset() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return false; // 默认为 false boolean ran = false; int s = state; try { Callable<V> c = callable; if (c != null && s == NEW) { try { // 执行任务 c.call(); // don't set result // 状态改为已执行 ran = true; } catch (Throwable ex) { // 若抛出异常则设置异常 setException(ex); } } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } // 任务是否已执行 && 当前状态为 NEW return ran && s == NEW; }
从上面可以看出,若抛出异常,那 ran 最终为 false,runAndReset
方法的返回值也是 false,所以不会将任务重新入队,即任务异常终止了。
其实原因很简单,spring 将任务进行了包装,添加了错误处理器,从日志中可以看出是最终日志是由 TaskUtils$LoggingErrorHandler
打印的,从异常堆栈可以找到抛异常的地方是 DelegatingErrorHandlingRunnable#run
方法:
// 类很简单,只有两个字段,delegate 是任务本身,errorHandler 是错误处理器,重写了 run 方法进行错误处理 public class DelegatingErrorHandlingRunnable implements Runnable { private final Runnable delegate; private final ErrorHandler errorHandler; public DelegatingErrorHandlingRunnable(Runnable delegate, ErrorHandler errorHandler) { Assert.notNull(delegate, "Delegate must not be null"); Assert.notNull(errorHandler, "ErrorHandler must not be null"); this.delegate = delegate; this.errorHandler = errorHandler; } @Override public void run() { // 执行任务 try { this.delegate.run(); } // 若抛出异常由 ErrorHandler 处理 catch (UndeclaredThrowableException ex) { this.errorHandler.handleError(ex.getUndeclaredThrowable()); } catch (Throwable ex) { this.errorHandler.handleError(ex); } } @Override public String toString() { return "DelegatingErrorHandlingRunnable for " + this.delegate; } }
TaskUtils
中 ErrorHandler
有两种实现:LoggingErrorHandler、PropagatingErrorHandler
,重复执行的任务使用 LoggingErrorHandler
,只执行一次的任务使用 PropagatingErrorHandler
:
public abstract class TaskUtils { // 只打印 error 日志,但不执行进一步的处理。这将抑制错误,从而不会阻止任务的后续执行。 public static final ErrorHandler LOG_AND_SUPPRESS_ERROR_HANDLER = new LoggingErrorHandler(); // 打印 error 日志,然后重新抛出异常。注意:这通常会阻止计划任务的后续执行。 public static final ErrorHandler LOG_AND_PROPAGATE_ERROR_HANDLER = new PropagatingErrorHandler(); private static class LoggingErrorHandler implements ErrorHandler { private final Log logger = LogFactory.getLog(LoggingErrorHandler.class); @Override public void handleError(Throwable t) { // 只打印日志 logger.error("Unexpected error occurred in scheduled task", t); } } private static class PropagatingErrorHandler extends LoggingErrorHandler { @Override public void handleError(Throwable t) { // 打印日志 super.handleError(t); // 重写抛出异常 ReflectionUtils.rethrowRuntimeException(t); } } // 使用 ErrorHandler 装饰 Runnable public static DelegatingErrorHandlingRunnable decorateTaskWithErrorHandler( Runnable task, @Nullable ErrorHandler errorHandler, boolean isRepeatingTask) { if (task instanceof DelegatingErrorHandlingRunnable) { return (DelegatingErrorHandlingRunnable) task; } // 根据是否是重复性任务获取 ErrorHandler ErrorHandler eh = (errorHandler != null ? errorHandler : getDefaultErrorHandler(isRepeatingTask)); return new DelegatingErrorHandlingRunnable(task, eh); } // 根据是否是重复性任务获取 ErrorHandler,对于重复任务,它将抑制错误,对于一次性任务,它将传播。在这两种情况下,都记录 error 日志。 public static ErrorHandler getDefaultErrorHandler(boolean isRepeatingTask) { return (isRepeatingTask ? LOG_AND_SUPPRESS_ERROR_HANDLER : LOG_AND_PROPAGATE_ERROR_HANDLER); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。