当前位置:   article > 正文

Java并发编程第6讲——线程池(万字详解)_java中常用线程池

java中常用线程池

Java中的线程池是运用场景最多的并发框架,几乎所有需要异步或并发执行任务的程序都可以使用线程池,本篇文章就详细介绍一下。

一、什么是线程池

定义:线程池是一种用于管理和重用线程的技术(池化技术),它主要用于提高多线程应用程序的性能和效率。

ps:线程池、连接池、内存池和对象池等都是编程领域中典型的池化技术。

首先有关线程的使用会出现两个问题:

  • 线程是宝贵的内存资源、单个线程约占1MB空间,过多分配易造成内存溢出。
  • 频繁的创建及销毁线程会增加虚拟机回收频率、资源开销,造成性能下降。

基于如上问题,出现了线程池:

  • 线程容器,可设定线程分配的数量。
  • 将预先创建的线程对象存入池中,并重用线程池中的线程对象。
  • 避免频繁的创建和销毁。

Java中线程池的继承关系如下:

二、ThreadPoolExecutor

Executor框架最核心的类是ThreadPoolExecutor,我们可以通过ThreadPoolExecutor类来创建一个线程池。

2.1 ThreadPoolExecutor构造函数

ThreadPoolExecutor继承自AbstractExecutorService,而AbstractExecutorService实现了ExecutorService接口。

2.2 线程池的7个核心参数

从ThreadPoolExecutor的构造函数可以看出,创建一个线程池需要7个核心参数,下面我们介绍一下这7个参数的含义:

  • 核心线程数数量(corePoolSize):当线程池被创建时,在你池子中初始化多少个线程。

  • 最大线程数(maximumPoolSize):当我的所有核心线程数都去干活时,又来了一个任务,如果我的当前线程数小于我最大线程数,这时候可以再帮你创建一个线程去指定你的任务。

  • 线程闲置时间(keepAliveTime):额外线程完成任务后,存活的时间。

  • 限制时间的单位(unit):存活时间单位。

  • 工作队列(workQueue):当没核心线程去处理任务时,会把任务放在工作队列中,当有闲下来的线程时再去执行队列的任务,常见的工作队列有以下几种,前三种用的最多:

    • ArrayBlockingQueue:列表形式的工作队列,必须要有初始队列大小,有界队列,先进先出(FIFO)。

    • LinkedBlockingQueue:链表形式的工作队列,可以选择设置初始队列大小,有界(设置了初始大小)/无界队列(没设置) ,先进先出(FIFO)。

    • SynchronousQueue:这不是一个真正的队列,而是一种在线程之间移交的机制。要将一个元素放入SynchronousQueue中,必须有另一个线程正在等待接受这个元素。如果没有线程等待,并且线程池的当前大小小于最大值,那么ThreadPoolExecutor将创建一个线程,否则根据拒绝策略,这个任务将被拒绝。使用直接移交更高效,因为任务会直接移交给执行它的线程,而不是被首先放在队列中,然后由工作者线程从队列中提取任务。只有当线程是无界的或者可以拒绝任务时,SynchronousQueue才有价值。

    • PriorityBlockingQueue:优先级队列,有界队列,根据优先级来安排任务,任务的优先级是通过自然顺序或Comparator来定义的。

    • DelayedWorkQueue:延迟的工作队列,无界队列。

  • 创建线程的工厂(threadFactory):线程池不会帮你创建线程,这时候就要用到线程工厂:

    • DefaultThreadFactory:默认线程工厂,创建一个新的、非守护的线程,并且不包括特殊的配置信息。

    • PrivilegedThreadFactory:通过这种方式创建出来的线程,将与创建privilegedThreadFactory的线程拥有相同的访问权限、AccessControlContext、ContextClassLoader。如果不使用privilegedThreadFactory,线程池创建的线程将从在需要新线程时调用execute或submit的客户程序中继承访问权限。

    • 自定义线程工厂:可以自己实现ThreadFactory接口来自定义线程工厂。

  • 拒绝策略(handler):当你的线程数达到最大,工作队列任务也满了,就执行拒绝策略:

    • AbortPolicy:抛出异常!(RejectedExecutionException),默认的拒绝策略。(调用者可以将异常进行捕获,然后根据需求处理代码)

    • CallerRunsPolicy:调用者自己处理任务!(要把任务派发给我的线程池,要有一个线程执行操作,如果没有闲置的线程,由调用者自己处理任务)

    • DiscardOldestPolicy:丢弃任务队列中最老的任务,把自己放进去!

    • DiscardPolicy:丢弃掉当前任务!

