Executor:一个接口,其定义了一个接收Runnable对象的方法executor,其方法签名为executor(Runnable command),







java.util.concurrent.ThreadPoolExecutor 类是线程池中最核心的类之一,因此如果要透彻地了解Java中的线程池,必须先了解这个类。

  1. public class ThreadPoolExecutor extends AbstractExecutorService {
  2. .....
  3. public ThreadPoolExecutor(int corePoolSize,
  4. int maximumPoolSize,
  5. long keepAliveTime,
  6. TimeUnit unit,
  7. BlockingQueue<Runnable> workQueue) {
  8. this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
  9. Executors.defaultThreadFactory(), defaultHandler);
  10. }
  11. public ThreadPoolExecutor(int corePoolSize,
  12. int maximumPoolSize,
  13. long keepAliveTime,
  14. TimeUnit unit,
  15. BlockingQueue<Runnable> workQueue,
  16. ThreadFactory threadFactory) {
  17. this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
  18. threadFactory, defaultHandler);
  19. }
  20. public ThreadPoolExecutor(int corePoolSize,
  21. int maximumPoolSize,
  22. long keepAliveTime,
  23. TimeUnit unit,
  24. BlockingQueue<Runnable> workQueue,
  25. RejectedExecutionHandler handler) {
  26. this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
  27. Executors.defaultThreadFactory(), handler);
  28. }
  29. public ThreadPoolExecutor(int corePoolSize,
  30. int maximumPoolSize,
  31. long keepAliveTime,
  32. TimeUnit unit,
  33. BlockingQueue<Runnable> workQueue,
  34. ThreadFactory threadFactory,
  35. RejectedExecutionHandler handler) {
  36. //代码省略
  37. }
  38. ...
  39. }






① 将maximumPoolSize设置为Integer.MAX_VALUE(线程数不可能达到这个值),CachedThreadPool就是这么做的;

② 使用无限容量的阻塞队列(比如LinkedBlockingQueue),所有处理不过来的任务全部排队去,FixedThreadPool就是这么做的。



  1. TimeUnit.DAYS; //
  2. TimeUnit.HOURS; //小时
  3. TimeUnit.MINUTES; //分钟
  4. TimeUnit.SECONDS; //
  5. TimeUnit.MILLISECONDS; //毫秒
  6. TimeUnit.MICROSECONDS; //微妙
  7. TimeUnit.NANOSECONDS; //纳秒



  1. ArrayBlockingQueue // 数组实现的阻塞队列,数组不支持自动扩容。所以当阻塞队列已满
  2. // 线程池会根据handler参数中指定的拒绝任务的策略决定如何处理后面加入的任务
  3. LinkedBlockingQueue // 链表实现的阻塞队列,默认容量Integer.MAX_VALUE(不限容),
  4. // 当然也可以通过构造方法限制容量
  5. SynchronousQueue // 零容量的同步阻塞队列,添加任务直到有线程接受该任务才返回
  6. // 用于实现生产者与消费者的同步,所以被叫做同步队列
  7. PriorityBlockingQueue // 二叉堆实现的优先级阻塞队列
  8. DelayQueue // 延时阻塞队列,该队列中的元素需要实现Delayed接口
  9. // 底层使用PriorityQueue的二叉堆对Delayed元素排序
  10. // ScheduledThreadPoolExecutor底层就用了DelayQueue的变体"DelayWorkQueue"
  11. // 队列中所有的任务都会封装成ScheduledFutureTask对象(该类已实现Delayed接口)



  1. ThreadPoolExecutor.AbortPolicy: 丢弃任务并抛出RejectedExecutionException异常。
  2. ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
  3. ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
  4. ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务




  1. // runState is stored in the high-order bits
  2. private static final int RUNNING = -1 << COUNT_BITS;
  3. private static final int SHUTDOWN = 0 << COUNT_BITS;
  4. private static final int STOP = 1 << COUNT_BITS;
  5. private static final int TIDYING = 2 << COUNT_BITS;
  6. private static final int TERMINATED = 3 << COUNT_BITS;




execute(Runnable command):定义在Executor接口中







  1. // 创建固定线程数量的线程池
  2. public static ExecutorService newFixedThreadPool();
  3. // 创建单个线程的线程池(本质上就是容量为1的FixedThreadPool)
  4. public static ExecutorService newSingleThreadExecutor();
  5. // 创建无数量限制可自动增减线程的线程池
  6. public static ExecutorService newCachedThreadPool();
  7. // 创建(可计划的)任务延时执行线程池
  8. public static ScheduledExecutorService newScheduledThreadPool();
  9. // 单线程版的任务计划执行的线程池
  10. public static ScheduledExecutorService newSingleThreadScheduledExecutor();



public static ExecutorService newFixedThreadPool(int nThreads)



  1. public static ExecutorService newFixedThreadPool(int nThreads) {
  2. return new ThreadPoolExecutor(nThreads, nThreads,
  3. 0L, TimeUnit.MILLISECONDS,
  4. //使用一个基于FIFO排序的阻塞队列,在所有corePoolSize线程都忙时新任务将在队列中等待
  5. new LinkedBlockingQueue<Runnable>());
  6. }

 public static ExecutorService newSingleThreadExecutor()



  1. public static ExecutorService newSingleThreadExecutor() {
  2. return new FinalizableDelegatedExecutorService
  3. //corePoolSize和maximumPoolSize都等于,表示固定线程池大小为1
  4. (new ThreadPoolExecutor(1, 1,
  5. 0L, TimeUnit.MILLISECONDS,
  6. new LinkedBlockingQueue<Runnable>()));
  7. }

public static Sc​​heduledExecutorService newScheduledThreadPool(int corePoolSize)



  1. Executor executor = Executors.newFixedThreadPool(10);
  2. Runnable task = new Runnable() {
  3. @Override
  4. public void run() {
  5. System.out.println("task over");
  6. }
  7. };
  8. executor.execute(task);
  9. executor = Executors.newScheduledThreadPool(10);
  10. ScheduledExecutorService scheduler = (ScheduledExecutorService) executor;
  11. scheduler.scheduleAtFixedRate(task, 10, 10, TimeUnit.SECONDS);


  1. public class HeartBeat {
  2. public static void main(String[] args) {
  3. ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);
  4. Runnable task = new Runnable() {
  5. public void run() {
  6. System.out.println("HeartBeat.........................");
  7. }
  8. };
  9. executor.scheduleAtFixedRate(task,5,3, TimeUnit.SECONDS); //5秒后第一次执行,之后每隔3秒执行一次
  10. }
  11. }


  1. HeartBeat....................... //5秒后第一次输出
  2. HeartBeat....................... //每隔3秒输出一个

public static ExecutorService newCachedThreadPool()

创建一个可缓存的线程池,调用execute 将重用以前构造的线程(如果线程可用)


  1. public static ExecutorService newCachedThreadPool() {
  2. return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
  3. 60L, TimeUnit.SECONDS,
  4. //使用同步队列,将任务直接提交给线程
  5. new SynchronousQueue<Runnable>());
  6. }
  1. public class ThreadPoolTest {
  2. public static void main(String[] args) throws InterruptedException {
  3. ExecutorService threadPool = Executors.newCachedThreadPool();//线程池里面的线程数会动态变化,并可在线程线被移除前重用
  4. for (int i = 1; i <= 3; i ++) {
  5. final int task = i; //10个任务
  6. //TimeUnit.SECONDS.sleep(1);
  7. threadPool.execute(new Runnable() { //接受一个Runnable实例
  8. public void run() {
  9. System.out.println("线程名字: " + Thread.currentThread().getName() + " 任务名为: "+task);
  10. }
  11. });
  12. }
  13. }
  14. }


  1. 线程名字: pool-1-thread-1 任务名为: 1
  2. 线程名字: pool-1-thread-2 任务名为: 2
  3. 线程名字: pool-1-thread-3 任务名为: 3


  1. 线程名字: pool-1-thread-1 任务名为: 1
  2. 线程名字: pool-1-thread-1 任务名为: 2
  3. 线程名字: pool-1-thread-1 任务名为: 3

通过使用Executor可以很轻易的实现各种调优 管理 监视 记录日志和错误报告等待。



ExecutorService提供了管理Eecutor生命周期的方法,ExecutorService的生命周期包括了:运行 关闭和终止三种状态。





如果Executor处于关闭状态,往Executor提交任务会抛出unchecked exception RejectedExecutionException。

  1. ExecutorService executorService = (ExecutorService) executor;
  2. while (!executorService.isShutdown()) {
  3. try {
  4. executorService.execute(task);
  5. } catch (RejectedExecutionException ignored) {
  6. }
  7. }
  8. executorService.shutdown();



  1. public class CallableAndFuture {
  2. public static void main(String[] args) throws ExecutionException, InterruptedException {
  3. ExecutorService executor = Executors.newSingleThreadExecutor();
  4. Future<String> future = executor.submit(new Callable<String>() { //接受一上callable实例
  5. public String call() throws Exception {
  6. return "MOBIN";
  7. }
  8. });
  9. System.out.println("任务的执行结果:"+future.get());
  10. }
  11. }







  1. private volatile int state;
  2. private static final int NEW = 0;// 任务已被创建(new的时候默认状态为0)
  3. private static final int COMPLETING = 1;// 任务即将完成(已获取返回值或已捕获异常)
  4. private static final int NORMAL = 2;// 任务正常完成(以返回值的形式完成任务)
  5. private static final int EXCEPTIONAL = 3;// 任务异常完成(任务执行过程发生异常并被捕获)
  6. private static final int CANCELLED = 4;// 任务已被取消(任务还没被执行就被取消了,可能在排队)
  7. private static final int INTERRUPTING = 5;// 任务正在中断(任务执行时被取消)
  8. private static final int INTERRUPTED = 6;// 任务已经中断(INTERRUPTING的下一个状态)



