赞
踩
由于默认情况下多线程执行任务都是异步的,且线程之间任务执行没有先后顺序,都是哪个线程先抢到任务,哪个线程先执行,所以当一个业务请求需要多线程完成(假如需要 A、B、C 三个线程)且线程AB之间存在前后的依赖关系,而C线程和AB线程无关(例如:A线程启动依赖于B线程的执行返回结果,C线程自个玩) ,所以A和B线程就需要线程编排。来使AB线程执行由原先的并行变为串行;多线程异步编排可以根据线程之间的依赖关系来规划线程之间的执行先后循序。
相信使用过Vue的人都知道Promise对象,Promise就可以将异步操作以同步操作的流程表达出来,其结构如下:
- function f1(){
- .....
- return new Promise ((resolve,reject)=>{
- //执行方法
- //eg.
- console.log('测试')
- .then(response=>{
- resolve(response)
- //数据的其他处理等操作
- })
- .catch(error=>{
- reject(error);
- })
- })
- }
CompletableFuture 的作用类似 Promise,在写法上也是相近的
- ExecutorService service = Executors.newFixedThreadPool(10);
- ThreadPoolExecutor executor = new ThreadPoolExecutor(20, 100, 10000, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10000));
- //异步起线程执行业务 有返回值
- CompletableFuture<Integer> completableFuture = CompletableFuture
- .supplyAsync(() -> {
- System.out.println("开始执行任务----");
- return 1 + 2;
- }, executor)
- .whenComplete((response, exception) -> {
- // 可以接收到返回值和异常类型,但是无法处理异常
- System.out.println("上面任务执行完的响应对象:" + response);
- })
- .exceptionally((throwable) -> {
- // 处理异常,返回一个自定义的值,和上边返回值无关。
- System.out.println("此处专门处理异常");
- return 10;
- })
- .handle((response, throwable) -> {
- // 无论线程是否正确执行,都会执行这里,可以对返回值进行操作。
- return 10;
- });
CompletableFuture异步编排的方式有四种,分别是 runAsync、supplyAsync方法,这两个方法分别有两种入参,常用的是使用自定义线程池的那两种。示例如下:
- ThreadPoolExecutor executor = new ThreadPoolExecutor(20, 100, 10000, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10000));
- // runAsync 方法无返回值,所以 CompletableFuture<Void> 中的泛型为 Void
- CompletableFuture<Void> runAsync = CompletableFuture.runAsync(() -> {
- // 这里执行一个异步任务,这个lambda表达式是一个Runnable的匿名实现
- System.out.println("[runAsync] 这是一个Runnable异步任务,该任务交由executor线程池执行");
- }, executor);
-
- // supplyAsync 方法有返回值,返回值类型将决定泛型,下例中lambda表达式返回String类型,所以 CompletableFuture<String> 中泛型为String
- CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(() -> {
- // // 这里执行一个异步任务,这个lambda表达式有返回值,应该对应Callable的匿名实现
- System.out.println("[supplyAsync] 这是一个生产型异步任务,猜测对应Callable+FutureTask组合模式,该任务交由executor线程池执行");
- return "hello world";
- }, executor);
runAsync 演示效果如下:
- private final ExecutorService executor = Executors.newFixedThreadPool(10);
-
- @Test
- public void runAsyncTest() {
- // 当这个方法被调用时,可定就需要一个线程来执行,假如这个线程的名字叫做 runAsyncTest
- System.out.println("【runAsyncTest】线程开始执行runAsyncTest方法");
-
- // 当 runAsync 线程执行到下面时,发现需要开启一个异步线程来处理(假设这个线程叫做 runAsync),任务不归自己处理,自己接着往下执行
- CompletableFuture<Void> runAsync = CompletableFuture.runAsync(() -> {
- // 这里执行一个异步任务,这个lambda表达式是一个Runnable的匿名实现
- System.out.println("【runAsync】开启一个异步线程开始处理Runnable异步任务");
- System.out.println("【runAsync】这是一个Runnable异步任务,处理任务中---,该任务交由executor线程池执行");
- System.out.println("【runAsync】关闭一个异步线程完成处理Runnable异步任务");
- }, executor);
-
- System.out.println("【runAsyncTest】线程结束执行runAsyncTest方法");
- }
执行结果如下:
supplyAsync 演示效果如下:
- private final ExecutorService executor = Executors.newFixedThreadPool(10);
-
- @Test
- public void supplyAsyncTest() throws Exception {
- // 当这个方法被调用时,可定就需要一个线程来执行,假如这个线程的名字叫做 supplyAsyncTest
- System.out.println("【supplyAsyncTest】线程开始执行supplyAsyncTest方法");
-
- // supplyAsync 方法有返回值,返回值类型将决定泛型,下例中lambda表达式返回String类型,所以 CompletableFuture<String> 中泛型为String
- CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(() -> {
- // 这里执行一个异步任务,这个lambda表达式有返回值,应该对应Callable的匿名实现
- System.out.println("【supplyAsync】开启一个异步线程开始处理Callable异步任务");
- System.out.println("【supplyAsync】这是一个生产型异步任务,猜测对应Callable+FutureTask组合模式,该任务交由executor线程池执行");
- System.out.println("【supplyAsync】关闭一个异步线程完成处理Callable异步任务");
- return "hello world";
- }, executor);
-
- System.out.println("【supplyAsyncTest】线程结束执行supplyAsyncTest方法,supplyAsync的只为:" + supplyAsync.get());
- }
执行结果如下:
下面是一个点击商品后触发的查询商品详情的逻辑,可以看到其中构造了一个SkuItemVO视图层对象来封装商品的各种信息,但我们可以发现这个过程是一个单线程的同步操作,将耗费最长的处理时间,那么我们该怎么通过 CompletableFuture 的异步编排优化呢?
改造前:
- public SkuItemVO getSkuItem(Long skuId) {
- SkuItemVO skuItemVO = new SkuItemVO();
- // 1、获取SKU基本信息,查 pms_sku_info
- SkuInfo skuInfo = this.baseMapper.selectById(skuId);
- skuItemVO.setSkuInfo(skuInfo);
-
- // 2、获取SKU图片信息,查 pms_sku_images
- SkuImages skuImages = skuImagesService.getById(skuId);
- LambdaQueryWrapper<SkuImages> queryWrapper = new LambdaQueryWrapper<>();
- queryWrapper.eq(SkuImages::getSkuId,skuId);
- List<SkuImages> skuImagesList = skuImagesService.list(queryWrapper);
- skuItemVO.setImages(skuImagesList);
-
- // 3、获取SPU的销售属性组合
- List<SkuItemSaleAttrVO> saleAttrVOS = skuSaleAttrValueService.getSalesAttrValueBySpuId(skuInfo.getSpuId());
- skuItemVO.setSaleAttrList(saleAttrVOS);
-
- // 4、获取SPU的介绍
- SpuInfo spuInfo = spuInfoService.getById(skuInfo.getSpuId());
- SpuInfoDesc spuInfoDesc = spuInfoDescService.getById(skuInfo.getSpuId());
- skuItemVO.setSpuInfoDesc(spuInfoDesc);
-
- // 5、获取SPU的规格参数信息
- List<SpuItemAttrGroupVO> spuItemAttrGroupVOS = attrGroupService.getAttrGroupWithAttrsBySpuId(skuInfo.getSpuId);
- skuItemVO.setGroupAttrVOS(spuItemAttrGroupVOS);
-
- return skuItemVO;
- }
1、创建基于配置文件的可配置化动态线程池
编写线程池参数配置
- thread:
- pool:
- corePoolSize: 20
- maximumPoolSize: 100
- keepAliveTime: 10
- queueCapacity: 100000
编写配置映射文件
- @Getter
- @ConfigurationProperties(prefix = "thread.pool")
- public class ThreadPoolProperties {
-
- /**
- * 核心线程数
- */
- private Integer corePoolSize;
-
- /**
- * 最大线程数
- */
- private Integer maximumPoolSize;
-
- /**
- * 除核心数以外的线程空闲存活时间
- */
- private Integer keepAliveTime;
-
- /**
- * 阻塞队列初始化容量
- */
- private Integer queueCapacity;
- }
-
- // @ConfigurationProperties 使用这个必须要引入 spring-boot-configuration-processor 依赖,然后重启一下服务就有配置智能提示
激活线程池配置映射文件同时向IOC容器注入线程池
- @Configuration
- @EnableConfigurationProperties(value = {ThreadPoolProperties.class})
- public class ThreadPoolConfig {
-
- @Bean
- public ThreadPoolExecutor threadPoolExecutor(ThreadPoolProperties properties) {
- return new ThreadPoolExecutor(properties.getCorePoolSize(), properties.getMaximumPoolSize(), properties.getKeepAliveTime(), TimeUnit.SECONDS, new LinkedBlockingQueue<>(properties.getQueueCapacity()));
- }
- }
-
- // 为了让线程池的配置数据能够从配置文件中批量导入,我们可以创建一个映射类并使用配置文件处理器完成
- // 例如:创建线程池配置文件 ThreadPoolProperties
异步编排优化改造
- @Autowired
- private ThreadPoolExecutor executor;
-
- public SkuItemVO getSkuItem(Long skuId) {
- SkuItemVO skuItemVO = new SkuItemVO();
- // 1、获取SKU基本信息,查 pms_sku_info
- CompletableFuture<SkuInfo> skuInfoFuture = CompletableFuture.supplyAsync(() -> {
- SkuInfo skuInfo = this.baseMapper.selectById(skuId);
- skuItemVO.setSkuInfo(skuInfo);
- return skuInfo;
- }, executor);
-
- // 2、获取SKU图片信息,查 pms_sku_images
- CompletableFuture.runAsync(() -> {
- LambdaQueryWrapper<SkuImages> queryWrapper = new LambdaQueryWrapper<>();
- queryWrapper.eq(SkuImages::getSkuId, skuId);
- List<SkuImages> skuImagesList = skuImagesService.list(queryWrapper);
- skuItemVO.setImages(skuImagesList);
- },executor);
-
- // 3、获取SPU的销售属性组合
- skuInfoFuture.thenAcceptAsync((response) -> {
- List<SkuItemSaleAttrVO> saleAttrVOS = skuSaleAttrValueService.getSalesAttrValueBySpuId(response.getSpuId());
- skuItemVO.setSaleAttrList(saleAttrVOS);
- },executor);
-
- // 4、获取SPU的介绍
- skuInfoFuture.thenAcceptAsync((response) -> {
- SpuInfoDesc spuInfoDesc = spuInfoDescService.getById(response.getSpuId());
- skuItemVO.setSpuInfoDesc(spuInfoDesc);
- },executor);
-
- // 5、获取SPU的规格参数信息
- skuInfoFuture.thenAcceptAsync((response) -> {
- List<SpuItemAttrGroupVO> spuItemAttrGroupVOS = attrGroupService.getAttrGroupWithAttrsBySpuId(response.getSpuId);
- skuItemVO.setGroupAttrVOS(spuItemAttrGroupVOS);
- },executor);
-
- // CompletableFuture.allOf 将所有的异步编排任务组合起来,感知整个异步任务执行过程,一旦所有异步任务都结束,该CompletableFuture等待结束,如果超过10秒还没执行完直接抛出异常
- CompletableFuture.allOf(skuInfoFuture,skuImagesFuture,saleAttrFuture,spuInfoDescFuture,spuItemAttrGroupFuture).get(10, TimeUnit.SECONDS);
- return skuItemVO;
- }
- // response 就是 skuInfoFuture 中封装的 SkuInfo 返回实例
- // 使用 CompletableFuture.allOf 开启阻塞等待的原因是等待 skuItemVO 中的值能够填充完成,如果不开启等待,skuItemVO 中的值可能来不及填充整个方法就完成了
从异步编排后的结果来看,1、2 操作是并行互不影响;3、4、5 三者需要等待 1 处理完成,使用他的返回值,所以,1 和(3、4、5)是串行,而 3、4、5 三者之间是并行
值得注意的是:在异步编排完后需要使用 CompletableFuture.allOf 开启阻塞等待的原因是等待 skuItemVO 中的值能够填充完成,如果不开启等待,skuItemVO 中的值可能来不及填充整个方法就完成了
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。