赞
踩
线程池ThreadPoolExecutor和ThreadPoolTaskExecutor的父类不都是Executor吗?那为什么不用Executors去创建线程池呢?
首先来看看Executors大部分方法多数只定义了核心线程数和ThreadFactory, 其他的线程池核心参数都已经在方法内部定义死了。
可以看到默认阻塞队列用的是无界的LinkedBlockingQueue,可能存在一个致命的问题那就是内存溢出问题。当定义的总线程被沾满了后,那么请求就会被列入阻塞队列中。当处理器条件不好或内存设置不大时。大量请求进来导致阻塞队列的容量过大就只能说是使用不当!!!线程池核心参数需要根据实际情况来定义的。
- public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
- return new ThreadPoolExecutor(nThreads, nThreads,
- 0L, TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue<Runnable>(),
- threadFactory);
- }
而ThreadPoolExecutor(jkd提供)和ThreadPoolTaskExecutor(spring提供)是对基础线程池Executors进行封了装和优化
ThreadPoolExecutor是jkd提供为我们提供的创建线程池实现类。更友好的为开发者提供可扩展性的开发策略
- public ThreadPoolExecutor(int corePoolSize,
- int maximumPoolSize,
- long keepAliveTime,
- TimeUnit unit,
- BlockingQueue<Runnable> workQueue,
- ThreadFactory threadFactory,
- RejectedExecutionHandler handler)
队列策略 | 说明 |
---|---|
ArrayBlockingQueue | 基于数组的有界阻塞队列,如果队列已经是满的,则创建一个新线程,如果线程数量已经达到maxPoolSize,则会执行拒绝策略。 |
LinkedBlockingQueue | 基于链表的无界阻塞队列,当线程池中线程数量达到corePoolSize后,再有新任务进来,会一直存入该队列,因此使用该工作队列时,参数maxPoolSize没有作用。 |
SynchronousQuene | 一个不缓存任务的有界阻塞队列,生产者放入一个任务必须等到消费者取出这个任务。也就是说新任务进来时,不会缓存,而是直接被调度执行该任务,如果没有可用线程,则创建新线程,如果线程数量达到maxPoolSize,则执行拒绝策略。 |
PriorityBlockingQueue | 具有优先级的无界阻塞队列,优先级通过参数Comparator实现。 |
拒绝策略 | 说明 |
---|---|
CallerRunsPolicy | 线程池shutdown的话直接不管,没有shutdown在调用者线程中直接执行被拒绝任务的run方法。 |
AbortPolicy | 丢任务,抛出RejectedExecutionException(默认) |
DiscardPolicy | 直接丢任务 |
DiscardOldestPolicy | 抛队列头部任务,把这次拒绝的任务加入队列 |
- public static ThreadPoolExecutor threadPoolExecutorConfig() {
- ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
- 5,
- 10, 3000,
- TimeUnit.SECONDS,
- new ArrayBlockingQueue(100)
- , Executors.defaultThreadFactory(),
- new ThreadPoolExecutor.AbortPolicy());
- return threadPoolExecutor;
- }
-
- @Test
- public void startFun() {
- ThreadPoolExecutor threadPoolExecutor = threadPoolExecutorConfig();
- for (int i = 0; i < 10; i++) {
- threadPoolExecutor.execute(() -> System.out.println("当前执行的线程:" + Thread.currentThread().getName()));
- }
- }
ThreadPoolExecutor执行任务有submit
和execute
两种方法,这两种方法区别在于
- //execute
- @Test
- public void executeStartFun() {
- ThreadPoolExecutor threadPoolExecutor = threadPoolExecutorConfig();
- for (int i = 0; i < 10; i++) {
- if (i == 5) {
- threadPoolExecutor.execute(() -> System.out.println("当前执行的线程:" + Thread.currentThread().getName()));
- int sum = 1 / 0;
- }
- threadPoolExecutor.execute(() -> System.out.println("当前执行的线程:" + Thread.currentThread().getName()));
- }
- }
- //submit
- @Test
- public void submitStartFun() {
- ThreadPoolExecutor threadPoolExecutor = threadPoolExecutorConfig();
- List<Future> futures = new ArrayList<>();
- for (int i = 0; i < 10; i++) {
- futures.add(threadPoolExecutor.submit(() -> "当前执行的线程:" + Thread.currentThread().getName()));
- }
- futures.stream().forEach(future -> {
- try {
- System.out.println("future的返回值:" + future.get());
- } catch (InterruptedException e) {
- e.printStackTrace();
- } catch (ExecutionException e) {
- e.printStackTrace();
- }
- });
- }
ThreadPoolTaskExecutor 是spring提供的线程池实现类,基于ThreadPoolExecutor之上进行封装使得方便与整合spring框架
因为基于ThreadPoolExecutor之上进行封装,上面已经对线程池核心进行了阐述,这里就直接写实践。ThreadPoolExecutor对spring支持所以直接封装成一个Bean.然后要用的时候直接将Bean依赖注入使用就行
- @Configuration
- @EnableAsync// 开启线程池
- public class ThreadPoolConfig {
-
- /**
- * 每秒需要多少个线程处理
- * tasks/(1/taskcost)
- */
- private int corePoolSize = 10;
-
-
- /**
- * 线程池维护线程的最大数量
- * (max(tasks)- queueCapacity)/(1/taskcost)
- */
- private int maxPoolSize = 20;
-
- /**
- * 缓存队列
- * (coreSizePool/taskcost)*responsetime
- */
- private int queueCapacity = 100;
-
- /**
- * 允许的空闲时间
- * 默认为60
- */
- private int keepAlive = 100;
-
-
- /**
- * 配置默认线程池
- */
- @Bean("defaultThreadPool")
- public TaskExecutor defaultThread() {
- ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
- //设置核心线程数
- executor.setCorePoolSize(corePoolSize);
- //设置最大线程数
- executor.setMaxPoolSize(maxPoolSize);
- //设置队列容量
- executor.setQueueCapacity(queueCapacity);
- //设置允许的空闲时间(秒)
- executor.setKeepAliveSeconds(keepAlive);
- //设置默认的线程名称
- executor.setThreadNamePrefix("defaultThreadPool-Async");
- //设置拒绝策略rejection-policy:当pool已经达到max size的时候,如何处理新任务
- executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
- // 调度器shutdown被调用时等待当前被调度的任务完成
- executor.setWaitForTasksToCompleteOnShutdown(true);
- // 等待时长
- executor.setAwaitTerminationSeconds(60);
- return executor;
- }
可以结合@Async使用线程池异步执行
- @Component
- public class DefaultThreadPoolHandler{
- @Async("defaultThreadPool") //异步使用配置好的ThreadPoolTaskExecutor线程池
- public void defaultThreadPoolFun() {
- System.out.println("defaultThreadPool执行=======================》");
- }
- }
-
- //后续使用通过@Autowired依赖注入
- @Autowired
- private DefaultThreadPoolHandler defaultThreadPoolHandler;
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。