当前位置:   article > 正文

Android 线程池详解_android threadpoolmanager

android threadpoolmanager

前言:生活是公平的,哪怕吃了很多苦,只要你坚持下去,一定会有收获,即使你失败了,你也获得了别人不具备的经历。

一、概述

1.在Android中线程的作用

线程作为CPU任务调度和执行的最小单元,线程必须依赖进程而存在,在一个进程内需要并行执行多个程序,以实现不同的功能,线程的开销比进程的小(这也是引入线程的原因)。

(1)线程是Android中很重要的概念,从用途上来看,可以分为主线程和子线程,主线程也被称为UI线程,用于运行四大组件和用户的交互,处理各种和界面相关的事情。
(2)子线程用于执行耗时操作,比如IO操作、网络请求等耗时操作,(Andorid3.0以后必须要求耗时操作网络请求必须在子线程中执行)子线程不允许更新UI。否则就会造成ANR(Application Not Responding)程序无响应。

2.什么是ANR?

ANR:Application Not Responding,表示程序在一定时间内无法响应,造成异常。

Activity响应超过5秒;
Broadcast在处理时间超过10秒;
Service处理事件超过20秒;

上面三种情况都会导致程序出现ANR,导致这种情况的出现原因有很多,网络请求,大文件读取,耗时的计算等都会引发ANR。主线程进行了过于耗时的操作,因为Activity、Broadcast、Service都是通过主线程承载的。子线程的出现就是为了解决这个问题,Android建议耗时操作必须在子线程中运行。

3.为什么会有线程池

在项目中大量的线程创建和销毁很容易导致GC的频繁执行,从而发生内存抖动的情况,而内存抖动对移动端最大的影响就是页面卡顿,线程的创建和销毁都是需要时间的,当有大量的线程创建和销毁时,这些时间比较明显,将导致性能上的缺失。

线程池就是用来解决这个问题,重用线程池中的线程,避免频繁的创建和销毁带来的性能消耗,能有效控制最大并发数,防止大量线程抢占资源导致系统阻塞。

4.线程池的优点:

(1) 重用线程池中的线程,线程执行完毕后不会立即销毁,而是等待另外的任务,这样避免了线程的频繁创建、销毁和触发GC;
(2) 有效控制线程池的最大并发数,防止大量线程抢占资源出问题;
(3) 对多线程进行统一的管理,可提供定时执行和执行间隔循环执行的功能。

二、线程池的使用

1.什么是线程池?

Android的线程池来源于Java的Executor,Executor是一个接口,真正的线程池实现为ThreadPoolExecutor,ThreadPoolExecutor提供了一系列的参数来配置,通过不同的参数来创建不同线程池。

2.构造方法

  1. public ThreadPoolExecutor(int corePoolSize,
  2. int maximumPoolSize,
  3. long keepAliveTime,
  4. TimeUnit unit,
  5. BlockingQueue<Runnable> workQueue,
  6. ThreadFactory threadFactory,
  7. RejectedExecutionHandler handler)

下面详细讲解各个参数的含义:

  • corePoolSize:           核心线程数,默认情况,线程一直存活;如果设置allowCoreThreadTimeOut = true那么核心线程就会存在超时策略,这时间间隔由keepAliveTime决定,等待时间超过keepAliveTime指定的时间后,核心线程就会被停止。
  • maximumPoolSize:  线程池所能容纳的最大线程数,超过这数,后面的任务线程就会被阻塞;
  • keepAliveTime:        非核心线程超时时的时长,当系统中非核心线程闲置时间超过这个时长后,则会被系统回收,如果设置allowCoreThreadTimeOut = true,那么这个时长同样对核心线程奏效;
  • unit:                           keepAliveTime这个参数的时间单位,有纳秒、微秒、毫秒、秒、分、时、天等,例如TimeUnit.SECONDS表示单位为秒;
  • workQueue:              线程池中的任务队列,主要用来存储已经提交但是未执行的任务,通过ThreadPoolExecutor的execute方法来提交任务;这个队列就是BlockQueue类型,属于阻塞队列,如果队列为空时,取出消息的操作就会被阻塞,直到任务加入队列中不为空的时候,才能进行取出操作,而在满队列的时候,添加操作同样被阻塞;
  • threadFactory:         线程工厂,为线程池提供创建新线程的功能,它是一个接口,只有一个方法Thread newThread(Runnable r),一般情况使用默认即可;
  • handler:                    拒绝策略,当线程无法执行新任务时(一般由于线程池中线程数量达到最大值任务队列已经饱和或者线程池被关闭导致),默认情况,当线程池无法处理新线程时,抛出RejectedExecutionException异常。

