当前位置:   article > 正文

深入理解 Java 线程池技术_java 线程池技术部

java 线程池技术部

我们在程序开发中为了“压榨”计算机的 CPU 资源,会去使用多线程来提高程序的性能,在高并发的场景下,多线程编程显得尤为重要。而在线上,我们使用多线程大部分都是通过线程池来管理。线程池是一种基于池化思想的线程管理工具,“池化思想”就是通过一个容器来实现管理和复用,避免重复创建带来的开销。

一. 线程池的作用和使用

线程池主要有两个作用:

1. 线程复用减少重复创建带来的开销且提高响应速度。

2. 合理限制和管理线程资源。

线程池的使用比较简单,初始化后调用 execute 方法即可完成任务的提交:

  1. ThreadPoolExecutor executor = new ThreadPoolExecutor(num, num, 0L, TimeUnit.SECONDS,
  2. new SynchronousQueue<>(),
  3. new ThreadFactoryBuilder().setNameFormat( "-thread_pool%d").build(),
  4. new ThreadPoolExecutor.CallerRunsPolicy());
  5. executor.execute(() -> {
  6. System.out.println("任务执行...");
  7. });

线程池核心参数

线程池的使用也比较简单,我们来看下线程池的核心实现类 - ThreadPoolExecutor 的构造方法:

  1. public ThreadPoolExecutor(int corePoolSize,
  2. int maximumPoolSize,
  3. long keepAliveTime,
  4. TimeUnit unit,
  5. BlockingQueue<Runnable> workQueue,
  6. ThreadFactory threadFactory,
  7. RejectedExecutionHandler handler);

​​​​​​​

  • int corePoolSize:核心线程数,代表线程池中有该数量的线程的一直存在,除非调用 allowCoreThreadTimeOut 方法让核心线程也允许销毁,会在第一个任务进来时初始化核心线程。

  • int maximumPoolSize:最大线程数,代表线程池中允许存在的最大线程数量,maximumPoolSize - corePoolSize 的数量一般被称为「非核心线程数」,当无核心线程可用且「任务等待队列」中没有容量存放新任务时会触发非核心线程的创建。

  • long keepAliveTime:非核心线程空闲时存活时间,当非核心线程空闲超过该时间会被销毁。

  • TimeUnit unit:非核心线程空闲时存活时间单位。

  • BlockingQueue<Runnable> workQueue:任务等待队列(阻塞队列),当无核心线程可用时,提交的任务会被丢进此队列中。

  • ThreadFactory threadFactory:线程的创建工厂,一般我们会自己实现一个线程工厂去对线程进行命名,后续排查问题方便些。

  • RejectedExecutionHandler handler:拒绝策略,当无核心线程和非核心线程可用且阻塞队列无容量存放新任务时,会触发拒绝策略,这块我们后面再讲。

二. 线程池的原理

线程池的内部是一个生产者-消费者的模型,为了实现线程复用,将线程和任务进行解耦,提交一个任务(Runnable)后,通过一系列流转交给池内的 Worker 线程调用(Runnable)run 方法来执行任务。

线程池的生命周期

线程池内通过一个 AtomicInteger 原子类包装的 ctl 变量来维护「线程池的状态」「线程池的线程数量」两个值,高3位维护「线程池运行状态」,低29位维护「线程数量」;一个变量维护多个状态可以以较少的锁资源来保证多个状态的一致。

  1. // 初始化为 Running 状态
  2. private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

​​​​​​​

通过位运算的方式获取「线程池的运行状态」「线程数量」

  1. private static final int COUNT_BITS = Integer.SIZE - 3;
  2. private static final int CAPACITY = (1 << COUNT_BITS) - 1;
  3. // 返回「线程池的运行状态」
  4. private static int runStateOf(int c) { return c & ~CAPACITY; }
  5. // 返回「线程数量」
  6. private static int workerCountOf(int c) { return c & CAPACITY; }

