当前位置:   article > 正文

线程池ThreadPoolExecutor底层原理源码分析_线程池队列底层源码

线程池队列底层源码

今天来看看JDK下的线程池- ThreadPoolExecutor 的一些源码分析
纯手打码字硬肝1天1w+字,有一些可能笔误或者理解有误的地方欢迎大家指出。谢谢。

看完本文你可以了解到:

  1. 线程池的执行过程
  2. 为什么创建线程池最好用阻塞队列
  3. 线程池中创建线程的过程
  4. Tomcat和JDK线程池的区别
  5. JVM线程池的4种拒绝策略
  6. 线程池的淘汰策略(CAS)
  7. 线程是如何抢占任务、
  8. 线程执行过程异常处理
  9. 线程要怎么关闭(中断信号)
  10. 为什么要先unlock()再lock()?

直入主题:

ThreadPoolExecutor executor = new ThreadPoolExecutor(
	10, 200, 30, TimeUnit.SECONDS, new LinkedBlockDeque<>(500)
	);
  • 1
  • 2
  • 3

这段代码很简单,就是new了一个线程池出来,几个参数分别是:

  1. corePoolSize(10):线程池的核心线程数,即线程池中保持活动状态的线程数。即使线程池处于空闲状态,核心线程也不会被回收。
  2. maximumPoolSize(200):线程池允许的最大线程数,包括核心线程和非核心线程。当工作队列已满并且活动线程数小于最大线程数时,线程池会创建新的线程来处理任务。
  3. keepAliveTime(30):非核心线程的空闲时间超过该值时,线程会被回收销毁,直到线程数量减少到核心线程数为止。
  4. unit(TimeUnit.SECONDS):空闲时间的单位,这里是秒。
  5. workQueuenew LinkedBlockingQueue<>(500)):用于保存等待执行的任务的阻塞队列。这里使用了一个容量为 500 的 LinkedBlockingQueue,它是一个基于链表的无界队列。

实际上我们打开源码可以看到,这个它要的是 BlockingQueue 也就是阻塞队列

Java里面的队列可以分为阻塞和非阻塞,要new线程池的话,最好传阻塞队列(不过其实你给他传个非阻塞队列也能创建出来)。后面会讲到这是为什么。

ThreadPoolExecutor的构造方法

这里有个问题:这个线程池new出来了对象,这个线程池里面,现在有几个线程呢?

答案是0。

来看这个ThreadPoolExecutor构造方法的源码:

ThreadPoolExecutor构造方法的源码

我们可以看到,它并没有什么创建线程的操作,它就做了一些基本参数的判断和赋值。

这和 Tomcat还是有点不一样的。Tomcat也有个线程池嘛。

Tomcat和JDK线程池的区别

我们也去看看源码:在Tomcat的 AbstractEndpoint 的类下,有个 createExecutor() 方法,这里面有个方法。

 new org.apache.tomcat.util.threads.ThreadPoolExecutor(基本参数);
  • 1

Tomcat的AbstractEndpoint类下的createExecutor方法

这个线程池命名和JDK的一模一样

我们点进去看看,它的实现基本和JDK一样(甚至可以说是照抄,不过高一点的版本的Tomcat已经改了,但是也大差不差的)。有个很关键的区别在于:

Tomcat在创建完线程池之后,马上就会启动一定量的线程。

在这里插入图片描述

另一个区别是 Tomcat 的线程池可以在运行时动态调整线程的数量。它可以根据当前的请求负载情况,自动增加或减少线程数量,以适应不同的并发需求。这个特性在高并发场景下非常有用。

回到JDK这边。

那到底什么时候线程池才会去创建线程呢

我们接着往下看:

线程池创建出来之后,我们可以往它里面去提交任务,去执行了。

executor.execute(new Runnable() {
    @Override
    public void run() {
        System.out.println("abc...");
    }
});
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

execute 方法就会去做很多事情了。

打开源码看看:

在这里插入图片描述

可以看到它会先获取现在线程池中线程个数 workerCountOf(c) 拿它跟 核心线程数来比较

如果小于,那么它直接调用 addWorker()方法。

到这里也能回答一个问题:如果现在构造了任务2,任务1已经完成了,是先去拿线程1呢执行呢 还是去把线程2创建出来执行。

答案是创个新的。 也就是说,虽然现在是有空闲的线程,但是没到核心线程数,它就会直接去创新的。

另外 addWorker() 的两个参数蛮留个心眼:

  1. 第一个参数为 command 也就是后面它的线程的第一个任务。
  2. 第二个参数为 一个Boolean值,这里传的是 true

我们进到addWorker()方法看看:

addWorker的源码

前面半部分是做一些状态和参数的判断 那部分代码我们先不看,直接从new Worker()开始。

它会new 一个Worker对象出来,而这个构造方法中,就会通过线程工厂,去获取一个新的线程。

Worker对象构造方法

这三个参数分别是:

  1. State 状态码 默认为 -1
  2. firstTask 首个任务,就是传过来的Command
  3. thread 线程

也就是说 每个 Worker 对象 都会有一个自己的线程。而线程池真正调度的 实际上就是这个 Worker 对象

我们往上拉,可以看到Worker继承了AQS并实现了Runnable接口
在这里插入图片描述

AQS可能一下子有些朋友不知道是干嘛的。

实际上我们常见的 ReentrantLock锁 实际上就是使用AQS作为基础。在ReentrantLock中,锁的状态由AQS的state字段表示,0表示锁是可用的,非0表示锁被占用。(当然 这只是一个类似口头约定,不是硬性要求的。就是说 具体它 0表示什么 1表示什么 2表示什么 还是要看具体的代码实现的)

那回到addWorker()方法中,它会比对一大堆参数,然后调用线程的start()方法

而了解Runnable接口的朋友都知道,实现了该接口,线程调用start()方法时,是会去调用run()方法的。而Worker类是重写了该方法的:

Worker类重写的run方法
昂看的出来它的关键其实是在这个 runWorker()方法中,但是这个方法我们后面再看再说。

线程保活

我们知道线程其实执行完闲着就会等待系统回收资源,那我们肯定不能让它被回收的。要是用完就给回收消失,线程池就没有存在的意义了。所以这些空闲的线程要怎么保活呢?

这下面是一段伪代码:

在这里插入图片描述

当线程执行完自己的任务之后,它就去线程池中的阻塞队列执行poll方法。

阻塞队列的特点是这样的:如果有任务,线程来了poll了就去干活,没有任务就等着。

所以 线程就通过这种 一边等任务,一边让自己活下来的方式存活。

那现在有好几个线程是闲着的,比如 t1 t2 t3 。来了第11个任务。会给谁呢?

这个可以理解为随机。。

这个具体实现 不是本文的内容了,因为这个任务的分配,具体是阻塞队列里面的poll方法干的。它里面会有一些cas的操作。

这也是为什么要阻塞队列的原因:从JDK开发者的角度,他们的两大难题:

  1. 线程保活
  2. 任务分配

都伴着阻塞队列的特性一起解决了。非常聪明。

感叹一下开发者们的智慧,也是我们分析源码的意义,学习前人的开发经验和模式,以便更好地理解和应用现有的软件框架和技术。

我们再来简单梳理一下流程:

  1. 一个线程池创建之后,它不会马上创建线程。每提交一个任务进来,它就会调用 addWorder方法 去创建一个Worker对象,并赋予它线程。
  2. 线程执行完任务之后,就会去阻塞队列里等着。
  3. 等到 第 核心线程数+1 个 和它之后的所有任务(我们创建的这个核心线程数为10,所以可以理解成,第11个和它之后的任务)提交上来时,它就会放入阻塞队列中,然后有空闲的线程 就去干活