2.3 线程池Demo

我们可以使用ThreadPoolExecutor创建一个线程池,然后调用execute()或submit()的方法来向线程池中提交任务。

2.3.1 execute()方法(自己的方法)

public void execute(Runnable command)

execute()方法没有返回值,所以它适用于不需要返回值的任务,当然也无法判断任务是否被线程池执行成功。

  1. public class ThreadPoolExecutorTest {
  2. public static void main(String[] args) {
  3. //1.创建线程池
  4. ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(2,
  5. 3, 60L, TimeUnit.SECONDS,
  6. new LinkedBlockingQueue<>(1),
  7. Executors.defaultThreadFactory(),
  8. new ThreadPoolExecutor.DiscardOldestPolicy());
  9. //2.创建任务
  10. Runnable runnable = () -> {
  11. try {
  12. Thread.sleep(1000);
  13. } catch (InterruptedException e) {
  14. e.printStackTrace();
  15. }
  16. System.out.println(Thread.currentThread().getName() + "执行了。。。");
  17. };
  18. //3.提交任务
  19. for (int i = 0; i < 5; i++) {
  20. poolExecutor.execute(runnable);
  21. }
  22. //4.关闭线程池
  23. poolExecutor.shutdown();
  24. }
  25. }

上述例子我们创建了一个核心线程数为2、最大线程数为3的线程池,然后通过execute方法提交了5个任务给线程池执行。

2.3.2 submit()方法(AbstractExecutorService父类的方法)

  • public Future<?> submit(Runnable task)

  • public <T> Future<T> submit(Runnable task,T result)

  • public <T> Future<T> submit(Callable<T> task)

sublmit()方法用于提交需要返回值的任务。线程池会返回一个future类型的对象,通过这个对象可以判断任务是否执行成功,并且可以通过get()方法以阻塞的方式来获取返回值。

  1. public class ThreadPoolExecutorTest {
  2. public static void main(String[] args) throws ExecutionException, InterruptedException {
  3. //1.创建线程池
  4. ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(1,
  5. 1, 60L, TimeUnit.SECONDS,
  6. new LinkedBlockingQueue<>(1),
  7. Executors.defaultThreadFactory(),
  8. new ThreadPoolExecutor.DiscardOldestPolicy());
  9. //2.提交任务,Future标识将要执行任务的结果,submit可以传入一个Callable<T>对象
  10. Future<Integer> future = poolExecutor.submit(new Callable<Integer>() {
  11. private int sum = 0;
  12. @Override
  13. public Integer call() throws Exception {
  14. for (int i = 0; i < 100; i++) {
  15. sum += i;
  16. Thread.sleep(10);
  17. }
  18. return sum;
  19. }
  20. });
  21. //3.获取任务的结果(等待任务执行完毕才返回,阻塞)
  22. System.out.println(future.get());
  23. //4.关闭线程池
  24. poolExecutor.shutdown();
  25. }
  26. }

2.4 当一个任务提交到线程池后

  • 首先查看核心线程数是否达到最大

    • 否:创建核心线程直接执行任务

    • 是:尝试将任务放到工作队列中

  • 查看工作队列是否已满

    • 否:将任务放入到工作队列。

    • 是:尝试创建非核心线程(最大线程数-核心线程数)。

  • 查看最大线程数是否达到最大

    • 否:创建非核心线程执行任务

    • 是:执行拒绝策略

2.5 常用方法

除了在创建线程池时指定上述参数值外,还可在线程池创建后通过如下方法进行设置:

