当前位置:   article > 正文

并发编程之ThreadPoolTaskExecutor与ThreadPoolExecutor的区别_threadpooltaskexecutor.getthreadpoolexcutor

threadpooltaskexecutor.getthreadpoolexcutor

1. ThreadPoolExecutor

ThreadPoolExecutor 是JDK自1.5添加的线程池。以下是初始化线程池的构造方法。

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

int corePoolSize:核心线程数
int maximumPoolSize:最大线程数
long keepAliveTime:空闲线程的存活时间
TimeUnit unit:时间单位
BlockingQueue workQueue:工作队列,待执行的任务会放到此队列中
ThreadFactory threadFactory:线程工厂,用来创建线程池中的线程
RejectedExecutionHandler handler:拒绝策略,当线程池关闭或者任务队列已满并且达到最大线程数时,需要执行的拒绝任务的策略

线程池中执行一个任务的代码如下。

public void execute(Runnable command) {
	if (command == null)
		throw new NullPointerException();
	
	int c = ctl.get();
	if (workerCountOf(c) < corePoolSize) {
		if (addWorker(command, true))
			return;
		c = ctl.get();
	}
	if (isRunning(c) && workQueue.offer(command)) {
		int recheck = ctl.get();
		if (! isRunning(recheck) && remove(command))
			reject(command);
		else if (workerCountOf(recheck) == 0)
			addWorker(null, false);
	}
	else if (!addWorker(command, false))
		reject(command);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

从代码中可以看到线程池执行一个线程的流程。

1、校验任务是否为空,如果任务为空则报空指针异常

2、获取线程池中的线程数,如果运行的线程少于corePoolSize数量,会尝试以给定命令作为第一个线程启动新线程(核心线程)的任务。

3、如果任务可以成功加入队列,那么我们仍然需要再次检查是否应该添加线程(因为自上次检查以来,已有的线程已死亡)或自进入此方法后线程池已关闭。所以我们重新检查状态,如果线程池停止则回滚队列,如果没有线程则启动新线程(非核心线程)。如果都不满足,则什么都不做,相当于只是将任务放到了队列中。

4、如果无法对任务排队,则尝试添加新任务线。如果失败了,我们知道我们已经关闭线程池或队列饱和了,所以拒绝这个任务。

2. ThreadPoolTaskExecutor

ThreadPoolTaskExecutor是Spring的线程池。以下是ThreadPoolTaskExecutor的部分属性。

public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport implements AsyncListenableTaskExecutor, SchedulingTaskExecutor {
    private final Object poolSizeMonitor = new Object();
    private int corePoolSize = 1;
    private int maxPoolSize = 2147483647;
    private int keepAliveSeconds = 60;
    private int queueCapacity = 2147483647;
    private boolean allowCoreThreadTimeOut = false;
    private TaskDecorator taskDecorator;
    private ThreadPoolExecutor threadPoolExecutor;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

可以看到Spring线程池中是包含了ThreadPoolExecutor 线程池的。由此可见Spring线程池ThreadPoolTaskExecutor其实是对JDK线程池ThreadPoolExecutor的一个封装。

ThreadPoolTaskExecutor 的创建过程如下。

public ThreadPoolTaskExecutor taskExecutor() {
	log.info("初始化ThreadPoolTaskExecutor线程池!");
	ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
	taskExecutor.setCorePoolSize(corePoolSize);
	taskExecutor.setMaxPoolSize(maxPoolSize);
	taskExecutor.setQueueCapacity(queueCapacity);
	taskExecutor.setKeepAliveSeconds(keepAliveSeconds);
	taskExecutor.setThreadNamePrefix("Executor-");
	taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
	taskExecutor.setAwaitTerminationSeconds(60);

	RejectedExecutionHandler callerRunsRejectedExecutionHandler = new DefinedRejectedExecutionHandler();
	taskExecutor.setRejectedExecutionHandler(callerRunsRejectedExecutionHandler);
	return taskExecutor;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

我们看一下setCorePoolSize方法。

public void setCorePoolSize(int corePoolSize) {
	synchronized(this.poolSizeMonitor) {
		this.corePoolSize = corePoolSize;
		if (this.threadPoolExecutor != null) {
			this.threadPoolExecutor.setCorePoolSize(corePoolSize);
		}

	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

1、该方法用来设置线程池的核心线程数,首先加锁

2、将ThreadPoolTaskExecutor线程池中的核心线程数属性赋值

3、判断如果JDK线程池对象不为空,则给线程池赋值核心线程数

我们在看下ThreadPoolTaskExecutor提交任务的操作。

public <T> Future<T> submit(Callable<T> task) {
	ThreadPoolExecutor executor = this.getThreadPoolExecutor();

	try {
		return executor.submit(task);
	} catch (RejectedExecutionException var4) {
		throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, var4);
	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

1、获取到JDK的线程池对象

2、提交到JDK线程池中执行

由此可见对ThreadPoolTaskExecutor的赋值操作以及执行任务,其实是对JDK线程池的操作。

但是JDK线程池在ThreadPoolTaskExecutor的属性中开始是一个空值,从ThreadPoolTaskExecutor的构造方法中看到并没有对ThreadPoolExecutor进行初始化。那么是什么时候初始化的ThreadPoolExecutor对象呢?

在代码中可以搜索到以下代码。

protected ExecutorService initializeExecutor(ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
	BlockingQueue<Runnable> queue = this.createQueue(this.queueCapacity);
	ThreadPoolExecutor executor;
	if (this.taskDecorator != null) {
		executor = new ThreadPoolExecutor(this.corePoolSize, this.maxPoolSize, (long)this.keepAliveSeconds, TimeUnit.SECONDS, queue, threadFactory, rejectedExecutionHandler) {
			public void execute(Runnable command) {
				super.execute(ThreadPoolTaskExecutor.this.taskDecorator.decorate(command));
			}
		};
	} else {
		executor = new ThreadPoolExecutor(this.corePoolSize, this.maxPoolSize, (long)this.keepAliveSeconds, TimeUnit.SECONDS, queue, threadFactory, rejectedExecutionHandler);
	}

	if (this.allowCoreThreadTimeOut) {
		executor.allowCoreThreadTimeOut(true);
	}

	this.threadPoolExecutor = executor;
	return executor;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

可以看到是在执行initializeExecutor方法的时候,Spring创建了一个ThreadPoolExecutor线程池并赋值给threadPoolExecutor属性。但是此方法在本类中并没有调用的地方,一直往上跟发现该方法是在执行以下方法时调用的。

public void afterPropertiesSet() {
	this.initialize();
}
  • 1
  • 2
  • 3

而执行该方法是因为Spring线程池实现了接口InitializingBean,InitializingBean接口为bean提供了初始化方法的方式,它只包括afterPropertiesSet方法,凡是继承该接口的类,在初始化bean的时候都会执行该方法。因此Spring的线程池在初始化对象时,也便初始化了JDK的线程池并赋值,从而操作Spring线程池处理多线程任务时,其实本质上还是JDK的线程池在执行。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/IT小白/article/detail/104698
推荐阅读
  

闽ICP备14008679号