当前位置:   article > 正文

当线程池任务抛出异常_线程池异常会不会抛出

线程池异常会不会抛出

最近在应用场景中需要用线程池开多线程,但是有时候通过日志和监控会发现,异步线程的任务突然停止,搞得我排查起来一脸懵逼,无从下手,后来师兄帮我翻业务代码才发现,原来是新线程里的任务抛出运行时异常了,导致我开的用户线程直接“跪”了。那么为什么线程池中的线程不会将异常抛出来呢,抛出异常的线程又会是什么状态呢?此贴特地分析一下

情况复现

#submit

  1. 线程执行的任务抛出异常,但是没有被处理
  2. 发生业务异常的用户线程并没有“挂掉”,而是变成了WATTING
  3. 程序还在运行

#execute

  1. 线程执行的任务抛出异常,被成功捕获
  2. 发生业务异常的用户线程直接结束,状态变成了TERMINATED
  3. 系统没有结束,还在运行

#schedule

  1. 线程执行的任务抛出异常,但是没有被处理
  2. 线程池中线程的状态为WATTING
  3. 程序继续运行,但是抛出异常的定时任务不再被执行

原因分析

#execute

对于execute来说,这个比较好理解,从源码中可以发现,线程池中的线程在执行runWorker的时候,如果任务中抛出异常,则线程会直接将异常抛出:

  1. final void runWorker(Worker w) {
  2. // ... 省略代码
  3. try {
  4. while (task != null || (task = getTask()) != null) {
  5. // ... 省略代码
  6. try {
  7. beforeExecute(wt, task);
  8. Throwable thrown = null;
  9. try {
  10. task.run();
  11. } catch (RuntimeException x) {
  12. thrown = x; throw x; // 这里抛出异常
  13. } catch (Error x) {
  14. thrown = x; throw x;
  15. } catch (Throwable x) {
  16. thrown = x; throw new Error(x);
  17. } finally {
  18. afterExecute(task, thrown);
  19. }
  20. } finally {
  21. task = null;
  22. w.completedTasks++;
  23. w.unlock();
  24. }
  25. }
  26. completedAbruptly = false;
  27. } finally {
  28. processWorkerExit(w, completedAbruptly); // 抛出异常后,执行这里
  29. }
  30. }

并且,在抛出异常后,为了避免异常任务的线程被污染,执行该任务的worker线程会被销毁,然后重新创建一个无任务非核心的worker线程。所以,即使线程池中的所有任务都失败了,只要核心线程数不为0,程序就将会一直被workQueue#poll阻塞,导致Jvm不会退出

  1. private void processWorkerExit(Worker w, boolean completedAbruptly) {
  2. if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
  3. decrementWorkerCount();
  4. final ReentrantLock mainLock = this.mainLock;
  5. mainLock.lock();
  6. try {
  7. completedTaskCount += w.completedTasks;
  8. workers.remove(w); // 移除当前异常Worker
  9. } finally {
  10. mainLock.unlock();
  11. }
  12. tryTerminate();
  13. int c = ctl.get();
  14. if (runStateLessThan(c, STOP)) { // 如果不是STOP状态
  15. if (!completedAbruptly) {
  16. int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
  17. if (min == 0 && ! workQueue.isEmpty())
  18. min = 1;
  19. if (workerCountOf(c) >= min) // 且线程数小于核心线程
  20. return;
  21. }
  22. addWorker(null, false); // 新增一个线程执行
  23. }
  24. }

#submit

对于submit来说,可能要相对特殊一点,因为它执行的任务是有返回值的,在一开始的提交任务的时候,线程池就通过FutureTask对其进行了封装

  1. public Future<?> submit(Runnable task) {
  2. if (task == null) throw new NullPointerException();
  3. RunnableFuture<Void> ftask = newTaskFor(task, null); // 封装该应用
  4. execute(ftask);
  5. return ftask;
  6. }

所以,在线程池执行runWorker的方法时,实际进入的下面的方法:

  1. public void run() {
  2. // ... 省略代码
  3. try {
  4. Callable<V> c = callable;
  5. if (c != null && state == NEW) {
  6. V result;
  7. boolean ran;
  8. try {
  9. result = c.call();
  10. ran = true;
  11. } catch (Throwable ex) {
  12. result = null;
  13. ran = false;
  14. setException(ex); // FutureTask会catch异常
  15. }
  16. if (ran)
  17. set(result);
  18. }
  19. } finally {
  20. // ... 省略代码
  21. }
  22. }

这个时候,真相就慢慢浮出水面,FutureTask会将异常封装成outcome的Object对象,在使用FuntureTask#get调用report方法的时候,将异常封装成ExecutionException重新抛出:

  1. public V get() throws InterruptedException, ExecutionException {
  2. int s = state;
  3. if (s <= COMPLETING)
  4. s = awaitDone(false, 0L);
  5. return report(s);
  6. }
  7. private V report(int s) throws ExecutionException {
  8. Object x = outcome;
  9. if (s == NORMAL)
  10. return (V)x;
  11. if (s >= CANCELLED)
  12. throw new CancellationException();
  13. throw new ExecutionException((Throwable)x); // 抛出该异常
  14. }

