赞
踩
CompletableFuture是 Java 8 中引入的一个类,它实现了CompletionStage接口,提供了一组丰富的方法来处理异步操作和多个任务的结果。它支持链式操作,可以方便地处理任务的依赖关系和结果转换。相比于传统的Future接口,CompletableFuture更加灵活和强大。
CompletableFuture的使用具有以下优势和特点:
使用CompletableFuture创建异步任务非常简单。可以使用CompletableFuture.supplyAsync() 或CompletableFuture.runAsync() 方法来创建CompletableFuture对象。
使用CompletableFuture.supplyAsync() 方法来创建 CompletableFuture 对象的示例。该方法用于执行具有返回值的任务,并在任务完成时返回结果。
- CompletableFuture<String> future = CompletableFuture.supplyAsync(
- () ->
- { // 执行具有返回值的任务 return "任务结果"; }
- );
在上述示例中,我们使用CompletableFuture.supplyAsync() 方法创建一个具有返回值的 CompletableFuture 对象,任务会在默认的 ForkJoinPool 中异步执行。
除了CompletableFuture.supplyAsync() 方法,CompletableFuture 还提供了CompletableFuture.runAsync() 方法用于执行没有返回值的任务。
- CompletableFuture<Void> future = CompletableFuture.runAsync(
- () ->
- { // 执行没有返回值的任务 }
- );
在上述示例中,我们使用CompletableFuture.runAsync() 方法创建一个没有返回值的 CompletableFuture 对象,任务会在默认的 ForkJoinPool 中异步执行。
我们还可以通过指定自定义线程池来创建 CompletableFuture 对象,以满足特定的并发需求。
- ExecutorService customExecutor = Executors.newFixedThreadPool(10);
- CompletableFuture<String> future = CompletableFuture
- .supplyAsync(
- () ->
- { // 执行任务的代码 },
- customExecutor);
在上述示例中,我们通过Executors.newFixedThreadPool(10) 创建了一个固定大小为 10 的自定义线程池,并将其传递给CompletableFuture.supplyAsync() 方法来执行异步任务。
获取CompletableFuture任务的结果有多种方式。最常用的方式是使用join() 方法阻塞当前线程,直到任务完成并返回结果。
join() 方法是 CompletableFuture 类提供的一种获取任务结果的方式,它会阻塞当前线程,直到任务完成并返回结果。
- CompletableFuture<String> future = CompletableFuture
- .supplyAsync(() -> { // 执行任务的代码 return "任务结果"; });
- String result = future.join();
在上述示例中,我们使用join() 方法获取任务的结果,并将结果赋值给result变量。如果任务还未完成,join() 方法会阻塞当前线程,直到任务完成。
join() 方法和get() 方法非常相似,但join() 方法不会抛出InterruptedException和ExecutionException异常,而是将异常包装在CompletionException中抛出。因此,它更适合在 Lambda 表达式或流式操作中使用。
get() 方法也是 CompletableFuture 类提供的一种获取任务结果的方式,它会阻塞当前线程,直到任务完成并返回结果。与join() 方法不同的是,get() 方法会抛出InterruptedException和ExecutionException异常,需要进行异常处理。
- CompletableFuture<String> future = CompletableFuture
- .supplyAsync(() -> { // 执行任务的代码 return "任务结果"; });
- try { String result = future.get(); }
- catch (InterruptedException | ExecutionException e)
- { // 异常处理逻辑 }
在上述示例中,我们使用get() 方法获取任务的结果,并在可能抛出异常的情况下进行异常处理。如果任务还未完成,get() 方法会阻塞当前线程,直到任务完成。
get() 方法的异常处理较为繁琐,需要捕获InterruptedException和ExecutionException异常,并进行相应的处理。因此,在 Lambda 表达式或流式操作中,推荐使用join() 方法。
CompletableFuture 提供了一系列方法来处理任务的完成事件,实现异步回调。我们将逐一介绍这些方法的区别和用法。
方法签名:thenApply(Function<? super T, ? extends U> fn)
- CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 42)
- .thenApply(result -> result * 2).thenApply(result -> result + 1);
在上述示例中,我们使用 thenApply()方法对上一阶段的结果进行转换,将结果乘以 2,并将转换后的结果加 1。每个 thenApply()方法都返回一个新的 CompletableFuture 对象,可以继续链式调用。
方法签名:thenAccept(Consumer<? super T> action)
- CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 42)
- .thenAccept(result -> System.out.println("任务结果:" + result));
在上述示例中,我们使用 thenAccept()方法对上一阶段的结果进行消费,将结果打印输出。thenAccept()方法没有返回值,仅用于消费任务结果。
方法签名:thenRun(Runnable action)
- CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 42)
- .thenRun(() -> System.out.println("任务执行完毕"));
在上述示例中,我们使用 thenRun()方法在上一阶段任务完成后执行一个 Runnable 任务,输出一条任务执行完毕的消息。
CompletableFuture 还提供了一些方法来组合多个任务的结果,实现更复杂的异步处理逻辑。
方法签名:thenCombine(CompletionStage<? extends U> other, BiFunction<? super T, ? super U, ? extends V> fn)
- CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 10);
- CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> 20);
- CompletableFuture<Integer> combinedFuture = future1
- .thenCombine(future2, (result1, result2) -> result1 + result2);
在上述示例中,我们使用 thenCombine()方法将两个任务的结果进行组合,将它们的结果相加并返回新的 CompletableFuture 对象。
方法签名:thenCompose(Function<? super T, ? extends CompletionStage> fn)
- CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 10);
- CompletableFuture<Integer> future2 = future1.thenCompose(result ->
- CompletableFuture.supplyAsync(() -> result * 2));
在上述示例中,我们使用 thenCompose()方法将上一阶段的结果传递给一个 Function 函数,该函数返回一个新的 CompletionStage 对象。新阶段的任务结果为上一阶段结果的两倍。
方法签名:allOf(CompletableFuture<?>... cfs)
- CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 10);
- CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> 20);
- CompletableFuture<Void> allFutures = CompletableFuture.allOf(future1, future2);
在上述示例中,我们使用 allOf()方法等待所有的 CompletableFuture 对象都完成,返回一个新的 CompletableFuture 对象。这样我们就可以在该对象上进行进一步的处理,例如获取各个 CompletableFuture 的结果。
CompletableFuture 提供了多种方法来处理异步任务执行中可能发生的异常。常用的方法有:
- CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() ->
- {
- throw new RuntimeException("任务执行异常");
- });
- CompletableFuture<Integer> handledFuture = future.exceptionally(ex ->
- {
- System.out.println("异常处理:" + ex.getMessage());
- return 0;
- // 默认值
- });
- CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 42);
- CompletableFuture<String> handledFuture = future.handle((result, ex) ->
- { if (ex != null)
- {
- System.out.println("异常处理:" + ex.getMessage());
- return "默认值";
- } else {
- return "结果:" + result;
- } });
CompletableFuture 还支持异常链,可以将多个 CompletableFuture 的异常连接起来,形成异常链。可以使用exceptionally() 或handle() 方法来实现异常链的处理。
- CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() ->
- {
- throw new RuntimeException("任务执行异常");
- });
- CompletableFuture<Integer> handledFuture = future.exceptionally(ex -> {
- System.out.println("异常处理:" + ex.getMessage());
- throw new CustomException("自定义异常", ex);
- });
在上述示例中,我们通过exceptionally() 方法处理任务的异常,并抛出一个自定义异常,并将原始异常作为异常链的一部分传递下去。
CompletableFuture 默认使用 ForkJoinPool 线程池来执行异步任务。ForkJoinPool 是一种基于工作窃取算法的线程池,适用于任务分解和并行计算。
- CompletableFuture<String> future = CompletableFuture
- .supplyAsync(() -> { // 异步任务的代码 });
在上述代码中,CompletableFuture 会在默认的 ForkJoinPool 中异步执行任务。
除了使用默认线程池,我们还可以自定义线程池来满足特定的需求。自定义线程池可以通过Executors类来创建。
- ExecutorService customExecutor = Executors.newFixedThreadPool(10);
- CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
- // 异步任务的代码
- }, customExecutor);
在上述代码中,我们创建了一个固定大小为 10 的自定义线程池,并将其传递给 CompletableFuture 来执行异步任务。
在使用自定义线程池时,需要注意及时关闭线程池以释放资源。可以使用ExecutorService的shutdown() 或shutdownNow() 方法来关闭线程池。
- // 异步任务代码
- ExecutorService customExecutor = Executors.newFixedThreadPool(10);
- customExecutor.shutdown();
在上述代码中,我们在任务完成后调用了shutdown() 方法来关闭线程池。
CompletableFuture 允许我们控制并发任务的执行数量。可以通过自定义线程池的大小来限制并发度。
- ExecutorService customExecutor = Executors.newFixedThreadPool(5);
- CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
- // 异步任务1的代码 },
- customExecutor);
- CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
- // 异步任务2的代码 },
- customExecutor); // ...
在上述代码中,我们通过自定义线程池的大小为 5 来限制并发任务的数量。
CompletableFuture 还提供了超时处理的功能,可以控制任务的最大执行时间。可以使用completeOnTimeout() 方法来实现超时处理。
- CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
- // 异步任务的代码
- }).completeOnTimeout("默认值", 5, TimeUnit.SECONDS);
在上述代码中,我们指定任务的最大执行时间为 5 秒,如果任务在规定时间内没有完成,将返回默认值。
在某些情况下,我们可能需要中断或取消正在执行的任务。CompletableFuture 提供了cancel() 方法来取消任务的执行。
- CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
- // 异步任务的代码 });
- boolean canceled = future.cancel(true);
在上述代码中,我们调用cancel() 方法来取消任务的执行,并传递一个布尔值表示是否中断正在执行的任务。
CompletableFuture在处理IO操作时非常有用。可以将IO操作封装为CompletableFuture任务,利用CompletableFuture的异步特性提高IO操作的效率。
- CompletableFuture<String> readData = CompletableFuture.supplyAsync(() -> {
- // 执行读取数据的IO操作
- return "读取的数据";
- });
-
- CompletableFuture<Void> processData = readData.thenAccept(data -> {
- // 处理读取到的数据
- System.out.println("读取到的数据:" + data);
- // 执行处理数据的操作
- });
-
- CompletableFuture<Void> writeData = processData.thenRun(() -> {
- // 执行写入数据的IO操作
- System.out.println("数据写入完成");
- });
-
- writeData.join();

