赞
踩
目录
大龄程序员老王
老王是一个已经北漂十多年的程序员,岁数大了,加班加不过年轻人,升迁也无望,于是拿着手里的一些积蓄,回老家转行创业。他选择了洗浴行业,开一家洗浴中心,没错,一家正规的洗浴中心。之前在北京的时候,喜欢去的澡堂叫“清华池”,他想了想,就给自己的洗浴中心取名为“线程池”。线程池洗浴中心
线程池开业以后,老王发现有顾客想做足疗,于是就招聘了1个足疗技师,多增加了一项业务增加了收入。随着做足疗的顾客增多,为了赚更多钱又招聘了4个足疗技师。
过了一段时间,洗浴中心的生意越来越好,做足疗的顾客也越来越多。但是,老王发现自己店里的足疗技师已经有5个足疗技师,再招聘就太多了,支付不起再多工资了。足疗技师忙不过来怎么办?老王是个聪明人,马上想到办法:让顾客排队,有哪个足疗技师做完了,空闲出来了,就在队伍里再叫一个顾客继续做。忙碌的周末
一到周末,来洗浴中心的顾客比平时多了几倍,想足疗的顾客排队时间太长,顾客们已经不耐烦了。老王马上做出反应,又紧急从其他洗浴中心招聘了5个足疗技师,为队伍里顾客做足疗,大大减少排队的顾客。
不过,有时生意太火爆了,紧急招聘的技师也用上了,顾客排队时间也是很长,再来新的顾客,老王只能满脸赔笑地和顾客说:“您下次再来吧,下次给您找个好技师。”,把顾客拒之门外。
过了周末以后,店里不能养闲人啊,老王就把紧急招聘的技师都辞退了。老王的经营之道
老王的生意越做越红火,很快就要开分店、融资上市、走上人生巅峰。既然这么成功,就让我们来复盘一下他的经营之道吧:
原文链接:https://blog.csdn.net/heihaozi/article/details/102882698
线程生命周期如下图:
新建:java.lang.Thread.State.NEW
- public static void thread_state_NEW(){
- Thread thread = new Thread();
- System.out.println(thread.getState());
- }
就绪:java.lang.Thread.State.RUNNABLE
- public static void thread_state_RUNNABLE(){
- Thread thread = new Thread();
- thread.start();
- System.out.println(thread.getState());
- }
超时等待:java.lang.Thread.State#TIMED_WAITING
- public static void thread_state_SLEEP(){
- Thread thread3 = new Thread(() -> {
- try {
- Thread.sleep(10000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- } });
- thread3.start();
- Thread.sleep(500);
- System.out.println(thread3.getState());
- }
等待:java.lang.Thread.State.WAITING
- public static void thread_state_WAITING(){
- Thread thread2 = new Thread(new Runnable() {
- public void run() {
- LockSupport.park();
- }
- });
- thread2.start();
- Thread.sleep(500);
- System.out.println(thread2.getState());
- LockSupport.unpark(thread2);
- }
阻塞:java.lang.Thread.State.BLOCKED
- public static void thread_state_BLOCKED(){
- final byte[] lock = new byte[0];
- Thread thread1 = new Thread(() -> {
- synchronized (lock){
- try {
- Thread.sleep(3000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- } }
- });
- thread1.start();
- Thread thread2 = new Thread(() -> {
- synchronized (lock){
- } });
- thread2.start();
- Thread.sleep(1000);
-
- System.out.println(thread1.getState());
- System.out.println(thread2.getState());
- }
销亡:java.lang.Thread.State.TERMINATED
- public static void thread_state_TERMINATED(){
- Thread thread = new Thread();
- thread.start();
- Thread.sleep(1000);
- System.out.println(thread.getState());
- }
项目中使用线程池也有一些注意事项,参照《Java开发手册 - 泰山版》说明:
【强制】线程资源必须通过线程池提供,不允许在应用中自行显式创建线程。
说明:线程池的好处是减少在创建和销毁线程上所消耗的时间以及系统资源的开销,解决资源不足的问题。 如果不使用线程池,有可能造成系统创建大量同类线程而导致消耗完内存或者“过度切换”的问题。【强制】线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这 样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
Executors 返回的线程池对象的弊端如下:
FixedThreadPool 和 SingleThreadPool:
允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。
CachedThreadPool:
允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM。
线程池(ThreadPool):线程池就是创建一个缓冲池存放线程,执行结束以后,该线程并不会死亡,而是再次返回线程池中成为空闲状态,等候下次任务来临,这使得线程池比手动 创建线程有着更多的优势,
常应用于高并发场景下。使用多线程对代码效率进行优化,因此,试用线程池比手动创建线程有着更多的优势:
降低系统资源消耗,通过重用已存在的线程,降低线程创建和销毁造成的消耗;
提高系统响应速度,当有任务到达时,通过复用已存在的线程,无需等待新线程的创建便能立即执行;
方便线程并发数的管控。因为线程若是无限制的创建,可能会导致内存占用过多而产生OOM
节省cpu切换线程的时间成本(需要保持当前执行线程的现场,并恢复要执行线程的现场)
提供更强大的功能,延时定时线程池。 Timer vs ScheduledThreadPoolExecutor
常见的线程池结构(UML)
最常用的线程池,一方面维护自身的生命周期,另一方面同时管理线程和任务,使两者良好的结合从而执行并行任务
RUNNING:接受新任务并处理排队的任务。
SHUTDOWN:不接受新任务,但处理排队的任务。
STOP:不接受新任务,不处理排队的任务,并中断正在进行的任务。
TIDYING:所有任务都已终止,workerCount 为零,线程转换到 TIDYING 状态将运行 terminated() 钩子方法。
TERMINATED:terminated() 已完成。
上述为线程池的五种状态,那么这五种状态由什么记录呢?mark一下~下面详细介绍。
假设场景:
- 创建线程池,无限循环添加task,debug看works和queue数量增长规律
- 等待一段时间后,查看works数量是否回落到core
先附结论:
首先我们了解下ThreadPoolExecutor的构造函数
从源码中可以看出,ThreadPoolExecutor的构造函数有7个参数,分别是corePoolSize、maximumPoolSize、keepAliveTime、unit、workQueue、threadFactory、handler。下面会对这7个参数一一解释
1、corePoolSize 核心线程数量:
线程池中会维护一个最小的线程数量,即使这些线程处理空闲状态,他们也不会被销毁,除非设置了allowCoreThreadTimeOut。这里的最小线程数量即是corePoolSize。任务提交到线程池后,首先会检查当前线程数是否达到了corePoolSize,如果没有达到的话,则会创建一个新线程来处理这个任务。
2、maximumPoolSize 线程池最大线程数量:
当前线程数达到corePoolSize后,如果继续有任务被提交到线程池,会将任务缓存到工作队列(后面会介绍)中。如果队列也已满,则会去创建一个新线程来出来这个处理。线程池不会无限制的去创建新线程,它会有一个最大线程数量的限制,这个数量即由maximunPoolSize指定。
3、keepAliveTime 空闲线程存活时间:
一个线程如果处于空闲状态,并且当前的线程数量大于corePoolSize,那么在指定时间后,这个空闲线程会被销毁,这里的指定时间由keepAliveTime来设定
4、unit 空闲线程存活时间单位:
keepAliveTime的计量单位,常用 SECONDS(秒) MILLISECONDS(毫秒)
5、workQueue 工作队列:
任务队列,用于传输和保存等待执行任务的阻塞队列。当corePoolSize均初始化完成后,再来任务就会直接存入queue中,线程通过getTask()方法自旋获取任务。常见的队列设置如下:
①ArrayBlockingQueue 数组阻塞队列:
基于数组的有界阻塞队列,按FIFO排序。新任务进来后,会放到该队列的队尾,有界的数组可以防止资源耗尽问题。当线程池中线程数量达到corePoolSize后,再有新任务进来,则会将任务放入该队列的队尾,等待被调度。如果队列已经是满的,则创建一个新线程,如果线程数量已经达到maxPoolSize,则会执行拒绝策略。
②※LinkedBlockingQuene 链表阻塞队列(注意:可以指定长度):
基于链表的无界阻塞队列(默认最大容量为Interger.MAX,可以指定长度),按照FIFO排序。由于该队列的近似无界性,当线程池中线程数量达到corePoolSize后,再有新任务进来,会一直存入该队列,而基本不会去创建新线程直到maxPoolSize(很难达到Interger.MAX这个数),因此使用该工作队列时,参数maxPoolSize其实是不起作用的。
ps:ThreadPoolTaskExecutor有界场景默认使用
③SynchronousQuene 同步队列:
一个不缓存任务的阻塞队列,生产者放入一个任务必须等到消费者取出这个任务。也就是说新任务进来时,不会缓存,而是直接被调度执行该任务,如果没有可用线程,则创建新线程,如果线程数量达到maxPoolSize,则执行拒绝策略。
④PriorityBlockingQueue 优先级阻塞队列:
具有优先级的无界阻塞队列,优先级通过参数Comparator实现。
6、threadFactory 线程工厂:
创建一个新线程时使用的工厂,可以用来设定线程名、是否为daemon线程等等
7、handler 拒绝策略:
当工作队列中的任务已到达最大限制,并且线程池中的线程数量也达到最大限制,这时如果有新任务提交进来,该如何处理呢。这里的拒绝策略,就是解决这个问题的,jdk中提供了4中拒绝策略:
①CallerRunsPolicy
该策略下,在调用者线程中直接执行被拒绝任务的run方法。
②AbortPolicy
该策略下,直接丢弃任务,并抛出RejectedExecutionException异常。
ps:ThreadPoolTaskExecutor默认
③DiscardPolicy
该策略下,直接丢弃任务,什么都不做。
④DiscardOldestPolicy
该策略下,抛弃进入队列最早的那个任务,然后尝试把这次拒绝的任务放入队列
介绍了上述线程池的基本信息后,接下来开始源码解析。首先再看个源码基础概念。
什么是“ctl”?
ctl 是一个打包两个概念字段的原子整数。
1)workerCount:指示线程的有效数量;
2)runState:指示线程池的运行状态,有 RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED 等状态。
int 类型有32位,其中 ctl 的低29为用于表示 workerCount,高3位用于表示 runState,如下图所示。
源码介绍:
- /**
- * 主池控制状态ctl是包含两个概念字段的原子整数: workerCount:指有效的线程数量;
- * runState:指运行状态,运行,关闭等。为了将workerCount和runState用1个int来表示,
- * 我们限制workerCount范围为(2 ^ 29) - 1,即用int的低29位用来表示workerCount,
- * 用int的高3位用来表示runState,这样workerCount和runState刚好用int可以完整表示。
- */
- // 初始化时有效的线程数为0, 此时ctl为: 1010 0000 0000 0000 0000 0000 0000 0000
- private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
- // 高3位用来表示运行状态,此值用于运行状态向左移动的位数,即29位
- private static final int COUNT_BITS = Integer.SIZE - 3;
- // 线程数容量,低29位表示有效的线程数, 0001 1111 1111 1111 1111 1111 1111 1111
- private static final int CAPACITY = (1 << COUNT_BITS) - 1;
-
- /**
- * 大小关系:RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED,
- * 源码中频繁使用大小关系来作为条件判断。
- * 1110 0000 0000 0000 0000 0000 0000 0000 运行
- * 0000 0000 0000 0000 0000 0000 0000 0000 关闭
- * 0010 0000 0000 0000 0000 0000 0000 0000 停止
- * 0100 0000 0000 0000 0000 0000 0000 0000 整理
- * 0110 0000 0000 0000 0000 0000 0000 0000 终止
- */
- private static final int RUNNING = -1 << COUNT_BITS; // 运行
- private static final int SHUTDOWN = 0 << COUNT_BITS; // 关闭
- private static final int STOP = 1 << COUNT_BITS; // 停止
- private static final int TIDYING = 2 << COUNT_BITS; // 整理
- private static final int TERMINATED = 3 << COUNT_BITS; // 终止
runstate获取:
- /**
- * 得到运行状态:入参c为ctl的值,~CAPACITY高3位为1低29位全为0,
- * 因此运算结果为ctl的高3位, 也就是运行状态
- */
- private static int runStateOf(int c) { return c & ~CAPACITY; }
workCount获取:
- /**
- * 得到有效的线程数:入参c为ctl的值, CAPACITY高3为为0,
- * 低29位全为1, 因此运算结果为ctl的低29位, 也就是有效的线程数
- */
- private static int workerCountOf(int c) { return c & CAPACITY; }
CTL这么设计有什么优点?
ctl 这么设计的主要好处是将对 runState 和 workerCount 的操作封装成了一个原子操作。
runState 和 workerCount 是线程池正常运转中的2个最重要属性,线程池在某一时刻该做什么操作,取决于这2个属性的值。
因此无论是查询还是修改,我们必须保证对这2个属性的操作是属于“同一时刻”的,也就是原子操作,否则就会出现错乱的情况。如果我们使用2个变量来分别存储,要保证原子性则需要额外进行加锁操作,这显然会带来额外的开销,而将这2个变量封装成1个 AtomicInteger 则不会带来额外的加锁开销,而且只需使用简单的位操作就能分别得到 runState 和 workerCount。
还是上述假设场景:
- 创建线程池,无限循环添加task,debug看works和queue数量增长规律
- 核心线程3,有界队列2,最大线程5
- 等待时间20s,自定义拒绝策略
- 等待一段时间后,查看works数量是否回落到core
- 任务场景
- 线程池开业,前三天大酬宾,来客速度大于消费速度,观测逐渐上升趋势
- 线程池第八天开始,任务量减少,消费速度大于产生速度。观测逐渐下落趋势
代码:
- /**
- * desc : 回落场景
- */
- @SneakyThrows
- private static void test_threadPoolExecutor_down_core() {
- ThreadPoolExecutor executor = new ThreadPoolExecutor(
- 3,
- 5,
- 20,
- TimeUnit.SECONDS,
- new ArrayBlockingQueue<>(2),
- new RejectedExecutionHandler() {
- public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
- System.out.println("爆满了,择日再来啊!!!!!拒绝任务~~~~~~~~~~~~~~~");
- }
- });
-
- // 开业前七天高峰,瞬间打满
- for (int i = 0; i < 100; i++) {
-
- int finalI = i + 1;
- executor.execute(() -> {
- try {
- System.out.println("~~~~~~~~~~~~~~~~~~~~~~来活了,第" + finalI + "位客人~~~~~~~~~~~~~~~~~~~~~~");
- System.out.println("当前排队任务数: " + executor.getQueue().size() + " 当前线程数量: " + executor.getPoolSize());
- Thread.sleep(10 * 1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- });
- if (i >= 7) {
- if (i == 8) {
- System.out.println("线程任务高峰期已过 | 分界线!!!!!!!!!!");
- }
- // 此时任务产生的速率高于线程执行速度(线程有富余)
- Thread.sleep(15 * 1000);
- } else {
- Thread.sleep(1L * 1000);
- }
- }
- }
结果如下:
线程逐渐增加到3,之后排队增加到2,之后线程增加到5。第7个任务来的时候,上升到顶峰
后续任务产生变慢,队列任务减少 2->0,之后线程数量逐渐减少5->3
通过上述案例,可以观测到线程池动态变化的过程,下面将从源码角度来分析产生此现象的原因。【ps:由广入深的角度跟随源码】
- public void execute(Runnable command) {
- // 防御性容错
- if (command == null)
- throw new NullPointerException();
- int c = ctl.get();
- // case1 -> worker数量小于核心线程数,addWork
- if (workerCountOf(c) < corePoolSize) {
- // 添加worker - core
- if (addWorker(command, true))
- return;
- c = ctl.get();
- }
- // case2 -> 如果线程池还在运行态,offer到队列
- if (isRunning(c) && workQueue.offer(command)) {
- //再检查一下状态
- int recheck = ctl.get();
- //如果线程池已经终止,直接移除任务,不再响应
- if (! isRunning(recheck) && remove(command))
- reject(command);
- //否则,如果没有线程干活的话,创建一个空work,该work会从队列获取任务去执行
- else if (workerCountOf(recheck) == 0)
- addWorker(null, false);
- }
- // case3 -> 队列也满,继续调addWork,但是注意,core=false,开启到maxSize的大门
- else if (!addWorker(command, false)) {
- // case4 -> 超出max的话,addWork会返回false,进入reject
- reject(command);
- }
- }
接着进入addWork方法,方法提供了两个入参(任务,是否核心线程),内部逻辑如下:
- /**
- * desc : 线程创建过程
- */
- private boolean addWorker(Runnable firstTask, boolean core) {
- // 第一步,先是ctl-wc 通过CAS + 1
- retry:
- for (;;) {
- int c = ctl.get();
- int rs = runStateOf(c);
-
- // 判断线程池状态是否是可运行态(停止及之后 直接falsee)
- if (rs >= SHUTDOWN &&
- ! (rs == SHUTDOWN &&
- firstTask == null &&
- ! workQueue.isEmpty()))
- return false;
-
- for (;;) {
- // 获取运行中线程数量,判断是否能增加
- int wc = workerCountOf(c);
- if (wc >= CAPACITY ||
- wc >= (core ? corePoolSize : maximumPoolSize))
- return false;
- // 满足条件,此时ctl-wc CAS原子性增加,正常break
- if (compareAndIncrementWorkerCount(c))
- break retry;
- // 增加失败,判断线程池状态决定内循环 or 外循环(结束)
- c = ctl.get();
- if (runStateOf(c) != rs)
- continue retry;
- }
- }
-
- // 第二步,创建新work放入线程集合works(一个HashSet)
- boolean workerStarted = false;
- boolean workerAdded = false;
- Worker w = null;
- try {
- // 符合条件,创建新的work并包装task
- w = new Worker(firstTask);
- final Thread t = w.thread;
- if (t != null) {
- // 加锁 - 避免workers的线程安全问题
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- // 再次校验运行状态,防止关闭
- int rs = runStateOf(ctl.get());
-
- if (rs < SHUTDOWN ||
- (rs == SHUTDOWN && firstTask == null)) {
- if (t.isAlive()) // precheck that t is startable
- throw new IllegalThreadStateException();
- // 添加打工人
- workers.add(w);
- int s = workers.size();
- if (s > largestPoolSize)
- largestPoolSize = s;
- workerAdded = true;
- }
- } finally {
- mainLock.unlock();
- }
- if (workerAdded) {
- // 添加成功,启动线程
- t.start();
- workerStarted = true;
- }
- }
- } finally {
- // 添加失败,减ctl,集合内移除
- if (! workerStarted)
- addWorkerFailed(w);
- }
- return workerStarted;
- }
这里我们着重声明下我们的打工人对象,Worker
- private final class Worker extends AbstractQueuedSynchronizer
- implements Runnable{
- /** Thread this worker is running in. Null if factory fails. */
- final Thread thread;
- /** Initial task to run. Possibly null. */
- Runnable firstTask;
- }
Worker这个工作线程,实现了Runnable接口,并持有一个线程thread,一个初始化的任务firstTask。thread是在调用构造方法时通过ThreadFactory来创建的线程,可以用来执行任务;firstTask用它来保存传入的第一个任务,这个任务可以有也可以为null。如果这个值是非空的,那么线程就会在启动初期立即执行这个任务,也就对应核心线程创建时的情况;如果这个值是null,那么就需要创建一个线程去执行任务列表(workQueue)中的任务,也就是非核心线程的创建。
Worker是通过继承AQS,使用AQS来实现独占锁这个功能。没有使用可重入锁ReentrantLock,而是使用AQS,为的就是实现不可重入的特性去反应线程现在的执行状态。用于回收线程。
下面我们继续跟随Worker类,查看他的run方法
- //在worker执行runWorker()的时候,不停循环,先查看自己有没有携带Task,如果有,执行
- while (task != null || (task = getTask()) != null)
-
-
- //如果没有绑定,会调用getTask,从队列获取任务
- private Runnable getTask() {
- boolean timedOut = false; // Did the last poll() time out?
- // 自旋获取任务
- for (;;) {
- int c = ctl.get();
- int rs = runStateOf(c);
-
- // Check if queue empty only if necessary.
- if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
- decrementWorkerCount();
- return null;
- }
-
- int wc = workerCountOf(c);
-
- // 判断是不是要超时处理,重点!!!决定了当前线程要不要被释放
- // 首次进来 allowCoreThreadTimeOut = false 主要看 wc > corePoolSize
- boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
- //线程数超出max,并且上次循环中poll等待超时了,那么说明该线程已终止 //将线程队列数量原子性减
- if ((wc > maximumPoolSize || (timed && timedOut))
- && (wc > 1 || workQueue.isEmpty())) {
- if (compareAndDecrementWorkerCount(c))
- return null;
- continue;
- }
- try {
- // 重点!!!
- // 如果线程可被释放,那就poll,释放的时间为:keepAliveTime
- // 否则,线程是不会被释放的,take一直被阻塞在这里,直到来了新任务继续工作
- Runnable r = timed ?
- workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
- workQueue.take();
- if (r != null)
- return r;
- //到这里说明可被释放的线程等待超时,已经销毁,设置该标记,下次循环将线程数减少
- timedOut = true;
- } catch (InterruptedException retry) {
- timedOut = false;
- }
- }
- }
最后是我们的Worker的退出(线程的释放)
- private void processWorkerExit(Worker w, boolean completedAbruptly) {
-
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- // 统计总执行任务数 && 释放worker
- completedTaskCount += w.completedTasks;
- workers.remove(w);
- } finally {
- mainLock.unlock();
- }
- // 中断线程
- tryTerminate();
- }
完整流程回顾:
- 线程池是如何保证核心线程不被销毁的呢?
- 非核心线程如何实现在 keepAliveTime 后死亡?
- 当核心线程数小于corePoolSize且有空闲线程,此时添加任务是创建线程 or 现有执行
- 核心线程与非核心线程有什么区别?
- 线程池中用到了哪些锁,为何使用?
上述注意点可在源码解析部分寻找答案~~
线程池选用了Spring提供的“ThreadPoolTaskExecutor”,看下他的UML类图
从上继承关系可知:
- ThreadPoolExecutor是一个java类不提供spring生命周期和参数装配。
- ThreadPoolTaskExecutor实现了InitializingBean, DisposableBean ,xxaware等,具有spring特性
- AsyncListenableTaskExecutor提供了监听任务方法(相当于添加一个任务监听,提交任务完成都会回调该方法)
简单理解:
1、ThreadPoolTaskExecutor使用ThreadPoolExecutor并增强,扩展了更多特性
2、ThreadPoolTaskExecutor只关注自己增强的部分,任务执行还是ThreadPoolExecutor处理。
3、前者spring自己用着爽,后者离开spring我们用ThreadPoolExecutor爽。
注意:ThreadPoolTaskExecutor 不会自动创建ThreadPoolExecutor需要手动调initialize才会创建
如果@Bean 就不需手动,会自动InitializingBean的afterPropertiesSet来调initialize4、@bean ThreadPoolTaskExecutor 的线程,容器关闭会自动调用其实现的关闭方法。可以用来实现优雅关停。而@Bean ThreadPoolExecutor 容器虽然可以移除该bean,但是若有线程仍然执行导致应用退出不彻底就需要用户自己调用
5、自定义任务装饰者方法 ,可传递traceId
corePoolSize
基本线程数,一旦有任务进来,在core范围内会立刻创建线程进入工作。所以这个值应该参考业务并发量在绝大 多数时间内的并发情况。同时分析任务的特性。
高并发,执行时间短的,要尽可能小的线程数,如配置CPU个数+1,减少线程上下文的切换。因为它不怎么占时 间,让少量线程快跑干活。
并发不高、任务执行时间长的要分开看:如果时间都花在了IO上,那就调大CPU,如配置两倍CPU个数+1。不能 让CPU闲下来,线程多了并行处理更快。如果时间都花在了运算上,运算的任务还很重,本身就很占cpu,那尽量 减少cpu,减少切换时间。
workQueue
任务队列,用于传输和保存等待执行任务的阻塞队列。这个需要根据你的业务可接受的等待时间。是一个需要权衡 时间还是空间的地方,如果你的机器cpu资源紧张,jvm内存够大,同时任务又不是那么紧迫,减少coresize,加大 这里。如果你的cpu不是问题,对内存比较敏感比较害怕内存溢出,同时任务又要求快点响应。那么减少这里。
maximumPoolSize
线程池最大数量,这个值和队列要搭配使用,如果你采用了无界队列,那很大程度上,这个参数没有意义。同时要
注意,队列盛满,同时达到max的时候,再来的任务可能会丢失(下面的handler会讲)。 如果你的任务波动较大,同时对任务波峰来的时候,实时性要求比较高。也就是来的很突然并且都是着急的。那么调小队列,加大这里。如果你的任务不那么着急,可以慢慢做,那就扔队列吧。
队列与max是一个权衡。队列空间换时间,多花内存少占cpu,轻视任务紧迫度。max舍得cpu线程开销,少占内 存,给任务最快的响应。
keepaliveTime
线程存活保持时间,超出该时间后,线程会从max下降到core,很明显,这个决定了你养闲人所花的代价。如果 你不缺cpu,同时任务来的时间没法琢磨,波峰波谷的间隔比较短。经常性的来一波。那么实当的延长销毁时间, 避免频繁创建和销毁线程带来的开销。如果你的任务波峰出现后,很长一段时间不再出现,间隔比较久,那么要适 当调小该值,让闲着不干活的线程尽快销毁,不要占据资源。
下面是摘自《Java并发编程实战》的一些处理方案。
上述方案有个共通的问题
线程池使用面临的核心的问题在于:线程池的参数并不好配置。
- 一方面线程池的运行机制不是很好理解,配置合理需要强依赖开发人员的个人经验和知识;
- 另一方面,线程池执行的情况和任务类型相关性较大,IO密集型和CPU密集型的任务运行起来的情况差异非常大。
- 这导致业界并没有一些成熟的经验策略帮助开发人员参考。
那么有一种解决方案是什么呢?引出下文的线程池参数动态化。
我们可以将线程池的参数从代码中迁移到分布式配置中心上,实现线程池参数可动态配置和即时生效,线程池参数动态化前后的参数修改流程对比如下:
首先看下为什么可以动态扩容
从源码分析可以看到,是提供了corePoolSize,MaximumPoolSize的set方法。
点开 setCorePoolSize,我们可以看到是允许运行期间变更的
所以基于经过前面方法的分析,我们知道了最大线程数和核心线程数可以动态调整。
这里简单介绍下扩容场景
- 拒绝次数打到阈值
- 线程池负载达到阈值
本案例使用的惰性扩容,定义场景:
- public class T04_Dynamic_ThreadPollExecutor {
-
- // 定义个普通线程池
- public static ThreadPoolExecutor executor = new ThreadPoolExecutor(
- 3,
- 5,
- 20,
- TimeUnit.SECONDS,
- new LinkedBlockingQueue<>(2),
- new NamedThreadFactory("Evan-Dynamic-ThreadPool",true),
- new T04_MyRejectedExecutionHandler());
-
-
- @SneakyThrows
- public static void main(String[] args) {
-
- // 开业前七天高峰,瞬间打满
- for (int i = 0; i < 100; i++) {
- int finalI = i + 1;
- executor.execute(() -> {
- try {
- System.out.println("~~~~~~~~~~~~~~~~~~~~~~来活了,第" + finalI + "位客人~~~~~~~~~~~~~~~~~~~~~~");
- System.out.println("当前排队任务数: " + executor.getQueue().size() + " 当前线程数量: " + executor.getPoolSize());
- Thread.sleep(15 * 1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- });
- if (i >= 20) {
- if (i == 20) {
- System.out.println("线程任务高峰期已过 | 分界线!!!!!!!!!!");
- }
- // 此时任务产生的速率高于线程执行速度(线程有富余)
- Thread.sleep(15 * 1000);
- } else {
- Thread.sleep(1L * 1000);
- }
- }
-
- }
- }
- public class T04_MyRejectedExecutionHandler extends ThreadPoolExecutor.DiscardPolicy {
-
- // 线程池阈值活跃度
- public static BigDecimal THREADPOLL_THRESHOLD = BigDecimal.valueOf(0.8);
- // 拒绝策略告警
- public static BigDecimal REJECT_THRESHOLD = BigDecimal.valueOf(10);
- // 自定义拒绝策略
- public AtomicInteger rejectCount = new AtomicInteger(0);
-
- @Override
- public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
- // 1.加个计数
- rejectCount.incrementAndGet();
- int count = rejectCount.get();
-
- System.out.println("线程池过载警告 | rejectCount = [" + count + "],core = [" + executor.getCorePoolSize() + "],max = [" + executor.getMaximumPoolSize() + "],active = [" + executor.getActiveCount() + "]" +
- ",queueSize = [" + executor.getQueue().size() + "],负载 = [" + BigDecimal.valueOf(executor.getActiveCount() * 1.00 / executor.getMaximumPoolSize()).toPlainString() + "]");
-
- // 判断是否命中扩容策略
- if (checkNeedExpansion(executor)) {
- dynamicResize(executor);
- }
- ;
- super.rejectedExecution(r, executor);
- }
-
- /**
- * desc : 动态扩容方法
- */
- private void dynamicResize(ThreadPoolExecutor executor) {
- // TODO: 2022/6/15 fengmz 这里扩容简单做 * 2,可以监控apollo配置 当触发报警的时候去处理
- executor.setCorePoolSize(executor.getCorePoolSize() * 2);
- executor.setMaximumPoolSize(executor.getMaximumPoolSize() * 2);
- executor.prestartAllCoreThreads();
- System.out.println("扩容完成");
- soutCurrentThreadPoolState(executor);
- }
-
- /**
- * desc : 打印当前线程池状态
- */
- private void soutCurrentThreadPoolState(ThreadPoolExecutor executor) {
- System.out.println("soutCurrentThreadPoolState | core = [" + executor.getCorePoolSize() + "],max = [" + executor.getMaximumPoolSize() + "],active = [" + executor.getActiveCount() + "]" +
- ",queueSize = [" + executor.getQueue().size() + "],负载 = [" + BigDecimal.valueOf(executor.getActiveCount() * 1.00 / executor.getMaximumPoolSize()).toPlainString() + "]");
- }
-
- private boolean checkNeedExpansion(ThreadPoolExecutor executor) {
-
- // case1 拒绝任务超过阈值
- if (rejectCount.get() > REJECT_THRESHOLD.intValue()) {
- // TODO: 2022/6/15 fengmz 报警 or 触发自动扩容策略
- System.out.println("checkNeedExpansion | result = [" + true + "],reason = [拒绝任务超过阈值]");
- return true;
- }
-
- // case2 线程池负载超过阈值
- BigDecimal load = BigDecimal.valueOf(executor.getActiveCount() * 1.00 / executor.getMaximumPoolSize()).setScale(2, RoundingMode.HALF_UP);
- if (load.compareTo(THREADPOLL_THRESHOLD) > 0) {
- // TODO: 2022/6/15 fengmz 报警 or 触发自动扩容策略
- System.out.println("checkNeedExpansion | result = [true],reason = [线程池负载超过阈值]");
- return true;
- }
- return false;
- }
- }
基于上述调试过程,那么我们的新流程图就呼之欲出了。
java8 的juc提供了很多并发工具,CompleteFuture是笔者最常用的工具之一,它和threadPoolExecutor的友好集成也是选择它的条件之一。本文不过多介绍,罗列其常用api,有兴趣自行学习
- CompleteFuture.runAsync(runnable) : 无返回值异步执行
- CompleteFuture.supplyAsync(runnable) : 带返回值异步执行
- allOf(CompletableFuture<?>... cfs):全部任务组合成一个CompletableFuture
- join() :一般和allOf组合使用,阻塞等待执行完成
- get(long timeout, TimeUnit unit) :获取返回值,超时抛TimeoutException
- exceptionally(Function<Throwable, ? extends T> fn) : 异常处理,降级逻辑
通过之前的源码分析我们看到,在任务到来的时候才会addWorker(),这样会增加前几次请求的响应时间,这里我们可以考虑通过预热在启动的时候完成核心线程的初始化,来优化一些我们的响应速度。
jdk原生 ThreadPoolExecutor
- prestartAllCoreThreads()
spring ThreadPoolTaskExecutor
- executor.getThreadPoolExecutor().prestartAllCoreThreads()
回想下上文说到的线程池状态,【SHUTDOWN:不接受新任务,但处理排队的任务】
可以看到线程池提供了是优雅关停的思想的。但是不同实现,可能他的关停方式也不同。
jdk原生 ThreadPoolExecutor
- 虽然提供了优雅关闭方法,但是需要再容器关闭钩子中手动调用
spring ThreadPoolTaskExecutor -- 推荐,自动集成
- 因为本身实现了 DisposableBean 接口
- spring容器关闭的时候会调用 destroy() 方法
- 内部调用了shutDown()方法,实现了优雅关停
完结撒花~
本来只想简单写一写,没想到越写越停不下来。由此可见源码涉及的知识点还是很多的。线程池的源码虽然不是很复杂,但是可以从中学习到很多的设计思想,希望大家都能有所收获~
最后码文不易
「点赞,收藏,评论 三连走一走,谢谢各位Thanks♪(・ω・)ノ」
需要注释源码的可以留下邮箱,我私聊发你~~~~
本文动态扩容参考了美团博客:
Java线程池实现原理及其在美团业务中的实践
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。