当前位置:   article > 正文

线程池---ThreadPoolExecutor_获取线程池的运行状态

获取线程池的运行状态

目录

线程池的优势

Executor框架的结构

ThreadPoolExecutor

线程池的5种运行状态

无返回提交任务execute(Runnable command)方法

带返回提交任务Future submit(Runnable task)

新建一个工作线程---addWorker(Runnable firstTask,boolean core)方法

工作线程---Worker类

执行任务---runWorker(Worker w)

获取任务---getTask()方法

关闭线程池方式一---shutdown()方法

关闭线程池方式二---shutdownNow()

tryTerminate()方法

工作线程销毁之后---processWorkerExit方法

awaitTermination方法

合理配置线程池

线程池监控

JDK已经定义好的线程池

FixedThreadPool(无界阻塞队列)

SingleThreadExecutor(单个工作线程)

CachedThreadPool(无限制创建线程)


线程池的优势

Java中的线程池是运用场景最多的并发框架,几乎所有需要异步或并发执行任务的程序都可以使用线程池。在开发过程中,合理地使用线程池能够带来3个好处。

  1. 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁的消耗。
  2. 提高响应速度。当任务到达时,任务不需要等到线程创建就能立即执行。
  3. 提高线程的可管理性。线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。但是,要做到合理利用线程池。

比如BIO编程过程中,需要利用线程来解决阻塞问题,如果不用线程池,那么可能会造成很多线程被频繁的创建和销毁,这个时候就可以利用线程池。

Executor框架的结构

  1. 任务:包括被执行的任务需要实现的接口(Runnable接口或者Callable接口)。
  2. 任务的执行:包括任务执行机制的核心接口Executor,以及继承自Executor的ExecutorService接口。Executor框架有两个关键类实现了ExecutorService接口(ThreadPoolExecutor和ScheduledThreadExecutor)。
  3. 异步计算的结果:包括接口Future和实现Future接口的FutureTask类。

Executor是一个接口,它是Executor框架的基础,它将任务的提交与任务的执行分离开来。

  1. public interface Executor {
  2. void execute(Runnable command);
  3. }

ThreadPoolExecutor

线程池的核心实现类,用于执行被提交的任务。

构造线程池的七大参数:

  1. public ThreadPoolExecutor(int corePoolSize,
  2. int maximumPoolSize,
  3. long keepAliveTime,
  4. TimeUnit unit,
  5. BlockingQueue<Runnable> workQueue,
  6. ThreadFactory threadFactory,
  7. RejectedExecutionHandler handler) {
  8. //在自定义线程池时需要注意参数需要符合要求,否则可能会造成下面两种异常
  9. if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0)
  10. throw new IllegalArgumentException();
  11. if (workQueue == null || threadFactory == null || handler == null)
  12. throw new NullPointerException();
  13. this.acc = System.getSecurityManager() == null ? null : AccessController.getContext();
  14. this.corePoolSize = corePoolSize;
  15. this.maximumPoolSize = maximumPoolSize;
  16. this.workQueue = workQueue;
  17. this.keepAliveTime = unit.toNanos(keepAliveTime);
  18. this.threadFactory = threadFactory;
  19. this.handler = handler;
  20. }
  1. corePoolSize:核心线程数量。
    1. 当前池中的线程数量 < corePoolSize,则会创建新线程来执行新任务。即使其他空闲的核心线程能够执行新任务也会创建线程。如果调用了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有核心线程。
    2. corePoolSize < 当前池中线程数量 < maximumPoolSize, 该情况下,如果workQueue(阻塞队列)未满,则新任务会暂存于阻塞队列中,若阻塞队列已满,则创建新的线程去执行新任务。
    3. 当前池中线程数量 == maximumPoolSize,若该情况下阻塞队列已满,则利用拒绝策略(handler)处理新任务。
  2. maximumPoolSize:线程池允许创建的最大线程数量。如果阻塞队列已满,而且以创建的线程数量小于最大线程数,则线程池会再创建新线程来执行新任务。
  3. keepAliveTime:若当前池中线程数量大于corePoolSize,且非核心线程处于空闲状态,则会在等待指定的时间后销毁这些非核心线程。该值就为指定的时间值。默认情况下,核心线程是不会被销毁的,当将allowCoreThreadTimeout设置为true时,核心线程也会被超时销毁。如果任务很多,并且每个任务执行的时间比较短,可以调大时间,提高线程利用率。
  4. unit:时间单位。可选单位如下:
    1. DAYS:天
    2. HOURS:小时
    3. MINUTES:分钟
    4. MILLISECONDS:毫秒
    5. MICROSECONDS:微妙
    6. NANOSECONDS:纳秒
  5. workQueue:阻塞队列(BlockingQueue<Runnable>)。
    1. ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列。
    2. LinkedBlockingQueue:一个基于链表结构的有界阻塞队列,其默认队列容量为Integer.MAX_VALUE,吞吐量通常要高于ArrayBlockingQueue。
    3. SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue。
    4. PriorityBlockingQueue:一个具有优先级的无界阻塞队列。
  6. threadFactory:线程工厂,用于创建新线程。默认使用Executors.defaultThreadFactory() 来创建线程。使用默认的ThreadFactory来创建线程时,会使新创建的线程具有相同的NORM_PRIORITY优先级并且是非守护线程,同时也设置了线程的名称。
  7. handler:任务拒绝策略。如果当前池中线程数量达到了最大线程数量,且阻塞队列已满,则新任务会交给拒绝策略处理。可以根据场景需要来实现RejectedExecutionHandler接口自定义策略。JDK提供了四种拒绝策略:
    1. AbortPolicy:直接抛出异常。
    2. CallerRunsPolicy:使用调用者所在的线程来运行任务。
    3. DiscardOldestPolicy:丢弃队列中最前面的一个任务,并执行当前任务。
    4. DiscardPolicy:不处理,丢弃掉。

