当前位置:   article > 正文

线程池源码解析_线程池源码详解

线程池源码详解

线程池源码解析

  • Java引入Excutor框架将任务的提交和执行进行解耦,只需要定义好任务,然后提交给线程池即可。

1. 线程池的用处

  • 使用线程池的时机
    • 单个任务处理时间比较短
    • 需要处理的任务数量很大
  • 线程池的优点
    • 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
    • 提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行。
    • 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

2. Executor接口

  • 思想:将任务提交和任务执行解耦。无需关注如何创建线程,如何调度线程来执行任务,用户只需提供实现Runnable接口的任务对象,将任务的运行逻辑提交到执行器(Executor)中,由Executor框架完成线程的调配和任务的执行部分
  • 线程池在内部实际上构建了一个生产者消费者模型,将线程和任务两者解耦,并不直接关联,从而良好的缓冲任务,复用线程
  • 线程池的运行主要分成两部分:任务管理、线程管理。
  • 任务管理部分充当生产者的角色,当任务提交后,线程池会判断该任务后续的流转
    • (1)直接申请线程执行该任务;
    • (2)缓冲到队列中等待线程执行;
    • (3)拒绝该任务。
  • 线程管理部分是消费者,它们被统一维护在线程池内,根据任务请求进行线程的分配,当线程执行完任务后则会继续获取新的任务去执行,最终当线程获取不到任务的时候,线程就会被回收

3. 线程池实现类ThreadPoolExecutor的构造方法参数

  • 构造方法
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • corePoolSize
    • 线程池中的核心线程数,当提交一个任务时,线程池创建一个新线程执行任务,直到当前线程数等于corePoolSize;如果当前线程数为corePoolSize,继续提交的任务被保存到阻塞队列中,等待被执行
    • 核心线程数线程数定义了最小可以同时运行的线程数量。即便当前活动的线程有空闲的,只要这个活动的线程数量小于设定的核心线程数,那么依旧会启动一个新线程来执行任务。也就是说不会去复用任何线程
  • maximumPoolSize
    • 线程池中允许的最大线程数。如果当前阻塞队列满了,且继续提交任务,则创建新的线程执行任务,前提是当前线程数小于maximumPoolSize
  • keepAliveTime
    • 当线程池中的线程数量大于 corePoolSize 的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了 keepAliveTime才会被回收销毁
  • unit
    • keepAliveTime 参数的时间单位
  • workQueue
    • 用来保存等待被执行的任务的阻塞队列,且任务必须实现Runable接口,在JDK中提供了如下阻塞队列:
      • ArrayBlockingQueue:基于数组结构的有界阻塞队列,按FIFO排序任务,支持公平锁和非公平锁;
      • LinkedBlockingQuene:基于链表结构的有界阻塞队列,按FIFO排序任务,默认的长度为Integer.MAX_VALUE;
      • SynchronousQuene:一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态;
      • PriorityBlockingQuene:支持线程优先级排序的无界阻塞队列,默认自然序进行排序,也可以自定义compareTo()方法来指定排序规则,不能保证同优先级元素的顺序;
  • threadFactory
    • 创建线程的工厂,通过自定义的线程工厂可以给每个新建的线程设置一个具有识别度的线程名,方便出错时进行回溯
  • handler
    • 线程池的饱和策略,当阻塞队列满了,且没有空闲的工作线程,如果继续提交任务,必须采取一种策略处理该任务,线程池提供了4种策略:
      • AbortPolicy:直接抛出异常,默认策略;
      • CallerRunsPolicy:用调用者所在的线程来执行任务;
      • DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
      • DiscardPolicy:直接丢弃任务;

4. 线程池的属性标识

4.1 线程池核心属性

//原子整型。1.声明当前线程池的状态;2.声明线程池中的线程数
//高3位是线程池的状态,低29位是线程池中的线程数量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3; //方便后面做位运算
private static final int CAPACITY   = (1 << COUNT_BITS) - 1; //线程池中线程的最大数量2^29-1

// 高3位是线程池的状态
//-1用二进制表示就是32个1,左移29位是高3位是1,低29位是0
 
