赞
踩
CompletableFuture是java.util.concurrent库在java 8中新增的主要工具,同传统的Future相比,其支持流式计算、函数式编程、完成通知、自定义异常处理等很多新的特性。
CompletableFuture实现了CompletionStage接口和Future接口,前者是对后者的一个扩展,增加了异步回调、流式处理、多个Future组合处理的能力,使Java在处理多任务的协同工作时更加顺畅便利。
CompletableFuture 和 FutureTask 同属于 Future 接口的实现类,都可以获取线程的执行结果。
最简单的方式就是通过构造函数创建一个CompletableFuture
实例。如下代码所示。由于新创建的CompletableFuture
还没有任何计算结果,这时调用join
,当前线程会一直阻塞在这里。
- CompletableFuture<String> future = new CompletableFuture();
- String result = future.join();
- System.out.println(result);
此时,如果在另外一个线程中,主动设置该CompletableFuture
的值,则上面线程中的结果就能返回。
future.complete("test");
CompletableFuture.supplyAsync()
也可以用来创建CompletableFuture
实例。通过该函数创建的CompletableFuture
实例会异步执行当前传入的计算任务。在调用端,则可以通过get或join
获取最终计算结果。
supplyAsync
有两种签名:
- public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
- public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
第一种只需传入一个Supplier
实例(一般使用lamda
表达式),此时框架会默认使用ForkJoin
线程池来执行被提交的任务。
第二种可以指定自定义的线程池,然后将任务提交给该线程池执行。
CompletableFuture
在创建时,如果传入线程池,那么会去指定的线程池工作。如果没传入,那么回去默认的ForkJoinPool。
ForkJoinPool
的优势在于,可以充分利用多cpu
,多核cpu
的优势,把一个任务拆分成多个小任务
,把多个小任务
放到多个处理器核心上并行执行;当多个小任务
执行完成之后,再将这些执行结果合并起来即可。
ForkJoinPool
是ExecutorService
的实现类,因此是一种特殊的线程池。
使用方法:创建了ForkJoinPool
实例之后,就可以调用ForkJoinPool
的submit(ForkJoinTask<T> task)
或invoke(ForkJoinTask<T> task)
方法来执行指定任务了。
其中ForkJoinTask
代表一个可以并行、合并的任务。ForkJoinTask
是一个抽象类,它还有两个抽象子类:RecusiveAction和RecusiveTask
。其中RecusiveTask
代表有返回值的任务,而RecusiveAction
代表没有返回值的任务
下面为使用supplyAsync
创建CompletableFuture
的示例:
- CompletableFuture<String> future
- = CompletableFuture.supplyAsync(()->{
- System.out.println("compute test");
- return "test";
- });
-
- String result = future.join();
- System.out.println("get result: " + result);
-
- // 最终的打印信息为get result: test
CompletableFuture.runAsync()
也可以用来创建CompletableFuture
实例。与supplyAsync()
不同的是,runAsync()
传入的任务要求是Runnable
类型的,所以没有返回值。因此,runAsync
适合创建不需要返回值的计算任务。同supplyAsync()
类似,runAsync()
也有两种签名:
- public static CompletableFuture<Void> runAsync(Runnable runnable)
-
- public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
下面为使用runAsync()
的例子:
- CompletableFuture<Void> future = CompletableFuture.runAsync(()->{
- System.out.println("compute test");
- });
-
- System.out.println("get result: " + future.join());
-
- // 由于任务没有返回值, 所以最后的打印结果是"get result: null"。
同Future
相比,CompletableFuture
最大的不同是支持流式(Stream
)的计算处理,多个任务之间,可以前后相连,从而形成一个计算流。比如:任务1产生的结果,可以直接作为任务2的入参,参与任务2的计算,以此类推。
CompletableFuture
中常用的流式连接函数包括:
其中,带Async
后缀的函数表示需要连接的后置任务会被单独提交到线程池中
,从而相对前置任务来说是异步运行
的。除此之外,两者没有其他区别。因此,为了快速理解,在接下来的介绍中,我们主要介绍不带Async的版本。
这里将thenApply / thenAccept / thenRun
放在一起讲,因为这几个连接函数之间的唯一区别是提交的任务类型不一样 :
thenApply
提交的任务类型需遵从Function
签名,也就是有入参和返回值
,其中入参为前置任务的结果thenAccept
提交的任务类型需遵从Consumer
签名,也就是有入参但是没有返回值
,其中入参为前置任务的结果thenRun
提交的任务类型需遵从Runnable
签名,即没有入参也没有返回值
通过thenApply / thenAccept / thenRun
连接的任务,当且仅当前置任务计算完成时,才会开始后置任务的计算
。因此,这组函数主要用于连接前后有依赖的任务链。
- CompletableFuture<Integer> future1
- = CompletableFuture.supplyAsync(()->{
- System.out.println("compute 1");
- return 1;
- });
- CompletableFuture<Integer> future2
- = future1.thenApply((p)->{
- System.out.println("compute 2");
- return p+10;
- });
- System.out.println("result: " + future2.join());
-
- // 该示例的最终打印结果为11
- @Test
- public void test5() throws Exception {
- ForkJoinPool pool=new ForkJoinPool();
- // 创建异步执行任务:
- CompletableFuture<Double> cf = CompletableFuture.supplyAsync(()->{
- System.out.println(Thread.currentThread()+" start job1,time->"+System.currentTimeMillis());
- try {
- Thread.sleep(2000);
- } catch (InterruptedException e) {
- }
- System.out.println(Thread.currentThread()+" exit job1,time->"+System.currentTimeMillis());
- return 1.2;
- },pool);
-
- // cf关联的异步任务的返回值作为方法入参,传入到thenApply的方法中
- // thenApply这里实际创建了一个新的CompletableFuture实例
- CompletableFuture<String> cf2=cf.thenApply((result)->{
- System.out.println(Thread.currentThread()+" start job2,time->"+System.currentTimeMillis());
- try {
- Thread.sleep(2000);
- } catch (InterruptedException e

Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。