线程池的5种运行状态

  1. /**
  2. * ctl是对线程池的运行状态和线程池中有效线程的数量进行控制的一个字段,共4个字节,32个比特位。
  3. * 利用切割比特位来表示两部分信息: 线程池的运行状态 (runState占用3个比特位来表示5种状态) 和 线程池内工作线程的数量 (workerCount)
  4. * 默认为RUNNING态,有效线程数量为0
  5. */
  6. private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0);
  7. private static final int COUNT_BITS = Integer.SIZE - 3; //29
  8. private static final int CAPACITY = (1 << COUNT_BITS) - 1; //00011111 11111111 11111111 11111111 == 536870911
  9. // 线程池的运行状态存储于高阶位中,5种状态利用3个比特位来表示
  10. private static final int RUNNING = -1 << COUNT_BITS; //111 00000000000000000000000000000 这是一个负数
  11. private static final int SHUTDOWN = 0 << COUNT_BITS; //000 00000000000000000000000000000
  12. private static final int STOP = 1 << COUNT_BITS; //001 00000000000000000000000000000
  13. private static final int TIDYING = 2 << COUNT_BITS; //010 00000000000000000000000000000
  14. private static final int TERMINATED = 3 << COUNT_BITS; //011 00000000000000000000000000000
  15. //~CAPACITY == 11100000 00000000 00000000 00000000,该方法用于获取线程池的运行状态runState的值
  16. private static int runStateOf(int c) { return c & ~CAPACITY; }
  17. //CAPACITY == 00011111 11111111 11111111 11111111,该方法用于获取线程池内工作线程数量workerCount的值
  18. private static int workerCountOf(int c) { return c & CAPACITY; }
  19. private static int ctlOf(int rs, int wc) { return rs | wc; } //根据runState的值和workerCount的值组合获取ctl的值
  1. RUNNING (该值为负数):能接受新提交的任务,并且也能处理阻塞队列中的任务,线程池一旦被实例化,其初始状态就是RUNNING态
  2. SHUTDOWN:不再接受新提交的任务,但却可以继续处理阻塞队列中已保存的任务。在线程池处于 RUNNING 状态时,调用 shutdown()方法会使线程池进入到该状态。(finalize() 方法在执行过程中也会调用shutdown()方法进入该状态);
  3. STOP:不能接受新任务,也不处理队列中的任务,会中断正在处理任务的线程。在线程池处于 RUNNING 或 SHUTDOWN 状态时,调用 shutdownNow() 方法会使线程池进入到该状态;
  4. TIDYING:如果所有的任务都已终止了,workerCount (有效线程数) 为0,线程池进入该状态后会调用 terminated() 方法进入TERMINATED 状态。
  5. TERMINATED:在terminated() 方法执行完后进入该状态,默认terminated()方法中什么也没有做。
    进入TERMINATED状态需同时具备如下条件:
    1. 线程池不是RUNNING状态;
    2. 线程池状态不是TIDYING状态或TERMINATED状态;
    3. 如果线程池状态是SHUTDOWN并且workerQueue为空;
    4. workerCount为0;
    5. 设置TIDYING状态成功。

无返回提交任务execute(Runnable command)方法

  1. public void execute(Runnable command) {
  2. //先进行判空处理
  3. if (command == null)
  4. throw new NullPointerException();
  5. //获取当前ctl的值
  6. int c = ctl.get();
  7. //如果当前线程池中有效线程个数 < 核心线程个数
  8. if (workerCountOf(c) < corePoolSize) {
  9. /*创建新线程执行新任务
  10. *
  11. */
  12. if (addWorker(command, true))
  13. return;
  14. c = ctl.get();
  15. }
  16. /* 能执行到这里的条件:
  17. * 1.当前线程池中的有效线程个数 >= 核心线程个数;
  18. * 2.创建新线程执行任务(addWorker)失败。
  19. * 若当前线程池的运行状态为RUNNING态,则将当前任务添加进阻塞队列中,若队列已满,则offer方法会返回false;若该任务被成功加入到阻塞队列中,则执行下面if中的逻辑。
  20. */
  21. if (isRunning(c) && workQueue.offer(command)) {
  22. //获取ctl的值
  23. int recheck = ctl.get();
  24. //如果当前线程池的运行状态不是RUNNING态,则将刚才添加的任务从阻塞队列中移除。移除成功之后利用拒绝策略处理该任务
  25. if (!isRunning(recheck) && remove(command))
  26. reject(command);
  27. /*执行到这里的条件:
  28. * 1.当前线程池的运行状态就是RUNNING态
  29. * 2.当前线程池的运行状态已经不是RUNNING态,且阻塞队列为空
  30. *判断当前有效线程个数如果为0
  31. */
  32. else if (workerCountOf(recheck) == 0)
  33. addWorker(null, false);
  34. }
  35. /*能执行到这里的条件:
  36. * 当前线程池中的有效线程个数 >= 核心线程个数;
  37. * 且:
  38. * 1.当前线程池的运行状态不是RUNNING态
  39. * 2.就是RUNNING态但任务添加到阻塞队列中失败(阻塞队列已满)
  40. *此处执行addWorker如果返回false,则表示不能再创建工作线程了,则将任务交给拒绝策略。
  41. */
  42. else if (!addWorker(command, false))
  43. reject(command);
  44. }

