当前位置:   article > 正文

Spring自带的线程池ThreadPoolTaskExecutor_org.springframework.scheduling.concurrent.threadpo

org.springframework.scheduling.concurrent.threadpooltaskexecutor

ThreadPoolTaskExecutor类,其本质是对java.util.concurrent.ThreadPoolExecutor的包装。这个类则是spring包下的,是Spring为我们开发者提供的线程池类
Spring提供了xml给我们配置ThreadPoolTaskExecutor线程池,但是现在普遍都在用SpringBoot开发项目,所以直接上yaml或者properties配置即可,或者也可以使用@Configuration配置也行,下面演示配置和使用。

配置

application.properties

# 核心线程池数
spring.task.execution.pool.core-size=5
# 最大线程池数
spring.task.execution.pool.max-size=10
# 任务队列的容量
spring.task.execution.pool.queue-capacity=5
# 非核心线程的存活时间
spring.task.execution.pool.keep-alive=60
# 线程池的前缀名称
spring.task.execution.thread-name-prefix=线程前缀名
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

application.yaml

spring:
  task:
    execution:
      pool:
        #核心线程数
        core-size: 5
        #最大线程数
        max-size: 20
        #任务队列容量
        queue-capacity: 10
        #非核心线程存活时间
        keep-alive: 60
      #线程池前缀名称
      thread-name-prefix: 线程前缀名
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

config类

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * @author: tanghaizhi
 * @CreateTime: 2022/4/26 9:38
 * @Description:
 */
@Configuration
public class AsyncScheduledTaskConfig {

    @Value("${spring.task.execution.pool.core-size}")
    private int corePoolSize;
    @Value("${spring.task.execution.pool.max-size}")
    private int maxPoolSize;
    @Value("${spring.task.execution.pool.queue-capacity}")
    private int queueCapacity;
    @Value("${spring.task.execution.thread-name-prefix}")
    private String namePrefix;
    @Value("${spring.task.execution.pool.keep-alive}")
    private int keepAliveSeconds;
    @Bean
    public ThreadPoolTaskExecutor threadPoolTaskExecutorInit() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //最大线程数
        executor.setMaxPoolSize(maxPoolSize);
        //核心线程数
        executor.setCorePoolSize(corePoolSize);
        //任务队列的大小
        executor.setQueueCapacity(queueCapacity);
        //线程前缀名
        executor.setThreadNamePrefix(namePrefix);
        //线程存活时间
        executor.setKeepAliveSeconds(keepAliveSeconds);

        /**
         * 拒绝处理策略
         * CallerRunsPolicy():交由调用方线程运行,比如 main 线程。
         * AbortPolicy():直接抛出异常。
         * DiscardPolicy():直接丢弃。
         * DiscardOldestPolicy():丢弃队列中最老的任务。
         */
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
        //线程初始化
        executor.initialize();
        return executor;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53

使用

测试使用类如下,需在启动类加上@EnableAsync和@EnableScheduling两个注解

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

/**
 * @author: tanghaizhi
 * @CreateTime: 2022/4/26 9:42
 * @Description:
 */
@Component
public class ScheduleTask {

    @Qualifier("threadPoolTaskExecutorInit")
    @Autowired
    ThreadPoolTaskExecutor executor;

    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    @Async("threadPoolTaskExecutorInit")
    @Scheduled(fixedRate = 2000)
    public void test1() {
        try {
            Thread.sleep(6000);
            System.out.println("线程池名称:" + Thread.currentThread().getName() + "-" + sdf.format(new Date()));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Scheduled(cron = "*/1 * * * * ?")
    public void test2() throws ExecutionException, InterruptedException {
        CompletableFuture<String> childThread = CompletableFuture.supplyAsync(()->{
            System.out.println(Thread.currentThread().getName()+" start,time->"+System.currentTimeMillis());
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName()+" exit,time->"+System.currentTimeMillis());
            return Thread.currentThread().getName() + "任务结束";
        },executor);
        System.out.println("child thread result->"+childThread.get());
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53

运行结果如下
在这里插入图片描述
可以看到
在使用@Async(“threadPoolTaskExecutorInit”)指定线程池后,Quartz的test1定时任务执行所使用线程已变为ThreadPoolTaskExecutor中获取到的线程。
定时任务test2中为CompletableFuture任务指定了使用ThreadPoolTaskExecutor作为线程池,使用的线程也是在我们配置的ThreadPoolTaskExecutor中获取的。

拒绝策略

使用线程池时获取流程的流程

  1. 查看核心线程池是否已满,不满就创建一条线程执行任务,否则执行第二步。
  2. 查看任务队列是否已满,不满就将任务存储在任务队列中,否则执行第三步。
  3. 查看线程池是否已满,即就是是否达到最大线程池数,不满就创建一条线程执行任务,否则就按照策略处理无法执行的任务。

简单来说,就是核心线程有空闲线程就使用核心线程,否则就放到任务队列中排队。如果任务队列也满了,就创建一条新的线程执行,如果创建的线程达到了配置的最大线程数,则会弃用拒绝策略

rejectedExectutionHandler参数字段用于配置绝策略,常用拒绝策略如下

  • AbortPolicy:用于被拒绝任务的处理程序,它将抛出RejectedExecutionException
  • CallerRunsPolicy:用于被拒绝任务的处理程序,它直接在execute方法的调用线程中 运行被拒绝的任务。
  • DiscardOldestPolicy:用于被拒绝任务的处理程序,它放弃最旧的未处理请求,然后重试execute。
  • DiscardPolicy:用于被拒绝任务的处理程序,默认情况下它将丢弃被拒绝的任务。

文章参考

https://zhuanlan.zhihu.com/p/346086161

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/繁依Fanyi0/article/detail/104704
推荐阅读
相关标签
  

闽ICP备14008679号