线程池执行流程原则:

1.如果线程池中的线程数未达到核心线程数,就会开启一个线程去执行任务;
2.如果线程池中的线程数已经达到核心线程数,而任务队列workQueue未满,则会将任务添加到workQueue任务队列中;
3.如果线程池中的线程数已经达到核心线程数但未超过最大线程数,而且任务队列workQueue已经满,则会开启非核心线程来执行任务;
4.如果线程池中的线程数已经达到最大线程数,那么拒绝该任务,执行饱和策略,抛出RejectedExecutionException异常。

3.相关方法

ThreadPoolExecutor有两个方法供我们执行,execute()submit()都可以执行任务。通常我们不需要返回值时使用execute()执行任务,需要返回值时使用submit()submit()方法返回一个Future,Future可以获得结果,并且可以cancel()取消请求。

  • execute():           执行任务,无返回值;
  • submit():             执行任务,返回Future,可以获得结果,并且cancel()取消请求。内部也执行execute()的逻辑;
  • shutdown():        关闭线程池,不影响已经提交的任务;
  • shutdownNow(): 关闭线程池,尝试终止正在执行的任务。

4.实例使用

我们来验证一下上面的流程原则:

  1. //1.创建线程池:核心线程数为3,最大线程数为12,非核心线程保活时间为1秒,任务队列容量为4
  2. ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 12,
  3. 1, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>(4));
  4. //2.创建任务:手动创建10个任务请求
  5. for (int i = 0; i < 10; i++) {
  6. final int index = i;
  7. Runnable runnable = new Runnable() {
  8. @Override
  9. public void run() {
  10. try {
  11. Log.e(TAG, "线程:" + Thread.currentThread().getName() + "正在执行" + index + "个任务");
  12. Thread.currentThread().sleep(2000);
  13. } catch (InterruptedException e) {
  14. e.printStackTrace();
  15. }
  16. }
  17. };
  18. //3.线程池执行任务:将请求加入到线程池中
  19. executor.execute(runnable);
  20. }

通过new构造方法创建线程池,核心线程数为3,最大线程数为12,非核心线程保活时间为1秒,任务队列容量为4,手动创建10个任务请求,加入到线程池中;
首先打印出0,1,2说明添加了3个核心线程任务,然后3,4,5,6添加到任务队列中,任务队列只能装4个任务,这时任务队列已满,那么剩下的7,8,9任务只能放到非核心线程中执行,所以7,8,9比3,4,5,6先打印出来。任务队列的任务会等着,知道有核心线程空闲了才会执行。打印log如下:

5.线程池分类

Google官方并不推荐使用new的方法直接创建线程池,而是通过Executors方法来创建线程池,通过直接或者间接配置ThreadPoolExecutor 的参数配置构建线程池,常用的线程池有以下4中:定长线程池、定时线程池、可缓存线程池、单线程化线程池。

(1)FixedThreadPool

定长线程池:一个线程数固定的线程池,当线程空闲状态的时候,它们并不会被回收,除非线程池被关闭了,当所有线程处于活动状态的时候,新的任务会处于等待状态,直到有线程空闲才会被执行。因为只有核心线程并且线程不会被回收,核心线程没有超时机制,任务队列也没有大小限制,所以它能快速响应外界的请求。