//111,代表线程池为RUNNING,代表正常接收任务
private static final int RUNNING    = -1 << COUNT_BITS; 
//000,代表线程池为SHUTDOWN状态,不接受新任务,但是内部还会处理阻塞队列中的任务,正在进行的任务也正常处理
private static final int SHUTDOWN   =  0 << COUNT_BITS;
//001 代表线程池为STOP状态,不接受新任务,也不去处理阻塞队列中的任务,同时会中断正在执行的任务
private static final int STOP       =  1 << COUNT_BITS;  
//010 代表线程池为TIDYING状态,过渡的状态,代表线程池即将关闭
private static final int TIDYING    =  2 << COUNT_BITS; 
//011 TERMINATED状态,代表线程池已经关闭
private static final int TERMINATED =  3 << COUNT_BITS;
//~CAPACITY代表高3位是1,低29位是0,得到线程池的状态
private static int runStateOf(int c)     { return c & ~CAPACITY; }
//获得线程池中工作线程的数量
private static int workerCountOf(int c)  { return c & CAPACITY; }
//rs为高3位,表示线程池状态;wc是低29位,ctl是合并的结果
private static int ctlOf(int rs, int wc) { return rs | wc; }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • ctl这个AtomicInteger类型,是对线程池的运行状态和线程池中有效线程的数量进行控制的一个字段, 它同时包含两部分的信息:线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount),高3位保存runState,低29位保存workerCount,两个变量之间互不干扰。用一个变量去存储两个值,可避免在做相关决策时,出现不一致的情况,不必为了维护两者的一致,而占用锁资源。线程池源代码中,经常出现要同时判断线程池运行状态和线程数量的情况。线程池也提供了若干方法去供用户获得线程池当前的运行状态、线程个数。这里都使用的是位运算的方式,相比于基本运算,速度也会快很多

4.2 线程池状态

  • 线程池状态转换图
    • 从前面每个状态对应的数值能得到:RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED
    • 状态转换是由低到高的(-1 < 0 < 1 < 2 < 3)

在这里插入图片描述

5 任务执行机制

5.1 任务调度

  • 任务调度是线程池的主要入口,当用户提交了一个任务,接下来这个任务将如何执行都是由这个阶段决定的。这部分就是线程池的核心运行机制。

  • 所有任务的调度都是由execute方法完成的,这部分完成的工作是:检查现在线程池的运行状态、运行线程数、运行策略,决定接下来执行的流程,是直接申请线程执行,或是缓冲到队列中执行,亦或是直接拒绝该任务。其执行过程如下:

    • 首先检测线程池运行状态,如果不是RUNNING,则直接拒绝,线程池要保证在RUNNING的状态下执行任务
    • 如果workerCount < corePoolSize,则创建并启动一个线程来执行新提交的任务
    • 如果workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中
    • 如果workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新提交的任务
    • 如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常
  • 从execute()方法的源码中可以看出在刚刚创建ThreadPoolExecutor的时候,线程并不会立即启动,而是要等到有任务提交时才会创建新线程来执行任务,除非调用了prestartCoreThread/prestartAllCoreThreads方法提前启动线程池中的核心线程

  • 线程池的execute方法执行

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();

    //获取代表线程池状态和线程数量的整数
    int c = ctl.get();
    //工作线程数<核心线程数
    if (workerCountOf(c) < corePoolSize) {
        //创建核心线程执行任务,成功就返回
        if (addWorker(command, true))
            return;
        //创建核心线程数失败,重新获取ctl
        c = ctl.get();
    }
    //确认线程池是Running状态,将任务添加到阻塞队列中
    if (isRunning(c) && workQueue.offer(command)) {
        //再次获取ctl
        int recheck = ctl.get();
        //再次判断是否是RUNNING,如果不是RUNNING状态,移除任务
        if (! isRunning(recheck) && remove(command))
            //拒绝策略
            reject(command);
        //如果线程池处在RUNNING状态,但是工作线程为0
        //线程池中allowCoreThreadTimeOut参数允许核心线程和非核心线程超时时间一样
        else if (workerCountOf(recheck) == 0)
            //阻塞队列有任务但是没有工作线程,添加一个任务为空的工作线程处理阻塞队列中的任务
            addWorker(null, false);
    }
    //线程池是Running状态,阻塞队列已满,创建非核心线程处理任务
    else if (!addWorker(command, false))
    //if语句中的非核心线程创建失败,说明线程池已经达到最大线程数,拒绝策略
        reject(command);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33