需要注意的是:只有在RUNNING态的时候才可以向阻塞队列里面添加新任务。

大致的处理流程如下:

带返回提交任务Future<?> submit(Runnable task)

该方法没有在ThreadPoolExecutor中实现,而是在其父类AbstractExecutorService中实现,具体如下:

  1. public Future<?> submit(Runnable task) {
  2. if (task == null) throw new NullPointerException();
  3. RunnableFuture<Void> ftask = newTaskFor(task, null);
  4. execute(ftask);
  5. return ftask; //返回的是FutureTask实例对象
  6. }
  7. protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
  8. return new FutureTask<T>(runnable, value);
  9. }

大概关系是这样的,有一个Callable<V>接口(创建线程的其中一种方式:可以通过实现该接口)该接口定了一个V call()方法,在FutureTask<V>类中维护了一个该接口的成员变量,由于FutureTask也是间接实现了Runnable接口的,所以在该类中实现了run方法。在这个run方法中调用的就是这个Callable成员的call方法,他将结果任务的执行结果存储进FutureTask实例中。

新建一个工作线程---addWorker(Runnable firstTask,boolean core)方法

该方法的返回值表示是否已经启动工作线程,即意味着新建一个工作线程是否成功。

  1. private boolean addWorker(Runnable firstTask, boolean core) {
  2. retry:
  3. for (;;) {
  4. int c = ctl.get(); //读取ctl的值
  5. int rs = runStateOf(c); //获取线程池的运行状态
  6. /*判断当前线程池还是否需要执行任务
  7. *如果当前线程池的状态为RUNNING态则不会返回false
  8. *返回false的条件(大前提:当前线程池状态不是RUNNING态),在此基础下下面三个条件有任何一个不成立都会直接返回,而不新建工作线程:
  9. * 1.当前线程池的状态为SHUTDOWN态
  10. * 2.所提交任务为null
  11. * 3.阻塞队列非空
  12. */
  13. if (rs >= SHUTDOWN &&
  14. !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
  15. return false;
  16. for (;;) {
  17. //获取当前池中线程个数
  18. int wc = workerCountOf(c);
  19. /*
  20. *若当前池中线程个数 >= 2的29次方减1,则无法创建新线程。池中最大线程数量为2的29次方减1个
  21. *如果core为true则于核心先称数量进行比较,否则与最大线程数量进行比较
  22. */
  23. if (wc >= CAPACITY ||
  24. wc >= (core ? corePoolSize : maximumPoolSize))
  25. return false;
  26. //将workerCount的值加1,并跳出外层循环
  27. if (compareAndIncrementWorkerCount(c))
  28. break retry;
  29. //如果线程状态被修改,则再次执行外层循环
  30. c = ctl.get();
  31. if (runStateOf(c) != rs)
  32. continue retry;
  33. }
  34. }
  35. boolean workerStarted = false;
  36. boolean workerAdded = false;
  37. Worker w = null;
  38. try {
  39. /*
  40. *此处创建Worker实例,并将任务firstTask设置进去
  41. *注意Worker类中有两个特殊的字段:1. Runnable firstTask 2. final Thread thread
  42. *Worker类本身也继承了Runnable接口,实现了其run()方法
  43. */
  44. w = new Worker(firstTask);
  45. //这里的t是w本身表示的线程对象,而非firstTask。
  46. final Thread t = w.thread;
  47. if (t != null) {
  48. final ReentrantLock mainLock = this.mainLock;
  49. mainLock.lock();
  50. try {
  51. //获取当前线程池的运行状态rs
  52. int rs = runStateOf(ctl.get());
  53. /*
  54. *rs < SHUTDOWN的状态只有RUNNING态
  55. *能进去下面if的条件:
  56. * 1. 当前线程池运行状态为RUNNING
  57. * 2.当前线程池状态为SHUTDOWN而且firstTask为null
  58. */
  59. if (rs < SHUTDOWN ||
  60. (rs == SHUTDOWN && firstTask == null)) {
  61. if (t.isAlive())
  62. throw new IllegalThreadStateException();
  63. //HashSet<Worker> workers线程池中利用HashSet保存的worker对象
  64. workers.add(w);
  65. int s = workers.size();
  66. //largestPoolSize用来记录线程池中最大的线程数量
  67. if (s > largestPoolSize)
  68. largestPoolSize = s;
  69. //任务添加成功(线程创建成功)
  70. workerAdded = true;
  71. }
  72. }finally {
  73. mainLock.unlock();
  74. }
  75. if (workerAdded) {
  76. //启动工作线程,这里调用的是Worker类中的run()方法
  77. t.start();
  78. workerStarted = true;
  79. }
  80. }
  81. } finally {
  82. if (!workerStarted)
  83. addWorkerFailed(w);
  84. }
  85. return workerStarted;
  86. }

通过上面 方法可以看到所有的工作线程Worker对象被存入一个HashSet中(workers),所有对workers的操作都利用ReentrantLock保证线程安全。

工作线程---Worker类

  1. //ThreadPoolExecutor的内部类,继承了AQS(中断工作线程时会使用),实现了Runnable接口
  2. private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
  3. private static final long serialVersionUID = 6138294804551838833L;
  4. final Thread thread; //工作线程
  5. Runnable firstTask; //要运行的初始任务,可能为null
  6. volatile long completedTasks; //线程任务计数器
  7. //唯一的构造方法
  8. Worker(Runnable firstTask) {
  9. setState(-1);
  10. this.firstTask = firstTask;
  11. //这里的thread表示自己本身
  12. this.thread = getThreadFactory().newThread(this);
  13. }
  14. //addWorker方法中的t.start()就是执行的该方法。
  15. public void run() {
  16. runWorker(this);
  17. }
  18. //当前线程是否持有独占锁
  19. protected boolean isHeldExclusively() {
  20. return getState() != 0;
  21. }
  22. //获取锁
  23. protected boolean tryAcquire(int unused) {
  24. //注意这里只有当前AQS中state字段值为0时才可能争抢到锁,该锁不支持重入
  25. if (compareAndSetState(0, 1)) {
  26. setExclusiveOwnerThread(Thread.currentThread());
  27. return true;
  28. }
  29. return false;
  30. }
  31. protected boolean tryRelease(int unused) {
  32. setExclusiveOwnerThread(null);
  33. setState(0);
  34. return true;
  35. }
  36. public void lock() { acquire(1); }
  37. public boolean tryLock() { return tryAcquire(1); }
  38. public void unlock() { release(1); }
  39. public boolean isLocked() { return isHeldExclusively(); }
  40. void interruptIfStarted() {
  41. Thread t;
  42. if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
  43. try {
  44. t.interrupt();
  45. } catch (SecurityException ignore) {
  46. }
  47. }
  48. }
  49. }

