赞
踩
大家好,我是栗筝i,这篇文章是我的 “栗筝i 的 Java 技术栈” 专栏的第 024 篇文章,在 “栗筝i 的 Java 技术栈” 这个专栏中我会持续为大家更新 Java 技术相关全套技术栈内容。专栏的主要目标是已经有一定 Java 开发经验,并希望进一步完善自己对整个 Java 技术体系来充实自己的技术栈的同学。与此同时,本专栏的所有文章,也都会准备充足的代码示例和完善的知识点梳理,因此也十分适合零基础的小白和要准备工作面试的同学学习。当然,我也会在必要的时候进行相关技术深度的技术解读,相信即使是拥有多年 Java 开发经验的从业者和大佬们也会有所收获并找到乐趣。
–
在现代软件开发中,随着计算资源的增多和应用需求的复杂化,如何高效地管理并发任务成为一个关键问题。无论是处理海量数据的后台服务,还是需要实时响应的用户界面应用,都离不开多线程编程。Java 作为一种广泛使用的编程语言,提供了丰富的并发工具,其中线程池是实现高效并发的核心组件之一。
线程池通过重用现有的线程,减少了频繁创建和销毁线程的开销,同时能够有效地控制并发任务的数量,防止系统过载。借助线程池,开发者可以更方便地管理和调度任务,提高系统的响应速度和资源利用率。然而,正确理解和使用线程池并非易事,合理配置线程池参数、选择适当的拒绝策略、避免常见陷阱都是确保系统稳定运行的关键。
本文将深入介绍 Java 线程池的基本概念、常用类型及其适用场景,并结合实际代码示例展示如何在不同场景下合理使用线程池。希望通过本篇文章,读者能对 Java 线程池有一个全面的了解,并能在实际开发中应用自如,从而编写出更高效、更稳定的并发程序。
池化技术现在已经屡见不鲜了,线程池、数据库连接池、Http 连接池等等都是对这个思想的应用。池化技术的思想主要是为了减少每次获取资源的消耗,提高对资源的利用率。
线程池(Thread Pool)是一种基于池化思想管理线程的工具,由于创建和关闭线程需要花费时间,如果为每一个任务都创建一个线程,非常消耗资源。使用线程池可以避免增加创建和销毁线程的资源消耗,提高响应速度,且能重复利用线程。在使用线程池后,创建线程就变成了从线程池中获取空闲线程,关闭线程变成了向线程池归还线程。
线程池做的工作主要是控制运行的线程的数量,处理过程中将任务放入队列,然后在线程创建后启动这些任务,使用完毕不需要销毁线程而是放回池中,如果线程数量超过了最大数量超出数量的线程排队等候,等其它线程执行完毕,再从队列中取出任务来执行。他的主要特点为:线程复用、控制最大并发数、管理线程。
使用线程池的好处:
线程池通过复用线程来提高性能并减少线程创建和销毁的开销。
public class ThreadPoolExecutor extends AbstractExecutorService { // 省略其他方法和实现细节 ... /** * 创建一个新的 {@code ThreadPoolExecutor} 实例,使用给定的初始参数。 * * @param corePoolSize 保持在池中的线程数量,即使它们是空闲的,除非设置了 {@code allowCoreThreadTimeOut} * @param maximumPoolSize 允许在池中存在的最大线程数量 * @param keepAliveTime 当线程数量大于核心线程数时,多余的空闲线程在终止之前等待新任务的最长时间 * @param unit {@code keepAliveTime} 参数的时间单位 * @param workQueue 用于在任务执行前保存任务的队列。这个队列只会保存通过 {@code execute} 方法提交的 {@code Runnable} 任务 * @param threadFactory 当执行器创建新线程时使用的工厂 * @param handler 当执行由于线程边界和队列容量达到限制而被阻塞时使用的处理程序 * @throws IllegalArgumentException 如果以下任一情况成立:<br> * {@code corePoolSize < 0}<br> * {@code keepAliveTime < 0}<br> * {@code maximumPoolSize <= 0}<br> * {@code maximumPoolSize < corePoolSize} * @throws NullPointerException 如果 {@code workQueue} 或 {@code threadFactory} 或 {@code handler} 为 null */ public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { // 检查传入参数的合法性 if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); // 初始化参数 this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); // 转换 keepAliveTime 为纳秒 this.threadFactory = threadFactory; this.handler = handler; // 创建线程池容器 String name = Objects.toIdentityString(this); this.container = SharedThreadContainer.create(name); } // 省略其他方法和实现细节 ... }
在配置线程池时,有几个重要参数需要理解和配置:
核心线程数(corePoolSize):这是线程池中始终保持存活的线程数量。即使这些线程处于空闲状态,线程池也不会销毁它们;核心线程数决定了线程池在接收到任务时初始的并发处理能力;
最大线程数(maximumPoolSize):这是线程池中允许的最大线程数量。当任务队列已满且核心线程都在忙碌时,线程池会创建新线程直到达到最大线程数。这个参数决定了线程池能够处理的最大并发任务数量;
空闲线程存活时间(keepAliveTime):当线程池中的线程数超过核心线程数,且这些超出核心线程数的线程空闲时间超过了 keepAliveTime
,这些线程会被终止并从池中移除;这个参数有助于在任务负载减小时减少资源消耗;
时间单位(unit):这是 keepAliveTime
参数的时间单位,如秒(TimeUnit.SECONDS
)、毫秒(TimeUnit.MILLISECONDS
)等;
任务队列(workQueue):用于存放等待执行的任务。常见的任务队列有 LinkedBlockingQueue
、SynchronousQueue
和 ArrayBlockingQueue
等;任务队列的选择会影响线程池的行为和性能。例如,SynchronousQueue
不存储任务,而是直接将任务交给工作线程处理;
线程工厂(threadFactory):用于创建新线程。可以自定义线程工厂来设置线程的名称、优先级等。例如,可以使用 Executors.defaultThreadFactory()
来获得默认的线程工厂;
拒绝策略(RejectedExecutionHandler):当任务队列已满且线程池中的线程数量已达到最大线程数时,线程池会执行拒绝策略。
常见的拒绝策略包括:
AbortPolicy
:抛出 RejectedExecutionException
异常。CallerRunsPolicy
:由调用线程执行任务。DiscardPolicy
:直接丢弃任务,不抛出异常。DiscardOldestPolicy
:丢弃队列中最旧的任务,然后尝试提交新的任务。配置合理的线程池参数可以有效地管理资源,提高系统的并发处理能力,并确保系统的稳定性和响应速度。在实际应用中,需要根据具体的需求和工作负载进行调优。
线程池(ThreadPoolExecutor)的执行流程如下:
public class ThreadPoolExecutor extends AbstractExecutorService { // 省略其他方法和实现细节 ... /** * 在将来的某个时间执行给定的任务。任务可能在一个新的线程中执行,也可能在一个已有的线程池线程中执行。 * <p> * 如果任务不能被提交执行,无论是因为这个执行器已经关闭还是因为它的容量已经达到了极限,任务都将由当前的 * {@link RejectedExecutionHandler} 处理。 * * @param command 要执行的任务 * @throws RejectedExecutionException 在 {@code RejectedExecutionHandler} 的判断下,如果任务不能被接受执行 * @throws NullPointerException 如果 {@code command} 为 null */ public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * 分三步进行: * * 1. 如果运行的线程少于 corePoolSize,尝试启动一个新的线程并将给定的命令作为其第一个任务。 * 对 addWorker 的调用会原子地检查运行状态和工作线程数,防止在不应添加线程时误报警。 * * 2. 如果任务能够成功入队,那么我们仍然需要再次检查是否应该添加一个线程(因为现有的线程在上次检查后可能已经终止) * 或者线程池自进入此方法以来是否已关闭。因此我们重新检查状态,如果必要的话在停止时回滚入队操作, * 或者如果没有线程则启动一个新的线程。 * * 3. 如果我们不能将任务入队,那么我们尝试添加一个新的线程。如果失败,我们知道要么是已经关闭,要么是已饱和, * 因此拒绝任务。 */ // 获取线程池的状态和工作线程数 int c = ctl.get(); // 如果工作线程数小于核心线程数 if (workerCountOf(c) < corePoolSize) { // 尝试添加一个新的工作线程来执行任务 if (addWorker(command, true)) return; c = ctl.get(); } // 如果线程池在运行且任务成功入队 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // 如果线程池不再运行且任务被成功移除 if (!isRunning(recheck) && remove(command)) // 拒绝任务 reject(command); // 如果没有工作线程 else if (workerCountOf(recheck) == 0) // 添加一个新的工作线程 addWorker(null, false); // 如果不能将任务入队且添加新线程失败 } else if (!addWorker(command, false)) // 拒绝任务 reject(command); } // 省略其他方法和实现细节 ... }
以下是对这个流程的详细步骤介绍:
任务提交:通过 execute(Runnable)
方法或 submit(Callable<T>)
方法向线程池提交任务。
线程处理任务:线程池首先判断核心线程池中的线程数是否已达到 corePoolSize
。
workQueue
;任务队列:
maximumPoolSize
,则创建新的线程来处理任务。maximumPoolSize
,则执行拒绝策略(RejectedExecutionHandler
)。线程执行任务:
线程存活时间:
keepAliveTime
,则这些线程会被终止并从池中移除。图示化线程池执行流程:
任务提交 | 是否可以创建核心线程 / \ 是 否 / \ 创建核心线程 将任务放入工作队列 | 是否队列已满 / \ 否 是 / \ 放入队列 是否可以创建新线程 / \ 是 否 / \ 创建新线程 执行拒绝策略
线程池的拒绝策略(RejectedExecutionHandler)用于处理当线程池无法执行新的任务时的情况。主要有四种内置的拒绝策略,分别是:
这是默认的拒绝策略。当线程池无法处理新的任务时,它将抛出 RejectedExecutionException
。使用场景:当你希望在任务无法被执行时立即得到通知,并采取相应的措施。
示例代码:
RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy();
当线程池无法处理新的任务时,该策略会让提交任务的线程直接运行这个任务。使用场景:当你希望减缓新任务的提交速度时,该策略可以让调用线程参与执行任务,从而降低任务提交的速率。
示例代码:
RejectedExecutionHandler handler = new ThreadPoolExecutor.CallerRunsPolicy();
当线程池无法处理新的任务时,该策略会直接丢弃被拒绝的任务,不做任何处理也不抛出异常。使用场景:当你可以接受某些任务被静默丢弃且不需要额外的处理时。
示例代码:
RejectedExecutionHandler handler = new ThreadPoolExecutor.DiscardPolicy();
当线程池无法处理新的任务时,该策略会丢弃队列中最旧的任务,然后重新尝试提交被拒绝的任务。使用场景:当你希望优先处理最新的任务,可以接受丢弃一些旧任务时。
示例代码:
RejectedExecutionHandler handler = new ThreadPoolExecutor.DiscardOldestPolicy();
除了上述四种内置策略外,你还可以实现 RejectedExecutionHandler
接口来创建自定义的拒绝策略。例如:
public class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// 自定义拒绝处理逻辑,例如记录日志或将任务放入另一个队列中
System.out.println("Task " + r.toString() + " rejected from " + executor.toString());
}
}
Java 线程池(ThreadPoolExecutor)的状态主要通过一个称为 ctl
的原子变量来表示,这个变量同时包含了线程池的运行状态和线程池中的有效线程数量。ctl
变量是一个 AtomicInteger
类型的变量,其高3位表示线程池的状态,低29位表示线程池中的线程数量。
线程池的状态主要有以下几种:
shutdown()
方法后,线程池进入该状态,不再接受新任务,但会继续处理队列中的任务。shutdownNow()
方法后,线程池进入该状态,不再接受新任务,并且会中断正在处理的任务和清空队列中的任务。terminated()
方法。terminated()
方法已经完成执行,线程池完全终止。ctl
变量的高3位表示线程池状态,低29位表示工作线程数量。以下是几个常量的定义:
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
通过这些常量的定义,可以看出线程池状态的高 3 位是如何设置的。
线程池在不同状态之间的转换流程如下:
shutdown()
方法后,线程池状态从 RUNNING 转变为 SHUTDOWN,线程池不再接受新任务,但会继续处理已提交的任务。shutdownNow()
方法后,线程池状态从 RUNNING 或 SHUTDOWN 转变为 STOP,线程池不再接受新任务,并中断所有正在执行的任务。terminated()
方法执行完毕后,线程池状态从 TIDYING 转变为 TERMINATED。以下是线程池状态转换的示例代码:
public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(SHUTDOWN); interruptIdleWorkers(); onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } tryTerminate(); } public List<Runnable> shutdownNow() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(STOP); interruptWorkers(); taskQueue.clear(); } finally { mainLock.unlock(); } return super.shutdownNow(); }
在 shutdown()
方法中,通过 advanceRunState(SHUTDOWN)
将状态转变为 SHUTDOWN
,在 shutdownNow()
方法中,通过 advanceRunState(STOP)
将状态转变为 STOP
。
在 Java 中,Executors
提供了一些便捷的方法来创建常用的线程池,但在实际开发中,为了避免潜在的问题和更好地控制线程池的行为,通常建议直接使用 ThreadPoolExecutor
来创建线程池。下面是对一些常用线程池的介绍和它们可能存在的问题:
newSingleThreadExecutor:
创建一个单线程的线程池,只有一个线程在工作,所有任务按照提交顺序执行。如果唯一的线程因为异常终止,会有一个新线程替代它继续工作。使用场景:适用于需要确保顺序执行任务的场景。
代码示例:
ExecutorService executor = Executors.newSingleThreadExecutor();
newFixedThreadPool:
描述:创建一个固定大小的线程池。每次提交一个任务就创建一个线程,直到达到线程池的最大大小。线程池大小达到最大值后,将继续保持固定大小,如果某个线程因为异常终止,会补充一个新线程。使用场景:适用于需要限制并发线程数,控制资源使用的场景。
代码示例:
ExecutorService executor = Executors.newFixedThreadPool(10);
newCachedThreadPool
描述:创建一个可缓存的线程池。如果线程池中有空闲线程可以重用,则会重用空闲线程;如果没有空闲线程,则创建新线程。空闲线程会在60秒没有任务执行时被终止并移除。使用场景:适用于执行许多短期异步任务的小程序,或者负载较轻的服务器。
代码示例:
ExecutorService executor = Executors.newCachedThreadPool();
newScheduledThreadPool
描述:创建一个支持定时和周期性任务执行的线程池。可以用于需要定时执行任务的场景。使用场景:适用于需要定时或者周期性执行任务的场景。
代码示例:
ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);
使用 Executors 的弊端:阿里巴巴 Java 开发手册建议不要使用 Executors
创建线程池,而是直接使用 ThreadPoolExecutor
,主要是为了避免以下问题:
newFixedThreadPool
和 newSingleThreadExecutor
:问题:使用无界的请求队列 LinkedBlockingQueue
,可能会导致请求堆积,耗尽内存,甚至导致 OutOfMemoryError
。
newCachedThreadPool
和 newScheduledThreadPool
:问题:允许创建的线程数最大值为 Integer.MAX_VALUE
,可能会创建大量线程,耗尽系统资源,甚至导致 OutOfMemoryError
。
为了更好地控制线程池的行为,建议使用 ThreadPoolExecutor
直接创建线程池。这样可以更明确地控制线程池的核心参数,如核心线程数、最大线程数、空闲线程存活时间、任务队列等。
Java 中的线程池是通过 Executor 框架实现的,该框架中用到了 Executor
,Executors
,ExecutorService
,ThreadPoolExecutor
,Callable
和 Future
、FutureTask
这几个类
Executor 框架是 Java5 之后引进的,在 Java 5 之后,通过 Executor 来启动线程比使用 Thread 的 start 方法更好,除了更易管理,效率更好(用线程池实现,节约开销)外,还有关键的一点:有助于避免 this 逃逸问题。
this 逃逸是指在构造函数返回之前其他线程就持有该对象的引用. 调用尚未构造完全的对象的方法可能引发令人疑惑的错误。
Executor 框架不仅包括了线程池的管理,还提供了线程工厂、队列以及拒绝策略等,Executor 框架让并发编程变得更加简单。
ThreadPoolExecutor 是线程池的核心实现。线程的创建和终止需要很大的开销,线程池中预先提供了指定数量的可重用线程,所以使用线程池会节省系统资源,并且每个线程池都维护了一些基础的数据统计,方便线程的管理和监控。
通过下面的demo来了解ThreadPoolExecutor创建线程的过程。
public class TestThreadPool { public static void main(String[] args) { ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(3, 6, 5, TimeUnit.SECONDS, new LinkedBlockingDeque<>(5)); ExecutorCompletionService<String> executorCompletionService = new ExecutorCompletionService<>(threadPoolExecutor); for (int i = 0; i < 20; i++) { try { executorCompletionService.submit(()-> { try { //System.out.println("---"); Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } },"testtask"+i); System.out.print(" New task: testtask" + i); System.out.print(" ActiveCount: " + threadPoolExecutor.getActiveCount()); System.out.print(" poolSize: " + threadPoolExecutor.getPoolSize()); System.out.print(" queueSize: " + threadPoolExecutor.getQueue().size()); System.out.println(" taskCount: " + threadPoolExecutor.getTaskCount()); } catch (RejectedExecutionException e) { System.out.println("Reject:" + i); } try { Thread.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } } threadPoolExecutor.shutdown(); } }
Java 中的线程池核心实现类是 ThreadPoolExecutor
,先通过 JDK 1.8 中 ThreadPoolExecutor
的 UML 类图,了解下 ThreadPoolExecutor
的继承关系。
ThreadPoolExecutor
实现的顶层接口是 Executor
,顶层接口 Executor
提供了一种思想:将任务提交和任务执行进行解耦。用户无需关注如何创建线程,如何调度线程来执行任务,用户只需提供 Runnable
对象,将任务的运行逻辑提交到执行器(Executor
)中,由 Executor
框架完成线程的调配和任务的执行部分。ExecutorService
接口增加了一些能力:
Future
的方法;AbstractExecutorService
则是上层的抽象类,将执行任务的流程串联了起来,保证下层的实现只需关注一个执行任务的方法即可。最下层的实现类 ThreadPoolExecutor
实现最复杂的运行部分,ThreadPoolExecutor
将会一方面维护自身的生命周期,另一方面同时管理线程和任务,使两者良好的结合从而执行并行任务。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。