线程池共有五种运行状态:

  1. private static final int RUNNING = -1 << COUNT_BITS;
  2. private static final int SHUTDOWN = 0 << COUNT_BITS;
  3. private static final int STOP = 1 << COUNT_BITS;
  4. private static final int TIDYING = 2 << COUNT_BITS;
  5. private static final int TERMINATED = 3 << COUNT_BITS;

​​​​​​​

  • RUNNING:线程池初始化的状态,能接收新任务并且可以处理阻塞队列中的任务;

  • SHUTDOWN:拒绝接收新任务,但可以继续处理阻塞队列中的任务;

  • STOP:拒绝接收新任务,不处理阻塞队列中的任务且会中断正在处理任务的线程;

  • TIDYING:当所有任务终止后且线程数为 0 后会进入该状态;

  • TERMINATED:线程池被销毁。

各状态流转如下:

线程池实现细节

我们来结合源码看下线程池中提交任务的 execute 方法实现细节:

  1. public void execute(Runnable command) {
  2. if (command == null)
  3. throw new NullPointerException();
  4. int c = ctl.get();
  5. // 「有效线程数量」小于「核心线程数」
  6. if (workerCountOf(c) < corePoolSize) {
  7. // 创建新的线程并执行任务
  8. if (addWorker(command, true))
  9. // 成功直接返回
  10. return;
  11. // 失败可能是竞争导致,重新获取 ctl 状态
  12. c = ctl.get();
  13. }
  14. // 如果线程池是Running状态,向阻塞队列添加任务(此时池中的线程正在take阻塞获取任务)
  15. if (isRunning(c) && workQueue.offer(command)) {
  16. int recheck = ctl.get();
  17. // 添加成功后再次检查线程池状态是否是运行中
  18. if (! isRunning(recheck) && remove(command))
  19. // 如果非运行中,将任务从阻塞队列清除并使用「拒绝策略」拒绝该任务
  20. reject(command);
  21. else if (workerCountOf(recheck) == 0)
  22. addWorker(null, false);
  23. }
  24. // 阻塞队列满了尝试创建「非核心线程」
  25. else if (!addWorker(command, false))
  26. // 如果无可用「核心线程」和「非核心线程」且阻塞队列也无容量存放任务,就调用拒绝策略处理新任务
  27. reject(command);
  28. }

​​​​​​​

先简单总结下 execute 的流程:

1. 如果当前线程数小于核心线程数,先去创建线程(懒加载核心线程);

2. 如果大于等于核心线程判断线程状态是否是运行态,如果是offer进阻塞队列中;    

3. 如果无可用「核心线程」且「阻塞队列」无容量存放新任务,尝试创建非核心线程;

4. 如果无可用「非核心线程」则触发拒绝策略处理任务。

execute 的过程就是生产者生产任务的过程。