Worker类本来就是一个线程对象,就是为了执行提交给线程池的任务的,所以理所应当实现Runnable接口。值得注意的是Worker类还继承了AQS,实现了独占锁。所以Worker对象实例也可以看成一个锁对象。(独占锁 且 不支持重入)

执行任务---runWorker(Worker w)

  1. final void runWorker(Worker w) {
  2. //获取当前线程对象
  3. Thread wt = Thread.currentThread();
  4. //获取第一个任务
  5. Runnable task = w.firstTask;
  6. w.firstTask = null;
  7. w.unlock(); // 允许中断
  8. boolean completedAbruptly = true;
  9. try {
  10. //该工作线程中第一个任务不为null,或者 可以通过getTask()取得任务
  11. while (task != null || (task = getTask()) != null) {
  12. w.lock();
  13. if ((runStateAtLeast(ctl.get(), STOP) ||
  14. (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())
  15. wt.interrupt();
  16. try {
  17. //在ThreadPoolExecutor中该方法是一个空方法
  18. beforeExecute(wt, task);
  19. Throwable thrown = null;
  20. try {
  21. //执行任务。
  22. task.run();
  23. } catch (RuntimeException x) {
  24. thrown = x; throw x;
  25. } catch (Error x) {
  26. thrown = x; throw x;
  27. } catch (Throwable x) {
  28. thrown = x; throw new Error(x);
  29. } finally {
  30. afterExecute(task, thrown);
  31. }
  32. } finally {
  33. task = null;
  34. //任务计数器加1
  35. w.completedTasks++;
  36. //释放锁
  37. w.unlock();
  38. }
  39. }
  40. //如果执行任务的过程中没有发生异常,则completedAbruptly会被赋值为false
  41. completedAbruptly = false;
  42. } finally {
  43. processWorkerExit(w, completedAbruptly);
  44. }
  45. }

注意上面的runWorker方法通过while循环不断的在获取任务来让工作线程去执行该任务,若获取的任务为null,那么当前的工作线程就会被销毁。而且在while循环中一个条语句就w.lock(),执行完一个任务才会进行unLock操作 。继承AQS是为了在中断工作线程时会用到。只不过现在可以确定的是,若工作线程正在执行任务,那么它一个是拿到w锁的。

获取任务---getTask()方法

  1. private Runnable getTask() {
  2. boolean timedOut = false;
  3. for (;;) {
  4. int c = ctl.get();
  5. int rs = runStateOf(c);
  6. /*
  7. *若当前线程池的工作状态为RUNNING则不会进入下面if。
  8. *1.若状态为STOP、TIDYING、TERMINATED则当前工作线程不能执行任务。
  9. *2.若状态为SHUTDOWN,且阻塞队列为空,则获取任务为null
  10. */
  11. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
  12. //workerCount的值减1
  13. decrementWorkerCount();
  14. return null;
  15. }
  16. //获取工作线程数量
  17. int wc = workerCountOf(c);
  18. //若allowCoreThreadTimeOut设置为true 或者 当前池中工作线程数量大于核心线程数量 则timed为true
  19. boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
  20. //若当前工作线程数量已经超过最大线程数量,则也获取不到任务,会从该方法中返回null,进而结束该工作线程
  21. if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {
  22. if (compareAndDecrementWorkerCount(c))
  23. return null;
  24. continue;
  25. }
  26. try {
  27. /*
  28. *若allowCoreThreadTimeOut设置为true 或者 当前池中工作线程数量大于核心线程数量
  29. * 则:在指定的时间内从阻塞队列中获取任务,若取不到则返回null
  30. *若allowCoreThreadTimeOut设置为false 而且 当前池中工作线程数量小于核心线程数量
  31. * 则:在指定的时间内从阻塞队列中获取任务,若取不到则一直阻塞
  32. */
  33. Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
  34. if (r != null)
  35. return r;
  36. //若r == null,则此处timedOut的值被设置为true
  37. timedOut = true;
  38. } catch (InterruptedException retry) {
  39. //如果阻塞等待过程中线程发生中断,则将timeOut设置为false,进入下一次循环
  40. timedOut = false;
  41. }
  42. }
  43. }

上面getTask()方法会从阻塞队列中获取任务,取不到任务的情况如下:

  1. 若当前线程池状态为STOP、TIDYING、TERMINATED 或者 状态为SHUTDOWN且阻塞队列为空,则取不到任务。则从该方法中返回null。
  2. 从阻塞队列中获取任务时等待超时,则取不到任务。(注意:如果当前池中线程数量 <= 核心线程数量 且allowCoreThreadTimeOut为false的情况下如果当前阻塞队列为空, 则当前的工作线程会被一直阻塞,而不会从该方法中返回。)所以当线程池中的线程数量 > 核心线程数量 且allowCoreThreadTimeOut为false的情况下,对于额外的线程若没有任务可以执行的话,会等待一段时间,如果等待过后还是没有任务可以执行,那么就会从该方法中返回进而当前工作线程被销毁。从上面逻辑也可以看出来只要allowCoreThreadTimeOut被设置为true,那么空闲的核心线程也是可以要被销毁的。

关闭线程池方式一---shutdown()方法

  1. public void shutdown() {
  2. final ReentrantLock mainLock = this.mainLock;
  3. mainLock.lock();
  4. try {
  5. //检测是否有关闭线程池的权限
  6. checkShutdownAccess();
  7. //将线程池状态设置为SHUTDOWN态
  8. advanceRunState(SHUTDOWN);
  9. //中断空闲线程(没有执行任务的线程)
  10. interruptIdleWorkers();
  11. //该方法在ThreadPoolExecutor中是一个空方法
  12. onShutdown();
  13. } finally {
  14. mainLock.unlock();
  15. }
  16. //尝试将线程池状态设置为TERMINATED状态。
  17. tryTerminate();
  18. }

对于线程池关闭的整体逻辑:

  1. 检测是否具有关闭线程池的权限
  2. 将线程池的状态设置为SHUTDOWN态
  3. 调用interruptIdleWorkers()方法中断空闲线程,如果线程此刻正在执行任务,那么将会继续执行下去。

其中中断的逻辑如下:

  1. private void interruptIdleWorkers() {
  2. interruptIdleWorkers(false);
  3. }
  4. private void interruptIdleWorkers(boolean onlyOne) {
  5. final ReentrantLock mainLock = this.mainLock;
  6. mainLock.lock();
  7. try {
  8. for (Worker w : workers) {
  9. Thread t = w.thread;
  10. if (!t.isInterrupted() && w.tryLock()) {
  11. try {
  12. t.interrupt();
  13. } catch (SecurityException ignore) {
  14. } finally {
  15. w.unlock();
  16. }
  17. }
  18. if (onlyOne)
  19. break;
  20. }
  21. } finally {
  22. mainLock.unlock();
  23. }
  24. }
  • 线程池在获取任务时(执行getTask()方法)是通过死循环来不断获取任务,如果工作线程空闲的话,它会被阻塞队列阻塞,工作线程在阻塞过程中如果被中断则会解除阻塞再次执行循环体,在该循环中第一个if判断就是:当前线程池状态如果为SHUTDOWN态并且阻塞队列为空(必须),即没有等待执行的任务,那么该方法就会返回null,然后跳出runWorker方法的while循环,该工作线程也就会销毁。
  • 在runWorker方法中,如果工作线程取到的任务不为null,它最开始做的一个动作就是w.lock(),这样的话该工作线程会拿到一把独占锁(锁对象为w),该锁是不能重入的,执行完任务之后该工作线程才会释放这把w锁。所以,如果线程正在执行任务的过程中的话,那么该工作线程肯定是拿到了独占锁的,上面interruptIdleWorkers方法中的if条件后面是w.tryLock(),那么肯定会返回false,则该工作线程就不会被中断。所以从上面shutdown()方法来看,正在执行任务的工作线程是不会被中断的。
  • shutdown方法中断空闲线程之前线程池的状态已经被设置为SHUTDOWN状态,该状态下线程池将不再接受新提交任务。等处理完阻塞队列里面的所有任务之后,阻塞队列即为空。那么getTask()方法中:线程池状态为SHUTDOWN,阻塞队列为空时所获取的任务为null,所以shutdown()方法未中断的工作线程也会这样销毁。

关闭线程池方式二---shutdownNow()

  1. public List<Runnable> shutdownNow() {
  2. List<Runnable> tasks;
  3. final ReentrantLock mainLock = this.mainLock;
  4. mainLock.lock();
  5. try {
  6. //检查是否有关闭权限
  7. checkShutdownAccess();
  8. //将线程池状态设置为STOP态
  9. advanceRunState(STOP);
  10. interruptWorkers();
  11. //获取阻塞队列中等待执行的任务
  12. tasks = drainQueue();
  13. } finally {
  14. mainLock.unlock();
  15. }
  16. tryTerminate();
  17. //获取阻塞队列中等待执行的任务
  18. return tasks;
  19. }

注意上述shutdownNow()方法和shutdown()方法在中断线程时调用的是不同的方法,shutdownNow()方法中会先将线程池状态设置为STOP态,然后中断所有工作线程,具体源码如下: 

  1. private void interruptWorkers() {
  2. final ReentrantLock mainLock = this.mainLock;
  3. mainLock.lock();
  4. try {
  5. for (Worker w : workers)
  6. w.interruptIfStarted();
  7. } finally {
  8. mainLock.unlock();
  9. }
  10. }
  11. //该方法在Worker类中定义,该方法用于中断当前的worker实例表示的工作线程
  12. void interruptIfStarted() {
  13. Thread t;
  14. //注意这里的逻辑,getState() >= 0 : 正在执行任务的工作线程state == 1,空闲的工作线程state == 0
  15. if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
  16. try {
  17. t.interrupt();
  18. } catch (SecurityException ignore) {
  19. }
  20. }
  21. }

 所以shutdownNow方法的具体流程如下:

  1. 将线程池状态设置为STOP。该状态下线程池也不再接受新提交的任务。
  2. 中断所有工作线程
  3. 返回阻塞队列中等待执行的任务

