赞
踩
案例:
public static void testThreadException(){
try {
new Thread(() -> {
System.out.println("===线程执行===");
throw new RuntimeException("出现异常");
}).start();
}catch (Exception e) {
System.out.println("异常:"+e.getMessage());
}
}
结果:
===线程执行===
Exception in thread "Thread-1" java.lang.RuntimeException: 出现异常
at test.SpringbootApplication.lambda$testThreadException$0(SpringbootApplication.java:53)
at java.lang.Thread.run(Thread.java:748)
可以发现线程本身可以因为出现异常而终止,但是使用try catch进行处理多线程异常是没有作用的
Thread.UncaughtExceptionHandler 这是Thread类中的接口
官方文档中的描述:
getUncaughtExceptionHandler
查询线程的UncaughtExceptionHandler
并将调用处理程序的uncaughtException
方法,将线程和异常作为参数传递。UncaughtExceptionHandler
,那么它的ThreadGroup
对象作为它的UncaughtExceptionHandler
。 如果ThreadGroup
对象对处理异常没有特殊要求,它可以将调用转发给默认的未捕获异常处理程序。ThreadGroup.uncaughtException
public void uncaughtException(Thread t, Throwable e) { if (parent != null) { parent.uncaughtException(t, e); } else { //获取Thread中的UncaughtExceptionHandler Thread.UncaughtExceptionHandler ueh = Thread.getDefaultUncaughtExceptionHandler(); //如果实现了UncaughtExceptionHandler,那么执行实现的逻辑 if (ueh != null) { ueh.uncaughtException(t, e); } else if (!(e instanceof ThreadDeath)) { //默认的执行。可以看到线程出现异常默认都是打印这样的信息 System.err.print("Exception in thread \"" + t.getName() + "\" "); e.printStackTrace(System.err); } } }
UncaughtExceptionHandler
@FunctionalInterface
public interface UncaughtExceptionHandler {
/**
* Method invoked when the given thread terminates due to the
* given uncaught exception.
* <p>Any exception thrown by this method will be ignored by the
* Java Virtual Machine.
* @param t the thread
* @param e the exception
*/
void uncaughtException(Thread t, Throwable e);
}
Thread提供了setUncaughtExceptionHandler方法来进行设置UncaughtExceptionHandler类型的对象
public void setUncaughtExceptionHandler(UncaughtExceptionHandler eh) {
checkAccess();
uncaughtExceptionHandler = eh;
}
具体使用:
public static void testUncaughtExceptionHandler(){
Thread t = new Thread(() -> {
System.out.println("===线程执行===");
throw new RuntimeException("出现异常");
});
t.setName("test线程");
t.setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
System.out.println(t.getName()+"执行, 异常信息:"+e.getMessage());
}
});
t.start();
}
结果:
===线程执行===
test线程执行, 异常信息:出现异常
这是Thread的静态方法用于全局的设置异常处理
public static void testDefaultUncaughtExceptionHandler(){
Thread.setDefaultUncaughtExceptionHandler(new UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
System.out.println(t.getName()+"执行, 异常信息:"+e.getMessage());
}
});
Thread t = new Thread(() -> {
System.out.println("===线程执行===");
throw new RuntimeException("出现异常");
});
t.setName("test线程");
t.start();
}
结果:
===线程执行===
test线程执行, 异常信息:出现异常
private static ThreadPoolExecutor threadPool = new ThreadPoolExecutor( 1, 2, 5, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1), r -> { Thread thread = new Thread(r); thread.setUncaughtExceptionHandler((t1, e) -> System.out.println(t1.getName()+"执行, 异常信息:"+e.getMessage())); return thread; }); public static void testThreadPoolUncaughtExceptionHandler(){ Thread thread = new Thread(() -> { System.out.println("===线程执行==="); throw new RuntimeException("出现异常"); }); threadPool.execute(thread); }
案例1:
public static void main(String[] args) {
ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.execute(() -> test("execute"));
executorService.submit(() -> test("submit"));
}
private static void test(String name) {
System.out.println( "执行 : "+ name +" --- (线程名字:" + Thread.currentThread().getName() + ")");
throw new RuntimeException("执行 : "+ name + " --- 出现异常");
}
结果:
执行 : execute --- (线程名字:pool-1-thread-1)
执行 : submit --- (线程名字:pool-1-thread-2)
Exception in thread "pool-1-thread-1" java.lang.RuntimeException: 执行 : execute --- 出现异常
at test.thread.ExecutorsTest.test(ExecutorsTest.java:19)
at test.thread.ExecutorsTest.lambda$main$0(ExecutorsTest.java:13)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
发现execute方法执行会看到堆栈异常,submit方法是看不到堆栈异常的。
案例2:
public static void testExecutorSubmit() {
ExecutorService executorService = Executors.newSingleThreadExecutor();
Future<?> future = executorService.submit(() -> {
System.out.println("执行线程池中submit方法");
throw new RuntimeException("出现异常");
});
try {
System.out.println(future.get());
} catch (InterruptedException e) {
System.out.println("异常信息:"+e.getMessage());
} catch (ExecutionException e) {
System.out.println("异常信息:"+e.getMessage());
}
}
结果:
执行线程池中submit方法
异常信息:java.lang.RuntimeException: 出现异常
可以看到submit方式执行,如果出现异常,只能在future.get()的显式捕获异常中获得到。
调用链:
execute(Runnable command) -> addWorker(Runnable firstTask, boolean core) -> run() -> runWorker(Worker w)
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; //可以看到将任务的run方法进行try catch,根据出现的 //RuntimeException、Error、Throwable来进行对应的捕获并抛出 try { //执行任务 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
可以看到将任务的run方法进行try catch,根据出现的RuntimeException、Error、Throwable来进行对应的捕获并抛出
文档中已经说明了如果一个线程没有明确设置它的UncaughtExceptionHandler
,那么它的ThreadGroup
对象作为它的UncaughtExceptionHandler
。 如果ThreadGroup
对象对处理异常没有特殊要求,它可以将调用转发给默认的未捕获异常处理程序。
ThreadGroup#uncaughtException
public void uncaughtException(Thread t, Throwable e) { if (parent != null) { parent.uncaughtException(t, e); } else { Thread.UncaughtExceptionHandler ueh = Thread.getDefaultUncaughtExceptionHandler(); if (ueh != null) { ueh.uncaughtException(t, e); //由于没有设置全局的UncaughtExceptionHandler,所以会执行下面这个if条件 } else if (!(e instanceof ThreadDeath)) { System.err.print("Exception in thread \"" + t.getName() + "\" "); e.printStackTrace(System.err); } } }
所以在上一个线程池的execute方法出现异常信息是
Exception in thread "pool-1-thread-1" java.lang.RuntimeException: 执行 : execute --- 出现异常
at test.thread.ExecutorsTest.test(ExecutorsTest.java:19)
at test.thread.ExecutorsTest.lambda$main$0(ExecutorsTest.java:13)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
就是在ThreadGroup#uncaughtException中设置的。
AbstractExecutorService#submit(Callable task)
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
可以看到实际调用的依旧是execute方法,但是在执行之前将task任务封装成了FutureTask,然后将这个FutureTask进行返回。
还是回到之前的runWorker(Worker w)
//代码部分省略。。。 Runnable task = w.firstTask; //代码部分省略。。。 try { //执行任务 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } //代码部分省略。。。
这时的task为FutureTask类型,所以进入FutureTask的run()方法
public void run() { // 如果状态 state 不是 NEW,或者设置 runner 值失败 // 表示有别的线程在此之前调用 run 方法,并成功设置了 runner 值 // 保证了只有一个线程可以运行 try 代码块中的代码。 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; / 只有 c 不为 null 且状态 state 为 NEW 的情况 if (c != null && state == NEW) { V result; boolean ran; try { //调用 callable 的 call 方法,并获得返回结果 result = c.call(); //运行成功 ran = true; } catch (Throwable ex) { result = null; ran = false; //设置异常结果 setException(ex); } if (ran) //设置结果 set(result); } } finally { runner = null; int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
可以看到执行Callable的call()方法后,如果出现异常会被捕获到,然后执行setException(ex)方法,而不是将异常抛出,这就解释了为什么submit方法看不到异常信息。
所以需要进入setException(ex)继续分析。
protected void setException(Throwable t) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = t; UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state finishCompletion(); } } private void finishCompletion() { // assert state > COMPLETING; for (WaitNode q; (q = waiters) != null;) { if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; //任务执行完或出现异常后,将此线程进行唤醒 LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } done(); callable = null; // to reduce footprint }
可以看到将异常赋值给了成员变量outcome,再执行finishCompletion将此线程进行唤醒,也就是将之前的FutureTask#get()方法进行唤醒。
get 方法就是阻塞获取线程执行结果,这里主要做了两个事情
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
如果当前的结果还没有被执行完,把当前线程线程和插入到等待队列。
被阻塞的线程,会等到 run 方法执行结束之后被唤醒
private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; // 节点是否已添加 boolean queued = false; for (;;) { // 如果当前线程中断标志位是 true, // 那么从列表中移除节点 q,并抛出 InterruptedException 异常 if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } int s = state; // 当状态大于 COMPLETING 时,表示 FutureTask 任务已结束。 if (s > COMPLETING) { if (q != null) // 将节点 q 线程设置为 null,因为线程没有阻塞等待 q.thread = null; return s; } // 表示还有一些后序操作没有完成,那么当前线程让出执行权 else if (s == COMPLETING) // cannot time out yet Thread.yield(); //表示状态是 NEW,那么就需要将当前线程阻塞等待。 // 就是将它插入等待线程链表中, else if (q == null) q = new WaitNode(); else if (!queued) // 使用 CAS 函数将新节点添加到链表中,如果添加失败,那么queued 为 false, // 下次循环时,会继续添加,知道成功。 queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); // timed 为 true 表示需要设置超时 else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } // 让当前线程等待 nanos 时间 LockSupport.parkNanos(this, nanos); } else LockSupport.park(this); } }
report 方法就是根据传入的状态值 s,来决定是抛出异常,还是返回结果值。 这个两种情况都表示 FutureTask 完结了
private V report(int s) throws ExecutionException {
//表示 call 的返回值
Object x = outcome;
// 表示正常完结状态,所以返回结果值
if (s == NORMAL)
return (V)x;
// 大于或等于 CANCELLED,都表示手动取消 FutureTask 任务,
// 所以抛出 CancellationException 异常
if (s >= CANCELLED)
throw new CancellationException();
// 否则就是运行过程中,发生了异常,这里就抛出这个异常
throw new ExecutionException((Throwable)x);
}
可以看到代码中最后一行将Throwable包装成了ExecutionException类型并抛出。
总结,通过以上分析可知submit方式执行,实际执行的是FutureTask的run,出现异常后不会抛出,而是通过FutureTask的get方法的显式异常捕获来获得,
之前分析了线程池的execute和submit方法在执行过程中出现异常的处理过程,那么当这个线程是消亡还是会被回收呢?
之前分析了submit方式异常会被FutureTask自己处理掉,线程池看不到的。所以以下分析execute方式情况
案例:
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(10);
executorService.execute(() -> test("execute",1));
executorService.execute(() -> test("execute",2));
executorService.execute(() -> test("execute",3));
executorService.execute(() -> test("execute",4));
executorService.execute(() -> test("execute",5));
}
private static void test(String name,int i) {
System.out.println( "执行 : "+ name +" --- (线程名字:" + Thread.currentThread().getName() + ")");
if (i == 2){
throw new RuntimeException("执行 : "+ name + " --- 出现异常");
}
}
结果:
执行 : execute --- (线程名字:pool-1-thread-1)
执行 : execute --- (线程名字:pool-1-thread-2)
执行 : execute --- (线程名字:pool-1-thread-3)
执行 : execute --- (线程名字:pool-1-thread-5)
执行 : execute --- (线程名字:pool-1-thread-6)
Exception in thread "pool-1-thread-2" java.lang.RuntimeException: 执行 : execute --- 出现异常
at test.thread.ExecutorsTest.test(ExecutorsTest.java:23)
at test.thread.ExecutorsTest.lambda$main$1(ExecutorsTest.java:14)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
依旧回到runWorker方法中
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; //可以看到将任务的run方法进行try catch,根据出现的 //RuntimeException、Error、Throwable来进行对应的捕获并抛出 try { //执行任务 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
可以看到当任务task执行出现异常后,会执行finally中的processWorkerExit(w, completedAbruptly)
private void processWorkerExit(Worker w, boolean completedAbruptly) { //completedAbruptly为true,说明task任务执行时也就是run方法发生了异常,则需要将工作线程数减1。 //那么如果task任务正常执行的呢,那么在getTask方法中已经做了减1操作了。 if (completedAbruptly) decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //将worker中的任务完成数量汇总到线程池中的完成任务数量 completedTaskCount += w.completedTasks; //将Set集合移除此worker workers.remove(w); } finally { mainLock.unlock(); } //尝试终止线程池,主要判断线程池是否满足终止状态条件,如果满足但还有线程,尝试进行中断。 //没有线程的话 tidying状态改为terminated状态 tryTerminate(); int c = ctl.get(); //如果状态是running、shutdown,即tryTerminate()没有成功终止线程池,尝试再添加一个worker if (runStateLessThan(c, STOP)) { //不是突然完成的,即没有task任务可以获取而完成的,计算min,并根据当前worker数量判断是否需要addWorker() if (!completedAbruptly) { //allowCoreThreadTimeOut默认为false,即min默认为corePoolSize int min = allowCoreThreadTimeOut ? 0 : corePoolSize; //如果min为0,即不需要维持核心线程数量,且workQueue不为空,至少保持一个线程 if (min == 0 && ! workQueue.isEmpty()) min = 1; //如果线程数量大于最少数量,直接返回,否则下面至少要addWorker一个 if (workerCountOf(c) >= min) return; // replacement not needed } //添加一个没有firstTask的worker //只要worker是completedAbruptly突然终止的,或者线程数量小于要维护的数量, //就新添一个worker线程,即使是shutdown状态 addWorker(null, false); } }
这个方法中有一段代码workers.remove(w);将这个线程从线程池中移除掉,所以答案是发生异常的线程会被线程池移除掉
但是看结果
执行 : execute --- (线程名字:pool-1-thread-1)
执行 : execute --- (线程名字:pool-1-thread-2)
执行 : execute --- (线程名字:pool-1-thread-3)
执行 : execute --- (线程名字:pool-1-thread-5)
执行 : execute --- (线程名字:pool-1-thread-6)
Exception in thread "pool-1-thread-2" java.lang.RuntimeException: 执行 : execute --- 出现异常
at test.thread.ExecutorsTest.test(ExecutorsTest.java:23)
at test.thread.ExecutorsTest.lambda$main$1(ExecutorsTest.java:14)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
会发现pool-1-thread-4执行了两次,而pool-1-thread-5没有了, 这是为什么?
让我们回到方法中最后一行addWorker(null, false);看着像是又添加了一个线程,继续跟踪进入分析下。
private boolean addWorker(Runnable firstTask, boolean core) { // 省略。。。 Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
关键在w = new Worker(firstTask);这一行,进入继续分析。
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
案例中的线程池是默认的ThreadFactory
static class DefaultThreadFactory implements ThreadFactory { private static final AtomicInteger poolNumber = new AtomicInteger(1); private final ThreadGroup group; private final AtomicInteger threadNumber = new AtomicInteger(1); private final String namePrefix; DefaultThreadFactory() { SecurityManager s = System.getSecurityManager(); group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-"; } public Thread newThread(Runnable r) { Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); if (t.isDaemon()) t.setDaemon(false); if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; } }
可以看出线程的名字就是在DefaultThreadFactory的newThread方法中设置的。
当thread2出现异常后会执行 addWorker -> w = new Worker(firstTask) -> newThread(Runnable r) threadNumber就会自增一次。(到这里时thread3已经执行了threadNumber已经是4了,所以这是自增后就变成了5)。所以以后再执行就跳过了4.
import java.util.ArrayList; import java.util.List; import java.util.concurrent.*; public class HungryDeadLockTest { private static ThreadPoolExecutor executor; public static void main(String[] args) throws InterruptedException, ExecutionException { TimeUnit unit = TimeUnit.HOURS; BlockingQueue workQueue = new LinkedBlockingQueue(); executor = new ThreadPoolExecutor(5, 5, 1000, unit, workQueue); new Thread(() -> { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(executor); }).start(); int loop = 0; while (true) { System.out.println("loop start. loop = " + (loop)); innerFutureAndOutFuture(); System.out.println("loop end. loop = " + (loop++)); Thread.sleep(10); } } public static void innerFutureAndOutFuture() throws ExecutionException, InterruptedException { Callable<String> innerCallable = new Callable<String>() { @Override public String call() throws Exception { Thread.sleep(100); return "inner callable"; } }; Callable<String> outerCallable = new Callable<String>() { @Override public String call() throws Exception { Thread.sleep(10); Future<String> innerFuture = executor.submit(innerCallable); String innerResult = innerFuture.get(); Thread.sleep(10); return "outer callable. inner result = " + innerResult; } }; List<Future<String>> futures = new ArrayList<>(); for (int i = 0; i < 10; i++) { System.out.println("submit : " + i); Future<String> outerFuture = executor.submit(outerCallable); futures.add(outerFuture); } for (int i = 0; i < 10; i++) { String outerResult = futures.get(i).get(); System.out.println(outerResult + ":" + i); } } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。