详细看下 addWorker() 方法:

  1. private boolean addWorker(Runnable firstTask, boolean core) {
  2. retry:
  3. for (;;) {
  4. int c = ctl.get();
  5.         int rs = runStateOf(c);
  6. // 如果线程处于 SHUTDOWN 且阻塞队列不为空返回 fasle
  7. if (rs >= SHUTDOWN &&
  8. ! (rs == SHUTDOWN &&
  9. firstTask == null &&
  10. ! workQueue.isEmpty()))
  11. return false;
  12. // 自旋 CAS 的方式增加线程数量
  13. for (;;) {
  14. int wc = workerCountOf(c);
  15. // 如果 core 为 true,代表本次增加的是核心线程
  16. // 如果 core 为 false,代表本次增加的是非核心线程
  17. if (wc >= CAPACITY ||
  18. wc >= (core ? corePoolSize : maximumPoolSize))
  19. // 超过对应阈值返回 false
  20. return false;
  21. if (compareAndIncrementWorkerCount(c))
  22. // CAS 成功才会跳出循环
  23. break retry;
  24. c = ctl.get();
  25. // 重读线程池运行状态,如果发生变化到最外层循环重试
  26. if (runStateOf(c) != rs)
  27. continue retry;
  28. }
  29. }
  30. boolean workerStarted = false;
  31. boolean workerAdded = false;
  32. Worker w = null;
  33. try {
  34. // 跳出循环后开始创建线程
  35. w = new Worker(firstTask);
  36. final Thread t = w.thread;
  37. if (t != null) {
  38. // 这里通过 mainLock 来保证 worker 集合
  39. // 和 largestPoolSize(线程池有过的最大线程数)的线程安全
  40. final ReentrantLock mainLock = this.mainLock;
  41. mainLock.lock();
  42. try {
  43. int rs = runStateOf(ctl.get());
  44. if (rs < SHUTDOWN ||
  45. (rs == SHUTDOWN && firstTask == null)) {
  46. if (t.isAlive())
  47. // 如果线程已运行,抛出异常
  48. throw new IllegalThreadStateException();
  49. workers.add(w); // 加入线程集合中
  50. int s = workers.size();
  51. // 记录最大线程数
  52. if (s > largestPoolSize)
  53. largestPoolSize = s;
  54. workerAdded = true;
  55. }
  56. } finally {
  57. mainLock.unlock();
  58. }
  59. // 以上操作成功,start 线程
  60. if (workerAdded) {
  61. t.start();
  62. workerStarted = true;
  63. }
  64. }
  65. } finally {
  66. // 如果线程启动失败,将该线程从线程集合清除并将 workerCount 减1
  67. if (! workerStarted)
  68. addWorkerFailed(w);
  69. }
  70. return workerStarted;
  71. }

​​​​​​​

总结下 addWorker 方法:

1. 判断线程是否处于 SHUTDOWN 且阻塞队列不为空返回,如果是直接返回 fasle;

2. 自旋将 ctl 状态的 workerCount 位以 CAS 的方式加 1;

3. 创建新的Worker,并将其加入 workers 集合中,然后记录下最大线程数;

4. 如果上述操作都成功,启动线程;

5. 如果启动失败将该线程从线程集合清除并将 workerCount 减1。

这里有个问题,workers 是用的线程不安全 HashSet 集合存储:

private final HashSet<Worker> workers = new HashSet<Worker>();

然后用的 ReentrantLock 来保证集合的线程安全,那为什么不直接用线程安全的集合呢?

其实JUC并发之父在注释里也解释的很清楚:

主要的原因有两点:

1. 防止中断风暴,在对线程池调用 shutdown 方法时,会对遍历所有 workers,对非中断状态的线程进行中断操作,如果外面不加锁,就会导致多个线程并发中断 worker,造成不必要的开销。

2. 保证对 workers 集合 和 largestPoolSize 变量两个操作的原子性。

我们来分析下线程池内线程(Worker 类)的执行流程

Worker 线程的 run 方法:

  1. public void run() {
  2. runWorker(this);
  3. }
  4. final void runWorker(Worker w) {
  5. Thread wt = Thread.currentThread();
  6. // 线程第一次执行的任务
  7. Runnable task = w.firstTask;
  8. w.firstTask = null;
  9. w.unlock(); // allow interrupts
  10. boolean completedAbruptly = true;
  11. try {
  12. // 后续「核心线程」会一直在这个while循环里
  13. while (task != null || (task = getTask()) != null) {
  14. w.lock(); // 这里加锁是为了禁止被interruptIdleWorkers方法中断
  15. if ((runStateAtLeast(ctl.get(), STOP) ||
  16. (Thread.interrupted() &&
  17. runStateAtLeast(ctl.get(), STOP))) &&
  18. !wt.isInterrupted())
  19. wt.interrupt();
  20. try {
  21. // 如果想对任务执行前做方法增强可以重写beforeExecute方法
  22. beforeExecute(wt, task);
  23. Throwable thrown = null;
  24. try {
  25. // 执行任务
  26. task.run();
  27. } catch (RuntimeException x) {
  28. thrown = x; throw x;
  29. } catch (Error x) {
  30. thrown = x; throw x;
  31. } catch (Throwable x) {
  32. thrown = x; throw new Error(x);
  33. } finally {
  34. // 如果想对任务执行后做方法增强可以重写afterExecute方法
  35. afterExecute(task, thrown);
  36. }
  37. } finally {
  38. task = null;
  39. // 记录线程执行任务数
  40. w.completedTasks++;
  41. w.unlock();
  42. }
  43. }
  44. completedAbruptly = false;
  45. } finally {
  46. // 回收线程
  47. processWorkerExit(w, completedAbruptly);
  48. }
  49. }

