当前位置:   article > 正文

Java多线程工具CompletableFuture介绍_completablefuture 工具类

completablefuture 工具类

1. 简介

CompletableFuture是java.util.concurrent库在java 8中新增的主要工具,同传统的Future相比,其支持流式计算、函数式编程、完成通知、自定义异常处理等很多新的特性。

CompletableFuture实现了CompletionStage接口和Future接口,前者是对后者的一个扩展,增加了异步回调、流式处理、多个Future组合处理的能力,使Java在处理多任务的协同工作时更加顺畅便利。

CompletableFuture 和 FutureTask 同属于 Future 接口的实现类,都可以获取线程的执行结果。

2. 如何创建CompletableFuture

2.1 构造函数创建

最简单的方式就是通过构造函数创建一个CompletableFuture实例。如下代码所示。由于新创建的CompletableFuture还没有任何计算结果,这时调用join,当前线程会一直阻塞在这里。

  1. CompletableFuture<String> future = new CompletableFuture();
  2. String result = future.join();
  3. System.out.println(result);

此时,如果在另外一个线程中,主动设置该CompletableFuture的值,则上面线程中的结果就能返回。

future.complete("test");

2.2 supplyAsync创建 (有返回值)

CompletableFuture.supplyAsync()也可以用来创建CompletableFuture实例。通过该函数创建的CompletableFuture实例会异步执行当前传入的计算任务。在调用端,则可以通过get或join获取最终计算结果。

supplyAsync有两种签名:

  1. public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
  2. public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

第一种只需传入一个Supplier实例(一般使用lamda表达式),此时框架会默认使用ForkJoin线程池来执行被提交的任务。
第二种可以指定自定义的线程池,然后将任务提交给该线程池执行。

CompletableFuture 在创建时,如果传入线程池,那么会去指定的线程池工作。如果没传入,那么回去默认的 ForkJoinPool。
 

ForkJoinPool的优势在于,可以充分利用多cpu,多核cpu的优势,把一个任务拆分成多个小任务,把多个小任务放到多个处理器核心上并行执行;当多个小任务执行完成之后,再将这些执行结果合并起来即可。

ForkJoinPoolExecutorService的实现类,因此是一种特殊的线程池。
使用方法:创建了ForkJoinPool实例之后,就可以调用ForkJoinPoolsubmit(ForkJoinTask<T> task) 或invoke(ForkJoinTask<T> task)方法来执行指定任务了。
其中ForkJoinTask代表一个可以并行、合并的任务。ForkJoinTask是一个抽象类,它还有两个抽象子类:RecusiveAction和RecusiveTask。其中RecusiveTask代表有返回值的任务,而RecusiveAction代表没有返回值的任务

下面为使用supplyAsync创建CompletableFuture的示例:

  1. CompletableFuture<String> future
  2. = CompletableFuture.supplyAsync(()->{
  3. System.out.println("compute test");
  4. return "test";
  5. });
  6. String result = future.join();
  7. System.out.println("get result: " + result);
  8. // 最终的打印信息为get result: test

2.3 runAsync创建 (无返回值)

CompletableFuture.runAsync()也可以用来创建CompletableFuture实例。与supplyAsync()不同的是,runAsync()传入的任务要求是Runnable类型的,所以没有返回值。因此,runAsync适合创建不需要返回值的计算任务。同supplyAsync()类似,runAsync()也有两种签名:

  1. public static CompletableFuture<Void> runAsync(Runnable runnable)
  2. public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)

下面为使用runAsync()的例子:

  1. CompletableFuture<Void> future = CompletableFuture.runAsync(()->{
  2. System.out.println("compute test");
  3. });
  4. System.out.println("get result: " + future.join());
  5. // 由于任务没有返回值, 所以最后的打印结果是"get result: null"。

3. 异步回调方法

Future相比,CompletableFuture最大的不同是支持流式(Stream)的计算处理,多个任务之间,可以前后相连,从而形成一个计算流。比如:任务1产生的结果,可以直接作为任务2的入参,参与任务2的计算,以此类推。

CompletableFuture中常用的流式连接函数包括:

  • thenApply——有入参有返回
    thenApplyAsync
  • thenAccept——有入参无返回
    thenAcceptAsync
  • thenRun——无入参无返回
    thenRunAsync
  • thenCombine——组合两个CompletableFuture,有入参有返回
    thenCombineAsync
  • thenCompose——与thenApply类型,展开嵌套
    thenComposeAsync
  • whenComplete——任务完成时的回调通知,无返回值
    whenCompleteAsync
  • handle——与whenComplete的作用有些类似,有返回值
    handleAsync

其中,带Async后缀的函数表示需要连接的后置任务会被单独提交到线程池中,从而相对前置任务来说是异步运行。除此之外,两者没有其他区别。因此,为了快速理解,在接下来的介绍中,我们主要介绍不带Async的版本。

3.1 thenApply / thenAccept / thenRun互相依赖

这里将thenApply / thenAccept / thenRun放在一起讲,因为这几个连接函数之间的唯一区别是提交的任务类型不一样 :

  • thenApply提交的任务类型需遵从Function签名,也就是有入参和返回值,其中入参为前置任务的结果
  • thenAccept提交的任务类型需遵从Consumer签名,也就是有入参但是没有返回值,其中入参为前置任务的结果
  • thenRun提交的任务类型需遵从Runnable签名,即没有入参也没有返回值

通过thenApply / thenAccept / thenRun连接的任务,当且仅当前置任务计算完成时,才会开始后置任务的计算。因此,这组函数主要用于连接前后有依赖的任务链。

  1. CompletableFuture<Integer> future1
  2. = CompletableFuture.supplyAsync(()->{
  3. System.out.println("compute 1");
  4. return 1;
  5. });
  6. CompletableFuture<Integer> future2
  7. = future1.thenApply((p)->{
  8. System.out.println("compute 2");
  9. return p+10;
  10. });
  11. System.out.println("result: " + future2.join());
  12. // 该示例的最终打印结果为11

3.1.1 thenApply

  1. @Test
  2. public void test5() throws Exception {
  3. ForkJoinPool pool=new ForkJoinPool();
  4. // 创建异步执行任务:
  5. CompletableFuture<Double> cf = CompletableFuture.supplyAsync(()->{
  6. System.out.println(Thread.currentThread()+" start job1,time->"+System.currentTimeMillis());
  7. try {
  8. Thread.sleep(2000);
  9. } catch (InterruptedException e) {
  10. }
  11. System.out.println(Thread.currentThread()+" exit job1,time->"+System.currentTimeMillis());
  12. return 1.2;
  13. },pool);
  14. // cf关联的异步任务的返回值作为方法入参,传入到thenApply的方法中
  15. // thenApply这里实际创建了一个新的CompletableFuture实例
  16. CompletableFuture<String> cf2=cf.thenApply((result)->{
  17. System.out.println(Thread.currentThread()+" start job2,time->"+System.currentTimeMillis());
  18. try {
  19. Thread.sleep(2000);
  20. } catch (InterruptedException e
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/煮酒与君饮/article/detail/824223
推荐阅读
相关标签
  

闽ICP备14008679号