当前位置:   article > 正文

关于延时、周期任务调度 —— ScheduledExecutorService ScheduledThreadPoolExecutor_scheduledexecutorservice threadpooltaskexecutor

scheduledexecutorservice threadpooltaskexecutor

前言

JUC 提供了相关的类以支持对 延时任务周期任务 的支持

功能类似于 java.util.Timer,
但官方推荐使用功能更为强大全面的 
ScheduledThreadPoolExecutor,
因为后者支持多任务作业,即线程池
  • 1
  • 2
  • 3
  • 4

ScheduledExecutorService:拓展了 ExecutorService 接口,提供了任务的 延时执行、周期执行 方法

ScheduledThreadPoolExecutor:拓展了通用线程池 ThreadPoolExecutor,额外提供了任务的 延时执行、周期执行 等方法

关于 ThreadPoolExecutor 介绍的传送门
  • 1

线程池浅析 ThreadPoolExecutor FutureTask

ScheduledExecutorService

public interface ScheduledExecutorService extends ExecutorService {

    /**
     * 提交一个延时非周期任务,
     *      返回的 ScheduledFuture 其 get 方法返回 null
     */
    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay, TimeUnit unit);

    /**
     * 提交一个延时非周期任务,
     *      返回的 ScheduledFuture 可以调用 get 方法返回执行结果
     *      cancel 方法可以取消任务(在任务执行完成前)
     */
    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay, TimeUnit unit);

    /**
     * 在指定延时后开始周期执行,
     *      后一个任务的执行时机在前一个任务开始执行后算起
     */
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit);

    /**
     * 在指定延时后开始周期执行,
     *      后一个任务的执行时机在前一个任务执行结束后算起
     */
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit);
                                                     
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

