赞
踩
Java JDK中提供了线程池类ThreadPoolExecutor,但在实际开发中多使用SpringBoot来开发,Spring默认也是自带了一个线程池方便我们开发,它就是ThreadPoolTaskExecutor;翻看了好多讲ThreadPoolTaskExecutor的文章,这些文章多从原理和配置来进行介绍,但是实际写代码的时候还要考虑怎么设计使用的问题,这对于老手来说可能没什么,但是对于刚开始使用的新手来说就有可能一头雾水。
下面我就从我实际开发过程中使用的方式来介绍。
ThreadPoolTaskExecutor类
Spring异步线程池的接口类是TaskExecutor,本质还是java.util.concurrent.Executor,没有配置的情况下,默认使用的是simpleAsyncTaskExecutor。但是Spring更加推荐我们开发者使用ThreadPoolTaskExecutor类来创建线程池,其本质是对java.util.concurrent.ThreadPoolExecutor的包装。
实际编程过程中我们可能遇到许多计算密集型或IO密集型的操作,很多时候是要对一个Collection中的数据都进行操作,这个时候我们就想到如果能用线程池操作就好了。然后就有了这篇文章。
结果获取
多线程操作结果的获取使用Future类来获取。Java 通过 ThreadPoolExecutor 提供的 3 个 submit() 方法和 1 个 FutureTask 工具类来支持获得任务执行结果的需求。ThreadPoolTaskExecutor作为对ThreadPoolExecutor的包装,自然也会提供这三个submit方法。
// 提交Runnable任务
Future<?>
submit(Runnable task);
// 提交Callable任务
<T> Future<T>
submit(Callable<T> task);
// 提交Runnable任务及结果引用
<T> Future<T>
submit(Runnable task, T result);
因为Runable接口的Run方法是没有返回值的,所以其future方法获取的只能用来断言任务是否已经结束。Callable具有返回值,我们可以使用这个接口来获取运行结果。第三个接口稍有些复杂,不符合我们简单应用的理念。至于FutureTask后续再说。
package com.lordum.threadpoolpractice.config; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; @Configuration @EnableAsync public class TaskPoolConfig { @Bean("taskExecutor") public ThreadPoolTaskExecutor taskExecutor(){ ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); taskExecutor.setCorePoolSize(10); taskExecutor.setMaxPoolSize(20); taskExecutor.setQueueCapacity(200); taskExecutor.setKeepAliveSeconds(60); taskExecutor.setThreadNamePrefix("taskExecutor--"); taskExecutor.setWaitForTasksToCompleteOnShutdown(true); taskExecutor.setAwaitTerminationSeconds(60); return taskExecutor; } }
package com.lordum.threadpoolpractice.tasks;
import java.util.concurrent.Callable;
public abstract class AbstractTask implements Callable<Result> {
}
package com.lordum.threadpoolpractice.tasks; import lombok.AllArgsConstructor; import lombok.NoArgsConstructor; import lombok.extern.slf4j.Slf4j; import java.time.LocalDateTime; import java.util.Date; import java.util.concurrent.TimeUnit; @AllArgsConstructor @NoArgsConstructor @Slf4j public class DemoTask extends AbstractTask{ private String name; @Override public Result call() throws Exception { Result result = new Result(); log.info(Thread.currentThread().getName() + "接收到任务:" + name + ". 当前时间:" + LocalDateTime.now().toString()); result.setData("Call " + name); TimeUnit.SECONDS.sleep(2); return result; } }
结果接收类
package com.lordum.threadpoolpractice.tasks;
import lombok.Data;
@Data
public class Result {
private Integer code = 1;//1:成功 0:失败
private String desc;//描述
private Object data;
}
package com.lordum.threadpoolpractice.service; import com.lordum.threadpoolpractice.tasks.DemoTask; import com.lordum.threadpoolpractice.tasks.Result; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Service; import java.time.LocalDateTime; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; @Slf4j @Service public class DemoService { @Autowired @Qualifier("taskExecutor") private ThreadPoolTaskExecutor taskExecutor; public List<String> useTaskPoolDemo() throws ExecutionException, InterruptedException { List<String> strings = Arrays.asList("Spider man", "Haoke", "Mary", "Lilei", "Han Meimei"); long start = System.currentTimeMillis(); List<String> resultList = new ArrayList<>(); List<Future<Result>> futureList = new ArrayList<>(); for (String string : strings) { DemoTask demoTask = new DemoTask(string); Future<Result> result = this.taskExecutor.submit(demoTask); futureList.add(result); log.info("##" + string +"## 添加到结果: " + string + "结果时间: " + LocalDateTime.now()); } // future.get() 会阻塞调用线程(主线程),如果在上面的循环中获取,整个服务就会变成并行,失去使用线程池的意义 for (Future<Result> resultFuture : futureList) { Result data = resultFuture.get(); log.info("## 获取到future结果: " + data.getData().toString() + "结果时间: " + LocalDateTime.now()); resultList.add(data.getData().toString()); } long end = System.currentTimeMillis(); log.info("程序执行时间:" + String.valueOf(end - start)); return resultList; } }
优点
注意
future.get() 会阻塞调用线程(主线程),如果在调用线程池的循环中获取,整个服务就会变成并行,失去使用线程池的意义
结果展示
这里我们的并行服务每个用时2S,如果没有用线程池需要执行10S多,这里使用了线程池之后值使用了2s11ms,可见使用线程池并行处理确实提高了执行速度。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。