内部实现:

  1. public static ExecutorService newFixedThreadPool(int nThreads) {
  2. return new ThreadPoolExecutor(nThreads, nThreads,
  3. 0L, TimeUnit.MILLISECONDS,
  4. new LinkedBlockingQueue<Runnable>());
  5. }

实例:创建核心线程数为3的定长线程池

  1. //1.创建定长线程池,核心线程数为3
  2. ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(3);
  3. //2.创建10个任务请求
  4. for (int i = 0; i < 10; i++) {
  5. final int index = i;
  6. Runnable runnable = new Runnable() {
  7. @Override
  8. public void run() {
  9. try {
  10. Log.e(TAG, "线程:" + Thread.currentThread().getName() + "正在执行" + index + "个任务");
  11. Thread.sleep(2000);
  12. } catch (InterruptedException e) {
  13. e.printStackTrace();
  14. }
  15. }
  16. };
  17. //3.线程池执行任务
  18. newFixedThreadPool.execute(runnable);
  19. }

创建10个任务请求,线程池执行任务,执行情况应该是先执行“0,1,2”三个任务,剩下的“4,5,6,7,8,9”六个任务分别添加到任务队列中,带核心线程空闲后再执行,因为核心线程数为3,所以每次执行3个任务,直到任务执行完毕。打印log如下:

(2)ScheduledThreadPool 

定时线程池:核心线程数是固定的,非核心线程数没有限制,并且非核心线程空闲的时候会立即被回收,主要用于执行定时任务和有固定周期的重复任务。

内部实现:下面中非核心线程数为Integer.MAX_VALUE,表示无穷大,没有限制。

  1. public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
  2. return new ScheduledThreadPoolExecutor(corePoolSize);
  3. }
  4. public ScheduledThreadPoolExecutor(int corePoolSize) {
  5. super(corePoolSize, Integer.MAX_VALUE,
  6. DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
  7. new DelayedWorkQueue());
  8. }

实例:创建核心线程数为3个定时线程池

  1. //1.创建定时线程池,核心线程数为3,非核心线程数无限制
  2. ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3);
  3. //2.创建任务
  4. Runnable runnable = new Runnable() {
  5. @Override
  6. public void run() {
  7. Log.e(TAG, "newScheduledThreadPool线程:" + Thread.currentThread().getName() + "正在执行任务");
  8. }
  9. };
  10. //3.定时执行任务:延迟2秒执行,每秒执行一次任务
  11. scheduledExecutorService.scheduleAtFixedRate(runnable, 2, 1, TimeUnit.SECONDS);
  12. //scheduledExecutorService.schedule(runnable, 2, TimeUnit.SECONDS);延迟2秒执行

延迟2秒,每秒执行一次,打印log如下:

(3)CachedThreadPool

可缓存线程池:它是线程数量不定的线程池,它只有非核心线程,线程数为Integer.MAX_VALUE,表示线程数无穷大,没有限制;当线程池中的任务都处于活动状态的时候,如果有新的任务来,就创建新的线程来处理新的任务,否则就会利用空闲的线程来处理,这里的线程都有超时机制,时长为60秒,超过60秒闲置的线程就会被回收,SynchronousQueue 相当于一个空队列,导致一遇到新的任务就会立即执行。适合用于大量并且耗时较少的任务。

内部实现:

  1. public static ExecutorService newCachedThreadPool() {
  2. return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
  3. 60L, TimeUnit.SECONDS,
  4. new SynchronousQueue<Runnable>());
  5. }

实例:创建可缓存线程池,每秒添加一个任务

  1. ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
  2. //2.创建10个任务请求
  3. for (int i = 0; i < 10; i++) {
  4. try {
  5. //1秒,即每隔一秒执行一次
  6. Thread.sleep(1000);
  7. } catch (InterruptedException e) {
  8. e.printStackTrace();
  9. }
  10. final int index = i;
  11. //3.线程池执行任务
  12. newCachedThreadPool.execute(new Runnable() {
  13. @Override
  14. public void run() {
  15. try {
  16. Log.e(TAG, "newCachedThreadPool:" + Thread.currentThread().getName() + " 正在执行第" + index + "个任务");
  17. //执行任务时睡 index * 500毫秒
  18. Thread.sleep(index * 500);
  19. } catch (InterruptedException e) {
  20. e.printStackTrace();
  21. }
  22. }
  23. });
  24. }

