当前位置:   article > 正文

线程池简介及底层原理_线程池底层原理

线程池底层原理

一、线程池简介

背景:

  • 在系统当中,频繁地创建销毁线程,或者创建过多的线程都会给系统带来风险,轻者拖慢系统,出现卡顿现象,严重时可能出现内存溢出导致系统瘫痪。
  • 为此,我们在处理并发量大的业务逻辑时,常常使用线程池来管理和调度线程。

线程池的主要工作:

  • 线程复用;(避免频繁创建、销毁线程)
  • 控制最大并发量;(避免因为大量的线程导致程序崩溃)
  • 管理线程。(管理线程的创建、销毁、任务调度)

线程池的优点:

  • 降低资源消耗:通过重复利用自己创建的线程,降低线程创建和销毁造成的消耗。
  • 提高响应速度:当任务到达时,任务可以不需要等到线程创建就能立即执行。
  • 提高线程的可管理性:线程是稀缺资源,如果无限地创建,不仅会消耗资源,还会降低系统的稳定性,使用线程池可以进行统一分配,调优和监控。

线程池的核心:

在这里插入图片描述

架构实现:

  • Java 中的线程池是通过 Executor 框架实现的,该框架中用到了Executor、Executors、ExecutorService、ThreadPoolExecutor 这几个类。

在这里插入图片描述

二、线程池的几种创建方式

1.ThreadPoolExecutor()

最原始的线程池创建方式,下面前四种创建方式都是对 ThreadPoolExecutor() 的封装。

public ThreadPoolExecutor(int corePoolSize,       // 核心线程的数量
                          int maximumPoolSize,    // 最大线程数量
                          long keepAliveTime,     // 超出核心线程数量以外的线程空余存活时间
                          TimeUnit unit,          // 存活时间的单位
                          BlockingQueue<Runnable> workQueue,  // 保存待执行任务的队列
                          ThreadFactory threadFactory,        // 创建新线程使用的工厂(非必填)
                          RejectedExecutionHandler handler    // 当任务无法执行时的处理器(非必填)
                          ) {...}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

使用示例:

public static void main(String[] args) {
    ThreadPoolExecutor executor = new ThreadPoolExecutor(
            10,
            10,
            1,
            TimeUnit.MINUTES,
            new LinkedBlockingQueue<>(10000),
            r -> new Thread(r, "DemoThread-" + r.hashCode()));
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • corePoolSize:核心线程池数量

    • 线程池中已有线程数 < 核心线程池数量,有新任务进来就新建一个线程,即使有的线程没事干。

    • 线程池中已有线程数 ≥ 核心线程池数量,有新任务进来不会新建线程了,空闲的线程就得去任务队列里取任务执行。

  • maximumPoolSize:最大线程数量

    • 核心线程池数量 + 核心以外的数量 ≤ 最大线程数量;
    • 如果任务队列满了,并且池中线程数小于最大线程数,会再创建新的线程执行任务。
  • keepAliveTime:核心以外的线程存活时间,即没有任务的线程的存活时间

    • 如果给线程设置 allowCoreThreadTimeOut(true),则核心线程再空闲时也会执行销毁倒计时。
    • 如果任务是多而容易执行的,可以调大这个参数,那样线程就可以在存活的时间里有更大可能接受新任务。
  • unit:存活时间的单位

    TimeUnit.DAYS;               //天
    TimeUnit.HOURS;             //小时
    TimeUnit.MINUTES;           //分钟
    TimeUnit.SECONDS;           //秒
    TimeUnit.MILLISECONDS;      //毫秒
    TimeUnit.MICROSECONDS;      //微妙
    TimeUnit.NANOSECONDS;       //纳秒
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
  • workQueue:保存待执行任务的阻塞队列

    • 不同的任务类型有不同的选择。
  • threadFactory:每个线程创建的地方(非必填)

    • 可以给线程起个好听的名字,设置个优先级啥的。
  • handler:拒绝策略(非必填)

    拒绝策略拒绝行为
    AbortPolicy直接抛出 RejectedExecutionException,本策略也是默认的拒绝策略。
    CallerRunsPolicy只要线程池没关闭,就直接用调用者所在的线程来运行任务。
    DiscardPolicy悄悄把任务放生,不做了。
    DiscardOldestPolicy把队列里待最久的那个任务扔了,然后再调用 execute() 尝试执行。
    • 我们也可以实现自己的 RejectedExecutionHandler 接口自定义策略,比如记录日志什么的。

2.Executors.newFixedThreadPool(int nThreads)

在这里插入图片描述

主要特点:

  • 1.创建一个定长线程池,可控制线程的最大并发数,超出的线程会在队列中等待。
  • 2.newFixedThreadPool 创建的线程池 corePoolSize 和 MaxmumPoolSize 是相等的,它使用的是 LinkedBlockingQueue。

3.Executors.newSingleThreadExecutor()

在这里插入图片描述

主要特点:

  • 1.创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务都按照指定顺序执行。
  • 2.newSingleThreadExecutor 将 corePoolSize 和 maximumPoolSize 都设置为 1,它使用的是LinkedBlockingQueue。

4.Executors.newCachedThreadPool()

在这里插入图片描述

主要特点:

  • 1.创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程;若无可回收,则创建新线程。
  • 2.newCachedThreadPool 将 corePoolSize 设置为 0,maximumPoolSize 设置为 Integer.MAX_VALUE,它使用的是 SynchronousQueue,也就是说来了任务就创建线程运行,如果线程空闲超过 60 秒,就销毁线程。

5.Executors.newScheduledThreadPool(int corePoolSize)

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ThreadPoolExecutor(corePoolSize, Integer.MAX_VALUE, 0L, TimeUnit.NANOSECONDS, new ScheduledThreadPoolExecutor.DelayedWorkQueue());
}
  • 1
  • 2
  • 3

