/** * Creates a thread pool that creates new threads as needed, but * will reuse previously constructed threads when they are * available. These pools will typically improve the performance * of programs that execute many short-lived asynchronous tasks. * Calls to {@code execute} will reuse previously constructed * threads if available. If no existing thread is available, a new * thread will be created and added to the pool. Threads that have * not been used for sixty seconds are terminated and removed from * the cache. Thus, a pool that remains idle for long enough will * not consume any resources. Note that pools with similar * properties but different details (for example, timeout parameters) * may be created using {@link ThreadPoolExecutor} constructors. * * @return the newly created thread pool */ public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
大致意思就是,通过该方法会创建一个线程池,当你执行一个任务,并且线程池中不存在可用的已构造好的线程时,它就会创建一个新线程,否则它会优先复用已有的线程,当线程未被使用时,默认 60 秒后被移除。
These pools will typically improve the performance of programs that execute many short-lived asynchronous tasks.
我们接着看 ThreadPoolExecutor 构造方法的描述:
/** * Creates a new {@code ThreadPoolExecutor} with the given initial * parameters and default thread factory and rejected execution handler. * It may be more convenient to use one of the {@link Executors} factory * methods instead of this general purpose constructor. * 核心线程数 * @param corePoolSize the number of threads to keep in the pool, even * if they are idle, unless {@code allowCoreThreadTimeOut} is set * 最大线程数 * @param maximumPoolSize the maximum number of threads to allow in the * pool * 存活时长 * @param keepAliveTime when the number of threads is greater than * the core, this is the maximum time that excess idle threads * will wait for new tasks before terminating. * 时间单位 * @param unit the time unit for the {@code keepAliveTime} argument * 任务队列 * @param workQueue the queue to use for holding tasks before they are * executed. This queue will hold only the {@code Runnable} * tasks submitted by the {@code execute} method. * @throws IllegalArgumentException if one of the following holds:<br> * {@code corePoolSize < 0}<br> * {@code keepAliveTime < 0}<br> * {@code maximumPoolSize <= 0}<br> * {@code maximumPoolSize < corePoolSize} * @throws NullPointerException if {@code workQueue} is null */ public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); }
结合ThreadPoolExecutor 构造方法的描述,我们可以知道,当我们调用newCachedThreadPool()方法的时候,它会创建一个核心线程数为 0 ,最大线程数为Integer上限,无用线程存活时间为 6 秒的线程池。
ThreadPoolExecutor threadPool = (ThreadPoolExecutor) Executors.newCachedThreadPool();
for (int i = 0; i < 100000000; i++) {
threadPool.execute(() -> {
try {
} catch (InterruptedException e) {
threadPool.awaitTermination(1, TimeUnit.HOURS);
模拟瞬间创建100000000十万个任务,且每个任务需要等待一小时,会发现电脑内存使用率迅速增加并一直持续到 OOM。
/** * Creates a thread pool that reuses a fixed number of threads * operating off a shared unbounded queue. At any point, at most * {@code nThreads} threads will be active processing tasks. * If additional tasks are submitted when all threads are active, * they will wait in the queue until a thread is available. * If any thread terminates due to a failure during execution * prior to shutdown, a new one will take its place if needed to * execute subsequent tasks. The threads in the pool will exist * until it is explicitly {@link ExecutorService#shutdown shutdown}. * * @param nThreads the number of threads in the pool * @return the newly created thread pool * @throws IllegalArgumentException if {@code nThreads <= 0} */ public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
ExecutorService#awaitTermination(1, TimeUnit.HOURS);
operating off a shared unbounded queue
通过查看newFixedThreadPool()在创建线程池时传入的队列 new LinkedBlockingQueue()
public LinkedBlockingQueue() {
例如,一条简单的 查询操作 和 文件读取操作 就应该放在不同的线程池里面
//队列长度为100 BlockingQueue<Runnable> blockqueue = new LinkedBlockingQueue<Runnable>(100) { /** * 这里重写offer方法 * 在接收到新的任务时,会先加入到队列中,当队列满了之后,才会创建新的线程 直到达到线程池的最大线程数 * 我们现在需要接收到新任务时,优先将线程数扩容到最大数,后续任务再放入到队列中 * 加入队列会调用 offer方法 ,我们直接返回false,制造队列已满的假象 */ @Override public boolean offer(Runnable e) { return false; } }; ThreadPoolExecutor threadPool = new ThreadPoolExecutor( 4, 10, 10, TimeUnit.SECONDS, blockqueue , new ThreadFactoryBuilder().setNameFormat("mypool-%d").get(), (r, executor) -> { /** * 这里拒绝策略,被拒绝的任务会走该方法 及没添加到队列中,且没有获取到线程的任务 * 因为我们设置的队列中 offer方法固定返回false */ try { //如果允许该任务执行但是不阻塞,及如果进不了队列就放弃,我们可以调用 offer 的另一个多参的方法 if (!executor.getQueue().offer(r, 0, TimeUnit.SECONDS)) { throw new RejectedExecutionException("ThreadPool queue full, failed to offer " + r.toString()); } //如果我们需要让任务一定要执行,及足协而等待进入队列,可以使用put executor.getQueue().put(r) } catch (InterruptedException e) { Thread.currentThread().interrupt(); } });