上面例子中,每秒添加一个任务,每个任务执行都睡 i*500毫秒,这里演示任务执行完毕的时间,当线程不足时就开启新的线程来执行,如果有空闲的线程就会利用空闲的线程来处理任务,打印log如下:

(4)SingleThreadExecutor

单线程化线程池:它的内部有且仅有一个核心线程,他确保所有的任务都在一个线程中按照顺序执行。目的是统一外界线程到一个线程中,使得这些任务之间不需要处理线程同步的问题。不适合并发但可能引起阻塞或者影响UI线程响应的操作,比如数据库、文件操作等。

内部实现:

  1. public static ExecutorService newSingleThreadExecutor() {
  2. return new FinalizableDelegatedExecutorService
  3. (new ThreadPoolExecutor(1, 1,
  4. 0L, TimeUnit.MILLISECONDS,
  5. new LinkedBlockingQueue<Runnable>()));
  6. }

例子:创建单线程化线程池,开启10个任务请求

  1. //1.创建单线程化线程池
  2. ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
  3. //2.创建10个任务请求
  4. for (int i = 0; i < 10; i++) {
  5. final int index = i;
  6. Runnable runnable = new Runnable() {
  7. @Override
  8. public void run() {
  9. try {
  10. Log.e(TAG, "newSingleThreadExecutor线程:" + Thread.currentThread().getName() + " 正在执行" + index + "个任务");
  11. Thread.sleep(1000);
  12. } catch (InterruptedException e) {
  13. e.printStackTrace();
  14. }
  15. }
  16. };
  17. //3.线程池执行任务
  18. newSingleThreadExecutor.execute(runnable);
  19. }

可以看到,执行10个任务,每个任务都睡了1秒,而且每次都是唯一一个线程pool-2-thread-1 执行任务,打印log如下:

总结一下几个相关线程池的分类:

类型线程类型描述场景
FixedThreadPool:
定长线程池
核心线程只有核心线程,线程数固定,空闲状态时,并不会被回收,除非线程池被关闭;
新任务处于等待状态,直到有线程空闲才会被执行;
没有超时机制,任务队列也没有大小限制,能快速响应外界的请求。
控制线程最大并发数。
ScheduledThreadPool :
定时线程池
核心+非核心线程核心线程数固定,非核心线程数没有限制,非核心线程空闲即被回收。执行定时任务和有固定周期的重复任务。
CachedThreadPool:
可缓存线程池
非核心线程线程数量不定(没有限制)的线程池,只有非核心线程;
新任务如果有空闲线程就用空闲线程执行,否则创建新的线程执行;
有超时机制,60秒,超过60秒闲置的线程会被回收;
任务队列是空队列,遇到新的任务会立即执行。
适合用于大量并且耗时较少的任务。
SingleThreadExecutor:
单线程化线程池
核心线程仅有一个核心线程,所有的任务都在一个线程中按照顺序执行;
不需要处理线程同步的问题。
单线程,不适合并发但可能引起阻塞或者影响UI线程响应的操作,比如数据库、文件操作等。

6.线程池的封装