shutdown方法会结束空闲线程,空闲线程在阻塞队列中被阻塞着,shutdown方法会设置中断信号,然后在getTask方法中捕获该中断信号,进而结束空闲线程。

shutdownNow方法会打断正在执行任务的工作线程,它是对所有线程都发中断信号,所以这里能打断的任务执行的前提是当前这个任务的可以捕获中断信号。如果这个任务是一个死循环没有结束期限,并且不能捕获中断信号,那么执行这个任务的工作线程永远都不会停止。因为本次任务无法响应该中断信号。如果这个任务可以结束,但是不能响应中断信号,那么也是该任务执行完毕之后才能结束当前的工作线程。

当然像那种死循环不能结束的情况,如果可以,可以将其设置为守护线程(当运行的唯一线程都是守护进程线程时,Java虚拟机将退出。)将其结束。

tryTerminate()方法

该方法中会将线程池的状态设置为TERMINATED,在shutdown()方法和shutdownNow方法的最后都调用了该方法。

  1. final void tryTerminate() {
  2. for (;;) {
  3. int c = ctl.get();
  4. /*
  5. *RUNNING、TIDYING、TERMINATED这三种状态都会直接返回。
  6. *SHUTDOWN状态下如果阻塞队列不为空依然会直接返回
  7. */
  8. if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && !workQueue.isEmpty()))
  9. return;
  10. /*
  11. *执行到这里的前提:当前状态为STOP状态 或者 当前为SHUTDOWN状态但阻塞队列为空。
  12. *这里的判断意思是:如果此时还存在工作线程
  13. */
  14. if (workerCountOf(c) != 0) {
  15. //ONLY_ONE为true,如果参数为true那么该方法内部只会中断一个空闲的线程。
  16. interruptIdleWorkers(ONLY_ONE);
  17. return;
  18. }
  19. final ReentrantLock mainLock = this.mainLock;
  20. mainLock.lock();
  21. try {
  22. //将当前线程池状态先设置为TIDYING,设置成功的前提是ctl的值未发生变化:状态或者工作线程数量
  23. if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
  24. try {
  25. //在ThreadPoolExecutor中该方法为一个空方法
  26. terminated();
  27. } finally {
  28. //最后将线程池状态设置为TERMINATED
  29. ctl.set(ctlOf(TERMINATED, 0));
  30. termination.signalAll();
  31. }
  32. return;
  33. }
  34. } finally {
  35. mainLock.unlock();
  36. }
  37. }
  38. }