主要特点:

  • 1.创建一个延迟执行的线程池,用来处理延时任务、定时任务、周期任务。
  • 2.newScheduledThreadPool 可以手动设置 corePoolSize 的大小,将 maximumPoolSize 设置为 Integer.MAX_VALUE,它使用的是 DelayedWorkQueue,也就是说来了任务之后,会延迟执行。

注意:

  1. 扩展内容:Executors创建的4中线程池的使用 - https://www.cnblogs.com/ljp-sun/p/6580147.html;

  2. 阿里规约明确规定,不允许使用Executors去创建线程池。

    在这里插入图片描述

    OOM:Out Of Memory,内存溢出。

  3. 扩展内容:Executors为什么会导致 OOM,如何正确的创建线程池? - https://www.zhihu.com/question/23212914

三、如何正确的创建线程池

方法:

  • 使用guava提供的ThreadFactoryBuilder创建ThreadFactory,可以方便地重命名线程

代码:

import com.google.common.util.concurrent.ThreadFactoryBuilder;

import java.time.LocalDateTime;
import java.util.concurrent.*;

/**
 * <p> @Title ExecutorsDemo
 * <p> @Description 创建线程池Demo
 *
 * @author zhj
 * @date 2020/12/9 13:25
 */
public class ExecutorsDemo extends Thread {

    @Override
    public void run() {
        System.out.println("date: " + LocalDateTime.now());
    }

    /**
     * 使用guava提供的ThreadFactoryBuilder创建ThreadFactory,可以方便地重命名线程
     */
    private static ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build();