1、 应用执行完成(主线程以及其他非守护线程执行完)后自动关闭的线程池

  1. /* 执行完成后,等待terminationTimeout后关闭线程池 */
  2. public static ExecutorService getExitingExecutorService(
  3. ThreadPoolExecutor executor, long terminationTimeout, TimeUnit timeUnit);
  4. /* 默认延迟120秒 */
  5. public static ExecutorService getExitingExecutorService(ThreadPoolExecutor executor);
  6. /* 对ScheduledExecutorService的包装 */
  7. public static ScheduledExecutorService getExitingScheduledExecutorService(
  8. ScheduledThreadPoolExecutor executor, long terminationTimeout, TimeUnit timeUnit);
  9. public static ScheduledExecutorService getExitingScheduledExecutorService(
  10. ScheduledThreadPoolExecutor executor)


  1. addShutdownHook(
  2. MoreExecutors.newThread(
  3. "DelayedShutdownHook-for-" + service,
  4. new Runnable() {
  5. @Override
  6. public void run() {
  7. try {
  8. service.shutdown();
  9. service.awaitTermination(terminationTimeout, timeUnit);
  10. } catch (InterruptedException ignored) { }
  11. }
  12. }));


  1. ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(5);
  2. ExecutorService executorService = MoreExecutors.getExitingExecutorService(
  3. executor, 100, TimeUnit.MILLISECONDS);
  4. executorService.submit(() -> { while (true) {} }); // 执行一个无限期的任务





  1. // 枚举单例
  2. Executor executor = MoreExecutors.directExecutor();
  3. // 每次都会创建一个新的对象
  4. ExecutorService executor = MoreExecutors.newDirectExecutorService();




  1. public static ListeningExecutorService listeningDecorator(ExecutorService delegate);
  2. public static ListeningScheduledExecutorService listeningDecorator(ScheduledExecutorService delegate)


  1. ExecutorService delegate = Executors.newCachedThreadPool();
  2. // 包装成Guava的ListeningExecutorService
  3. ListeningExecutorService executor = MoreExecutors.listeningDecorator(delegate);
  4. // 提交有返回结果的任务
  5. final ListenableFuture future = executor.submit(new Callable<Integer>() {
  6. public Integer call() throws Exception {
  7. int result = 0;
  8. Thread.sleep(1000);
  9. return result;
  10. }
  11. });
  12. future.addListener(new Runable() {
  13. public void run() {
  14. System.out.println("result:" + future.get());
  15. }
  16. }, MoreExecutors.directExecutor());
  17. // Futures工具类提供了工具方法用于任务正常或异常情况的处理。
  18. Futures.addCallback(future, new FutureCallback<Integer>() {
  19. public void onSuccess(Integer result) {
  20. // 任务正常返回结果
  21. System.out.println("result:" + result);
  22. }
  23. public void onFailure(Throwable t) {
  24. // 任务抛异常了
  25. t.printStackTrace();
  26. }
  27. }, MoreExecutors.directExecutor());




  1. public class ConcurrentCalculator2 {
  2. private ExecutorService exec;
  3. private CompletionService<Long> completionService;
  4. private int cpuCoreNumber;
  5. // 内部类
  6. class SumCalculator implements Callable<Long> {
  7. ......
  8. }
  9. public ConcurrentCalculator2() {
  10. cpuCoreNumber = Runtime.getRuntime().availableProcessors();
  11. exec = Executors.newFixedThreadPool(cpuCoreNumber);
  12. completionService = new ExecutorCompletionService<Long>(exec);
  13. }
  14. public Long sum(final int[] numbers) {
  15. // 根据CPU核心个数拆分任务,创建FutureTask并提交到Executor
  16. for (int i = 0; i < cpuCoreNumber; i++) {
  17. int increment = numbers.length / cpuCoreNumber + 1;
  18. int start = increment * i;
  19. int end = increment * i + increment;
  20. if (end > numbers.length)
  21. end = numbers.length;
  22. SumCalculator subCalc = new SumCalculator(numbers, start, end);
  23. if (!exec.isShutdown()) {
  24. completionService.submit(subCalc);
  25. }
  26. }
  27. return getResult();
  28. }
  29. /**
  30. * 迭代每个只任务,获得部分和,相加返回
  31. *
  32. * @return
  33. */
  34. public Long getResult() {
  35. Long result = 0l;
  36. for (int i = 0; i < cpuCoreNumber; i++) {
  37. try {
  38. Long subSum = completionService.take().get();
  39. result += subSum;
  40. } catch (InterruptedException e) {
  41. e.printStackTrace();
  42. } catch (ExecutionException e) {
  43. e.printStackTrace();
  44. }
  45. }
  46. return result;
  47. }
  48. public void close() {
  49. exec.shutdown();
  50. }
  51. }



  1. public class CompletionServiceTest {
  2. public static void main(String[] args) throws InterruptedException, ExecutionException {
  3. ExecutorService executor = Executors.newFixedThreadPool(10); //创建含10.条线程的线程池
  4. CompletionService completionService = new ExecutorCompletionService(executor);
  5. for (int i =1; i <=10; i ++) {
  6. final int result = i;
  7. completionService.submit(new Callable() {
  8. public Object call() throws Exception {
  9. Thread.sleep(new Random().nextInt(5000)); //让当前线程随机休眠一段时间
  10. return result;
  11. }
  12. });
  13. }
  14. System.out.println(completionService.take().get()); //获取执行结果
  15. }
  16. }






  1. public class ProducerAndConsumer {
  2. public static void main(String[] args){
  3. try{
  4. BlockingQueue queue = new LinkedBlockingQueue(5);
  5. ExecutorService executor = Executors.newFixedThreadPool(5);
  6. Produer producer = new Produer(queue);
  7. for(int i=0;i<3;i++){
  8. executor.execute(producer);
  9. }
  10. executor.execute(new Consumer(queue));
  11. executor.shutdown();
  12. }catch (Exception e){
  13. e.printStackTrace();
  14. }
  15. }
  16. }
  1. class Produer implements Runnable{
  2. private BlockingQueue queue;
  3. private int nums = 20; //循环次数
  4. //标记数据编号
  5. private static volatile AtomicInteger count = new AtomicInteger();
  6. private boolean isRunning = true;
  7. public Produer(){}
  8. public Produer(BlockingQueue queue){
  9. this.queue = queue;
  10. }
  11. public void run() {
  12. String data = null;
  13. try{
  14. System.out.println("开始生产数据");
  15. System.out.println("-----------------------");
  16. while(nums>0){
  17. nums--;
  18. count.decrementAndGet();
  19. Thread.sleep(500);
  20. System.out.println(Thread.currentThread().getId()+ " :生产者生产了一个数据");
  21. queue.put(count.getAndIncrement());
  22. }
  23. }catch(Exception e){
  24. e.printStackTrace();
  25. Thread.currentThread().interrupt();
  26. }finally{
  27. System.out.println("生产者线程退出!");
  28. }
  29. }
  30. }
  1. class Consumer implements Runnable{
  2. private BlockingQueue queue;
  3. private int nums = 20;
  4. private boolean isRunning = true;
  5. public Consumer(){}
  6. public Consumer(BlockingQueue queue){
  7. this.queue = queue;
  8. }
  9. public void run() {
  10. System.out.println("消费者开始消费");
  11. System.out.println("-------------------------");
  12. while(nums>0){
  13. nums--;
  14. try{
  15. while(isRunning){
  16. int data = (Integer)queue.take();
  17. Thread.sleep(500);
  18. System.out.println("消费者消费的数据是" + data);
  19. }
  20. }catch(Exception e){
  21. e.printStackTrace();
  22. Thread.currentThread().interrupt();
  23. }finally {
  24. System.out.println("消费者线程退出!");
  25. }
  26. }
  27. }
  28. }


  1. import java.text.SimpleDateFormat;
  2. import java.util.Date;
  3. import java.util.HashMap;
  4. import java.util.concurrent.ExecutorService;
  5. import java.util.concurrent.Executors;
  6. import org.quartz.Job;
  7. import org.quartz.JobDataMap;
  8. import org.quartz.JobExecutionContext;
  9. import org.quartz.JobExecutionException;
  10. @SuppressWarnings({"unchecked","unused"})
  11. public abstract class ProQuartzJob implements Job {
  12. private ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
  13. @Override
  14. public void execute(JobExecutionContext arg0) throws JobExecutionException {
  15. JobDataMap jobDataMap = arg0.getJobDetail().getJobDataMap();
  16. final HashMap<String, Object> ibean = (HashMap<String, Object>) jobDataMap.get("DATA_BEAN");
  17. final JobManager jobManager = (JobManager) ibean.get("CUR_OBJ");
  18. final JobLogMapper jobLogMapper = (JobLogMapper) jobDataMap.get("jobLogMapper");
  19. cachedThreadPool.execute(new Runnable() {
  20. @Override
  21. public void run() {
  22. if(jobManager != null) {
  23. JobLog jobLog = new JobLog();
  24. long startTime = System.currentTimeMillis();
  25. try {
  26. //任务开始时间
  27. SimpleDateFormat sd = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  28. Date date = new Date();
  29. jobLog.setCreateTime(sd.format(date));
  30. jobLog.setStatus(String.valueOf(Constant.SUCCESS));
  31. execute(ibean);
  32. } catch (Exception e) {
  33. jobLog.setStatus(String.valueOf(Constant.FAILED));
  34. e.printStackTrace();
  35. }
  36. }else {
  37. execute(ibean);
  38. }
  39. }
  40. });
  41. }
  42. public abstract void execute(HashMap<String, Object> ibean);
  43. }
  1. public class PipOperationJob extends ProQuartzJob{
  2. @Override
  3. public void execute(final HashMap<String, Object> arg0) {
  4. WorkManager.addJobByPool(arg0);
  5. }
  6. }
  1. import java.text.MessageFormat;
  2. import java.util.Date;
  3. import java.util.HashMap;
  4. import java.util.concurrent.Executor;
  5. import java.util.concurrent.LinkedBlockingQueue;
  6. import java.util.concurrent.ThreadPoolExecutor;
  7. import java.util.concurrent.TimeUnit;
  8. import org.apache.log4j.Logger;
  9. public class WorkManager {
  10. private static Logger logger = Logger.getLogger(WorkManager.class);
  11. // private static ExecutorService pool = Executors.newFixedThreadPool(2);
  12. private static int queueSize = 10;
  13. private static LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(queueSize);
  14. private static Executor pool = new ThreadPoolExecutor(2, queueSize, 600, TimeUnit.SECONDS, queue);
  15. public Executor getThreadPool() {
  16. return pool;
  17. }
  18. public static void addJobByPool(final HashMap<String, Object> arg0) {
  19. if(queue.size() <= 9) {
  20. pool.execute(new Runnable() {
  21. @Override
  22. public void run() {
  23. Date now = new Date();
  24. JobManager jobManager = (JobManager) arg0.get("CUR_OBJ");
  25. /**
  26. * 任务调度获取表名
  27. */
  28. String ruleCode = jobManager.getTaskName();
  29. logger.info(MessageFormat.format("====sys worker {0} syn date start time:{1}====", ruleCode,
  30. DateUtil.formatTime(now)));
  31. /**
  32. *
  33. */
  34. new PipOperationWorker(ruleCode).synData();
  35. Date end = new Date();
  36. long haoshi = (now.getTime() - end.getTime()) / 1000;
  37. logger.info(MessageFormat.format("====sys worker {0} syn date end time:{1}==haoshi:{2}==", ruleCode,
  38. DateUtil.formatTime(end), haoshi));
  39. }
  40. });
  41. }
  42. }
  43. }



Modifier and TypeMethod and Description
booleanawaitTermination(long timeout, TimeUnit unit)


<T> List<Future<T>>invokeAll(Collection<? extends Callable<T>> tasks)


<T> List<Future<T>>invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)


<T> TinvokeAny(Collection<? extends Callable<T>> tasks)


<T> TinvokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)



如果此执行者已关闭,则返回 true


如果所有任务在关闭后完成,则返回 true





<T> Future<T>submit(Callable<T> task)


Future<?>submit(Runnable task)


<T> Future<T>submit(Runnable task, T result)


  1. package java.util.concurrent;
  2. import java.util.List;
  3. import java.util.Collection;
  4. public interface ExecutorService extends Executor {
  5. void shutdown();
  6. List<Runnable> shutdownNow();
  7. boolean isShutdown();
  8. boolean isTerminated();
  9. boolean awaitTermination(long timeout, TimeUnit unit)
  10. throws InterruptedException;
  11. <T> Future<T> submit(Callable<T> task);
  12. <T> Future<T> submit(Runnable task, T result);
  13. Future<?> submit(Runnable task);
  14. <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
  15. throws InterruptedException;
  16. <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
  17. long timeout, TimeUnit unit)
  18. throws InterruptedException;
  19. <T> T invokeAny(Collection<? extends Callable<T>> tasks)
  20. throws InterruptedException, ExecutionException;
  21. <T> T invokeAny(Collection<? extends Callable<T>> tasks,
  22. long timeout, TimeUnit unit)
  23. throws InterruptedException, ExecutionException, TimeoutException;
  24. }
Modifier and TypeMethod and Description
voidexecute(Runnable command)


  1. package java.util.concurrent;
  2. public interface Executor {
  3. void execute(Runnable command);
  4. }
Modifier and TypeMethod and Description
static Callable<Object>callable(PrivilegedAction<?> action)


static Callable<Object>callable(PrivilegedExceptionAction<?> action)


static Callable<Object>callable(Runnable task)


static <T> Callable<T>callable(Runnable task, T result)


static ThreadFactorydefaultThreadFactory()


static ExecutorServicenewCachedThreadPool()


static ExecutorServicenewCachedThreadPool(ThreadFactory threadFactory)


static ExecutorServicenewFixedThreadPool(int nThreads)


static ExecutorServicenewFixedThreadPool(int nThreads, ThreadFactory threadFactory)


static ScheduledExecutorServicenewScheduledThreadPool(int corePoolSize)


static ScheduledExecutorServicenewScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory)


static ExecutorServicenewSingleThreadExecutor()


