当前位置:   article > 正文

Java线程池源码分析(一)_线程池源码解析

线程池源码解析

Java线程池源码分析(一)

使用线程池场景,好处,不在本文范围内,我们分析的是源码。
带着几个问题我们来分析源码:

①线程池的池子是哪个数据结构
②线程池构造方法的参数的含义
③FutureTask如何获取到结果,任务没完成就ft.get()是怎么阻塞的
④线程池提交runnable和callable是有什么区别和联系
⑤工作线程Worker是如何处理池子和阻塞队列的任务的
⑥ coreSize个线程数是如何保持住的
⑦ 线程池是如何进程保持在哪里的(除非你手动shutDown)

如上是基础问题,但能衡量你是否熟悉线程池,如果答不上来还是进来补一下吧,大部分文章上来就是高并发情况下线程池数量和队列选择等等。不能循序渐进,我也看过别人写的文章,我觉得看完之后还是有很多地方不够清晰,所以自己来吧,希望对有同感的读者有帮助。

PS:本文正是按照一个提交请求来跟踪代码的,能让读者有一条清晰明了的了解线程池工作原理,解释一些基础线程池概念,线程池和阻塞队列的交互,高级部分在高级篇的博文中。
我一直习惯从使用开始入手,有些人上来就画图,各种继承关系,各种属性,我觉得这样有时候会让人上来就迷糊,所以我不用这种方式。

还是老套路上一段代码,从怎么使用开始入手:

public void go() {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    Random random = new Random();

    pool.submit(new Runnable() {

        @Override
        public void run() {
            try {
                System.out.println(Thread.currentThread().getName() + "开始执行任务");
                Thread.sleep(random.nextInt(111));
                System.out.println(Thread.currentThread().getName() + "执行任务完毕");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    });
    pool.shutdown();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

直接逐行分析,进入线程池的构造方法,参数比较多,每一个都很重要。
Executors.newFixedThreadPool(2)工具类初始化了线程池,构造方法如下
线程池构造方法
总共7个,每个都挺重要,但是目前看不出来。
ps:不要看网上一些帖子简单几句话说完了:
小于等于coreSize 直接new Worker
大于coreSize放入阻塞队列BlockQueue
队列满了在放入池子中直到达到maxSize
maxSize在放入就使用拒绝策略
只了解这些真的体现不出你真的熟悉线程池,不熟悉的背一下面试题都能答出来,继续往下读看干货!!!!

在往下来pool.submit();两个重载方法
submit(Callable task)

 public <T> Future<T> submit(Callable<T> task) {
     if (task == null) throw new NullPointerException();
     RunnableFuture<T> ftask = newTaskFor(task);
     execute(ftask);
     return ftask;
 }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

submit(Runnable)

 public <T> Future<T> submit(Runnable task, T result) {
     if (task == null) throw new NullPointerException();
     RunnableFuture<T> ftask = newTaskFor(task, result);
     execute(ftask);
     return ftask;
 }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

都执行了如下两行代码:

1) newTaskFor(task,val)
2) execute(futureTask)

Runnable和Callable都转成了RunnableFuture

newTaskFor(task,val)
这个时候又出现了另个重要的类FutureTask

public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // ensure visibility of callable
}
public static <T> Callable<T> callable(Runnable task, T result) {
  if (task == null)
         throw new NullPointerException();
     return new RunnableAdapter<T>(task, result);
 }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

runnable被适配成了Callable是通过RunnableAdapter–实现了callable,引用了Runnable!!!!

public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;       // ensure visibility of callable
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

上面三段代码用如下FutureTask类图表示:
FutureTask类图
从这张类图中我们可以猜测得出(稍后会验证):
FutureTask是除了实现了Future,还实现了runnable的,所以它有run方法。
FutureTask内部引用一个callable,submit(runnable)会将runnable转成callable,执行run方法,实际是借用了callable来实现的,这样一来FutureTask就有了返回值。