有的朋友可能就会问了:为什么不直接分配给线程去干呢。

因为实现起来麻烦,通过这种类似监听一样的方式,利用阻塞队列的机制,一边又能实现线程的保活,一边还能实现任务的直接分配,非常优雅。

好继续。

队列满了会发生什么?

我们在创建的时候,会分配阻塞队列的容量。比如我们上面的代码是500

假如现在500满了,也就是队列放不下了,我们的10个线程也在忙。

这时候又来任务,它怎么处理呢?

还是配合源码来看:

在这里插入图片描述

workQueue.offer(command) 尝试放进队列,但是放不进了。那它就走进下面的else

!addWorker(command, false)
  • 1

它又去addWorker了。

注意:一定是队列满了,它才会去创建核心线程数以外的线程。

从本质上来讲。这是不公平的实现。为什么?

你想想,队列是满的,也就是说它起码是第501个任务。

而第11个任务都还没开始,它第501个任务就已经在弄了。

确实是设计上的一些缺陷。但是没有办法。

假设接着来任务,这种一直创建线程可以持续到什么时候呢?

这时候就要看我们设置的最大线程数了。我们上面的代码是给了 200

那除去核心线程数的10个,也就是说 最多最多我们还能再创建190个临时线程。

那要是现在创到了200个线程了,还接着来任务 线程池会怎么做。

接着看源码:

在这里插入图片描述

前面提到过addWorker的参数,除了任务,还有一个Boolean值

它只用在了一个地方,就这里。

在这里插入图片描述

前面我只截了后半部分的代码,也就是new Worker()还有 线程.start() 的过程

前半部分如上图所示。

wc是现在线程池中的线程数。当它大于 maximumPoolSize 也就是最大线程数的时候,它直接

return false;

后面的 线程.start就不会执行了。

并且它会拒绝你提交的任务。

在这里插入图片描述

进到这个rejectedExecution

在这里插入图片描述

我们可以看到它是有4个实现,这就对应着线程池的4种拒绝策略。

JVM线程池的4种拒绝策略

我们还是看源码:

1、第一种 这个是默认的策略,就是直接抛异常。

默认策略,抛异常

2、 第二种 主线程执行该任务

​ 前面的所有方法的调用啥的,其实都是主线程在调用的。而这个r.run() 实际上就是让调用的那个线程去执行这个任务。说白了就是主线程去做这个任务。这也是一种线程池的拒绝策略,因为是由我们自己的主线程去干这个活了,没有让线程池里面的线程做。

线程池拒绝策略

3、 第三种是 DiscardOldestPolicy

从名字也能读出来 丢弃最旧的。

这种策略是把阻塞队列里面最早来的 还没有线程处理的,直接淘汰掉,然后新的放到队列的末尾。

在这里插入图片描述

4、 第四种是最简单的 Discard

直接丢弃 啥都不做

在这里插入图片描述

一般我们就是前两种用的比较多,要么抛异常,要么就让调用者线程去执行。

临时线程完成了分配的任务后,干嘛去?

前面我们说到了,临时在任务比较多,核心线程不够用的情况,我们会去创建新的线程。其实这个问题不用想也能知道答案 耗费了一些资源创建的线程,怎么能让他说散了就散了。跟核心线程一样,它们也是回去阻塞队列那边盯着,有任务就接着干。

那没有任务的话嘞,或者说 任务已经做完的情况呢?

我们创建的核心线程数是10嘛。那现在有10多个线程了,在没任务的情况下,肯定是不能让它们继续在这边呆着了。那我们还淘汰掉哪几个线程呢?一定是后面来的那几个线程吗?

其实并不是的。反正它就是根据去掉两个。因为不管是临时线程还是核心线程,不都是线程。都是addWorder() 出来的线程。是一样的。

也就是说,假设 t1 - t10 是原来的核心线程,然后任务比较多的时候,又创建了 t11、t12、t13。那现在这13个线程要去掉三个,它们本质上是一样的,所以去哪三个无所谓,留下的10个,就是核心线程。

