赞
踩
本文和ThreadPoolExecutor 分析的文章结构一样。
在分析之前,建议先了解下面的几篇文章
因为下面设计到的有些的内容,会部分内容是和这里面的是重复的,所以,就没在详细的说。
从下面的图可以看出,根就是Executor
接口,下面就挨个来看看这些接口和抽象类干了什么样的事情。
主要的实现是ScheduledThreadPoolExecutor
,从图中可以看到,左边红线部分的相关逻辑已经在ThreadPoolExecutor 分析分析过了,重点有右边的ScheduleExecutorService
。
继承于ExecutorService,并且这个接口可执行定时的任务,包括延迟启动,周期调用。
schedule 方法可以创建各种延迟的任务并且返回一个Future对象,这个Future对象可用于取消或者检查执行, scheduleAtFixedRate 和 scheduleWithFixedDelay 方法创建和执行定期运行直到被取消。命令的提交是通过Executor#execute(Runnable)
和ExecutorService#submit方法
,0和负数代表立即执行。所有的方法都可以接受一个相对时间(延迟或者周期时间)。在Executors
中提供了便捷的方法来创建。
命令真正的执行还是交给Executor#execute(Runnable)
public interface ScheduledExecutorService extends ExecutorService { // 延迟时间,只执行一次 public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit); public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit); // 在执行的延迟时间之后,创建并且启用一个周期性的task,也就是说在initialDelay后,任务开始启动,下次执行之间是initialDelay+period,接下来是 initialDelay+period * 2, // 如果当前执行任务发送了任何的异常,这个任务就会后序的执行就会被抑制。(也就是不执行) public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit); public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit); }
从上面的图可以知道,他是继承于ThreadPoolExecutor,在构造函数的时候,无一例外都调用到了父类。上面几个方法都大同小异,线程池主要就先看corePoolSize和maximumPoolSize和BlockingQueue,这里就挑一个就好了。
从构造函数可以看到,在ScheduledThreadPoolExecutor
中corePoolSize是可以设置的,maximumPoolSize是Integer.MAX_VALUE。BlockingQueue是DelayedWorkQueue(这个是一个内部类,类似于DelayedQueue和PriorityQueue)。keepAliveTime是0
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory, handler);
}
/** * 当线程池状态变为shutDown之后,是否还要继续执行周期性的任务 */ private volatile boolean continueExistingPeriodicTasksAfterShutdown; /** * 当线程池状态变为shutDown之后,是否还要继续执行delay(延迟)的任务 */ private volatile boolean executeExistingDelayedTasksAfterShutdown = true; /** true:如果cancel方法需要将task从queue移除。 * True if ScheduledFutureTask.cancel should remove from queue */ private volatile boolean removeOnCancel = false; // 原子Long型,用他来表示任务的顺序(添加的顺序) private static final AtomicLong sequencer = new AtomicLong();
一开始空指针检查,并且将传递进来的command,装饰变为RunnableScheduledFuture,关于RunnableScheduledFuture的具体分析在下面。decorateTask方法是一个受保护的方法,留给子类拓展的,这里只是将ScheduledFutureTask返回了。
将封装好的RunnableScheduledFuture入任务队列,并且如果线程数是小于coolPollSize,或者线程数是0,就会调用addWorker方法。
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit) {
// 空指针检查
if (command == null || unit == null)
throw new NullPointerException();
// 将command装饰为一个RunnableScheduledFuture,
RunnableScheduledFuture<?> t = decorateTask(command,
new ScheduledFutureTask<Void>(command, null,
triggerTime(delay, unit)));
// 将Future执行。这方法是很重要的。
delayedExecute(t);
return t;
}
实际就是将task添加到队列里面
主要是执行延迟或者周期性的任务,如果pool关闭,拒绝任务,否则就添加到taskQueue里面去,并且在添加之后还会在判断一次。
这里的判断是为了再次检验,如果线程池的转改变为shutDown(也就是说,在提交之前线程池的状态没有变为shutDown,提交了任务之后,这一瞬间,线程池的状态变为shutDown了),并且判断在当前的线程池的状态下面,当前任务是否还能运行,如果不能运行,就从taskQueue中移除任务,并且将task的状态变为CANCELLED
,如果Future取消成功,并且removeOnCancel(表示future取消之后是否要从taskQueue中移除)为true,并且headIndex>0,这些条件满足就会从taskQueue出移除this。
ensurePrestart方法就是启动线程,并且这个方法和prestartCoreThread
差不多。
问题
在task.cacnel之前,已经remove掉过了,task.cacnel的remove有什么意义。
首先,这个task是继承与FutaskTask的子类,代码如下
public boolean cancel(boolean mayInterruptIfRunning) { // 调用父类的 cancel方法, boolean cancelled = super.cancel(mayInterruptIfRunning); // 自己的逻辑处理,我感觉没有什么意义,这里的remove。和前面的remove都是一样的。感觉没有啥意义。 if (cancelled && removeOnCancel && heapIndex >= 0) remove(this); return cancelled; }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
调用了FutureTask的remove方法,将Future的状态改为
CANCELLED
或者INTERRUPTING
,如果mayInterruptIfRunning的话,可能还会中断当前执行的线程。但是这里的remove操作,确实和之前的操作重复了,我没有看懂这里是什么逻辑
private void delayedExecute(RunnableScheduledFuture<?> task) {
if (isShutdown())
// 拒绝策略
reject(task);
else {
super.getQueue().add(task);
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
// 这里的remove方法和cancel中的remove方法重复了。
remove(task))
task.cancel(false);
else
ensurePrestart();
}
}
在运行或者SHUTDOWN的时候,判断当前的task是否可以运行,
periodic表示当前的task是否是一个周期性的任务
continueExistingPeriodicTasksAfterShutdown 表示,当线程池的状态变为Shutdown,是否继续运行当前存在的周期性的任务,默认是false。
executeExistingDelayedTasksAfterShutdown表示,当线程池的状态变为Shutdown,是否继续运行当前存在的延迟性的任务,默认是true。
通过上面两个,可以知道,在默认的情况下,ScheduledThreadPoolExecutor在调用了shutDown方法之后,也就是线程池的状态变为Shutdown之后,默认会继续执行已经存在的delay(延迟任务)
boolean canRunInCurrentRunState(boolean periodic) {
return isRunningOrShutdown(periodic ?
continueExistingPeriodicTasksAfterShutdown :
executeExistingDelayedTasksAfterShutdown);
}
检查必要线程是否需要创建
启动一个线程,代码很简单就不说了。
void ensurePrestart() {
int wc = workerCountOf(ctl.get());
if (wc < corePoolSize)
addWorker(null, true);
else if (wc == 0)
addWorker(null, false);
}
从上面可以看出,几个比较重点的地方
带着这几个主要的问题,一起往下面看看
固定频率
一开始还是非空检查,将提交的runnable还是变为ScheduledFutureTask,这里的ScheduledFutureTask和上面的schedule是一个对象,就是参数不一样,decorateTask也是一样,重点是创建ScheduledFutureTask的时候多了几个构造参数,这个在下面说。
还是一样,调用decorateTask,让子类装饰一下,默认还是传递进去的ScheduledFutureTask。多了一个设置outerTask的步骤。outerTask表示实际真正的重新入queue的任务。默认就是this。
最后还是调用delayedExecute方法,作用在上面已经说了。
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (period <= 0) throw new IllegalArgumentException(); ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(period)); RunnableScheduledFuture<Void> t = decorateTask(command, sft); // ScheduledFutureTask的outerTask表示,实际真正的重新入queue的任务。默认就是this, // 那如果子类重写了decorateTask方法,返回了一个自定义的RunnableScheduledFuture,再次入队就是自定义的了。 sft.outerTask = t; // 还是这个方法 delayedExecute(t); return t; }
固定间隔
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (delay <= 0) throw new IllegalArgumentException(); ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(-delay)); RunnableScheduledFuture<Void> t = decorateTask(command, sft); sft.outerTask = t; delayedExecute(t); return t; }
重写了这几个方法,就表示,不能像之前那样来提交任务了。可以看到,就是复用了schedule方法。
提交的任务是一个delay为0的任务。
public void execute(Runnable command) {
schedule(command, 0, NANOSECONDS);
}
public Future<?> submit(Runnable task) {
return schedule(task, 0, NANOSECONDS);
}
得到下次触发的时间
其实就是当前时间+时间差
问题
为啥要判断delay < Long.MAX_VALUE >> 1?
防止溢出。这样的话就能有效的避免 now + delay导致溢出的情况。
overflowFree方法里面拿到堆头元素,然后比较,最后的一个 Long.MAX_VALUE + headDelay 是什么想法?
首先要知道,进去到overflowFree是因为delay 大于等于Long.MAX_VALUE >> 1。也就是 Long.MAX_VALUE的一半。
拿到堆头元素,如果delay都比headDelay小,也就是headDelay的值大。delay = Long.MAX_VALUE + headDelay。这样做事为什么?是因为栈溢出,就会导致值变小?这里我不太明白。并且如果堆头元素要是比delay小。那这个方法的返回值还原来的值,也有可能造成位溢出。
//
private long triggerTime(long delay, TimeUnit unit) {
return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
}
// 就是now+delay
// 如果delay 小于Long.MAX_VALUE >> 1。就是delay,否则就是overflowFree。
long triggerTime(long delay) {
return now() +
((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}
private long overflowFree(long delay) {
Delayed head = (Delayed) super.getQueue().peek();
if (head != null) {
// 拿到堆头元素,如果堆头元素小于0,并且 delay - headDelay < 0。delay就等于Long.MAX_VALUE + headDelay。
long headDelay = head.getDelay(NANOSECONDS);
if (headDelay < 0 && (delay - headDelay < 0))
// 这个操作不就移除了嘛。难道溢出是正确的
delay = Long.MAX_VALUE + headDelay;
}
return delay;
}
和这个方法类似的还要一个setExecuteExistingDelayedTasksAfterShutdownPolicy
方法分析
在正常的设置值之后,还会判断线程池的状态,调用onShutdown方法。线程池的状态的判断就是ctl。
public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
continueExistingPeriodicTasksAfterShutdown = value;
// 如果为不允许,并且当前线程池已经关闭了,就会调用onShutdown
if (!value && isShutdown())
onShutdown();
}
重新执行周期性的任务
先判断,线程池在当前的状态下,周期性的任务是否能继续运行,可以就将这个任务重新添加到队列里面。在添加完之后,在判断线程池在当前的状态下是否能继续运行周期性的任务,如果不能,调用移除掉这个任务,在中断task的执行。
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
// 判断当前线程池的状态
if (canRunInCurrentRunState(true)) {
// 添加task
super.getQueue().add(task);
// 如果不能运行,就remove掉 这个remove操作和ScheduledFutureTask的cancel重复了。
if (!canRunInCurrentRunState(true) && remove(task))
task.cancel(false);
else
// 保障必须的线程是否在运行。
ensurePrestart();
}
}
onShutdown方法有下面的几个调用的地方
都是在shutdown的语义下面调用的。
拿到两个策略,看在shutDown的状态下面是否需要运行。判断任务类型,看情况是否要终止。
@Override void onShutdown() { // 拿到队列 BlockingQueue<Runnable> q = super.getQueue(); // 是要要继续执行delay任务 boolean keepDelayed = getExecuteExistingDelayedTasksAfterShutdownPolicy(); // 是否要继续执行周期性的任务 boolean keepPeriodic = getContinueExistingPeriodicTasksAfterShutdownPolicy(); // 如果两个不需要运行,遍历数组,直接关闭 if (!keepDelayed && !keepPeriodic) { for (Object e : q.toArray()) if (e instanceof RunnableScheduledFuture<?>) ((RunnableScheduledFuture<?>) e).cancel(false); q.clear(); } else { for (Object e : q.toArray()) { if (e instanceof RunnableScheduledFuture) { // RunnableScheduledFuture<?> t = (RunnableScheduledFuture<?>)e; // 是否是周期性task,再看看是否要关闭,满足直接remove,并且cacel if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) || t.isCancelled()) { // also remove if already cancelled if (q.remove(t)) t.cancel(false); } } } } tryTerminate(); }
DelayedWorkQueue是基于堆结构的,和DelayQueue,PriorityQueue有点像,堆存储ScheduledFutureTask,在RunnableScheduledFutures(里面存储该元素在堆中数组下标的位置),降低了在取消任务时候的查找时间复杂度。提高了remova方法速度,减少垃圾回收保留。
// 初始容量
private static final int INITIAL_CAPACITY = 16;
// 像DelayQueue和PriorityQueue一样,堆的实现基本都是数组,所以这里就是数组啦。一个RunnableScheduledFuture的数组。
private RunnableScheduledFuture<?>[] queue =
new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
private final ReentrantLock lock = new ReentrantLock();
private int size = 0;
// 这里也应用了Leader-Follower。
private Thread leader = null;
// 这个主要是是用来做leader-follower的。在之前的PriorityQueue里面已经做了分析
private final Condition available = lock.newCondition();
private void setIndex(RunnableScheduledFuture<?> f, int idx) {
if (f instanceof ScheduledFutureTask)
((ScheduledFutureTask)f).heapIndex = idx;
}
从队列的头部添加元素,在PriorityQueue有类似的方法。基本作用都是一样的。和PriorityQueue不一样的一个地方在于多了一个 setIndex(e, k);
方法。
private void siftUp(int k, RunnableScheduledFuture<?> key) {
while (k > 0) {
// 找到当前要插入位置的父节点
int parent = (k - 1) >>> 1;
RunnableScheduledFuture<?> e = queue[parent];
if (key.compareTo(e) >= 0)
break;
queue[k] = e;
setIndex(e, k);
k = parent;
}
queue[k] = key;
setIndex(key, k);
}
从队列的尾部添加元素,在PriorityQueue有类似的方法。基本作用都是一样的。和PriorityQueue不一样的地方也是在于这个方法多了一个 setIndex(e, k);
方法。记录节点在数组中的下标。
private void siftDown(int k, RunnableScheduledFuture<?> key) { // >>>是逻辑右移动(无符号移动,高位补符号位) int half = size >>> 1; while (k < half) { // 根节点和子节点的关系。, // (父节点)queue[n] (左子节点)queue[2*n+1] (右子节点) queue[2*(n+1)]. int child = (k << 1) + 1; RunnableScheduledFuture<?> c = queue[child]; int right = child + 1; if (right < size && c.compareTo(queue[right]) > 0) c = queue[child = right]; if (key.compareTo(c) <= 0) break; queue[k] = c; setIndex(c, k); k = child; } queue[k] = key; setIndex(key, k); }
private void grow() {
int oldCapacity = queue.length;
// 扩容现在的1.5倍,(1+1/2)
int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
/// int移除了
if (newCapacity < 0) // overflow
newCapacity = Integer.MAX_VALUE;
//还是调用的Arrays.copyOf
queue = Arrays.copyOf(queue, newCapacity);
}
入堆方法和之前PriorityQueue#offer方法一致。这里就不在说了
问题
当前元素放进去是最小的。为什么要将leader变为null。唤醒Follower?
按照之前在take方法的时候,拿到堆头元素,如果时间没有到就会有一个等待的操作,在这样的前提下面,如果当前存入的元素是堆中最小的,说明,那就不能让之前等待的元素返回。所以,这里就要打断他的等待机制,让leader变为null。其他的follower看leader变为null。就能操作了,之前等待的线程就无效
//入队 public boolean offer(Runnable x) { // 空指针检查 if (x == null) throw new NullPointerException(); // 强制类型转换 RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x; final ReentrantLock lock = this.lock; lock.lock(); try { int i = size; //扩容 if (i >= queue.length) grow(); // size ++ size = i + 1; // 如果没有,就将当前的元素变为堆的第一个。并且设置他的heapIndex if (i == 0) { queue[0] = e; setIndex(e, 0); } else { //入堆了 siftUp(i, e); } if (queue[0] == e) { // 如果放进去的元素就是堆中最小的。就将leader变为null,等待的Follower开始竞争锁。 leader = null; available.signal(); } } finally { lock.unlock(); } return true; }
通用的方法,再出堆的时候,将要出去的元素的heapIndex变为-1,将堆中最后一个位置的元素重新从堆头插入,这样就能引起整个堆的重新调整。
private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
//--size。
int s = --size;
// 拿到堆里的最后一个元素
RunnableScheduledFuture<?> x = queue[s];
// 消除引用关系
queue[s] = null;
// 如果堆中还有元素
if (s != 0)
// 将堆中的最后一个元素放在堆头。这样的话,就能引起整个堆的重新调整。
siftDown(0, x);
// 将要出堆的元素的heapIndex变为-1。表示这个元素已经不在堆里面弄了。
setIndex(f, -1);
return f;
}
问题
这里为啥不等待?
poll是Queue接口的方法,这不是一个等待的接口,所以,不需要。
而
poll(long timeout, TimeUnit unit)、E take() throws InterruptedException;、这些都是属于 BlockingQueue
接口,这接口是一个等待接口。所以,下面看到的take方法和pol带超时时间的方法和这里的差别很大。
public RunnableScheduledFuture<?> poll() { // 获取锁 final ReentrantLock lock = this.lock; lock.lock(); try { //拿到堆头元素。 RunnableScheduledFuture<?> first = queue[0]; // 堆头没有元素或者堆头元素还没到时间,直接返回null if (first == null || first.getDelay(NANOSECONDS) > 0) return null; else // 调用上面分析的方法 return finishPoll(first); } finally { lock.unlock(); } }
阻塞等待,获取元素
主体的操作和DelayQueue
的take方法差不多。这里就不再说了。建议看了DelayQueue的分析的文章之后再来看。
问题
finally里面的代码,在await之后还会执行吗?
不会执行。只有在await被唤醒之后,代码正常走的时候才会执行
看下面的列子
@Test public void testWaitFinally() throws InterruptedException, IOException { AtomicBoolean flag = new AtomicBoolean(false); ReentrantLock lock = new ReentrantLock(); Condition condition = lock.newCondition(); new Thread(()->{ lock.lock(); try { if(!flag.get()) { logMessage("wait"); condition.await(); } logMessage("begin work"); } catch (InterruptedException e) { logMessage(e); } finally { logMessage("release lock"); lock.unlock(); } }).start(); TimeUnit.SECONDS.sleep(5); logMessage("start set flag thread"); new Thread(()->{ lock.lock(); try { flag.set(true); condition.signalAll(); logMessage("set flag success"); }finally { lock.unlock(); } }).start(); System.in.read(); }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
wait的时候线程都被park住了,finally里面的代码肯定就不能执行的啦。只有线程正常了之后,才能执行啦。
public RunnableScheduledFuture<?> take() throws InterruptedException { //获取锁 final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { //拿到堆头元素 RunnableScheduledFuture<?> first = queue[0]; // 如果没有,就说明整个堆就没有值。只能等待,注意,现在这个时候还没有 Leader-Follower。 if (first == null) available.await(); else { // 拿到堆头元素, long delay = first.getDelay(NANOSECONDS); // 已经到期了,直接返回就行,不用等待 if (delay <= 0) return finishPoll(first); first = null; // 到这里,就说明堆头元素还没有过期。并且还有leader,那么Follower就等待吧。 if (leader != null) available.await(); else { // 没有leader,当前线程变为leader。在mainLock关联的Condition上面等待。 Thread thisThread = Thread.currentThread(); leader = thisThread; try { // 注意,这里和下面的poll带超时时间的不一样,只能等待delay的时间。注意这里和下面的poll做区别 available.awaitNanos(delay); } finally { // 这个finally,在上面的等待之后,会执行吗? // 如果在等待的时候,堆头的元素没有变化,当前线程的等待是有用的。 // 直接把leader变为null了。别的Follower就能继续了 if (leader == thisThread) leader = null; } } } } } finally { // 如果leader变为null。并且堆头还有元素,唤醒Follower继续 if (leader == null && queue[0] != null) available.signal(); lock.unlock(); } }
带超时的等待获取操作,基本和take差不多。多了一个等待超时的操作
问题
if (nanos < delay || leader != null) nanos = available.awaitNanos(nanos);
- 1
- 2
nacos 为啥要小于delay,大于不行吗?
因为nacos代表的语义是等待超时时间,delay代表的语义是元素到期时间。超时时间都比到期时间小了,肯定等于超时时间,如果等于过期时间的话,poll带超时时间的这个功能不就么得了嘛。
long timeLeft = available.awaitNanos(delay); nanos -= delay - timeLeft;
- 1
- 2
这里的操作有什么作用?
防止虚假唤醒,timeLeft表示剩下的时间,那么下一次的超时时间就应该是
原来的超时时间-delay-timeLeft
什么虚假唤醒?
因为wait notify的时候是随机的,就有可能出现一种情况。线程不满足条件刚刚wait,又被notify。如果处理不当,超时时间就不精确了。
一般是类似于下面的代码来做处理的。死循环加等待时间的调整。
long waitTime = 0 // 超时时间。 long expectTime = System.currentTimeMillis() + waitTime; for (;;){ wait(waitTime); waitTime = expectTime-System.currentTimeMillis(); if(waitTime <=0) { // 超时 } }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit) throws InterruptedException { // 转换时间单位,超时时间 long nanos = unit.toNanos(timeout); // 获取mainLock final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { // 拿到堆头元素 RunnableScheduledFuture<?> first = queue[0]; // 堆头元素没有,看超时时间,如果有超时时间,当前线程就等待 超时时间。等时间到了之后,重新获取锁,在继续拿到堆头元素。 // 如果下次来的时候,正好也没有堆头元素,那他还会再次等待这么长时间的。 if (first == null) { if (nanos <= 0) return null; else nanos = available.awaitNanos(nanos); } else { // 拿到delay,小于0就直接出堆 long delay = first.getDelay(NANOSECONDS); if (delay <= 0) return finishPoll(first); // 如果还没有到期,但是超时时间到了,就直接返回null if (nanos <= 0) return null; first = null; // 超时时间小于 到期时间 或者 现在是有leader的。就等待超时时间。 if (nanos < delay || leader != null) nanos = available.awaitNanos(nanos); else { // 还是和上面一样的操作, Thread thisThread = Thread.currentThread(); leader = thisThread; try { // 注意这里,timeLeft表示delay-available等待的时间,也就是剩下的等待时间 // 小于或者等于0.表示到期了。 long timeLeft = available.awaitNanos(delay); // 剩下的等待时间=等待时间-到期时间-剩下的等待时间。 // 会继续走下次的循环。再次走上面的逻辑。 nanos -= delay - timeLeft; } finally { if (leader == thisThread) leader = null; } } } } // 这里的逻辑和上面的逻辑都一样的。 } finally { if (leader == null && queue[0] != null) available.signal(); lock.unlock(); } }
将堆变为null,并且将RunnableScheduledFuture的heapIndex变为-1
public void clear() { final ReentrantLock lock = this.lock; lock.lock(); try { for (int i = 0; i < size; i++) { RunnableScheduledFuture<?> t = queue[i]; if (t != null) { queue[i] = null; setIndex(t, -1); } } size = 0; } finally { lock.unlock(); } }
看堆头元素是否过期
拿到堆头的元素,看超时时间是否大于0,大于0就返回null,如果小于等于0,返回堆头元素。
private RunnableScheduledFuture<?> peekExpired() {
RunnableScheduledFuture<?> first = queue[0];
return (first == null || first.getDelay(NANOSECONDS) > 0) ?
null : first;
}
public int drainTo(Collection<? super Runnable> c, int maxElements)
方法和这个方法基本一样,就是在下面的peekExpired
的if里面增加了对于maxElements的判断(限制了允许的操作的元素的数量)。
和别的队列的drainTo方法不一样。在collection里面添加的元素,是已经到期的元素。
public int drainTo(Collection<? super Runnable> c) { // 安全检查 if (c == null) throw new NullPointerException(); if (c == this) throw new IllegalArgumentException(); // 获取mainLock final ReentrantLock lock = this.lock; lock.lock(); try { RunnableScheduledFuture<?> first; int n = 0; // 不断的拿到堆头元素,如果已经过期,就添加到传递进来的集合里面。 // n表示 满足的元素的数量 while ((first = peekExpired()) != null) { c.add(first); finishPoll(first); ++n; } return n; } finally { lock.unlock(); } }
这里的迭代器是很简单的, 就是遍历数组就好了。注意,这里没有快速失败的机制。
关于关于失败,可以看ArrayList分析
private class Itr implements Iterator<Runnable> { final RunnableScheduledFuture<?>[] array; int cursor = 0; // index of next element to return int lastRet = -1; // index of last element, or -1 if no such Itr(RunnableScheduledFuture<?>[] array) { this.array = array; } public boolean hasNext() { return cursor < array.length; } // next就是遍历数组,没有别的特殊的操作 public Runnable next() { if (cursor >= array.length) throw new NoSuchElementException(); lastRet = cursor; return array[cursor++]; } public void remove() { if (lastRet < 0) throw new IllegalStateException(); DelayedWorkQueue.this.remove(array[lastRet]); lastRet = -1; } }
这接口就是一个标记接口,继承了Delayed和Future,通常用于 ScheduledExecutorService 调度任务。
public interface ScheduledFuture<V> extends Delayed, Future<V> {
}
可运行的 ScheduledFuture。
public interface RunnableScheduledFuture<V> extends RunnableFuture<V>, ScheduledFuture<V> {
//表示这个任务是可以周期性执行的
boolean isPeriodic();
}
从上面的类继承关系可以看出来,Future的一支表示任务执行的各个操作,Runnable表示任务是可以执行的。
// FIFO的序列号,这号码是ScheduledThreadPoolExecutor里面的sequencer,用这个的大小表示任务的先后顺序。FIFO private final long sequenceNumber; // 启用任务以纳米时间为单位执行的时间 private long time; // 重复任务的周期(以纳秒为单位)。 正值表示固定速率执行。 负值表示固定延迟执行。 值 0 表示非重复任务。 private final long period; // reExecutePeriodic 重新入队的实际任务 RunnableScheduledFuture<V> outerTask = this; // 堆中的位置的下标。支持快速取消。 int heapIndex;
ScheduledFutureTask(Runnable r, V result, long ns, long period) {
super(r, result);
// 启用任务以纳米时间为单位执行的时间
this.time = ns;
// 任务重复周期
this.period = period;
// 任务的序列号,sequencer是ScheduledThreadPoolExecutor的属性
this.sequenceNumber = sequencer.getAndIncrement();
}
time-now表示任务剩余的执行时间。单位是NANOSECONDS。
public long getDelay(TimeUnit unit) {
return unit.convert(time - now(), NANOSECONDS);
}
如果是this,就返回0.对于ScheduledFutureTask的非ScheduledFutureTask的任务有不同的比较规则
ScheduledFutureTask任务
先比较time。time一样比较sequenceNumber。
非ScheduledFutureTask任务
直接比较getDelay
public int compareTo(Delayed other) { if (other == this) //如果是自己就返回0. return 0; if (other instanceof ScheduledFutureTask) { ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other; long diff = time - x.time; // 如果time相等,就比较sequenceNumber。 // sequenceNumber只比较小于,如果小于就返回-1,否则就是1。 if (diff < 0) return -1; else if (diff > 0) return 1; else if (sequenceNumber < x.sequenceNumber) return -1; else return 1; } //如果不是ScheduledFutureTask,直接比较delay,下面的三元表达式写的比较绕。 long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS); return (diff < 0) ? -1 : (diff > 0) ? 1 : 0; }
设置下一次的执行时间,time表示下一次的执行时间,对应一开始说的初始时间+周期
问题?
为什么有一个大于0的判断,如果小于0,下面还有一个取反的操作(-p)
这是fixRate和fixDelay实现的重点。可以看到正数的p。会直接在time上直接添加p,表示下一次执行任务的时间。
如果是负数,就会在now上+p。作为下一次执行任务的时间。
private void setNextRunTime() {
long p = period;
// 这里为什么会有一个 大于0的判断,同时下面为什么有一个-p(取反)的操作
if (p > 0) //周期大于0.下一次的执行时间就是time的基础上+p
time += p;
else
//没有延期时间调用triggerTime,triggerTime方法是ScheduledThreadPoolExecutor的方法
time = triggerTime(-p);
}
没有啥可多说的。先是调用父类的cancel方法,然后通过三个参数判断,是否要移除。还是调用remove方法
public boolean cancel(boolean mayInterruptIfRunning) {
boolean cancelled = super.cancel(mayInterruptIfRunning);
if (cancelled && removeOnCancel && heapIndex >= 0)
remove(this);
return cancelled;
}
先判断是否是周期性执行的任务,如果不是周期性的任务,直接执行,如果是周期性的任务,调用runAndReset
方法,并且设置下次执行任务的时间,并且将任务重新入队。
public void run() { // 是否是周期性的任务 boolean periodic = isPeriodic(); // 看是否能运行 if (!canRunInCurrentRunState(periodic)) //不能运行如果任务cancel cancel(false); else if (!periodic) // 如果不是周期性的任务。直接运行 ScheduledFutureTask.super.run(); // runAndReset是FutureTask的方法,这个方法会不会设置执行的结果,并且future的状态一直是NEW else if (ScheduledFutureTask.super.runAndReset()) { // 设置下次的执行时间 setNextRunTime(); // 重新入队,这方法也是ScheduledThreadPoolExecutor的方法 reExecutePeriodic(outerTask); } }
延期执行的任务和周期性的任务的实现
ScheduledFutureTask
,设置任务的执行实现(triggerTime)。DelayedWorkQueue
里面获取任务,DelayedWorkQueue是基于小头堆实现的。获取任务就开始执行。scheduleWithFixedRate 固定频率
scheduleWithFixedDelay 固定的间隔
这两方法都能实现一个周期性的任务。所以,先看下面的代码例子:
-. scheduleWithFixedRate
固定频率。
@Test public void testFixRate() throws IOException { ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor(); Runnable command = ()->{ try { logMessage("begin"); TimeUnit.SECONDS.sleep(5); logMessage("end"); } catch (InterruptedException e) { } }; service.scheduleAtFixedRate(command,0,3,TimeUnit.SECONDS); System.in.read(); } //logMessage public static void logMessage(Object o) { SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); System.out.println(simpleDateFormat.format(new Date()) + "-" + Thread.currentThread().getName() + ":" + o); }
固定频率。下一个任务的开始时间是 第一任务的开始时间+间隔时间。
所以,上面任务的执行时间超过了3秒。所以,第一个任务执行结束,第二个立即执行。
看看下面的例子
任务的执行时间是3秒,间隔时间确实5秒。可以看到,是按照任务的开始时间来算的
@Test
public void testFixRate() throws IOException {
ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
Runnable command = ()->{
try {
logMessage("begin");
TimeUnit.SECONDS.sleep(3);
logMessage("end");
} catch (InterruptedException e) {
}
};
service.scheduleAtFixedRate(command,0,5,TimeUnit.SECONDS);
System.in.read();
}
-. scheduleWithFixedDelay
固定延迟
@Test
public void testFixDelay() throws IOException {
ScheduledExecutorService service = Executors.newScheduledThreadPool(2);
Runnable command = ()->{
try {
logMessage("begin");
TimeUnit.SECONDS.sleep(5);
logMessage("end");
} catch (InterruptedException e) {
}
};
service.scheduleWithFixedDelay(command,0,3,TimeUnit.SECONDS);
System.in.read();
}
下一次任务开始的时候是上一个任务结束的时间。
原因在于setNextRunTime
方法里面。
在用scheduleWithFixedDelay
提交的时候,ScheduledFutureTask
的period
就会变为负数。在setNextRunTime
里面就会起作用
private void setNextRunTime() {
// 如果是正数,就在time+p。否则就在now+p
long p = period;
if (p > 0)
time += p;
else
time = triggerTime(-p);
}
@Test public void testFixDelay() throws IOException, ExecutionException, InterruptedException { ScheduledExecutorService service = Executors.newScheduledThreadPool(2); Runnable command = ()->{ logMessage("begin"); int a = 1/0; logMessage("end"); }; ScheduledFuture<?> scheduledFuture = service.scheduleWithFixedDelay(command, 0, 3, TimeUnit.SECONDS); logMessage("新的开始"); Runnable command1 = ()->{ logMessage("begin"); try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } logMessage("end"); }; service.scheduleWithFixedDelay(command1, 0, 2, TimeUnit.SECONDS); System.in.read(); }
如果这样的话, 异常日志也没有,这个任务就会结束掉,如果这个任务是一个周期任务,就只能执行一次。后面正常的任务还是可以运行的,并且还是可以提交任务的。
怎么能打印异常日志呢?
调用Future.get方法,ScheduledFutureTask本质也是一个Future的对象。之前适用于Future的都是可以用的。包括定时任务也是返回结果的。通过get就可以获取到,仅仅限于schedule方法,scheduleAtFixedRate和scheduleWithFixedDelay不可以。只能返回一个Future对象,用于管理任务。
因为提交的任务,不管是Runnable还是Callable,都被包装为Future了。就会走之前Future异常时的处理逻辑。
关于ScheduledThreadPoolExecutor的分析就分析到这里了。 如有不正确的地方,欢迎指出。谢谢。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。