赞
踩
线程池(Thread Pool)是一种基于池化思想管理线程的工具,经常出现在多线程服务器中,如MySQL。
线程过多会带来额外的开销,其中包括创建销毁线程的开销、调度线程的开销等等,同时也降低了计算机的整体性能。线程池维护多个线程,等待监督管理者分配可并发执行的任务。这种做法,一方面避免了处理任务时创建销毁线程开销的代价,另一方面避免了线程数量膨胀导致的过分调度问题,保证了对内核的充分利用。
1.降低资源消耗:通过池化技术重复利用已创建的线程,降低线程创建和销毁造成的损耗。
2.提高响应速度:任务到达时,无需等待线程创建即可立即执行。
3.提高线程的可管理性:线程是稀缺资源,如果无限制创建,不仅会消耗系统资源,还会因为线程的不合理分布导致资源调度失衡,降低系统的稳定性。使用线程池可以进行统一的分配、调优和监控。
4.提供其他功能:线程池具备可拓展性,允许开发人员向其中增加更多的功能。比如延时定时线程池ScheduledThreadPoolExecutor,就允许任务延期执行或定期执行。
线程池解决的核心问题就是资源管理问题。在并发环境下,系统不能够确定在任意时刻中,有多少任务需要执行,有多少资源需要投入。这种不确定性将带来以下若干问题:
1.频繁申请/销毁资源和调度资源,将带来额外的消耗,可能会非常巨大。
2.对资源无限申请缺少抑制手段,易引发系统资源耗尽的风险。3.系统无法合理管理内部的资源分布,会降低系统的稳定性。
为解决资源分配这个问题,线程池采用了“池化”(Pooling)思想。池化,顾名思义,是为了最大化收益并最小化风险,而将资源统一在一起管理的一种思想。
ThreadPoolExecutor是Java中的线程池核心实现类,以下是ThreadPoolExecutor的UML类图。
ThreadPoolExecutor实现的顶层接口是Executor,顶层接口Executor提供了一种思想:将任务提交和任务执行进行解耦。用户无需关注如何创建线程,如何调度线程来执行任务,用户只需提供Runnable对象,将任务的运行逻辑提交到执行器(Executor)中,由Executor框架完成线程的调配和任务的执行部分。
ExecutorService接口增加了一些能力:
(1)扩充执行任务的能力,补充可以为一个或一批异步任务生成Future的方法;
(2)提供了管控线程池的方法,比如停止线程池的运行。
线程池运行的状态,并不是用户显式设置的,而是伴随着线程池的运行,由内部来维护。线程池内部使用一个变量维护两个值:运行状态(runState)和线程数量 (workerCount)。在具体实现中,线程池将运行状态(runState)、线程数量 (workerCount)两个关键参数的维护放在了一起,如下代码所示:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
ctl这个AtomicInteger类型,是对线程池的运行状态和线程池中有效线程的数量进行控制的一个字段, 它同时包含两部分的信息:线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount),高3位保存runState,低29位保存workerCount,两个变量之间互不干扰。用一个变量去存储两个值,可避免在做相关决策时,出现不一致的情况,不必为了维护两者的一致,而占用锁资源。通过阅读线程池源代码也可以发现,经常出现要同时判断线程池运行状态和线程数量的情况。线程池也提供了若干方法去供用户获得线程池当前的运行状态、线程个数。这里都使用的是位运算的方式,相比于基本运算,速度也会快很多。
关于内部封装的获取生命周期状态、获取线程池线程数量的计算方法如以下代码所示:
private static int runStateOf(int c) { return c & ~CAPACITY; } //计算当前运行状态
private static int workerCountOf(int c) { return c & CAPACITY; } //计算当前线程数量
private static int ctlOf(int rs, int wc) { return rs | wc; } //通过状态和线程数生成ctl
ThreadPoolExecutor的运行状态有5种,分别为
首先,所有任务的调度都是由execute方法完成的,这部分完成的工作是:检查现在线程池的运行状态、运行线程数、运行策略,决定接下来执行的流程,是直接申请线程执行,或是缓冲到队列中执行,亦或是直接拒绝该任务。其执行过程如下:
1.首先检测线程池运行状态,如果不是RUNNING,则直接拒绝,线程池要保证在RUNNING的状态下执行任务。
2.如果workerCount < corePoolSize,则创建并启动一个线程来执行新提交的任务。
3.如果workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中。
4.如果workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新提交的任务。
5.如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。
ThreadPoolExecutor线程池核心参数
corePoolSize:线程池的核心线程数,线程池初始化创建的最小线程数量。
maximumPoolSize:最大线程数,当阻塞队列满了以后,还可以创建线程的最大数量。
keepAliveTime:空闲线程存活时间,核心线程之外创建的线程没有任务的时候不是立即销毁,超过等待时间之后才会被回收销毁。
unit:存活的时间单位,keepAliveTime的时间单位。
workQueue:阻塞队列,存放提交但未执行任务的队列。
threadFactory:创建线程的工厂类,用来创建线程执行器。
handler:饱和策略,当前线程超过线程池最大线程数后处理策略。
AbortPolicy: 默认策略,抛出异常RejectedExecutionException,拒绝执行 。
CallerRunsPolicy: 调用执行自己的线程运行任务,也就是直接在调用execute方法的线程中运行(run)被拒绝的任务,如果执行程序已关闭,则会丢弃该任务。因此这种策略会降低对于新任务提交速度,影响程序的整体性能。如果您的应用程序可以承受此延迟并且要求任何一个任务请求都要被执行的话,你可以选择这个策略。
DiscardPolicy: 不处理新任务,直接丢弃掉。
DiscardOldestPolicy: 此策略将丢弃最早的未处理的任务请求。
Spring 通过 ThreadPoolTaskExecutor 或者我们直接通过 ThreadPoolExecutor 的构造函数创建线程池的时候,当我们不指定 RejectedExecutionHandler 饱和策略的话来配置线程池的时候默认使用的是 ThreadPoolExecutor.AbortPolicy。在默认情况下,ThreadPoolExecutor 将抛出 RejectedExecutionException 来拒绝新来的任务 ,这代表你将丢失对这个任务的处理。 对于可伸缩的应用程序,建议使用 ThreadPoolExecutor.CallerRunsPolicy。当最大池被填满时,此策略为我们提供可伸缩队列。
任务缓冲模块是线程池能够管理任务的核心部分。线程池的本质是对任务和线程的管理,而做到这一点最关键的思想就是将任务和线程两者解耦,不让两者直接关联,才可以做后续的分配工作。线程池中是以生产者消费者模式,通过一个阻塞队列来实现的。阻塞队列缓存任务,工作线程从阻塞队列中获取任务。
阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。
shutdownNow():立即关闭线程池(暴力),正在执行中的及队列中的任务会被中断,同时该方法会返回被中断的队列中的任务列表。
shutdown():平滑关闭线程池,正在执行中的及队列中的任务能执行完成,后续进来的任务会被执行拒绝策略。
isTerminated():当正在执行的任务及对列中的任务全部都执行(清空)完就会返回true。
线程池配置数量需要根据实际情况决定,比如业务类型,机器的IO,CPU,内存配置等,可以参考业界做法,但是要因地制宜。比如netty默认就是:实际 cpu核数 * 2。
NettyRuntime.availableProcessors() * 2
1.如果是CPU密集型任务,就需要尽量压榨CPU,参考值可以设为 NCPU+1。
2.如果是IO密集型任务,参考值可以设置为2*NCPU。
Executors 是一个 Java 中的工具类。提供四种线程池创建方式,工厂方法来创建不同类型的线程池。Executors 的创建线程池的方法,创建出来的线程池都实现了ExecutorService 接口。常用方法有以下几个:
1.newFiexedThreadPool(int Threads):创建固定数目线程的线程池。
2.newCachedThreadPool():创建一个可缓存的线程池,调用 execute将重用以前构造的线程(如果线程可用)。如果没有可用的线程,则创建一个新线程并添加到池中。终止并从缓存中移除那些已有 60 秒钟未被使用的线程。
3.newSingleThreadExecutor() 创建一个单线程化的 Executor。
4.newScheduledThreadPool(int corePoolSize) 创建一个支持定时及周期性的任务执行的线程池,多数情况下可用来替代 Timer 类。
不建议大家使用Executors这个类来创建线程池呢,阿里开发手册这样定义:
【强制】线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。说明: Executors 返回的线程池对象的弊端如下:
1) FixedThreadPool 和 SingleThreadPool:
允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。
2) CachedThreadPool 和 ScheduledThreadPool:
允许的创建线程数量为 Integer.MAX_VALUE, 可能会创建大量的线程,从而导致 OOM。
项目中创建多线程时,使用常见的三种线程池创建方式,单一、可变、定长都有一定问题,原因是 FixedThreadPool 和 SingleThreadExecutor 底层都是用LinkedBlockingQueue 实现的,这个队列最大长度为 Integer.MAX_VALUE,容易导致 OOM。
所以实际生产一般自己通过 ThreadPoolExecutor 的 7 个参数,自定义线程池。
package com.zrj.unit.juc; import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.junit.Test; import java.util.concurrent.*; /** * 线程池 * 注意:ThreadFactoryBuilder这里需要运用guava包,自定义线程名称 * * @author zrj * @since 2021/8/20 **/ public class ThreadPoolExecutorTest { //线程池的核心线程数 private static int corePoolSize = 30; //能容纳的最大线程数 private static int maximumPoolSize = 200; //空闲线程存活时间 private static long keepAliveTime = 0L; //空闲线程存活时间 单位 private static TimeUnit unit = TimeUnit.MILLISECONDS; //创建线程的工厂类,自定义线程名称 private static String threadName = "thread-local-pool-%d"; private static ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat(threadName).build(); //存放提交但未执行任务的队列 private static BlockingQueue<Runnable> threadFactory = new LinkedBlockingQueue<>(1024); //等待队列满后的拒绝策略 private static RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy(); //定义线程池 private static ExecutorService pool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, threadFactory, namedThreadFactory, handler); // 定义售票窗口数 private static int saleWindows = 10; // 自定义线程名称 private static ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("thread-%d").build(); // 倒计时锁存器 private static CountDownLatch latch = new CountDownLatch(saleWindows); /** * 模拟抢票 */ @Test public void saleTicket() { try { for (int i = 0; i < saleWindows; i++) { pool.execute(() -> { try { String threadName = Thread.currentThread().getName(); System.out.println(threadName + "窗口开始售票"); Thread.sleep(1000); System.out.println(threadName + "窗口售票完成"); } catch (InterruptedException e) { System.out.println("售票异常:" + e); } finally { latch.countDown(); } }); } } catch (Exception e) { System.out.println("系统异常:" + e); } finally { pool.shutdown(); } // 等待所有线程到达放行 try { latch.await(); } catch (Exception e) { System.out.println("系统异常," + e); } System.out.println("所有线程执行完成,继续执行主线程"); } }
springboot创建线程池,Spring提供的对ThreadPoolExecutor封装的线程池ThreadPoolTaskExecutor,直接使用注解启用。
Async相当于是方法级别的线程,本身没有自定义线程池更加灵活
相当于是每进来一个请求就开启一个线程,超过核心线程数小于最大线程数放入队列,队列满了,继续创建线程直至达到最大线程数。
application.properties
# 异步线程配置
# 配置核心线程数
async.executor.thread.core_pool_size = 5
# 配置最大线程数
async.executor.thread.max_pool_size = 5
# 配置队列大小
async.executor.thread.queue_capacity = 99999
# 配置线程池中的线程的名称前缀
async.executor.thread.name.prefix = async-service-
ExecutorConfig
import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.concurrent.Executor; import java.util.concurrent.ThreadPoolExecutor; /** * 线程池的配置 * * @author zrj * @since 2021/9/9 **/ @Slf4j @EnableAsync @Configuration public class ExecutorConfig { @Value("${async.executor.thread.core_pool_size}") private int corePoolSize; @Value("${async.executor.thread.max_pool_size}") private int maxPoolSize; @Value("${async.executor.thread.queue_capacity}") private int queueCapacity; @Value("${async.executor.thread.name.prefix}") private String namePrefix; @Bean(name = "asyncServiceExecutor") public Executor asyncServiceExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); //配置核心线程数 executor.setCorePoolSize(corePoolSize); //配置最大线程数 executor.setMaxPoolSize(maxPoolSize); //配置队列大小 executor.setQueueCapacity(queueCapacity); //配置线程池中的线程的名称前缀 executor.setThreadNamePrefix(namePrefix); // rejection-policy:当pool已经达到max size的时候,如何处理新任务 // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //执行初始化 executor.initialize(); return executor; } }
AsyncService
/**
* @author zrj
* @since 2021/9/9
**/
public interface AsyncService {
void executeAsync();
}
AsyncServiceImpl
package com.zrj.unit.service.impl; import com.zrj.unit.service.AsyncService; import lombok.extern.slf4j.Slf4j; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; /** * 线程服务 * Async相当于是方法级别的线程,本身没有自定义线程池更加灵活 * 相当于是每进来一个请求就开启一个线程,超过核心线程数小于最大线程数放入队列, * 队列满了,继续创建线程直至达到最大线程数 * * @author zrj * @since 2021/9/9 **/ @Slf4j @Service public class AsyncServiceImpl implements AsyncService { @Override @Async("asyncServiceExecutor") public void executeAsync() { log.info( "start executeAsync" ); try { System.out.println( "异步线程要做的事情" ); System.out.println( "可以在这里执行批量插入等耗时的事情" ); Thread.sleep( 1000 ); } catch (InterruptedException e) { e.printStackTrace(); } log.info( "end executeAsync" ); } }
AsyncController
package com.zrj.unit.controller; import com.zrj.unit.service.AsyncService; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; /** * 线程控制器 * * @author zrj * @since 2021/8/30 **/ @RestController @RequestMapping("/async") public class AsyncController { @Resource private AsyncService asyncService; /** * 多线程 */ @GetMapping("/executor") public String executor() { //开30个线程 for (int i = 0; i < 30; i++) { asyncService.executeAsync(); } return "executor success"; } }
创建线程池时,ExecutorService,核心线程0,使用无界队列,会创建线程吗?
先说结论:
CorePoolSize是线程池的基本大小,也就是线程池中始终保持活跃的线程数量,不会回收。
PoolSize是线程池当前实际存在的线程数量,包括正在执行任务的线程和空闲等待任务的线程。
CorePoolSize:始终为0。
PoolSize:始终为1。
综上所述,当核心线程数为0,无界队列时,始终会有一个非核心线程执行,并且只有一个,而且是无界队列,所以也就是单线程执行,只有当无界队列满的时候达到Integer.MAX.VALUE时,会OOM。
package com.zrj.tools.util.thread; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** * 创建线程池时,ExecutorService,核心线程0,使用无界队列,会创建线程吗 * * @author zrj * @since 2023/12/28 **/ public class ThreadUtils { public static void main(String[] args) { //创建线程池时,ExecutorService,核心线程0,使用无界队列,会创建线程吗 ExecutorService es = new ThreadPoolExecutor(0, 100000, 100000, TimeUnit.HOURS, new LinkedBlockingQueue<>()); es.execute(() -> { try { //等待3S,使B线程都进入队列 Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } long threadId = Thread.currentThread().getId(); System.out.println("A线程:" + threadId + ",CorePoolSize:" + ((ThreadPoolExecutor) es).getCorePoolSize()); System.out.println("A线程:" + threadId + ",PoolSize:" + ((ThreadPoolExecutor) es).getPoolSize()); System.out.println("A线程:" + threadId + ",QueueSize:" + ((ThreadPoolExecutor) es).getQueue().size()); try { //等待5S,开始执行队列中的B线程 Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } }); for (int i = 0; i < 5; i++) { es.execute(() -> { long threadId = Thread.currentThread().getId(); System.out.println("B线程:" + threadId + ",CorePoolSize:" + ((ThreadPoolExecutor) es).getCorePoolSize()); System.out.println("B线程:" + threadId + ",PoolSize:" + ((ThreadPoolExecutor) es).getPoolSize()); System.out.println("B线程:" + threadId + ",QueueSize:" + ((ThreadPoolExecutor) es).getQueue().size()); }); } } }
执行结果
Connected to the target VM, address: '127.0.0.1:51177', transport: 'socket' A线程:14,CorePoolSize:0 A线程:14,PoolSize:1 A线程:14,QueueSize:5 B线程:14,CorePoolSize:0 B线程:14,PoolSize:1 B线程:14,QueueSize:4 B线程:14,CorePoolSize:0 B线程:14,PoolSize:1 B线程:14,QueueSize:3 B线程:14,CorePoolSize:0 B线程:14,PoolSize:1 B线程:14,QueueSize:2 B线程:14,CorePoolSize:0 B线程:14,PoolSize:1 B线程:14,QueueSize:1 B线程:14,CorePoolSize:0 B线程:14,PoolSize:1 B线程:14,QueueSize:0 Disconnected from the target VM, address: '127.0.0.1:51177', transport: 'socket' Process finished with exit code -1
具体参考文章:https://blog.csdn.net/qq_33333654/article/details/122260945
最终的出来的结论是:
1、当核心线程数为0的时候,会创建一个非核心线程进行执行
2、核心线程数不为0的时候,如果核心线程数在执行,会有一个非核心线程数从队列中取对象执行线程
3、核心线程数执行的是队列的take,非核心线程数执行队列的offer和poll
4、核心线程数不为0且队列为SynchronousQueue时,就成了单线程运行了
参考文档
Java线程池实现原理及其在美团业务中的实践
Java线程池原理及使用
Java线程池详解
https://blog.csdn.net/qq_33333654/article/details/122260945
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。