此外还有一些方法:

  • getCorePoolSize():返回线程池的核心线程数,这个值是一直不变的,返回在构造函数中设置的coreSize大小;
  • getMaximumPoolSize():返回线程池的最大线程数,这个值是一直不变的,返回在构造函数中设置的coreSize大小;
  • getLargestPoolSize():记录了曾经出现的最大线程个数(水位线);
  • getPoolSize():线程池中当前线程的数量;
  • getActiveCount():Returns the approximate(近似) number of threads that are actively executing tasks;
  • prestartAllCoreThreads():会启动所有核心线程,无论是否有待执行的任务,线程池都会创建新的线程,直到池中线程数量达到 corePoolSize;
  • prestartCoreThread():会启动一个核心线程(同上);
  • allowCoreThreadTimeOut(true):允许核心线程在KeepAliveTime时间后,退出;

2.6 线程数设定多少合适

这个是个高频的面试点,至于设定多少,流传最多的就是“公式”,所谓的公式,一般情况下,需要根据你的任务情况来设置线程数,任务可能分为两种类型——CPU密集型和IO密集型:

  • CPU密集型:则线程池大小设置为N+1。
  • IO密集型:则线程池大小设置为2N+1。

上面的N为CPU总核数,但在实际场景下,公式只能是当作一个参考。

很多时候,我们应用部署在云服务器上,有时候我们的机器显示是8核的,但实际上使用的只是虚拟机而已,并不是物理机,这就导致大多数情况下发挥不出8核的作用来。

而且,上面的公式中,前提要求是知道你的应用是IO密集型还是CPU密集型,那么,评判一个应用是CPU密集型还是IO密集型的标准是什么?真的能明确区分出来吗?

还有一点就是,现在很多CPU都采用了超线程技术,也就是利用特殊的硬件指令,把两个逻辑内核模拟成两个物理芯片,让单个处理器都能使用线程级并行计算。所以我们可以看到“4核8线程的CPU”,也就是物理内核有4个,逻辑内核有8个,如果用上述的公式,貌似按照4和8配置都不合理,因为超线程技术的性能提升也并不是100%的。

所以在设定线程数的时候,要考虑什么业务场景、什么机器配置、多大的并发量、一次业务处理整体耗时是多少,最后在上线的时候可以根据公式大致设置一个数值,然后再根据你自己的实际业务情况,以及不断的压测结果,再不断调整,最终达到一个相对合理的值。

三、线程池的实现原理(源码分析)

下面我们通过源码来分析一下ThreadPoolExecutor类中三个比较核心的方法。

3.1 添加一个任务(execute()方法)

首先我们看一下线程池中的execute()方法,该方法用于向线程池中添加一个任务。

源码:

分析:

  • 第一个红框:先检查是否有空闲的核心线程

    • workCountOf(c)方法根据ctl的低29位来得到有效的线程数。

    • 判断有效的线程数是否小于核心线程数。

    • 如果是是。则创建一个线程来处理任务(核心线程)。

  • 第二个红框:走到这,说明核心线程数已满了

    • isRunning(c)方法判断当前是否是运行状态,如果是,则尝试能否将任务放入工作队列(work.offer(command)方法)。

    • 如果也添加成功了。则再次拿到ctl值,再次检查状态,如果不再运行,并且移除添加的任务成功,则抛出拒绝策略。

    • 如果在运行,则检查有效线程数是否为0,如果是,则新建一个线程(非核心线程)。

  • 第三个红框:这里说明工作队列也满了

    • addwork(command,flase)方法尝试新建一个线程来处理任务(非核心)。

    • 如果失败,则调用拒绝策略。

3.2 添加work线程(addworker())

从方法execute的实现可以看出:addWorker主要负责创建新的线程并执行任务。

这块代码比较长,所以我们把它分成两段来介绍,先看第一段。

源码:

分析:

  • 第一个红框:做是否能够添加工作线程条件过滤,这里的情况比较多,仅进行关键代码的解释

    • rs为运行状态,源码中频繁使用大小关系来作为条件判断,大小关系:RUNNING(运行)<SHUTDOWN(关闭)<STOP(停止)<TIDYING(整理)<TERMINATED(终止)。

    • firstTask判断提交的任务是否为空。

    • return false表示不处理提交的任务,直接返回。

  • 第二个红框:做自旋,更新创建线程数量

    • 第一个if:校验有效线程数是否超过阈值,如果超过则不处理提交的任务。

    • 第二个if:使用CAS将workerCount+1,修改成功则跳出循环。

    • 第三个if:重新读取ctl,判断当前运行状态,如果不等于上面获取的运行状态rs,说明rs被其它线程修改了,跳到retry重新校验线程池的状态。

