当前位置:   article > 正文

ScheduleExecutorService分析_scheduledexecutorservice 继承关系

scheduledexecutorservice 继承关系

ScheduleExecutorService分析

本文和ThreadPoolExecutor 分析的文章结构一样。

在分析之前,建议先了解下面的几篇文章

  1. ArrayList分析

  2. ThreadPoolExecutor 分析

  3. DelayQueue 分析

  4. PriorityQueue分析

    因为下面设计到的有些的内容,会部分内容是和这里面的是重复的,所以,就没在详细的说。

1. 继承关系

从下面的图可以看出,根就是Executor接口,下面就挨个来看看这些接口和抽象类干了什么样的事情。

主要的实现是ScheduledThreadPoolExecutor,从图中可以看到,左边红线部分的相关逻辑已经在ThreadPoolExecutor 分析分析过了,重点有右边的ScheduleExecutorService

在这里插入图片描述

2. ScheduleExecutorService接口分析

继承于ExecutorService,并且这个接口可执行定时的任务,包括延迟启动,周期调用。

schedule 方法可以创建各种延迟的任务并且返回一个Future对象,这个Future对象可用于取消或者检查执行, scheduleAtFixedRate 和 scheduleWithFixedDelay 方法创建和执行定期运行直到被取消。命令的提交是通过Executor#execute(Runnable)ExecutorService#submit方法,0和负数代表立即执行。所有的方法都可以接受一个相对时间(延迟或者周期时间)。在Executors中提供了便捷的方法来创建。

命令真正的执行还是交给Executor#execute(Runnable)


public interface ScheduledExecutorService extends ExecutorService {

    // 延迟时间,只执行一次
    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay, TimeUnit unit);

   
    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay, TimeUnit unit);

      
  // 在执行的延迟时间之后,创建并且启用一个周期性的task,也就是说在initialDelay后,任务开始启动,下次执行之间是initialDelay+period,接下来是 initialDelay+period * 2,
  // 如果当前执行任务发送了任何的异常,这个任务就会后序的执行就会被抑制。(也就是不执行)
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit);

   
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit);

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

ScheduledThreadPoolExecutor分析

1. 构造函数

在这里插入图片描述

从上面的图可以知道,他是继承于ThreadPoolExecutor,在构造函数的时候,无一例外都调用到了父类。上面几个方法都大同小异,线程池主要就先看corePoolSize和maximumPoolSize和BlockingQueue,这里就挑一个就好了。

从构造函数可以看到,ScheduledThreadPoolExecutor中corePoolSize是可以设置的,maximumPoolSize是Integer.MAX_VALUE。BlockingQueue是DelayedWorkQueue(这个是一个内部类,类似于DelayedQueue和PriorityQueue)。keepAliveTime是0

  
public ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory,
                                       RejectedExecutionHandler handler) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue(), threadFactory, handler);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

2. 属性


    /**
     * 当线程池状态变为shutDown之后,是否还要继续执行周期性的任务
     */
    private volatile boolean continueExistingPeriodicTasksAfterShutdown;

    /**
     * 当线程池状态变为shutDown之后,是否还要继续执行delay(延迟)的任务
     */
    private volatile boolean executeExistingDelayedTasksAfterShutdown = true;

    /**
     true:如果cancel方法需要将task从queue移除。
     * True if ScheduledFutureTask.cancel should remove from queue
     */
    private volatile boolean removeOnCancel = false;

   // 原子Long型,用他来表示任务的顺序(添加的顺序)
    private static final AtomicLong sequencer = new AtomicLong();
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

3. schedule方法分析

一开始空指针检查,并且将传递进来的command,装饰变为RunnableScheduledFuture,关于RunnableScheduledFuture的具体分析在下面。decorateTask方法是一个受保护的方法,留给子类拓展的,这里只是将ScheduledFutureTask返回了。