它又是怎么淘汰的呢?那么继续往下

线程池的淘汰策略(CAS)

​ 前面我们说到Worker类是实现了Runnable的接口。一个类实现了Runnable接口并创建了该类的实例后,需要将实例传递给Thread的构造方法,等调用Thread对象的start方法,它会去调用Runnable对象的run方法。

​ 线程会调用实现了Runnable接口的类的run方法来执行任务的逻辑。可以将任务的逻辑代码放在run方法中,线程启动后会自动调用该方法,执行任务。不是直接调用run方法,而是通过start方法启动线程,让系统自动调度和执行run方法。

我们前面也提到了 Worder类中的run()方法 是直接调用了 runWorker()方法。所以关键都在这个方法里面。

那么我们直接来看看它的源码:

runWorker源码(1)

代码第二行的这个firstTask很眼熟吧,就是我们给它的第一个任务,我前面提到过。它一定是有任务 才会创建,这个第一个任务它执行完了,才会到后面。

再来梳理一下:创建这个worker对象绝对是有任务,有任务,这个fistTask就绝对不为空 那么这个 while (task != null ) (第二个判断先不看)就一定会进去。 可以理解吧。

runWorker源码(2)

然后就是咔咔一顿操作。就到了这个task.run() 这是什么会知道吗?

这个就是我们 execute 的 run 方法了 也就是我们丢进去的任务。

在这里插入图片描述

可以看到 task.run() 的上面两行和后面几行 有个

beforeExecute(wt, task) afterExecute(task, thrown)

这是切面的一些拓展。就是你想在这个任务前后做点什么 可以去写它们俩。

我们注意到 这个在最后的finally中,task它会变为null。

然后外层有个while循环,就要看第二个条件了。

在这里插入图片描述

while (task != null || (task = getTask()) != null) 
  • 1

就是又去重新获取任务,如果获取到了,那就接着干活。以此循环。

可以看到 判断条件为:

task = getTask() != null
  • 1

那是不是就意味着,当getTask()的返回值为null的时候,这个线程就循环结束 然后执行后面的一些 finally块的代码

这个线程也就消亡了对吧。

那我们是不是就可以通过控制这个null的返回时机,来控制线程的消亡。

比如:现在有15个线程,它们做完任务就在这边循环等待,我们想去掉多余的5个线程,那直接返5个null就完事了。

而我们点进来看这个getTask()方法 我直接把源码贴进来:

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37

注意看第二行:for (; ; ) 它是被一个for循环包裹着的

现在先来分析一个情况,现在我有10多个线程,假如是15个。我们想象一个极端的情况,假如它们同时完成了各自的任务,然后一起过来执行getTask()方法。

可以看到它会获取当前线程数 那这15个线程得到的结果就都为15。

在这里插入图片描述

boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
  • 1

allowCoreThreadTimeOut大家可以先当它一直为false ,我们只要看后面的

所以timed = 当前线程数 是否大于 该线程池核心线程数 15 > 10 肯定是true现在。

看方法的第一行,timedOut 先是 false

maximumPoolSize是最大线程数。也就是我们开始设定的200

那么 这个if 它就进不去了。(这图通俗易懂有没有)

在这里插入图片描述

它就直接执行后面的try块

在这里插入图片描述

阻塞队列的这两个方法的差别是

  • workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) 是一个具有超时特性的方法。它将等待指定的时间(keepAliveTime)来获取元素,如果超过指定时间而队列仍为空,则返回null。
  • workQueue.take() 是一个阻塞方法,如果队列为空,它将一直等待,直到有元素可用为止。

timedtrue嘛现在 所以它会执行 poll 方法。

而我们这15个线程都在等着这个poll方法。假如现在一直是没任务,那么过了这个超时时间,它们都会返回一个null,然后就会执行

在这里插入图片描述

