赞
踩
在开发一个Springboot的项目中,需要批量的快速的插入数据库,于是想到了多线程分配次的插入数据,由于我们项目使用的springboot项目,查看官网支持线程池
@EnableAsync
查看源码发现自动装配原理
@ConditionalOnClass(ThreadPoolTaskExecutor.class) @Configuration(proxyBeanMethods = false) @EnableConfigurationProperties(TaskExecutionProperties.class) public class TaskExecutionAutoConfiguration { /** * Bean name of the application {@link TaskExecutor}. */ public static final String APPLICATION_TASK_EXECUTOR_BEAN_NAME = "applicationTaskExecutor"; @Bean @ConditionalOnMissingBean public TaskExecutorBuilder taskExecutorBuilder(TaskExecutionProperties properties, ObjectProvider<TaskExecutorCustomizer> taskExecutorCustomizers, ObjectProvider<TaskDecorator> taskDecorator) { TaskExecutionProperties.Pool pool = properties.getPool(); TaskExecutorBuilder builder = new TaskExecutorBuilder(); builder = builder.queueCapacity(pool.getQueueCapacity()); builder = builder.corePoolSize(pool.getCoreSize()); builder = builder.maxPoolSize(pool.getMaxSize()); builder = builder.allowCoreThreadTimeOut(pool.isAllowCoreThreadTimeout()); builder = builder.keepAlive(pool.getKeepAlive()); Shutdown shutdown = properties.getShutdown(); builder = builder.awaitTermination(shutdown.isAwaitTermination()); builder = builder.awaitTerminationPeriod(shutdown.getAwaitTerminationPeriod()); builder = builder.threadNamePrefix(properties.getThreadNamePrefix()); builder = builder.customizers(taskExecutorCustomizers.orderedStream()::iterator); builder = builder.taskDecorator(taskDecorator.getIfUnique()); return builder; } @Lazy @Bean(name = { APPLICATION_TASK_EXECUTOR_BEAN_NAME, AsyncAnnotationBeanPostProcessor.DEFAULT_TASK_EXECUTOR_BEAN_NAME }) @ConditionalOnMissingBean(Executor.class) public ThreadPoolTaskExecutor applicationTaskExecutor(TaskExecutorBuilder builder) { return builder.build(); } }
TaskExecutionProperties主要做配置文件参数映射类
@ConfigurationProperties("spring.task.execution")
public class TaskExecutionProperties {
private final Pool pool = new Pool();
对应的properties文件
#核心线程数
spring.task.execution.pool.core-size=8
#存活时间
spring.task.execution.pool.keep-alive=60s
#最大线程数
spring.task.execution.pool.max-size=
#阻塞队列大小
spring.task.execution.pool.queue-capacity=
#
spring.task.execution.thread-name-prefix=名称前缀
具体可以查看springboot的参数配置 《Springboot参数配置》
从上面的默认使用的线程池的来看,缺少拒绝策略和其他的一些重要参数,为了满足我们日常的需求我们需要自定义线程吹来满足业务场景
自定义线程池
@Configuration @EnableAsync @Slf4j public class ExecutorConfig { @Value("${async.executor.thread.core_pool_size}") private int corePoolSize; @Value("${async.executor.thread.max_pool_size}") private int maxPoolSize; @Value("${async.executor.thread.queue_capacity}") private int queueCapacity; @Value("${async.executor.thread.name.prefix}") private String namePrefix; @Bean(name = "asyncServiceExecutor") public Executor asyncServiceExecutor() { log.info("start asyncServiceExecutor----------------"); ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); //配置核心线程数 executor.setCorePoolSize(corePoolSize); //配置最大线程数 executor.setMaxPoolSize(maxPoolSize); //配置队列大小 executor.setQueueCapacity(queueCapacity); //配置线程池中的线程的名称前缀 executor.setThreadNamePrefix(namePrefix); // rejection-policy:当pool已经达到max size的时候,如何处理新任务 // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //执行初始化 executor.initialize(); log.info("end asyncServiceExecutor------------"); return executor; }
配置文件
# 配置核心线程数
async:
executor:
thread:
# 配置核心线程数
core_pool_size: 5
# 配置最大线程数
max_pool_size: 5
# 配置队列大小
queue_capacity: 100
# 配置线程池中的线程的名称前缀
name:
prefix: async-service-
测试验证
AsyncService
@Async @Slf4j @Service public class AsyncService { /** * 指定线程池,当有多个线程池,需要指定name * @throws InterruptedException */ @Async("asyncServiceExecutor") public void executeAsync(int num) throws InterruptedException { log.info("start executeAsync"); Thread.sleep(2000); System.out.println("异步线程执行......+"+num); log.info("end executeAsync"); } }
Controller
@Autowired
AsyncServiceImpl asyncService;
@GetMapping
public String testAsync(@RequestParam("num")int num) throws InterruptedException {
for (int i = 0; i < num; i++) {
asyncService.executeAsync(i);
}
return "ok";
}
通过postman测试结果如下,通过结果可以看到,我们一共请求了3次,每个任务执行的时间为2秒,但是接口会立刻返回数据OK到客户端,实现了多线程异步处理的功能
22:30:08.059 [async-service-3] INFO c.r.r.s.i.AsyncServiceImpl - [executeAsync,21] - start executeAsync
22:30:08.059 [async-service-2] INFO c.r.r.s.i.AsyncServiceImpl - [executeAsync,21] - start executeAsync
22:30:08.059 [async-service-1] INFO c.r.r.s.i.AsyncServiceImpl - [executeAsync,21] - start executeAsync
异步线程执行......+1
异步线程执行......+0
异步线程执行......+2
22:30:10.072 [async-service-3] INFO c.r.r.s.i.AsyncServiceImpl - [executeAsync,24] - end executeAsync
22:30:10.072 [async-service-2] INFO c.r.r.s.i.AsyncServiceImpl - [executeAsync,24] - end executeAsync
22:30:10.072 [async-service-1] INFO c.r.r.s.i.AsyncServiceImpl - [executeAsync,24] - end executeAsync
自定义可视化线程池
虽然我们已经用上了线程池,但是还不清楚线程池当时的情况,有多少线程在执行,多少在队列中等待呢?这里我创建了一个ThreadPoolTaskExecutor的子类,在每次提交线程的时候都会将当前线程池的运行状况打印出来
@Slf4j public class VisiableThreadPoolTaskExecutor extends ThreadPoolTaskExecutor { private void showThreadPoolInfo(String prefix) { ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor(); if (null == threadPoolExecutor) { return; } log.info("{}, {},taskCount [{}], completedTaskCount [{}], activeCount [{}], queueSize [{}]", this.getThreadNamePrefix(), prefix, threadPoolExecutor.getTaskCount(), threadPoolExecutor.getCompletedTaskCount(), threadPoolExecutor.getActiveCount(), threadPoolExecutor.getQueue().size()); } @Override public void execute(Runnable task) { showThreadPoolInfo("1. do execute"); super.execute(task); } @Override public void execute(Runnable task, long startTimeout) { showThreadPoolInfo("2. do execute"); super.execute(task, startTimeout); } @Override public Future<?> submit(Runnable task) { showThreadPoolInfo("1. do submit"); return super.submit(task); } @Override public <T> Future<T> submit(Callable<T> task) { showThreadPoolInfo("2. do submit"); return super.submit(task); } @Override public ListenableFuture<?> submitListenable(Runnable task) { showThreadPoolInfo("1. do submitListenable"); return super.submitListenable(task); } @Override public <T> ListenableFuture<T> submitListenable(Callable<T> task) { showThreadPoolInfo("2. do submitListenable"); return super.submitListenable(task); } }
替换默认的线程池
@Bean(name = "asyncServiceExecutor") public Executor asyncServiceExecutor() { log.info("start asyncServiceExecutor----------------"); //ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); //使用可视化运行状态的线程池 ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor(); //配置核心线程数 executor.setCorePoolSize(corePoolSize); //配置最大线程数 executor.setMaxPoolSize(maxPoolSize); //配置队列大小 executor.setQueueCapacity(queueCapacity); //配置线程池中的线程的名称前缀 executor.setThreadNamePrefix(namePrefix); // rejection-policy:当pool已经达到max size的时候,如何处理新任务 // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //执行初始化 executor.initialize(); log.info("end asyncServiceExecutor------------"); return executor; }
这说明提交任务到线程池的时候,调用的是submit(Callable task)这个方法,当前已经提交了2个任务,完成了0个,当前有2个线程在处理任务,还剩0个任务在队列中等待,线程池的基本情况一路了然;
async-service-, 2. do submit,taskCount [2], completedTaskCount [0], activeCount [2], queueSize [0]
我们经常会使用线程池的返回值,可以使用Future或者CompletableFuture接收
AsyncService
/**
* 指定线程池,当有多个线程池,需要指定name
* 带有返回值
* @throws InterruptedException
*/
@Async("asyncServiceExecutor")
public CompletableFuture<String> executeValueAsync() throws InterruptedException {
log.info("start executeValueAsync");
Thread.sleep(200);
System.out.println("异步线程执行返回结果......+");
log.info("end executeValueAsync");
return CompletableFuture.completedFuture("你好");
}
Controller
@GetMapping("/result")
public String testVlaueAsync(@RequestParam("num")int num) throws InterruptedException, ExecutionException {
String result="";
List<CompletableFuture> list=new ArrayList<>();
for (int i = 0; i < num; i++) {
CompletableFuture<String> completableFuture = asyncService.executeValueAsync();
list.add(completableFuture);
}
for (CompletableFuture completableFuture : list) {
result=result+completableFuture.get();
}
return result;
}
测试结果返回正常的数据
22:53:42.443 [http-nio-8081-exec-2] INFO c.r.r.c.VisiableThreadPoolTaskExecutor - [showThreadPoolInfo,23] - async-service-, 1. do execute,taskCount [0], completedTaskCount [0], activeCount [0], queueSize [0]
22:53:42.447 [http-nio-8081-exec-2] INFO c.r.r.c.VisiableThreadPoolTaskExecutor - [showThreadPoolInfo,23] - async-service-, 1. do execute,taskCount [1], completedTaskCount [0], activeCount [1], queueSize [0]
22:53:42.448 [http-nio-8081-exec-2] INFO c.r.r.c.VisiableThreadPoolTaskExecutor - [showThreadPoolInfo,23] - async-service-, 1. do execute,taskCount [2], completedTaskCount [0], activeCount [2], queueSize [0]
22:53:42.449 [async-service-1] INFO c.r.r.s.i.AsyncServiceImpl - [executeValueAsync,34] - start executeValueAsync
22:53:42.449 [async-service-3] INFO c.r.r.s.i.AsyncServiceImpl - [executeValueAsync,34] - start executeValueAsync
22:53:42.449 [async-service-2] INFO c.r.r.s.i.AsyncServiceImpl - [executeValueAsync,34] - start executeValueAsync
异步线程执行返回结果......+
异步线程执行返回结果......+
异步线程执行返回结果......+
22:53:42.662 [async-service-3] INFO c.r.r.s.i.AsyncServiceImpl - [executeValueAsync,37] - end executeValueAsync
22:53:42.662 [async-service-2] INFO c.r.r.s.i.AsyncServiceImpl - [executeValueAsync,37] - end executeValueAsync
22:53:42.662 [async-service-1] INFO c.r.r.s.i.AsyncServiceImpl - [executeValueAsync,37] - end executeValueAsync
22:53:42.663 [http-nio-8081-exec-2] INFO c.r.r.c.Test2Controller - [testVlaueAsync,44] - result:你好你好你好
注意
使用异步线程返回类型有2中void和Future,其他情况都会报错
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。