赞
踩
ScheduledExecutorService继承于ExecutorService,主要提供任务的延迟和周期性执行的功能。其主要提供了schedule,scheduleAtFixedRate,scheduleWithFixedDelay三个方法,分别用于延迟执行任务,以特定频率周期性执行任务,以特定延迟周期性执行任务。
public interface ScheduledExecutorService extends ExecutorService { // 延迟delay时间执行 public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit); public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit); // 固定频率周期性执行 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit); // 固定延迟周期性执行 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit); }
ScheduledThreadPoolExecutor内部定义了一个内部类ScheduledFutureTask,ScheduledFutureTask继承于FutureTask,由该内部类来对任务进行封装,提供周期性执行的功能。
private class ScheduledFutureTask<V>
extends FutureTask<V> implements RunnableScheduledFuture<V> {
...
}
ScheduledFutureTask的run方法实现如下:主要在runAndReset方法中执行该周期性任务,如果执行成功返回true,则调用setNextRunTime设置下次执行时间;如果该方法返回false,则不再调用setNextRunTime设置下次执行时间,故该周期性任务停止执行。
public void run() {
boolean periodic = isPeriodic();
if (!canRunInCurrentRunState(periodic))
cancel(false);
else if (!periodic)
ScheduledFutureTask.super.run();
// 周期性任务执行,返回true则说明此次执行成功,
// 则调用setNextRunTime设置下次执行时间,
// 否则不再设置下次执行时间,故不再执行
else if (ScheduledFutureTask.super.runAndReset()) {
setNextRunTime();
reExecutePeriodic(outerTask);
}
}
runAndReset的实现:执行任务,在成功执行时,则ran为true,state的值还是NEW,故方法返回值为true。但是如果在执行过程中,即c.call()调用时,出现了异常,则ran设置为false,并在catch块中通过setExeception处理。在setException中将state更新为了EXCEPTIONAL,同时唤醒等待该任务结果的线程,则最终的ran && s == NEW为false,导致runAndReset方法返回false,由上面run的分析可知,该周期任务不再执行。
protected boolean runAndReset() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return false; boolean ran = false; int s = state; try { Callable<V> c = callable; if (c != null && s == NEW) { try { // 执行任务 c.call(); // don't set result ran = true; } catch (Throwable ex) { setException(ex); } } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts // 重置任务状态,如果抛了异常,则 // 在setException中已经将state更新为了EXCEPTIONAL s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } return ran && s == NEW; }
setException方法的定义如下:将state更新为EXCEPTIONAL,以便阻塞等待该任务执行结果的线程知道发生了异常,同时调用finishCompletion唤醒这些等待线程。
/** * Causes this future to report an {@link ExecutionException} * with the given throwable as its cause, unless this future has * already been set or has been cancelled. * * <p>This method is invoked internally by the {@link #run} method * upon failure of the computation. * * @param t the cause of failure */ protected void setException(Throwable t) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = t; // 将state更新为EXCEPTIONAL UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state // 唤醒阻塞等待该任务执行结果的线程 finishCompletion(); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。