赞
踩
目录
无返回提交任务execute(Runnable command)方法
带返回提交任务Future submit(Runnable task)
新建一个工作线程---addWorker(Runnable firstTask,boolean core)方法
工作线程销毁之后---processWorkerExit方法
Java中的线程池是运用场景最多的并发框架,几乎所有需要异步或并发执行任务的程序都可以使用线程池。在开发过程中,合理地使用线程池能够带来3个好处。
比如BIO编程过程中,需要利用线程来解决阻塞问题,如果不用线程池,那么可能会造成很多线程被频繁的创建和销毁,这个时候就可以利用线程池。
Executor是一个接口,它是Executor框架的基础,它将任务的提交与任务的执行分离开来。
- public interface Executor {
- void execute(Runnable command);
- }
线程池的核心实现类,用于执行被提交的任务。
构造线程池的七大参数:
- public ThreadPoolExecutor(int corePoolSize,
- int maximumPoolSize,
- long keepAliveTime,
- TimeUnit unit,
- BlockingQueue<Runnable> workQueue,
- ThreadFactory threadFactory,
- RejectedExecutionHandler handler) {
- //在自定义线程池时需要注意参数需要符合要求,否则可能会造成下面两种异常
- if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0)
- throw new IllegalArgumentException();
-
- if (workQueue == null || threadFactory == null || handler == null)
- throw new NullPointerException();
-
- this.acc = System.getSecurityManager() == null ? null : AccessController.getContext();
- this.corePoolSize = corePoolSize;
- this.maximumPoolSize = maximumPoolSize;
- this.workQueue = workQueue;
- this.keepAliveTime = unit.toNanos(keepAliveTime);
- this.threadFactory = threadFactory;
- this.handler = handler;
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
- /**
- * ctl是对线程池的运行状态和线程池中有效线程的数量进行控制的一个字段,共4个字节,32个比特位。
- * 利用切割比特位来表示两部分信息: 线程池的运行状态 (runState占用3个比特位来表示5种状态) 和 线程池内工作线程的数量 (workerCount)
- * 默认为RUNNING态,有效线程数量为0
- */
- private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0);
- private static final int COUNT_BITS = Integer.SIZE - 3; //29
- private static final int CAPACITY = (1 << COUNT_BITS) - 1; //00011111 11111111 11111111 11111111 == 536870911
-
- // 线程池的运行状态存储于高阶位中,5种状态利用3个比特位来表示
- private static final int RUNNING = -1 << COUNT_BITS; //111 00000000000000000000000000000 这是一个负数
- private static final int SHUTDOWN = 0 << COUNT_BITS; //000 00000000000000000000000000000
- private static final int STOP = 1 << COUNT_BITS; //001 00000000000000000000000000000
- private static final int TIDYING = 2 << COUNT_BITS; //010 00000000000000000000000000000
- private static final int TERMINATED = 3 << COUNT_BITS; //011 00000000000000000000000000000
-
- //~CAPACITY == 11100000 00000000 00000000 00000000,该方法用于获取线程池的运行状态runState的值
- private static int runStateOf(int c) { return c & ~CAPACITY; }
- //CAPACITY == 00011111 11111111 11111111 11111111,该方法用于获取线程池内工作线程数量workerCount的值
- private static int workerCountOf(int c) { return c & CAPACITY; }
- private static int ctlOf(int rs, int wc) { return rs | wc; } //根据runState的值和workerCount的值组合获取ctl的值
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
- public void execute(Runnable command) {
- //先进行判空处理
- if (command == null)
- throw new NullPointerException();
-
- //获取当前ctl的值
- int c = ctl.get();
- //如果当前线程池中有效线程个数 < 核心线程个数
- if (workerCountOf(c) < corePoolSize) {
- /*创建新线程执行新任务
- *
- */
- if (addWorker(command, true))
- return;
- c = ctl.get();
- }
-
- /* 能执行到这里的条件:
- * 1.当前线程池中的有效线程个数 >= 核心线程个数;
- * 2.创建新线程执行任务(addWorker)失败。
- * 若当前线程池的运行状态为RUNNING态,则将当前任务添加进阻塞队列中,若队列已满,则offer方法会返回false;若该任务被成功加入到阻塞队列中,则执行下面if中的逻辑。
- */
- if (isRunning(c) && workQueue.offer(command)) {
- //获取ctl的值
- int recheck = ctl.get();
- //如果当前线程池的运行状态不是RUNNING态,则将刚才添加的任务从阻塞队列中移除。移除成功之后利用拒绝策略处理该任务
- if (!isRunning(recheck) && remove(command))
- reject(command);
-
- /*执行到这里的条件:
- * 1.当前线程池的运行状态就是RUNNING态
- * 2.当前线程池的运行状态已经不是RUNNING态,且阻塞队列为空
- *判断当前有效线程个数如果为0
- */
- else if (workerCountOf(recheck) == 0)
- addWorker(null, false);
- }
-
- /*能执行到这里的条件:
- * 当前线程池中的有效线程个数 >= 核心线程个数;
- * 且:
- * 1.当前线程池的运行状态不是RUNNING态
- * 2.就是RUNNING态但任务添加到阻塞队列中失败(阻塞队列已满)
- *此处执行addWorker如果返回false,则表示不能再创建工作线程了,则将任务交给拒绝策略。
- */
- else if (!addWorker(command, false))
- reject(command);
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
需要注意的是:只有在RUNNING态的时候才可以向阻塞队列里面添加新任务。
大致的处理流程如下:
该方法没有在ThreadPoolExecutor中实现,而是在其父类AbstractExecutorService中实现,具体如下:
- public Future<?> submit(Runnable task) {
- if (task == null) throw new NullPointerException();
-
- RunnableFuture<Void> ftask = newTaskFor(task, null);
-
- execute(ftask);
- return ftask; //返回的是FutureTask实例对象
- }
-
- protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
- return new FutureTask<T>(runnable, value);
- }
大概关系是这样的,有一个Callable<V>接口(创建线程的其中一种方式:可以通过实现该接口)该接口定了一个V call()方法,在FutureTask<V>类中维护了一个该接口的成员变量,由于FutureTask也是间接实现了Runnable接口的,所以在该类中实现了run方法。在这个run方法中调用的就是这个Callable成员的call方法,他将结果任务的执行结果存储进FutureTask实例中。
该方法的返回值表示是否已经启动工作线程,即意味着新建一个工作线程是否成功。
- private boolean addWorker(Runnable firstTask, boolean core) {
- retry:
- for (;;) {
- int c = ctl.get(); //读取ctl的值
- int rs = runStateOf(c); //获取线程池的运行状态
-
- /*判断当前线程池还是否需要执行任务
- *如果当前线程池的状态为RUNNING态则不会返回false
- *返回false的条件(大前提:当前线程池状态不是RUNNING态),在此基础下下面三个条件有任何一个不成立都会直接返回,而不新建工作线程:
- * 1.当前线程池的状态为SHUTDOWN态
- * 2.所提交任务为null
- * 3.阻塞队列非空
- */
- if (rs >= SHUTDOWN &&
- !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
- return false;
-
- for (;;) {
- //获取当前池中线程个数
- int wc = workerCountOf(c);
- /*
- *若当前池中线程个数 >= 2的29次方减1,则无法创建新线程。池中最大线程数量为2的29次方减1个
- *如果core为true则于核心先称数量进行比较,否则与最大线程数量进行比较
- */
- if (wc >= CAPACITY ||
- wc >= (core ? corePoolSize : maximumPoolSize))
- return false;
- //将workerCount的值加1,并跳出外层循环
- if (compareAndIncrementWorkerCount(c))
- break retry;
-
- //如果线程状态被修改,则再次执行外层循环
- c = ctl.get();
- if (runStateOf(c) != rs)
- continue retry;
- }
- }
-
- boolean workerStarted = false;
- boolean workerAdded = false;
- Worker w = null;
- try {
- /*
- *此处创建Worker实例,并将任务firstTask设置进去
- *注意Worker类中有两个特殊的字段:1. Runnable firstTask 2. final Thread thread
- *Worker类本身也继承了Runnable接口,实现了其run()方法
- */
- w = new Worker(firstTask);
- //这里的t是w本身表示的线程对象,而非firstTask。
- final Thread t = w.thread;
- if (t != null) {
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- //获取当前线程池的运行状态rs
- int rs = runStateOf(ctl.get());
-
- /*
- *rs < SHUTDOWN的状态只有RUNNING态
- *能进去下面if的条件:
- * 1. 当前线程池运行状态为RUNNING
- * 2.当前线程池状态为SHUTDOWN而且firstTask为null
- */
- if (rs < SHUTDOWN ||
- (rs == SHUTDOWN && firstTask == null)) {
- if (t.isAlive())
- throw new IllegalThreadStateException();
- //HashSet<Worker> workers线程池中利用HashSet保存的worker对象
- workers.add(w);
- int s = workers.size();
- //largestPoolSize用来记录线程池中最大的线程数量
- if (s > largestPoolSize)
- largestPoolSize = s;
- //任务添加成功(线程创建成功)
- workerAdded = true;
- }
- }finally {
- mainLock.unlock();
- }
- if (workerAdded) {
- //启动工作线程,这里调用的是Worker类中的run()方法
- t.start();
- workerStarted = true;
- }
- }
- } finally {
- if (!workerStarted)
- addWorkerFailed(w);
- }
-
- return workerStarted;
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
通过上面 方法可以看到所有的工作线程Worker对象被存入一个HashSet中(workers),所有对workers的操作都利用ReentrantLock保证线程安全。
- //ThreadPoolExecutor的内部类,继承了AQS(中断工作线程时会使用),实现了Runnable接口
- private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
- private static final long serialVersionUID = 6138294804551838833L;
-
- final Thread thread; //工作线程
- Runnable firstTask; //要运行的初始任务,可能为null
- volatile long completedTasks; //线程任务计数器
-
- //唯一的构造方法
- Worker(Runnable firstTask) {
- setState(-1);
- this.firstTask = firstTask;
- //这里的thread表示自己本身
- this.thread = getThreadFactory().newThread(this);
- }
-
- //addWorker方法中的t.start()就是执行的该方法。
- public void run() {
- runWorker(this);
- }
-
- //当前线程是否持有独占锁
- protected boolean isHeldExclusively() {
- return getState() != 0;
- }
-
- //获取锁
- protected boolean tryAcquire(int unused) {
- //注意这里只有当前AQS中state字段值为0时才可能争抢到锁,该锁不支持重入
- if (compareAndSetState(0, 1)) {
- setExclusiveOwnerThread(Thread.currentThread());
- return true;
- }
- return false;
- }
-
- protected boolean tryRelease(int unused) {
- setExclusiveOwnerThread(null);
- setState(0);
- return true;
- }
-
- public void lock() { acquire(1); }
- public boolean tryLock() { return tryAcquire(1); }
- public void unlock() { release(1); }
- public boolean isLocked() { return isHeldExclusively(); }
-
- void interruptIfStarted() {
- Thread t;
- if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
- try {
- t.interrupt();
- } catch (SecurityException ignore) {
- }
- }
- }
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
Worker类本来就是一个线程对象,就是为了执行提交给线程池的任务的,所以理所应当实现Runnable接口。值得注意的是Worker类还继承了AQS,实现了独占锁。所以Worker对象实例也可以看成一个锁对象。(独占锁 且 不支持重入)
- final void runWorker(Worker w) {
- //获取当前线程对象
- Thread wt = Thread.currentThread();
- //获取第一个任务
- Runnable task = w.firstTask;
- w.firstTask = null;
- w.unlock(); // 允许中断
- boolean completedAbruptly = true;
- try {
- //该工作线程中第一个任务不为null,或者 可以通过getTask()取得任务
- while (task != null || (task = getTask()) != null) {
- w.lock();
- if ((runStateAtLeast(ctl.get(), STOP) ||
- (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())
- wt.interrupt();
- try {
- //在ThreadPoolExecutor中该方法是一个空方法
- beforeExecute(wt, task);
- Throwable thrown = null;
- try {
- //执行任务。
- task.run();
- } catch (RuntimeException x) {
- thrown = x; throw x;
- } catch (Error x) {
- thrown = x; throw x;
- } catch (Throwable x) {
- thrown = x; throw new Error(x);
- } finally {
- afterExecute(task, thrown);
- }
- } finally {
- task = null;
- //任务计数器加1
- w.completedTasks++;
- //释放锁
- w.unlock();
- }
- }
- //如果执行任务的过程中没有发生异常,则completedAbruptly会被赋值为false
- completedAbruptly = false;
- } finally {
- processWorkerExit(w, completedAbruptly);
- }
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
注意上面的runWorker方法通过while循环不断的在获取任务来让工作线程去执行该任务,若获取的任务为null,那么当前的工作线程就会被销毁。而且在while循环中一个条语句就w.lock(),执行完一个任务才会进行unLock操作 。继承AQS是为了在中断工作线程时会用到。只不过现在可以确定的是,若工作线程正在执行任务,那么它一个是拿到w锁的。
- private Runnable getTask() {
- boolean timedOut = false;
-
- for (;;) {
- int c = ctl.get();
- int rs = runStateOf(c);
-
- /*
- *若当前线程池的工作状态为RUNNING则不会进入下面if。
- *1.若状态为STOP、TIDYING、TERMINATED则当前工作线程不能执行任务。
- *2.若状态为SHUTDOWN,且阻塞队列为空,则获取任务为null
- */
- if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
- //workerCount的值减1
- decrementWorkerCount();
- return null;
- }
- //获取工作线程数量
- int wc = workerCountOf(c);
-
- //若allowCoreThreadTimeOut设置为true 或者 当前池中工作线程数量大于核心线程数量 则timed为true
- boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
-
- //若当前工作线程数量已经超过最大线程数量,则也获取不到任务,会从该方法中返回null,进而结束该工作线程
- if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {
- if (compareAndDecrementWorkerCount(c))
- return null;
- continue;
- }
-
- try {
- /*
- *若allowCoreThreadTimeOut设置为true 或者 当前池中工作线程数量大于核心线程数量
- * 则:在指定的时间内从阻塞队列中获取任务,若取不到则返回null
- *若allowCoreThreadTimeOut设置为false 而且 当前池中工作线程数量小于核心线程数量
- * 则:在指定的时间内从阻塞队列中获取任务,若取不到则一直阻塞
- */
- Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
- if (r != null)
- return r;
- //若r == null,则此处timedOut的值被设置为true
- timedOut = true;
- } catch (InterruptedException retry) {
- //如果阻塞等待过程中线程发生中断,则将timeOut设置为false,进入下一次循环
- timedOut = false;
- }
- }
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
上面getTask()方法会从阻塞队列中获取任务,取不到任务的情况如下:
- public void shutdown() {
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- //检测是否有关闭线程池的权限
- checkShutdownAccess();
- //将线程池状态设置为SHUTDOWN态
- advanceRunState(SHUTDOWN);
- //中断空闲线程(没有执行任务的线程)
- interruptIdleWorkers();
- //该方法在ThreadPoolExecutor中是一个空方法
- onShutdown();
- } finally {
- mainLock.unlock();
- }
- //尝试将线程池状态设置为TERMINATED状态。
- tryTerminate();
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
对于线程池关闭的整体逻辑:
其中中断的逻辑如下:
- private void interruptIdleWorkers() {
- interruptIdleWorkers(false);
- }
-
- private void interruptIdleWorkers(boolean onlyOne) {
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- for (Worker w : workers) {
- Thread t = w.thread;
- if (!t.isInterrupted() && w.tryLock()) {
- try {
- t.interrupt();
- } catch (SecurityException ignore) {
- } finally {
- w.unlock();
- }
- }
- if (onlyOne)
- break;
- }
- } finally {
- mainLock.unlock();
- }
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
- public List<Runnable> shutdownNow() {
- List<Runnable> tasks;
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- //检查是否有关闭权限
- checkShutdownAccess();
- //将线程池状态设置为STOP态
- advanceRunState(STOP);
- interruptWorkers();
- //获取阻塞队列中等待执行的任务
- tasks = drainQueue();
- } finally {
- mainLock.unlock();
- }
-
- tryTerminate();
- //获取阻塞队列中等待执行的任务
- return tasks;
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
注意上述shutdownNow()方法和shutdown()方法在中断线程时调用的是不同的方法,shutdownNow()方法中会先将线程池状态设置为STOP态,然后中断所有工作线程,具体源码如下:
- private void interruptWorkers() {
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- for (Worker w : workers)
- w.interruptIfStarted();
- } finally {
- mainLock.unlock();
- }
- }
-
- //该方法在Worker类中定义,该方法用于中断当前的worker实例表示的工作线程
- void interruptIfStarted() {
- Thread t;
- //注意这里的逻辑,getState() >= 0 : 正在执行任务的工作线程state == 1,空闲的工作线程state == 0
- if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
- try {
- t.interrupt();
- } catch (SecurityException ignore) {
- }
- }
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
所以shutdownNow方法的具体流程如下:
shutdown方法会结束空闲线程,空闲线程在阻塞队列中被阻塞着,shutdown方法会设置中断信号,然后在getTask方法中捕获该中断信号,进而结束空闲线程。
shutdownNow方法会打断正在执行任务的工作线程,它是对所有线程都发中断信号,所以这里能打断的任务执行的前提是当前这个任务的可以捕获中断信号。如果这个任务是一个死循环没有结束期限,并且不能捕获中断信号,那么执行这个任务的工作线程永远都不会停止。因为本次任务无法响应该中断信号。如果这个任务可以结束,但是不能响应中断信号,那么也是该任务执行完毕之后才能结束当前的工作线程。
当然像那种死循环不能结束的情况,如果可以,可以将其设置为守护线程(当运行的唯一线程都是守护进程线程时,Java虚拟机将退出。)将其结束。
该方法中会将线程池的状态设置为TERMINATED,在shutdown()方法和shutdownNow方法的最后都调用了该方法。
- final void tryTerminate() {
- for (;;) {
- int c = ctl.get();
- /*
- *RUNNING、TIDYING、TERMINATED这三种状态都会直接返回。
- *SHUTDOWN状态下如果阻塞队列不为空依然会直接返回
- */
- if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && !workQueue.isEmpty()))
- return;
- /*
- *执行到这里的前提:当前状态为STOP状态 或者 当前为SHUTDOWN状态但阻塞队列为空。
- *这里的判断意思是:如果此时还存在工作线程
- */
- if (workerCountOf(c) != 0) {
- //ONLY_ONE为true,如果参数为true那么该方法内部只会中断一个空闲的线程。
- interruptIdleWorkers(ONLY_ONE);
- return;
- }
-
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- //将当前线程池状态先设置为TIDYING,设置成功的前提是ctl的值未发生变化:状态或者工作线程数量
- if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
- try {
- //在ThreadPoolExecutor中该方法为一个空方法
- terminated();
- } finally {
- //最后将线程池状态设置为TERMINATED
- ctl.set(ctlOf(TERMINATED, 0));
- termination.signalAll();
- }
- return;
- }
- } finally {
- mainLock.unlock();
- }
- }
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
分析完该方法之后就可以画出线程池的状态变迁图了,如下:
该方法是在每一个工作线程销毁前都要执行的。
- private void processWorkerExit(Worker w, boolean completedAbruptly) {
- //completedAbruptly == true : 表示执行任务的过程中发生异常,导致该工作线程结束
- if (completedAbruptly)
- decrementWorkerCount();
-
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- //在每个工作线程结束时都会将自己执行的任务数汇总给线程池
- completedTaskCount += w.completedTasks;
- //将当前工作线程对象从池子中移除,因为它将被销毁
- workers.remove(w);
- } finally {
- mainLock.unlock();
- }
-
- //尝试将线程池状态设置为TERMINATED
- tryTerminate();
-
- int c = ctl.get();
- /*
- *该if条件成立的条件是:线程池状态为SHUTDOWN 或者 RUNNING
- * SHUTDOWN状态属于正常关闭线程池销毁线程,completedAbruptly为false
- * RUNNING状态属于执行任务时出现异常导致执行该任务的工作线程需要被销毁
- */
- if (runStateLessThan(c, STOP)) {
- //该if条件成立表示SHUTDOWN态下销毁工作线程
- if (!completedAbruptly) {
- int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
- //如果allowCoreThreadTimeOut被设置为true 而且 还有正在等待执行的任务
- if (min == 0 && !workQueue.isEmpty())
- min = 1;
- /*
- * 1.allowCoreThreadTimeOut == false(默认) :
- * 如果当前线程池中的工作线程数量 >= 核心线程数量,则可以直接返回
- * 2.allowCoreThreadTimeOut == true :
- * 如果当前线程池中的工作线程数量 >= 1,则可以直接返回
- */
- if (workerCountOf(c) >= min)
- return;
- }
-
- addWorker(null, false);
- }
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
注意上述方法执行的条件:
- public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
- long nanos = unit.toNanos(timeout);
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- for (;;) {
- //如果当前线程池状态为TERMINATED,才会返回true
- if (runStateAtLeast(ctl.get(), TERMINATED))
- return true;
- if (nanos <= 0)
- return false;
- /*
- *阻塞当前执行awaitTermination方法的线程,阻塞完毕之后还会再次循环,若线程池状态还不是TERMINATED,那么会返回false。
- *被唤醒说明线程池状态已经为TERMINATED。已经没有工作线程了。
- */
- nanos = termination.awaitNanos(nanos);
- }
- } finally {
- mainLock.unlock();
- }
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
可以看到调用该方法,如果线程池状态不为TERMINATED,即线程池中还有工作线程,那么该方法会阻塞你指定的时间。
一般该方法配合shutdown()方法使用,shutdown()方法不会立刻销毁所有线程,它会在当前已提交的所有任务都执行完毕之后才销毁(所有线程都销毁后线程池状态变为TERMINATED)。那么可以调用该方法指定一个期望的时间,等任务执行结束之后再执行主方法的逻辑。
要想合理地配置线程池,就必须首先分析任务特性,可以从如下几个角度来分析。
性质不同的任务可以用不同规模的线程池分开处理。CPU密集型任务应配置尽可能小的线程,如配置N(cpu) + 1个线程的线程池。由于IO密集型的任务线程并不是一直在执行任务,则应配置尽可能多的线程,如2*N(cpu)。混合型的任务,如果可以拆分,将其拆分成一个CPU密集型任务和一个IO密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐量将高于串行执行的吞吐量。如果这两个任务执行时间相差太大,则没必要进行拆分。可以通过Runtime.getRuntime().availableProcessors()方法获得当前设备的CPU个数。
优先级不同的任务可以使用优先级队列PriorityBlockingQueue来处理。它可以让优先级高的任务先执行。(注意:如果一直有优先级高的任务提交到队列里,那么优先级低的任务可能永远不能执行)
依赖数据库连接池的任务,因为线程提交SQL后需要等待数据库返回结果,等待的时间越长,CPU空闲时间就越长,那么线程数应该设置得越大,这样才能更好地利用CPU。
如果在系统中大量使用线程池,则有必要对线程池进行监控,方便在出现问题时,可以根据线程池的使用状况快速定位问题。可以通过线程池提供的参数来进行监控,在监控线程池的时候可以使用一下属性。
还可以通过扩展线程池进行监控。可以通过继承线程池还自定义线程池,重写线程池的beforeExecute、afterExecute和terminated方法,也可以在任务执行前、任务执行后、线程池关闭前执行一些代码来监控。例如:监控任务的平均执行时间、最大执行时间和最小执行时间等。
JDK中有一个Executor框架的工具类Executors,可以通过该类来直接使用帮我们定义好的线程池。下面简述其中三个:
- public static ExecutorService newFixedThreadPool(int nThreads) {
- return new ThreadPoolExecutor(nThreads, nThreads,
- 0L, TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue<Runnable>());
- }
该线程池的 核心线程数量 == 最大线程数量,由于这两个值相等所以阻塞的时间就无所谓了,因为这个时间就是用来销毁核心线程外的空闲线程的,所以阻塞时间设置为0L,而且使用的阻塞队列为LinkedBlockingQueue<Ruunable>,这是一个基于链表的队列,而且其默认的队列容量为Integer.MAX_VALUE,这相当于一个无界的队列了,也正是因为使用的阻塞队列是无界的,所以maximumPoolSize将失去意义,因为当线程池达到核心线程数量时,在没有空闲线程的情况下新任务将被提交到队列里面,当队列满了才继续创建线程直到maximumPoolSize为止,所以如果队列无界,则最大线程数的配置将失去意义。而且无界队列也会导致拒绝策略执行不到。拒绝策略是当阻塞队列已满并且当前线程数量已经达到maximumPoolSize才能执行到。
- public static ExecutorService newSingleThreadExecutor() {
- return new FinalizableDelegatedExecutorService
- (new ThreadPoolExecutor(1, 1,
- 0L, TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue<Runnable>()));
- }
可以看到该线程池并非ThreadPoolExecutor的实例,而是FinalizeableDelegatedExecutorService的实例,该类又继承自DelegatedExecutorService类,其中除过finalize()方法外的其他方法都是在DelegatedExecutorService中实现。该线程池不可以使用ThreadPoolExecutor中的一些独有方法,但是本质上都是使用ThreadPoolExecutor。
- static class FinalizableDelegatedExecutorService extends DelegatedExecutorService {
- FinalizableDelegatedExecutorService(ExecutorService executor) {
- super(executor);
- }
- protected void finalize() {
- super.shutdown();
- }
- }
-
- static class DelegatedExecutorService extends AbstractExecutorService {
- private final ExecutorService e;
- DelegatedExecutorService(ExecutorService executor) { e = executor; }
- //……
- }
通过上面配置可看出,该线程池只有一个核心线程,最大线程数也被设置为1,所以依然阻塞时间为0L,也就是说全程只有一个线程在执行任务,只不过这个工作线程一直在复用来执行不同的任务。
- public static ExecutorService newCachedThreadPool() {
- return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
- 60L, TimeUnit.SECONDS,
- new SynchronousQueue<Runnable>());
- }
该线程池的核心线程数为0,最大线程数为Integer.MAX_VALUE(可以当成无限),阻塞时间为60秒,使用的阻塞队列没有容量。由于没有核心线程,所以该线程池中的空闲线程如果空闲60秒就会销毁。如果单个任务处理的时间比较长,那么可能导致池子中再不停的创建新线程来执行任务。该线程池执行任务不需要等,一旦提交可以马上执行。该线程池的线程复用性极为不好。极端情况下,CachedThreadPool会因为创建过多线程而耗尽CPU和内存资源。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。