static ExecutorServicenewSingleThreadExecutor(ThreadFactory threadFactory)


static ScheduledExecutorServicenewSingleThreadScheduledExecutor()


static ScheduledExecutorServicenewSingleThreadScheduledExecutor(ThreadFactory threadFactory)


static ExecutorServicenewWorkStealingPool()

创建使用所有available processors作为其目标并行级别的工作窃取线程池。

static ExecutorServicenewWorkStealingPool(int parallelism)


static <T> Callable<T>privilegedCallable(Callable<T> callable)


static <T> Callable<T>privilegedCallableUsingCurrentClassLoader(Callable<T> callable)


static ThreadFactoryprivilegedThreadFactory()


static ExecutorServiceunconfigurableExecutorService(ExecutorService executor)


static ScheduledExecutorServiceunconfigurableScheduledExecutorService(ScheduledExecutorService executor)


  1. package java.util.concurrent;
  2. import java.util.*;
  3. import java.util.concurrent.atomic.AtomicInteger;
  4. import java.security.AccessControlContext;
  5. import java.security.AccessController;
  6. import java.security.PrivilegedAction;
  7. import java.security.PrivilegedExceptionAction;
  8. import java.security.PrivilegedActionException;
  9. import java.security.AccessControlException;
  10. import sun.security.util.SecurityConstants;
  11. public class Executors {
  12. public static ExecutorService newFixedThreadPool(int nThreads) {
  13. return new ThreadPoolExecutor(nThreads, nThreads,
  14. 0L, TimeUnit.MILLISECONDS,
  15. new LinkedBlockingQueue<Runnable>());
  16. }
  17. public static ExecutorService newWorkStealingPool(int parallelism) {
  18. return new ForkJoinPool
  19. (parallelism,
  20. ForkJoinPool.defaultForkJoinWorkerThreadFactory,
  21. null, true);
  22. }
  23. public static ExecutorService newWorkStealingPool() {
  24. return new ForkJoinPool
  25. (Runtime.getRuntime().availableProcessors(),
  26. ForkJoinPool.defaultForkJoinWorkerThreadFactory,
  27. null, true);
  28. }
  29. public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
  30. return new ThreadPoolExecutor(nThreads, nThreads,
  31. 0L, TimeUnit.MILLISECONDS,
  32. new LinkedBlockingQueue<Runnable>(),
  33. threadFactory);
  34. }
  35. public static ExecutorService newSingleThreadExecutor() {
  36. return new FinalizableDelegatedExecutorService
  37. (new ThreadPoolExecutor(1, 1,
  38. 0L, TimeUnit.MILLISECONDS,
  39. new LinkedBlockingQueue<Runnable>()));
  40. }
  41. public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
  42. return new FinalizableDelegatedExecutorService
  43. (new ThreadPoolExecutor(1, 1,
  44. 0L, TimeUnit.MILLISECONDS,
  45. new LinkedBlockingQueue<Runnable>(),
  46. threadFactory));
  47. }
  48. public static ExecutorService newCachedThreadPool() {
  49. return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
  50. 60L, TimeUnit.SECONDS,
  51. new SynchronousQueue<Runnable>());
  52. }
  53. public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
  54. return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
  55. 60L, TimeUnit.SECONDS,
  56. new SynchronousQueue<Runnable>(),
  57. threadFactory);
  58. }
  59. public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
  60. return new DelegatedScheduledExecutorService
  61. (new ScheduledThreadPoolExecutor(1));
  62. }
  63. public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
  64. return new DelegatedScheduledExecutorService
  65. (new ScheduledThreadPoolExecutor(1, threadFactory));
  66. }
  67. public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
  68. return new ScheduledThreadPoolExecutor(corePoolSize);
  69. }
  70. public static ScheduledExecutorService newScheduledThreadPool(
  71. int corePoolSize, ThreadFactory threadFactory) {
  72. return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
  73. }
  74. public static ExecutorService unconfigurableExecutorService(ExecutorService executor) {
  75. if (executor == null)
  76. throw new NullPointerException();
  77. return new DelegatedExecutorService(executor);
  78. }
  79. public static ScheduledExecutorService unconfigurableScheduledExecutorService(ScheduledExecutorService executor) {
  80. if (executor == null)
  81. throw new NullPointerException();
  82. return new DelegatedScheduledExecutorService(executor);
  83. }
  84. public static ThreadFactory defaultThreadFactory() {
  85. return new DefaultThreadFactory();
  86. }
  87. public static ThreadFactory privilegedThreadFactory() {
  88. return new PrivilegedThreadFactory();
  89. }
  90. public static <T> Callable<T> callable(Runnable task, T result) {
  91. if (task == null)
  92. throw new NullPointerException();
  93. return new RunnableAdapter<T>(task, result);
  94. }
  95. public static Callable<Object> callable(Runnable task) {
  96. if (task == null)
  97. throw new NullPointerException();
  98. return new RunnableAdapter<Object>(task, null);
  99. }
  100. public static Callable<Object> callable(final PrivilegedAction<?> action) {
  101. if (action == null)
  102. throw new NullPointerException();
  103. return new Callable<Object>() {
  104. public Object call() { return action.run(); }};
  105. }
  106. public static Callable<Object> callable(final PrivilegedExceptionAction<?> action) {
  107. if (action == null)
  108. throw new NullPointerException();
  109. return new Callable<Object>() {
  110. public Object call() throws Exception { return action.run(); }};
  111. }
  112. public static <T> Callable<T> privilegedCallable(Callable<T> callable) {
  113. if (callable == null)
  114. throw new NullPointerException();
  115. return new PrivilegedCallable<T>(callable);
  116. }
  117. public static <T> Callable<T> privilegedCallableUsingCurrentClassLoader(Callable<T> callable) {
  118. if (callable == null)
  119. throw new NullPointerException();
  120. return new PrivilegedCallableUsingCurrentClassLoader<T>(callable);
  121. }
  122. // Non-public classes supporting the public methods
  123. static final class RunnableAdapter<T> implements Callable<T> {
  124. final Runnable task;
  125. final T result;
  126. RunnableAdapter(Runnable task, T result) {
  127. this.task = task;
  128. this.result = result;
  129. }
  130. public T call() {
  131. task.run();
  132. return result;
  133. }
  134. }
  135. static final class PrivilegedCallable<T> implements Callable<T> {
  136. private final Callable<T> task;
  137. private final AccessControlContext acc;
  138. PrivilegedCallable(Callable<T> task) {
  139. this.task = task;
  140. this.acc = AccessController.getContext();
  141. }
  142. public T call() throws Exception {
  143. try {
  144. return AccessController.doPrivileged(
  145. new PrivilegedExceptionAction<T>() {
  146. public T run() throws Exception {
  147. return task.call();
  148. }
  149. }, acc);
  150. } catch (PrivilegedActionException e) {
  151. throw e.getException();
  152. }
  153. }
  154. }
  155. static final class PrivilegedCallableUsingCurrentClassLoader<T> implements Callable<T> {
  156. private final Callable<T> task;
  157. private final AccessControlContext acc;
  158. private final ClassLoader ccl;
  159. PrivilegedCallableUsingCurrentClassLoader(Callable<T> task) {
  160. SecurityManager sm = System.getSecurityManager();
  161. if (sm != null) {
  162. // Calls to getContextClassLoader from this class
  163. // never trigger a security check, but we check
  164. // whether our callers have this permission anyways.
  165. sm.checkPermission(SecurityConstants.GET_CLASSLOADER_PERMISSION);
  166. // Whether setContextClassLoader turns out to be necessary
  167. // or not, we fail fast if permission is not available.
  168. sm.checkPermission(new RuntimePermission("setContextClassLoader"));
  169. }
  170. this.task = task;
  171. this.acc = AccessController.getContext();
  172. this.ccl = Thread.currentThread().getContextClassLoader();
  173. }
  174. public T call() throws Exception {
  175. try {
  176. return AccessController.doPrivileged(
  177. new PrivilegedExceptionAction<T>() {
  178. public T run() throws Exception {
  179. Thread t = Thread.currentThread();
  180. ClassLoader cl = t.getContextClassLoader();
  181. if (ccl == cl) {
  182. return task.call();
  183. } else {
  184. t.setContextClassLoader(ccl);
  185. try {
  186. return task.call();
  187. } finally {
  188. t.setContextClassLoader(cl);
  189. }
  190. }
  191. }
  192. }, acc);
  193. } catch (PrivilegedActionException e) {
  194. throw e.getException();
  195. }
  196. }
  197. }
  198. static class DefaultThreadFactory implements ThreadFactory {
  199. private static final AtomicInteger poolNumber = new AtomicInteger(1);
  200. private final ThreadGroup group;
  201. private final AtomicInteger threadNumber = new AtomicInteger(1);
  202. private final String namePrefix;
  203. DefaultThreadFactory() {
  204. SecurityManager s = System.getSecurityManager();
  205. group = (s != null) ? s.getThreadGroup() :
  206. Thread.currentThread().getThreadGroup();
  207. namePrefix = "pool-" +
  208. poolNumber.getAndIncrement() +
  209. "-thread-";
  210. }
  211. public Thread newThread(Runnable r) {
  212. Thread t = new Thread(group, r,
  213. namePrefix + threadNumber.getAndIncrement(),
  214. 0);
  215. if (t.isDaemon())
  216. t.setDaemon(false);
  217. if (t.getPriority() != Thread.NORM_PRIORITY)
  218. t.setPriority(Thread.NORM_PRIORITY);
  219. return t;
  220. }
  221. }
  222. static class PrivilegedThreadFactory extends DefaultThreadFactory {
  223. private final AccessControlContext acc;
  224. private final ClassLoader ccl;
  225. PrivilegedThreadFactory() {
  226. super();
  227. SecurityManager sm = System.getSecurityManager();
  228. if (sm != null) {
  229. // Calls to getContextClassLoader from this class
  230. // never trigger a security check, but we check
  231. // whether our callers have this permission anyways.
  232. sm.checkPermission(SecurityConstants.GET_CLASSLOADER_PERMISSION);
  233. // Fail fast
  234. sm.checkPermission(new RuntimePermission("setContextClassLoader"));
  235. }
  236. this.acc = AccessController.getContext();
  237. this.ccl = Thread.currentThread().getContextClassLoader();
  238. }
  239. public Thread newThread(final Runnable r) {
  240. return super.newThread(new Runnable() {
  241. public void run() {
  242. AccessController.doPrivileged(new PrivilegedAction<Void>() {
  243. public Void run() {
  244. Thread.currentThread().setContextClassLoader(ccl);
  245. r.run();
  246. return null;
  247. }
  248. }, acc);
  249. }
  250. });
  251. }
  252. }
  253. static class DelegatedExecutorService extends AbstractExecutorService {
  254. private final ExecutorService e;
  255. DelegatedExecutorService(ExecutorService executor) { e = executor; }
  256. public void execute(Runnable command) { e.execute(command); }
  257. public void shutdown() { e.shutdown(); }
  258. public List<Runnable> shutdownNow() { return e.shutdownNow(); }
  259. public boolean isShutdown() { return e.isShutdown(); }
  260. public boolean isTerminated() { return e.isTerminated(); }
  261. public boolean awaitTermination(long timeout, TimeUnit unit)
  262. throws InterruptedException {
  263. return e.awaitTermination(timeout, unit);
  264. }
  265. public Future<?> submit(Runnable task) {
  266. return e.submit(task);
  267. }
  268. public <T> Future<T> submit(Callable<T> task) {
  269. return e.submit(task);
  270. }
  271. public <T> Future<T> submit(Runnable task, T result) {
  272. return e.submit(task, result);
  273. }
  274. public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
  275. throws InterruptedException {
  276. return e.invokeAll(tasks);
  277. }
  278. public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
  279. long timeout, TimeUnit unit)
  280. throws InterruptedException {
  281. return e.invokeAll(tasks, timeout, unit);
  282. }
  283. public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
  284. throws InterruptedException, ExecutionException {
  285. return e.invokeAny(tasks);
  286. }
  287. public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
  288. long timeout, TimeUnit unit)
  289. throws InterruptedException, ExecutionException, TimeoutException {
  290. return e.invokeAny(tasks, timeout, unit);
  291. }
  292. }
  293. static class FinalizableDelegatedExecutorService
  294. extends DelegatedExecutorService {
  295. FinalizableDelegatedExecutorService(ExecutorService executor) {
  296. super(executor);
  297. }
  298. protected void finalize() {
  299. super.shutdown();
  300. }
  301. }
  302. static class DelegatedScheduledExecutorService
  303. extends DelegatedExecutorService
  304. implements ScheduledExecutorService {
  305. private final ScheduledExecutorService e;
  306. DelegatedScheduledExecutorService(ScheduledExecutorService executor) {
  307. super(executor);
  308. e = executor;
  309. }
  310. public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
  311. return e.schedule(command, delay, unit);
  312. }
  313. public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
  314. return e.schedule(callable, delay, unit);
  315. }
  316. public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
  317. return e.scheduleAtFixedRate(command, initialDelay, period, unit);
  318. }
  319. public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
  320. return e.scheduleWithFixedDelay(command, initialDelay, delay, unit);
  321. }
  322. }
  323. /** Cannot instantiate. */
  324. private Executors() {}
  325. }