​​​​​​​

我们看到 runWorker 方法内部有个 while 循环不断 getTask 获取任务,只要任务不为空线程就会一直在循环里,当成功获取到任务后会加锁,这里加锁是为了保证在任务执行中不被 interruptIdleWorkers 方法中断;当获取到的任务为空就跳出循环执行线程回收方法 processWorkerExit。

getTask 方法就是线程获取任务(消费者进行消费)的主要逻辑:

  1. private Runnable getTask() {
  2. boolean timedOut = false; // Did the last poll() time out?
  3. for (;;) {
  4. int c = ctl.get();
  5. int rs = runStateOf(c);
  6. // 如果线程池处于 SHUTDOWN 但是队列不为空,继续处理
  7. // 如果线程池处于 STOP 及以上不管阻塞队列是否为空都不处理
  8. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
  9. decrementWorkerCount();
  10. return null;
  11. }
  12. int wc = workerCountOf(c);
  13. // 判断 worker 是否应该被超时淘汰
  14. boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
  15. // 大于最大线程数量 或 worker 线程应该被回收
  16. if ((wc > maximumPoolSize || (timed && timedOut))
  17. && (wc > 1 || workQueue.isEmpty())) {
  18. if (compareAndDecrementWorkerCount(c))
  19. return null;
  20. continue;
  21. }
  22. try {
  23. // 主要在这里去获取阻塞队列里的任务,如果没有任务则阻塞线程
  24. Runnable r = timed ?
  25. workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
  26. workQueue.take();
  27. if (r != null)
  28. return r;
  29. timedOut = true;
  30. } catch (InterruptedException retry) {
  31. timedOut = false;
  32. }
  33. }
  34. }

​​​​​​​

其实 getTask方法就是一个尝试获取任务的方法,如果线程应该被超时淘汰则回调用阻塞队列的 poll 方法超时获取任务,时间到了也没有任务则返回 null,后续执行 processWorkerExit 方法将线程回收;如果线程不应该被超时淘汰则调用阻塞队列的 take 方法一直阻塞直到有新任务进入。

什么样的线程应该被超时淘汰?

1. 设置了 allowCoreThreadTimeOut 允许核心线程超时

2. 当前有效线程数大于设置的核心线程数(corePoolSize)

总的来说 worker 线程的执行流程就是while循环不断去尝试 getTask,没有任务且非应该被超时淘汰的线程就阻塞,返回任务为空时就回收线程。

Worker 线程何时会被回收呢?

1. 当线程池状态为 SHUTDOWN 且队列不为空时。

2. 当线程池处于 STOP 及以上状态时。

3. 当有效线程数大于核心线程数且空闲时间大于 keepAliveTime 时;

4. 当线程池设置了 allowCoreThreadTimeOut 后核心线程空闲时间大于keepAliveTime时;

线程池的拒绝策略

线程池内部提供了四种拒绝策略:

AbortPolicy:丢弃新任务并抛出异常,AbortPolicy 也是线程池的默认拒绝策略。

  1. public static class AbortPolicy implements RejectedExecutionHandler {
  2. public AbortPolicy() { }
  3. // 不执行新任务并抛出异常
  4. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  5. throw new RejectedExecutionException("Task " + r.toString() +
  6. " rejected from " +
  7. e.toString());
  8. }
  9. }