分析完该方法之后就可以画出线程池的状态变迁图了,如下:

工作线程销毁之后---processWorkerExit方法

该方法是在每一个工作线程销毁前都要执行的。 

  1. private void processWorkerExit(Worker w, boolean completedAbruptly) {
  2. //completedAbruptly == true : 表示执行任务的过程中发生异常,导致该工作线程结束
  3. if (completedAbruptly)
  4. decrementWorkerCount();
  5. final ReentrantLock mainLock = this.mainLock;
  6. mainLock.lock();
  7. try {
  8. //在每个工作线程结束时都会将自己执行的任务数汇总给线程池
  9. completedTaskCount += w.completedTasks;
  10. //将当前工作线程对象从池子中移除,因为它将被销毁
  11. workers.remove(w);
  12. } finally {
  13. mainLock.unlock();
  14. }
  15. //尝试将线程池状态设置为TERMINATED
  16. tryTerminate();
  17. int c = ctl.get();
  18. /*
  19. *该if条件成立的条件是:线程池状态为SHUTDOWN 或者 RUNNING
  20. * SHUTDOWN状态属于正常关闭线程池销毁线程,completedAbruptly为false
  21. * RUNNING状态属于执行任务时出现异常导致执行该任务的工作线程需要被销毁
  22. */
  23. if (runStateLessThan(c, STOP)) {
  24. //该if条件成立表示SHUTDOWN态下销毁工作线程
  25. if (!completedAbruptly) {
  26. int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
  27. //如果allowCoreThreadTimeOut被设置为true 而且 还有正在等待执行的任务
  28. if (min == 0 && !workQueue.isEmpty())
  29. min = 1;
  30. /*
  31. * 1.allowCoreThreadTimeOut == false(默认) :
  32. * 如果当前线程池中的工作线程数量 >= 核心线程数量,则可以直接返回
  33. * 2.allowCoreThreadTimeOut == true :
  34. * 如果当前线程池中的工作线程数量 >= 1,则可以直接返回
  35. */
  36. if (workerCountOf(c) >= min)
  37. return;
  38. }
  39. addWorker(null, false);
  40. }
  41. }

