赞
踩
- /**
- * Executes the given task sometime in the future. The task
- * may execute in a new thread or in an existing pooled thread.
- *
- * If the task cannot be submitted for execution, either because this
- * executor has been shutdown or because its capacity has been reached,
- * the task is handled by the current {@code RejectedExecutionHandler}.
- *
- * @param command the task to execute
- * @throws RejectedExecutionException at discretion of
- * {@code RejectedExecutionHandler}, if the task
- * cannot be accepted for execution
- * @throws NullPointerException if {@code command} is null
- */
- public void execute(Runnable command) {
- if (command == null)
- throw new NullPointerException();
- /*
- * Proceed in 3 steps:
- *
- * 1. If fewer than corePoolSize threads are running, try to
- * start a new thread with the given command as its first
- * task. The call to addWorker atomically checks runState and
- * workerCount, and so prevents false alarms that would add
- * threads when it shouldn't, by returning false.
- *
- * 2. If a task can be successfully queued, then we still need
- * to double-check whether we should have added a thread
- * (because existing ones died since last checking) or that
- * the pool shut down since entry into this method. So we
- * recheck state and if necessary roll back the enqueuing if
- * stopped, or start a new thread if there are none.
- *
- * 3. If we cannot queue task, then we try to add a new
- * thread. If it fails, we know we are shut down or saturated
- * and so reject the task.
- */
- int c = ctl.get();
- if (workerCountOf(c) < corePoolSize) {
- if (addWorker(command, true))
- return;
- c = ctl.get();
- }
- if (isRunning(c) && workQueue.offer(command)) {
- int recheck = ctl.get();
- if (! isRunning(recheck) && remove(command))
- reject(command);
- else if (workerCountOf(recheck) == 0)
- addWorker(null, false);
- }
- else if (!addWorker(command, false))
- reject(command);
- }
看方法注释:
执行给定的任务在将来的某个时间。该任务可能会用一个新的线程来执行,也有可能用线程池中一个已有的线程来执行。
如果该executor已经被shutdown了,或者因为容量已满,任务不会被执行,通过RejectedExecutionHandler来处理剩下流程。
将来的某个时间执行:说的是任务会入队列,然后根据线程池目前的各项指标状况来决定何时执行。
新的线程或已有线程:根据线程池的各项指标状况来决定是唤醒线程池中一个已有的阻塞线程来执行还是new一个Thread来执行任务。
看方法内部注释:
1.如果当前正在run的线程数小于corePoolSize,那么就调用addWorker方法来new一个线程用来执行传入的任务。
对应代码片:
- int c = ctl.get();
- if (workerCountOf(c) < corePoolSize) {
- if (addWorker(command, true))
- return;
- c = ctl.get();
- }
2.如果addWorker方法执行失败了,任务要入队列,如果成功入队列了,需要做double check来处理一些极端情况,比如线程池是否shutdown了等等。
对应代码片:
- if (isRunning(c) && workQueue.offer(command)) {
- int recheck = ctl.get();
- if (! isRunning(recheck) && remove(command))
- reject(command);
- else if (workerCountOf(recheck) == 0)
- addWorker(null, false);
- }
3.如果任务无法入队列,再次尝试addWorker,这次是用正在run的线程数和maximumPoolSize比,如果超过了maximumPoolSize则reject任务,说明线程池已经饱和了。
对应代码片:
- else if (!addWorker(command, false))
- reject(command);
- /*
- * Methods for creating, running and cleaning up after workers
- */
-
- /**
- * Checks if a new worker can be added with respect to current
- * pool state and the given bound (either core or maximum). If so,
- * the worker count is adjusted accordingly, and, if possible, a
- * new worker is created and started running firstTask as its
- * first task. This method returns false if the pool is stopped or
- * eligible to shut down. It also returns false if the thread
- * factory fails to create a thread when asked, which requires a
- * backout of workerCount, and a recheck for termination, in case
- * the existence of this worker was holding up termination.
- *
- * @param firstTask the task the new thread should run first (or
- * null if none). Workers are created with an initial first task
- * (in method execute()) to bypass queuing when there are fewer
- * than corePoolSize threads (in which case we always start one),
- * or when the queue is full (in which case we must bypass queue).
- * Initially idle threads are usually created via
- * prestartCoreThread or to replace other dying workers.
- *
- * @param core if true use corePoolSize as bound, else
- * maximumPoolSize. (A boolean indicator is used here rather than a
- * value to ensure reads of fresh values after checking other pool
- * state).
- * @return true if successful
- */
- private boolean addWorker(Runnable firstTask, boolean core) {
- retry:
- for (;;) {
- int c = ctl.get();
- int rs = runStateOf(c);
-
- // Check if queue empty only if necessary.
- if (rs >= SHUTDOWN &&
- ! (rs == SHUTDOWN &&
- firstTask == null &&
- ! workQueue.isEmpty()))
- return false;
-
- for (;;) {
- int wc = workerCountOf(c);
- if (wc >= CAPACITY ||
- wc >= (core ? corePoolSize : maximumPoolSize))
- return false;
- if (compareAndIncrementWorkerCount(c))
- break retry;
- c = ctl.get(); // Re-read ctl
- if (runStateOf(c) != rs)
- continue retry;
- // else CAS failed due to workerCount change; retry inner loop
- }
- }
-
- Worker w = new Worker(firstTask);
- Thread t = w.thread;
-
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- // Recheck while holding lock.
- // Back out on ThreadFactory failure or if
- // shut down before lock acquired.
- int c = ctl.get();
- int rs = runStateOf(c);
-
- if (t == null ||
- (rs >= SHUTDOWN &&
- ! (rs == SHUTDOWN &&
- firstTask == null))) {
- decrementWorkerCount();
- tryTerminate();
- return false;
- }
-
- workers.add(w);
-
- int s = workers.size();
- if (s > largestPoolSize)
- largestPoolSize = s;
- } finally {
- mainLock.unlock();
- }
-
- t.start();
- // It is possible (but unlikely) for a thread to have been
- // added to workers, but not yet started, during transition to
- // STOP, which could result in a rare missed interrupt,
- // because Thread.interrupt is not guaranteed to have any effect
- // on a non-yet-started Thread (see Thread#interrupt).
- if (runStateOf(ctl.get()) == STOP && ! t.isInterrupted())
- t.interrupt();
-
- return true;
- }
先看注释:
- if (compareAndIncrementWorkerCount(c))
- break retry;
如果线程被stop了或者可以shutdown,addWorker方法返回false。
- // Recheck while holding lock.
- // Back out on ThreadFactory failure or if
- // shut down before lock acquired.
- int c = ctl.get();
- int rs = runStateOf(c);
-
- if (t == null ||
- (rs >= SHUTDOWN &&
- ! (rs == SHUTDOWN &&
- firstTask == null))) {
- decrementWorkerCount();
- tryTerminate();
- return false;
- }
- // Check if queue empty only if necessary.
- if (rs >= SHUTDOWN &&
- ! (rs == SHUTDOWN &&
- firstTask == null &&
- ! workQueue.isEmpty()))
- return false;
- for (;;) {
- int wc = workerCountOf(c);
- if (wc >= CAPACITY ||
- wc >= (core ? corePoolSize : maximumPoolSize))
- return false;
- if (compareAndIncrementWorkerCount(c))
- break retry;
- c = ctl.get(); // Re-read ctl
- if (runStateOf(c) != rs)
- continue retry;
- // else CAS failed due to workerCount change; retry inner loop
- }
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- // Recheck while holding lock.
- // Back out on ThreadFactory failure or if
- // shut down before lock acquired.
- int c = ctl.get();
- int rs = runStateOf(c);
-
- if (t == null ||
- (rs >= SHUTDOWN &&
- ! (rs == SHUTDOWN &&
- firstTask == null))) {
- decrementWorkerCount();
- tryTerminate();
- return false;
- }
-
- workers.add(w);
-
- int s = workers.size();
- if (s > largestPoolSize)
- largestPoolSize = s;
- } finally {
- mainLock.unlock();
- }
这个代码块,首先加锁,整个类用到这个锁的地方,除了获取该线程池的一些关键参数之外,就是shutdown和terminate等相关操作。
private static int CAPACITY = (1 << COUNT_BITS) - 1;
CAPACITY是1左移COUNT_BITS,然后减一。
private static int COUNT_BITS = Integer.SIZE - 3;
COUNT_BITS是Integer的位数减去3,即29。
private static final int RUNNING = -1 << COUNT_BITS;
二进制,1111111111111111111111111111111111100000000000000000000000000000,和CAPACITY互补,所以~CAPACITY也是这个值。
private static final int STOP = 1 << COUNT_BITS;
二进制,100000000000000000000000000000000
private static final int TIDYING = 2 << COUNT_BITS;
二进制,1000000000000000000000000000000000
private static final int TERMINATED = 3 << COUNT_BITS;
二进制,1100000000000000000000000000000000
private static int runStateOf(int c) { return c & ~CAPACITY; }
把低28位都清掉了,只拿高位的运行状态。
private static int workerCountOf(int c) { return c & CAPACITY; }
只取低28位,即拿workerCount。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。