5.2 任务缓冲

  • 线程池的本质是对任务和线程的管理,而做到这一点最关键的思想就是将任务和线程两者解耦,不让两者直接关联,才可以做后续的分配工作。线程池中是以生产者消费者模式,通过一个阻塞队列来实现的。阻塞队列缓存任务,工作线程从阻塞队列中获取任务
  • 阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素

5.3 任务申请

  • 任务的执行有两种可能:

    • 一种是任务直接由新创建的线程执行。
    • 一种是线程从任务队列中获取任务然后执行,执行完任务的空闲线程会再次去从队列中申请任务再去执行。
  • 线程需要从任务缓存模块中不断地取任务执行,帮助线程从阻塞队列中获取任务,实现线程管理模块和任务管理模块之间的通信。这部分策略由getTask方法实现。

  • getTask这部分进行了多次判断,为的是控制线程的数量,使其符合线程池的状态。如果线程池现在不应该持有那么多线程,则会返回null值。工作线程Worker会不断接收新任务去执行,而当工作线程Worker接收不到任务的时候,就会开始被回收。

  • 整个getTask操作在自旋下完成:
    1、workQueue.take:如果阻塞队列为空,当前线程会被挂起等待;当队列中有任务加入时,线程被唤醒,take方法返回任务,并执行;
    2、workQueue.poll:如果在keepAliveTime时间内,阻塞队列还是没有任务,则返回null;

  • 所以,线程池中实现的线程可以一直执行由用户提交的任务。

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37

5.4 任务拒绝

  • 任务拒绝模块是线程池的保护部分,线程池有一个最大的容量,当线程池的任务缓存队列已满,并且线程池中的线程数目达到maximumPoolSize时,就需要拒绝掉该任务,采取任务拒绝策略,保护线程池

6 Worker线程管理

6.1 Worker线程

  • 线程池为了掌握线程的状态并维护线程的生命周期,设计了线程池内的工作线程Worker。
private final class Worker 
    extends AbstractQueuedSynchronizer implements Runnable{
        final Thread thread;//Worker持有的线程
        Runnable firstTask;//初始化的任务,可以为null
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • Worker这个工作线程,实现了Runnable接口,并持有一个线程thread,一个初始化的任务firstTask。thread是在调用构造方法时通过ThreadFactory来创建的线程,可以用来执行任务;firstTask用它来保存传入的第一个任务,这个任务可以有也可以为null。如果这个值是非空的,那么线程就会在启动初期立即执行这个任务,也就对应核心线程创建时的情况;如果这个值是null,那么就需要创建一个线程去执行任务列表(workQueue)中的任务,也就是非核心线程的创建。

6.2 Worker线程增加

  • 增加线程是通过线程池中的addWorker方法,该方法的功能就是增加一个线程,该方法不考虑线程池是在哪个阶段增加的该线程,这个分配线程的策略是在上个步骤完成的,该步骤仅仅完成增加线程,并使它运行,最后返回是否成功这个结果。addWorker方法有两个参数:firstTask、core。firstTask参数用于指定新增的线程执行的第一个任务,该参数可以为空;core参数为true表示在新增线程时会判断当前活动线程数是否少于corePoolSize,false表示新增线程前需要判断当前活动线程数是否少于maximumPoolSize

  • addWorker方法

private boolean addWorker(Runnable firstTask, boolean core) {
    //实现工作线程数+1的操作
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // rs >= SHUTDOWN,表明线程状态不是RUNNING
        //rs == SHUTDOWN,表明线程池是SHUTDOWN状态,外层取反,表明只能是STOP以上更高的状态,这是不需要添加线程处理任务。
        //firstTask == null,表明任务为空。外层取反,表示任务不为空,SHUTDOWN状态以上不接收新任务,创建工作线程失败
        //阻塞队列不为空,取反,为空,大于等于SHUDOWN状态,证明阻塞队列中没有任务处理,不会创建新线程处理任务
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
                firstTask == null &&
                ! workQueue.isEmpty()))
            //创建工作线程失败
            return false;

        for (;;) {
            //获取工作线程个数
            int wc = workerCountOf(c);
            //当前线程数已经大于线程池最大容量,不去创建
            //判断工作线程数是否超过核心线程数或者最大线程数
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            
            //使用CAS方法将工作线程数+1
            if (compareAndIncrementWorkerCount(c))
                //跳出外层的for循环
                break retry;
            c = ctl.get();  
            //判断状态是否变化
            if (runStateOf(c) != rs)
                //继续执行外层的for循环
                continue retry;
            // 线程池状态没变继续执行内侧循环
        }
    }

    //如何创建工作线程
    boolean workerStarted = false;
    boolean workerAdded = false;
    //Worker w就是工作线程
    Worker w = null;
    try {
        //创建工作线程w,传入需要执行的任务
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            //获取线程池的全局锁,避免该工作线程w添加任务时,其他线程干掉了线程池,干掉线程池前需要先获取这个锁
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());
                //RUNNING状态
                //SHUTDOWN状态,创建空任务工作线程,处理阻塞队列中的任务
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    //将工作线程添加到集合中
                    workers.add(w);
                    //获取工作线程个数
                    int s = workers.size();
                    //如果工作线程数大于之前记录的最大工作线程数,就替换
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    //添加工作线程成功
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                //启动工作线程
                t.start();
                //启动工作线程成功
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    //返回工作是否启动
    return workerStarted;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90