问题④线程submit(callable)和submit(runnable)的区别与联系
- runnable是没有返回值的不会阻塞,
- callable是有返回值的,当任务没完成就调用ft.get()会阻塞。
- 不管submit(callable)还是submit(runnable)都是提交一个任务是指是创建了一个FutureTask,被FutureTask构造函数统一适配为自己的成员callable,最终都会执行execute(futureTask)。

execute(futureTask)

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {//①
        if (addWorker(command, true))//②
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {//③
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

这个要看下ThreadPoolExecutor几个跟计数有关的成员变量
线程池状态和数量
总之就是用一个int值得高3位表示线程池的运行状态,低29位表示线程池线程数量。方法是计算具体数值的,我们不进行计算机基础的说明了,分析主干。

***我们先看execute的第一部分***

if (workerCountOf(c) < corePoolSize) {//①
        if (addWorker(command, true))//②
            return;
        c = ctl.get();
    }
  • 1
  • 2
  • 3
  • 4
  • 5

①和②处的逻辑:worker的数量小于核心线程数直接addWorker(整个的execute方法是并发的,所有线程都可能同时进入)。

看下addWorker(firstTask,core),该方法有两个参数四种组合都会被调用:
firstTask: worker线程的初始任务,可以为空
core:true:将corePoolSize作为上限,false:将maximumPoolSize作为上限
addWorker方法有4种传参的方式:

入参方式Are
addWorker(firstTask, true)线程数小于corePoolSize时,放一个需要处理的task进Workers Set。如果Workers Set长度超过corePoolSize,就返回false
addWorker(firstTask, false)当阻塞队列被放满时,就尝试将这个新来的task直接放入Workers Set,而此时Workers Set的长度限制是maximumPoolSize。如果线程池也满了的话就返回false
addWorker(null, false)放入一个空的task进workers Set,长度限制是maximumPoolSize。这样一个task为空的worker在线程执行的时候会去阻塞任务队列里拿任务,这样就相当于创建了一个新的线程,只是没有马上分配任务
addWorker(null, true)这个方法就是放一个null的task进Workers Set,而且是在小于corePoolSize时,如果此时Set中的数量已经达到corePoolSize那就返回false,什么也不干。实际使用中是在prestartAllCoreThreads()方法,这个方法用来为线程池预先启动corePoolSize个worker等待从workQueue中获取任务执行

在execute方法中就使用了前3种,结合这个核心方法进行以下分析:

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            //① 线程池状态
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
            //② 线程数量范围
            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
        //③ addWorker
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                //④获取锁
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);//⑤执行add到非线程安全容器
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();//⑥启动worker
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67

所以看到了addWorker失败的原因有很多:

1)线程池关闭了,addWoker失败
2)线程池没关闭,但是阻塞队列已经有线程在排队了,说明该出并发了,该线程刚要addWorker的时候,另一个线程已经addWorker完毕了,再有其他线程已经进入阻塞队列排队,然后该线程才走到这部分代码,addWorker失败
3)对列有可能是无上限的,所以worker的数量可能超出最大值,addWorker失败
4)worker的数量要++ 这不是原子操作,cas的失败,addWorker失败

我们来关注主流程addWoker(futureTask);

1) 独占锁锁住,构建一个Worker对象
2)判断线程池状态,未关闭任务不为空add到HashSet< Worker>中
3)释放锁,启动Worker。

问题①线程池的池子数据结构就是HashSet< Worker>

对ReentrantLock不熟悉的参考下Reentrant源码分析,加锁的原因主要是因为HashSet是线程不安全的,Worker就启动了。

Worker
Worker