而这个timedOut改变后,它就进入下一次循环了。

而随着这个 timedOut改变 这个if 就会满足了。 timed && timedOut

在这里插入图片描述

那么就会执行它:

compareAndDecrementWorkerCount(c)
  • 1

点进这个compareAndDecrementWorkerCount()方法 可以看到 compareAndSet 都不用再看,从名字也能知道它是怎么一回事。CAS操作。
在这里插入图片描述

也就是说,这15个线程 都会一起来执行这个CAS操作。

CAS有些朋友可能不知道 简单提一下:
就是它们会去竞争,是用的乐观锁的那种对比值的想法。让这个数量去-1。而且只会有一个线程能成功。

成功的返回null。而这个 Return 到哪里还记得吗?

runWorker()方法中调用的。
在这里插入图片描述

而现在返回了null,那我不就等于退出循环,然后执行后面的代码,这个线程也就消失了。剩下的14个线程就continue 接着循环,然后接着取wc 也就是当前线程池中的线程数14,然后再判断,然后还是会进if 然后还是去CAS。没成功的 接着循环,直到wc=10;就不再进if了。而

boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
  • 1

timed 由于 wc不再大于最大线程数。那它就会为 false

Runnable r = timed ?
    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
    workQueue.take();
  • 1
  • 2
  • 3

然后它就去执行 阻塞队列的workQueue的take方法 无限等待。

总结一下:

当阻塞队列中没任务了,线程就一直循环,每次循环都是一次CAS操作,销毁掉一个线程,直到剩下10个 为核心线程。然后去阻塞队列中无限阻塞等待新任务。也就能解释线程是怎么保活的。

优雅 真的优雅。用这么短短不到30行的代码,就实现了这么巧妙的设计。

线程执行过程异常处理

如果线程在执行过程中出异常了。比如报了个NPE

那请问这个线程是继续活着还是消亡呢。我们继续看源码

还是一样 我把runWorker部分源码直接贴过来(删掉了一些暂时无用的行,后面提到了相关的点 我会再附源码):

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            try {
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

可以看到刚进来的第四行

boolean completedAbruptly = true; 
  • 1

然后进入while循环。而上面我们刚讲的, getTask()的结果 直接决定这个while是否结束。

所以它有两种情况会结束:

  • 一种是getTask()返回null 正常结束
  • 另一种是 异常结束

大家看倒3行的 completedAbruptly = false;

它的位置很巧妙,如果是正常退出,它就会将completedAbruptly 改为 false

如果报错了,异常就会被catch捕获然后finally对吧。 completedAbruptly = false; 就不会执行到,直接执行倒二行的 finally

也就是说 最后一行的 finally块的代码可以理解为:

//如果报错,出异常
processWorkerExit(w, true);
//如果正常退出
processWorkerExit(w, false);
  • 1
  • 2
  • 3
  • 4

也就是这个线程在结束之前,做的最后一个事情。

那它具体做了什么事,我们接着看源码
在这里插入图片描述

我们主要看异常的情况,也就是说暂时就当它 completedAbruptly = ture;

那代入到代码中,它是不是就等于不会走 里面那层if块。

而在最后 你会看到,这段代码它执行了addWorder()

所以:当任务出异常了,它退出前,它会去再补一个线程回线程池。

关闭线程池的流程

大家仔细想想,其实除了正常退出和异常退出销毁掉线程。还有一种也会让线程关闭,就是线程池关闭的时候嘛。线程池都关了,里面的线程肯定也要关掉。

在这里插入图片描述
两个都是关闭线程池的方法

  • shutdown()是比较慢点的,它会把阻塞队列中的任务执行完了,再关闭
  • shutdownNow()是不管阻塞队列有没有任务,直接关闭

这里我先问个问题,线程要怎么关闭?所谓的kill掉线程嘛。

在Thread里面确实有个 stop() 方法。

但是不建议这么用。实际上Java已经不推荐使用了,标记了@Deprecated。主要是因为我们不知道这个线程执行到哪里的,如果直接杀掉,是很危险的。而且还有一点,stop是会释放线程占用的synchronized锁,但是不会自动释放ReentrantLock锁,这也是它危险的一个点。

现在我们比较优雅、比较常用的方式是 中断

这两个代码的是怎么实现完成任务或不管有没有任务的?

还有正在执行任务的线程是继续执行任务,还是直接关闭?

我们都可以在源码中找到答案(这次是没删减 纯源码):

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
    	checkShutdownAccess();
    	advanceRunState(SHUTDOWN);
    	interruptIdleWorkers();
		onShutdown(); 
	} finally {
		mainLock.unlock();
    }
    tryTerminate();
}

