赞
踩
ThreadPoolExecutor 是JDK自1.5添加的线程池。以下是初始化线程池的构造方法。
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
int corePoolSize:核心线程数
int maximumPoolSize:最大线程数
long keepAliveTime:空闲线程的存活时间
TimeUnit unit:时间单位
BlockingQueue workQueue:工作队列,待执行的任务会放到此队列中
ThreadFactory threadFactory:线程工厂,用来创建线程池中的线程
RejectedExecutionHandler handler:拒绝策略,当线程池关闭或者任务队列已满并且达到最大线程数时,需要执行的拒绝任务的策略
线程池中执行一个任务的代码如下。
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
从代码中可以看到线程池执行一个线程的流程。
1、校验任务是否为空,如果任务为空则报空指针异常
2、获取线程池中的线程数,如果运行的线程少于corePoolSize数量,会尝试以给定命令作为第一个线程启动新线程(核心线程)的任务。
3、如果任务可以成功加入队列,那么我们仍然需要再次检查是否应该添加线程(因为自上次检查以来,已有的线程已死亡)或自进入此方法后线程池已关闭。所以我们重新检查状态,如果线程池停止则回滚队列,如果没有线程则启动新线程(非核心线程)。如果都不满足,则什么都不做,相当于只是将任务放到了队列中。
4、如果无法对任务排队,则尝试添加新任务线。如果失败了,我们知道我们已经关闭线程池或队列饱和了,所以拒绝这个任务。
ThreadPoolTaskExecutor是Spring的线程池。以下是ThreadPoolTaskExecutor的部分属性。
public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport implements AsyncListenableTaskExecutor, SchedulingTaskExecutor {
private final Object poolSizeMonitor = new Object();
private int corePoolSize = 1;
private int maxPoolSize = 2147483647;
private int keepAliveSeconds = 60;
private int queueCapacity = 2147483647;
private boolean allowCoreThreadTimeOut = false;
private TaskDecorator taskDecorator;
private ThreadPoolExecutor threadPoolExecutor;
}
可以看到Spring线程池中是包含了ThreadPoolExecutor 线程池的。由此可见Spring线程池ThreadPoolTaskExecutor其实是对JDK线程池ThreadPoolExecutor的一个封装。
ThreadPoolTaskExecutor 的创建过程如下。
public ThreadPoolTaskExecutor taskExecutor() {
log.info("初始化ThreadPoolTaskExecutor线程池!");
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(corePoolSize);
taskExecutor.setMaxPoolSize(maxPoolSize);
taskExecutor.setQueueCapacity(queueCapacity);
taskExecutor.setKeepAliveSeconds(keepAliveSeconds);
taskExecutor.setThreadNamePrefix("Executor-");
taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
taskExecutor.setAwaitTerminationSeconds(60);
RejectedExecutionHandler callerRunsRejectedExecutionHandler = new DefinedRejectedExecutionHandler();
taskExecutor.setRejectedExecutionHandler(callerRunsRejectedExecutionHandler);
return taskExecutor;
}
我们看一下setCorePoolSize方法。
public void setCorePoolSize(int corePoolSize) {
synchronized(this.poolSizeMonitor) {
this.corePoolSize = corePoolSize;
if (this.threadPoolExecutor != null) {
this.threadPoolExecutor.setCorePoolSize(corePoolSize);
}
}
}
1、该方法用来设置线程池的核心线程数,首先加锁
2、将ThreadPoolTaskExecutor线程池中的核心线程数属性赋值
3、判断如果JDK线程池对象不为空,则给线程池赋值核心线程数
我们在看下ThreadPoolTaskExecutor提交任务的操作。
public <T> Future<T> submit(Callable<T> task) {
ThreadPoolExecutor executor = this.getThreadPoolExecutor();
try {
return executor.submit(task);
} catch (RejectedExecutionException var4) {
throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, var4);
}
}
1、获取到JDK的线程池对象
2、提交到JDK线程池中执行
由此可见对ThreadPoolTaskExecutor的赋值操作以及执行任务,其实是对JDK线程池的操作。
但是JDK线程池在ThreadPoolTaskExecutor的属性中开始是一个空值,从ThreadPoolTaskExecutor的构造方法中看到并没有对ThreadPoolExecutor进行初始化。那么是什么时候初始化的ThreadPoolExecutor对象呢?
在代码中可以搜索到以下代码。
protected ExecutorService initializeExecutor(ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) { BlockingQueue<Runnable> queue = this.createQueue(this.queueCapacity); ThreadPoolExecutor executor; if (this.taskDecorator != null) { executor = new ThreadPoolExecutor(this.corePoolSize, this.maxPoolSize, (long)this.keepAliveSeconds, TimeUnit.SECONDS, queue, threadFactory, rejectedExecutionHandler) { public void execute(Runnable command) { super.execute(ThreadPoolTaskExecutor.this.taskDecorator.decorate(command)); } }; } else { executor = new ThreadPoolExecutor(this.corePoolSize, this.maxPoolSize, (long)this.keepAliveSeconds, TimeUnit.SECONDS, queue, threadFactory, rejectedExecutionHandler); } if (this.allowCoreThreadTimeOut) { executor.allowCoreThreadTimeOut(true); } this.threadPoolExecutor = executor; return executor; }
可以看到是在执行initializeExecutor
方法的时候,Spring创建了一个ThreadPoolExecutor线程池并赋值给threadPoolExecutor
属性。但是此方法在本类中并没有调用的地方,一直往上跟发现该方法是在执行以下方法时调用的。
public void afterPropertiesSet() {
this.initialize();
}
而执行该方法是因为Spring线程池实现了接口InitializingBean
,InitializingBean接口为bean提供了初始化方法的方式,它只包括afterPropertiesSet方法,凡是继承该接口的类,在初始化bean的时候都会执行该方法。因此Spring的线程池在初始化对象时,也便初始化了JDK的线程池并赋值,从而操作Spring线程池处理多线程任务时,其实本质上还是JDK的线程池在执行。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。