赞
踩
JUC 提供了相关的类以支持对 延时任务 和 周期任务 的支持
功能类似于 java.util.Timer,
但官方推荐使用功能更为强大全面的
ScheduledThreadPoolExecutor,
因为后者支持多任务作业,即线程池
ScheduledExecutorService:拓展了 ExecutorService 接口,提供了任务的 延时执行、周期执行 方法
ScheduledThreadPoolExecutor:拓展了通用线程池 ThreadPoolExecutor,额外提供了任务的 延时执行、周期执行 等方法
关于 ThreadPoolExecutor 介绍的传送门
线程池浅析 ThreadPoolExecutor FutureTask
public interface ScheduledExecutorService extends ExecutorService { /** * 提交一个延时非周期任务, * 返回的 ScheduledFuture 其 get 方法返回 null */ public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit); /** * 提交一个延时非周期任务, * 返回的 ScheduledFuture 可以调用 get 方法返回执行结果 * cancel 方法可以取消任务(在任务执行完成前) */ public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit); /** * 在指定延时后开始周期执行, * 后一个任务的执行时机在前一个任务开始执行后算起 */ public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit); /** * 在指定延时后开始周期执行, * 后一个任务的执行时机在前一个任务执行结束后算起 */ public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit); }
ScheduledExecutorService 拓展了 schedule
方法以支持对 Runnable 和 Callable 的延时执行,结果都返回一个 ScheduledFuture,当然前者无法返回结果(调用 get
方法返回 null
)
拓展了 scheduleAtFixedRate
和 scheduleWithFixedDelay
支持在任务在延时指定时间后 周期执行,直到任务被 cancel
或 发生异常
public class ScheduledThreadPoolExecutor
extends ThreadPoolExecutor
implements ScheduledExecutorService
拓展了 ThreadPoolExecutor(线程池),同时实现了 ScheduledExecutorService
public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS, new DelayedWorkQueue()); } public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) { super(corePoolSize, Integer.MAX_VALUE, DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS, new DelayedWorkQueue(), threadFactory); } public ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) { super(corePoolSize, Integer.MAX_VALUE, DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS, new DelayedWorkQueue(), handler); } public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) { super(corePoolSize, Integer.MAX_VALUE, DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS, new DelayedWorkQueue(), threadFactory, handler); }
允许指定 corePoolSize
:核心线程数, threadFactory
:线程创建工厂、 handler
:任务拒绝策略
// 0 延时 schedule public void execute(Runnable command) { schedule(command, 0, NANOSECONDS); } // 0 延时 schedule public Future<?> submit(Runnable task) { return schedule(task, 0, NANOSECONDS); } // 0 延时 schedule,将 Runnable 包装成 Callable public <T> Future<T> submit(Runnable task, T result) { return schedule(Executors.callable(task, result), 0, NANOSECONDS); } // 0 延时 schedule public <T> Future<T> submit(Callable<T> task) { return schedule(task, 0, NANOSECONDS); }
对常规(指继承自 ThreadPoolExecutor 的)方法的实现都是委托给 schedule
方法,即 0
延时的 schedule
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); /** * 将任务包装成 ScheduledFutureTask */ RunnableScheduledFuture<Void> t = decorateTask(command, new ScheduledFutureTask<Void>(command, null, triggerTime(delay, unit), sequencer.getAndIncrement())); // 执行任务(ScheduledFutureTask) delayedExecute(t); return t; }
在该方法中,将 Runnable 包装成对应的 ScheduledFutureTask 交由 delayedExecute
方法
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (period <= 0L) throw new IllegalArgumentException(); ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(period), sequencer.getAndIncrement()); /** * 钩子方法 * 允许我们自定义 ScheduledFutureTask */ RunnableScheduledFuture<Void> t = decorateTask(command, sft); sft.outerTask = t; delayedExecute(t); return t; }
将任务包装成对应的 ScheduledFutureTask,并提供 钩子方法 decorateTask
对 ScheduledFutureTask 进行再“装饰”,最终也是委托给 delayedExecute
方法
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (delay <= 0L) throw new IllegalArgumentException(); ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), -unit.toNanos(delay), sequencer.getAndIncrement()); RunnableScheduledFuture<Void> t = decorateTask(command, sft); sft.outerTask = t; delayedExecute(t); return t; }
雷同,最终也是交给 delayedExecute
方法执行,delayedExecute
方法会把任务交给 线程池 来执行,具体的执行逻辑见 ScheduledFutureTask#run
public void run() { // 判断是否 cnacel if (!canRunInCurrentRunState(this)) cancel(false); // 是否周期调用,非周期则直接 super.run() else if (!isPeriodic()) super.run(); // 周期则进行下一波准备与调度 else if (super.runAndReset()) { setNextRunTime(); reExecutePeriodic(outerTask); } } --------------- reExecutePeriodic --------------- // 重新加入队列 void reExecutePeriodic(RunnableScheduledFuture<?> task) { if (canRunInCurrentRunState(task)) { super.getQueue().add(task); if (canRunInCurrentRunState(task) || !remove(task)) { ensurePrestart(); return; } } task.cancel(false); }
对于 非周期 的任务,则直接调用 super.run()
对于 周期 任务,则会继续下一次的调用,实现 周期调用
public class ScheduledThreadPoolTest { static ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(2); static Runnable runnable = () -> { System.out.println("runnable"); }; static Callable<String> callable = () -> { return "callable"; }; public static void main(String[] args) throws ExecutionException, InterruptedException { test3(); } /** * schedule Runnable */ public static void test1() { scheduledThreadPoolExecutor.schedule( runnable , 1 , TimeUnit.SECONDS ); } /** * schedule Callable */ public static void test2() throws ExecutionException, InterruptedException { ScheduledFuture<String> schedule = scheduledThreadPoolExecutor.schedule( callable , 1 , TimeUnit.SECONDS ); System.out.println(schedule.get()); } /** * scheduleAtFixedRate */ public static void test3() { scheduledThreadPoolExecutor.scheduleAtFixedRate( runnable , 1 , 1 , TimeUnit.SECONDS ); } /** * scheduleWithFixedDelay */ public static void test4() { scheduledThreadPoolExecutor.scheduleWithFixedDelay( runnable , 1 , 1 , TimeUnit.SECONDS ); } }
简单地示例 demo
对 ScheduledThreadPoolExecutor 的方法进行演示
JUC 借助 ScheduledExecutorService ScheduledThreadPoolExecutor 等类,提供了对 延时、周期任务 的支持
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。