当前位置:   article > 正文

SpringBoot 异步线程,确定不试试?_taskexecutorbuilder

taskexecutorbuilder

背景

在开发一个Springboot的项目中,需要批量的快速的插入数据库,于是想到了多线程分配次的插入数据,由于我们项目使用的springboot项目,查看官网支持线程池

开启异步线程

@EnableAsync

默认线程池

ThreadPoolTaskExecutor

查看源码发现自动装配原理

@ConditionalOnClass(ThreadPoolTaskExecutor.class)
@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties(TaskExecutionProperties.class)
public class TaskExecutionAutoConfiguration {

	/**
	 * Bean name of the application {@link TaskExecutor}.
	 */
	public static final String APPLICATION_TASK_EXECUTOR_BEAN_NAME = "applicationTaskExecutor";

	@Bean
	@ConditionalOnMissingBean
	public TaskExecutorBuilder taskExecutorBuilder(TaskExecutionProperties properties,
			ObjectProvider<TaskExecutorCustomizer> taskExecutorCustomizers,
			ObjectProvider<TaskDecorator> taskDecorator) {
		TaskExecutionProperties.Pool pool = properties.getPool();
		TaskExecutorBuilder builder = new TaskExecutorBuilder();
		builder = builder.queueCapacity(pool.getQueueCapacity());
		builder = builder.corePoolSize(pool.getCoreSize());
		builder = builder.maxPoolSize(pool.getMaxSize());
		builder = builder.allowCoreThreadTimeOut(pool.isAllowCoreThreadTimeout());
		builder = builder.keepAlive(pool.getKeepAlive());
		Shutdown shutdown = properties.getShutdown();
		builder = builder.awaitTermination(shutdown.isAwaitTermination());
		builder = builder.awaitTerminationPeriod(shutdown.getAwaitTerminationPeriod());
		builder = builder.threadNamePrefix(properties.getThreadNamePrefix());
		builder = builder.customizers(taskExecutorCustomizers.orderedStream()::iterator);
		builder = builder.taskDecorator(taskDecorator.getIfUnique());
		return builder;
	}

	@Lazy
	@Bean(name = { APPLICATION_TASK_EXECUTOR_BEAN_NAME,
			AsyncAnnotationBeanPostProcessor.DEFAULT_TASK_EXECUTOR_BEAN_NAME })
	@ConditionalOnMissingBean(Executor.class)
	public ThreadPoolTaskExecutor applicationTaskExecutor(TaskExecutorBuilder builder) {
		return builder.build();
	}

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40

TaskExecutionProperties主要做配置文件参数映射类

@ConfigurationProperties("spring.task.execution")
public class TaskExecutionProperties {

	private final Pool pool = new Pool();
  • 1
  • 2
  • 3
  • 4

对应的properties文件

#核心线程数
spring.task.execution.pool.core-size=8
#存活时间
spring.task.execution.pool.keep-alive=60s
#最大线程数
spring.task.execution.pool.max-size=
#阻塞队列大小
spring.task.execution.pool.queue-capacity=
#
spring.task.execution.thread-name-prefix=名称前缀
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

具体可以查看springboot的参数配置 《Springboot参数配置

自定义线程池

从上面的默认使用的线程池的来看,缺少拒绝策略和其他的一些重要参数,为了满足我们日常的需求我们需要自定义线程吹来满足业务场景

自定义线程池

@Configuration
@EnableAsync
@Slf4j
public class ExecutorConfig {



    @Value("${async.executor.thread.core_pool_size}")
    private int corePoolSize;
    @Value("${async.executor.thread.max_pool_size}")
    private int maxPoolSize;
    @Value("${async.executor.thread.queue_capacity}")
    private int queueCapacity;
    @Value("${async.executor.thread.name.prefix}")
    private String namePrefix;