​​​​​​​

DiscardPolicy:丢弃新任务,不抛出异常。

  1. public static class DiscardPolicy implements RejectedExecutionHandler {
  2. public DiscardPolicy() { }
  3. // 啥都不做
  4. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  5. }
  6. }

​​​​​​​

DiscardOldestPolicy:丢弃目前在阻塞队列中存在最久的任务,提交新任务并执行。

  1. public static class DiscardOldestPolicy implements RejectedExecutionHandler {
  2. public DiscardOldestPolicy() { }
  3. // 如果线程池处于Running,丢弃阻塞队列中存在最久的任务,提交新任务并执行
  4. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  5. if (!e.isShutdown()) {
  6. e.getQueue().poll();
  7. e.execute(r);
  8. }
  9. }
  10. }

​​​​​​​

CallerRunsPolicy:交给当前线程执行新任务。

  1. public static class CallerRunsPolicy implements RejectedExecutionHandler {
  2. public CallerRunsPolicy() { }
  3. // 交给当前线程执行新任务
  4. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  5. if (!e.isShutdown()) {
  6. r.run();
  7. }
  8. }
  9. }

​​​​​​​

其实我们也可以根据业务场景自己实现的一个 RejectedExecutionHandler,比如我们的任务较为重要,不希望丢掉任务,但同时又需要线程池满了告警通知到开发人员及时做调整,我们可以在 CallerRunsPolicy 的基础上加个告警或 warn 日志。

三. 线程池的妙用

线程池预热