线程池的使用一般情况下都封装成一个管理类,方便使用:

  1. /**
  2. * 线程池管理工具类
  3. */
  4. public class ThreadPoolManager {
  5. private int corePoolSize;//核心线程池的数量,同时能够执行的线程数量
  6. private int maximumPoolSize = 100;//最大线程池数量,表示当缓冲队列满的时候能继续容纳的等待任务的数量
  7. private long keepAliveTime = 30 * 60;//空闲线程存活时间30分钟
  8. private TimeUnit unit = TimeUnit.SECONDS;//单位:秒
  9. private ThreadPoolExecutor executor;//线程池实例
  10. private static ThreadPoolManager mInstance;//线程池管理类
  11. public static ThreadPoolManager getInstance() {
  12. if (mInstance == null) {//单例
  13. synchronized (ThreadPoolManager.class) {//加锁
  14. if (mInstance == null) {
  15. mInstance = new ThreadPoolManager();
  16. }
  17. }
  18. }
  19. return mInstance;
  20. }
  21. /**
  22. * 私有化构造方法
  23. */
  24. private ThreadPoolManager() {
  25. /**
  26. * 给corePoolSize赋值:当前设备可用处理器核心数*2 + 1,能够让cpu的效率得到最大程度执行(有研究论证的)
  27. */
  28. corePoolSize = Runtime.getRuntime().availableProcessors() * 2 + 1;
  29. executor = new ThreadPoolExecutor(
  30. corePoolSize, //当某个核心任务执行完毕,会依次从缓冲队列中取出等待任务
  31. maximumPoolSize, //5,先corePoolSize,然后new LinkedBlockingQueue<Runnable>(),然后maximumPoolSize,但是它的数量是包含了corePoolSize的
  32. keepAliveTime, //表示的是maximumPoolSize当中等待任务的存活时间
  33. unit,
  34. new LinkedBlockingQueue<Runnable>(), //缓冲队列,用于存放等待任务,Linked的先进先出
  35. Executors.defaultThreadFactory(), //创建线程的工厂
  36. new ThreadPoolExecutor.AbortPolicy() //用来对超出maximumPoolSize的任务的处理策略
  37. );
  38. }
  39. /**
  40. * 执行任务
  41. */
  42. public void execute(Runnable runnable) {
  43. if (runnable == null) return;
  44. executor.execute(runnable);
  45. }
  46. /**
  47. * 从线程池中移除任务
  48. */
  49. public void remove(Runnable runnable) {
  50. if (runnable == null) return;
  51. executor.remove(runnable);
  52. }
  53. }

使用方式:

  1. //任务请求
  2. Runnable runnable = new Runnable() {
  3. @Override
  4. public void run() {
  5. try {
  6. Log.e(TAG, "线程池管理类:" + Thread.currentThread().getName() + " 正在执行任务");
  7. Thread.currentThread().sleep(2000);
  8. } catch (InterruptedException e) {
  9. e.printStackTrace();
  10. }
  11. }
  12. };
  13. ThreadPoolManager.getInstance().execute(runnable);//执行请求
  14. ThreadPoolManager.getInstance().remove(runnable);//移除请求

三、深入理解线程池

1.拒绝策略defaultHandler

由上面构造方法可知,可以传入自定义的拒绝策略defaultHandler,否则就是使用默认的;

  1. /**
  2. * The default rejected execution handler.
  3. */
  4. private static final RejectedExecutionHandler defaultHandler =
  5. new AbortPolicy();

表示无法执行任务的通知类, 当线程池无法执行任务,可能因为无法成功执行任务或者任务队列已满,这个时候就会调用ThreadPoolExecution的rejectedExecution(拒绝执行异常)方法通知调用者;

(1)AbortPolicy:默认值,拒绝任务时直接抛出异常和原因;

  1. /**
  2. *抛出一个拒绝执行任务的处理程序
  3. * {@code RejectedExecutionException}.
  4. */
  5. public static class AbortPolicy implements RejectedExecutionHandler {
  6. /**
  7. * Creates an {@code AbortPolicy}.
  8. */
  9. public AbortPolicy() { }
  10. /**
  11. * 总是抛出拒绝执行异常
  12. */
  13. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  14. throw new RejectedExecutionException("Task " + r.toString() +" rejected from " + e.toString());
  15. }
  16. }

看上面AbortPolicy源码可以知道,rejectedExecution默认会抛出一个RejectedExecutionException异常来说明当前任务为什么无法执行。RejectedExecutionHandler任务拒绝策略还提供以下几种:

(2)CallerRunsPolicy :拒绝任务时,判断线程池的状态是否为SHUTDOWN,如果为true任务则会被丢弃,如果为false任务会继续执行;

  1. public static class CallerRunsPolicy implements RejectedExecutionHandler {
  2. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  3. if (!e.isShutdown()) {
  4. r.run();
  5. }
  6. }
  7. }

(3)DiscardPolicy:拒绝任务,什么也不会发生

  1. public static class DiscardPolicy implements RejectedExecutionHandler {
  2. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  3. }
  4. }

(4)DiscardOldestPolicy:拒绝任务时,判断线程池的状态是否为SHUTDOWN,如果为true则任务丢弃,如果为false,将当前任务队列中等待时间最长的任务弹出,将其加入任务队列中并重试。

  1. public static class DiscardOldestPolicy implements RejectedExecutionHandler {
  2. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  3. if (!e.isShutdown()) {
  4. e.getQueue().poll();
  5. e.execute(r);
  6. }
  7. }
  8. }

2.执行任务execute()

线程池中有两个执行的方法execute()submit(),其实两个方法的本质含义是一样的,只是有无返回值的区别。在阅读源码前我们先来看看ThreadPoolExecutor类中的几个常量的意义:

  1. /**
  2. * 获取到当前线程池的生命周期的状态
  3. */
  4. private static int runStateOf(int c) { return c & ~CAPACITY; }
  5. /**
  6. * 获取当前线程池的工作线程状态
  7. */
  8. private static int workerCountOf(int c) { return c & CAPACITY; }
  9. /**
  10. * 通过或运算拼接线程的生命周期状态和工作线程的个数
  11. */
  12. private static int ctlOf(int rs, int wc) { return rs | wc; }

(1)execute():我们来看看execute()的源码:

  1. public void execute(Runnable command) {
  2. if (command == null)
  3. throw new NullPointerException();
  4. //获取当前线程的生命周期对应的二进制状态码
  5. int c = ctl.get();
  6. //当前线程数量是否少于核心线程数量,如果小于就直接创建核心线程执行任务,成功则跳出方法,失败则继续往下走
  7. if (workerCountOf(c) < corePoolSize) {
  8. if (addWorker(command, true))
  9. return;
  10. c = ctl.get();
  11. }
  12. //判断线程池是否为Running状态,并且将任务添加到队列中
  13. if (isRunning(c) && workQueue.offer(command)) {
  14. int recheck = ctl.get();
  15. //审核下的线程状态如果不是Running状态则直接移除队列中
  16. if (! isRunning(recheck) && remove(command))
  17. reject(command);
  18. //如果当前线程数为0,则单独创建线程而不是指定任务
  19. else if (workerCountOf(recheck) == 0)
  20. addWorker(null, false);
  21. }
  22. //如果不满足上述条件,尝试创建一个非核心线程来执行任务,如果创建失败则会reject()
  23. else if (!addWorker(command, false))
  24. reject(command);
  25. }

execute()的流程图:

那么我们可以看出addWorker()才是创建线程(核心、非核心)的主要方法,而reject()则是一个创建线程失败的回调,看一下源码:

  1. final void reject(Runnable command) {
  2. handler.rejectedExecution(command, this);
  3. }