    @Bean(name = "asyncServiceExecutor")
    public Executor asyncServiceExecutor() {
        log.info("start asyncServiceExecutor----------------");
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //配置核心线程数
        executor.setCorePoolSize(corePoolSize);
        //配置最大线程数
        executor.setMaxPoolSize(maxPoolSize);
        //配置队列大小
        executor.setQueueCapacity(queueCapacity);
        //配置线程池中的线程的名称前缀
        executor.setThreadNamePrefix(namePrefix);

        // rejection-policy:当pool已经达到max size的时候,如何处理新任务
        // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //执行初始化
        executor.initialize();
        log.info("end asyncServiceExecutor------------");
        return executor;
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37

配置文件

# 配置核心线程数
async:
  executor:
    thread:
      # 配置核心线程数
      core_pool_size: 5
      # 配置最大线程数
      max_pool_size: 5
      # 配置队列大小
      queue_capacity: 100
      # 配置线程池中的线程的名称前缀
      name:
       prefix: async-service-
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

测试验证

AsyncService

@Async
@Slf4j
@Service
public class AsyncService {


    /**
     * 指定线程池,当有多个线程池,需要指定name
     * @throws InterruptedException
     */
    @Async("asyncServiceExecutor")
    public void executeAsync(int num) throws InterruptedException {
        log.info("start executeAsync");
        Thread.sleep(2000);
        System.out.println("异步线程执行......+"+num);
        log.info("end executeAsync");
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

Controller

   @Autowired
    AsyncServiceImpl asyncService;

    @GetMapping
    public String testAsync(@RequestParam("num")int num) throws InterruptedException {
        for (int i = 0; i < num; i++) {
            asyncService.executeAsync(i);
        }
        return "ok";
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

通过postman测试结果如下,通过结果可以看到,我们一共请求了3次,每个任务执行的时间为2秒,但是接口会立刻返回数据OK到客户端,实现了多线程异步处理的功能

22:30:08.059 [async-service-3] INFO  c.r.r.s.i.AsyncServiceImpl - [executeAsync,21] - start executeAsync
22:30:08.059 [async-service-2] INFO  c.r.r.s.i.AsyncServiceImpl - [executeAsync,21] - start executeAsync
22:30:08.059 [async-service-1] INFO  c.r.r.s.i.AsyncServiceImpl - [executeAsync,21] - start executeAsync
异步线程执行......+1
异步线程执行......+0
异步线程执行......+2
22:30:10.072 [async-service-3] INFO  c.r.r.s.i.AsyncServiceImpl - [executeAsync,24] - end executeAsync
22:30:10.072 [async-service-2] INFO  c.r.r.s.i.AsyncServiceImpl - [executeAsync,24] - end executeAsync
22:30:10.072 [async-service-1] INFO  c.r.r.s.i.AsyncServiceImpl - [executeAsync,24] - end executeAsync
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

自定义可视化线程池

虽然我们已经用上了线程池,但是还不清楚线程池当时的情况,有多少线程在执行,多少在队列中等待呢?这里我创建了一个ThreadPoolTaskExecutor的子类,在每次提交线程的时候都会将当前线程池的运行状况打印出来

@Slf4j
public class VisiableThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {


    private void showThreadPoolInfo(String prefix) {
        ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();

        if (null == threadPoolExecutor) {
            return;
        }
        log.info("{}, {},taskCount [{}], completedTaskCount [{}], activeCount [{}], queueSize [{}]",
                this.getThreadNamePrefix(),
                prefix,
                threadPoolExecutor.getTaskCount(),
                threadPoolExecutor.getCompletedTaskCount(),
                threadPoolExecutor.getActiveCount(),
                threadPoolExecutor.getQueue().size());
    }

    @Override
    public void execute(Runnable task) {
        showThreadPoolInfo("1. do execute");
        super.execute(task);
    }

    @Override
    public void execute(Runnable task, long startTimeout) {
        showThreadPoolInfo("2. do execute");
        super.execute(task, startTimeout);
    }

    @Override
    public Future<?> submit(Runnable task) {
        showThreadPoolInfo("1. do submit");
        return super.submit(task);
    }

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        showThreadPoolInfo("2. do submit");
        return super.submit(task);
    }

    @Override
    public ListenableFuture<?> submitListenable(Runnable task) {
        showThreadPoolInfo("1. do submitListenable");
        return super.submitListenable(task);
    }

    @Override
    public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
        showThreadPoolInfo("2. do submitListenable");
        return super.submitListenable(task);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55

替换默认的线程池

    @Bean(name = "asyncServiceExecutor")
    public Executor asyncServiceExecutor() {
        log.info("start asyncServiceExecutor----------------");
        //ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //使用可视化运行状态的线程池
        ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor();
        //配置核心线程数
        executor.setCorePoolSize(corePoolSize);
        //配置最大线程数
        executor.setMaxPoolSize(maxPoolSize);
        //配置队列大小
        executor.setQueueCapacity(queueCapacity);
        //配置线程池中的线程的名称前缀
        executor.setThreadNamePrefix(namePrefix);

        // rejection-policy:当pool已经达到max size的时候,如何处理新任务
        // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //执行初始化
        executor.initialize();
        log.info("end asyncServiceExecutor------------");
        return executor;
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

这说明提交任务到线程池的时候,调用的是submit(Callable task)这个方法,当前已经提交了2个任务,完成了0个,当前有2个线程在处理任务,还剩0个任务在队列中等待,线程池的基本情况一路了然;

async-service-, 2. do submit,taskCount [2], completedTaskCount [0], activeCount [2], queueSize [0]
  • 1

返回值

我们经常会使用线程池的返回值,可以使用Future或者CompletableFuture接收

AsyncService

   /**
     * 指定线程池,当有多个线程池,需要指定name
     * 带有返回值
     * @throws InterruptedException
     */
    @Async("asyncServiceExecutor")
    public CompletableFuture<String> executeValueAsync() throws InterruptedException {
        log.info("start executeValueAsync");
        Thread.sleep(200);
        System.out.println("异步线程执行返回结果......+");
        log.info("end executeValueAsync");
        return CompletableFuture.completedFuture("你好");
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

Controller

    @GetMapping("/result")
    public String testVlaueAsync(@RequestParam("num")int num) throws InterruptedException, ExecutionException {
        String result="";
        List<CompletableFuture> list=new ArrayList<>();
        for (int i = 0; i < num; i++) {
            CompletableFuture<String> completableFuture = asyncService.executeValueAsync();
            list.add(completableFuture);
        }
        for (CompletableFuture completableFuture : list) {
            result=result+completableFuture.get();
        }
        return result;
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

测试结果返回正常的数据

22:53:42.443 [http-nio-8081-exec-2] INFO  c.r.r.c.VisiableThreadPoolTaskExecutor - [showThreadPoolInfo,23] - async-service-, 1. do execute,taskCount [0], completedTaskCount [0], activeCount [0], queueSize [0]
22:53:42.447 [http-nio-8081-exec-2] INFO  c.r.r.c.VisiableThreadPoolTaskExecutor - [showThreadPoolInfo,23] - async-service-, 1. do execute,taskCount [1], completedTaskCount [0], activeCount [1], queueSize [0]
22:53:42.448 [http-nio-8081-exec-2] INFO  c.r.r.c.VisiableThreadPoolTaskExecutor - [showThreadPoolInfo,23] - async-service-, 1. do execute,taskCount [2], completedTaskCount [0], activeCount [2], queueSize [0]
22:53:42.449 [async-service-1] INFO  c.r.r.s.i.AsyncServiceImpl - [executeValueAsync,34] - start executeValueAsync
22:53:42.449 [async-service-3] INFO  c.r.r.s.i.AsyncServiceImpl - [executeValueAsync,34] - start executeValueAsync
22:53:42.449 [async-service-2] INFO  c.r.r.s.i.AsyncServiceImpl - [executeValueAsync,34] - start executeValueAsync
异步线程执行返回结果......+
异步线程执行返回结果......+
异步线程执行返回结果......+
22:53:42.662 [async-service-3] INFO  c.r.r.s.i.AsyncServiceImpl - [executeValueAsync,37] - end executeValueAsync
22:53:42.662 [async-service-2] INFO  c.r.r.s.i.AsyncServiceImpl - [executeValueAsync,37] - end executeValueAsync
22:53:42.662 [async-service-1] INFO  c.r.r.s.i.AsyncServiceImpl - [executeValueAsync,37] - end executeValueAsync
22:53:42.663 [http-nio-8081-exec-2] INFO  c.r.r.c.Test2Controller - [testVlaueAsync,44] - result:你好你好你好
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

注意

使用异步线程返回类型有2中void和Future,其他情况都会报错
关注走一波

声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号