注意上述方法执行的条件:

  1. 执行任务过程中出现异常,将导致执行该任务的工作线程执行processWorkerExit方法。这种情况下是不进入if (!completedAbruptly)里面的代码块的,而是直接执行addWorker(null, false),表示创建一个新的工作线程来顶替本次意外被销毁的线程。这个新的线程会继续从阻塞队列中取任务执行。
  2. 调用shutdown()方法关闭线程池,线程池状态被设置为SHUTDOWN,这种情况下需要判断阻塞队列中是否还有正在等待执行的任务,如果有,那么结合allowCoreThreadTimeOut字段的取值来取决最后需要保留的工作线程数量。
    1. 调用shutdown方法后,如果阻塞队列中还有任务,且allowCoreThreadTimeOut == false,则再任务没有被执行完之前线程池中仍然有最少corePoolSize个工作线程。
    2. 调用shutdown方法后,如果阻塞队列中还有任务,且allowCoreThreadTimeOut == true,则最少保留一个工作线程去执行任务。
  3. 调用shutdownNow方法关闭线程池。线程池状态被设置为STOP,则直接从workers池子中移除当前的工作线程即可,不用再新建工作线程(if (runStateLessThan(c, STOP))不成立)。

awaitTermination方法

  1. public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
  2. long nanos = unit.toNanos(timeout);
  3. final ReentrantLock mainLock = this.mainLock;
  4. mainLock.lock();
  5. try {
  6. for (;;) {
  7. //如果当前线程池状态为TERMINATED,才会返回true
  8. if (runStateAtLeast(ctl.get(), TERMINATED))
  9. return true;
  10. if (nanos <= 0)
  11. return false;
  12. /*
  13. *阻塞当前执行awaitTermination方法的线程,阻塞完毕之后还会再次循环,若线程池状态还不是TERMINATED,那么会返回false。
  14. *被唤醒说明线程池状态已经为TERMINATED。已经没有工作线程了。
  15. */
  16. nanos = termination.awaitNanos(nanos);
  17. }
  18. } finally {
  19. mainLock.unlock();
  20. }
  21. }

可以看到调用该方法,如果线程池状态不为TERMINATED,即线程池中还有工作线程,那么该方法会阻塞你指定的时间。

  1. 如果阻塞过程中所有工作线程均已销毁,线程池状态变为TERMINATED,则在tryTerminate()方法中会唤醒此处的阻塞,返回true。
  2. 如果阻塞超时了还有工作线程,那么该方法会返回false。表示还有工作线程在执行任务。

一般该方法配合shutdown()方法使用,shutdown()方法不会立刻销毁所有线程,它会在当前已提交的所有任务都执行完毕之后才销毁(所有线程都销毁后线程池状态变为TERMINATED)。那么可以调用该方法指定一个期望的时间,等任务执行结束之后再执行主方法的逻辑。

合理配置线程池

要想合理地配置线程池,就必须首先分析任务特性,可以从如下几个角度来分析。

  1. 任务的性质:CPU密集型任务。IO密集型任务和混合型任务。
  2. 任务的优先级:高、中和低。
  3. 任务的执行时间:长、中和短。
  4. 任务的依赖性:是否依赖其他系统资源,如数据库连接。

性质不同的任务可以用不同规模的线程池分开处理。CPU密集型任务应配置尽可能小的线程,如配置N(cpu) + 1个线程的线程池。由于IO密集型的任务线程并不是一直在执行任务,则应配置尽可能多的线程,如2*N(cpu)。混合型的任务,如果可以拆分,将其拆分成一个CPU密集型任务和一个IO密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐量将高于串行执行的吞吐量。如果这两个任务执行时间相差太大,则没必要进行拆分。可以通过Runtime.getRuntime().availableProcessors()方法获得当前设备的CPU个数。

