赞
踩
参考文章:
《Java 并发编程的艺术》
7000 字 + 24 张图带你彻底弄懂线程池
(1)线程池 (ThreadPool)
是一种用于管理和复用线程的机制,它是在程序启动时就预先创建一定数量的线程,将这些线程放入一个池中,并对它们进行有效的管理和复用,从而在需要执行任务时,可以从线程池中获取一个可用线程来执行任务,任务执行完毕后线程不会被销毁而是返回线程池,以便下次使用。
(2)线程池的优点主要有以下几个方面:
(3)线程池的缺点主要有以下几个方面:
(1)通过工具类 Executors,可以创建以下 3 种类型的 ThreadPoolExecutor
:
//创建一个固定大小的线程池,大小为 3
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
//创建一个单线程的线程池
ExecutorService singleThreadPool = Executors.newSingleThreadExecutor();
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
//创建一个单线程的线程池
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
(2)通过工具类 Executors,可以创建下面的 ScheduledThreadPoolExecutor
,它是一个用来在给定的延迟后运行任务或者定期执行任务的线程池,继承自 ThreadPoolExecutor
,并且其内部使用DelayedWorkQueue
作为任务队列。
//创建一个定时执行的线程池
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(3);
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
具体示例如下:
class ScheduledThreadPoolExecutorExample { public static void main(String[] args) { ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(2); // 定时任务,每隔 1 秒执行一次 executor.scheduleAtFixedRate(() -> { System.out.println("定时任务执行了:" + System.currentTimeMillis()); }, 0, 1, TimeUnit.SECONDS); // 延时任务,延迟 2 秒后执行 executor.schedule(() -> { System.out.println("延时任务执行了:" + System.currentTimeMillis()); }, 2, TimeUnit.SECONDS); // 关闭线程池 executor.shutdown(); } }
ThreadPoolExecutor
是 Executor 框架的一个具体实现,通过自定义 ThreadPoolExecutor 类可以创建更加灵活、符合需求的线程池。ThreadPoolExecutor
的 4 个构造函数如下所示,有关每个参数的具体含义可参考面试题 3。
《阿里巴巴 Java 开发手册》中有如下规定:
(1)【强制】线程资源必须通过线程池提供,不允许在应用中自行显式创建线程。
说明:线程池的好处是减少在创建和销毁线程上所消耗的时间以及系统资源的开销,解决资源不足的问题。如果不使用线程池,有可能造成系统创建大量同类线程而导致消耗完内存或者“过度切换”的问题。
(2)【强制】线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
说明:Executors 返回的线程池对象的弊端如下:
1) FixedThreadPool 和 SingleThreadPool:允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。
2) CachedThreadPool:允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM。
(3)使用 Executors.newCachedThreadPool()
方法创建线程池可能会导致以下问题:
newCachedThreadPool
是一个无界线程池,它会根据任务的需求动态地创建线程。如果任务提交的速度远远大于线程处理的速度,线程池将会不断地创建新线程,可能导致线程数量过多,从而占用大量系统资源(如内存和CPU),降低系统性能甚至引发内存溢出或系统崩溃。newCachedThreadPool
是为了快速处理任务而创建线程的,它会缓存线程一段时间以备重复利用,默认的空闲线程存活时间是 60s
。这对于长时间执行的任务来说可能不适用,因为长时间运行的任务可能需要维持相对稳定的线程池大小,而不是频繁地创建和销毁线程。因此,在使用 newCachedThreadPool
创建线程池时需要谨慎,根据实际场景合理地评估任务的特点和负载情况,选择适当的线程池类型和参数配置,以避免以上问题的发生,并保证系统的性能和稳定性。
使用下面 ThreadPoolExecutor 类的构造函数可以自定义线程池,参数最全的构造函数如下:
public class ThreadPoolExecutor extends AbstractExecutorService { //... public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; } }
其中的参数含义如下所示:
参数 | 含义 |
---|---|
corePoolSize | 线程池基本大小,当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于线程池基本大小时就不再创建 |
maximumPoolSize | 线程池允许创建的最大线程数,如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务,值得注意的是,如果使用了无界的任务队列,那么这个参数就没什么效果 |
keepAliveTime | 线程活动保持时间,线程池中的线程数量大于 corePoolSize 时,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了 keepAliveTime 才会被回收销毁 |
unit | 线程活动保持时间的单位,即 keepAliveTime 的时间单位 |
workQueue | 用于保存等待执行的任务的阻塞队列,新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,新任务就会被存放在任务队列中。值得注意的是,如果使用了工作队列 SynchronousQueue,那么当任务数超过线程池的核心线程数时,该任务不会进入队列 |
threadFactory | 用于设置创建线程的工厂,决定了如何创建新线程,例如可以通过线程工厂给每个创建出来的线程设置更有意义的名字。使用开源框架 guava 提供的 ThreadFactoryBuilder 可以快速给线程池里的线程设置有意义的名字 |
handler | 当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是 AbortPolicy ,表示无法处理新任务时抛出异常 |
① 上图来源:《Java 性能调优实战》
② 上述提到的 7 个参数中,corePoolSize
、maximumPoolSize
、workQueue
这 3 个参数是核心参数。
③ 上述 workQueue 就是 BlockingQueue,有关 BlockingQueue 的相关知识可以参考 Java 并发编程面试题——BlockingQueue 这篇文章。
import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.util.concurrent.*; class Solution { public static void main(String[] args) throws ExecutionException, InterruptedException { //自定义线程池 int corePoolSize = 2; int maximumPoolSize = 5; long keepAliveTime = 50; // keepAliveTime 的单位 TimeUnit unit = TimeUnit.MICROSECONDS; //工作队列 workQueue BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<>(3); //使用开源框架 guava 提供的 ThreadFactoryBuilder 可以给线程池里的线程自定义名字 ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("demo-task-%d").build(); //饱和策略 RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy(); ThreadPoolExecutor threadsPool = new ThreadPoolExecutor( corePoolSize, maximumPoolSize, keepAliveTime, unit, blockingQueue, threadFactory, handler); //执行无返回值的任务 Runnable taskWithoutRet = new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName() + " is running"); } }; threadsPool.execute(taskWithoutRet); //执行有返回值的任务 FutureTask<Integer> taskWithRet = new FutureTask<>(new Callable<Integer>() { @Override public Integer call() throws Exception { System.out.println(Thread.currentThread().getName() + " is running"); //线程睡眠 1000 ms Thread.sleep(1000); return 100; } }); threadsPool.submit(taskWithRet); System.out.println("有返回值的任务的结果为: " + taskWithRet.get()); //关闭线程池 threadsPool.shutdown(); } }
输出结果如下:
demo-task-0 is running
demo-task-1 is running
有返回值的任务的结果为: 100
在 Java 8 中可以使用 lambda 表达式来对创建任务的代码进行简化:
Runnable taskWithoutRet = () -> System.out.println(Thread.currentThread().getName() + " is running");
FutureTask<Integer> taskWithRet = new FutureTask<>(() -> {
System.out.println(Thread.currentThread().getName() + " is running");
//线程睡眠 1000 ms
Thread.sleep(1000);
return 100;
});
与 FutureTask 有关的知识可以参考 Java 并发编程面试题——Future 这篇文章。
(1)如果线程到达 maximumPoolSize
仍然有新任务,这时线程池会执行拒绝策略。ThreadPoolExecutor
为接口 RejectedExecutionHandler
提供了以下 4 种实现,它们都是 ThreadPoolExecutor 中的静态内部类:
RejectedExecutionException
异常来拒绝新任务的处理,这也是默认的策略;(2)在 Java 线程池中,可以通过实现 RejectedExecutionHandler
接口来自定义拒绝策略。拒绝策略定义了当线程池无法接受新任务时应该采取的操作。RejectedExecutionHandler
接口有一个方法 rejectedExecution
,在线程池无法接受新任务时会调用该方法。以下是一个示例代码,演示如何自定义拒绝策略:
public class CustomRejectedExecutionHandler implements RejectedExecutionHandler { @Override public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) { System.out.println("任务被拒绝: " + runnable.toString()); // 执行自定义的拒绝策略,可以选择抛出异常、丢弃任务或阻塞等操作 // 这里我们简单地抛出一个异常 throw new RejectedExecutionException("线程池已满,无法接受新任务!"); } } public class ThreadPoolExample { public static void main(String[] args) { // 创建线程池 int corePoolSize = 5; int maxPoolSize = 10; long keepAliveTime = 1000; ThreadPoolExecutor threadPool = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(2), new CustomRejectedExecutionHandler()); // 执行任务 for (int i = 1; i <= 15; i++) { final int taskId = i; threadPool.execute(() -> { System.out.println("执行任务: " + taskId); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } }); } // 关闭线程池 threadPool.shutdown(); } }
某次输出如下:
执行任务: 1 执行任务: 5 执行任务: 7 执行任务: 2 执行任务: 3 执行任务: 4 执行任务: 9 执行任务: 8 执行任务: 11 任务被拒绝: com.exam.ThreadPoolExample$$Lambda$1/2093631819@4f3f5b24 执行任务: 10 Exception in thread "main" java.util.concurrent.RejectedExecutionException: 线程池已满,无法接受新任务! at com.exam.CustomRejectedExecutionHandler.rejectedExecution(Main.java:73) at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830) at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379) at com.exam.ThreadPoolExample.main(Main.java:91) 执行任务: 6
在上述示例中,我们首先创建了一个自定义的 RejectedExecutionHandler
实现CustomRejectedExecutionHandler
,其中的rejectedExecution
方法简单地抛出一个 RejectedExecutionException
异常。然后,在创建线程池时,我们将上述自定义的拒绝策略传递给 ThreadPoolExecutor
的构造函数。当线程池无法接受新任务时,拒绝策略会被调用,我们在这个方法中抛出一个异常,显示任务被拒绝。你也可以根据自己的需求,实现不同的拒绝策略,例如将任务丢弃、阻塞等。请注意,当自定义拒绝策略抛出异常时,可能需要适当地处理异常,以免影响程序的执行。
(1)线程池的主要处理流程如下图所示:
(2)从图中可以看出,当提交一个新任务到线程池时,线程池的处理流程如下:
核心线程数 (其大小为 corePoolSize)
。如果小于,即核心线程池未满,则创建一个新的工作线程来执行任务。如果大于,即核心线程池里的线程都在执行任务,则进入下个流程;工作队列 (workQueue)
是否已经满。如果工作队列没有满,则将新提交的任务存储在这个工作队列里。如果工作队列满了,则进入下个流程;线程池的线程 (大小为 maximumPoolSize)
是否都处于工作状态。如果没有,则创建一个新的工作线程来执行任务。如果已经满了,则交给饱和策略 (RejectedExecutionHandler)
来处理这个任务;(1)线程池将任务放在阻塞队列中,并先调用非核心线程来处理任务的原因有以下几点:
(2)综上所述,在线程池中先将任务放在阻塞队列中,然后再调用非核心线程来处理任务,可以实现对线程数量的控制、平衡资源利用率、提高响应速度和避免任务丢失等好处。这样设计可以更有效地管理和利用系统资源,并提供更高效的并发处理能力。
(1)线程池的核心功能就是实现了线程的重复利用,而在线程池内部其实是被封装成一个 Worker
对象:
public class ThreadPoolExecutor extends AbstractExecutorService {
//...
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
//...
}
}
从上述源码可知,Worker 继承了 AQS,即有一定锁的特性。创建线程来执行任务的方法上面提到是通过 addWorker
方法创建的。在创建 Worker 对象的时候,会把线程和任务一起封装到 Worker 内部,然后调用 runWorker
方法来让线程执行任务:
public class ThreadPoolExecutor extends AbstractExecutorService { //... final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } } }
(2)runWorker
方法内部使用了 while 死循环,当第一个任务执行完之后,会不断地通过 getTask
方法获取任务:
run
方法,继续执行任务,这就是线程能够复用的主要原因。processWorkerExit
方法,来将线程退出。(3)需要注意的是,因为 Worker 继承了 AQS,每次在执行任务之前都会调用 lock
方法,执行完任务之后,会调用 unlock
方法,这样做的目的在于通过 Woker 的加锁状态就能判断出当前线程是否正在运行任务。
(4)如果想知道线程是否正在运行任务,只需要调用 Woker 的 tryLock
方法,根据是否加锁成功来进行判断:加锁成功说明当前线程没有加锁,也就没有执行任务。在调用 shutdown
方法关闭线程池的时候,就是通过这种方式来判断线程有没有在执行任务,如果没有的话,来尝试打断没有执行任务的线程。
线程在执行完任务之后,会继续从 getTask
方法中获取任务,获取不到就会退出。getTask
方法的源码如下:
public class ThreadPoolExecutor extends AbstractExecutorService { //... private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } } }
前面一部分是线程池的一些状态的判断,下面这行代码
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
是判断当前过来获取任务的线程是否可以超时退出。如果 allowCoreThreadTimeOut
设置为 true 或者线程池当前的线程数大于核心线程数,也就是 corePoolSize,那么该获取任务的线程就可以超时退出。那是怎么做到超时退出呢,就是这行核心代码
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
会根据是否允许超时来选择调用阻塞队列 workQueue 的 poll
方法或者 take
方法:
poll
方法,传入 keepAliveTime
,也就是构造线程池时传入的空闲时间,这个方法的意思就是从队列中阻塞 keepAliveTime 时间来获取任务,获取不到就会返回 null;从这里可以看到 keepAliveTime 是如何使用的了。所以到这里应该就知道线程池中的线程为什么可以做到空闲一定时间就退出了。其实最主要的是利用了阻塞队列的 poll 方法的实现,这个方法可以指定超时时间,一旦线程达到了 keepAliveTime 还没有获取到任务,那么就会返回 null,前面提到 getTask 方法返回 null,线程就会退出。
需要注意的是,判断当前获取任务的线程是否可以超时退出的时候,如果将 allowCoreThreadTimeOut 设置为 true,那么所有线程走到这个timed 都是 true,那么所有的线程,包括核心线程都可以做到超时退出。如果你的线程池需要将核心线程超时退出,那么可以通过allowCoreThreadTimeOut
方法将 allowCoreThreadTimeOut
变量设置为 true。
线程池提供了 shutdown
和 shutdownNow
两个方法来关闭线程池。
shutdown
方法将线程池的状态修改为 SHUTDOWN
,然后尝试打断空闲的线程(如何判断空闲,前面有提到),也就是在阻塞等待任务的线程。
public class ThreadPoolExecutor extends AbstractExecutorService { //... public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(SHUTDOWN); interruptIdleWorkers(); onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } tryTerminate(); } }
shutdownNow
方法将线程池的状态修改为 STOP
,然后尝试打断所有的线程,从阻塞队列中移除剩余的任务,这也是为什么 shutdownNow 不能执行剩余任务的原因。
public class ThreadPoolExecutor extends AbstractExecutorService { //... public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(STOP); interruptWorkers(); tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; } }
所以也可以看出 shutdown 方法和 shutdownNow 方法的主要区别就是:shutdown 之后还能处理在队列中的任务,shutdownNow 直接就将任务从队列中移除,线程池里的线程就不再处理了。
可以通过实现 RejectedExecutionHandler
接口来自定义拒绝策略,如果线程池无法执行更多的任务了,此时建议可以把任务信息持久化写入到磁盘里面去,后台专门启动一个线程,后续等待你的线程池的工作负荷降低了,再慢慢的从磁盘中读取之间持久化的任务,重新提交到线程池中去执行。
在提交一个任务到线程池里去之前,可以先将该任务的信息保存到数据库中,并记录其状态(状态包括未提交、已提交、已完成,初始时为未提交)。当该任务提交之后,更新其状态为已提交即可。系统重启之后后台线程去扫描数据库里的未提交和已提交的任务状态。可以把未提交、已提交的任务重新提交到线程池中继续执行即可。
对于这个问题,我们可以分情况进行讨论:
corePoolSize
之后,会判断如果 maximumPoolSize
大于 coolPoolSize
,此时会创建不会大于 maximumPoolSize
额外的线程处理队列中的请求。keepAliveTime
时间后会自动销毁。(1)ThreadPoolExecutor 执行 execute() 方法的执行流程如下如所示,我们可以将其看作线程池执行流程在代码中的具体表现。
(2)ThreadPoolExecutor 执行 execute 方法分下面 4 种情况:
① 如果当前运行的线程少于 corePoolSize
,则创建新线程来执行任务(注意,执行这一步骤需要获取全局锁)。
② 如果运行的线程等于或多于 corePoolSize
,则将任务加入 BlockingQueue
。
③ 如果无法将任务加入 BlockingQueue
(队列已满),则创建新的线程来处理任务(注意,执行这一步骤需要获取全局锁)。
④ 如果创建新线程将使当前运行的线程超出 maximumPoolSize
,任务将被拒绝,并调用 RejectedExecutionHandler.rejectedExecution()
方法。
ThreadPoolExecutor 采取上述步骤的总体设计思路,是为了在执行 execute()
方法时,尽可能地避免获取全局锁(那将会是一个严重的可伸缩瓶颈)。在 ThreadPoolExecutor 完成预热之后(当前运行的线程数大于等于 corePoolSize),几乎所有的 execute() 方法调用都是执行步骤 ②,而步骤 ② 不需要获取全局锁。
初始化线程池的时候需要显示命名(设置线程池名称前缀),有利于定位问题。默认情况下创建的线程名字类似 pool-1-thread-n
这样的,没有业务含义,不利于定位问题。给线程池里的线程命名通常有下面两种方式:
ThreadPoolExecutor 的构造函数中的参数 ThreadFactory
是用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程设置更有意义的名字。使用开源框架 guava
提供的 ThreadFactoryBuilder
可以快速给线程池里的线程设置有意义的名字,代码如下:
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("XX-task-%d").build();
threadFactory
的作用是为线程池创建新线程并提供线程的配置。通过自定义 threadFactory,可以灵活地指定线程的属性。当线程池需要创建新线程来执行任务时,会调用 threadFactory 的 newThread()
方法创建一个线程,并将其添加到线程池中。因此,通过自定义 threadFactory,我们可以对新创建的线程进行一些个性化的设置,例如线程名、优先级、是否为守护线程等。ThreadFactory
接口,以创建满足特定要求的线程。//带有前缀名称的线程工厂 public static class NamedThreadFactory implements ThreadFactory { //线程名前缀 private final String prefix; //线程编号 private final AtomicInteger threadNumber = new AtomicInteger(1); public NamedThreadFactory(String prefix) { this.prefix = prefix; } //重写 newThread 方法 @Override public Thread newThread(Runnable r) { return new Thread(null, r, prefix + threadNumber.getAndIncrement()); } }
ThreadPoolExecutor 使用变量 ctl 的高 3 位来表示线程池状态,低 29 位表示线程数量。
public class ThreadPoolExecutor extends AbstractExecutorService {
//...
private static final int COUNT_BITS = Integer.SIZE - 3;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
}
状态名 | 高 3 位 | 接收新任务 | 处理阻塞队列任务 | 说明 |
---|---|---|---|---|
RUNNING | 111 | Y | Y | 线程池创建时就是这个状态,能够接收新任务,以及对已添加的任务进行处理。 |
SHUTDOWN | 000 | N | Y | 调用shutdown方法线程池就会转换成SHUTDOWN状态,此时线程池不再接收新任务,但能继续处理已添加的任务到队列中任务。 |
STOP | 001 | N | N | 调用 shutdownNow 方法线程池就会转换成 STOP 状态,不接收新任务,也不能继续处理已添加的任务到队列中任务,并且会尝试中断正在处理的任务的线程。 |
TIDYING | 010 | N | N | SHUTDOWN 状态下,任务数为 0, 其他所有任务已终止,线程池会变为 TIDYING 状态。线程池在 SHUTDOWN 状态,任务队列为空且执行中任务为空,线程池会变为 TIDYING 状态。线程池在 STOP 状态,线程池中执行中任务为空时,线程池会变为 TIDYING 状态。 |
TERMINATED | 011 | N | N | 线程池彻底终止。线程池在 TIDYING 状态执行完 terminated() 方法就会转变为 TERMINATED 状态。 |
从数字上比较,TERMINATED > TIDYING > STOP > SHUTDOWN > RUNNING
。这些信息存储在一个原子变量 ctl 中,目的是将线程池状态与线程个数合二为一,这样就可以用一次 CAS 原子操作进行赋值。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// c 为旧值, ctlOf 返回结果为新值
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))));
// rs 为高 3 位代表线程池状态, wc 为低 29 位代表线程个数,ctl 是合并它们
private static int ctlOf(int rs, int wc) {
return rs | wc;
}
(1)首先,需要明确的是线程池大小不能设置地过大或者过小:
(2)要想合理地配置线程池,可以从任务特性入手来进行分析:
PriorityBlockingQueue
来处理。它可以让优先级高的任务先执行。不过需要注意的是,如果一直有优先级高的任务提交到队列里,那么优先级低的任务可能永远不能执行。如何判断是 CPU 密集任务还是 I/O 密集任务?
答:简单来说,CPU 密集型任务指需要利用 CPU 计算能力的任务,例如对大量数据进行运算。而 I/O 密集型指涉及到网络读取、文件读取等的任务,其特点在于 I/O 操作完成的时间远大于 CPU 计算耗费的时间,即大部分时间都花在了等待 I/O 操作完成上。
Java 中如何获取当前 CPU 的核心数?
答:使用int cores = Runtime.getRuntime().availableProcessors();
这行代码即可获取当前 CPU 的核心数。
对于 CPU 密集型任务,线程池的大小为什么不设置为 n,而是 n + 1?
答:对于 CPU 密集型任务,在有 n 个处理器的系统上,当线程池的大小设置为 n + 1 时,能实现 CPU 的最优利用率,因为即使当 CPU 密集型的线程偶尔由于页缺失故障或者其他原因暂停时,这个“额外”的线程也能确保 CPU 的时钟周期不会被浪费,从未尽可能地利用 CPU 资源。
为什么对于 I/O 密集型任务,线程数建议设置为 2 * n?
因为在 I/O 密集型任务中,大部分时间线程都在等待外部 I/O 操作完成,而不是进行计算密集型的工作,并且用于上下文切换的时间要远小于阻塞等待的时间。例如,当线程在从磁盘读取数据或等待网络响应时,它会处于阻塞状态,CPU 资源不会被过多占用。
(1)如果在系统中大量使用线程池,则有必要对线程池进行监控,方便在出现问题时,可以根据线程池的使用状况快速定位问题。可以通过线程池提供的参数进行监控,在监控线程池的时候可以使用以下属性:
(2)此外,我们还可以通过扩展线程池进行监控。可以通过继承线程池来自定义线程池,重写线程池的 beforeExecute
、afterExecute
和 terminated
方法,也可以在任务执行前、执行后和线程池关闭前执行一些代码来进行监控。例如,监控任务的平均执行时间、最大执行时间和最小执行时间等,这几个方法在线程池里是空方法。
ThreadPoolExecutor
类中提供了以下这些方法来动态修改参数:
(1)在线程池的使用中,有一些注意事项需要考虑:
(2)关于线程数量的问题,更多不一定意味着更好。线程数量的增加会增加线程间的上下文切换开销,并且会占用更多的系统资源。当线程数量超出合理范围时,可能会导致系统负载过重,性能反而下降。因此,需要根据实际需求和系统资源进行评估,并选择适当的线程数量。通常情况下,可以依据CPU核心数和任务类型进行参考,一般不建议无限制增加线程数量。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。