将封装好的RunnableScheduledFuture入任务队列,并且如果线程数是小于coolPollSize,或者线程数是0,就会调用addWorker方法。

   public ScheduledFuture<?> schedule(Runnable command,
                                       long delay,
                                       TimeUnit unit) {
      // 空指针检查
        if (command == null || unit == null)
            throw new NullPointerException();
      // 将command装饰为一个RunnableScheduledFuture,
        RunnableScheduledFuture<?> t = decorateTask(command,
            new ScheduledFutureTask<Void>(command, null,
                                          triggerTime(delay, unit)));
       // 将Future执行。这方法是很重要的。
        delayedExecute(t);
        return t;
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
delayedExecute方法分析

实际就是将task添加到队列里面

主要是执行延迟或者周期性的任务,如果pool关闭,拒绝任务,否则就添加到taskQueue里面去,并且在添加之后还会在判断一次。

这里的判断是为了再次检验,如果线程池的转改变为shutDown(也就是说,在提交之前线程池的状态没有变为shutDown,提交了任务之后,这一瞬间,线程池的状态变为shutDown了),并且判断在当前的线程池的状态下面,当前任务是否还能运行,如果不能运行,就从taskQueue中移除任务,并且将task的状态变为CANCELLED,如果Future取消成功,并且removeOnCancel(表示future取消之后是否要从taskQueue中移除)为true,并且headIndex>0,这些条件满足就会从taskQueue出移除this。

ensurePrestart方法就是启动线程,并且这个方法和prestartCoreThread差不多。

问题

  1. 在task.cacnel之前,已经remove掉过了,task.cacnel的remove有什么意义。

    首先,这个task是继承与FutaskTask的子类,代码如下

       public boolean cancel(boolean mayInterruptIfRunning) {
          // 调用父类的 cancel方法,
                boolean cancelled = super.cancel(mayInterruptIfRunning);
          // 自己的逻辑处理,我感觉没有什么意义,这里的remove。和前面的remove都是一样的。感觉没有啥意义。
                if (cancelled && removeOnCancel && heapIndex >= 0)
                    remove(this);
                return cancelled;
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    调用了FutureTask的remove方法,将Future的状态改为CANCELLED或者INTERRUPTING,如果mayInterruptIfRunning的话,可能还会中断当前执行的线程。但是这里的remove操作,确实和之前的操作重复了,我没有看懂这里是什么逻辑

 private void delayedExecute(RunnableScheduledFuture<?> task) {
        if (isShutdown())
           // 拒绝策略
            reject(task);
        else {
            super.getQueue().add(task);
            if (isShutdown() &&
                !canRunInCurrentRunState(task.isPeriodic()) &&
                 // 这里的remove方法和cancel中的remove方法重复了。
                remove(task))
                task.cancel(false);
            else
                ensurePrestart();
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
canRunInCurrentRunState方法分析

在运行或者SHUTDOWN的时候,判断当前的task是否可以运行,

periodic表示当前的task是否是一个周期性的任务

continueExistingPeriodicTasksAfterShutdown 表示,当线程池的状态变为Shutdown,是否继续运行当前存在的周期性的任务,默认是false。

executeExistingDelayedTasksAfterShutdown表示,当线程池的状态变为Shutdown,是否继续运行当前存在的延迟性的任务,默认是true。

通过上面两个,可以知道,在默认的情况下,ScheduledThreadPoolExecutor在调用了shutDown方法之后,也就是线程池的状态变为Shutdown之后,默认会继续执行已经存在的delay(延迟任务)

boolean canRunInCurrentRunState(boolean periodic) {
        return isRunningOrShutdown(periodic ?
                                   continueExistingPeriodicTasksAfterShutdown :
                                   executeExistingDelayedTasksAfterShutdown);
    }
  • 1
  • 2
  • 3
  • 4
  • 5

4. ensurePrestart方法分析

检查必要线程是否需要创建

启动一个线程,代码很简单就不说了。

 void ensurePrestart() {
        int wc = workerCountOf(ctl.get());
        if (wc < corePoolSize)
            addWorker(null, true);
        else if (wc == 0)
            addWorker(null, false);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

从上面可以看出,几个比较重点的地方

  1. 任务是提交到taskQueue中的,那么这个queue是什么样?
  2. 提交进来的任务都是可以包装成一个Future,这个Future是什么样?
  3. 对于延期或者周期性的任务,一次执行结束之后,下次执行的时候是不是将这个task重新添加到Queue?

带着这几个主要的问题,一起往下面看看


5. scheduleAtFixedRate方法分析

固定频率

一开始还是非空检查,将提交的runnable还是变为ScheduledFutureTask,这里的ScheduledFutureTask和上面的schedule是一个对象,就是参数不一样,decorateTask也是一样,重点是创建ScheduledFutureTask的时候多了几个构造参数,这个在下面说。

还是一样,调用decorateTask,让子类装饰一下,默认还是传递进去的ScheduledFutureTask。多了一个设置outerTask的步骤。outerTask表示实际真正的重新入queue的任务。默认就是this。

最后还是调用delayedExecute方法,作用在上面已经说了。

  public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (period <= 0)
            throw new IllegalArgumentException();
        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          unit.toNanos(period));
        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    
       // ScheduledFutureTask的outerTask表示,实际真正的重新入queue的任务。默认就是this,
     // 那如果子类重写了decorateTask方法,返回了一个自定义的RunnableScheduledFuture,再次入队就是自定义的了。
        sft.outerTask = t;
     // 还是这个方法
        delayedExecute(t);
        return t;
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

6. scheduleWithFixedDelay方法分析

固定间隔

  public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (delay <= 0)
            throw new IllegalArgumentException();
        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          unit.toNanos(-delay));
        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
        sft.outerTask = t;
        delayedExecute(t);
        return t;
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

7. 重写ThreadPoolExecutor中的execute和submit方法

重写了这几个方法,就表示,不能像之前那样来提交任务了。可以看到,就是复用了schedule方法。

提交的任务是一个delay为0的任务。

    public void execute(Runnable command) {
        schedule(command, 0, NANOSECONDS);
    }
  public Future<?> submit(Runnable task) {
        return schedule(task, 0, NANOSECONDS);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

8. triggerTime方法分析

得到下次触发的时间

其实就是当前时间+时间差

问题

  1. 为啥要判断delay < Long.MAX_VALUE >> 1?

    防止溢出。这样的话就能有效的避免 now + delay导致溢出的情况。

  2. overflowFree方法里面拿到堆头元素,然后比较,最后的一个 Long.MAX_VALUE + headDelay 是什么想法?

    首先要知道,进去到overflowFree是因为delay 大于等于Long.MAX_VALUE >> 1。也就是 Long.MAX_VALUE的一半。

    拿到堆头元素,如果delay都比headDelay小,也就是headDelay的值大。delay = Long.MAX_VALUE + headDelay。这样做事为什么?是因为栈溢出,就会导致值变小?这里我不太明白。并且如果堆头元素要是比delay小。那这个方法的返回值还原来的值,也有可能造成位溢出。

 //   
private long triggerTime(long delay, TimeUnit unit) {
        return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
    }
 // 就是now+delay
// 如果delay 小于Long.MAX_VALUE >> 1。就是delay,否则就是overflowFree。
  long triggerTime(long delay) {
        return now() +
            ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

9. overflowFree方法分析

    private long overflowFree(long delay) {
        Delayed head = (Delayed) super.getQueue().peek();
        if (head != null) {
           // 拿到堆头元素,如果堆头元素小于0,并且 delay - headDelay < 0。delay就等于Long.MAX_VALUE + headDelay。
            long headDelay = head.getDelay(NANOSECONDS);
            if (headDelay < 0 && (delay - headDelay < 0))
               // 这个操作不就移除了嘛。难道溢出是正确的
                delay = Long.MAX_VALUE + headDelay;
        }
        return delay;
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

10. setContinueExistingPeriodicTasksAfterShutdownPolicy方法分析

和这个方法类似的还要一个setExecuteExistingDelayedTasksAfterShutdownPolicy方法分析

在正常的设置值之后,还会判断线程池的状态,调用onShutdown方法。线程池的状态的判断就是ctl。

    public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
        continueExistingPeriodicTasksAfterShutdown = value;
       // 如果为不允许,并且当前线程池已经关闭了,就会调用onShutdown
        if (!value && isShutdown())
            onShutdown();
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

11. reExecutePeriodic方法分析

重新执行周期性的任务

先判断,线程池在当前的状态下,周期性的任务是否能继续运行,可以就将这个任务重新添加到队列里面。在添加完之后,在判断线程池在当前的状态下是否能继续运行周期性的任务,如果不能,调用移除掉这个任务,在中断task的执行。

void reExecutePeriodic(RunnableScheduledFuture<?> task) {
    // 判断当前线程池的状态
        if (canRunInCurrentRunState(true)) {
           // 添加task
            super.getQueue().add(task);
            // 如果不能运行,就remove掉 这个remove操作和ScheduledFutureTask的cancel重复了。
            if (!canRunInCurrentRunState(true) && remove(task))
                task.cancel(false);
            else
               // 保障必须的线程是否在运行。
                ensurePrestart();
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

14. onShutdown方法分析

onShutdown方法有下面的几个调用的地方

  1. setContinueExistingPeriodicTasksAfterShutdownPolicy
  2. setExecuteExistingDelayedTasksAfterShutdownPolicy
  3. shutdown

都是在shutdown的语义下面调用的。

拿到两个策略,看在shutDown的状态下面是否需要运行。判断任务类型,看情况是否要终止。

 @Override void onShutdown() {
      // 拿到队列
        BlockingQueue<Runnable> q = super.getQueue();
     // 是要要继续执行delay任务
        boolean keepDelayed =
            getExecuteExistingDelayedTasksAfterShutdownPolicy();
    // 是否要继续执行周期性的任务
        boolean keepPeriodic =
            getContinueExistingPeriodicTasksAfterShutdownPolicy();
     // 如果两个不需要运行,遍历数组,直接关闭
        if (!keepDelayed && !keepPeriodic) {
            for (Object e : q.toArray())
                if (e instanceof RunnableScheduledFuture<?>)
                    ((RunnableScheduledFuture<?>) e).cancel(false);
            q.clear();
        }
        else {
          
            for (Object e : q.toArray()) {
                if (e instanceof RunnableScheduledFuture) {
                   // 
                    RunnableScheduledFuture<?> t =
                        (RunnableScheduledFuture<?>)e;
                   // 是否是周期性task,再看看是否要关闭,满足直接remove,并且cacel
                    if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
                        t.isCancelled()) { // also remove if already cancelled
                        if (q.remove(t))
                            t.cancel(false);
                    }
                }
            }
        }
        tryTerminate();
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34

DelayedWorkQueue分析

DelayedWorkQueue是基于堆结构的,和DelayQueue,PriorityQueue有点像,堆存储ScheduledFutureTask,在RunnableScheduledFutures(里面存储该元素在堆中数组下标的位置),降低了在取消任务时候的查找时间复杂度。提高了remova方法速度,减少垃圾回收保留。

属性分析

        // 初始容量
        private static final int INITIAL_CAPACITY = 16;
        
       // 像DelayQueue和PriorityQueue一样,堆的实现基本都是数组,所以这里就是数组啦。一个RunnableScheduledFuture的数组。
        private RunnableScheduledFuture<?>[] queue =
            new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
      
        private final ReentrantLock lock = new ReentrantLock();
        private int size = 0;

        // 这里也应用了Leader-Follower。
        private Thread leader = null;

        // 这个主要是是用来做leader-follower的。在之前的PriorityQueue里面已经做了分析
        private final Condition available = lock.newCondition();
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

setIndex方法分析

ScheduledFutureTask对比之前放在队列中的任务,这个任务能记录在数组中的下标的位置,这样就就能提高查找效率。

  private void setIndex(RunnableScheduledFuture<?> f, int idx) {
            if (f instanceof ScheduledFutureTask)
                ((ScheduledFutureTask)f).heapIndex = idx;
        }
  • 1
  • 2
  • 3
  • 4

siftUp方法分析

从队列的头部添加元素,在PriorityQueue有类似的方法。基本作用都是一样的。和PriorityQueue不一样的一个地方在于多了一个 setIndex(e, k);方法。

  private void siftUp(int k, RunnableScheduledFuture<?> key) {
            while (k > 0) {
               // 找到当前要插入位置的父节点
                int parent = (k - 1) >>> 1;
                RunnableScheduledFuture<?> e = queue[parent];
                if (key.compareTo(e) >= 0)
                    break;
                queue[k] = e;
                setIndex(e, k);
                k = parent;
            }
            queue[k] = key;
            setIndex(key, k);
        }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

siftDown方法分析

从队列的尾部添加元素,在PriorityQueue有类似的方法。基本作用都是一样的。和PriorityQueue不一样的地方也是在于这个方法多了一个 setIndex(e, k);方法。记录节点在数组中的下标。

       
        private void siftDown(int k, RunnableScheduledFuture<?> key) {
           // >>>是逻辑右移动(无符号移动,高位补符号位)
            int half = size >>> 1;
            while (k < half) {
               // 根节点和子节点的关系。,
               //  (父节点)queue[n]  (左子节点)queue[2*n+1] (右子节点) queue[2*(n+1)].
                int child = (k << 1) + 1;
                RunnableScheduledFuture<?> c = queue[child];
                int right = child + 1;
                if (right < size && c.compareTo(queue[right]) > 0)
                    c = queue[child = right];
                if (key.compareTo(c) <= 0)
                    break;
                queue[k] = c;
                setIndex(c, k);
                k = child;
            }
            queue[k] = key;
            setIndex(key, k);
        }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

grow方法分析

       
        private void grow() {
            int oldCapacity = queue.length;
            // 扩容现在的1.5倍,(1+1/2)
            int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
           /// int移除了
            if (newCapacity < 0) // overflow
                newCapacity = Integer.MAX_VALUE;
           //还是调用的Arrays.copyOf
            queue = Arrays.copyOf(queue, newCapacity);
        }


  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

offer方法分析

入堆方法和之前PriorityQueue#offer方法一致。这里就不在说了

问题

  1. 当前元素放进去是最小的。为什么要将leader变为null。唤醒Follower?

    按照之前在take方法的时候,拿到堆头元素,如果时间没有到就会有一个等待的操作,在这样的前提下面,如果当前存入的元素是堆中最小的,说明,那就不能让之前等待的元素返回。所以,这里就要打断他的等待机制,让leader变为null。其他的follower看leader变为null。就能操作了,之前等待的线程就无效

	//入队
        public boolean offer(Runnable x) {
           // 空指针检查
            if (x == null)
                throw new NullPointerException();
           // 强制类型转换
            RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                int i = size;
               //扩容
                if (i >= queue.length)
                    grow();
               // size ++
                size = i + 1;
               
              // 如果没有,就将当前的元素变为堆的第一个。并且设置他的heapIndex
                if (i == 0) {
                    queue[0] = e;
                    setIndex(e, 0);
                } else {
                  //入堆了
                    siftUp(i, e);
                }
                if (queue[0] == e) {
                   // 如果放进去的元素就是堆中最小的。就将leader变为null,等待的Follower开始竞争锁。
                    leader = null;
                    available.signal();
                }
            } finally {
                lock.unlock();
            }
            return true;
        }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

finishPoll方法分析

通用的方法,再出堆的时候,将要出去的元素的heapIndex变为-1,将堆中最后一个位置的元素重新从堆头插入,这样就能引起整个堆的重新调整。

    private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
          //--size。
            int s = --size;
         // 拿到堆里的最后一个元素
            RunnableScheduledFuture<?> x = queue[s];
          // 消除引用关系
            queue[s] = null;
         // 如果堆中还有元素
            if (s != 0)
               // 将堆中的最后一个元素放在堆头。这样的话,就能引起整个堆的重新调整。
                siftDown(0, x);
         // 将要出堆的元素的heapIndex变为-1。表示这个元素已经不在堆里面弄了。
            setIndex(f, -1);
            return f;
        }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

poll方法分析

问题

  1. 这里为啥不等待?

    poll是Queue接口的方法,这不是一个等待的接口,所以,不需要。

    poll(long timeout, TimeUnit unit)、E take() throws InterruptedException;、这些都是属于 BlockingQueue接口,这接口是一个等待接口。所以,下面看到的take方法和pol带超时时间的方法和这里的差别很大。

  public RunnableScheduledFuture<?> poll() {
       // 获取锁
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
               //拿到堆头元素。
                RunnableScheduledFuture<?> first = queue[0];
                // 堆头没有元素或者堆头元素还没到时间,直接返回null
                if (first == null || first.getDelay(NANOSECONDS) > 0)
                    return null;
                else
                   // 调用上面分析的方法
                    return finishPoll(first);
            } finally {
                lock.unlock();
            }
        }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

take()方法分析

阻塞等待,获取元素

主体的操作和DelayQueue的take方法差不多。这里就不再说了。建议看了DelayQueue的分析的文章之后再来看。

问题

  1. finally里面的代码,在await之后还会执行吗?

    不会执行。只有在await被唤醒之后,代码正常走的时候才会执行

    看下面的列子

     @Test
        public void testWaitFinally() throws InterruptedException, IOException {
            AtomicBoolean flag = new AtomicBoolean(false);
            ReentrantLock lock = new ReentrantLock();
            Condition condition = lock.newCondition();
            new Thread(()->{
                lock.lock();
                try {
                    if(!flag.get()) {
                        logMessage("wait");
                        condition.await();
    
                    }
                    logMessage("begin work");
    
                } catch (InterruptedException e) {
                    logMessage(e);
                } finally {
                    logMessage("release lock");
                    lock.unlock();
                }
    
            }).start();
    
            TimeUnit.SECONDS.sleep(5);
            logMessage("start set flag thread");
            new Thread(()->{
                lock.lock();
                try {
                    flag.set(true);
                    condition.signalAll();
                    logMessage("set flag success");
                }finally {
                    lock.unlock();
                }
            }).start();
    
            System.in.read();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39

在这里插入图片描述

wait的时候线程都被park住了,finally里面的代码肯定就不能执行的啦。只有线程正常了之后,才能执行啦。

 public RunnableScheduledFuture<?> take() throws InterruptedException {
        //获取锁
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                for (;;) {
                   //拿到堆头元素
                    RunnableScheduledFuture<?> first = queue[0];
                   // 如果没有,就说明整个堆就没有值。只能等待,注意,现在这个时候还没有 Leader-Follower。
                    if (first == null)
                        available.await();
                    else {
                       // 拿到堆头元素,
                        long delay = first.getDelay(NANOSECONDS);
                       // 已经到期了,直接返回就行,不用等待
                        if (delay <= 0)
                            return finishPoll(first);
                      
                        first = null; 
                        // 到这里,就说明堆头元素还没有过期。并且还有leader,那么Follower就等待吧。
                        if (leader != null)
                            available.await();
                        else {
                           // 没有leader,当前线程变为leader。在mainLock关联的Condition上面等待。
                            Thread thisThread = Thread.currentThread();
                            leader = thisThread;
                            try {
                              // 注意,这里和下面的poll带超时时间的不一样,只能等待delay的时间。注意这里和下面的poll做区别
                                available.awaitNanos(delay);
                            } finally {
                               // 这个finally,在上面的等待之后,会执行吗?
                               // 如果在等待的时候,堆头的元素没有变化,当前线程的等待是有用的。
                               // 直接把leader变为null了。别的Follower就能继续了
                                if (leader == thisThread)
                                    leader = null;
                            }
                        }
                    }
                }
            } finally {
               // 如果leader变为null。并且堆头还有元素,唤醒Follower继续
                if (leader == null && queue[0] != null)
                    available.signal();
                lock.unlock();
            }
        }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46

poll方法分析

带超时的等待获取操作,基本和take差不多。多了一个等待超时的操作

问题

  1. if (nanos < delay || leader != null)
                                nanos = available.awaitNanos(nanos);
    
    • 1
    • 2

​ nacos 为啥要小于delay,大于不行吗?

​ 因为nacos代表的语义是等待超时时间,delay代表的语义是元素到期时间。超时时间都比到期时间小了,肯定等于超时时间,如果等于过期时间的话,poll带超时时间的这个功能不就么得了嘛。

  1. long timeLeft = available.awaitNanos(delay);
                                    nanos -= delay - timeLeft;
    
    • 1
    • 2

    这里的操作有什么作用?

    ​ 防止虚假唤醒,timeLeft表示剩下的时间,那么下一次的超时时间就应该是 原来的超时时间-delay-timeLeft

  2. 什么虚假唤醒?

    因为wait notify的时候是随机的,就有可能出现一种情况。线程不满足条件刚刚wait,又被notify。如果处理不当,超时时间就不精确了。

    一般是类似于下面的代码来做处理的。死循环加等待时间的调整。

     long waitTime = 0 // 超时时间。
            long expectTime = System.currentTimeMillis() + waitTime;
            for (;;){
                wait(waitTime);
                waitTime = expectTime-System.currentTimeMillis();
                if(waitTime <=0) {
                    // 超时
                }
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

        public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)
            throws InterruptedException {
           // 转换时间单位,超时时间
            long nanos = unit.toNanos(timeout);
          // 获取mainLock
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                for (;;) {
                   // 拿到堆头元素
                    RunnableScheduledFuture<?> first = queue[0];
                   // 堆头元素没有,看超时时间,如果有超时时间,当前线程就等待 超时时间。等时间到了之后,重新获取锁,在继续拿到堆头元素。
          // 如果下次来的时候,正好也没有堆头元素,那他还会再次等待这么长时间的。    
                    if (first == null) {
                        if (nanos <= 0)
                            return null;
                        else
                            nanos = available.awaitNanos(nanos);
                    } else {
                       // 拿到delay,小于0就直接出堆
                        long delay = first.getDelay(NANOSECONDS);
                        if (delay <= 0)
                            return finishPoll(first);
                        // 如果还没有到期,但是超时时间到了,就直接返回null
                        if (nanos <= 0)
                            return null;
                        first = null; 
                       // 超时时间小于 到期时间 或者 现在是有leader的。就等待超时时间。
                      
                        if (nanos < delay || leader != null)
                            nanos = available.awaitNanos(nanos);
                        else {
                           // 还是和上面一样的操作,
                            Thread thisThread = Thread.currentThread();
                            leader = thisThread;
                            try {
                                // 注意这里,timeLeft表示delay-available等待的时间,也就是剩下的等待时间
                               // 小于或者等于0.表示到期了。
                                long timeLeft = available.awaitNanos(delay);
                                 // 剩下的等待时间=等待时间-到期时间-剩下的等待时间。
                               // 会继续走下次的循环。再次走上面的逻辑。
                                nanos -= delay - timeLeft;
                            } finally {
                                if (leader == thisThread)
                                    leader = null;
                            }
                        }
                    }
                }
              // 这里的逻辑和上面的逻辑都一样的。
            } finally {
                if (leader == null && queue[0] != null)
                    available.signal();
                lock.unlock();
            }
        }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57

clear方法分析

将堆变为null,并且将RunnableScheduledFuture的heapIndex变为-1

  public void clear() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                for (int i = 0; i < size; i++) {
                    RunnableScheduledFuture<?> t = queue[i];
                    if (t != null) {
                        queue[i] = null;
                        setIndex(t, -1);
                    }
                }
                size = 0;
            } finally {
                lock.unlock();
            }
        }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

peekExpired方法分析

看堆头元素是否过期

拿到堆头的元素,看超时时间是否大于0,大于0就返回null,如果小于等于0,返回堆头元素。

    private RunnableScheduledFuture<?> peekExpired() {
            RunnableScheduledFuture<?> first = queue[0];
            return (first == null || first.getDelay(NANOSECONDS) > 0) ?
                null : first;
        }
  • 1
  • 2
  • 3
  • 4
  • 5

drainTo方法分析

public int drainTo(Collection<? super Runnable> c, int maxElements)方法和这个方法基本一样,就是在下面的peekExpired的if里面增加了对于maxElements的判断(限制了允许的操作的元素的数量)。

和别的队列的drainTo方法不一样。在collection里面添加的元素,是已经到期的元素。

 public int drainTo(Collection<? super Runnable> c) {
         // 安全检查
            if (c == null)
                throw new NullPointerException();
            if (c == this)
                throw new IllegalArgumentException();
   
    // 获取mainLock
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                RunnableScheduledFuture<?> first;
                int n = 0;
               // 不断的拿到堆头元素,如果已经过期,就添加到传递进来的集合里面。
               // n表示 满足的元素的数量
                while ((first = peekExpired()) != null) {
                    c.add(first);   
                    finishPoll(first);
                    ++n;
                }
                return n;
            } finally {
                lock.unlock();
            }
        }


  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

迭代器

这里的迭代器是很简单的, 就是遍历数组就好了。注意,这里没有快速失败的机制。

关于关于失败,可以看ArrayList分析


        private class Itr implements Iterator<Runnable> {
            final RunnableScheduledFuture<?>[] array;
            int cursor = 0;     // index of next element to return
            int lastRet = -1;   // index of last element, or -1 if no such

            Itr(RunnableScheduledFuture<?>[] array) {
                this.array = array;
            }

            public boolean hasNext() {
                return cursor < array.length;
            }
       // next就是遍历数组,没有别的特殊的操作
            public Runnable next() {
                if (cursor >= array.length)
                    throw new NoSuchElementException();
                lastRet = cursor;
                return array[cursor++];
            }

            public void remove() {
                if (lastRet < 0)
                    throw new IllegalStateException();
                DelayedWorkQueue.this.remove(array[lastRet]);
                lastRet = -1;
            }
        }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

ScheduledFutureTask分析

类继承关系

在这里插入图片描述

ScheduledFuture接口

这接口就是一个标记接口,继承了Delayed和Future,通常用于 ScheduledExecutorService 调度任务。

public interface ScheduledFuture<V> extends Delayed, Future<V> {
}

  • 1
  • 2
  • 3
RunnableScheduledFuture接口

可运行的 ScheduledFuture。

public interface RunnableScheduledFuture<V> extends RunnableFuture<V>, ScheduledFuture<V> {
   //表示这个任务是可以周期性执行的
    boolean isPeriodic();
}

  • 1
  • 2
  • 3
  • 4
  • 5

从上面的类继承关系可以看出来,Future的一支表示任务执行的各个操作,Runnable表示任务是可以执行的。

属性分析

// FIFO的序列号,这号码是ScheduledThreadPoolExecutor里面的sequencer,用这个的大小表示任务的先后顺序。FIFO
private final long sequenceNumber; 

      // 启用任务以纳米时间为单位执行的时间
        private long time;

         // 重复任务的周期(以纳秒为单位)。 正值表示固定速率执行。 负值表示固定延迟执行。 值 0 表示非重复任务。
        private final long period;

       // reExecutePeriodic 重新入队的实际任务
        RunnableScheduledFuture<V> outerTask = this;

       // 堆中的位置的下标。支持快速取消。
        int heapIndex;


  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

构造函数

在这里插入图片描述

      ScheduledFutureTask(Runnable r, V result, long ns, long period) {
            super(r, result);
         // 启用任务以纳米时间为单位执行的时间
            this.time = ns;
         // 任务重复周期
            this.period = period;
         // 任务的序列号,sequencer是ScheduledThreadPoolExecutor的属性
            this.sequenceNumber = sequencer.getAndIncrement();
        }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

getDelay方法分析

time-now表示任务剩余的执行时间。单位是NANOSECONDS。

        public long getDelay(TimeUnit unit) {
            return unit.convert(time - now(), NANOSECONDS);
        }
  • 1
  • 2
  • 3
compareTo方法分析

如果是this,就返回0.对于ScheduledFutureTask的非ScheduledFutureTask的任务有不同的比较规则

  • ScheduledFutureTask任务

    先比较time。time一样比较sequenceNumber。

  • 非ScheduledFutureTask任务

    直接比较getDelay


        public int compareTo(Delayed other) {
            if (other == this) //如果是自己就返回0.
                return 0;
            if (other instanceof ScheduledFutureTask) {
                ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
                long diff = time - x.time;
               // 如果time相等,就比较sequenceNumber。
               // sequenceNumber只比较小于,如果小于就返回-1,否则就是1。
                if (diff < 0)
                    return -1;
                else if (diff > 0)
                    return 1;
                else if (sequenceNumber < x.sequenceNumber)
                    return -1;
                else
                    return 1;
            }
           //如果不是ScheduledFutureTask,直接比较delay,下面的三元表达式写的比较绕。
            long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
            return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
        }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

setNextRunTime(有点重要)

设置下一次的执行时间,time表示下一次的执行时间,对应一开始说的初始时间+周期

问题?

  1. 为什么有一个大于0的判断,如果小于0,下面还有一个取反的操作(-p)

    这是fixRate和fixDelay实现的重点。可以看到正数的p。会直接在time上直接添加p,表示下一次执行任务的时间。

    如果是负数,就会在now上+p。作为下一次执行任务的时间。

     private void setNextRunTime() {
            long p = period;
         // 这里为什么会有一个 大于0的判断,同时下面为什么有一个-p(取反)的操作
            if (p > 0) //周期大于0.下一次的执行时间就是time的基础上+p
                time += p;
            else
                //没有延期时间调用triggerTime,triggerTime方法是ScheduledThreadPoolExecutor的方法
                time = triggerTime(-p);
        }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

cancel方法分析

没有啥可多说的。先是调用父类的cancel方法,然后通过三个参数判断,是否要移除。还是调用remove方法

    public boolean cancel(boolean mayInterruptIfRunning) {
        boolean cancelled = super.cancel(mayInterruptIfRunning);
        if (cancelled && removeOnCancel && heapIndex >= 0)
            remove(this);
        return cancelled;
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

run方法分析(重点)

先判断是否是周期性执行的任务,如果不是周期性的任务,直接执行,如果是周期性的任务,调用runAndReset方法,并且设置下次执行任务的时间,并且将任务重新入队。

    public void run() {
       // 是否是周期性的任务
            boolean periodic = isPeriodic();
          // 看是否能运行
            if (!canRunInCurrentRunState(periodic))
               //不能运行如果任务cancel
                cancel(false);
            else if (!periodic)
                // 如果不是周期性的任务。直接运行
                ScheduledFutureTask.super.run();
           // runAndReset是FutureTask的方法,这个方法会不会设置执行的结果,并且future的状态一直是NEW
            else if (ScheduledFutureTask.super.runAndReset()) {
                 // 设置下次的执行时间
                setNextRunTime();
                // 重新入队,这方法也是ScheduledThreadPoolExecutor的方法
                reExecutePeriodic(outerTask);
            }
        }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

总结

定时任务是怎么实现的?

延期执行的任务和周期性的任务的实现

  1. 任务包装成ScheduledFutureTask,设置任务的执行实现(triggerTime)。
  2. 如果线程池关闭,拒绝策略,
  3. 添加到队列里面。如果线程数小于核心线程,创建线程或者核心线程等于0了,创建线程。
  4. 此时Worker就从DelayedWorkQueue里面获取任务,DelayedWorkQueue是基于小头堆实现的。获取任务就开始执行。
  5. 这个任务就是ScheduledFutureTask,主要就看run方法里面,在run里面会判断this是不是一个周期性的任务,如果是周期性的任务就会调用runAndReset。设置下一次任务的执行时间,重新入队。周期性任务实现的关键。如果不是周期性的任务,就调用父类的run方法。

scheduleWithFixedDelay和scheduleWithFixedRate区别?

scheduleWithFixedRate 固定频率

scheduleWithFixedDelay 固定的间隔

这两方法都能实现一个周期性的任务。所以,先看下面的代码例子:

-. scheduleWithFixedRate

固定频率。

   @Test
    public void testFixRate() throws IOException {
        ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
        Runnable command = ()->{
            try {
                logMessage("begin");
                TimeUnit.SECONDS.sleep(5);
                logMessage("end");
            } catch (InterruptedException e) {

            }
        };
        service.scheduleAtFixedRate(command,0,3,TimeUnit.SECONDS);
        System.in.read();
    }

//logMessage
   public static void logMessage(Object o) {
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        System.out.println(simpleDateFormat.format(new Date()) + "-" + Thread.currentThread().getName() + ":" + o);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

在这里插入图片描述

固定频率。下一个任务的开始时间是 第一任务的开始时间+间隔时间。

所以,上面任务的执行时间超过了3秒。所以,第一个任务执行结束,第二个立即执行。

看看下面的例子

任务的执行时间是3秒,间隔时间确实5秒。可以看到,是按照任务的开始时间来算的

 @Test
    public void testFixRate() throws IOException {
        ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
        Runnable command = ()->{
            try {
                logMessage("begin");
                TimeUnit.SECONDS.sleep(3);
                logMessage("end");
            } catch (InterruptedException e) {

            }
        };
        service.scheduleAtFixedRate(command,0,5,TimeUnit.SECONDS);
        System.in.read();
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

在这里插入图片描述

-. scheduleWithFixedDelay

固定延迟

  @Test
    public void testFixDelay() throws IOException {
        ScheduledExecutorService service = Executors.newScheduledThreadPool(2);
        Runnable command = ()->{
            try {
                logMessage("begin");
                TimeUnit.SECONDS.sleep(5);
                logMessage("end");
            } catch (InterruptedException e) {

            }
        };
        service.scheduleWithFixedDelay(command,0,3,TimeUnit.SECONDS);
        System.in.read();
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

在这里插入图片描述

下一次任务开始的时候是上一个任务结束的时间。

原因在于setNextRunTime方法里面。

在用scheduleWithFixedDelay提交的时候,ScheduledFutureTaskperiod就会变为负数。在setNextRunTime里面就会起作用

    private void setNextRunTime() {
       // 如果是正数,就在time+p。否则就在now+p
            long p = period;
            if (p > 0)
                time += p;
            else
                time = triggerTime(-p);
        }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

在执行的任务的时候遇到异常,会怎么样?

    @Test
    public void testFixDelay() throws IOException, ExecutionException, InterruptedException {
        ScheduledExecutorService service = Executors.newScheduledThreadPool(2);
        Runnable command = ()->{
            logMessage("begin");
            int a = 1/0;
            logMessage("end");
        };
        ScheduledFuture<?> scheduledFuture = service.scheduleWithFixedDelay(command, 0, 3, TimeUnit.SECONDS);


        logMessage("新的开始");

        Runnable command1 = ()->{
            logMessage("begin");
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            logMessage("end");
        };
         service.scheduleWithFixedDelay(command1, 0, 2, TimeUnit.SECONDS);
        System.in.read();
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

如果这样的话, 异常日志也没有,这个任务就会结束掉,如果这个任务是一个周期任务,就只能执行一次。后面正常的任务还是可以运行的,并且还是可以提交任务的。

怎么能打印异常日志呢?

调用Future.get方法,ScheduledFutureTask本质也是一个Future的对象。之前适用于Future的都是可以用的。包括定时任务也是返回结果的。通过get就可以获取到,仅仅限于schedule方法,scheduleAtFixedRate和scheduleWithFixedDelay不可以。只能返回一个Future对象,用于管理任务。

因为提交的任务,不管是Runnable还是Callable,都被包装为Future了。就会走之前Future异常时的处理逻辑。


关于ScheduledThreadPoolExecutor的分析就分析到这里了。 如有不正确的地方,欢迎指出。谢谢。

声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:【wpsshop博客】
推荐阅读
  

闽ICP备14008679号