通过handler将通知发出去,然后针对不同类型的RejectedExecutionHandler,进行不同的处理。
(2)addWorker():我们来看看addWorker()的源码:

  1. //firstTask:需要执行的任务,也可以为null(单纯的创建线程执行任务)
  2. //core:创建的线程是否需要核心线程
  3. private boolean addWorker(Runnable firstTask, boolean core) {
  4. retry://java标识符,防止在多线程的情况下,compareAndIncrementWorkerCount()计算线程池状态出现问题
  5. for (;;) {
  6. int c = ctl.get();
  7. int rs = runStateOf(c);
  8. // Check if queue empty only if necessary.
  9. //1.线程处于stop或者即将处于stop状态
  10. //2.线程处于SHUTDOWN状态,并且传递的任务为null,队列不为空,需要增加线程,其他不需要
  11. if (rs >= SHUTDOWN &&
  12. ! (rs == SHUTDOWN &&
  13. firstTask == null &&
  14. ! workQueue.isEmpty()))
  15. return false;
  16. for (;;) {
  17. int wc = workerCountOf(c);
  18. //当前工作的线程数量是否超过最大值,或者超过核心线程数(core:表示是否需要核心线程)和最大线程数
  19. if (wc >= CAPACITY ||
  20. wc >= (core ? corePoolSize : maximumPoolSize))
  21. return false;
  22. //线程池统计是否更新成功,成功则跳出循环,继续执行,不成功则跳出外循环继续执行
  23. if (compareAndIncrementWorkerCount(c))
  24. break retry;
  25. c = ctl.get(); // Re-read ctl
  26. if (runStateOf(c) != rs)
  27. continue retry;
  28. // else CAS failed due to workerCount change; retry inner loop
  29. }
  30. }
  31. //线程创建过程,Worker是包装类,分别对创建成功和失败做了处理
  32. boolean workerStarted = false;
  33. boolean workerAdded = false;
  34. Worker w = null;
  35. try {
  36. //创建线程
  37. w = new Worker(firstTask);
  38. final Thread t = w.thread;
  39. if (t != null) {
  40. //线程加锁
  41. final ReentrantLock mainLock = this.mainLock;
  42. mainLock.lock();
  43. try {
  44. // Recheck while holding lock.
  45. // Back out on ThreadFactory failure or if
  46. // shut down before lock acquired.
  47. int rs = runStateOf(ctl.get());
  48. //线程处于Running状态或者处于SHUTDOWN状态并且任务不为空
  49. if (rs < SHUTDOWN ||
  50. (rs == SHUTDOWN && firstTask == null)) {
  51. if (t.isAlive()) // precheck that t is startable
  52. throw new IllegalThreadStateException();
  53. workers.add(w);
  54. int s = workers.size();
  55. //容量的判断
  56. if (s > largestPoolSize)
  57. largestPoolSize = s;
  58. workerAdded = true;
  59. }
  60. } finally {
  61. mainLock.unlock();
  62. }
  63. //线程增加成功,设置成功标记
  64. if (workerAdded) {
  65. t.start();
  66. workerStarted = true;
  67. }
  68. }
  69. } finally {
  70. //如果线程未启动则分发到添加线程失败
  71. if (! workerStarted)
  72. addWorkerFailed(w);
  73. }
  74. //如果添加成功返回true,添加失败返回false
  75. return workerStarted;
  76. }