public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(STOP);
        interruptWorkers();
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29

可以看到两个方法基本一样。可以看到其中一个区别是 advanceRunState状态的区别。 一个是 SHUTDOWN 一个是 STOP

线程池中的状态常量

而这两个对应的值分别为 0 和 1

而在设置完这个状态后,它们都会去执行 发中断信号操作 的方法

interruptIdleWorkers()
  • 1

这个中断怎么理解,它其实只是发了个中断的信号。给你说 我们停工了。

不是直接让这个线程消失了或者别干了。

我们回头看runWorker的 while循环
在这里插入图片描述

它在这里 会判断你是不是执行了中断。

注意它的位置,它是在 task.run()之前的。

前面我们说过我们执行任务其实就是在这个 task.run()里面执行的嘛。也就是说 如果现在有线程在执行任务,你发了这个中断信号,其实并不会直接让已经在干活的线程停止。除非你这个任务执行的本身里面,有做中断信号的检查。比如你这个run任务里面,有个

 if(是否中断){
 	return;
 }
  • 1
  • 2
  • 3

不然,它都是会执行完的。

只有等它这次正常执行完了,下次再进到下一次while循环,取下一个任务也就是getTask() 后,才会去关注一下是否状态是否中断

我们再进到 getTask()方法中:
getTask源码

比如现在是 shutdown()的,线程池的状态会为SHUTDOWN 也就是0,0比STOP的1小嘛,所以它会继续去判断你这个队列 是不是空的。

如果是空的,那就执行decrementWorkerCount(); 并且return null;

decrementWorkerCount();
return null;
  • 1
  • 2

而如果你现在是 shutdownNow() 那它的状态会为STOP

那么这个if 它就直接成立了。因为 1>=1 嘛。 所以,后面的workQueue是不是Empty()它压根不管,直接return null

这也就回答了两个问题:shutdown和shutdownNow是如何实现它们的特性的? 同时还能明白 在执行的线程是停下手头的事直接断掉,还是等做完再断掉?

shutdown() 它会把阻塞队列中的任务执行完了,再关闭。

shutdownNow() 则会等现在手头上的事一做完,就直接结束。

一行代码就实现了两个状态的区分,优雅我已经说不过来了。

阻塞状态的线程要怎么办?

之前我们了解到,如果线程没任务做,那它就会无限阻塞。workQueue.take()
在这里插入图片描述

那现在阻塞了,压根不动,那谈什么循环去判断信息,怎么去销毁线程什么的。

实际上 当一个线程处于阻塞状态(如调用了 Thread.sleep()Object.wait()BlockingQueue.take() 等方法)或者被阻塞(如调用了 synchronized 块)时,如果另一个线程向其发送中断信号,即调用了 Thread.interrupt() 方法,被中断的线程将会被唤醒,并且抛出 InterruptedException 异常。

我们这些线程现在是阻塞状态对吧,然后你发了这个中断信号,它就会被catch块捕获,然后 timedOut 就 = false;

然后进入下一次循环。剩下的逻辑就和正常退出是一样的了。

而这个中断信号是在哪里发的,其实就是在showdownshowdownNow两个方法设置状态后的 interruptIdleWorkers()方法中做的。