ScheduledExecutorService 拓展了 schedule 方法以支持对 RunnableCallable 的延时执行,结果都返回一个 ScheduledFuture,当然前者无法返回结果(调用 get 方法返回 null

拓展了 scheduleAtFixedRatescheduleWithFixedDelay 支持在任务在延时指定时间后 周期执行,直到任务被 cancel发生异常

ScheduledThreadPoolExecutor

public class ScheduledThreadPoolExecutor
        extends ThreadPoolExecutor
        implements ScheduledExecutorService
  • 1
  • 2
  • 3

拓展了 ThreadPoolExecutor(线程池),同时实现了 ScheduledExecutorService

构造方法

	public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE,
              DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
              new DelayedWorkQueue());
    }

    public ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory) {
        super(corePoolSize, Integer.MAX_VALUE,
              DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
              new DelayedWorkQueue(), threadFactory);
    }

    public ScheduledThreadPoolExecutor(int corePoolSize,
                                       RejectedExecutionHandler handler) {
        super(corePoolSize, Integer.MAX_VALUE,
              DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
              new DelayedWorkQueue(), handler);
    }

    public ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory,
                                       RejectedExecutionHandler handler) {
        super(corePoolSize, Integer.MAX_VALUE,
              DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
              new DelayedWorkQueue(), threadFactory, handler);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

允许指定 corePoolSize核心线程数threadFactory线程创建工厂handler任务拒绝策略

execute & submit

	// 0 延时 schedule
    public void execute(Runnable command) {
        schedule(command, 0, NANOSECONDS);
    }

    // 0 延时 schedule
    public Future<?> submit(Runnable task) {
        return schedule(task, 0, NANOSECONDS);
    }

    // 0 延时 schedule,将 Runnable 包装成 Callable
    public <T> Future<T> submit(Runnable task, T result) {
        return schedule(Executors.callable(task, result), 0, NANOSECONDS);
    }

    // 0 延时 schedule
    public <T> Future<T> submit(Callable<T> task) {
        return schedule(task, 0, NANOSECONDS);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

对常规(指继承自 ThreadPoolExecutor 的)方法的实现都是委托给 schedule 方法,即 0 延时的 schedule

schedule

	public ScheduledFuture<?> schedule(Runnable command,
                                       long delay,
                                       TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();

        /**
         * 将任务包装成 ScheduledFutureTask
         */
        RunnableScheduledFuture<Void> t = decorateTask(command,
            new ScheduledFutureTask<Void>(command, null,
                                          triggerTime(delay, unit),
                                          sequencer.getAndIncrement()));

        // 执行任务(ScheduledFutureTask)
        delayedExecute(t);
        return t;
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

在该方法中,将 Runnable 包装成对应的 ScheduledFutureTask 交由 delayedExecute 方法

scheduleAtFixedRate

	public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (period <= 0L)
            throw new IllegalArgumentException();
        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          unit.toNanos(period),
                                          sequencer.getAndIncrement());

        /**
         * 钩子方法
         * 允许我们自定义 ScheduledFutureTask
         */
        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
        sft.outerTask = t;
        delayedExecute(t);
        return t;
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

将任务包装成对应的 ScheduledFutureTask,并提供 钩子方法 decorateTaskScheduledFutureTask 进行再“装饰”,最终也是委托给 delayedExecute 方法

scheduleWithFixedDelay

	public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (delay <= 0L)
            throw new IllegalArgumentException();
        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          -unit.toNanos(delay),
                                          sequencer.getAndIncrement());
        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
        sft.outerTask = t;
        delayedExecute(t);
        return t;
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

雷同,最终也是交给 delayedExecute 方法执行,delayedExecute 方法会把任务交给 线程池 来执行,具体的执行逻辑见 ScheduledFutureTask#run

ScheduledFutureTask#run

	public void run() {
            
        // 判断是否 cnacel
        if (!canRunInCurrentRunState(this))
            cancel(false);
        
        // 是否周期调用,非周期则直接 super.run()
        else if (!isPeriodic())
            super.run();
        
        // 周期则进行下一波准备与调度
        else if (super.runAndReset()) {
            setNextRunTime();
            reExecutePeriodic(outerTask);
        }
    }

	--------------- reExecutePeriodic ---------------

	// 重新加入队列
    void reExecutePeriodic(RunnableScheduledFuture<?> task) {
        if (canRunInCurrentRunState(task)) {
            super.getQueue().add(task);
            if (canRunInCurrentRunState(task) || !remove(task)) {
                ensurePrestart();
                return;
            }
        }
        task.cancel(false);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30

对于 非周期 的任务,则直接调用 super.run()
对于 周期 任务,则会继续下一次的调用,实现 周期调用

示例

public class ScheduledThreadPoolTest {

    static ScheduledThreadPoolExecutor scheduledThreadPoolExecutor
            = new ScheduledThreadPoolExecutor(2);

    static Runnable runnable = () -> {

        System.out.println("runnable");
    };

    static Callable<String> callable = () -> {

        return "callable";
    };

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        test3();
    }

    /**
     * schedule Runnable
     */
    public static void test1() {
        scheduledThreadPoolExecutor.schedule(
                runnable
                , 1
                , TimeUnit.SECONDS
        );
    }

    /**
     * schedule Callable
     */
    public static void test2() throws ExecutionException, InterruptedException {
        ScheduledFuture<String> schedule = scheduledThreadPoolExecutor.schedule(
                callable
                , 1
                , TimeUnit.SECONDS
        );
        System.out.println(schedule.get());
    }

    /**
     * scheduleAtFixedRate
     */
    public static void test3() {
        scheduledThreadPoolExecutor.scheduleAtFixedRate(
                runnable
                , 1
                , 1
                , TimeUnit.SECONDS
        );
    }

    /**
     * scheduleWithFixedDelay
     */
    public static void test4() {
        scheduledThreadPoolExecutor.scheduleWithFixedDelay(
                runnable
                , 1
                , 1
                , TimeUnit.SECONDS
        );
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67

简单地示例 demoScheduledThreadPoolExecutor 的方法进行演示

总结

JUC 借助 ScheduledExecutorService ScheduledThreadPoolExecutor 等类,提供了对 延时、周期任务 的支持

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/花生_TL007/article/detail/83797
推荐阅读
相关标签
  

闽ICP备14008679号