赞
踩
目录
干货分享,感谢您的阅读!
Java线程池是Java中用于管理和重用线程的机制。它可以有效地管理线程的创建、调度和销毁,以提高应用程序的性能和资源利用率。
线程池的主要目的是避免频繁创建和销毁线程的开销,并控制并发线程的数量,防止资源过度占用和系统过载。通过使用线程池,可以将任务提交给线程池执行,而不是直接创建新线程来执行每个任务。线程池会维护一组预先创建的线程,并在有任务时将任务分配给其中的线程进行执行。
Java提供了java.util.concurrent.Executors类来创建线程池。常见的线程池实现是ThreadPoolExecutor类,它允许配置线程池的大小、任务队列、线程工厂和拒绝策略等参数。
线程池的优点包括:
Java线程池的常见使用场景包括处理多线程任务、并发执行大量独立的任务、优化资源使用等。
在使用线程池时,以下是一些建议的最佳实践方式:
这些是一些常见的建议和最佳实践,可以帮助我们正确地使用线程池并优化应用程序的并发性能。根据具体的应用场景和需求,可能还会有其他特定的建议。在实际使用中,建议根据情况进行适当的调整和优化。
如果需要更高级的配置和更多的灵活性,或者对线程池的行为和执行细节有特定的要求,那么直接使用ThreadPoolExecutor类可能是更合适的选择。ThreadPoolExecutor提供了更多的参数和配置选项,可以满足更复杂的需求。
以下是一些情况下可能需要直接使用ThreadPoolExecutor的示例:
综上所述,对于一般的使用情况,使用Executors工厂类创建线程池是简单和推荐的方式。但对于特定的需求和更高级的配置,直接使用ThreadPoolExecutor类可以提供更大的灵活性和控制能力。根据具体的情况和需求,选择适合的方式使用线程池。
假设我们有一个电商平台,用户可以提交订单,并且每个订单的处理需要一定的时间。在订单提交后,我们希望通过线程池并发地处理这些订单,以提高处理效率。
在这个场景中,我们可以使用线程池来处理订单,具体步骤如下:
通过使用线程池,我们可以控制并发处理订单的线程数量,并且可以根据系统的负载情况进行动态调整。这可以提高订单处理的效率和性能,并且有效地利用系统的资源。
需要注意的是,实际的业务应用中,还可能需要考虑更多的因素,例如订单处理的顺序性、任务优先级、错误处理等。在实际场景中,需要根据具体需求和业务逻辑进行适当的调整和扩展。
下面是一个简单的示例代码:
- package org.zyf.javabasic.thread.base;
-
- import lombok.Data;
-
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
-
- /**
- * @author yanfengzhang
- * @description
- * @date 2021/5/1 17:47
- */
- public class OrderProcessor {
- // 创建线程池
- private ExecutorService executor;
-
- public OrderProcessor() {
- // 创建一个固定大小的线程池,核心线程数为5
- executor = Executors.newFixedThreadPool(5);
- }
-
- public void processOrder(Order order) {
- // 提交订单处理任务给线程池
- executor.execute(() -> {
- // 执行订单处理的逻辑
- // ...
- System.out.println("Processing order: " + order.getId());
- // ...
- });
- }
-
- public void shutdown() {
- // 关闭线程池
- executor.shutdown();
- }
-
- @Data
- static class Order{
- private int id;
-
- Order(int id) {
- this.id = id;
- }
- }
-
- public static void main(String[] args) {
- OrderProcessor orderProcessor = new OrderProcessor();
- // 模拟提交订单
- for (int i = 1; i <= 10; i++) {
- Order order = new Order(i);
- orderProcessor.processOrder(order);
- }
-
- // 关闭线程池
- orderProcessor.shutdown();
- }
- }
上述示例代码中,OrderProcessor类是用来处理订单的业务类。在构造函数中,我们创建了一个固定大小为5的线程池。processOrder()方法用于提交订单处理任务给线程池,其中使用了Lambda表达式来定义任务的逻辑。在这个例子中,我们只是简单地打印订单的ID来表示订单处理的逻辑。
在main()方法中,我们模拟了10个订单的提交,通过调用processOrder()方法将订单提交给线程池处理。最后,在适当的时候,我们调用shutdown()方法来关闭线程池。
现在增加一个使用ThreadPoolExecutor处理任务的代码示例。
- package org.zyf.javabasic.thread.base;
-
- import lombok.Data;
-
- import java.util.concurrent.ArrayBlockingQueue;
- import java.util.concurrent.ThreadPoolExecutor;
- import java.util.concurrent.TimeUnit;
-
- /**
- * @author yanfengzhang
- * @description
- * @date 2021/5/1 17:56
- */
- public class TaskProcessor {
- private ThreadPoolExecutor executor;
-
- public TaskProcessor() {
- // 创建ThreadPoolExecutor实例,进行自定义配置
- // 核心线程数
- int corePoolSize = 5;
- // 最大线程数
- int maxPoolSize = 10;
- // 非核心线程的空闲超时时间
- long keepAliveTime = 60;
- // 任务队列容量
- int queueCapacity = 100;
-
- executor = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, TimeUnit.SECONDS,
- new ArrayBlockingQueue<>(queueCapacity)
- );
- // 设置拒绝策略
- executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
- }
-
- public void processTask(Task task) {
- // 提交任务给线程池
- executor.execute(() -> {
- // 执行任务的逻辑
- // ...
- System.out.println("Processing task: " + task.getId());
- // ...
- });
- }
-
- @Data
- static class Task {
- private int id;
-
- Task(int id) {
- this.id = id;
- }
- }
-
- public void shutdown() {
- // 关闭线程池
- executor.shutdown();
- }
-
- public static void main(String[] args) {
- TaskProcessor taskProcessor = new TaskProcessor();
- // 模拟提交任务
- for (int i = 1; i <= 10; i++) {
- Task task = new Task(i);
- taskProcessor.processTask(task);
- }
- // 关闭线程池
- taskProcessor.shutdown();
- }
- }
在上述示例代码中,TaskProcessor类使用ThreadPoolExecutor来处理任务。在构造函数中,我们通过传递参数来配置线程池的核心线程数、最大线程数、非核心线程的空闲超时时间和任务队列的容量。processTask()方法用于提交任务给线程池,其中使用了Lambda表达式来定义任务的逻辑。在这个示例中,我们只是简单地打印任务的ID来表示任务的处理逻辑。在main()方法中,我们模拟了10个任务的提交,通过调用processTask()方法将任务提交给线程池处理。最后,在适当的时候,我们调用shutdown()方法来关闭线程池。
线程池是多线程编程中常用的一种设计模式,用于管理和复用线程,以提高程序的性能和资源利用率。下面是线程池的核心设计和实现介绍:
核心组件
核心参数
执行流程
拒绝策略
当任务队列已满且线程池中的线程数达到最大线程数时,新的任务无法被执行,此时需要采用一定的策略来处理这些被拒绝的任务。常见的拒绝策略包括:
线程池状态
以上是线程池的核心设计和实现介绍。使用线程池可以简化线程的管理和调度,提高多线程程序的性能和资源利用率。根据具体的业务需求,可以调整线程池的参数和拒绝策略来适应不同的场景。
本部分直接通过来展示Java线程池实现原理及其在美团业务中的实践 - 美团技术团队整理
Java中的线程池核心实现类是ThreadPoolExecutor,基于JDK 1.8的源码来分析Java线程池的核心设计与实现。首先分析ThreadPoolExecutor的UML类图,了解ThreadPoolExecutor的继承关系。
ThreadPoolExecutor实现的顶层接口是Executor,顶层接口Executor提供了一种思想:将任务提交和任务执行进行解耦。用户无需关注如何创建线程,如何调度线程来执行任务,用户只需提供Runnable对象,将任务的运行逻辑提交到执行器(Executor)中,由Executor框架完成线程的调配和任务的执行部分。ExecutorService接口增加了一些能力:
AbstractExecutorService则是上层的抽象类,将执行任务的流程串联了起来,保证下层的实现只需关注一个执行任务的方法即可。
最下层的实现类ThreadPoolExecutor实现最复杂的运行部分,ThreadPoolExecutor将会一方面维护自身的生命周期,另一方面同时管理线程和任务,使两者良好的结合从而执行并行任务。
ThreadPoolExecutor是如何运行,如何同时维护线程和执行任务的呢?其运行机制如下图所示:
线程池在内部实际上构建了一个生产者消费者模型,将线程和任务两者解耦,并不直接关联,从而良好的缓冲任务,复用线程。
线程池的运行主要分成两部分:任务管理、线程管理。
任务管理部分充当生产者的角色,当任务提交后,线程池会判断该任务后续的流转:(1)直接申请线程执行该任务;(2)缓冲到队列中等待线程执行;(3)拒绝该任务。
线程管理部分是消费者,它们被统一维护在线程池内,根据任务请求进行线程的分配,当线程执行完任务后则会继续获取新的任务去执行,最终当线程获取不到任务的时候,线程就会被回收。
线程池运行的状态,并不是用户显式设置的,而是伴随着线程池的运行,由内部来维护。线程池内部使用一个变量维护两个值:运行状态(runState)和线程数量 (workerCount)。具体实现中,线程池将运行状态(runState)、线程数量 (workerCount)两个关键参数的维护放在了一起,如代码所示:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
ctl
这个AtomicInteger类型,是对线程池的运行状态和线程池中有效线程的数量进行控制的一个字段, 它同时包含两部分的信息:线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount),高3位保存runState,低29位保存workerCount,两个变量之间互不干扰。用一个变量去存储两个值,可避免在做相关决策时,出现不一致的情况,不必为了维护两者的一致,而占用锁资源。
通过阅读线程池源代码也可以发现,经常出现要同时判断线程池运行状态和线程数量的情况。线程池也提供了若干方法去供用户获得线程池当前的运行状态、线程个数。这里都使用的是位运算的方式,相比于基本运算,速度也会快很多。
关于内部封装的获取生命周期状态、获取线程池线程数量的计算方法如以下代码所示:
- private static int runStateOf(int c) {
- //计算当前运行状态
- return c & ~CAPACITY;
- }
- private static int workerCountOf(int c) {
- //计算当前线程数量
- return c & CAPACITY;
- }
- private static int ctlOf(int rs, int wc) {
- //通过状态和线程数生成ctl
- return rs | wc;
- }
其生命周期转换如下入所示:
图中ThreadPoolExecutor的运行状态有5种,介绍如下:
运行状态 | 状态描述 |
RUNNING | 能接受新提交的任务,并且也能处理阻塞队列中的任务。 |
SHUTDOWN | 关闭状态,不再接受新提交的任务,但却可以继续处理阻塞队列中已保存的任务。 |
STOP | 不能接受新任务,也不处理队列中的任务,会中断正在处理任务的线程。 |
TIDYING | 所有的任务都已终止了,workerCount (有效线程数) 为0 |
TERMINATED | 在terminated() 方法执行完后进入该状态 |
任务调度
任务调度是线程池的主要入口,当用户提交了一个任务,接下来这个任务将如何执行都是由这个阶段决定的。了解这部分就相当于了解了线程池的核心运行机制。
首先,所有任务的调度都是由execute方法完成的,这部分完成的工作是:检查现在线程池的运行状态、运行线程数、运行策略,决定接下来执行的流程,是直接申请线程执行,或是缓冲到队列中执行,亦或是直接拒绝该任务。其执行过程如下:
其执行流程如下图所示:
任务缓冲
任务缓冲模块是线程池能够管理任务的核心部分。线程池的本质是对任务和线程的管理,而做到这一点最关键的思想就是将任务和线程两者解耦,不让两者直接关联,才可以做后续的分配工作。线程池中是以生产者消费者模式,通过一个阻塞队列来实现的。阻塞队列缓存任务,工作线程从阻塞队列中获取任务。
阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。
下图中展示了线程1往阻塞队列中添加元素,而线程2从阻塞队列中移除元素:
使用不同的队列可以实现不一样的任务存取策略。阻塞队列成员如下:
名称 | 描述 |
ArrayBlockingQueue | 一个用数组实现的有界阻塞队列,此队列按照先进先出(FIFO)的原则对元素进行排序。支持公平锁和非公平锁。 |
LinkedBlockingQueue | 一个由链表结构组成的有界队列,此队列按照先进先出(FIFO)的原则对元素进行排序。此队列的默认长度为Integer.MAX_VALUE,所以默认创建的该队列有容量危险。 |
LinkedBlockingDeque | 一个由链表结构组成的双向阻塞队列。队列头部和尾部都可以添加和移除元素,多线程并发时,可以将锁的竞争最多降到一半。 |
SynchronousQueue | 一个不存储元素的阻塞队列,每一个put操作必须等待take操作,否则不能添加元素。支持公平锁和非公平锁。SynchronousQueue的一个使用场景是在线程池里。Executors.newCachedThreadPool()就使用了SynchronousQueue,这个线程池根据需要(新任务到来时)创建新的线程,如果有空闲线程则会重复使用,线程空闲了60秒后会被回收。 |
DelayQueue | 一个实现PriorityBlockingQueue实现延迟获取的无界队列,在创建元素时,可以指定多久才能从队列中获取当前元素。只有延时期满后才能从队列中获取元素。 |
PriorityBlockingQueue | 一个支持线程优先级排序的无界队列,默认自然序进行排序,也可以自定义实现compareTo()方法来指定元素排序规则,不能保证同优先级元素的顺序。 |
LinkedTransferQueue | 一个由链表结构组成的无界阻塞队列,相当于其它队列,LinkedTransferQueue队列多了transfer和tryTransfer方法。 |
任务申请
由上文的任务分配部分可知,任务的执行有两种可能:
第一种情况仅出现在线程初始创建的时候,第二种是线程获取任务绝大多数的情况。
线程需要从任务缓存模块中不断地取任务执行,帮助线程从阻塞队列中获取任务,实现线程管理模块和任务管理模块之间的通信。这部分策略由getTask方法实现,其执行流程如下图所示:
getTask这部分进行了多次判断,为的是控制线程的数量,使其符合线程池的状态。如果线程池现在不应该持有那么多线程,则会返回null值。工作线程Worker会不断接收新任务去执行,而当工作线程Worker接收不到任务的时候,就会开始被回收。
任务拒绝
任务拒绝模块是线程池的保护部分,线程池有一个最大的容量,当线程池的任务缓存队列已满,并且线程池中的线程数目达到maximumPoolSize时,就需要拒绝掉该任务,采取任务拒绝策略,保护线程池。
拒绝策略是一个接口,其设计如下:
- public interface RejectedExecutionHandler {
- void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
- }
用户可以通过实现这个接口去定制拒绝策略,也可以选择JDK提供的四种拒绝策略,其特点如下:
名称 | 描述 |
ThreadPoolExecutor.AbortPolicy | 丢弃任务并抛出RejectedExecutionException异常。 这是线程池默认的拒绝策略,在任务不能再提交的时候,抛出异常,及时反馈程序运行状态。如果是比较关键的业务,推荐使用此拒绝策略,这样子在系统不能承载更大的并发量的时候,能够及时的通过异常发现。 |
ThreadPoolExecutor.CallerRunsPolicy | 由调用线程(提交任务的线程)处理该任务。这种情况是需要让所有任务都执行完毕,那么就适合大量计算的任务类型去执行,多线程仅仅是增大吞吐量的手段,最终必须要让每个任务都执行完毕。 |
ThreadPoolExecutor.DiscardPolicy | 丢弃任务,但是不抛出异常。 使用此策略,可能会使我们无法发现系统的异常状态。建议是一些无关紧要的业务采用此策略。 |
ThreadPoolExecutor.DiscardOldestPolicy | 丢弃队列最前面的任务,然后重新提交被拒绝的任务。是否要采用此种拒绝策略,还得根据实际业务是否允许丢弃老任务来认真衡量。 |
线程池为了掌握线程的状态并维护线程的生命周期,设计了线程池内的工作线程Worker。我们来看一下它的部分代码:
- private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
- final Thread thread;//Worker持有的线程
- Runnable firstTask;//初始化的任务,可以为null
- }
Worker工作线程实现了Runnable接口,并持有一个线程thread,一个初始化任务firstTask。
Worker执行任务的模型如下图所示:
线程池需要管理线程的生命周期,需要在线程长时间不运行的时候进行回收。线程池使用一张Hash表去持有线程的引用,这样可以通过添加引用、移除引用这样的操作来控制线程的生命周期。这个时候重要的就是如何判断线程是否在运行。
Worker是通过继承AQS,使用AQS来实现独占锁这个功能。没有使用可重入锁ReentrantLock,而是使用AQS,为的就是实现不可重入的特性去反应线程现在的执行状态。
在线程回收过程中就使用到了这种特性,回收过程如下图所示:
Worker线程增加
增加线程是通过线程池中的addWorker方法,该方法的功能就是增加一个线程,该方法不考虑线程池是在哪个阶段增加的该线程,这个分配线程的策略是在上个步骤完成的,该步骤仅仅完成增加线程,并使它运行,最后返回是否成功这个结果。addWorker方法有两个参数:firstTask、core。
其执行流程如下图所示:
Worker线程回收
线程池中线程的销毁依赖JVM自动的回收,线程池做的工作是根据当前线程池的状态维护一定数量的线程引用,防止这部分线程被JVM回收,当线程池决定哪些线程需要回收时,只需要将其引用消除即可。Worker被创建出来后,就会不断地进行轮询,然后获取任务去执行,核心线程可以无限等待获取任务,非核心线程要限时获取任务。当Worker无法获取到任务,也就是获取的任务为空时,循环会结束,Worker会主动消除自身在线程池内的引用。
- try {
- while (task != null || (task = getTask()) != null) {
- //执行任务
- }
- } finally {
- processWorkerExit(w, completedAbruptly);//获取不到任务时,主动回收自己
- }
线程回收的工作是在processWorkerExit方法完成的。
事实上,在这个方法中,将线程引用移出线程池就已经结束了线程销毁的部分。但由于引起线程销毁的可能性有很多,线程池还要判断是什么引发了这次销毁,是否要改变线程池的现阶段状态,是否要根据新状态,重新分配线程。
Worker线程执行任务
在Worker类中的run方法调用了runWorker方法来执行任务,runWorker方法的执行过程如下:
执行流程如下图所示
CachedThreadPool的特点是根据需要动态创建和回收线程,适用于需要处理大量短期任务的场景。下面是关于CachedThreadPool的介绍:
下面是使用CachedThreadPool的示例代码:
- package org.zyf.javabasic.thread.base;
-
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
-
- /**
- * @author yanfengzhang
- * @description
- * @date 2021/5/1 19:33
- */
- public class CachedThreadPoolExample {
- public static void main(String[] args) {
- // 创建CachedThreadPool
- ExecutorService executor = Executors.newCachedThreadPool();
-
- // 提交任务
- for (int i = 0; i < 10; i++) {
- final int taskId = i;
- executor.execute(() -> {
- System.out.println("Executing task: " + taskId + " on thread: " + Thread.currentThread().getName());
- });
- }
-
- // 关闭线程池
- executor.shutdown();
- }
- }
-
在上述示例中,我们使用Executors.newCachedThreadPool()创建了一个CachedThreadPool实例。然后,我们提交了10个任务给线程池执行,每个任务打印自己的ID和执行线程的名称。最后,我们调用executor.shutdown()来关闭线程池。
FixedThreadPool的特点是固定线程数量,适用于需要控制线程数量的场景。下面是关于FixedThreadPool的介绍:
下面是使用FixedThreadPool的示例代码:
- package org.zyf.javabasic.thread.base;
-
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
-
- /**
- * @author yanfengzhang
- * @description
- * @date 2021/5/1 19:38
- */
- public class FixedThreadPoolExample {
- public static void main(String[] args) {
- // 创建FixedThreadPool,指定线程数为3
- ExecutorService executor = Executors.newFixedThreadPool(3);
-
- // 提交任务
- for (int i = 0; i < 10; i++) {
- final int taskId = i;
- executor.execute(() -> {
- System.out.println("Executing task: " + taskId + " on thread: " + Thread.currentThread().getName());
- });
- }
-
- // 关闭线程池
- executor.shutdown();
- }
- }
在上述示例中,我们使用Executors.newFixedThreadPool(3)创建了一个包含3个线程的FixedThreadPool实例。然后,我们提交了10个任务给线程池执行,每个任务打印自己的ID和执行线程的名称。最后,我们调用executor.shutdown()来关闭线程池。
ScheduledThreadPool的特点是可以在指定的延迟时间后或按固定的时间间隔执行任务,适用于需要定时执行任务的场景。下面是关于ScheduledThreadPool的介绍:
下面是使用ScheduledThreadPool的示例代码:
- package org.zyf.javabasic.thread.base;
-
- import java.util.concurrent.Executors;
- import java.util.concurrent.ScheduledExecutorService;
- import java.util.concurrent.TimeUnit;
-
- /**
- * @author yanfengzhang
- * @description
- * @date 2021/5/1 19:41
- */
- public class ScheduledThreadPoolExample {
- public static void main(String[] args) {
- // 创建ScheduledThreadPool
- ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
-
- // 延迟执行任务
- executor.schedule(() -> {
- System.out.println("Task 1 executed after 2 seconds.");
- }, 2, TimeUnit.SECONDS);
-
- // 固定时间间隔执行任务
- executor.scheduleAtFixedRate(() -> {
- System.out.println("Task 2 executed every 3 seconds.");
- }, 0, 3, TimeUnit.SECONDS);
-
- // 关闭线程池
- executor.shutdown();
- }
- }
在上述示例中,我们使用Executors.newScheduledThreadPool(2)创建了一个包含2个线程的ScheduledThreadPool实例。然后,我们使用schedule()方法延迟执行一个任务,该任务会在2秒后执行。另外,我们使用scheduleAtFixedRate()方法按照固定的时间间隔(3秒)来执行另一个任务。最后,我们调用executor.shutdown()来关闭线程池。
SingleThreadExecutor的特点是只有一个工作线程,适用于需要保证任务按照顺序执行的场景。下面是关于SingleThreadExecutor的介绍:
下面是使用SingleThreadExecutor的示例代码:
- package org.zyf.javabasic.thread.base;
-
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
-
- /**
- * @author yanfengzhang
- * @description
- * @date 2021/5/1 19:44
- */
- public class SingleThreadExecutorExample {
- public static void main(String[] args) {
- // 创建SingleThreadExecutor
- ExecutorService executor = Executors.newSingleThreadExecutor();
-
- // 提交任务
- for (int i = 0; i < 5; i++) {
- final int taskId = i;
- executor.execute(() -> {
- System.out.println("Executing task: " + taskId + " on thread: " + Thread.currentThread().getName());
- });
- }
-
- // 关闭线程池
- executor.shutdown();
- }
- }
在上述示例中,我们使用Executors.newSingleThreadExecutor()创建了一个SingleThreadExecutor实例。然后,我们提交了5个任务给线程池执行,每个任务打印自己的ID和执行线程的名称。最后,我们调用executor.shutdown()来关闭线程池。
以下是一些合理配置线程池的建议:
总之,合理配置线程池需要根据具体的业务需求、任务特性和系统资源来选择适当的参数和策略。关注线程池的大小、队列容量、拒绝策略以及监控和调优是配置线程池的关键点。
本部分通过Java线程池实现原理及其在美团业务中的实践 - 美团技术团队整理
线程池使用面临的核心的问题在于:线程池的参数并不好配置。
Case1:2018年XX页面展示接口大量调用降级:
事故描述:XX页面展示接口产生大量调用降级,数量级在几十到上百。
事故原因:该服务展示接口内部逻辑使用线程池做并行计算,由于没有预估好调用的流量,导致最大核心数设置偏小,大量抛出RejectedExecutionException,触发接口降级条件,示意图如下:
Case2:2018年XX业务服务不可用S2级故障
事故描述:XX业务提供的服务执行时间过长,作为上游服务整体超时,大量下游服务调用失败。
事故原因:该服务处理请求内部逻辑使用线程池做资源隔离,由于队列设置过长,最大线程数设置失效,导致请求数量增加时,大量任务堆积在队列中,任务执行时间过长,最终导致下游服务的大量调用超时失败。示意图如下:
回到最初的问题,业务使用线程池是为了获取并发性,对于获取并发性,是否可以有什么其他的方案呢替代?我们尝试进行了一些其他方案的调研:
综合考虑,这些新的方案都能在某种情况下提升并行任务的性能,然而本次重点解决的问题是如何更简易、更安全地获得的并发性。另外,Actor模型的应用实际上甚少,只在Scala中使用广泛,协程框架在Java中维护的也不成熟。这三者现阶段都不是足够的易用,也并不能解决业务上现阶段的问题。
有没有一种计算公式,能够让开发同学很简易地计算出某种场景中的线程池应该是什么参数呢?带着这样的疑问,我们调研了业界的一些线程池参数配置方案:
调研了以上业界方案后,我们并没有得出通用的线程池计算方式。并发任务的执行情况和任务类型相关,IO密集型和CPU密集型的任务运行起来的情况差异非常大,但这种占比是较难合理预估的,这导致很难有一个简单有效的通用公式帮我们直接计算出结果。
尽管经过谨慎的评估,仍然不能够保证一次计算出来合适的参数,那么我们是否可以将修改线程池参数的成本降下来,这样至少可以发生故障的时候可以快速调整从而缩短故障恢复的时间呢?基于这个思考,我们是否可以将线程池的参数从代码中迁移到分布式配置中心上,实现线程池参数可动态配置和即时生效,线程池参数动态化前后的参数修改流程对比如下:
动态化线程池的核心设计包括以下三个方面:
简化线程池配置
线程池构造参数有8个,最核心的是3个:corePoolSize、maximumPoolSize,workQueue,它们最大程度地决定了线程池的任务分配和线程分配策略。
考虑到在实际应用中我们获取并发性的场景主要是两种:
所以线程池只需要提供这三个关键参数的配置,并且提供两种队列的选择,就可以满足绝大多数的业务需求,Less is More。
参数可动态修改
为了解决参数不好配,修改参数成本高等问题。在Java线程池留有高扩展性的基础上,封装线程池,允许线程池监听同步外部的消息,根据消息进行修改配置。将线程池的配置放置在平台侧,允许开发同学简单的查看、修改线程池配置。
增加线程池监控
对某事物缺乏状态的观测,就对其改进无从下手。在线程池执行任务的生命周期添加监控能力,帮助开发同学了解线程池状态。
动态化线程池提供如下功能:
JDK原生线程池ThreadPoolExecutor提供了如下几个public的setter方法,如下图所示:
JDK允许线程池使用方通过ThreadPoolExecutor的实例来动态设置线程池的核心策略,以setCorePoolSize为方法例,在运行期线程池使用方调用此方法设置corePoolSize之后,线程池会直接覆盖原来的corePoolSize值,并且基于当前值和原始值的比较结果采取不同的处理策略。对于当前值小于当前工作线程数的情况,说明有多余的worker线程,此时会向当前idle的worker线程发起中断请求以实现回收,多余的worker在下次idel的时候也会被回收;对于当前值大于原始值且当前队列中有待执行任务,则线程池会创建新的worker线程来执行队列任务,setCorePoolSize具体流程如下:
线程池内部会处理好当前状态做到平滑修改,其他几个方法限于篇幅,这里不一一介绍。重点是基于这几个public方法,我们只需要维护ThreadPoolExecutor的实例,并且在需要修改的时候拿到实例修改其参数即可。基于以上的思路,我们实现了线程池参数的动态化、线程池参数在管理平台可配置可修改,其效果图如下图所示:
用户可以在管理平台上通过线程池的名字找到指定的线程池,然后对其参数进行修改,保存后会实时生效。目前支持的动态参数包括核心数、最大值、队列长度等。除此之外,在界面中,我们还能看到用户可以配置是否开启告警、队列等待任务告警阈值、活跃度告警等等。
除了参数动态化之外,为了更好地使用线程池,我们需要对线程池的运行状况有感知,比如当前线程池的负载是怎么样的?分配的资源够不够用?任务的执行情况是怎么样的?是长任务还是短任务?基于对这些问题的思考,动态化线程池提供了多个维度的监控和告警能力,包括:线程池活跃度、任务的执行Transaction(频率、耗时)、Reject异常、线程池内部统计信息等等,既能帮助用户从多个维度分析线程池的使用情况,又能在出现问题第一时间通知到用户,从而避免故障或加速故障恢复。
负载监控和告警
线程池负载关注的核心问题是:基于当前线程池参数分配的资源够不够。对于这个问题,我们可以从事前和事中两个角度来看。事前,线程池定义了“活跃度”这个概念,来让用户在发生Reject异常之前能够感知线程池负载问题,线程池活跃度计算公式为:
线程池活跃度 = activeCount/maximumPoolSize。
这个公式代表当活跃线程数趋向于maximumPoolSize的时候,代表线程负载趋高。事中,也可以从两方面来看线程池的过载判定条件,一个是发生了Reject异常,一个是队列中有等待任务(支持定制阈值)。以上两种情况发生了都会触发告警,告警信息会通过大象推送服务所关联的负责人。
任务级精细化监控
在传统的线程池应用场景中,线程池中的任务执行情况对于用户来说是透明的。比如在一个具体的业务场景中,业务开发申请了一个线程池同时用于执行两种任务,一个是发消息任务、一个是发短信任务,这两类任务实际执行的频率和时长对于用户来说没有一个直观的感受,很可能这两类任务不适合共享一个线程池,但是由于用户无法感知,因此也无从优化。动态化线程池内部实现了任务级别的埋点,且允许为不同的业务任务指定具有业务含义的名称,线程池内部基于这个名称做Transaction打点,基于这个功能,用户可以看到线程池内部任务级别的执行情况,且区分业务,任务监控示意图如下图所示:
运行时状态实时查看
用户基于JDK原生线程池ThreadPoolExecutor提供的几个public的getter方法,可以读取到当前线程池的运行状态以及参数,如下图所示:
动态化线程池基于这几个接口封装了运行时状态实时查看的功能,用户基于这个功能可以了解线程池的实时状态,比如当前有多少个工作线程,执行了多少个任务,队列中等待的任务数等等。效果如下图所示:
面对业务中使用线程池遇到的实际问题,我们曾回到支持并发性问题本身来思考有没有取代线程池的方案,也曾尝试着去追求线程池参数设置的合理性,但面对业界方案具体落地的复杂性、可维护性以及真实运行环境的不确定性,我们在前两个方向上可谓“举步维艰”。最终,我们回到线程池参数动态化方向上探索,得出一个且可以解决业务问题的方案,虽然本质上还是没有逃离使用线程池的范畴,但是在成本和收益之间,算是取得了一个很好的平衡。成本在于实现动态化以及监控成本不高,收益在于:在不颠覆原有线程池使用方式的基础之上,从降低线程池参数修改的成本以及多维度监控这两个方面降低了故障发生的概率。希望本文提供的动态化线程池思路能对大家有帮助。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。