线程池的 Worker 线程默认是懒加载的方式,线程池初始化后,内部线程并没有创建好,只有当有任务时才会去初始化 Worker 线程。如果我们是有流量激增的场景,比如在某一个时刻突然有大量请求,可以选择在初始化线程池后将核心线程进行预热。(预热也可以应用于新增缓存的场景,比如我们上线新的功能,上线后这个新的缓存暂时是没有的,如果流量激增会造成缓存雪崩

线程池内也提供了核心线程预热的方法:prestartAllCoreThreads,我们可以在线程池初始化后调用该方法:

  1. public int prestartAllCoreThreads() {
  2. int n = 0;
  3. // 添加空任务会初始化线程并执行
  4. while (addWorker(null, true))
  5. ++n;
  6. return n;
  7. }

​​​​​​​

阻塞队列的选择

一般在线程池中常用的阻塞队列有以下几个:ArrayBlockingQueue,LinkedBlockingQueue,SynchronousQueue。

  • ArrayBlockingQueue 是基于数组实现的阻塞队列,因为是基于数组,在初始化就已经开辟好整个空间,会有内存空间浪费和内存碎片的问题,但是支持公平和非公平两种。

  • LinkedBlockingQueue 是基于链表结构实现的阻塞队列,可以合理利用内存空间,只支持公平模式,且需要在初始化时指定好队列界限,否则可能会造成OOM!

  • SynchronousQueue 是一个无容量队列,放进队列后,被 take 后才能继续放元素,否则阻塞,SynchronousQueue 常被用来做敏感不能丢失任务的线程池阻塞队列,或是应用在分布式消息队列中一个消费者多线程消费的消费速率限制。

线程池使用 ThreadLocal 的内存泄漏问题

ThreadLocal大家或多或少都有接触过,可以在 web 项目中存 user 信息,比如在前端发来请求时,在 mvc 流程后加一个 AOP 拦截器,将用户信息塞进 ThreadLocal 中,在后续的业务逻辑中使用并且能保证线程之间数据隔离。ThreadLocal 其实只是对外提供获取和设置线程本地缓存的入口,真正的存数据的是 ThreadLocal.ThreadLocalMap,ThreadLocalMap 的 key 是弱引用包装的 ThreadLocal 防止 ThreadLocal 的内存泄漏,而 value 却不是,如果线程执行完正常退出没有关系,value 最终会被可达性分析算法判断为垃圾进行回收;但如果使用线程池,核心线程基本上会和程序的生命周期保持一致,不清理就会造成value值的内存泄漏。我们一般使用 Tomcat 进行请求处理的时候内部也是用的线程池,所以在请求处理完后(返回请求前)需要调用 ThreadLocal 的 remove 方法,将本地缓存清空(如果本身使用的本地缓存就和程序生命周期一致的话,就不用remove)。

submit 方法的坑

除了 execute 方法,还有一个 submit 方法也可以向线程池投递任务,submit 任务后会立刻返回一个 FutureTask 对象,等任务执行完后会将结果塞进这个 FutureTask 中,后面主线程可以通过 FutureTask.get() 方法获取结果。当我们需要拿到所有任务的结果去执行下一步操作时,可以使用 submit 方法。

例如:

  1. ExecutorService task = new ThreadPoolExecutor(3, 3, 0L, TimeUnit.SECONDS,
  2. new LinkedBlockingDeque<>(100), new ThreadPoolExecutor.CallerRunsPolicy());
  3. // Future 回溯
  4. List<Future> futures = new ArrayList<>();
  5. for (int i = 0; i < 3; i++) {
  6. futures.add(task.submit(() -> {
  7. System.out.println(Thread.currentThread().getName() + " 执行中");
  8. return Thread.currentThread().getName() + " OK";
  9. }));
  10. }
  11. for (Future future: futures) {
  12. // 调用get方法拿到结果,如果没有结果会阻塞直到结果到达被唤醒
  13. System.out.println(future.get());
  14. }
  15. task.shutdown();

​​​​​​​结果:

  1. pool-1-thread-1 执行中
  2. pool-1-thread-2 执行中
  3. pool-1-thread-3 执行中
  4. pool-1-thread-1 OK
  5. pool-1-thread-2 OK
  6. pool-1-thread-3 OK

​​​​​​​

但是 submit 方法有个问题,就是当我们线程 run 方法内部出现异常并且我们还没有catch,然后我们最后还没调用 get 方法获取结果,那这个异常就会丢失!在开发中,出现异常不可怕,出现异常但丢失了才最可怕

让我们来分析下原因

submit 方法是在线程池类 ThreadPoolExecutor 继承的抽象模板类 AbstractExecutorService 内:

  1. public <T> Future<T> submit(Callable<T> task) {
  2. if (task == null) throw new NullPointerException();
  3. // 将 task 包装成一个 FutureTask 对象返回
  4. RunnableFuture<T> ftask = newTaskFor(task);
  5. execute(ftask); // 执行 FutureTask
  6. return ftask;
  7. }
  8. protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
  9. return new FutureTask<T>(callable);
  10. }

​​​​​​​

因为最终 execute 调用的是 ftask 的 run 方法,我们来看下 FutureTask 的 run 方法:

  1. public void run() {
  2. if (state != NEW ||
  3. !UNSAFE.compareAndSwapObject(this, runnerOffset,
  4. null, Thread.currentThread()))
  5. return;
  6. try {
  7. Callable<V> c = callable;
  8. if (c != null && state == NEW) {
  9. V result;
  10. boolean ran;
  11. try {
  12. // 这里真正执行任务后拿到返回值
  13. result = c.call();
  14. ran = true;
  15. } catch (Throwable ex) {
  16. result = null;
  17. ran = false;
  18. // 这里 catch 异常做了处理
  19. setException(ex);
  20. }
  21. if (ran)
  22. // 塞返回值
  23. set(result);
  24. }
  25. } finally {
  26. runner = null;
  27. int s = state;
  28. if (s >= INTERRUPTING)
  29. handlePossibleCancellationInterrupt(s);
  30. }
  31. }