Worker本身也是Runnable,他有个成员变量叫做firstTask(名字挺有意思) 就是通过构造方法复制的传入的Runnable就是我们上文的FutureTask,构建一个Thread也是通过构造方法构建的(Runnable只是任务,Thread是才能启动任务)。
Worker构造方法
上面的⑥就是Worker.Thread.start();所以会执行run方法
runWorker(this)

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        //①获取任务(自身或者从队列取出)
        while (task != null || (task = getTask()) != null) {
            w.lock();
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);//②执行之前
                Throwable thrown = null;
                try {
                    task.run();//③执行真正的任务
                }catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);//④执行之后
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);//⑤
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

整个方法我们关注①和③的逻辑:
先将Worker通过构造方法传递过来的FutureTask–Runnable构建Thread,然后A:线程启动,执行FutureTask.run()
B:执行完毕后下次再getTask()为空,执行processWorkerExit(worker,false)
C:执行完毕后下次再getTask()不为空,从阻塞队列中获取

A处逻辑进入FutureTask.run()方法

 public void run() {
     if (state != NEW ||
         !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                      null, Thread.currentThread()))
         return;
     try {
         Callable<V> c = callable;//①获取FtureTask的成员(将runnable适配成callable的)
         if (c != null && state == NEW) {
             V result;
             boolean ran;
             try {
                 result = c.call();//②调用的call方法
                 ran = true;
             } catch (Throwable ex) {
                 result = null;
                 ran = false;
                 setException(ex);
             }
             if (ran)
                 set(result);//③讲结果赋值给FtureTask的成员outcome
         }
     } finally {
         runner = null;
         int s = state;
         if (s >= INTERRUPTING)
             handlePossibleCancellationInterrupt(s);
     }
 }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

问题③FutureTask是如何获取到结果
FutureTask也是实现了Runnable,同时有个在创建时候就赋值的成员callable。
然后调用call将结果set给FutureTask成员outcome 通过set(result)方法。
我们主要看下set方法:

 protected void set(V v) {
     if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
         outcome = v;//①
         UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
         finishCompletion();//②
     }
 }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

这样futureTaskrun方法执行完毕,返回runWorker。

获取完结果,方法返回runWorker(this)的①处,进行下一轮的循环此时getTask(),而此时getTask()为空。
B从阻塞队列中getTask()为空,执行processWorkerExit

如果从阻塞队列getTask()为空的时候将Worker从HashSet移除掉。
我们看到一个变量allowCoreThreadTimeOut这个参数很重要,后边会提到
线程池默认给它是false,不允许核心线程超时(什么叫做超时?)
在加上线程数小于coreSize方法有执行了一次addWorker(null,false)–见上面addWorker的四个传参的表格。firstTask(FutureTask)为空,所以它只能是从getTask()即阻塞队列BlockQueue中获取一个待执行的任务,如果队列里边没有阻塞(或者是阻塞一段时间 后边会讲!!!)
这里写图片描述
我们现在有必要总结下整个提交任务的流程:
提交任务执行流程
现在一个简易版提交时序图基本就是上图的样子,所说如此,我们知道几个比较重要的组件:
线程池的几个计数器线程总数,线程池状态
池子HashSet< Worker>
工作线程Worker,实现了Runnable,构造方法传入futureTask
任务结果获取者FutureTask,也实现了Runnable
这些对后续的流程很多帮助,至少一个正常的流程就是这个样子的。

前面提过线程池构造方法很多参数,上面的流程是在线程数量小于coreSize的时候直接new Worker(ft) add到HashSet中,执行完毕,在执行B处逻辑

我们来看下另一种情况:
任务越来越所有的Worker都在工作,或者工作完毕移除add了一个没有任务的Worker进入到HashSet,总之worker的数量达到了coreSize并且都在没有执行完自己当前的任务

***我们再看execute的第二部分***