    /**
     * 创建线程池
     * 【恶搞】:此处可以尝试使用一下 Executors.newCachedThreadPool(); 
     *         注意:会迅速把计算机的内存吃掉导致死机。【不要轻易尝试,后果自负】
     * 最佳线程数: 操作系统内核数+2
     * 操作系统内核数: Runtime.getRuntime().availableProcessors()
     */
    private static ExecutorService pool = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors() + 2, 200,
            0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1024),
            namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());

    public static void main(String[] args) throws InterruptedException {
        // 由于执行很快,瞬间的任务量已经超过了队列长度+线程池的最大线程数,
        // 根据我们设置的 AbortPolicy 拒绝策略,会抛 RejectedExecutionException
        for (int i = 0; i < Integer.MAX_VALUE; i++) {
            pool.execute(new ExecutorsDemo());
            // 加上此行代码,可以给系统时间让线程去执行任务,然后去接新的任务
            // Thread.sleep(1L);
        }
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46

输出结果:

Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task Thread[Thread-1224,5,main] rejected from java.util.concurrent.ThreadPoolExecutor@2c13da15[Running, pool size = 200, active threads = 200, queued tasks = 1024, completed tasks = 0]
	at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
	at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
	at com.tsingsoft.forecast.vo.ExecutorsDemo.main(ExecutorsDemo.java:39)
date: 2020-12-09T14:09:16.088
date: 2020-12-09T14:09:16.089
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

在这里插入图片描述

  • RejectedExecutionException 异常为 AbortPolicy 拒绝策略的执行结果。
  • 添加 Thread.sleep(1L); 之后就不会出现这个异常了。

四、线程池的底层原理

线程池的状态:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

// Packing and unpacking ctl
private static int runStateOf(int c)     { return c & ~CAPACITY; }
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

其中ctl这个AtomicInteger的功能很强大,其高3位用于维护线程池运行状态,低29位维护线程池中线程数量。

状态

1、RUNNING:-1 << COUNT_BITS,即高3位为1,低29位为0,该状态的线程池会接收新任务,也会处理在阻塞队列中等待处理的任务。

2、SHUTDOWN:0 << COUNT_BITS,即高3位为0,低29位为0,该状态的线程池不会再接收新任务,但还会处理已经提交到阻塞队列中等待处理的任务。

3、STOP:1 << COUNT_BITS,即高3位为001,低29位为0,该状态的线程池不会再接收新任务,不会处理在阻塞队列中等待的任务,而且还会中断正在运行的任务。

4、TIDYING:2 << COUNT_BITS,即高3位为010,低29位为0,所有任务都被终止了,workerCount为0,为此状态时还将调用terminated()方法。

5、TERMINATED:3 << COUNT_BITS,即高3位为100,低29位为0,terminated()方法调用完成后变成此状态。

这些状态均由int型表示,大小关系为 RUNNING<SHUTDOWN<STOP<TIDYING<TERMINATED,这个顺序基本上也是遵循线程池从 运行 到 终止这个过程。

方法

  • runStateOf(int c) 方法:c & 高3位为1,低29位为0的~CAPACITY,用于获取高3位保存的线程池状态。

  • workerCountOf(int c)方法:c & 高3位为0,低29位为1的CAPACITY,用于获取低29位的线程数量。

  • ctlOf(int rs, int wc)方法:参数rs表示runState,参数wc表示workerCount,即根据runState和workerCount打包合并成ctl。

线程池的执行过程:

在这里插入图片描述

  1. 在创建了线程池后,等待提交过来的任务请求。

  2. 当调用 execute() 方法添加一个请求任务时,线程池会做如下判断:

    (1)如果,线程池中的线程数量 < corePoolSize,则每来一个任务,就创建线程去执行这个任务;

    (2)如果,线程池中的线程数量 ≥ corePoolSize,则每来一个任务,会尝试将其添加到任务缓存队列当中,若添加成功,则该任务会等待空闲线程将其取出去执行;

    (3)如果,线程池中的线程数量 ≥ corePoolSize,并且队列 workQueue 已满,

    但线程池中的线程数量 < maximumPoolSize,则会创建新的线程来处理被添加的任务;

    (4)如果,线程池中的线程数量 ≥ maximumPoolSize,则会采取拒绝策略进行处理。




参考文章:(内容略有改动)

1.线程池的简介及底层原理 - https://www.cnblogs.com/lveyHang/p/12060832.html

2.线程池底层原理 - https://www.jianshu.com/p/483656576bf2

3.Java线程池的底层实现与使用 - https://www.cnblogs.com/sxkgeek/p/9343519.html

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/weixin_40725706/article/detail/249218
推荐阅读
相关标签
  

闽ICP备14008679号