ps:retry是java中的goto语法,只能运行在break和cotinue后面。

接着看后面的代码。

源码:

分析:先说下两个变量 workerStarted——Worker线程是否启动,workerAdded——Worker线程是否成功增加。

  • 第一个红框:获取线程池主锁

    • 用firstTask和当前线程创建一个Worker。

    • 拿到Worker对应的线程t。

    • 如果t不为空,获取线程池主锁,通过ReentrantLock锁来保证线程安全。

  • 第二个红框:添加线程到workers中(线程池中)。

  • 第三个红框:如果woker添加成功,则启动新建的线程执行。

我们看看这个wokers是什么:

 一个HashSet,所以,线程池的存储结构其实就是一个HashSet。

3.3 worker线程处理队列任务(runWorker())

上文addWorker()方法里提到,当Worker里的线程启动时,就会调用该方法。

源码:

分析:

  • 第一个红框:取任务执行(如果是第一次执行任务或者能从队列中取到任务)。

  • 第二个红框:获取到任务后,执行任务开始前操作钩子。

  • 第三个红框:执行任务。

  • 第四个红框:执行任务后钩子

ps:这两个钩子(beforeExecute、afterExecute)允许我们自己继承线程池,也就是我们自己可以重写这两个方法,做任务执行前、后的处理。

四、Java中自带的5种线程池

这五种线程池都是由Executors工具类提供,该类看起来功能还是比较强大的,又用到了工厂模式,扩展性很强,重要的是用起来还特别方便,如:

  1. //创建一个固定大小的线程池
  2. ExecutorService executorService = Executors.newFixedThreadPool(1);

接下来我们详细分析一下Java自带的五种线程池。

4.1 newFixedThreadPool

FixedThreadPool被称为可重用固定线程数的线程池,源码如下:

可以看到,核心线程数(corePoolSize)和最大线程数(maximumPoolSize)都是传进来的nThreads,这就意味着这个数量在其生命周期内不会变化,当线程池中的线程都在工作时,新提交的任务会被放置到一个无界队列中(没有设置初始大小)等待被执行。

4.2 newWorkStealingPool

用来执行大任务的线程池,WorkStealingPool可以自适应地调整线程池大小,它会根据处理器核数创建相应数量的线程池。源码如下:

可以看出,ForkJoinPool的构造函数被调用来创建WorkStealingPool线程池,Runtime.getRuntime().availableProcessors()来获取处理器的核心数量,来作为线程池的大小:

为什么说它是执行大任务的线程池呢?因为它本质是一个ForkJoinPool,而ForkJoinPool有一个工作窃取的概念:

工作窃取:当我去执行一个特别大的任务时,感觉我用一个线程去执行的时候周期会很长,那么我可以通过自己的业务去把他拆分成很多很多个小的业务,比如我的任务需要5分钟,那么我把它拆分成10个,可能就半分种就搞定了,然后再把这10个任务交给newWorkStealingPool线程池,找一些没事干的线程去帮我干这个任务

4.3 newSingleThreadExecutor

这是一个使用单个worker线程的Executor,下面是它的源码:

可以看到它的核心线程数和最大线程数都是1,其它参数都和FixedThreadPool相同,既然它是单线程,那么就可以保证任务的顺序执行

4.4 newCachedThreadPool

这是一个会根据需要创建新线程的线程池,也就是大小可变的线程池,下面是它的源码:

可以看到,核心线程数为0,最大线程数为Integer的最大值(可以理解为无界),CachedThreadPool使用没有容量的SynchronousQueue(上面有提到过)作为线程池的工作队列,但最大线程数是无界的,这就意味着:如果主线程提交任务的速度高于CachedThreadPool线程处理任务的速度时,CachedThreadPool会不断地创建新线程。

ps:极端情况下,Cached会因为创建过多的线程而耗尽CPU和内存资源。

4.5 newScheduledThreadPool