​​​​​​​

当任务执行完后会将返回值做下处理,我们看到在任务执行的外层 catch 了异常并调用 setException 方法做了异常处理:

  1. protected void setException(Throwable t) {
  2. if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
  3. outcome = t;
  4. UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
  5. finishCompletion();
  6. }
  7. }
  8. protected void set(V v) {
  9. // 修改任务状态为完成
  10. if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
  11. outcome = v;
  12. UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
  13. finishCompletion();
  14. }
  15. }

​​​​​​​

不管是异常还是最终结果都赋值给了 outcome 然后任务就执行结束,我们继续看下FutureTask.get() 方法:

  1. public V get() throws InterruptedException, ExecutionException {
  2. int s = state;
  3. if (s <= COMPLETING)
  4. // 如果任务未执行完,进行阻塞
  5. s = awaitDone(false, 0L);
  6. // 任务执行完直接返回结果
  7. return report(s);
  8. }
  9. private V report(int s) throws ExecutionException {
  10. Object x = outcome;
  11. if (s == NORMAL)
  12. // 直接将 outcome 的结果返回
  13. return (V)x;
  14. if (s >= CANCELLED)
  15. throw new CancellationException();
  16. throw new ExecutionException((Throwable)x);
  17. }

​​​​​​​

可以看到不管是异常还是结果都是通过 get() 方法才返回。赋值后就完成任务,所以如果我们没有在run方法内catch异常去处理或调用 FutureTask.get() 方法,方法内部的异常就会丢失。

解决办法也很简单,只要在提交任务的 run 方法内部 catch 住异常去处理即可(以后所有 run 方法内都 catch 异常去处理就完事了

如何保证任务不丢失以及优雅关闭线程池?

一般较为重要的任务都需要保证不丢,首先就是拒绝策略需要选择或类似 CallerRunsPolicy 的策略,其次就是在程序关闭时如何处理未执行完的任务。

阿东去年刚实习那会,关闭线程池只知道去调用 shutdown() 方法,后来发现发布时偶尔会有数据丢失的情况。我们来看下线程池的 shutdown() 方法做了哪些事情:

  1. public void shutdown() {
  2. final ReentrantLock mainLock = this.mainLock;
  3. mainLock.lock();
  4. try {
  5. // 安全性检查
  6. checkShutdownAccess();
  7. // 将线程池状态通过自旋 CAS 修改为 SHUTDOWN
  8. advanceRunState(SHUTDOWN);
  9. // 中断 Worker 线程
  10. interruptIdleWorkers();
  11. onShutdown(); // hook for ScheduledThreadPoolExecutor
  12. } finally {
  13. mainLock.unlock();
  14. }
  15. // 如果有效线程不为0,尝试将线程池状态改为TIDYING,TERMINATED
  16. tryTerminate();
  17. }

​​​​​​​

我们看到 shutdown 方法会将线程池状态修改为 SHUTDOWN,这个状态是拒绝新任务的,阻塞队列里的任务还是会执行的,接着判断有效线程不为 0 就尝试将线程池状态改为TIDYING、TERMINATED,如果此时还有任务没执行完是不会改为 TERMINATED 的,但是此时主线程已经执行完,整个程序就关闭了,无法等待任务都执行完,所以就会导致一定任务的丢失。

其实线程池也提供了一个阻塞等待任务执行完线程正常退出或超时的方法 awaitTermination:

  1. public boolean awaitTermination(long timeout, TimeUnit unit)
  2. throws InterruptedException {
  3. long nanos = unit.toNanos(timeout);
  4. final ReentrantLock mainLock = this.mainLock;
  5. mainLock.lock();
  6. try {
  7. for (;;) {
  8. // 如果线程池已经关闭返回true
  9. if (runStateAtLeast(ctl.get(), TERMINATED))
  10. return true;
  11. // 如果达到超时时间,返回 false,走到这里代表可能还有任务没执行完
  12. if (nanos <= 0)
  13. return false;
  14. // 调用 Condition 的 awaitNanos 方法去阻塞当前线程
  15. nanos = termination.awaitNanos(nanos);
  16. }
  17. } finally {
  18. mainLock.unlock();
  19. }
  20. }

​​​​​​​

我们看到调用 awaitTermination 方法的线程会进入一个循环里,判断如果线程池已经关闭返回true,如果达到超时时间,返回 false,走到这里代表可能还有任务没执行完,上述两个情况都不符合调用 Condition 的 awaitNanos 方法去阻塞当前线程。

被阻塞的主线程何时被唤醒?

有两种情况:

1. 当达到超时时间后会自动唤醒再次检查线程池状态并返回结果

2. 后续在任务执行完后Worker线程退出时会执行 processWorkerExit 方法,内部会调用 Condition 类型 termination 的signalAll方法唤醒线程。

看了上面的分析过程,线程池的优雅关闭就可以这么写:

  1. public static void shutdown(ExecutorService executor, String name){
  2.     log.info("{}线程池优雅关闭start", name);
  3. executor.shutdown();
  4. try {
  5. // 阻塞一会返回结果
  6. if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
  7. // 如果任务未执行完调用 shutdownNow() 方法取消任务
  8. executor.shutdownNow(); // 取消当前任务
  9. log.error("优雅关闭线程池{}:线程池未执行完剩余任务!", name);
  10. }
  11. } catch (Exception e) {
  12. executor.shutdownNow();
  13. log.error("优雅关闭线程池{}:线程池异常中断!", name);
  14. Thread.currentThread().interrupt();
  15. return;
  16. }
  17.     log.info("{}线程池优雅关闭end", name);
  18. }