Modifier and TypeMethod and Description
<T> List<Future<T>>invokeAll(Collection<? extends Callable<T>> tasks)


<T> List<Future<T>>invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)


<T> TinvokeAny(Collection<? extends Callable<T>> tasks)


<T> TinvokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)


protected <T> RunnableFuture<T>newTaskFor(Callable<T> callable)


protected <T> RunnableFuture<T>newTaskFor(Runnable runnable, T value)


<T> Future<T>submit(Callable<T> task)


Future<?>submit(Runnable task)


<T> Future<T>submit(Runnable task, T result)


  1. package java.util.concurrent;
  2. import java.util.*;
  3. public abstract class AbstractExecutorService implements ExecutorService {
  4. protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
  5. return new FutureTask<T>(runnable, value);
  6. }
  7. protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
  8. return new FutureTask<T>(callable);
  9. }
  10. public Future<?> submit(Runnable task) {
  11. if (task == null) throw new NullPointerException();
  12. RunnableFuture<Void> ftask = newTaskFor(task, null);
  13. execute(ftask);
  14. return ftask;
  15. }
  16. public <T> Future<T> submit(Runnable task, T result) {
  17. if (task == null) throw new NullPointerException();
  18. RunnableFuture<T> ftask = newTaskFor(task, result);
  19. execute(ftask);
  20. return ftask;
  21. }
  22. public <T> Future<T> submit(Callable<T> task) {
  23. if (task == null) throw new NullPointerException();
  24. RunnableFuture<T> ftask = newTaskFor(task);
  25. execute(ftask);
  26. return ftask;
  27. }
  28. private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
  29. boolean timed, long nanos)
  30. throws InterruptedException, ExecutionException, TimeoutException {
  31. if (tasks == null)
  32. throw new NullPointerException();
  33. int ntasks = tasks.size();
  34. if (ntasks == 0)
  35. throw new IllegalArgumentException();
  36. ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
  37. ExecutorCompletionService<T> ecs =
  38. new ExecutorCompletionService<T>(this);
  39. // For efficiency, especially in executors with limited
  40. // parallelism, check to see if previously submitted tasks are
  41. // done before submitting more of them. This interleaving
  42. // plus the exception mechanics account for messiness of main
  43. // loop.
  44. try {
  45. // Record exceptions so that if we fail to obtain any
  46. // result, we can throw the last exception we got.
  47. ExecutionException ee = null;
  48. final long deadline = timed ? System.nanoTime() + nanos : 0L;
  49. Iterator<? extends Callable<T>> it = tasks.iterator();
  50. // Start one task for sure; the rest incrementally
  51. futures.add(ecs.submit(it.next()));
  52. --ntasks;
  53. int active = 1;
  54. for (;;) {
  55. Future<T> f = ecs.poll();
  56. if (f == null) {
  57. if (ntasks > 0) {
  58. --ntasks;
  59. futures.add(ecs.submit(it.next()));
  60. ++active;
  61. }
  62. else if (active == 0)
  63. break;
  64. else if (timed) {
  65. f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
  66. if (f == null)
  67. throw new TimeoutException();
  68. nanos = deadline - System.nanoTime();
  69. }
  70. else
  71. f = ecs.take();
  72. }
  73. if (f != null) {
  74. --active;
  75. try {
  76. return f.get();
  77. } catch (ExecutionException eex) {
  78. ee = eex;
  79. } catch (RuntimeException rex) {
  80. ee = new ExecutionException(rex);
  81. }
  82. }
  83. }
  84. if (ee == null)
  85. ee = new ExecutionException();
  86. throw ee;
  87. } finally {
  88. for (int i = 0, size = futures.size(); i < size; i++)
  89. futures.get(i).cancel(true);
  90. }
  91. }
  92. public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
  93. throws InterruptedException, ExecutionException {
  94. try {
  95. return doInvokeAny(tasks, false, 0);
  96. } catch (TimeoutException cannotHappen) {
  97. assert false;
  98. return null;
  99. }
  100. }
  101. public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
  102. long timeout, TimeUnit unit)
  103. throws InterruptedException, ExecutionException, TimeoutException {
  104. return doInvokeAny(tasks, true, unit.toNanos(timeout));
  105. }
  106. public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
  107. throws InterruptedException {
  108. if (tasks == null)
  109. throw new NullPointerException();
  110. ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
  111. boolean done = false;
  112. try {
  113. for (Callable<T> t : tasks) {
  114. RunnableFuture<T> f = newTaskFor(t);
  115. futures.add(f);
  116. execute(f);
  117. }
  118. for (int i = 0, size = futures.size(); i < size; i++) {
  119. Future<T> f = futures.get(i);
  120. if (!f.isDone()) {
  121. try {
  122. f.get();
  123. } catch (CancellationException ignore) {
  124. } catch (ExecutionException ignore) {
  125. }
  126. }
  127. }
  128. done = true;
  129. return futures;
  130. } finally {
  131. if (!done)
  132. for (int i = 0, size = futures.size(); i < size; i++)
  133. futures.get(i).cancel(true);
  134. }
  135. }
  136. public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
  137. long timeout, TimeUnit unit)
  138. throws InterruptedException {
  139. if (tasks == null)
  140. throw new NullPointerException();
  141. long nanos = unit.toNanos(timeout);
  142. ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
  143. boolean done = false;
  144. try {
  145. for (Callable<T> t : tasks)
  146. futures.add(newTaskFor(t));
  147. final long deadline = System.nanoTime() + nanos;
  148. final int size = futures.size();
  149. // Interleave time checks and calls to execute in case
  150. // executor doesn't have any/much parallelism.
  151. for (int i = 0; i < size; i++) {
  152. execute((Runnable)futures.get(i));
  153. nanos = deadline - System.nanoTime();
  154. if (nanos <= 0L)
  155. return futures;
  156. }
  157. for (int i = 0; i < size; i++) {
  158. Future<T> f = futures.get(i);
  159. if (!f.isDone()) {
  160. if (nanos <= 0L)
  161. return futures;
  162. try {
  163. f.get(nanos, TimeUnit.NANOSECONDS);
  164. } catch (CancellationException ignore) {
  165. } catch (ExecutionException ignore) {
  166. } catch (TimeoutException toe) {
  167. return futures;
  168. }
  169. nanos = deadline - System.nanoTime();
  170. }
  171. }
  172. done = true;
  173. return futures;
  174. } finally {
  175. if (!done)
  176. for (int i = 0, size = futures.size(); i < size; i++)
  177. futures.get(i).cancel(true);
  178. }
  179. }
  180. }


Modifier and TypeMethod and Description
<V> ScheduledFuture<V>schedule(Callable<V> callable, long delay, TimeUnit unit)


ScheduledFuture<?>schedule(Runnable command, long delay, TimeUnit unit)


ScheduledFuture<?>scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)

创建并执行在给定的初始延迟之后,随后以给定的时间段首先启用的周期性动作;那就是执行将在initialDelay之后开始,然后是initialDelay+period,然后是initialDelay + 2 * period,等等。

ScheduledFuture<?>scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)


  1. package java.util.concurrent;
  2. public interface ScheduledExecutorService extends ExecutorService {
  3. public ScheduledFuture<?> schedule(Runnable command,
  4. long delay, TimeUnit unit);
  5. public <V> ScheduledFuture<V> schedule(Callable<V> callable,
  6. long delay, TimeUnit unit);
  7. public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
  8. long initialDelay,
  9. long period,
  10. TimeUnit unit);
  11. public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
  12. long initialDelay,
  13. long delay,
  14. TimeUnit unit);
  15. }


Modifier and TypeClass and Description
static class ThreadPoolExecutor.AbortPolicy


static class ThreadPoolExecutor.CallerRunsPolicy


static class ThreadPoolExecutor.DiscardOldestPolicy