至于此时为啥线程池中的线程还处于WAITING状态,则是因为当前任务执行结束,Worker在调用workQueue#poll方法被阻塞了而已

#schedule

#schedule是定时线程池的执行方法,按照道理讲,即使任务抛出异常,线程池也应该定时执行该任务,那为什么任务被停止了呢?

这要回到FutureTask的子类ScheduledFutureTask中去看,我们可以发现,正常情况下,当定时任务被执行完成的时候,线程会重新把该任务放到执行队列中

  1. public void run() {
  2. boolean periodic = isPeriodic();
  3. if (!canRunInCurrentRunState(periodic))
  4. cancel(false);
  5. else if (!periodic)
  6. ScheduledFutureTask.super.run();
  7. else if (ScheduledFutureTask.super.runAndReset()) {
  8. setNextRunTime();
  9. reExecutePeriodic(outerTask);// 重新将任务放到队列中执行
  10. }
  11. }

但是我们深入进#runAndReset方法中就会发现,当任务抛出异常,#runAndReset返回的是false,此时就不会将定时任务入列了,所以,抛出异常的定时任务,是不会被重新执行的

  1. protected boolean runAndReset() {
  2. // ...省略代码
  3. boolean ran = false;
  4. int s = state;
  5. try {
  6. Callable<V> c = callable;
  7. if (c != null && s == NEW) {
  8. try {
  9. c.call(); // don't set result
  10. ran = true;
  11. } catch (Throwable ex) {
  12. setException(ex); // 抛出异常后,ran仍然是false
  13. }
  14. }
  15. } finally {
  16. // ...省略代码
  17. }
  18. return ran && s == NEW; // 此处返回false
  19. }

解决方案

  1. 在任务中捕获异常并处理,防止干扰到线程池线程的执行
  2. 如果是#execute方法的话,可以利用ThreadFactory设置处理线程unCatch异常的逻辑
  3. 如果是#submit或者是#schedule这样通过Callable的调用,则需要调用其#get方法,对任务的异常进行显示的处理

一些思考

这次的问题,只要了解线程池,还是比较简单的;相对来说,比较充分地暴露出来我基础知识不够展示的地方,以前都是背八股文,虽然在背的时候,对线程池的ctl,各种配置张口就来,但是过段时间后,在真正使用,遇见问题时还是一脸懵逼。果然有句话说得好,“看过的,只能是别人的,唯有经历了,才是自己的”~

如何关闭线程

线程池shutdown的时候,就会关闭一些线程,这个是如何做的呢?

当调用线程池的#shutdown方法的时候,线程池会遍历所有的workers,然后将每个workers中的闲置线程中断,当线程接收到中断的时候,会从#getTask的阻塞中跳出来,重新检查两个地方:

  1. 队列是否为空
  2. 当前状态是否是STOP(调用shutDownNow方法)
  1. private Runnable getTask() {
  2. boolean timedOut = false; // Did the last poll() time out?
  3. for (;;) {
  4. int c = ctl.get();
  5. int rs = runStateOf(c);
  6. // Check if queue empty only if necessary.
  7. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
  8. decrementWorkerCount();
  9. return null; // 如果是shutDown则退出
  10. }
  11. // ... 省略代码
  12. }
  13. }

如果符合,则#getTask会返回null,此时,工作线程进入进入#processWorkerExit关闭当前工作线程,再进入#addWorker方法,会发现,此时也有两点:

  1. STOP状态
  2. SHUTDOWN且队列为空
  1. private boolean addWorker(Runnable firstTask, boolean core) {
  2. retry:
  3. for (;;) {
  4. int c = ctl.get();
  5. int rs = runStateOf(c);
  6. // Check if queue empty only if necessary.
  7. if (rs >= SHUTDOWN &&
  8. ! (rs == SHUTDOWN &&
  9. firstTask == null &&
  10. ! workQueue.isEmpty()))
  11. return false; // 此时便不会新增worker
  12. // ... 省略代码
  13. return workerStarted;
  14. }

如果符合,则不会再新创建线程。

核心通过中断线程,将闲置线程从#poll中解放出来,并且关闭;如果是shutDown状态,则需要等到任务执行完才可以关闭;如果是stop状态,则不会等到任务执行完,就会关闭

线程池相关知识

看完ThreadPoolExecutor的代码后,我特地抽时间画了一张图,希望可以加深自己的理解:

同理,ScheduledThreadPoolExecutor作为ThreadPoolExecutor的子类,基本上大差不差。与其最大的不同之处就在于队列:定时线程池自定义了DelayedWorkQueue队列,work线程从队列中获取任务时,队列会去判断时间,只有时间满足,才会将任务出列。同时在任务被执行完成后,定时线程池会将该任务重新入队,便于之后重新调用

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/凡人多烦事01/article/detail/83580
推荐阅读
相关标签
  

闽ICP备14008679号