赞
踩
背景:
线程池的主要工作:
线程池的优点:
线程池的核心:
架构实现:
最原始的线程池创建方式,下面前四种创建方式都是对 ThreadPoolExecutor() 的封装。
public ThreadPoolExecutor(int corePoolSize, // 核心线程的数量
int maximumPoolSize, // 最大线程数量
long keepAliveTime, // 超出核心线程数量以外的线程空余存活时间
TimeUnit unit, // 存活时间的单位
BlockingQueue<Runnable> workQueue, // 保存待执行任务的队列
ThreadFactory threadFactory, // 创建新线程使用的工厂(非必填)
RejectedExecutionHandler handler // 当任务无法执行时的处理器(非必填)
) {...}
使用示例:
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
10,
10,
1,
TimeUnit.MINUTES,
new LinkedBlockingQueue<>(10000),
r -> new Thread(r, "DemoThread-" + r.hashCode()));
}
corePoolSize:核心线程池数量
线程池中已有线程数 < 核心线程池数量,有新任务进来就新建一个线程,即使有的线程没事干。
线程池中已有线程数 ≥ 核心线程池数量,有新任务进来不会新建线程了,空闲的线程就得去任务队列里取任务执行。
maximumPoolSize:最大线程数量
keepAliveTime:核心以外的线程存活时间,即没有任务的线程的存活时间
unit:存活时间的单位
TimeUnit.DAYS; //天
TimeUnit.HOURS; //小时
TimeUnit.MINUTES; //分钟
TimeUnit.SECONDS; //秒
TimeUnit.MILLISECONDS; //毫秒
TimeUnit.MICROSECONDS; //微妙
TimeUnit.NANOSECONDS; //纳秒
workQueue:保存待执行任务的阻塞队列
threadFactory:每个线程创建的地方(非必填)
handler:拒绝策略(非必填)
拒绝策略 | 拒绝行为 |
---|---|
AbortPolicy | 直接抛出 RejectedExecutionException,本策略也是默认的拒绝策略。 |
CallerRunsPolicy | 只要线程池没关闭,就直接用调用者所在的线程来运行任务。 |
DiscardPolicy | 悄悄把任务放生,不做了。 |
DiscardOldestPolicy | 把队列里待最久的那个任务扔了,然后再调用 execute() 尝试执行。 |
主要特点:
主要特点:
主要特点:
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ThreadPoolExecutor(corePoolSize, Integer.MAX_VALUE, 0L, TimeUnit.NANOSECONDS, new ScheduledThreadPoolExecutor.DelayedWorkQueue());
}
主要特点:
注意:
扩展内容:Executors创建的4中线程池的使用 - https://www.cnblogs.com/ljp-sun/p/6580147.html;
阿里规约明确规定,不允许使用Executors去创建线程池。
OOM:Out Of Memory,内存溢出。
扩展内容:Executors为什么会导致 OOM,如何正确的创建线程池? - https://www.zhihu.com/question/23212914
方法:
代码:
import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.time.LocalDateTime; import java.util.concurrent.*; /** * <p> @Title ExecutorsDemo * <p> @Description 创建线程池Demo * * @author zhj * @date 2020/12/9 13:25 */ public class ExecutorsDemo extends Thread { @Override public void run() { System.out.println("date: " + LocalDateTime.now()); } /** * 使用guava提供的ThreadFactoryBuilder创建ThreadFactory,可以方便地重命名线程 */ private static ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build(); /** * 创建线程池 * 【恶搞】:此处可以尝试使用一下 Executors.newCachedThreadPool(); * 注意:会迅速把计算机的内存吃掉导致死机。【不要轻易尝试,后果自负】 * 最佳线程数: 操作系统内核数+2 * 操作系统内核数: Runtime.getRuntime().availableProcessors() */ private static ExecutorService pool = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors() + 2, 200, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy()); public static void main(String[] args) throws InterruptedException { // 由于执行很快,瞬间的任务量已经超过了队列长度+线程池的最大线程数, // 根据我们设置的 AbortPolicy 拒绝策略,会抛 RejectedExecutionException for (int i = 0; i < Integer.MAX_VALUE; i++) { pool.execute(new ExecutorsDemo()); // 加上此行代码,可以给系统时间让线程去执行任务,然后去接新的任务 // Thread.sleep(1L); } } }
输出结果:
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task Thread[Thread-1224,5,main] rejected from java.util.concurrent.ThreadPoolExecutor@2c13da15[Running, pool size = 200, active threads = 200, queued tasks = 1024, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
at com.tsingsoft.forecast.vo.ExecutorsDemo.main(ExecutorsDemo.java:39)
date: 2020-12-09T14:09:16.088
date: 2020-12-09T14:09:16.089
线程池的状态:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
其中ctl这个AtomicInteger的功能很强大,其高3位用于维护线程池运行状态,低29位维护线程池中线程数量。
状态
:
1、RUNNING:-1 << COUNT_BITS,即高3位为1,低29位为0,该状态的线程池会接收新任务,也会处理在阻塞队列中等待处理的任务。
2、SHUTDOWN:0 << COUNT_BITS,即高3位为0,低29位为0,该状态的线程池不会再接收新任务,但还会处理已经提交到阻塞队列中等待处理的任务。
3、STOP:1 << COUNT_BITS,即高3位为001,低29位为0,该状态的线程池不会再接收新任务,不会处理在阻塞队列中等待的任务,而且还会中断正在运行的任务。
4、TIDYING:2 << COUNT_BITS,即高3位为010,低29位为0,所有任务都被终止了,workerCount为0,为此状态时还将调用terminated()方法。
5、TERMINATED:3 << COUNT_BITS,即高3位为100,低29位为0,terminated()方法调用完成后变成此状态。
这些状态均由int型表示,大小关系为 RUNNING<SHUTDOWN<STOP<TIDYING<TERMINATED,这个顺序基本上也是遵循线程池从 运行 到 终止这个过程。
方法
:
runStateOf(int c) 方法:c & 高3位为1,低29位为0的~CAPACITY,用于获取高3位保存的线程池状态。
workerCountOf(int c)方法:c & 高3位为0,低29位为1的CAPACITY,用于获取低29位的线程数量。
ctlOf(int rs, int wc)方法:参数rs表示runState,参数wc表示workerCount,即根据runState和workerCount打包合并成ctl。
线程池的执行过程:
在创建了线程池后,等待提交过来的任务请求。
当调用 execute() 方法添加一个请求任务时,线程池会做如下判断:
(1)如果,线程池中的线程数量 < corePoolSize,则每来一个任务,就创建线程去执行这个任务;
(2)如果,线程池中的线程数量 ≥ corePoolSize,则每来一个任务,会尝试将其添加到任务缓存队列当中,若添加成功,则该任务会等待空闲线程将其取出去执行;
(3)如果,线程池中的线程数量 ≥ corePoolSize,并且队列 workQueue 已满,
但线程池中的线程数量 < maximumPoolSize,则会创建新的线程来处理被添加的任务;
(4)如果,线程池中的线程数量 ≥ maximumPoolSize,则会采取拒绝策略进行处理。
参考文章:(内容略有改动)
1.线程池的简介及底层原理 - https://www.cnblogs.com/lveyHang/p/12060832.html
2.线程池底层原理 - https://www.jianshu.com/p/483656576bf2
3.Java线程池的底层实现与使用 - https://www.cnblogs.com/sxkgeek/p/9343519.html
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。