赞
踩
传统答案:
通过继承Thread类并重写其run方法来创建线程。具体步骤包括定义Thread类的子类,在子类中重写run方法以实现线程的具体逻辑,然后创建子类的实例并调用其start方法来启动线程。
通过实现Runnable接口并重写其run方法来创建线程。这种方式相较于继承Thread类更为灵活,因为Java不支持多重继承,而实现接口则没有这个问题。实现Runnable接口后,需要将其实现类的实例作为参数传递给Thread类的构造函数,然后调用Thread对象的start方法来启动线程。
Callable接口与Runnable接口类似,但其call方法可以返回值,并且可以抛出异常。创建Callable实现类的实例后,通常使用FutureTask类来包装它,并将FutureTask对象作为Thread对象的target来创建并启动新线程。调用FutureTask对象的get方法可以获取子线程执行结束后的返回值。
Executor框架提供了多种线程池的实现,如单线程池(newSingleThreadExecutor)、固定大小的线程池(newFixedThreadPool)、可缓存的线程池(newCachedThreadPool)以及定时线程池(newScheduledThreadPool)等。使用线程池可以更有效地管理线程的创建、销毁和调度,从而提高系统的性能和稳定性。
在源码中找到ThreadPoolExecutor类中参数最多的一个构造方法
这七个就是线程池的核心参数:
unit
参数指定。线程池执行任务的方法是execute方法。
想要查看执行流程的话,需要查看的就是execute方法的源码。
将源码文本粘贴出来,逐步分析:
- // 任务交给线程池处理时,一般会执行execute方法,并传递任务
- // command 就是传递过来的任务
- public void execute(Runnable command) {
- // 非空校验
- if (command == null)
- throw new NullPointerException();
-
- // 以下是核心业务流程
- // ctl 是什么?ctl是线程池的一个核心属性。
- // 想要了解线程池的执行流程需要先知道线程池的核心属性
- int c = ctl.get();
- if (workerCountOf(c) < corePoolSize) {
- if (addWorker(command, true))
- return;
- c = ctl.get();
- }
- if (isRunning(c) && workQueue.offer(command)) {
- int recheck = ctl.get();
- if (! isRunning(recheck) && remove(command))
- reject(command);
- else if (workerCountOf(recheck) == 0)
- addWorker(null, false);
- }
- else if (!addWorker(command, false))
- reject(command);
- }
由于在execute方法中使用到了线程池的核心属性 ctl,所以我们先看一下ctl
粘贴出来分析:
- // 线程池的核心属性
- // AtomicInteger是系统底层保护的int类型,通过对int类型的数据进行封装,提供执行方法的控制进行值的原子操作。
- // 可以理解为private final int ctl = 0;
- // ctl 存储了线程池的两个核心属性:线程池状态和工作线程个数
- // int类型占32个比特位
- // 线程池状态:基于ctl的高三位存储线程池状态
- // 工作线程个数:基于ctl的低29位存储工作线程个数
- // 那么线程池中最多可以有多少个工作线程呢?答案是2^29个
- private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
-
- // Integer.SIZE=32 所以COUNT_BITS =29
- private static final int COUNT_BITS = Integer.SIZE - 3;
-
- // CAPACITY 就是2^29 也就是线程池中工作线程数的最大值
- private static final int CAPACITY = (1 << COUNT_BITS) - 1;
-
- // 下面5个属性是线程池的状态
- private static final int RUNNING = -1 << COUNT_BITS;
- private static final int SHUTDOWN = 0 << COUNT_BITS;
- private static final int STOP = 1 << COUNT_BITS;
- private static final int TIDYING = 2 << COUNT_BITS;
- private static final int TERMINATED = 3 << COUNT_BITS;
线程池的生命周期(五个状态):
整明白了线程池的核心属性,下面可以继续看ThreadPoolExecutor的execute方法了
- // 任务交给线程池处理时,一般会执行execute方法,并传递任务
- // command 就是传递过来的任务
- public void execute(Runnable command) {
- // 非空校验
- if (command == null)
- throw new NullPointerException();
-
- // 以下是核心业务流程
- // ctl.get():拿到存储线程池状态和工作线程个数的核心属性
- int c = ctl.get();
- // workerCountOf() 获取工作线程个数 corePoolSize:核心线程数
- // 判断当前工作线程数是否小于核心线程数(构建线程池时指定的)
- if (workerCountOf(c) < corePoolSize) {
- // 创建工作线程
- //第一个参数 command :传递的任务
- //第二个参数 是否是核心线程,true:创建核心线程;false:创建非核心线程
- // addWorker方法 返回值是布尔类型 代表创建是否成功
- if (addWorker(command, true))
- // 创建成功,结束,任务给核心线程处理
- return;
- //创建失败 重新获取核心属性ctl
- c = ctl.get();
- }
-
-
- // 如果当前工作线程数已经达到核心线程数 执行下面的语句
- // isRunning(c) 判断当前线程池是不是running状态 如果是直接将任务扔进工作队列
- // offer方法:扔到工作队列,成功返回true,失败返回false
- if (isRunning(c) && workQueue.offer(command)) {
- int recheck = ctl.get();
- if (! isRunning(recheck) && remove(command))
- reject(command);
- else if (workerCountOf(recheck) == 0)
- addWorker(null, false);
- }
- // 尝试构建非核心线程去处理当前任务
- else if (!addWorker(command, false))
- // 如果非核心线程创建失败 执行拒绝策略
- reject(command);
- }
从上面源码可以看出,执行流程为:核心线程-工作队列-非核心线程-拒绝策略
如果想要修改默认的顺序可以重写execute方法来实现。
问题1:
下面代码块,是在工作线程数量小于核心线程数量时执行的代码,作用是,工作线程数量小于核心线程数量时,创建一个核心线程,并把任务给这个核心线程。
- if (workerCountOf(c) < corePoolSize) {
-
- if (addWorker(command, true))
- // 创建成功,结束,任务给核心线程处理
- return;
- //创建失败 重新获取核心属性ctl
- c = ctl.get();
- }
那么问题是为什么创建核心线程失败后要使用 c = ctl.get();来重新获取核心属性呢?
举个例子来回答这个问题,假如核心线程数是5,当前工作线程数是4,在并发情况下,有两个线程同时进入到if代码块里面,都去执行addWorker(command, true)方法,创建核心工作线程,由于addWorker方法内部通过一定的方式保证了原子性,所以只能创建成功1个核心工作线程,另一个不会创建成功(返回false),这种情况,不会直接return,会去继续执行方法下面的代码,下面的代码会使用到核心线程属性ctl,而此时显然ctl核心线程属性已经发生了变化(另外一个线程创建成功了一个工作线程,工作线程数量发生了变化),需要重新获取最新的。
问题2:
- if (isRunning(c) && workQueue.offer(command)) {
- int recheck = ctl.get();
- if (! isRunning(recheck) && remove(command))
- reject(command);
- else if (workerCountOf(recheck) == 0)
- addWorker(null, false);
- }
这段代码中,主要实现的是如果线程池是Running状态,将任务放入工作队列的功能。
那么isRunning(c) 和workQueue.offer(command)两句代码不是就能实现这个功能了吗,为什么还要写上述代码?
因为在多线程环境中,当执行完isRunning(c) 和执行workQueue.offer(command)之间的间隙,线程池的状态可以就发生了变化。
比如一开始线程池是Running状态,执行isRunning(c)得到返回值true,然后在执行workQueue.offer(command)代码前线程池状态改变,不再是Running状态了,不过由于之前获取到的isRunning(c)的返回值是true,所以还是会执行workQueue.offer(command)方法。
这样显然是有问题的,所以为了应对这种情况,在下面添加了以下代码:
- // ctl.get();重新获取线程池核心属性
- int recheck = ctl.get();
- // 根据核心属性 重新校验线程池是否是Running状态,如果不是Running状态,移除前边添加的任务并执行拒绝策略
- if (! isRunning(recheck) && remove(command))
- reject(command);
- // 如果线程池状态还是为Running状态 则执行下面语句
- // 如果线程池仍然在运行,但此时工作线程的数量为0(workerCountOf(recheck)返回0),则调用addWorker(null, false)方法来添加一个新的核心线程。
- //null 作为第一个参数,表示这个新线程在启动时不带有任何初始任务 ,false代表创建非核心线程
- else if (workerCountOf(recheck) == 0)
- addWorker(null, false);
关于 addWorker(null, false):
firstTask
参数允许你指定一个新创建的线程应该首先运行的任务。如果 firstTask
是 null
,那么新线程将不会立即执行任何任务,而是会等待从工作队列中取出一个任务来执行。
在线程池的工作机制中,如果当前线程数少于 corePoolSize
(核心线程数),并且有新任务提交,那么线程池会尝试启动一个新线程来执行这个任务,而不是将任务放入工作队列。
查看创建工作线程的流程主要是查看addWorker方法的源码
对源码逐步解析:
- // 创建工作线程(核心线程和非核心线程都是基于addWorker创建的)
- // 第一个参数:任务,第二个参数:指定是核心还是非核心
- private boolean addWorker(Runnable firstTask, boolean core) {
-
- // 第一部分代码
- // 做两件事情:
- // 1.判断线程池状态(外层for循环)
- // 2.判断线程个数(内层for循环),然后基于cas修改ctl属性,给工作线程个数+1
-
-
- retry: // 给外层for循环取一个名称retry
- for (;;) { // 死循环 相当于while(true)
- int c = ctl.get(); // 拿到核心属性ctl
- int rs = runStateOf(c); // rs:高三位的线程池状态
-
- // rs >=SHUTDOWN 即rs为SHUTDOWN/STOP/TIDYING/TERMINATED 即不是Running状态
- // 根据前面线程池五个属性的那张图可知线程池不能接收新任务
- if (rs >= SHUTDOWN &&
- // 第二个判断 解决在SHUTDOWN 状态下,没有工作线程,但是工作队列有任务
- // 要构建一个线程处理阻塞队列(工作队列)任务的情况
- // rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty())的情况不能走return语句
- ! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))
- return false;
-
-
- for (;;) {
- int wc = workerCountOf(c); // 工作线程个数
- // CAPACITY :工作线程最大值
- if (wc >= CAPACITY ||
- // 核心线程:判断corePoolSize
- // 非核心线程:判断maximumPoolSize
- // core 是addWorker的第二个参数 true代表核心线程,false代表创建非核心线程
- wc >= (core ? corePoolSize : maximumPoolSize))
- return false;
- // 为什么要用CAS的方式修改?
- // 为了避免多线程并发创建工作线程,导致破坏设置的核心参数(比如设置核心参数是5,创建了6个)
- if (compareAndIncrementWorkerCount(c))
- // 如果成功 跳出外层循环
- break retry;
- c = ctl.get(); // Re-read ctl
- if (runStateOf(c) != rs)
- continue retry;
- // else CAS failed due to workerCount change; retry inner loop
- }
- }
-
-
- // 第二部分代码
- // 做两件事情:1.创建工作线程 2.启动工作线程
-
-
- boolean workerStarted = false;
- boolean workerAdded = false;
- Worker w = null;// Worker 对象就是工作线程
- try {
- // 创建工作线程,并把任务交给Worker 对象
- w = new Worker(firstTask);
- // 将new Worker时创建的thread拿到
- final Thread t = w.thread;
- // 判断使用线程池的用户,指定的线程工厂构建的thread不是null
- if (t != null) {
- // 同步锁 为什么要加锁?
- // 下面workers.add(w); 按住Ctrl+单击 查看可知 workers是HashSet类型
- // HashSet不是线程安全的 为了保证线程安全,所以加锁了,不加锁不安全
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
-
- int rs = runStateOf(ctl.get());
-
- if (rs < SHUTDOWN ||
- (rs == SHUTDOWN && firstTask == null)) {
- if (t.isAlive()) // precheck that t is startable
- throw new IllegalThreadStateException();
- workers.add(w);
- int s = workers.size();
- if (s > largestPoolSize)
- largestPoolSize = s;
- workerAdded = true;
- }
- } finally {
- mainLock.unlock();
- }
- // 如果工作线程添加成功 则启动该线程
- if (workerAdded) {
- t.start();// 启动一个线程,执行run方法
- workerStarted = true;
- }
- }
- } finally {
- if (! workerStarted)
- addWorkerFailed(w);
- }
- return workerStarted;
- }
根据源码总结:
线程池在创建工作线程时:
线程池中的工作线程就是Worker对象,查看一下Worker里面做了什么事情。
- // Worker是工作线程,Worker也会存储一个任务(只存储第一个任务)
- private final class Worker
- extends AbstractQueuedSynchronizer
- implements Runnable
- {
-
- final Thread thread; // 工作线程
-
- Runnable firstTask; // 任务
-
- volatile long completedTasks;
-
- Worker(Runnable firstTask) {
- setState(-1); // inhibit interrupts until runWorker
- // 任务存放
- this.firstTask = firstTask;
- // 线程构建 传入this 当前对象
- // 创建的线程调用start启动时,执行的是谁的run方法?
- // 执行的是Worker对象里面的run方法
- this.thread = getThreadFactory().newThread(this);
- }
-
- // addWorker方法中执行thread.start()方法后,执行的是Worker对象中的run方法
- public void run() {
- runWorker(this);
- }
- }
AQS是什么?
AQS是AbstractQueuedSynchronizer,是JUC包下的并发基础类,很多同步内容都是基于AQS实现的,比如:
Worker继承AQS干嘛?
Worker线程继承了AQS后,可以使用基于CAS修改的属性state
在shutdown状态下空闲线程要执行interrupt中断,工作中的线程,不能执行interrupt
工作线程在处理任务前,会先执行lock方法(将state从0改为1),也就是正在干活的线程state==1
在终端线程前,判断每一个线程的state,如果为0直接interrupt,如果为1什么也不做。
执行任务,就是启动工作线程后,执行了Worker对象中的run方法,run方法中执行了runWorker方法。
工作线程直接通过task.run执行任务,并且线程池预留了beforeExecute和afterExecute方法,可以在任务执行前后做一些额外处理。
- // runWorker传递的参数就是Worker对象本身
- final void runWorker(Worker w) {
- // 拿到工作线程中的thread
- Thread wt = Thread.currentThread();
- // 拿到Worker对象中存储的第一个任务
- Runnable task = w.firstTask;
- w.firstTask = null;
- w.unlock(); // allow interrupts
- boolean completedAbruptly = true;
- try {
- // 如果Worker对象在启动时携带了任务,那就优先执行携带的任务
- while (task != null || (task = getTask()) != null) {
- w.lock();
- // 判断线程池状态是不是stop 如果时stop强制中断当前线程
- if ((runStateAtLeast(ctl.get(), STOP) ||
- (Thread.interrupted() &&
- runStateAtLeast(ctl.get(), STOP))) &&
- !wt.isInterrupted())
- wt.interrupt();
- try {
- // 钩子函数 执行任务之前执行该函数;默认该方法方法体为空,没有实现,需要的话重写这个方法
- beforeExecute(wt, task);
- Throwable thrown = null;
- try {
- // 执行任务
- task.run();
- } catch (RuntimeException x) {
- thrown = x; throw x;
- } catch (Error x) {
- thrown = x; throw x;
- } catch (Throwable x) {
- thrown = x; throw new Error(x);
- } finally {
- // 钩子函数 执行任务之后执行该函数,默认该方法方法体为空,没有实现,需要的话重写这个方法
- afterExecute(task, thrown);
- }
- } finally {
- task = null;
- w.completedTasks++;
- w.unlock();
- }
- }
- completedAbruptly = false;
- } finally {
- processWorkerExit(w, completedAbruptly);
- }
- }
工作线程在处理完自带的任务后会直接基于getTask方法,从阻塞队列中拉取任务。
如果是核心线程,默认情况下,会基于take方法在工作队列中拉取任务。
如果是非核心线程,会基于poll方法,拉取指定时间任务。(时间到了直接告辞)
- // 工作线程从阻塞队列拉取任务的操作
- private Runnable getTask() {
- boolean timedOut = false; // Did the last poll() time out?
-
- for (;;) {
- int c = ctl.get();
- int rs = runStateOf(c);
-
- // 判断线程池状态 如果状态已经变为stop或者状态为shutdown且工作队列任务都处理完毕
- if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
- // 工作线程个数-1 并且干掉当前工作线程
- // 线程正常死亡:run方法结束
- decrementWorkerCount();
- return null;
- // return null: runWarker方法中 while循环结束,线程正常消亡;
- // 因为是Worker类中的run 方法中调用的runWarker方法,并且run方法中没有其他代码
- // 所以runWarker结束,run方法也就结束了,线程也就消亡了
- }
-
- int wc = workerCountOf(c);
-
- // 核心线程执行take 非核心线程执行poll方法(poll方法拉取最大空闲时间)
- // 线程池中的核心线程可以基于keepAliveTime(最大空闲时间)去结束吗?
- // 或者说线程池中的核心线程一定会永远存放在线程池里面吗?
- // 不一定 有一个属性allowCoreThreadTimeOut:是否允许核心线程超时,默认是false,但可以设置为true
- boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
-
- if ((wc > maximumPoolSize || (timed && timedOut))
- && (wc > 1 || workQueue.isEmpty())) {
- if (compareAndDecrementWorkerCount(c))
- return null;
- continue;
- }
-
- try {
-
- Runnable r = timed ?
- // poll方法,拉取阻塞队列任务,指定keepAliveTime时间
- workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
- // take方法,死等任务,知道中断
- workQueue.take();
- if (r != null)
- return r;
- timedOut = true;
- } catch (InterruptedException retry) {
- timedOut = false;
- }
- }
- }
shutdown方法:
- // 优雅的关闭线程池
- // 为什么要加锁?
- // 会操作Worker对象
- public void shutdown() {
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- checkShutdownAccess();
- advanceRunState(SHUTDOWN);// 线程池状态修改为shutdown
- interruptIdleWorkers();// 中断空闲的工作线程
- onShutdown(); // hook for ScheduledThreadPoolExecutor
- } finally {
- mainLock.unlock();
- }
- tryTerminate();
- }
shutdownNow方法:
在工作中核心线程数、最大线程数、最大空闲时间、任务队列怎么设置比较好?
如果为了充分发挥硬件性能,一般只需要考虑三个信息的设置
任务类型:
CPU密集型任务:线程一直在干活,不希望CPU做上下文切换。
io密集型任务:因为线程干一会儿歇一会儿。
混合型任务:因为混合型偶尔要求CPU一直调度,偶尔不干活,可以切换。
想要设置好核心线程数,去发挥服务器硬件性能,需要动态的调试和压测。为了避免调试参数时反复重启,并且成本太高,可以直接设置动态线程池,因为线程池提供了Set方法设置核心参数,以及get方法查看核心参数,可以在压测时,根据CPU占用率和使用情况,来调整核心线程数。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。