赞
踩
今天群里有个同学遇到一个看似很奇怪的问题,自定义 ThreadPoolTaskExecutor
子类,重写了 execute 方法,通过 execute 方法来执行任务时打印当前线程,日志显示任务一直在调用者线程里执行 (其实并不是),似乎线程池失效了。
自定义 ThreadPoolTaskExecutor
子类
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
public class ThreadPoolTaskExecutorImpl extends ThreadPoolTaskExecutor {
@Override
public void execute(Runnable command) {
System.out.println("当前线程" + Thread.currentThread().getName());
super.execute(command);
}
}
编写测试代码:
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class ThreadPoolDemo { public static void main(String[] args) throws InterruptedException { // 构造线程池 ThreadPoolTaskExecutorImpl executor = new ThreadPoolTaskExecutorImpl(); executor.initialize(); executor.setCorePoolSize(10); executor.setMaxPoolSize(20); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); executor.setThreadNamePrefix("[线程池的线程]"); // 执行任务 for (int i = 0; i < 5; i++) { executor.execute(() -> { System.out.println("测试"); }); } TimeUnit.SECONDS.sleep(3); executor.shutdown(); } }
执行结果:
当前线程main
当前线程main
测试
测试
当前线程main
测试
当前线程main
当前线程main
测试
测试
由此断定:自定义的线程池失效,在 execute 方法中获取当前线程时,并没有出现我们定义的线程名称前缀的线程,仍然使用 main 线程来执行任务。
但是,真的是这样吗?
由于很多同学没有认真思考过多线程的本质,会想当然地认为线程池的 execute
方法的所有代码都是在线程池创建的线程中执行,可是真的是这样吗?
我们知道在没有使用新线程的情况下,程序会使用当前线程(main 线程)顺序执行。
因此,在 org.example.thread.ThreadPoolTaskExecutorImpl#execute
中,打印的 “当前线程” 的代码仍然是在 main 方法中执行的。
进入 super.execute
方法
@Override
public void execute(Runnable task) {
Executor executor = getThreadPoolExecutor();
try {
executor.execute(task);
}
catch (RejectedExecutionException ex) {
throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
}
}
这里的 executor.execute
方法实际上是 java.util.concurrent.ThreadPoolExecutor#execute
/** * Executes the given task sometime in the future. The task * may execute in a new thread or in an existing pooled thread. * * If the task cannot be submitted for execution, either because this * executor has been shutdown or because its capacity has been reached, * the task is handled by the current {@code RejectedExecutionHandler}. * * @param command the task to execute * @throws RejectedExecutionException at discretion of * {@code RejectedExecutionHandler}, if the task * cannot be accepted for execution * @throws NullPointerException if {@code command} is null */ public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
小于核心线程数,会执行到 `addWorker ,此时才真正创建新的线程去执行任务:
/** * Checks if a new worker can be added with respect to current * pool state and the given bound (either core or maximum). If so, * the worker count is adjusted accordingly, and, if possible, a * new worker is created and started, running firstTask as its * first task. This method returns false if the pool is stopped or * eligible to shut down. It also returns false if the thread * factory fails to create a thread when asked. If the thread * creation fails, either due to the thread factory returning * null, or due to an exception (typically OutOfMemoryError in * Thread.start()), we roll back cleanly. * * @param firstTask the task the new thread should run first (or * null if none). Workers are created with an initial first task * (in method execute()) to bypass queuing when there are fewer * than corePoolSize threads (in which case we always start one), * or when the queue is full (in which case we must bypass queue). * Initially idle threads are usually created via * prestartCoreThread or to replace other dying workers. * * @param core if true use corePoolSize as bound, else * maximumPoolSize. (A boolean indicator is used here rather than a * value to ensure reads of fresh values after checking other pool * state). * @return true if successful */ private boolean addWorker(Runnable firstTask, boolean core) { // 省略部分代码 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { //【关键1】 创建新的线程(run 方法) w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { //【关键2】启动创建的线程 t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
我们再看下 new Worker
时做了什么。
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { /** Thread this worker is running in. Null if factory fails. */ final Thread thread; /** Initial task to run. Possibly null. */ Runnable firstTask; /** Per-thread task counter */ volatile long completedTasks; /** * Creates with given first task and thread from ThreadFactory. * @param firstTask the first task (null if none) */ Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; //【关键】 使用线程工厂创建线程 this.thread = getThreadFactory().newThread(this); } //【关键】 重写 Runnable 的 run 方法,里面就封装了外面传入的 Runnable /** Delegates main run loop to outer runWorker */ public void run() { runWorker(this); } final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } } // 省略其他 }
因此
(1)自定义的 ThreadPoolTaskExecutorImpl
类里重写的 execute 方法里打印的当前线程实际还是调用者线程。
(2)外面传入的 Runnable 参数最终会在 Worker 现成的 run 方法中执行到。
我们可以使用包装器模式处理下即可:
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; public class ThreadPoolTaskExecutorImpl extends ThreadPoolTaskExecutor { @Override public void execute(Runnable command) { // 当前在父线程里 super.execute(() -> { // 这里在子线程里执行 // 可以在任务前打印下当前线程名称,线程池的状态等信息 System.out.println("当前线程" + Thread.currentThread().getName()); // 原始的任务 command.run(); }); } }
测试代码:
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class ThreadPoolDemo { public static void main(String[] args) throws InterruptedException { // 构造线程池 ThreadPoolTaskExecutorImpl executor = new ThreadPoolTaskExecutorImpl(); executor.initialize(); executor.setCorePoolSize(10); executor.setMaxPoolSize(20); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); executor.setThreadNamePrefix("[线程池的线程]"); // 执行任务 for (int i = 0; i < 5; i++) { executor.execute(() -> { System.out.println(Thread.currentThread().getName() + "-> 测试"); }); } TimeUnit.SECONDS.sleep(3); executor.shutdown(); } }
输出结果:
当前线程[线程池的线程]2
当前线程[线程池的线程]1
[线程池的线程]1-> 测试
[线程池的线程]2-> 测试
当前线程[线程池的线程]3
[线程池的线程]3-> 测试
当前线程[线程池的线程]4
[线程池的线程]4-> 测试
当前线程[线程池的线程]5
[线程池的线程]5-> 测试
该同学提问非常模糊,甚至“反复修改问题”,最终给出关键代码截图,才真正理解问题是什么。
大家请教别人时,尽量能将问题转化为别人容易理解的表达方式。
大家请教别人时,一定自己先搞清楚问题究竟是什么,而不需要别人一再追问下,才不断逼近真实的问题。
大家请教别人时,最好能够有源码或者关键信息截图等。
我们使用线程池时,总是观察到我们传入的 Runnable 是在线程池中的线程执行的,我们是使用 execute 方法来执行的,但这并不意味着 execute 方法的所有步骤都是在线程池中的线程里执行的。
学习某个技术时,要真正理解技术的本质,而不是表象。
如调用线程的 start 方法才真正启动线程,在重写的 execute 方法第一行压根就没有创建新的线程,怎么会在新的线程里执行呢?
在实际开发和验证问题时,多进行代码调试,掌握高级的调试技巧,如调试时表达式计算、条件断点、移除栈帧回退调用等。
创作不易,如果本文对你有帮助,欢迎点赞、收藏加关注,你的支持和鼓励,是我创作的最大动力。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。