6.3 Worker线程执行任务

  • runWorker方法
  • runWorker方法是线程池的核心:
    • 1.while循环不断地通过getTask()方法获取任务。
    • 2.getTask()方法从阻塞队列中取任务。
    • 3.如果线程池正在停止,那么要保证当前线程是中断状态,否则要保证当前线程不是中断状态。
    • 4.执行任务。
    • 5.如果getTask结果为null则跳出循环,执行processWorkerExit()方法,销毁线程。
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        //1. 任务不为空,执行任务
        //2. 如果任务为空,通过getTask方法从阻塞队列中获取任务
        while (task != null || (task = getTask()) != null) {
            w.lock();
            //获取当前状态,是否大于等于STOP.
            if ((runStateAtLeast(ctl.get(), STOP) ||
                    (Thread.interrupted() &&
                    runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                //执行任务前的操作
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    //开始执行任务
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        //线程执行完毕的后续处理
        processWorkerExit(w, completedAbruptly);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45

7 其他方式创建线程池

  • 方式一:通过ThreadPoolExecutor构造函数实现(推荐)。
public class ThreadPoolExecutorDemo {

    private static final int CORE_POOL_SIZE = 5;
    private static final int MAX_POOL_SIZE = 10;
    private static final int QUEUE_CAPACITY = 100;
    private static final Long KEEP_ALIVE_TIME = 1L;
    public static void main(String[] args) {

        //使用阿里巴巴推荐的创建线程池的方式
        //通过ThreadPoolExecutor构造函数自定义参数创建
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                CORE_POOL_SIZE,
                MAX_POOL_SIZE,
                KEEP_ALIVE_TIME,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(QUEUE_CAPACITY),
                new ThreadPoolExecutor.CallerRunsPolicy());

        for (int i = 0; i < 10; i++) {
            //创建WorkerThread对象(WorkerThread类实现了Runnable 接口)
            Runnable worker = new MyRunnable("" + i);
            //执行Runnable
            executor.execute(worker);
        }
        //终止线程池
        executor.shutdown();
        while (!executor.isTerminated()) {
        }
        System.out.println("Finished all threads");
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 方式二:通过 Executor 框架的工具类 Executors 来实现我们可以创建三种类型的 ThreadPoolExecutor:
    • FixedThreadPool
    • CachedThreadPool
    • SingleThreadExecutor
  • Executors 返回线程池对象的弊端如下:
    • FixedThreadPool (核心和最大都为指定的值,能确保同时并发的数量)和 SingleThreadExecutor (核心和最大都为1,唯一的工作线程保证工作按照顺序执行):允许请求的队列长度为 Integer.MAX_VALUE,可能堆积大量的请求,从而导致 OOM。

    • CachedThreadPool(核心线程池数为0,最大为MAX,只要有请求进来,就会一直等待,等到运行为止。缓存线程的作用)和 ScheduledThreadPool(核心为指定的,最大为MAX,配合DelayedWorkQueue执行定时任务。) :允许创建的线程数量为 Integer.MAX_VALUE ,可能会创建大量线程,从而导致 OOM。

8 线程池大小确定

  • 设置的线程池数量太小的话,如果同一时间有大量任务/请求需要处理,可能会导致大量的请求/任务在任务队列中排队等待执行,甚至会出现任务队列满了之后任务/请求无法处理的情况,或者大量任务堆积在任务队列导致 OOM。CPU 根本没有得到充分利用
  • 设置线程数量太大,大量线程可能会同时在争取 CPU 资源,这样会导致大量的上下文切换,从而增加线程的执行时间,影响了整体执行效率
  • 线程的上下文切换
    • 多线程编程中一般线程的个数都大于CPU 核心的个数,而一个 CPU 核心在任意时刻只能被一个线程使用,为了让这些线程都能得到有效执行,CPU采取的策略是为每个线程分配时间片并轮转的形式。当一个线程的时间片用完的时候就会重新处于就绪状态让给其他线程使用,这个过程就属于一次上下文切换。概括来说就是:当前任务在执行完 CPU 时间片切换到另一个任务之前会先保存自己的状态,以便下次再切换回这个任务时,可以再加载这个任务的状态。任务从保存到再加载的过程就是一次上下文切换
      • CPU 密集型任务(N+1): 这种任务消耗的主要是CPU资源,可以将线程数设置为 N(CPU 核心数)+ 1,比 CPU 核心数多出来的一个线程是为了防止线程偶发的缺页中断,或者其它原因导致的任务暂停而带来的影响。一旦任务暂停,CPU 就会处于空闲状态,而在这种情况下多出来的一个线程就可以充分利用 CPU 的空闲时间
      • I/O 密集型任务(2N): 这种任务应用起来,系统会用大部分的时间来处理 I/O 交互,而线程在处理 I/O 的时间段内不会占用CPU来处理,这时就可以将CPU交出给其它线程使用。因此在 I/O 密集型任务的应用中,我们可以多配置一些线程,具体的计算方法是 2N
  • 场景1:快速响应用户请求。从用户体验角度看,这个结果响应的越快越好,如果一个页面半天都刷不出,用户可能就放弃查看这个商品了。而面向用户的功能聚合通常非常复杂,伴随着调用与调用之间的级联、多级级联等情况,业务开发同学往往会选择使用线程池这种简单的方式,将调用封装成任务并行的执行,缩短总体响应时间。另外,使用线程池也是有考量的,这种场景最重要的就是获取最大的响应速度去满足用户,所以应该不设置队列去缓冲并发任务,调高corePoolSize和maxPoolSize去尽可能创造多的线程快速执行任务。
  • 场景2:快速处理批量任务。这种场景需要执行大量的任务,我们也会希望任务执行的越快越好。这种情况下,也应该使用多线程策略,并行计算。但与响应速度优先的场景区别在于,这类场景任务量巨大,并不需要瞬时的完成,而是关注如何使用有限的资源,尽可能在单位时间内处理更多的任务,也就是吞吐量优先的问题。所以应该设置队列去缓冲并发任务,调整合适的corePoolSize去设置处理任务的线程数。在这里,设置的线程数过多可能还会引发线程上下文切换频繁的问题,也会降低处理任务的速度,降低吞吐量。

9 其他问题

  • 核心线程永远不会销毁吗?可能会被销毁allowsCoreThreadTimeOut=true,核心线程也可以被终止。
  • 核心线程(Worker)即使一直空闲也不终止,是通过workQueue.take()实现的,它会一直阻塞到从等待队列中取到新的任务。非核心线程空闲指定时间后终止是通过workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)实现的,一个空闲的Worker只等待keepAliveTime,如果还没有取到任务则循环终止,线程也就运行结束了。
  • keepAliveTime=0表示非核心线程执行完立刻终止。默认情况下,keepAliveTime小于0,初始化的时候才会报错;但如果allowsCoreThreadTimeOut,keepAliveTime必须大于0,不然初始化报错。
  • shutdown():关闭线程池,线程池的状态变为 SHUTDOWN。线程池不再接受新任务了,但是队列里的任务得执行完毕。
  • shutdownNow():关闭线程池,线程的状态变为 STOP。线程池会终止当前正在运行的任务,并停止处理排队的任务并返回正在等待执行的 List。

10 参考文献

Java线程池实现原理及其在美团业务中的实践

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小舞很执着/article/detail/944544
推荐阅读
相关标签
  

闽ICP备14008679号