if (isRunning(c) && workQueue.offer(command)) {//③
   int recheck = ctl.get();
    if (! isRunning(recheck) && remove(command))
        reject(command);
    else if (workerCountOf(recheck) == 0)
        addWorker(null, false);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

线程数量超过coreSize还有任务workQueue.offer(ft)
workQueue就是构造方法的阻塞队列BlockQueue可以有界的也可以是无界的,coreSize和队列的选择不再本文之内。
还记得我们的addWorker之后执行runWorker的时那个while吗?

C处的逻辑为当前Worker构造传入的task不为空,或者从阻塞对列能获取一个任务
此时的池子中的Worker与BlockQueue的交互就是getTask()如下:
从队列拉取任务
runWorker的getTask方法:
merge
这里我们看到另外一个重要的线程池构造方法的参数keepAliveTime,它收超时影响

//允许超时
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
第一个allowCoreThreadTimeOut默认为false
第二个是当阻塞队列也满了,wc > corePoolSize才会成立
//超时的从poll(timeout)等待keepAliveTime时间,不超时的take()没有就阻塞
Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

也就是说默认情况未超过阻塞队列没有满,调用的workQueue.take();方法从阻塞队列获取待执行的任务,该方法是阻塞的如果条件不满足。这个你能想到什么?

线程数量没有达到coreSize的时候会new Worker(firstTask,true)
add到HashSet中,执行runWorker,firstTask得到执行
执行完毕后调用getTask()从阻塞队列获取待执行的任务,
受到allowCoreThreadTimeout=false 和阻塞队列没有满影响执行workQueue.take()导致线程阻塞挂起,线程池得以hold住进程
同时所有的核心线程得以保持在HashSet这个池子中

其实上面的这段描述已经解释清楚了问题⑥和问题⑦
再解释一个线程池的构造方法的一个参数keepAliveTime,当我们调用了方法
pool.allowCoreThreadTimeOut(true)或者阻塞队列满了的情况下,getTask会调用workQueue.pool(keepAliveTime)从阻塞队列获取一个待执行任务,这个是阻塞超时,一个真没有任务,而是有任务获取超时,此时执行的runWorker的while(first!=null || getTask()!=null)条件会退出,执行processWorkerExit,先将该worker从队列移除掉,在判断allowCoreThreadTimeOut和此时线程数量是否小于coreSize决定是否在创建一个放入到HashSet池子中,所以如果不设置allowCoreThreadTimeOut为true的话,线程池的数量最终会从高于coreSize个数量稳定在CoreSize个数量。

我们再看execute的第三部分

else if (!addWorker(command, false))
        reject(command);
  • 1
  • 2

当workQueue.offer(futureTask)的时候,执行addWorker(ft,false)
回顾之前addWorker()四个传参的方法那个表格

 private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60

显然core为false,线程数wc直接跟maximumPoolSize比较,不会直接return false,继续执行,跟之前的一样还是往HashSet< Worker>中添加。只不过此时变化如下所示:
max

HashSet在coreSize的基础之上在添加Worker,worker先完成firstTask再从BlockQueue中取
原先coreSize个线程直接从BlockQueue getTask()
最终coreSize和多余coreSize的线程都是从BlockQueue中getTask(),逻辑如下:

if(allowCoreThreadTimeOut || wc > corePoolSize)
调用workQueue.pull(keepAliveTime) ,方法能返回(或者超时返回)执行processWorkerExit 从hashSet池子中移除,一边维持coreSize个数量的线程
else调用workQueue.take()

我们当前的就是wc > corePoolSize所以调用的是workQueue.pull(keepAliveTime)获取到或者超时从HashSet中移除,维持coreSize个数量线程。

当wc>= maxPoolSize再来一次请求的话就会执行拒绝策略,这个网上有很多就是5中内置的,自己可以定制,我们不讲了。

到这里我们简易版的线程池源码分析结束,这里仅仅是主干流程!!

有一些高级部分,跟锁和队列有关,在分析高级篇之前我们不妨先看下阻塞队列,下篇文章开始队列的分析。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/从前慢现在也慢/article/detail/944554
推荐阅读
相关标签
  

闽ICP备14008679号