当前位置:   article > 正文

java定时任务接口ScheduledExecutorService_java scheduledexecutorservice

java scheduledexecutorservice

一、ScheduledExecutorService 设计思想

ScheduledExecutorService,是基于线程池设计的定时任务类,每个调度任务都会分配到线程池中的一个线程去执行,也就是说,任务是并发执行,互不影响。

需要注意,只有当调度任务来的时候,ScheduledExecutorService才会真正启动一个线程,其余时间ScheduledExecutorService都是出于轮询任务的状态。

1、线程任务

class MyScheduledExecutor implements Runnable {
    
    private String jobName;
    
    MyScheduledExecutor() {
        
    }
    
    MyScheduledExecutor(String jobName) {
        this.jobName = jobName;
    }

    @Override
    public void run() {
        
        System.out.println(jobName + " is running");
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

2、定时任务

public static void main(String[] args) {
        ScheduledExecutorService service = Executors.newScheduledThreadPool(10);
        
        long initialDelay = 1;
        long period = 1;
        // 从现在开始1秒钟之后,每隔1秒钟执行一次job1
        service.scheduleAtFixedRate(new MyScheduledExecutor("job1"), initialDelay, period, TimeUnit.SECONDS);
        
        // 从现在开始2秒钟之后,每隔2秒钟执行一次job2
        service.scheduleWithFixedDelay(new MyScheduledExecutor("job2"), initialDelay, period, TimeUnit.SECONDS);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

ScheduledExecutorService 中两种最常用的调度方法 ScheduleAtFixedRate 和 ScheduleWithFixedDelay。ScheduleAtFixedRate 每次执行时间为上一次任务开始起向后推一个时间间隔,即每次执行时间为 :initialDelay, initialDelay+period, initialDelay+2*period, …;ScheduleWithFixedDelay 每次执行时间为上一次任务结束起向后推一个时间间隔,即每次执行时间为:initialDelay, initialDelay+executeTime+delay, initialDelay+2*executeTime+2*delay。由此可见,ScheduleAtFixedRate 是基于固定时间间隔进行任务调度,ScheduleWithFixedDelay 取决于每次任务执行的时间长短,是基于不固定时间间隔进行任务调度。


如果在一个ScheduleExecutorService中提交一个任务,这个任务的调度周期设置
的时间比任务本身执行的时间短的话会出现什么情况?也就是在线程调度时间已经到了
但是上次的任务还没有做完的情况下,ScheduleExecutorService是怎么处理的?

  • 1
  • 2
  • 3
  • 4
  • 5
  • 1
  • 2
  • 3
  • 4
  • 5

这个问题曾经困扰了我很久,我们都知道,ScheduleExecutorService是一个支持周期调度的线程池,我们可以设置调度的周期period,ScheduleExecutorService会按照设定好的周期调度我们的任务,如果我们设定的调度周期小于任务运行时间,那么很好理解,比如说我们设置的调度周期为1秒,而任务实际只需要10毫秒就可以执行完成一次,那么执行完成之后放到调度队列即可,下次调度时间到了再次调度执行。那么,如果我们的任务执行时间大于我们设定的调度时间会怎么样?比如我们设定的调度周期为1秒,但是我们的任务每次需要执行2秒,这个情况是不是很奇怪呢?

对于ScheduleExecutorService来说,你给我设定的调度周期是1秒,那么我当然1秒就会去运行一次你,但是运行1秒后发现你还在运行,那我是再次运行你还是等你运行完成再调度你运行?

当然,这都是我的主观臆断来猜测ScheduleExecutorService的原理,ScheduleExecutorService的真正原理需要去阅读源码来理解,下面带着这个问题,以解决这个问题为目标去看一下ScheduleExecutorService的源码吧。

首先,我们使用下面的代码作为测试:


    private static Runnable blockRunner = () -> {
        try {
            TimeUnit.SECONDS.sleep(2);
            System.out.println("one round:" + new Date());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    };
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
<span class="hljs-keyword">private</span> <span class="hljs-keyword">static</span> ScheduledExecutorService scheduledExecutorService =
        Executors.newScheduledThreadPool(<span class="hljs-number">2</span>);

<span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">static</span> <span class="hljs-keyword">void</span> <span class="hljs-title">main</span><span class="hljs-params">(String ... args)</span> </span>{


    scheduledExecutorService
            .scheduleAtFixedRate(blockRunner, <span class="hljs-number">0</span>, <span class="hljs-number">100</span>, TimeUnit.MILLISECONDS);

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

我们设定了调度周期为100毫秒,但是blockRunner实际上需要执行2秒才能返回。关于java的线程池,已经在前面的文章中写到了,可以参考下面的文章:

先来看一下scheduleAtFixedRate这个方法:


    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (period <= 0)
            throw new IllegalArgumentException();
        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          unit.toNanos(period));
        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
        sft.outerTask = t;
        delayedExecute(t);
        return t;
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

我们的任务command被包装了两次,一次变成了一个ScheduledFutureTask类型的对象,然后又变成了RunnableScheduledFuture类型的对象。然后执行了一个方法delayedExecute,这个方法字面意思上看起来像是延时执行的意思,看一下它的代码:


    private void delayedExecute(RunnableScheduledFuture<?> task) {
        if (isShutdown())
            reject(task);
        else {
            super.getQueue().add(task);
            if (isShutdown() &&
                !canRunInCurrentRunState(task.isPeriodic()) &&
                remove(task))
                task.cancel(false);
            else
                ensurePrestart();
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

它的执行逻辑是:如果线程池被关闭了,那么拒绝提交的任务,否则,将该任务添加队列中去。这个队列就是ThreadPoolExecutor中的workQueue,而这个workQueue是在ThreadPoolExecutor的构造函数中被初始化的,也就是下面这关键的一句:

    public ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory,
                                       RejectedExecutionHandler handler) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue(), threadFactory, handler);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

也就是说,我们的任务被添加到了一个DelayedWorkQueue队列中去了,而DelayedWorkQueue我们在Java阻塞队列详解中已经分析过,它是一个可以延迟消费的阻塞队列。而延时的时间是通过接口Delayed的getDelay方法来获得的,我们最后找到ScheduledFutureTask实现了Delayed的getDelay方法。


        public long getDelay(TimeUnit unit) {
            return unit.convert(time - now(), NANOSECONDS);
        }
  • 1
  • 2
  • 3
  • 4
  • 1
  • 2
  • 3
  • 4

time变量是什么?原来是delay,好像和period无关啊!!分析了这么久,发现这是第一次执行任务的逻辑啊,我想知道的是第二次、第三次以后和初始的delay无关之后的周期调度的情况啊,继续找吧!

然后发现了ScheduledFutureTask的run方法,很明显这就是任务调度被执行的关键所在,看下代码:


        public void run() {
            boolean periodic = isPeriodic();
            if (!canRunInCurrentRunState(periodic))
                cancel(false);
            else if (!periodic)
                ScheduledFutureTask.super.run();
            else if (ScheduledFutureTask.super.runAndReset()) {
                setNextRunTime();
                reExecutePeriodic(outerTask);
            }
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

最为关键的地方在于:


            else if (ScheduledFutureTask.super.runAndReset()) {
                setNextRunTime();
                reExecutePeriodic(outerTask);
            }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 1
  • 2
  • 3
  • 4
  • 5

首先是:runAndReset()这个方法,然后是setNextRunTime()这个方法,然后是reExecutePeriodic(outerTask)这个方法。
第一个方法runAndReset()貌似是执行我们的提交的任务的,我们看下代码:


    protected boolean runAndReset() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return false;
        boolean ran = false;
        int s = state;
        try {
            Callable<V> c = callable;
            if (c != null && s == NEW) {
                try {
                    c.call(); // don't set result
                    ran = true;
                } catch (Throwable ex) {
                    setException(ex);
                }
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
        return ran && s == NEW;
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30

关键的地方是c.call()这一句,这个c就是我们提交的任务。
第二个方法setNextRunTime()的意思是设置下次执行的时间,下面是他的代码细节:


        private void setNextRunTime() {
            long p = period;
            if (p > 0)
                time += p;
            else
                time = triggerTime(-p);
        }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

我们只需要看p>0这个分支就可以了,其实这是两种策略。我们的示例对应了第一个分支的策略,所以很显然,time这个变量会被加p,而p则是我们设定好的period。下面我们找一下这个time是在哪里初始化的,回忆一下scheduleAtFixedRate这个方法的内,我们说我们的任务被包装了两次,而time就是在这里被初始化的:


    /**
     * Returns the trigger time of a delayed action.
     */
    private long triggerTime(long delay, TimeUnit unit) {
        return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
<span class="hljs-comment">/**
 * Returns the trigger time of a delayed action.
 */</span>
<span class="hljs-function"><span class="hljs-keyword">long</span> <span class="hljs-title">triggerTime</span><span class="hljs-params">(<span class="hljs-keyword">long</span> delay)</span> </span>{
    <span class="hljs-keyword">return</span> now() +
        ((delay &lt; (Long.MAX_VALUE &gt;&gt; <span class="hljs-number">1</span>)) ? delay : overflowFree(delay));
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

无论如何,我们知道一个任务会被运行完一次之后再次设置时间,然后线程池会获取任务来执行,而任务队列是一个延时阻塞队列,所以也就造成了周期性运行的假象。可以看下下面获取任务的take方法:


  public RunnableScheduledFuture<?> take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                for (;;) {
                    RunnableScheduledFuture<?> first = queue[0];
                    if (first == null)
                        available.await();
                    else {
                        long delay = first.getDelay(NANOSECONDS);
                        if (delay <= 0)
                            return finishPoll(first);
                        first = null; // don't retain ref while waiting
                        if (leader != null)
                            available.await();
                        else {
                            Thread thisThread = Thread.currentThread();
                            leader = thisThread;
                            try {
                                available.awaitNanos(delay);
                            } finally {
                                if (leader == thisThread)
                                    leader = null;
                            }
                        }
                    }
                }
            } finally {
                if (leader == null && queue[0] != null)
                    available.signal();
                lock.unlock();
            }
        }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34

可以看到,如果delay小于等于0,那么就是说需要被立即调度,否则延时delay这样一段时间。也就是延时消费。

结论就是,一个任务会被重复添加到一个延时任务队列,所以同一时间任务队列中会有多个任务待调度,线程池会首先获取优先级高的任务执行。如果我们的任务运行时间大于设置的调度时间,那么效果就是任务运行多长时间,调度时间就会变为多久,因为添加到任务队列的任务的延时时间每次都是负数,所以会被立刻执行。

转载自:https://www.jianshu.com/p/8c4c160ebdf7
https://www.cnblogs.com/chenmo-xpw/p/5555931.html

参考文献 :

http://www.ibm.com/developerworks/cn/java/j-lo-taskschedule/

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/83920
推荐阅读
相关标签
  

闽ICP备14008679号