在上述代码中,我们使用CompletableFuture处理了一个包含读取数据、处理数据和写入数据的IO操作流程。通过异步执行和链式操作,可以有效地利用CPU和IO资源,提高程序的响应性和吞吐量。
CompletableFuture也可以很好地与网络请求结合使用。我们可以使用CompletableFuture发起多个网络请求,并在所有请求完成后处理结果。
- CompletableFuture<String> request1 = CompletableFuture.supplyAsync(() -> {
- // 发起网络请求1
- return "请求1结果";
- });
-
- CompletableFuture<String> request2 = CompletableFuture.supplyAsync(() -> {
- // 发起网络请求2
- return "请求2结果";
- });
-
- CompletableFuture<String> request3 = CompletableFuture.supplyAsync(() -> {
- // 发起网络请求3
- return "请求3结果";
- });
-
- CompletableFuture<Void> allRequests = CompletableFuture.allOf(request1,
- request2, request3);
-
- allRequests.thenRun(() -> {
- // 所有请求完成后的处理逻辑
- String result1 = request1.join();
- String result2 = request2.join();
- String result3 = request3.join();
- // 对请求结果进行处理
- });

在上述代码中,我们使用CompletableFuture发起了三个网络请求,并通过allOf() 方法等待所有请求完成。在所有请求完成后,我们可以使用join() 方法获取各个请求的结果,并进行后续处理。
业务背景: 在电商项目的售后业务中,当客服接收到用户的售后申请时,需要进行一系列操作,包括查询订单信息、查询 ERP 中的商品信息、查询用户信息,以及创建售后工单。
- public CompletableFuture<Void> processAfterSalesRequest(String orderId,
- String customerId) {
- CompletableFuture<Order> orderFuture = CompletableFuture
- .supplyAsync(() -> getOrderInfo(orderId));
- CompletableFuture<Inventory> inventoryFuture = CompletableFuture
- .supplyAsync(() -> getInventoryInfo(orderId));
- CompletableFuture<User> userFuture = CompletableFuture
- .supplyAsync(() -> getUserInfo(customerId));
-
- return CompletableFuture.allOf(orderFuture, inventoryFuture, userFuture)
- .thenApplyAsync(ignored -> {
- Order order = orderFuture.join();
- Inventory inventory = inventoryFuture.join();
- User user = userFuture.join();
-
- // 创建售后工单
- createAfterSalesTicket(order, inventory, user);
-
- return null;
- });
- }
-
- private Order getOrderInfo(String orderId) {
- // 查询订单信息的逻辑
- // ...
- return order;
- }
-
- private Inventory getInventoryInfo(String orderId) {
- // 查询ERP中商品信息的逻辑
- // ...
- return inventory;
- }
-
- private User getUserInfo(String customerId) {
- // 查询用户信息的逻辑
- // ...
- return user;
- }
-
- private void createAfterSalesTicket(Order order, Inventory inventory, User user) {
- // 创建售后工单的逻辑
- // ...
- }

在上述代码中,我们使用CompletableFuture.supplyAsync() 方法分别查询订单信息、ERP 中的商品信息和用户信息,然后使用CompletableFuture.allOf() 方法等待所有查询任务完成。完成后,我们可以通过join() 方法获取各个查询任务的结果,并将结果传递给createAfterSalesTicket() 方法来创建售后工单。
CompletableFuture 是提供了丰富的功能和方法。它能简化并发任务处理,提高系统性能和响应性。通过了解其基本用法、进阶应用和最佳实践,我们可以灵活处理异步回调、任务组合、异常处理和资源管理。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。