static class ThreadPoolExecutor.DiscardPolicy


  1. package java.util.concurrent;
  2. import java.security.AccessControlContext;
  3. import java.security.AccessController;
  4. import java.security.PrivilegedAction;
  5. import java.util.concurrent.locks.AbstractQueuedSynchronizer;
  6. import java.util.concurrent.locks.Condition;
  7. import java.util.concurrent.locks.ReentrantLock;
  8. import java.util.concurrent.atomic.AtomicInteger;
  9. import java.util.*;
  10. public class ThreadPoolExecutor extends AbstractExecutorService {
  11. private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
  12. private static final int COUNT_BITS = Integer.SIZE - 3;
  13. private static final int CAPACITY = (1 << COUNT_BITS) - 1;
  14. // runState is stored in the high-order bits
  15. private static final int RUNNING = -1 << COUNT_BITS;
  16. private static final int SHUTDOWN = 0 << COUNT_BITS;
  17. private static final int STOP = 1 << COUNT_BITS;
  18. private static final int TIDYING = 2 << COUNT_BITS;
  19. private static final int TERMINATED = 3 << COUNT_BITS;
  20. // Packing and unpacking ctl
  21. private static int runStateOf(int c) { return c & ~CAPACITY; }
  22. private static int workerCountOf(int c) { return c & CAPACITY; }
  23. private static int ctlOf(int rs, int wc) { return rs | wc; }
  24. private static boolean runStateLessThan(int c, int s) {
  25. return c < s;
  26. }
  27. private static boolean runStateAtLeast(int c, int s) {
  28. return c >= s;
  29. }
  30. private static boolean isRunning(int c) {
  31. return c < SHUTDOWN;
  32. }
  33. private boolean compareAndIncrementWorkerCount(int expect) {
  34. return ctl.compareAndSet(expect, expect + 1);
  35. }
  36. private boolean compareAndDecrementWorkerCount(int expect) {
  37. return ctl.compareAndSet(expect, expect - 1);
  38. }
  39. private void decrementWorkerCount() {
  40. do {} while (! compareAndDecrementWorkerCount(ctl.get()));
  41. }
  42. private final BlockingQueue<Runnable> workQueue;
  43. private final ReentrantLock mainLock = new ReentrantLock();
  44. private final HashSet<Worker> workers = new HashSet<Worker>();
  45. private final Condition termination = mainLock.newCondition();
  46. private int largestPoolSize;
  47. private long completedTaskCount;
  48. private volatile ThreadFactory threadFactory;
  49. private volatile RejectedExecutionHandler handler;
  50. private volatile long keepAliveTime;
  51. private volatile boolean allowCoreThreadTimeOut;
  52. private volatile int corePoolSize;
  53. private volatile int maximumPoolSize;
  54. private static final RejectedExecutionHandler defaultHandler =
  55. new AbortPolicy();
  56. private static final RuntimePermission shutdownPerm =
  57. new RuntimePermission("modifyThread");
  58. /* The context to be used when executing the finalizer, or null. */
  59. private final AccessControlContext acc;
  60. private final class Worker
  61. extends AbstractQueuedSynchronizer
  62. implements Runnable
  63. {
  64. /**
  65. * This class will never be serialized, but we provide a
  66. * serialVersionUID to suppress a javac warning.
  67. */
  68. private static final long serialVersionUID = 6138294804551838833L;
  69. /** Thread this worker is running in. Null if factory fails. */
  70. final Thread thread;
  71. /** Initial task to run. Possibly null. */
  72. Runnable firstTask;
  73. /** Per-thread task counter */
  74. volatile long completedTasks;
  75. /**
  76. * Creates with given first task and thread from ThreadFactory.
  77. * @param firstTask the first task (null if none)
  78. */
  79. Worker(Runnable firstTask) {
  80. setState(-1); // inhibit interrupts until runWorker
  81. this.firstTask = firstTask;
  82. this.thread = getThreadFactory().newThread(this);
  83. }
  84. /** Delegates main run loop to outer runWorker */
  85. public void run() {
  86. runWorker(this);
  87. }
  88. // Lock methods
  89. //
  90. // The value 0 represents the unlocked state.
  91. // The value 1 represents the locked state.
  92. protected boolean isHeldExclusively() {
  93. return getState() != 0;
  94. }
  95. protected boolean tryAcquire(int unused) {
  96. if (compareAndSetState(0, 1)) {
  97. setExclusiveOwnerThread(Thread.currentThread());
  98. return true;
  99. }
  100. return false;
  101. }
  102. protected boolean tryRelease(int unused) {
  103. setExclusiveOwnerThread(null);
  104. setState(0);
  105. return true;
  106. }
  107. public void lock() { acquire(1); }
  108. public boolean tryLock() { return tryAcquire(1); }
  109. public void unlock() { release(1); }
  110. public boolean isLocked() { return isHeldExclusively(); }
  111. void interruptIfStarted() {
  112. Thread t;
  113. if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
  114. try {
  115. t.interrupt();
  116. } catch (SecurityException ignore) {
  117. }
  118. }
  119. }
  120. }
  121. private void advanceRunState(int targetState) {
  122. for (;;) {
  123. int c = ctl.get();
  124. if (runStateAtLeast(c, targetState) ||
  125. ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
  126. break;
  127. }
  128. }
  129. final void tryTerminate() {
  130. for (;;) {
  131. int c = ctl.get();
  132. if (isRunning(c) ||
  133. runStateAtLeast(c, TIDYING) ||
  134. (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
  135. return;
  136. if (workerCountOf(c) != 0) { // Eligible to terminate
  137. interruptIdleWorkers(ONLY_ONE);
  138. return;
  139. }
  140. final ReentrantLock mainLock = this.mainLock;
  141. mainLock.lock();
  142. try {
  143. if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
  144. try {
  145. terminated();
  146. } finally {
  147. ctl.set(ctlOf(TERMINATED, 0));
  148. termination.signalAll();
  149. }
  150. return;
  151. }
  152. } finally {
  153. mainLock.unlock();
  154. }
  155. // else retry on failed CAS
  156. }
  157. }
  158. private void checkShutdownAccess() {
  159. SecurityManager security = System.getSecurityManager();
  160. if (security != null) {
  161. security.checkPermission(shutdownPerm);
  162. final ReentrantLock mainLock = this.mainLock;
  163. mainLock.lock();
  164. try {
  165. for (Worker w : workers)
  166. security.checkAccess(w.thread);
  167. } finally {
  168. mainLock.unlock();
  169. }
  170. }
  171. }
  172. private void interruptWorkers() {
  173. final ReentrantLock mainLock = this.mainLock;
  174. mainLock.lock();
  175. try {
  176. for (Worker w : workers)
  177. w.interruptIfStarted();
  178. } finally {
  179. mainLock.unlock();
  180. }
  181. }
  182. private void interruptIdleWorkers(boolean onlyOne) {
  183. final ReentrantLock mainLock = this.mainLock;
  184. mainLock.lock();
  185. try {
  186. for (Worker w : workers) {
  187. Thread t = w.thread;
  188. if (!t.isInterrupted() && w.tryLock()) {
  189. try {
  190. t.interrupt();
  191. } catch (SecurityException ignore) {
  192. } finally {
  193. w.unlock();
  194. }
  195. }
  196. if (onlyOne)
  197. break;
  198. }
  199. } finally {
  200. mainLock.unlock();
  201. }
  202. }
  203. private void interruptIdleWorkers() {
  204. interruptIdleWorkers(false);
  205. }
  206. private static final boolean ONLY_ONE = true;
  207. final void reject(Runnable command) {
  208. handler.rejectedExecution(command, this);
  209. }
  210. void onShutdown() {
  211. }
  212. final boolean isRunningOrShutdown(boolean shutdownOK) {
  213. int rs = runStateOf(ctl.get());
  214. return rs == RUNNING || (rs == SHUTDOWN && shutdownOK);
  215. }
  216. private List<Runnable> drainQueue() {
  217. BlockingQueue<Runnable> q = workQueue;
  218. ArrayList<Runnable> taskList = new ArrayList<Runnable>();
  219. q.drainTo(taskList);
  220. if (!q.isEmpty()) {
  221. for (Runnable r : q.toArray(new Runnable[0])) {
  222. if (q.remove(r))
  223. taskList.add(r);
  224. }
  225. }
  226. return taskList;
  227. }
  228. private boolean addWorker(Runnable firstTask, boolean core) {
  229. retry:
  230. for (;;) {
  231. int c = ctl.get();
  232. int rs = runStateOf(c);
  233. // Check if queue empty only if necessary.
  234. if (rs >= SHUTDOWN &&
  235. ! (rs == SHUTDOWN &&
  236. firstTask == null &&
  237. ! workQueue.isEmpty()))
  238. return false;
  239. for (;;) {
  240. int wc = workerCountOf(c);
  241. if (wc >= CAPACITY ||
  242. wc >= (core ? corePoolSize : maximumPoolSize))
  243. return false;
  244. if (compareAndIncrementWorkerCount(c))
  245. break retry;
  246. c = ctl.get(); // Re-read ctl
  247. if (runStateOf(c) != rs)
  248. continue retry;
  249. // else CAS failed due to workerCount change; retry inner loop
  250. }
  251. }
  252. boolean workerStarted = false;
  253. boolean workerAdded = false;
  254. Worker w = null;
  255. try {
  256. w = new Worker(firstTask);
  257. final Thread t = w.thread;
  258. if (t != null) {
  259. final ReentrantLock mainLock = this.mainLock;
  260. mainLock.lock();
  261. try {
  262. // Recheck while holding lock.
  263. // Back out on ThreadFactory failure or if
  264. // shut down before lock acquired.
  265. int rs = runStateOf(ctl.get());
  266. if (rs < SHUTDOWN ||
  267. (rs == SHUTDOWN && firstTask == null)) {
  268. if (t.isAlive()) // precheck that t is startable
  269. throw new IllegalThreadStateException();
  270. workers.add(w);
  271. int s = workers.size();
  272. if (s > largestPoolSize)
  273. largestPoolSize = s;
  274. workerAdded = true;
  275. }
  276. } finally {
  277. mainLock.unlock();
  278. }
  279. if (workerAdded) {
  280. t.start();
  281. workerStarted = true;
  282. }
  283. }
  284. } finally {
  285. if (! workerStarted)
  286. addWorkerFailed(w);
  287. }
  288. return workerStarted;
  289. }
  290. private void addWorkerFailed(Worker w) {
  291. final ReentrantLock mainLock = this.mainLock;
  292. mainLock.lock();
  293. try {
  294. if (w != null)
  295. workers.remove(w);
  296. decrementWorkerCount();
  297. tryTerminate();
  298. } finally {
  299. mainLock.unlock();
  300. }
  301. }
  302. private void processWorkerExit(Worker w, boolean completedAbruptly) {
  303. if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
  304. decrementWorkerCount();
  305. final ReentrantLock mainLock = this.mainLock;
  306. mainLock.lock();
  307. try {
  308. completedTaskCount += w.completedTasks;
  309. workers.remove(w);
  310. } finally {
  311. mainLock.unlock();
  312. }
  313. tryTerminate();
  314. int c = ctl.get();
  315. if (runStateLessThan(c, STOP)) {
  316. if (!completedAbruptly) {
  317. int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
  318. if (min == 0 && ! workQueue.isEmpty())
  319. min = 1;
  320. if (workerCountOf(c) >= min)
  321. return; // replacement not needed
  322. }
  323. addWorker(null, false);
  324. }
  325. }
  326. private Runnable getTask() {
  327. boolean timedOut = false; // Did the last poll() time out?
  328. for (;;) {
  329. int c = ctl.get();
  330. int rs = runStateOf(c);
  331. // Check if queue empty only if necessary.
  332. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
  333. decrementWorkerCount();
  334. return null;
  335. }
  336. int wc = workerCountOf(c);
  337. // Are workers subject to culling?
  338. boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
  339. if ((wc > maximumPoolSize || (timed && timedOut))
  340. && (wc > 1 || workQueue.isEmpty())) {
  341. if (compareAndDecrementWorkerCount(c))
  342. return null;
  343. continue;
  344. }
  345. try {
  346. Runnable r = timed ?
  347. workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
  348. workQueue.take();
  349. if (r != null)
  350. return r;
  351. timedOut = true;
  352. } catch (InterruptedException retry) {
  353. timedOut = false;
  354. }
  355. }
  356. }
  357. final void runWorker(Worker w) {
  358. Thread wt = Thread.currentThread();
  359. Runnable task = w.firstTask;
  360. w.firstTask = null;
  361. w.unlock(); // allow interrupts
  362. boolean completedAbruptly = true;
  363. try {
  364. while (task != null || (task = getTask()) != null) {
  365. w.lock();
  366. // If pool is stopping, ensure thread is interrupted;
  367. // if not, ensure thread is not interrupted. This
  368. // requires a recheck in second case to deal with
  369. // shutdownNow race while clearing interrupt
  370. if ((runStateAtLeast(ctl.get(), STOP) ||
  371. (Thread.interrupted() &&
  372. runStateAtLeast(ctl.get(), STOP))) &&
  373. !wt.isInterrupted())
  374. wt.interrupt();
  375. try {
  376. beforeExecute(wt, task);
  377. Throwable thrown = null;
  378. try {
  379. task.run();
  380. } catch (RuntimeException x) {
  381. thrown = x; throw x;
  382. } catch (Error x) {
  383. thrown = x; throw x;
  384. } catch (Throwable x) {
  385. thrown = x; throw new Error(x);
  386. } finally {
  387. afterExecute(task, thrown);
  388. }
  389. } finally {
  390. task = null;
  391. w.completedTasks++;
  392. w.unlock();
  393. }
  394. }
  395. completedAbruptly = false;
  396. } finally {
  397. processWorkerExit(w, completedAbruptly);
  398. }
  399. }
  400. // Public constructors and methods
  401. public ThreadPoolExecutor(int corePoolSize,
  402. int maximumPoolSize,
  403. long keepAliveTime,
  404. TimeUnit unit,
  405. BlockingQueue<Runnable> workQueue) {
  406. this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
  407. Executors.defaultThreadFactory(), defaultHandler);
  408. }
  409. public ThreadPoolExecutor(int corePoolSize,
  410. int maximumPoolSize,
  411. long keepAliveTime,
  412. TimeUnit unit,
  413. BlockingQueue<Runnable> workQueue,
  414. ThreadFactory threadFactory) {
  415. this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
  416. threadFactory, defaultHandler);
  417. }
  418. public ThreadPoolExecutor(int corePoolSize,
  419. int maximumPoolSize,
  420. long keepAliveTime,
  421. TimeUnit unit,
  422. BlockingQueue<Runnable> workQueue,
  423. RejectedExecutionHandler handler) {
  424. this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
  425. Executors.defaultThreadFactory(), handler);
  426. }
  427. public ThreadPoolExecutor(int corePoolSize,
  428. int maximumPoolSize,
  429. long keepAliveTime,
  430. TimeUnit unit,
  431. BlockingQueue<Runnable> workQueue,
  432. ThreadFactory threadFactory,
  433. RejectedExecutionHandler handler) {
  434. if (corePoolSize < 0 ||
  435. maximumPoolSize <= 0 ||
  436. maximumPoolSize < corePoolSize ||
  437. keepAliveTime < 0)
  438. throw new IllegalArgumentException();
  439. if (workQueue == null || threadFactory == null || handler == null)
  440. throw new NullPointerException();
  441. this.acc = System.getSecurityManager() == null ?
  442. null :
  443. AccessController.getContext();
  444. this.corePoolSize = corePoolSize;
  445. this.maximumPoolSize = maximumPoolSize;
  446. this.workQueue = workQueue;
  447. this.keepAliveTime = unit.toNanos(keepAliveTime);
  448. this.threadFactory = threadFactory;
  449. this.handler = handler;
  450. }
  451. public void execute(Runnable command) {
  452. if (command == null)
  453. throw new NullPointerException();
  454. int c = ctl.get();
  455. if (workerCountOf(c) < corePoolSize) {
  456. if (addWorker(command, true))
  457. return;
  458. c = ctl.get();
  459. }
  460. if (isRunning(c) && workQueue.offer(command)) {
  461. int recheck = ctl.get();
  462. if (! isRunning(recheck) && remove(command))
  463. reject(command);
  464. else if (workerCountOf(recheck) == 0)
  465. addWorker(null, false);
  466. }
  467. else if (!addWorker(command, false))
  468. reject(command);
  469. }
  470. public void shutdown() {
  471. final ReentrantLock mainLock = this.mainLock;
  472. mainLock.lock();
  473. try {
  474. checkShutdownAccess();
  475. advanceRunState(SHUTDOWN);
  476. interruptIdleWorkers();
  477. onShutdown(); // hook for ScheduledThreadPoolExecutor
  478. } finally {
  479. mainLock.unlock();
  480. }
  481. tryTerminate();
  482. }
  483. public List<Runnable> shutdownNow() {
  484. List<Runnable> tasks;
  485. final ReentrantLock mainLock = this.mainLock;
  486. mainLock.lock();
  487. try {
  488. checkShutdownAccess();
  489. advanceRunState(STOP);
  490. interruptWorkers();
  491. tasks = drainQueue();
  492. } finally {
  493. mainLock.unlock();
  494. }
  495. tryTerminate();
  496. return tasks;
  497. }
  498. public boolean isShutdown() {
  499. return ! isRunning(ctl.get());
  500. }
  501. public boolean isTerminating() {
  502. int c = ctl.get();
  503. return ! isRunning(c) && runStateLessThan(c, TERMINATED);
  504. }
  505. public boolean isTerminated() {
  506. return runStateAtLeast(ctl.get(), TERMINATED);
  507. }
  508. public boolean awaitTermination(long timeout, TimeUnit unit)
  509. throws InterruptedException {
  510. long nanos = unit.toNanos(timeout);
  511. final ReentrantLock mainLock = this.mainLock;
  512. mainLock.lock();
  513. try {
  514. for (;;) {
  515. if (runStateAtLeast(ctl.get(), TERMINATED))
  516. return true;
  517. if (nanos <= 0)
  518. return false;
  519. nanos = termination.awaitNanos(nanos);
  520. }
  521. } finally {
  522. mainLock.unlock();
  523. }
  524. }
  525. protected void finalize() {
  526. SecurityManager sm = System.getSecurityManager();
  527. if (sm == null || acc == null) {
  528. shutdown();
  529. } else {
  530. PrivilegedAction<Void> pa = () -> { shutdown(); return null; };
  531. AccessController.doPrivileged(pa, acc);
  532. }
  533. }
  534. public void setThreadFactory(ThreadFactory threadFactory) {
  535. if (threadFactory == null)
  536. throw new NullPointerException();
  537. this.threadFactory = threadFactory;
  538. }
  539. public ThreadFactory getThreadFactory() {
  540. return threadFactory;
  541. }
  542. public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
  543. if (handler == null)
  544. throw new NullPointerException();
  545. this.handler = handler;
  546. }
  547. public RejectedExecutionHandler getRejectedExecutionHandler() {
  548. return handler;
  549. }
  550. public void setCorePoolSize(int corePoolSize) {
  551. if (corePoolSize < 0)
  552. throw new IllegalArgumentException();
  553. int delta = corePoolSize - this.corePoolSize;
  554. this.corePoolSize = corePoolSize;
  555. if (workerCountOf(ctl.get()) > corePoolSize)
  556. interruptIdleWorkers();
  557. else if (delta > 0) {
  558. // We don't really know how many new threads are "needed".
  559. // As a heuristic, prestart enough new workers (up to new
  560. // core size) to handle the current number of tasks in
  561. // queue, but stop if queue becomes empty while doing so.
  562. int k = Math.min(delta, workQueue.size());
  563. while (k-- > 0 && addWorker(null, true)) {
  564. if (workQueue.isEmpty())
  565. break;
  566. }
  567. }
  568. }
  569. public int getCorePoolSize() {
  570. return corePoolSize;
  571. }
  572. public boolean prestartCoreThread() {
  573. return workerCountOf(ctl.get()) < corePoolSize &&
  574. addWorker(null, true);
  575. }
  576. void ensurePrestart() {
  577. int wc = workerCountOf(ctl.get());
  578. if (wc < corePoolSize)
  579. addWorker(null, true);
  580. else if (wc == 0)
  581. addWorker(null, false);
  582. }
  583. public int prestartAllCoreThreads() {
  584. int n = 0;
  585. while (addWorker(null, true))
  586. ++n;
  587. return n;
  588. }
  589. public boolean allowsCoreThreadTimeOut() {
  590. return allowCoreThreadTimeOut;
  591. }
  592. public void allowCoreThreadTimeOut(boolean value) {
  593. if (value && keepAliveTime <= 0)
  594. throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
  595. if (value != allowCoreThreadTimeOut) {
  596. allowCoreThreadTimeOut = value;
  597. if (value)
  598. interruptIdleWorkers();
  599. }
  600. }
  601. public void setMaximumPoolSize(int maximumPoolSize) {
  602. if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
  603. throw new IllegalArgumentException();
  604. this.maximumPoolSize = maximumPoolSize;
  605. if (workerCountOf(ctl.get()) > maximumPoolSize)
  606. interruptIdleWorkers();
  607. }
  608. public int getMaximumPoolSize() {
  609. return maximumPoolSize;
  610. }
  611. public void setKeepAliveTime(long time, TimeUnit unit) {
  612. if (time < 0)
  613. throw new IllegalArgumentException();
  614. if (time == 0 && allowsCoreThreadTimeOut())
  615. throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
  616. long keepAliveTime = unit.toNanos(time);
  617. long delta = keepAliveTime - this.keepAliveTime;
  618. this.keepAliveTime = keepAliveTime;
  619. if (delta < 0)
  620. interruptIdleWorkers();
  621. }
  622. public long getKeepAliveTime(TimeUnit unit) {
  623. return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS);
  624. }
  625. /* User-level queue utilities */
  626. public BlockingQueue<Runnable> getQueue() {
  627. return workQueue;
  628. }
  629. public boolean remove(Runnable task) {
  630. boolean removed = workQueue.remove(task);
  631. tryTerminate(); // In case SHUTDOWN and now empty
  632. return removed;
  633. }
  634. public void purge() {
  635. final BlockingQueue<Runnable> q = workQueue;
  636. try {
  637. Iterator<Runnable> it = q.iterator();
  638. while (it.hasNext()) {
  639. Runnable r = it.next();
  640. if (r instanceof Future<?> && ((Future<?>)r).isCancelled())
  641. it.remove();
  642. }
  643. } catch (ConcurrentModificationException fallThrough) {
  644. // Take slow path if we encounter interference during traversal.
  645. // Make copy for traversal and call remove for cancelled entries.
  646. // The slow path is more likely to be O(N*N).
  647. for (Object r : q.toArray())
  648. if (r instanceof Future<?> && ((Future<?>)r).isCancelled())
  649. q.remove(r);
  650. }
  651. tryTerminate(); // In case SHUTDOWN and now empty
  652. }
  653. public int getPoolSize() {
  654. final ReentrantLock mainLock = this.mainLock;
  655. mainLock.lock();
  656. try {
  657. // Remove rare and surprising possibility of
  658. // isTerminated() && getPoolSize() > 0
  659. return runStateAtLeast(ctl.get(), TIDYING) ? 0
  660. : workers.size();
  661. } finally {
  662. mainLock.unlock();
  663. }
  664. }
  665. public int getActiveCount() {
  666. final ReentrantLock mainLock = this.mainLock;
  667. mainLock.lock();
  668. try {
  669. int n = 0;
  670. for (Worker w : workers)
  671. if (w.isLocked())
  672. ++n;
  673. return n;
  674. } finally {
  675. mainLock.unlock();
  676. }
  677. }
  678. public int getLargestPoolSize() {
  679. final ReentrantLock mainLock = this.mainLock;
  680. mainLock.lock();
  681. try {
  682. return largestPoolSize;
  683. } finally {
  684. mainLock.unlock();
  685. }
  686. }
  687. public long getTaskCount() {
  688. final ReentrantLock mainLock = this.mainLock;
  689. mainLock.lock();
  690. try {
  691. long n = completedTaskCount;
  692. for (Worker w : workers) {
  693. n += w.completedTasks;
  694. if (w.isLocked())
  695. ++n;
  696. }
  697. return n + workQueue.size();
  698. } finally {
  699. mainLock.unlock();
  700. }
  701. }
  702. public long getCompletedTaskCount() {
  703. final ReentrantLock mainLock = this.mainLock;
  704. mainLock.lock();
  705. try {
  706. long n = completedTaskCount;
  707. for (Worker w : workers)
  708. n += w.completedTasks;
  709. return n;
  710. } finally {
  711. mainLock.unlock();
  712. }
  713. }
  714. public String toString() {
  715. long ncompleted;
  716. int nworkers, nactive;
  717. final ReentrantLock mainLock = this.mainLock;
  718. mainLock.lock();
  719. try {
  720. ncompleted = completedTaskCount;
  721. nactive = 0;
  722. nworkers = workers.size();
  723. for (Worker w : workers) {
  724. ncompleted += w.completedTasks;
  725. if (w.isLocked())
  726. ++nactive;
  727. }
  728. } finally {
  729. mainLock.unlock();
  730. }
  731. int c = ctl.get();
  732. String rs = (runStateLessThan(c, SHUTDOWN) ? "Running" :
  733. (runStateAtLeast(c, TERMINATED) ? "Terminated" :
  734. "Shutting down"));
  735. return super.toString() +
  736. "[" + rs +
  737. ", pool size = " + nworkers +
  738. ", active threads = " + nactive +
  739. ", queued tasks = " + workQueue.size() +
  740. ", completed tasks = " + ncompleted +
  741. "]";
  742. }
  743. protected void beforeExecute(Thread t, Runnable r) { }
  744. protected void afterExecute(Runnable r, Throwable t) { }
  745. protected void terminated() { }
  746. public static class CallerRunsPolicy implements RejectedExecutionHandler {
  747. /**
  748. * Creates a {@code CallerRunsPolicy}.
  749. */
  750. public CallerRunsPolicy() { }
  751. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  752. if (!e.isShutdown()) {
  753. r.run();
  754. }
  755. }
  756. }
  757. public static class AbortPolicy implements RejectedExecutionHandler {
  758. /**
  759. * Creates an {@code AbortPolicy}.
  760. */
  761. public AbortPolicy() { }
  762. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  763. throw new RejectedExecutionException("Task " + r.toString() +
  764. " rejected from " +
  765. e.toString());
  766. }
  767. }
  768. public static class DiscardPolicy implements RejectedExecutionHandler {
  769. public DiscardPolicy() { }
  770. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  771. }
  772. }
  773. public static class DiscardOldestPolicy implements RejectedExecutionHandler {
  774. public DiscardOldestPolicy() { }
  775. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  776. if (!e.isShutdown()) {
  777. e.getQueue().poll();
  778. e.execute(r);
  779. }
  780. }
  781. }
  782. }


