赞
踩
更多JUC源码解读系列文章请持续关注JUC源码解读文章目录JDK8!
延迟阻塞队列DelayedWorkQueue
中放的元素是ScheduledFutureTask
,提交的任务被包装成ScheduledFutureTask
放进工作队列,Woker
工作线程消费工作队列中的任务,即调用ScheduledFutureTask.run()
,ScheduledFutureTask
又调用任务的run()
,这点和ThreadPoolExecutor
差不多,而ScheduledThreadPoolExecutor
是如何实现按时间调度的呢?
ScheduledThreadPoolExecutor
提交任务的核心函数有3个:
schedule(...)
按一定延迟时长执行任务,只执行一次。scheduleAtFixedRate(...)
按固定频率,周期性执行任务。scheduleWithFixedDelay(...)
按固定延迟时间,受任务执行时长影响,周期性执行任务。首先从3个核心函数出发,其入口源码相似,提交的任务都会先创建一个ScheduledFutureTask
对象,然后调用decorateTask
包装返回RunnableScheduledFuture
对象,最后刚才被包装成RunnableScheduledFuture
对象作为参数调用统一的延迟执行函数delayedExecute()
。
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); //将任务包装成RunnableScheduledFuture对象 //decorateTask直接返回sft,这个函数的意图是让开发者DIY继承实现的 RunnableScheduledFuture<?> t = decorateTask(command, new ScheduledFutureTask<Void>(command, null, triggerTime(delay, unit))); //延迟执行 加延迟阻塞队列+启动一个空的Worker线程 delayedExecute(t); return t; } public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (period <= 0) throw new IllegalArgumentException(); ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, //传入第一次延时时间 now+initialDelay triggerTime(initialDelay, unit), unit.toNanos(period)); //将任务包装成RunnableScheduledFuture对象, //decorateTask直接返回sft,这个函数的意图是让开发者DIY继承实现的 RunnableScheduledFuture<Void> t = decorateTask(command, sft); sft.outerTask = t; //延迟执行 加延迟阻塞队列+启动一个空的Worker线程 delayedExecute(t); return t; } public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (delay <= 0) throw new IllegalArgumentException(); ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), //重点! 传入的delay取反了,用delay正负来区分执行间隔是否固定 unit.toNanos(-delay)); //将任务包装成RunnableScheduledFuture对象 //decorateTask直接返回sft,这个函数的意图是让开发者DIY继承实现的 RunnableScheduledFuture<Void> t = decorateTask(command, sft); sft.outerTask = t; //延迟执行 加延迟阻塞队列+启动一个空的Worker线程 delayedExecute(t); return t; }
看decorateTask
源码,其有两个参数,任务原始对象runnable
和把原始任务包装成RunnableScheduledFuture
对象。decorateTask
函数直接返回RunnableScheduledFuture
对象,没有做什么事情,那其意图是什么呢?
decorateTask
是想让开发者继承ScheduledThreadPoolExecutor
实现定制化定时线程池时,可以实现这个函数,对原始任务对象和包装后任务对象做特殊DIY处理。
protected <V> RunnableScheduledFuture<V> decorateTask(
Runnable runnable, RunnableScheduledFuture<V> task) {
return task;
}
delayedExecute()
是延迟执行和周期性执行的主函数,其基本流程如下:
runstate
为shutdown
将拒绝任务提交。runstate
为shutdown
,再判断是否是周期性任务(isPeriodic
),不同的性质不同的处理策略。Worker
线程,循环从阻塞队列中消费任务。private void delayedExecute(RunnableScheduledFuture<?> task) { if (isShutdown()) reject(task); else { //1、直接加入延时阻塞队列 super.getQueue().add(task); if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task)) task.cancel(false); else //2、预启动一个空的worker ensurePrestart(); } } void ensurePrestart() { int wc = workerCountOf(ctl.get()); if (wc < corePoolSize) //创建一个空worker,并且启动 addWorker(null, true); else if (wc == 0) addWorker(null, false); }
可以看出提交的任务最重被包装成ScheduledFutureTask
,然后加到工作队列由Worker
工作线程去消费了。
延迟执行和周期性执行的核心代码也就在于ScheduledFutureTask
。
ScheduledFutureTask
继承了FutureTask
并实现了接口RunnableScheduledFuture
。
private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> { /** Sequence number to break ties FIFO */ private final long sequenceNumber; /** The time the task is enabled to execute in nanoTime units */ 任务被调用的执行时间 private long time; /** * Period in nanoseconds for repeating tasks. */ //周期性执行的时间间隔 private final long period; /** The actual task to be re-enqueued by reExecutePeriodic */ RunnableScheduledFuture<V> outerTask = this; /** * Index into delay queue, to support faster cancellation. * 索引到延迟队列为了支持快速取消 */ int heapIndex; /** * Creates a one-shot action with given nanoTime-based trigger time. */ ScheduledFutureTask(Runnable r, V result, long ns) { super(r, result); this.time = ns; this.period = 0; this.sequenceNumber = sequencer.getAndIncrement(); } }
ScheduledFutureTask
实现了接口Delayed
,所以需要重写两个方法getDelay
、compareTo
。
//获取当前延迟时间(距离下次任务执行还有多久) public long getDelay(TimeUnit unit) { return unit.convert(time - now(), NANOSECONDS); } /** * 比较this 和 other谁先执行 * @param other * @return <=0 this先执行 */ public int compareTo(Delayed other) { if (other == this) // compare zero if same object return 0; if (other instanceof ScheduledFutureTask) { ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other; long diff = time - x.time; if (diff < 0) return -1; else if (diff > 0) return 1; else if (sequenceNumber < x.sequenceNumber) return -1; else return 1; } //比较Delay long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS); return (diff < 0) ? -1 : (diff > 0) ? 1 : 0; }
延迟执行和周期执行区别在于period
:
period=0
。period!=0
。AtFixedRate
)周期性执行period>0
,每次开始执行的时间的间隔是固定的,不受任务执行时长影响。WithFixedDelay
)周期性执行period<0
,每次执行的时间受任务执行时长影响,是任务执行结束后的当前时间+ (-p)。public boolean isPeriodic() {
return period != 0;
}
private void setNextRunTime() {
long p = period;
//AtFixedRate 当传入period > 0 时 ,每次执行的时间的间隔是固定的
if (p > 0)
time += p;
else
//WithFixedDelay 当传入period < 0 时,每次执行的时间受任务执行时长影响,是任务执行结束后的当前时间+ (-p)
time = triggerTime(-p);
}
long triggerTime(long delay) {
return now() + ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}
ScheduledFutureTask
还有一个变量heapIndex
,是记录任务在阻塞队列的索引的,其方便支持快速取消任务和删除任务。但是其并不会作为删除任务的位置判断,只是当用于判断惹怒是否在阻塞队列中:heapIndex >= 0
在阻塞队列中,取消任务时需要同时从阻塞队列删除任务;heapIndex < 0
不在阻塞队列中。
阻塞队列DelayedWorkQueue
的每次堆化siftUp()
、siftDown()
,以及remove()
都维护着heapIndex
,想必这也是ScheduledThreadPoolExecutor
自行定制延迟阻塞队列的原因之一。
public boolean cancel(boolean mayInterruptIfRunning) {
boolean cancelled = super.cancel(mayInterruptIfRunning);
if (cancelled && removeOnCancel && heapIndex >= 0)
//从延迟阻塞队列中删除任务
remove(this);
return cancelled;
}
ScheduledFutureTask
间接实现了接口Runnable
,其核心逻辑就在run()
:
continueExistingPeriodicTasksAfterShutdown
默认为false,意为调用shutdown()
时,会取消和阻止周期性任务的执行。executeExistingDelayedTasksAfterShutdown
默认为true,意为调用shutdown()
时,不会取消和阻止非周期性任务的执行。public void run() { boolean periodic = isPeriodic(); //当runState为SHUTDOWN时,非周期性任务继续,周期性任务会中断取消 if (!canRunInCurrentRunState(periodic)) cancel(false); else if (!periodic) //非周期性任务,只执行一次 ScheduledFutureTask.super.run(); //runAndReset返回false周期性任务将不再执行 else if (ScheduledFutureTask.super.runAndReset()) { //runAndReset() 周期性任务执行并reset //设置下一次执行时间 setNextRunTime(); //把自己再放回延时阻塞队列 reExecutePeriodic(outerTask); } }
代码一开始判断线程池的运行状态canRunInCurrentRunState
,当线程池处于SHUTDOWN状态时,是否是周期性任务有不同的策略:
continueExistingPeriodicTasksAfterShutdown
默认为false,意为线程池被关闭时,应该取消和阻止周期性任务的执行。executeExistingDelayedTasksAfterShutdown
默认为true,意为线程池被关闭时,不会取消和阻止非周期性任务的执行。boolean canRunInCurrentRunState(boolean periodic) {
return isRunningOrShutdown(periodic ?
continueExistingPeriodicTasksAfterShutdown :
executeExistingDelayedTasksAfterShutdown);
}
/**
* False if should cancel/suppress periodic tasks on shutdown.
*/
private volatile boolean continueExistingPeriodicTasksAfterShutdown;
/**
* False if should cancel non-periodic tasks on shutdown.
*/
private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
延迟执行任务,即只执行一次,调用了父类的FutureTask.run()
。提交的任务如果是Runnable
型,会被包装成Callable
型作为FutureTask
的成员变量。FutureTask.run()
中直接调度执行任务的代码call()
,同时返回结果。
需要注意的是,任务代码c.call()
若抛出异常会被FutureTask
捕获处理,这样对外查找问题不利,所以最好在任务run()或者call()的核心代码用try-catch包起来。
public void run() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { //调用callable的call,并设置返回值 //如果传进来的任务是Runnable,会被转换成callable result = c.call(); //若运行异常,ran=false,异常会被捕获处理 //所以传进来的任务的run或者call代码块最好try-catch下 ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
从源码看出,周期性执行任务没有返回值,FutureTask.runAndReset
最终返回布尔值,并且也会捕获任务代码异常,最终返回true代表代码没有出现异常,下次可以正常执行,false代表任务代码中有异常,下次不能正常执行。
所以特别强调任务代码必须要try-catch
,否则一旦出现异常,周期性执行将不会再设置下次执行时间和把自己放回延迟阻塞队列。
还需要注意的是runAndReset
和单次run
的一个比较不容易注意的区别,runAndReset
的Future
的正常状态会一直是NEW
,即可通过java.util.concurrent.FutureTask#isDone
判断周期性任务是否还在正常运行。
protected boolean runAndReset() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return false; boolean ran = false; int s = state; try { Callable<V> c = callable; if (c != null && s == NEW) { try { c.call(); // don't set result //如果c.call抛异常,将会被处理,但是没有打印堆栈,使用者不易排查 // 不会再往下执行ran=false //所以传进来的任务run里需要自己try-catch ran = true; } catch (Throwable ex) { setException(ex); } } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } return ran && s == NEW; }
还记得ThreadPoolExecutor.shutdown()
代码里一个空的钩子函数onShutdown()
吗?
ScheduledThreadPoolExecutor
的shutdown()
和shutdownNow()
都是调用了父类ThreadPoolExecutor
的函数,但是实现了onShutdown()
。
回顾当调用shutdownNow()
时,会清空工作队列并中断所有工作线程,这样正在执行的任务也会中断,可额外做的事情几乎没有,所以官方也没有给开发者留钩子函数。
而调用shutdown()
,不会清空工作线程也不会中断正在执行任务的工作线程,对于ScheduledThreadPoolExecutor
有周期性任务,会往复重置执行,如果不额外做些处理就使得即使调用了shutdown()
也不会销毁线程池。
onShutdown()
对于任务性质不同有不同处理策略:
continueExistingPeriodicTasksAfterShutdown
默认为false,意为调用shutdown()
时,会取消和阻止周期性任务的执行。executeExistingDelayedTasksAfterShutdown
默认为true,意为调用shutdown()
时,不会取消和阻止非周期性任务的执行。/** * Cancels and clears the queue of all tasks that should not be run * due to shutdown policy. Invoked within super.shutdown. */ @Override void onShutdown() { BlockingQueue<Runnable> q = super.getQueue(); //Shutdown后是否保持延时, 默认true boolean keepDelayed = getExecuteExistingDelayedTasksAfterShutdownPolicy(); //Shutdown后是否保持周期, 默认false boolean keepPeriodic = getContinueExistingPeriodicTasksAfterShutdownPolicy(); if (!keepDelayed && !keepPeriodic) { for (Object e : q.toArray()) if (e instanceof RunnableScheduledFuture<?>) ((RunnableScheduledFuture<?>) e).cancel(false); q.clear(); //取消并清空队列 } else { // Traverse snapshot to avoid iterator exceptions for (Object e : q.toArray()) { if (e instanceof RunnableScheduledFuture) { RunnableScheduledFuture<?> t = (RunnableScheduledFuture<?>)e; if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) || t.isCancelled()) { // also remove if already cancelled if (q.remove(t)) t.cancel(false); } } } } //尝试中断线程池 tryTerminate(); }
通过阅读源码,知道了一些需要注意的细节,一不小心就会踩坑:
try-catch
,否则异常被ScheduledFutureTask
的父类FutureTask
捕获处理,难以排查问题,同时周期性执行任务会因为任务代码抛异常而不再设置下次执行时间和把自己放回延迟队列的操作,即不会再周期性执行。ScheduledFutureTask
通过一个变量就区分了延迟和周期性执行,period=0
延迟执行,即只执行一次;period>0
固定频率周期执行;``period<0`固定延迟时间周期执行,两次任务开始执行时间间隔受任务执行耗时影响。period
,且看重执行等间隔,使用scheduleWithFixedDelay()
。period
,则可以使用scheduleAtFixedRate()
。shutdown
),周期性任务会被取消和阻止执行,非周期性任务会顺利执行完成不会被阻止。PS: 如若文章中有错误理解,欢迎批评指正,同时非常期待你的评论、点赞和收藏。我是徐同学,愿与你共同进步!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。