它是可以执行定时任务的线程池,用ScheduledThreadPoolExecutor的构造函数来创建,而ScheduledThreadPoolExecutor继承自ThreadPoolExecutor,源码如下:

可以看到核心线程数是传进来的,最大线程数为Integer的最大值,工作队列用的是DelayWorkQueue延迟队列、无界。

ScheduledThreadPoolExecutor的实现:

  • ScheduledThreadPoolExecutor会把待调度的任务(ScheduledFutureTask:下面都简化为task)放到一个DelayQueue中。

  • task主要包含3个变量:

    • long time:表示这个任务将要被执行的具体时间。

    • long sequenceNumber:表示这个任务被添加到ScheduledThreadPoolExecutor中的序号。

    • long period:表示任务执行的间隔周期。

  • DelayQueue封装了一个PriorityQueue,这个PriorityQueue会对队列中的task进行排序。排序时,time小的排在前面(时间早的任务先执行),如果两个time相同,就比较sequenceNumber,小的排在前面。

代码Demo:

  1. public class ScheduleDemo {
  2.    public static void main(String[] args) {
  3.        //创建ScheduledExecutorService线程池
  4.        ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);
  5.        //延迟5秒执行任务
  6.        executor.schedule(() -> {
  7.            //要执行的任务
  8.       }, 5, TimeUnit.SECONDS);
  9.        //定时执行任务(延迟5秒开始,每隔10秒执行一次)
  10.        executor.schedule(() -> {
  11.            //要执行的任务
  12.       }, 5, 10, TimeUnit.SECONDS);
  13.        // 定时执行任务(延迟 5 秒开始,每隔10秒执行一次,上一次任务执行完成后再延迟 10 秒)
  14.        executor.scheduleWithFixedDelay(() -> {
  15.            // 要执行的任务
  16.       }, 5, 10, TimeUnit.SECONDS);
  17.   }
  18. }

4.6 为什么不推荐通过Executors构建线程池

上述五种线程池,都可以通过Executors工具类来创建出来,使用起来也很方便,但是在阿里巴巴开发手册中也明确指出,不允许使用Executors创建线程池,并且指出了存在的弊端:

还有一点就是当你通过Executors创建线程池的时候,它创建的线程池大多都是已经帮你设置好了的参数,很多选项是没办法自定义的,在特殊的业务场景下,Executors工具类下的线程池也并不是“万能的”。

正确的做法就是通过ThreadPoolExecutor的构造函数来自己定义线程池。

除此之外,还可以通过一些开源类库,比如apache和guava等,笔者推荐使用guava提供的ThreadFactoryBuilder来创建线程池:

  1. public class ThreadFactoryDemo {
  2.    public static void main(String[] args) {
  3.        //线程工厂
  4.        ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build();
  5.        ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 0L,
  6.                TimeUnit.SECONDS, new LinkedBlockingQueue<>(2048),
  7.                factory, new ThreadPoolExecutor.AbortPolicy());
  8.        for (int i = 0; i < 100; i++) {
  9.            executor.execute(() -> {
  10.                //业务代码
  11.           });
  12.       }
  13.   }
  14. }

上述方式可以自定义线程名称,更方便出错时的溯源。

五、总结

  • 线程池是一种用于管理和复用线程的机制。通过线程池,我们可以创建一组可用的线程,并且可以根据需要执行任务,避免频繁地创建和销毁线程,提高系统的性能和稳定性。
  • 使用ThreadPoolExcutor类可以自定义线程池(也是创建线程池的正确做法),通过构造函数的7个核心参数,我们可以找到自己合适的线程池。
  • 任务提交有两种方式,一种是execute():适用于没有返回值的情况,ThreadPoolExecutor自己的方法,第二种是submit():适用于需要返回值的情况,ThreadPoolExecutor父类的方法。
  • ThreadPoolExecutor类的三个核心方法:execute()、addWorker()和runWorker(),了解其源码,才更有助于我们使用和调优线程池。
  • Java中自带的5种线程池,通过Executors工具类创建,虽说用起来很方便,但并不推荐使用此方法来创建线程池。

 End:希望对大家有所帮助,如果有纰漏或者更好的想法,请您一定不要吝啬你的赐教声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】

推荐阅读
相关标签