优先级不同的任务可以使用优先级队列PriorityBlockingQueue来处理。它可以让优先级高的任务先执行。(注意:如果一直有优先级高的任务提交到队列里,那么优先级低的任务可能永远不能执行)

依赖数据库连接池的任务,因为线程提交SQL后需要等待数据库返回结果,等待的时间越长,CPU空闲时间就越长,那么线程数应该设置得越大,这样才能更好地利用CPU。

线程池监控

如果在系统中大量使用线程池,则有必要对线程池进行监控,方便在出现问题时,可以根据线程池的使用状况快速定位问题。可以通过线程池提供的参数来进行监控,在监控线程池的时候可以使用一下属性。

  1. getTaskCount():获取线程池已经完成的+正在执行的+还在等待着的任务数量。
  2. completedTaskCount:线程池在运行过程中已完成的任务数量,小于或等于taskCount。
  3. largestPoolSize:线程池曾经创建过的最大线程数量。通过这个数量可以知道线程池是否曾经满过。如该值等于线程池的最大大小,则表示线程池曾经满过。
  4. getPoolSize():获取当前线程池中的线程数量。
  5. getActiveCount():获取正在执行任务的线程数量。

还可以通过扩展线程池进行监控。可以通过继承线程池还自定义线程池,重写线程池的beforeExecute、afterExecute和terminated方法,也可以在任务执行前、任务执行后、线程池关闭前执行一些代码来监控。例如:监控任务的平均执行时间、最大执行时间和最小执行时间等。

JDK已经定义好的线程池

JDK中有一个Executor框架的工具类Executors,可以通过该类来直接使用帮我们定义好的线程池。下面简述其中三个:

FixedThreadPool(无界阻塞队列)

  1. public static ExecutorService newFixedThreadPool(int nThreads) {
  2. return new ThreadPoolExecutor(nThreads, nThreads,
  3. 0L, TimeUnit.MILLISECONDS,
  4. new LinkedBlockingQueue<Runnable>());
  5. }

该线程池的 核心线程数量 == 最大线程数量,由于这两个值相等所以阻塞的时间就无所谓了,因为这个时间就是用来销毁核心线程外的空闲线程的,所以阻塞时间设置为0L,而且使用的阻塞队列为LinkedBlockingQueue<Ruunable>,这是一个基于链表的队列,而且其默认的队列容量为Integer.MAX_VALUE,这相当于一个无界的队列了,也正是因为使用的阻塞队列是无界的,所以maximumPoolSize将失去意义,因为当线程池达到核心线程数量时,在没有空闲线程的情况下新任务将被提交到队列里面,当队列满了才继续创建线程直到maximumPoolSize为止,所以如果队列无界,则最大线程数的配置将失去意义。而且无界队列也会导致拒绝策略执行不到。拒绝策略是当阻塞队列已满并且当前线程数量已经达到maximumPoolSize才能执行到。

SingleThreadExecutor(单个工作线程)

  1. public static ExecutorService newSingleThreadExecutor() {
  2. return new FinalizableDelegatedExecutorService
  3. (new ThreadPoolExecutor(1, 1,
  4. 0L, TimeUnit.MILLISECONDS,
  5. new LinkedBlockingQueue<Runnable>()));
  6. }

可以看到该线程池并非ThreadPoolExecutor的实例,而是FinalizeableDelegatedExecutorService的实例,该类又继承自DelegatedExecutorService类,其中除过finalize()方法外的其他方法都是在DelegatedExecutorService中实现。该线程池不可以使用ThreadPoolExecutor中的一些独有方法,但是本质上都是使用ThreadPoolExecutor。

  1. static class FinalizableDelegatedExecutorService extends DelegatedExecutorService {
  2. FinalizableDelegatedExecutorService(ExecutorService executor) {
  3. super(executor);
  4. }
  5. protected void finalize() {
  6. super.shutdown();
  7. }
  8. }
  9. static class DelegatedExecutorService extends AbstractExecutorService {
  10. private final ExecutorService e;
  11. DelegatedExecutorService(ExecutorService executor) { e = executor; }
  12. //……
  13. }

通过上面配置可看出,该线程池只有一个核心线程,最大线程数也被设置为1,所以依然阻塞时间为0L,也就是说全程只有一个线程在执行任务,只不过这个工作线程一直在复用来执行不同的任务。

CachedThreadPool(无限制创建线程)

  1. public static ExecutorService newCachedThreadPool() {
  2. return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
  3. 60L, TimeUnit.SECONDS,
  4. new SynchronousQueue<Runnable>());
  5. }

该线程池的核心线程数为0,最大线程数为Integer.MAX_VALUE(可以当成无限),阻塞时间为60秒,使用的阻塞队列没有容量。由于没有核心线程,所以该线程池中的空闲线程如果空闲60秒就会销毁。如果单个任务处理的时间比较长,那么可能导致池子中再不停的创建新线程来执行任务。该线程池执行任务不需要等,一旦提交可以马上执行。该线程池的线程复用性极为不好。极端情况下,CachedThreadPool会因为创建过多线程而耗尽CPU和内存资源。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/一键难忘520/article/detail/812543
推荐阅读
相关标签
  

闽ICP备14008679号