我们来看看addWorkerFailed()方法:回滚线程的创建操作,如果线程的包装类Worker存在就移除掉,刷新工作线程数量,尝试终止线程操作;

  1. private void addWorkerFailed(Worker w) {
  2. final ReentrantLock mainLock = this.mainLock;
  3. mainLock.lock();
  4. try {
  5. if (w != null)
  6. //移除worker
  7. workers.remove(w);
  8. //刷新线程数量
  9. decrementWorkerCount();
  10. //尝试停止操作
  11. tryTerminate();
  12. } finally {
  13. mainLock.unlock();
  14. }
  15. }

 tryTerminate():尝试停止线程池操作,1.如果在SHUTDOWN状态并且队列为空时;2.stop状态,两种情况会将线程池进行停止操作,

  1. final void tryTerminate() {
  2. for (;;) {
  3. int c = ctl.get();
  4. //Running状态或者已停止状态TERMINATED或者SHUTDOWN状态并且任务队列不为空
  5. if (isRunning(c) ||
  6. runStateAtLeast(c, TIDYING) ||
  7. (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
  8. return;
  9. //如果线程数量不为空,终止一个线程
  10. if (workerCountOf(c) != 0) { // Eligible to terminate
  11. interruptIdleWorkers(ONLY_ONE);
  12. return;
  13. }
  14. final ReentrantLock mainLock = this.mainLock;
  15. mainLock.lock();
  16. try {
  17. //设置当前线程状态为TIDYING,如果不成功则继续循设置
  18. if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
  19. try {
  20. terminated();
  21. } finally {
  22. //最终线程池状态为停止状态
  23. ctl.set(ctlOf(TERMINATED, 0));
  24. //设置可重新入锁的标志,将被锁隔离的在外面等待的线程唤醒
  25. termination.signalAll();
  26. }
  27. return;
  28. }
  29. } finally {
  30. mainLock.unlock();
  31. }
  32. // else retry on failed CAS
  33. }
  34. }

 interruptIdleWorkers():设置线程中断的操作,onlyOne如果为true:只中断工作线程中的其中一个线程;如果为false:中断所有的工作线程。t.interrupt():表示中断线程。

  1. private void interruptIdleWorkers(boolean onlyOne) {
  2. final ReentrantLock mainLock = this.mainLock;
  3. mainLock.lock();
  4. try {
  5. for (Worker w : workers) {
  6. Thread t = w.thread;
  7. //检查线程的状态,如果未拦截并且有锁就拦截
  8. if (!t.isInterrupted() && w.tryLock()) {
  9. try {
  10. //中断线程
  11. t.interrupt();
  12. } catch (SecurityException ignore) {
  13. } finally {
  14. w.unlock();
  15. }
  16. }
  17. //如果onlyOne = true则只执行一次就跳出
  18. if (onlyOne)
  19. break;
  20. }
  21. } finally {
  22. mainLock.unlock();
  23. }
  24. }

addWorker()方法流程图:

(3)shutdown():中断所有空闲线程的方法,它的核心方法还是调用了上面的interruptIdleWorkers()方法。

  1. public void shutdown() {
  2. final ReentrantLock mainLock = this.mainLock;
  3. mainLock.lock();
  4. try {
  5. //校验线程的状态
  6. checkShutdownAccess();
  7. //设置线程池的状态为SHUTDOWN
  8. advanceRunState(SHUTDOWN);
  9. //中断所有空闲进程,内部调用interruptIdleWorkers(false)
  10. interruptIdleWorkers();
  11. //中断所有线程可指定的操作,需要自己实现
  12. onShutdown(); // hook for ScheduledThreadPoolExecutor
  13. } finally {
  14. mainLock.unlock();
  15. }
  16. tryTerminate();
  17. }
  18. private void interruptIdleWorkers() {
  19. interruptIdleWorkers(false);
  20. }

 

注意:1.在shutdown()被执行时,可以让现有的任务被执行关闭,但是新的任务不会再被处理;
2.如果任务线程时SHUTDOWN状态,继续调用shutdown()不会产生任何效果。

3.submit()

submit()其实还需要调用execute()去执行任务,而execute()submit()本质不同的是submit()将包装好的任务进行返回。

  1. public Future<?> submit(Runnable task) {
  2. if (task == null) throw new NullPointerException();
  3. RunnableFuture<Void> ftask = newTaskFor(task, null);
  4. //最终还是调用execute(ftask)
  5. execute(ftask);
  6. return ftask;
  7. }
  8. //创建RunnableFuture
  9. protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
  10. return new FutureTask<T>(runnable, value);
  11. }
  12. //FutureTask其实就是包装了callable 和其他一些信息的类
  13. public FutureTask(Runnable runnable, V result) {
  14. this.callable = Executors.callable(runnable, result);
  15. this.state = NEW; // ensure visibility of callable
  16. }

Future<?>则是返回的结果,里面封装了请求等信息,我们可以通过cancel(boolean),true表示中断退出正在执行的任务,false则是任务可以被完成。

至此,本文结束!

 

源码地址:https://github.com/FollowExcellence/AndroidOptimizeDemo

请尊重原创者版权,转载请标明出处:https://blog.csdn.net/m0_37796683/article/details/103054999 谢谢!

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/weixin_40725706/article/detail/240428?site
推荐阅读
相关标签
  

闽ICP备14008679号