​​​​​​​

我们可以将 shutdown 和 awaitTermination 配合使用关闭线程池,超时的时间可以根据我们实际场景去设置,shutdownNow() 方法会终止所有任务并将未执行的任务返回,我们可以将这些未执行的任务暂存下来,比如存到 Redis 缓存,后续对这些任务进行补偿执行。

线程池的大小如何设置?

其实线程池的大小设置没有绝对的公式,只有根据实际场景去调整,我们的应用一般分为两种,CPU 密集型 IO 密集型

  • CPU 密集型应用的 CPU 资源比较紧张(大部分都是内存计算的场景,例如阿东上篇文章《手写一个高性能分布式 ID 生成器》中的finger-mark项目),线程池不适合设置太大,可以和 CPU 的逻辑核数保持一致,尽量减少线程切换。

  • IO 密集型应用是有很多网络 IO 和磁盘 IO 的,而我们开发中的应用基本上都是 IO 密集型(CRUD项目),因为线程在等待网络 IO 和磁盘 IO 是不会占用 CPU 的,所以这里 IO 密集型应用线程数可以根据具体流量进行设置,流量小的可以设置为 CPU 的逻辑核数,流量较大甚至可以设置 CPU 的逻辑核数的四到五倍;当然线程数不是越高越好,线程太多会造成线程切换过多从而影响性能且还占用内存,建议根据实际场景进行压测找到一个最优的临界值,或者可以将线程池的参数配置进分布式配置中心(Nocas、阿波罗等),设置一个线程池满告警的阈值,比如一分钟告警100次就电话通知开发人员不停机调整线程池参数。


如果觉得文章不错可以点个赞和关注

公众号:阿东编程之路

你好,我是阿东,目前从事后端研发工作。技术更新太快需要一直增加自己的储备,索性就将学到的东西记录下来同时分享给朋友们。未来我会在此公众号分享一些技术以及学习笔记之类的。妥妥的都是干货,跟大家一起成长。


参考:

https://tech.meituan.com/2020/04/02/java-pooling-pratice-in-meituan.html

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/神奇cpp/article/detail/812534
推荐阅读
相关标签
  

闽ICP备14008679号