Modifier and TypeMethod and Description
protected <V> RunnableScheduledFuture<V>decorateTask(Callable<V> callable, RunnableScheduledFuture<V> task)


protected <V> RunnableScheduledFuture<V>decorateTask(Runnable runnable, RunnableScheduledFuture<V> task)


voidexecute(Runnable command)










<V> ScheduledFuture<V>schedule(Callable<V> callable, long delay, TimeUnit unit)


ScheduledFuture<?>schedule(Runnable command, long delay, TimeUnit unit)


ScheduledFuture<?>scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)

创建并执行在给定的初始延迟之后,随后以给定的时间段首先启用的周期性动作;那就是执行将在initialDelay之后开始,然后initialDelay+period,然后是initialDelay + 2 * period等等。

ScheduledFuture<?>scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)


voidsetContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value)


voidsetExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value)


voidsetRemoveOnCancelPolicy(boolean value)






<T> Future<T>submit(Callable<T> task)


Future<?>submit(Runnable task)


<T> Future<T>submit(Runnable task, T result)


  1. package java.util.concurrent;
  2. import static java.util.concurrent.TimeUnit.NANOSECONDS;
  3. import java.util.concurrent.atomic.AtomicLong;
  4. import java.util.concurrent.locks.Condition;
  5. import java.util.concurrent.locks.ReentrantLock;
  6. import java.util.*;
  7. public class ScheduledThreadPoolExecutor
  8. extends ThreadPoolExecutor
  9. implements ScheduledExecutorService {
  10. private volatile boolean continueExistingPeriodicTasksAfterShutdown;
  11. private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
  12. private volatile boolean removeOnCancel = false;
  13. private static final AtomicLong sequencer = new AtomicLong();
  14. final long now() {
  15. return System.nanoTime();
  16. }
  17. private class ScheduledFutureTask<V>
  18. extends FutureTask<V> implements RunnableScheduledFuture<V> {
  19. /** Sequence number to break ties FIFO */
  20. private final long sequenceNumber;
  21. /** The time the task is enabled to execute in nanoTime units */
  22. private long time;
  23. private final long period;
  24. /** The actual task to be re-enqueued by reExecutePeriodic */
  25. RunnableScheduledFuture<V> outerTask = this;
  26. int heapIndex;
  27. ScheduledFutureTask(Runnable r, V result, long ns) {
  28. super(r, result);
  29. this.time = ns;
  30. this.period = 0;
  31. this.sequenceNumber = sequencer.getAndIncrement();
  32. }
  33. ScheduledFutureTask(Runnable r, V result, long ns, long period) {
  34. super(r, result);
  35. this.time = ns;
  36. this.period = period;
  37. this.sequenceNumber = sequencer.getAndIncrement();
  38. }
  39. ScheduledFutureTask(Callable<V> callable, long ns) {
  40. super(callable);
  41. this.time = ns;
  42. this.period = 0;
  43. this.sequenceNumber = sequencer.getAndIncrement();
  44. }
  45. public long getDelay(TimeUnit unit) {
  46. return unit.convert(time - now(), NANOSECONDS);
  47. }
  48. public int compareTo(Delayed other) {
  49. if (other == this) // compare zero if same object
  50. return 0;
  51. if (other instanceof ScheduledFutureTask) {
  52. ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
  53. long diff = time - x.time;
  54. if (diff < 0)
  55. return -1;
  56. else if (diff > 0)
  57. return 1;
  58. else if (sequenceNumber < x.sequenceNumber)
  59. return -1;
  60. else
  61. return 1;
  62. }
  63. long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
  64. return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
  65. }
  66. public boolean isPeriodic() {
  67. return period != 0;
  68. }
  69. private void setNextRunTime() {
  70. long p = period;
  71. if (p > 0)
  72. time += p;
  73. else
  74. time = triggerTime(-p);
  75. }
  76. public boolean cancel(boolean mayInterruptIfRunning) {
  77. boolean cancelled = super.cancel(mayInterruptIfRunning);
  78. if (cancelled && removeOnCancel && heapIndex >= 0)
  79. remove(this);
  80. return cancelled;
  81. }
  82. public void run() {
  83. boolean periodic = isPeriodic();
  84. if (!canRunInCurrentRunState(periodic))
  85. cancel(false);
  86. else if (!periodic)
  87. ScheduledFutureTask.super.run();
  88. else if (ScheduledFutureTask.super.runAndReset()) {
  89. setNextRunTime();
  90. reExecutePeriodic(outerTask);
  91. }
  92. }
  93. }
  94. boolean canRunInCurrentRunState(boolean periodic) {
  95. return isRunningOrShutdown(periodic ?
  96. continueExistingPeriodicTasksAfterShutdown :
  97. executeExistingDelayedTasksAfterShutdown);
  98. }
  99. private void delayedExecute(RunnableScheduledFuture<?> task) {
  100. if (isShutdown())
  101. reject(task);
  102. else {
  103. super.getQueue().add(task);
  104. if (isShutdown() &&
  105. !canRunInCurrentRunState(task.isPeriodic()) &&
  106. remove(task))
  107. task.cancel(false);
  108. else
  109. ensurePrestart();
  110. }
  111. }
  112. void reExecutePeriodic(RunnableScheduledFuture<?> task) {
  113. if (canRunInCurrentRunState(true)) {
  114. super.getQueue().add(task);
  115. if (!canRunInCurrentRunState(true) && remove(task))
  116. task.cancel(false);
  117. else
  118. ensurePrestart();
  119. }
  120. }
  121. @Override void onShutdown() {
  122. BlockingQueue<Runnable> q = super.getQueue();
  123. boolean keepDelayed =
  124. getExecuteExistingDelayedTasksAfterShutdownPolicy();
  125. boolean keepPeriodic =
  126. getContinueExistingPeriodicTasksAfterShutdownPolicy();
  127. if (!keepDelayed && !keepPeriodic) {
  128. for (Object e : q.toArray())
  129. if (e instanceof RunnableScheduledFuture<?>)
  130. ((RunnableScheduledFuture<?>) e).cancel(false);
  131. q.clear();
  132. }
  133. else {
  134. // Traverse snapshot to avoid iterator exceptions
  135. for (Object e : q.toArray()) {
  136. if (e instanceof RunnableScheduledFuture) {
  137. RunnableScheduledFuture<?> t =
  138. (RunnableScheduledFuture<?>)e;
  139. if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
  140. t.isCancelled()) { // also remove if already cancelled
  141. if (q.remove(t))
  142. t.cancel(false);
  143. }
  144. }
  145. }
  146. }
  147. tryTerminate();
  148. }
  149. protected <V> RunnableScheduledFuture<V> decorateTask(
  150. Runnable runnable, RunnableScheduledFuture<V> task) {
  151. return task;
  152. }
  153. protected <V> RunnableScheduledFuture<V> decorateTask(
  154. Callable<V> callable, RunnableScheduledFuture<V> task) {
  155. return task;
  156. }
  157. public ScheduledThreadPoolExecutor(int corePoolSize) {
  158. super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
  159. new DelayedWorkQueue());
  160. }
  161. public ScheduledThreadPoolExecutor(int corePoolSize,
  162. ThreadFactory threadFactory) {
  163. super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
  164. new DelayedWorkQueue(), threadFactory);
  165. }
  166. public ScheduledThreadPoolExecutor(int corePoolSize,
  167. RejectedExecutionHandler handler) {
  168. super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
  169. new DelayedWorkQueue(), handler);
  170. }
  171. public ScheduledThreadPoolExecutor(int corePoolSize,
  172. ThreadFactory threadFactory,
  173. RejectedExecutionHandler handler) {
  174. super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
  175. new DelayedWorkQueue(), threadFactory, handler);
  176. }
  177. private long triggerTime(long delay, TimeUnit unit) {
  178. return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
  179. }
  180. long triggerTime(long delay) {
  181. return now() +
  182. ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
  183. }
  184. private long overflowFree(long delay) {
  185. Delayed head = (Delayed) super.getQueue().peek();
  186. if (head != null) {
  187. long headDelay = head.getDelay(NANOSECONDS);
  188. if (headDelay < 0 && (delay - headDelay < 0))
  189. delay = Long.MAX_VALUE + headDelay;
  190. }
  191. return delay;
  192. }
  193. public ScheduledFuture<?> schedule(Runnable command,
  194. long delay,
  195. TimeUnit unit) {
  196. if (command == null || unit == null)
  197. throw new NullPointerException();
  198. RunnableScheduledFuture<?> t = decorateTask(command,
  199. new ScheduledFutureTask<Void>(command, null,
  200. triggerTime(delay, unit)));
  201. delayedExecute(t);
  202. return t;
  203. }
  204. public <V> ScheduledFuture<V> schedule(Callable<V> callable,
  205. long delay,
  206. TimeUnit unit) {
  207. if (callable == null || unit == null)
  208. throw new NullPointerException();
  209. RunnableScheduledFuture<V> t = decorateTask(callable,
  210. new ScheduledFutureTask<V>(callable,
  211. triggerTime(delay, unit)));
  212. delayedExecute(t);
  213. return t;
  214. }
  215. public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
  216. long initialDelay,
  217. long period,
  218. TimeUnit unit) {
  219. if (command == null || unit == null)
  220. throw new NullPointerException();
  221. if (period <= 0)
  222. throw new IllegalArgumentException();
  223. ScheduledFutureTask<Void> sft =
  224. new ScheduledFutureTask<Void>(command,
  225. null,
  226. triggerTime(initialDelay, unit),
  227. unit.toNanos(period));
  228. RunnableScheduledFuture<Void> t = decorateTask(command, sft);
  229. sft.outerTask = t;
  230. delayedExecute(t);
  231. return t;
  232. }
  233. public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
  234. long initialDelay,
  235. long delay,
  236. TimeUnit unit) {
  237. if (command == null || unit == null)
  238. throw new NullPointerException();
  239. if (delay <= 0)
  240. throw new IllegalArgumentException();
  241. ScheduledFutureTask<Void> sft =
  242. new ScheduledFutureTask<Void>(command,
  243. null,
  244. triggerTime(initialDelay, unit),
  245. unit.toNanos(-delay));
  246. RunnableScheduledFuture<Void> t = decorateTask(command, sft);
  247. sft.outerTask = t;
  248. delayedExecute(t);
  249. return t;
  250. }
  251. public void execute(Runnable command) {
  252. schedule(command, 0, NANOSECONDS);
  253. }
  254. // Override AbstractExecutorService methods
  255. public Future<?> submit(Runnable task) {
  256. return schedule(task, 0, NANOSECONDS);
  257. }
  258. public <T> Future<T> submit(Runnable task, T result) {
  259. return schedule(Executors.callable(task, result), 0, NANOSECONDS);
  260. }
  261. public <T> Future<T> submit(Callable<T> task) {
  262. return schedule(task, 0, NANOSECONDS);
  263. }
  264. public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
  265. continueExistingPeriodicTasksAfterShutdown = value;
  266. if (!value && isShutdown())
  267. onShutdown();
  268. }
  269. public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
  270. return continueExistingPeriodicTasksAfterShutdown;
  271. }
  272. public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
  273. executeExistingDelayedTasksAfterShutdown = value;
  274. if (!value && isShutdown())
  275. onShutdown();
  276. }
  277. public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
  278. return executeExistingDelayedTasksAfterShutdown;
  279. }
  280. public void setRemoveOnCancelPolicy(boolean value) {
  281. removeOnCancel = value;
  282. }
  283. public boolean getRemoveOnCancelPolicy() {
  284. return removeOnCancel;
  285. }
  286. public void shutdown() {
  287. super.shutdown();
  288. }
  289. public List<Runnable> shutdownNow() {
  290. return super.shutdownNow();
  291. }
  292. public BlockingQueue<Runnable> getQueue() {
  293. return super.getQueue();
  294. }
  295. static class DelayedWorkQueue extends AbstractQueue<Runnable>
  296. implements BlockingQueue<Runnable> {
  297. private static final int INITIAL_CAPACITY = 16;
  298. private RunnableScheduledFuture<?>[] queue =
  299. new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
  300. private final ReentrantLock lock = new ReentrantLock();
  301. private int size = 0;
  302. private Thread leader = null;
  303. /**
  304. * Condition signalled when a newer task becomes available at the
  305. * head of the queue or a new thread may need to become leader.
  306. */
  307. private final Condition available = lock.newCondition();
  308. /**
  309. * Sets f's heapIndex if it is a ScheduledFutureTask.
  310. */
  311. private void setIndex(RunnableScheduledFuture<?> f, int idx) {
  312. if (f instanceof ScheduledFutureTask)
  313. ((ScheduledFutureTask)f).heapIndex = idx;
  314. }
  315. private void siftUp(int k, RunnableScheduledFuture<?> key) {
  316. while (k > 0) {
  317. int parent = (k - 1) >>> 1;
  318. RunnableScheduledFuture<?> e = queue[parent];
  319. if (key.compareTo(e) >= 0)
  320. break;
  321. queue[k] = e;
  322. setIndex(e, k);
  323. k = parent;
  324. }
  325. queue[k] = key;
  326. setIndex(key, k);
  327. }
  328. private void siftDown(int k, RunnableScheduledFuture<?> key) {
  329. int half = size >>> 1;
  330. while (k < half) {
  331. int child = (k << 1) + 1;
  332. RunnableScheduledFuture<?> c = queue[child];
  333. int right = child + 1;
  334. if (right < size && c.compareTo(queue[right]) > 0)
  335. c = queue[child = right];
  336. if (key.compareTo(c) <= 0)
  337. break;
  338. queue[k] = c;
  339. setIndex(c, k);
  340. k = child;
  341. }
  342. queue[k] = key;
  343. setIndex(key, k);
  344. }
  345. private void grow() {
  346. int oldCapacity = queue.length;
  347. int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
  348. if (newCapacity < 0) // overflow
  349. newCapacity = Integer.MAX_VALUE;
  350. queue = Arrays.copyOf(queue, newCapacity);
  351. }
  352. private int indexOf(Object x) {
  353. if (x != null) {
  354. if (x instanceof ScheduledFutureTask) {
  355. int i = ((ScheduledFutureTask) x).heapIndex;
  356. // Sanity check; x could conceivably be a
  357. // ScheduledFutureTask from some other pool.
  358. if (i >= 0 && i < size && queue[i] == x)
  359. return i;
  360. } else {
  361. for (int i = 0; i < size; i++)
  362. if (x.equals(queue[i]))
  363. return i;
  364. }
  365. }
  366. return -1;
  367. }
  368. public boolean contains(Object x) {
  369. final ReentrantLock lock = this.lock;
  370. lock.lock();
  371. try {
  372. return indexOf(x) != -1;
  373. } finally {
  374. lock.unlock();
  375. }
  376. }
  377. public boolean remove(Object x) {
  378. final ReentrantLock lock = this.lock;
  379. lock.lock();
  380. try {
  381. int i = indexOf(x);
  382. if (i < 0)
  383. return false;
  384. setIndex(queue[i], -1);
  385. int s = --size;
  386. RunnableScheduledFuture<?> replacement = queue[s];
  387. queue[s] = null;
  388. if (s != i) {
  389. siftDown(i, replacement);
  390. if (queue[i] == replacement)
  391. siftUp(i, replacement);
  392. }
  393. return true;
  394. } finally {
  395. lock.unlock();
  396. }
  397. }
  398. public int size() {
  399. final ReentrantLock lock = this.lock;
  400. lock.lock();
  401. try {
  402. return size;
  403. } finally {
  404. lock.unlock();
  405. }
  406. }
  407. public boolean isEmpty() {
  408. return size() == 0;
  409. }
  410. public int remainingCapacity() {
  411. return Integer.MAX_VALUE;
  412. }
  413. public RunnableScheduledFuture<?> peek() {
  414. final ReentrantLock lock = this.lock;
  415. lock.lock();
  416. try {
  417. return queue[0];
  418. } finally {
  419. lock.unlock();
  420. }
  421. }
  422. public boolean offer(Runnable x) {
  423. if (x == null)
  424. throw new NullPointerException();
  425. RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
  426. final ReentrantLock lock = this.lock;
  427. lock.lock();
  428. try {
  429. int i = size;
  430. if (i >= queue.length)
  431. grow();
  432. size = i + 1;
  433. if (i == 0) {
  434. queue[0] = e;
  435. setIndex(e, 0);
  436. } else {
  437. siftUp(i, e);
  438. }
  439. if (queue[0] == e) {
  440. leader = null;
  441. available.signal();
  442. }
  443. } finally {
  444. lock.unlock();
  445. }
  446. return true;
  447. }
  448. public void put(Runnable e) {
  449. offer(e);
  450. }
  451. public boolean add(Runnable e) {
  452. return offer(e);
  453. }
  454. public boolean offer(Runnable e, long timeout, TimeUnit unit) {
  455. return offer(e);
  456. }
  457. private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
  458. int s = --size;
  459. RunnableScheduledFuture<?> x = queue[s];
  460. queue[s] = null;
  461. if (s != 0)
  462. siftDown(0, x);
  463. setIndex(f, -1);
  464. return f;
  465. }
  466. public RunnableScheduledFuture<?> poll() {
  467. final ReentrantLock lock = this.lock;
  468. lock.lock();
  469. try {
  470. RunnableScheduledFuture<?> first = queue[0];
  471. if (first == null || first.getDelay(NANOSECONDS) > 0)
  472. return null;
  473. else
  474. return finishPoll(first);
  475. } finally {
  476. lock.unlock();
  477. }
  478. }
  479. public RunnableScheduledFuture<?> take() throws InterruptedException {
  480. final ReentrantLock lock = this.lock;
  481. lock.lockInterruptibly();
  482. try {
  483. for (;;) {
  484. RunnableScheduledFuture<?> first = queue[0];
  485. if (first == null)
  486. available.await();
  487. else {
  488. long delay = first.getDelay(NANOSECONDS);
  489. if (delay <= 0)
  490. return finishPoll(first);
  491. first = null; // don't retain ref while waiting
  492. if (leader != null)
  493. available.await();
  494. else {
  495. Thread thisThread = Thread.currentThread();
  496. leader = thisThread;
  497. try {
  498. available.awaitNanos(delay);
  499. } finally {
  500. if (leader == thisThread)
  501. leader = null;
  502. }
  503. }
  504. }
  505. }
  506. } finally {
  507. if (leader == null && queue[0] != null)
  508. available.signal();
  509. lock.unlock();
  510. }
  511. }
  512. public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)
  513. throws InterruptedException {
  514. long nanos = unit.toNanos(timeout);
  515. final ReentrantLock lock = this.lock;
  516. lock.lockInterruptibly();
  517. try {
  518. for (;;) {
  519. RunnableScheduledFuture<?> first = queue[0];
  520. if (first == null) {
  521. if (nanos <= 0)
  522. return null;
  523. else
  524. nanos = available.awaitNanos(nanos);
  525. } else {
  526. long delay = first.getDelay(NANOSECONDS);
  527. if (delay <= 0)
  528. return finishPoll(first);
  529. if (nanos <= 0)
  530. return null;
  531. first = null; // don't retain ref while waiting
  532. if (nanos < delay || leader != null)
  533. nanos = available.awaitNanos(nanos);
  534. else {
  535. Thread thisThread = Thread.currentThread();
  536. leader = thisThread;
  537. try {
  538. long timeLeft = available.awaitNanos(delay);
  539. nanos -= delay - timeLeft;
  540. } finally {
  541. if (leader == thisThread)
  542. leader = null;
  543. }
  544. }
  545. }
  546. }
  547. } finally {
  548. if (leader == null && queue[0] != null)
  549. available.signal();
  550. lock.unlock();
  551. }
  552. }
  553. public void clear() {
  554. final ReentrantLock lock = this.lock;
  555. lock.lock();
  556. try {
  557. for (int i = 0; i < size; i++) {
  558. RunnableScheduledFuture<?> t = queue[i];
  559. if (t != null) {
  560. queue[i] = null;
  561. setIndex(t, -1);
  562. }
  563. }
  564. size = 0;
  565. } finally {
  566. lock.unlock();
  567. }
  568. }
  569. private RunnableScheduledFuture<?> peekExpired() {
  570. // assert lock.isHeldByCurrentThread();
  571. RunnableScheduledFuture<?> first = queue[0];
  572. return (first == null || first.getDelay(NANOSECONDS) > 0) ?
  573. null : first;
  574. }
  575. public int drainTo(Collection<? super Runnable> c) {
  576. if (c == null)
  577. throw new NullPointerException();
  578. if (c == this)
  579. throw new IllegalArgumentException();
  580. final ReentrantLock lock = this.lock;
  581. lock.lock();
  582. try {
  583. RunnableScheduledFuture<?> first;
  584. int n = 0;
  585. while ((first = peekExpired()) != null) {
  586. c.add(first); // In this order, in case add() throws.
  587. finishPoll(first);
  588. ++n;
  589. }
  590. return n;
  591. } finally {
  592. lock.unlock();
  593. }
  594. }
  595. public int drainTo(Collection<? super Runnable> c, int maxElements) {
  596. if (c == null)
  597. throw new NullPointerException();
  598. if (c == this)
  599. throw new IllegalArgumentException();
  600. if (maxElements <= 0)
  601. return 0;
  602. final ReentrantLock lock = this.lock;
  603. lock.lock();
  604. try {
  605. RunnableScheduledFuture<?> first;
  606. int n = 0;
  607. while (n < maxElements && (first = peekExpired()) != null) {
  608. c.add(first); // In this order, in case add() throws.
  609. finishPoll(first);
  610. ++n;
  611. }
  612. return n;
  613. } finally {
  614. lock.unlock();
  615. }
  616. }
  617. public Object[] toArray() {
  618. final ReentrantLock lock = this.lock;
  619. lock.lock();
  620. try {
  621. return Arrays.copyOf(queue, size, Object[].class);
  622. } finally {
  623. lock.unlock();
  624. }
  625. }
  626. @SuppressWarnings("unchecked")
  627. public <T> T[] toArray(T[] a) {
  628. final ReentrantLock lock = this.lock;
  629. lock.lock();
  630. try {
  631. if (a.length < size)
  632. return (T[]) Arrays.copyOf(queue, size, a.getClass());
  633. System.arraycopy(queue, 0, a, 0, size);
  634. if (a.length > size)
  635. a[size] = null;
  636. return a;
  637. } finally {
  638. lock.unlock();
  639. }
  640. }
  641. public Iterator<Runnable> iterator() {
  642. return new Itr(Arrays.copyOf(queue, size));
  643. }
  644. private class Itr implements Iterator<Runnable> {
  645. final RunnableScheduledFuture<?>[] array;
  646. int cursor = 0; // index of next element to return
  647. int lastRet = -1; // index of last element, or -1 if no such
  648. Itr(RunnableScheduledFuture<?>[] array) {
  649. this.array = array;
  650. }
  651. public boolean hasNext() {
  652. return cursor < array.length;
  653. }
  654. public Runnable next() {
  655. if (cursor >= array.length)
  656. throw new NoSuchElementException();
  657. lastRet = cursor;
  658. return array[cursor++];
  659. }
  660. public void remove() {
  661. if (lastRet < 0)
  662. throw new IllegalStateException();
  663. DelayedWorkQueue.this.remove(array[lastRet]);
  664. lastRet = -1;
  665. }
  666. }
  667. }
  668. }












