赞
踩
一、ThreadPoolExecutor
线程池在工作中使用的频率很高,因此,有必要好好了解其原理。这篇博客将会告诉大家,线程池是怎么实现线程的复用的,以及创建线程池的几个入参的作用等等。那么,话不多说,直接进入正题。
先看代码,如下图所示:
这是我创建的的一个线程池,并且调用了ThreadPoolExecutor#execute()方法,入参为Runnable对象,就可以启动线程,处理我们的业务。当然,ThreadPoolExecutor还有另外一个方法,即ThreadPoolExecutor#submit()方法,也可以传入Runnable对象。但是实际上,submit()方法最终也会调用到execute()方法。因此,这里以execute()为例,作为入口来讲解。在此之前,先看看ThreadPoolExecutor的构造方法,做了些什么,如下图所示:
可以看出仅仅只是给属性赋值而已,并没有做什么其他的操作,对这些属性有些印象即可,下文会用到。那就看看最核心的ThreadPoolExecutor#execute()方法,如下图所示:
首先是校验,保证传入的Runnable对象不能为空,然后调用ThreadPoolExecutor#workerCountOf()方法,拿到当前线程池里面线程的个数,判断是否小于corePoolSize,这个corePoolSize属性就是在构造方法中设置的核心线程数,如果小于则调用ThreadPoolExecutor#addWorker()方法,传入Runnable对象和true;如果线程池中当前线程数大于核心线程数,则先判断线程池此时的状态是否是Running,这里说下,线程池和线程一样,也是有状态的,状态如下图所示:
如果状态是RUNNING的话,则调用workQueue.offer()方法,workQueue是我们传入的阻塞队列,将Runnable方法放入阻塞队列中,如果使用的是有界队列,则可能放入队列失败,如果线程池状态为RUNNING并且放入阻塞队列成功,则会double check,即:再次判断线程池此时的状态是否为RUNNING,如果不是,则从阻塞队列中移除该Runnable对象,并调用ThreadPoolExecutor#reject()方法,执行拒绝策略的逻辑,反之再判断线程数是否为0,如果是,则调用ThreadPoolExecutor#addWorker()方法,传入null和false;如果不满足上述两个if逻辑,则直接调用ThreadPoolExecutor#addWorker()方法,传入Runnable对象和false,如果addWorker()方法返回的是false,表示添加失败,则调用ThreadPoolExecutor#reject()方法,执行拒绝策略的逻辑。这几个逻辑中都调用了ThreadPoolExecutor#addWorker()方法,可以看出,该方法很重要,点进去看看,如下图所示:
在for循环中,首先判断此时线程池的状态:如果大于SHUTDOWN状态,或者大于等于SHUTDOWN状态且传入的Runnable,或者队列不为空,都返回false,表示添加Runnable对象失败。再进入另一个for循环中,拿到当前线程数,判断当前线程数是否大于最大容量CAPACITY,即 (1 << 29) - 1 = 536870911,如果小于的话,再判断,传入的参数是否是true,如果是true,则比较当前线程数是否大于核心线程数,是则返回false,表示添加Runnable对象失败;如果是false则比较当前线程数是否大于最大线程数,是则返回false,也表示添加Runnable对象失败。如果不满足上述条件,则进入下一个if逻辑。也就是ThreadPoolExecutor#compareAndIncrementWorkerCount()方法,通过CAS设置当前线程数+1,如果成功,则跳出retry,走添加Runnable的逻辑;如果CSA失败,再次算出当前线程数和前面的算出的线程数是否一致,不一致则继续循环。假设CAS成功了,就可以看下面的逻辑了,主要是如何添加Runnable对象的,如下图所示:
在上面的图中可以看出,创建了一个Worker对象,调用它的有参构造,传入了Runnable对象,看看它的构造,如下图所示:
在有参构造中也就是把传入的Runnable对象赋值给了firstTask属性,并通过线程工厂创建了了一个新的线程,并传入了this,即Worker对象自己,实际上Worker是ThreadPoolExecutor的内部类,也是Runnable接口的实现类,如下图所示:
再回到ThreadPoolExecutor#addWorker()方法的创建Worker对象,还是先检查此时线程池的状态:如果线程池的状态小于SHUTDOWN,或者状态是SHUTDOWN并且传入的Runnable状态为空,则将Worker对象那个放入worker中(HashSet类型),并将workerAdded设置为true,这是最重要返回的结果,表示添加Runnable对象是否成功。然后拿到了Worker对象的thread属性后,调用了t.start()方法,启动了线程,实际上就是启动调用的了Worker#run()方法,并返回了true,表示添加Runnable对象成功,看看Worker#run()方法,如下图所示:
在Worker#run()方法中,又调用了ThreadPoolExecutor#runWorker()方法,传入this,即Worker对象本身。再看ThreadPoolExecutor#runWorker()方法,如下图所示:
首先是拿到Worker对象的firstTask,这个属性实际上就是我们传入的Runnable,并把该属性赋值给task,并把Worker对象的firstTask设置为空,避免后面重复调用该属性的run()方法。当然也有可能为空,接着进入while循环,有两个条件:task不为空或者调用ThreadPoolExecutor#getTask()方法。由于我们传入的了Runnable对象,因此task不为空,最后就会调用task.run()方法,执行我们在run()方法中写的的业务逻辑,在finally中将task置为空。由于是while循环,会再次进入判断while中的条件,这次task为空,则调用ThreadPoolExecutor#getTask()方法,进到这个方法中看看,如下图所示:
在该方法中,有几种情况会返回空:
① 此刻线程池的状态是大于等于STOP,或者状态大于等于SHUTDOWN且队阻塞队列为空,则返回null;
② 此时线程池的线程数大于最大线程数,且线程数大于1阻塞队列为空,再通过CAS设置当前线程数-1成功,则返回null,否则继续for循环;
③ time和timedOut均为true,且线程数大于1阻塞队列为空,再通过CAS设置当前线程数-1成功,则返回null,否则for循环继续。
如果不满足上述三种情况,则继续往下走:当此时的线程数(Worker数量)大于核心线程数或者设置allowCoreThreadTimeOut属性为true,则局部变量timed为true。到三元运算这边,会判断timed的值,如果为true,则调用 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS),而keepAliveTime也是我们设置的,非核心线程线程的最大存活时间,拿到的Runnable r,可能为空,也可能不为空,因为设置了超时时间为keepAliveTime;如果为false,则调用 workQueue.take(),一直阻塞在这里,直到从队列可以拿到Runnable对象,程序才继续往下走。如果得到的结果 r 如果不为空则直接返回,并在while循环中被显式的调用,即r.run()方法。不为空则继续循环,不管r为不为空,都会设置timedOut为true。timedOut为true的话可能会满足③的条件。
总结一下:排除线程池是非RUNNING状态的话,
① 如果没有设置 allowCoreThreadTimeOut,则该属性默认为false,则在ThreadPoolExecutor#getTask()方法中,如果此时线程池中worker的数量小于等于核心线程数的数量,则程序会阻塞在workQueue.take()这里,知道拿到Runnable对象,才会返回,并在while中执行Runnable#run()方法,执行完 ,再次进入ThreadPoolExecutor#getTask()方法中阻塞,直到有新的Runnable对象被放入阻塞队列中为止;
② 如果如果此时线程池中worker的数量大于核心线程数的数量(小于等于worker的数量是否大于最大线程的数量),则会调用 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS),如果可以拿到Runnable则后续处理逻辑同①,如果到了超时时间,还是没有从workQueue中拿到Runnable对象,timedOut = true,再次循环,在第二次循环的时候 ,就满足了timed && timedOut的条件,如果此时workQueue为空,则会通过CAS设置线程数-去1,并返回null;
③ worker的数量大于最大线程的数量的情况,就直接走拒绝策略的逻辑了。
回到while循环中,如果task为空且调用ThreadPoolExecutor#getTask()方法得到的结果也为空的话,则会跳出while循环,走下面的逻辑,具体如下:
看看ThreadPoolExecutor#processWorkerExit()方法,代码如下图所示:
completedTasks,即完成的任务数加上再通过worker完成的任务数,这就是个统计而已,然后将worker从workers中移除。再就是判断线程池状态,如果线程池此时的状态小于等于STOP,进入if逻辑。判断completedAbruptly的值,它表示"突然完成",程序只要进入过上文while循环中,则completedAbruptly的值就会被赋值为false,则满足if逻辑,在这个里面,会判断是否将allowCoreThreadTimeOut设置成了true,如果是,则min = 0;反之,min = corePoolSize。如果min = 0且阻塞队列不为空,则min会重新被赋值为1。判断此时线程数是否大于等于min,是则直接返回,反之调用ThreadPoolExecutor#addWorker()方法 ,传入null和false,创建非核心线程。到这里,就执行完了ThreadPoolExecutor#runWorker()方法,woker线程就行完毕了,线程被回收。
最后做一个总结:
① 使用ThreadPoolExecutor,调用ThreadPoolExecutor#execute()方法,传入一个Runnable对象,如果如果此时线程池的线程数量小于核心线程池的数量,则调用addWorker()方法,传入的是true;
② 如果大于等于核心线程数,则将Runnable对象放入阻塞队列中;
③ 如果设置的是有界队列,且达到了有界阻塞队列的最大值,则放入有界阻塞队列失败(如果设置的无界队列,则永远都不会进入到③中,如果放入队列的任务非常之多,则会导致OOM)。则会创建非核心线程来处理;核心线程和非核心线程的区别就在于,调用addWorker()方法时,会依据传入的true还是false,来判断此时的线程池的线程数是否是大于核心线程数还是最大线程数,如果大于,则直接返回false,表示添加Runnable对象失败,调用reject()方法,执行拒绝策略的逻辑。
④ 如果设置的是要有界队列,且核心线程执行不过来 ,则最终会创建非核心线程,当过了高峰期,核心线程够用了,则非核心线程再经过了 keepAliveTime长的时间,还没有从队列中拿到Runnable对象,就会跳出Worker#run()方法,非核心线程被回收。而非核心线程会阻塞在 ThreadPoolExecutor#getTask()方法中,直到从阻塞队列中拿到Runnable对象,执行Runnable对象,并再次进入getTask()中陷入阻塞,而不会被回收,除非是⑤这种情况。
⑤ 如果设置了allowCoreThreadTimeOut为true,如果 task很少,则核心线程也会慢慢被回收掉。
⑥ 刚创建线程池,此时是没线程(Worker)的,除非调用ThreadPoolExecutor#execute()方法或者ThreadPoolExecutor#submit()方法,才会创建worker对象。对于有些情况下,可以做预热,比如Tomcat定义的线程池,它会自己循环调用ThreadPoolExecutor#execute()方法,创建线程,后续有请求过来,就可以直接用线程池里面现有的线程进行处理,而不需要先创建。
⑦ 在某些情况下 ,会碰到父子线程,ThreadLocal数据失效的问题。也就是说,在创建了一个线程并将该线程对象交给了线程池管理,并在该线程中设置了ThreadLocal的值,然后在该线程中又创建了一个线程,这种场景是有可能出现的。此时问题就来了,在该子线程中,是拿不到在父线程中设置的ThreadLocal的值的。这该怎么解决呢?实际上 Spring给我们提供了解决方案,想必很多小伙伴在使用线程的时候,并不是使用的JDK提供的原生的线程池,而是使用的Spring提供的线程池,即ThreadPoolTaskExecutor。只需要给ThreadPoolTaskExecutor设置一个属性即可解决上述问题,调用ThreadPoolTaskExecutor#setTaskDecorator()方法,传入一个TaskDecorator对象,该接口只有一个方法,需要实现该方法,在该方法中处理即可,该方法返回的也是一个Runnable对象,如下图所示:
在ThreadPoolTaskExecutor#initializeExecutor()方法中,就会用到传入的TaskDecorator对象,如下图所示:
由上图可知,实际上Spring提供的ThreadPoolTaskExecutor是对ThreadPoolExecutor的包装真正干活的是ThreadPoolTaskExecutor的threadPoolExecutor属性,在ThreadPoolTaskExecutor#initializeExecutor()方法中,会先判断taskDecorator属性是否为空,如果我们设置了,就会走if逻辑,然后就是创建ThreadPoolExecutor的实现类,并最终赋值给threadPoolExecutor属性,然后重写了ThreadPoolExecutor的execute()方法,在execute()方法中,会先调用taskDecorator.decorate()方法,传入command对象(即一个Runnable对象),在这个的方法中,我们就可以做操作 ,因为返回的是一个Runnable对象,因此我们可以拿到这个入参,而这个入参,就是我们传入的Runnable对象 ,因此我么可以线调用ThreadLocal#get()方法,拿到前面设置的值,然后再创建一个新的Runnable对象,再这个Runnable对象的run()方法中,调用ThreadLocal#set()方法,将拿到的值进行设置,并调用我们传入的Runnable对象的run方法,最后返回新创建的Runnable对象。最终ThreadPoolExecutor的实现类的execute最终会调用它父类,即ThreadPoolExecutor#execute()方法,传入的就是新创建的Runnable对象的run()方法,这样 就可以再子线程中获取到父线程设置的ThreadLocal的值。代码如下:
以上,就是我对线程池的浅显认识,如有错误,欢迎批评指正!
二、ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor是ThreadPoolExecutor的子类,可以实现任务的定时调度功能,想必也是一个很常用的工具类,可以简单聊聊。先看看使用,如下图所示:
实现调度的方法主要有上图中的三个,在将之前,先看看ScheduledThreadPoolExecutor的创建,调用有参构造,传入1,再看看它的构造方法都干了些什么,如下图所示;
主要是有四个构造方法,我用的是比较简单的一个,传入的是核心线程的数量,再调用父类的构造,即ThreadPoolExecutor的构造方法,参数为:核心线程数(我们设置的1),最大线线程数是Integer.MAX_VALUE,最大存活时间为0,时间是TimeUnit.NANOSECONDS,阻塞队列为DelayedWorkQueue对象。这个的DelayedWorkQueue对象,看名字,像是跟任务的延期有关,亦或是实现定时调度的关键?后面再验证我们的猜想。
接着再看核心的任务调度方法,先看看 ScheduledThreadPoolExecutor#scheduleWithFixedDelay()方法,如下图所示:
先是入参commane非空校验,delay参数校验,再创建ScheduledFutureTask对象,再看ScheduledFutureTask的构造方法前,先看看 ScheduledThreadPoolExecutor#triggerTime()方法,如下图所示:
可以看出,得到的结果是当前时间+传入的时间(5s)的纳秒值,再看的构造方法,如下图所示:
入参有四个:分别是:
① r:传入的Runnable对象;
② result:为null
③ ns:当前时间+5秒对应的纳秒值,赋值给 time成员属性
④ period:10s的纳秒值,赋值给period成员属性
再调用ScheduledFutureTask父类的构造,入下图所示:
做了两个事:
① 调用Executors.callable()方法,传入runnable, result,得到的是RunnableAdapter对象,赋值给callable属性;
② 将state的成员属性设置为FutureTask.NEW。
RunnableAdapter类用到了适配器设计模式,实现了Callable接口,具体如下图所示:
调用RunnableAdapter#call()方法的时候,最终就睡调用我们传入的Runnable#run方法。到这里,ScheduledFutureTask的构造方法就讲完了。回到ScheduledThreadPoolExecutor#scheduleWithFixedDelay()方法中,如下图所示:
调用ScheduledThreadPoolExecutor#decorateTask()方法,传入创建好的command和sft属性。如下图所示:
在该方法中,实际上什么也没做,就只是返回了传入的task对象而已,回到ScheduledThreadPoolExecutor#scheduleWithFixedDelay()方法中,把decorateTask()方法返回的对象 t,又赋值给 sft的outerTask 属性,而t对象就是stf,实际上sft中的outerTask属性,持有它本身。再看ScheduledThreadPoolExecutor#delayedExecute()方法,如下图所示:
在该方法中,首先判断线程池的状态,一般线程池只要是RUNNING状态,就不会进入if在代码块中,那就会进入else代码块中,将ScheduledFutureTask对象放入队列中,再次判断线程池的状态,还是会进入else,调用ScheduledThreadPoolExecutor#ensurePrestart()方法,如下图所示:
在这个方法中,核心就是调用addWorker()方法,,创建Worker对象,用于处理任务。由于在前面,已经往阻塞队列中放入了task对象,即ScheduledFutureTask,因此在Worker#run方法中,会调用ThreadPoolExecutor#runWorker()方法,在该方法中,最终会调用ThreadPoolExecutor#getTask()方法,从阻塞队列中取任务,而取得的任务,就是ScheduledFutureTask对象,并调用ScheduledFutureTask#run()方法,看看这个方法,如下图所示:
看看ScheduledFutureTask#isPeriodic()方法返回了什么,如下图所示:
由于period是赋值过了,因此,isPeriodic()方法返回的结果是true。假设此时线程池的状态是没问题的的,则进入到 下面的else if逻辑中,执行ScheduledFutureTask父类的runAndReset()方法,即FutureTask#runAndReset()方法,如下图所示:
前面构建ScheduledFutureTask的时候,在父类的构造中初始化了state的状态,就是FutureTask.NEW,因此if的第一个条件不满足,再看第二个条件,通过CAS设置runner属性为当前线程,假设设置失败,才会进入if代码块中,返回false;如果设置成功,则进入try代码块中,首先获取callable,这个callbale实际上就是前面说到的RunnableAdapter对象,它就是对Callable的包装,实际调用的是它的Runnable task属性的方法,而这个task就是我们传入Runnable对象,最终执行到我们在run()方法中的逻辑,并将ran设个属性设置为true,最终FutureTask#runAndReset()方法返回的结果就是true。再次回到ScheduledFutureTask#run()方法,就会调用到ScheduledFutureTask#setNextRunTime()方法,看名字就能猜到它是做啥的,还是点进去看看,如下图所示:
就是计算下次程序要运行的时间,time是上一次运行的时间,即程序启动的时间+5s,而period则是10s,因此最新的时间就是程序启动的时间+15s,再下次就是程序启动时间25s,以此类推…执行完该方法后,再就是执行ScheduledFutureTask#reExecutePeriodic()方法,如下图所示:
该方法传入outerTask,而该属性就是ScheduledFutureTask本身,在该方法中,会再次将outerTask放入阻塞队列中,并调用ScheduledFutureTask#ensurePrestart()方法。
总结一下:其实到这里为止,就只是知道ScheduledFutureTask会被放入阻塞队列中,然后在Worker#run()方法中取出并运行Runnable#run()方法,然后最多也只知道,ScheduledFutureTask在被重新放放入队列之前,会计算下次运行的时间,即重新给time属性赋值。但是并没有体现按照我们设置的10s进行定时的调度,那这个所谓的"定时",是不是在队列中体现的呢?因为只有从队列中拿出了task,才可以执行,没有拿到,程序就会被阻塞。想到这里,那就看看获取task的api是怎么处理的,看看DelayQueue#task()方法,如下图所示:
在该代码中,首先是通过它的属性lock,调用lock.lockInterruptibly()进行加锁(配合条件注解使用),然后是进入for死循环,在该方法中,调用q.peek()方法,这个q是DelayQueue的属性,是PriorityQueue类型,带有排序功能。这里的排序,猜都能猜到是按照任务触发的时间来排序的。按照最小堆二叉树排序的,也就是说,触发事件最短的,会排在队首,用于被获取。当然,仅仅这样还不够,因为获取到了最短就要执行的任务,并不意味着该任务就是马上要运行。因此在代码中,通过q.peek()方法,得到task后,先判断其是否为空发,如果不为空,则调用task的getDelay()方法,而task是之前放入队列的ScheduledFutureTask对象,因此调用的是ScheduledFutureTask#getDelay()方法,如下图所示:
算出距离当前时间执行还有多少纳秒并返回这个纳秒数delay。只有当得到的delay的值小于等于0的时候,意味着task马上要执行了,因此只要返回这个task即可;如果delay大于0的话,说明此时还不能返回该task,因此需要在返回前先等待这个返回的delay的时间,即调用available.awaitNanos(delay),等待delay长的时间,再唤醒。由于是for死循环,会重新上述操作,这次就可以拿到task,并且等待时间也是小于等于0,因此满足条件,直接返回这个task供使用。ScheduledThreadPoolExecutor另外两个方法调度实现逻辑差不多,有时间可以自己研究研究,这里我就不再多组赘述了。以上就是ScheduledThreadPoolExecutor源码实现任务调度的核心逻辑,如有错误,恳请批评指正,在此感激不尽!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。