赞
踩
本文我们来了解下 Java 8 引入的 CompletableFuture 类,了解下该类提供的功能和用例。
异步计算很难推理的,因为我们的大脑是同步的,会将任何计算看成是一系列的同步计算。
我们在实现异步计算时,往往会把回调的动作分散在代码中或者深深地嵌套在彼此内部,这种情况下,当我们需要处理其中一个步骤中可能发生的错误时,情况变得更糟。
人生的一大悲剧是,尽管 Java 5 已经看到了这种恶性循环,提供了Future
接口作为异步计算的结果,但它没有提供任何方法来组合这些计算或处理可能的错误。
直到 Java 8,才引入了 CompletableFuture 类。该类不仅实现了 Future
接口,还实现了 CompletionStage
接口。此接口定义了可与异步计算步骤组合的异步计算步骤契约。
官方文档真是拗口,简单来说,CompletionStage
接口规范了一个异步计算步骤如何与另一个异步计算步骤组合。
CompletableFuture 类还是一个集大成者,即是一个构建块,也是一个框架,提供了大约 50 种不同的方法来构造,组合,执行异步计算步骤和处理错误。
API 数量如此之多,第一眼看到简直就傻眼了,不过好在它们可以分门别类,因为它们大多属于几个明确且不同的用例。
为什么可以 ?
因为 CompletableFuture 类实现了 Future 接口,因此我们可以将其用作 Future 实现,但需要自己实现额外的完成逻辑。
例如,我们可以使用无任何参数的构造函数来创建此类的实例,用于表示未来的某些结果,然后将其交给使用者,并在将来的某个时间调用 complete()
方法完成。消费者可以使用 get()
方法来阻止当前线程,直到提供此结果。
- public Future<String> calculateAsync() throws InterruptedException {
- CompletableFuture<String> completableFuture
- = new CompletableFuture<>();
-
- Executors.newCachedThreadPool().submit(() -> {
- Thread.sleep(500);
- completableFuture.complete("Hello");
- return null;
- });
-
- return completableFuture;
- }
上面的实例中,我们创建了一个创建 CompletableFuture
实例的方法,把计算分离到另一个线程中并立即返回 Future。当计算完成后,该方法通过将结果提供给 complete()
方法来完成 Future。
为了分离计算,我们使用了前几章节 一文秒懂 Java 线程池 ( Thread Pool ) (上) 中提到的 Executor API。这种创建和完成 CompletableFuture 的方法可以与任何并发机制或 API( 包括原始线程 )一起使用。
请注意,calculateAsync() 方法返回 Future 实例。
接下来,我们只要调用此方法,接收 Future 实例并在我们准备阻塞结果时调用它的 get() 方法。
- Future<String> completableFuture = calculateAsync();
-
- // ...
-
- String result = completableFuture.get();
- assertEquals("Hello", result);
注意: get()
方法会抛出一些已检查的异常,即 ExecutionException
( 封装计算期间发生的异常 )和 InterruptedException
( 表示执行方法的线程被中断的异常 )。
如果你已经知道计算的结果,可以将表示此计算的结果作为参数传递给 completedFuture()
静态方法,这样,Future 的 get() 方法永远不会阻塞,而是立即返回此结果。
- Future<String> completableFuture =
- CompletableFuture.completedFuture("Hello");
-
- // ...
-
- String result = completableFuture.get();
- assertEquals("Hello", result);
当然了,有时候,你可能希望取消 Future 的执行。
假设我们没有找到结果并决定完全取消异步执行,这可以通过调用 Future 的 cancel() 方法完成。此方法接收一个布尔参数 mayInterruptIfRunning
。
- public Future<String> calculateAsyncWithCancellation() throws InterruptedException {
- CompletableFuture<String> completableFuture = new CompletableFuture<>();
-
- Executors.newCachedThreadPool().submit(() -> {
- Thread.sleep(500);
- completableFuture.cancel(false);
- return null;
- });
-
- return completableFuture;
- }
在上面这个异步方法的修改版本的范例中,当我们使用 Future.get()
方法阻塞结果时,如果 future 取消了,那么将抛出CancellationException 异常。
但是,在类型为 CompletableFuture 的情况下,cancel() 方法没有任何效果,因为 CompletableFuture 并不会响应中断也不会处理中断。
上面讲解的这些代码,都允许我们选择任何并发执行机制,但是,如果我们想跳过这个样板代码并简单地异步执行一些代码呢?
CompletableFuture
的静态方法 runAsync()
和 supplyAsync()
允许我们从 Runnable 和 Supplier 中创建 CompletableFuture 实例。 Runnable 和 Supplier 都是功能接口,由 Java 8 的新功能,可以将它们的实例作为lambda 表达式传递:
下面的代码演示了如何将 Supplier 的实例作为 lambda 表达式参数,该表达式执行计算并返回结果
- CompletableFuture<String> future
- = CompletableFuture.supplyAsync(() -> "Hello");
-
- // ...
-
- assertEquals("Hello", future.get());
处理计算结果的最通用方法是将其提供给函数。CompletableFuture.thenApply()
方法就是这样做的: 接受一个 Function 实例,用它来处理结果并返回一个用于保存函数返回的值 Future。
- CompletableFuture<String> completableFuture
- = CompletableFuture.supplyAsync(() -> "Hello");
-
- CompletableFuture<String> future = completableFuture
- .thenApply(s -> s + " World");
-
- assertEquals("Hello World", future.get());
如果你不需要在 Future 链中返回值,则可以使用 Consumer 功能接口的实例。它只有一个方法,该方法接受一个参数并返回 void。而相应的,CompletableFuture 也提供了一个使用 Consumer 实例的方法 thenAccept()
。该方法接收一个 Consumer 并将其传递给计算结果。
- CompletableFuture<String> completableFuture
- = CompletableFuture.supplyAsync(() -> "Hello");
-
- CompletableFuture<Void> future = completableFuture
- .thenAccept(s -> System.out.println("Computation returned: " + s));
-
- future.get();
上面这个示例中,最后的 future.get()
方法会返回空值 Void
。
最后,如果你既不需要计算的值也不想在链的末尾返回一些值,那么你可以将 Runnable
lambda 传递给 thenRun()
方法。
- CompletableFuture<String> completableFuture
- = CompletableFuture.supplyAsync(() -> "Hello");
-
- CompletableFuture<Void> future = completableFuture
- .thenRun(() -> System.out.println("计算完成"));
-
- future.get();
上面这个示例中,调用 future.get()
方法之后会在控制台打印一行 计算完成
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。