赞
踩
ThreadPoolExecutor是Java原生的线程池类,而ThreadPoolTaskExecutor是Spring推出的线程池工具
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport implements AsyncListenableTaskExecutor, SchedulingTaskExecutor { private final Object poolSizeMonitor = new Object(); private int corePoolSize = 1; private int maxPoolSize = 2147483647; private int keepAliveSeconds = 60; private int queueCapacity = 2147483647; private boolean allowCoreThreadTimeOut = false; @Nullable private TaskDecorator taskDecorator; @Nullable private ThreadPoolExecutor threadPoolExecutor; private final Map<Runnable, Object> decoratedTaskMap; public ThreadPoolTaskExecutor() { this.decoratedTaskMap = new ConcurrentReferenceHashMap(16, ReferenceType.WEAK); } }
ThreadPoolTaskExecutor的核心参数共有四个,分别是corePoolSize
,maxPoolSize
,keepAliveSeconds
以及queueCapacity
。
corePoolSize
)maximumPoolSize
)keepAliveTime
)workQueue
的容量)从ThreadPoolTaskExecutor的唯一带参构造方法可以看出,似乎并没有对上述四个核心参数做自定义初始化的工作,实际上,ThreadPoolTaskExecutor在底层依然依赖ThreadPoolExecutor本身,也就是说该工具更关注于扩展的内容,执行任务依然交由ThreadPoolExecutor去处理。
从上述代码中可以看到,ThreadPoolTaskExecutor继承了ExecutorConfigurationSupport类,该类不仅体现了Spring自身的设计思想,也是ThreadPoolTaskExecutor底层调用ThreadPoolExecutor的具体实现。
在ExecutorConfigurationSupport中有这么两个关键参数:threadFactory
和rejectedExecutionHandler
public abstract class ExecutorConfigurationSupport extends CustomizableThreadFactory implements BeanNameAware, InitializingBean, DisposableBean {
private ThreadFactory threadFactory = this;
private RejectedExecutionHandler rejectedExecutionHandler = new AbortPolicy();
}
threadFactory
)rejectedExecutionHandler
)threadFactory
之所以初始化为this
,是因为ExecutorConfigurationSupport本身就继承了ThreadFactory的实现类之一CustomizableThreadFactory。
自此,ThreadPoolTaskExecutor继承ExecutorConfigurationSupport之后,与ThreadPoolExecutor已经极为接近了,但还差最为关键的一步,如何创建ThreadPoolExecutor来执行任务?
事实上,ThreadPoolTaskExecutor有两种方式来创建ThreadPoolExecutor,一种是手动创建,一种是注解装配。
public abstract class ExecutorConfigurationSupport extends CustomizableThreadFactory implements BeanNameAware, InitializingBean, DisposableBean { public void initialize() { if (this.logger.isDebugEnabled()) { this.logger.debug("Initializing ExecutorService" + (this.beanName != null ? " '" + this.beanName + "'" : "")); } if (!this.threadNamePrefixSet && this.beanName != null) { this.setThreadNamePrefix(this.beanName + "-"); } this.executor = this.initializeExecutor(this.threadFactory, this.rejectedExecutionHandler); } protected abstract ExecutorService initializeExecutor(ThreadFactory var1, RejectedExecutionHandler var2); }
可以看到,ThreadPoolTaskExecutor执行父类的initialize方法时,将threadFactory和rejectedExecutionHander作为传参,而initializeExecutor方法是一个抽象方法,在ThreadPoolTaskExecutor有以下实现:
public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport implements AsyncListenableTaskExecutor, SchedulingTaskExecutor { protected ExecutorService initializeExecutor(ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) { BlockingQueue<Runnable> queue = this.createQueue(this.queueCapacity); ThreadPoolExecutor executor; if (this.taskDecorator != null) { executor = new ThreadPoolExecutor(this.corePoolSize, this.maxPoolSize, (long)this.keepAliveSeconds, TimeUnit.SECONDS, queue, threadFactory, rejectedExecutionHandler) { public void execute(Runnable command) { Runnable decorated = ThreadPoolTaskExecutor.this.taskDecorator.decorate(command); if (decorated != command) { ThreadPoolTaskExecutor.this.decoratedTaskMap.put(decorated, command); } super.execute(decorated); } }; } else { executor = new ThreadPoolExecutor(this.corePoolSize, this.maxPoolSize, (long)this.keepAliveSeconds, TimeUnit.SECONDS, queue, threadFactory, rejectedExecutionHandler); } if (this.allowCoreThreadTimeOut) { executor.allowCoreThreadTimeOut(true); } this.threadPoolExecutor = executor; return executor; } protected BlockingQueue<Runnable> createQueue(int queueCapacity) { return (BlockingQueue)(queueCapacity > 0 ? new LinkedBlockingQueue(queueCapacity) : new SynchronousQueue()); } }
通过initializeExecutor方法,创建一个容量为queueCapacity
的阻塞队列作为工作队列,再以这六个参数(外加一个TimeUnit
恒定为SECONDS
)来构造ThreadPoolExecutor。
对于Spring而言,更简洁的方式无疑是通过注解来实现自动装配,而ExecutorConfigurationSupport恰好实现了InitailizingBean接口,因此也实现了该接口唯一的一个方法:afterPropertiesSet()
public abstract class ExecutorConfigurationSupport extends CustomizableThreadFactory implements BeanNameAware, InitializingBean, DisposableBean {
public void afterPropertiesSet() {
this.initialize();
}
}
这意味着,当初始化代码中配置的ThreadPoolTaskExecutor这个bean时,便会调用这个方法,从而实现自动创建ThreadPoolExecutor。
接下来,本文将实现一个简单的线程池配置类,采用的是读取配置文件+注解配置bean注入的方式来创建Spring线程池。
# 线程池配置
thread:
pool:
# 核心线程数
core-pool-size: 50
# 最大线程数
max-pool-size: 200
# 工作队列容量
queue-capacity: 1000
# 线程池维护线程所允许的空闲时间
keep-alive-seconds: 300
# 拒绝策略
rejected-execution-handler: CallerRunsPolicy
package com.ponder.core.config; import com.ponder.common.utils.ThreadUtil; import lombok.Data; import org.apache.commons.lang3.concurrent.BasicThreadFactory; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; import java.lang.reflect.InvocationTargetException; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor; /** * 线程池配置 * * @author Ponder Yao * @version 1.0.0 2021/7/29 13:39 */ @Data @Component @ConfigurationProperties(prefix = "thread.pool") public class ThreadPoolConfig { /** 核心线程数 */ private int corePoolSize; /** 最大线程数 */ private int maxPoolSize; /** 工作队列容量 */ private int queueCapacity; /** 线程池维护线程所允许的空闲时间 */ private int keepAliveSeconds; /** 拒绝策略 */ private String rejectedExecutionHandler; @Bean public ThreadPoolTaskExecutor threadPoolTaskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(this.corePoolSize); executor.setMaxPoolSize(this.maxPoolSize); executor.setQueueCapacity(this.queueCapacity); executor.setKeepAliveSeconds(this.keepAliveSeconds); try { // 反射加载拒绝策略类 Class clazz = Class.forName("java.util.concurrent.ThreadPoolExecutor$" + this.rejectedExecutionHandler); executor.setRejectedExecutionHandler((RejectedExecutionHandler) clazz.newInstance()); } catch (ClassNotFoundException | IllegalAccessException | InstantiationException e) { e.printStackTrace(); // 默认使用CallerRunsPolicy策略:直接在execute方法的调用线程中运行被拒绝的任务 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); } return executor; } }
没错,就是这么简单,轻轻松松搞定线程池初始化工作,后面就可以随时从Spring容器取出使用啦
所以,为何Spring要自己写一个ThreadPoolTaskExecutor并推荐代替直接使用ThreadPoolExecutor呢?
其实最主要的原因很直观:ThreadPoolExecutor是一个不受Spring管理生命周期、参数装配的Java类,而有了ThreadPoolTaskExecutor的封装,线程池才有Spring“内味”。
当然,本文只是简要分析了一下两个类的本质区别与关系,很多具体细节还未探讨,而从这次分析中也可以得到以下结论:ThreadPoolTaskExecutor本质依然是ThreadPoolExecutor来实现基本的线程池工作,不同的是前者更关注自己实现的增强扩展部分,让线程池具有更多特性可供使用。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。