两个方法的实现是略有差别的,但是都大同小异。

在这里插入图片描述

无非就是遍历了所有Worker 每个worker里面都有个线程嘛 它就是每个线程都调用一下 t.interrupt() 就是发中断信号嘛。

总结一下:

就是线程只有两种状态嘛:一种是在执行任务,一种是在队列阻塞等待任务。

在执行任务的,执行完手头上的任务后,再去拿任务,会根据状态 看是做完别的任务走 还是直接走。

而在阻塞等待的,其实等于是被中断信号唤醒了,然后也是去走一边拿任务的路,然后走到那个位置做判断,然后退出销毁。

另外,有个很有意思的点不知道大家有没有注意到:

为什么要先unlock()再lock()?

在这里插入图片描述

runWorker()这个方法中,它上来就unlock(),这是为什么。这个其实跟这个线程中断有点关系。

不知道大家对Worker()这个构造方法还有没有印象:
在这里插入图片描述

前面我们提到过Worker类是继承 AQS的。并且在这个构造方法中把State设为了 -1

0表示目前无锁或者说是还没上锁,1表示有锁。-1就是最开始的状态,可以理解成 它线程都还没开始运行。

对于一个线程而言,如果都还没运行它,那还谈什么中断 对吧。

实际上 这个worker从被new出来,构造方法里面去setState、赋予线程,到真正的start线程 中间跨度是非常大的(看图)
在这里插入图片描述

所以是不是有个可能,比如现在worker对象new出来了,线程还没启动呢,线程池就要关闭了。

而它现在都没start,我们还有没有必要像刚刚那些别的一样,去发送一些中断的信号呢?

其实是没有这个必要的。

实际上我们看线程池的 shotdownNow()方法,它在设置状态为 STOP 之后,会去调用中断线程,在这个方法中,它就会去判断workerstate是否大于等于0
在这里插入图片描述

我们跟着源码来一遍:
在这里插入图片描述

worker调用unlock 其实是调用 release(1)方法
在这里插入图片描述

release()方法中调用 tryRelease方法
在这里插入图片描述

而这个tryRelease方法 是有被重写的
在这里插入图片描述
也就是说 它调用的 release(1) 实际上调用到的是自己重写的 tryRelease方法 所以说白了它unlock()就是调用自己重写的这个方法。
在这里插入图片描述

而你看这个方法 它不论你传什么参数过来,state都为0。

所以它第一次调用unlock() 实际上 state 是从 -1 变成 0

要知道的一点是:runWorker方法如果运行了,就说明这个线程已经开始运行了。因为只有线程 start() 之后,它才会去调用实现类的run方法 从而才调用到worker的runWorker方法。这个得明白 不然这一块不好理解。

所以它只要值不是-1,就说明这个线程是启动了的。

runWorker方法中的while里面的 lock 和 unlock 实际上就是 0改为1 1改为0的过程。也就是加锁解锁的过程了。

再说一遍

线程start之后,进来的第一次unlock,state是从-1到0 表示 我这个线程开始运行了。

后面的所有加锁解锁 就是正常的锁逻辑 0->1,1->0的过程。在这里面 可以保证并发的安全的。

所以去判断 >=0了 我们才去中断它。<0也就是 = -1的时候 说明它都没运行,那我们去中断它干嘛呢。

不过

其实去掉-1这个状态,也可以。。就是说 不管它是 -1 0 还是1。

都给它发中断信号,其实是不会报错的,也就是没问题的。

但是按照逻辑来说没有启动的线程 就不用去中断它,虽然好像直接中断也没什么问题。只能说可能有什么性能的影响吧。

结尾

到这里也就差不多了,如果有什么不对的地方,欢迎评论指正,谢谢。

如果这篇文章能对你有些帮助的话,帮帮点点赞哈哈哈哈。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/码创造者/article/detail/925243
推荐阅读
相关标签
  

闽ICP备14008679号