赞
踩
在 Java 8 函数式编程之前,操作多个线程是一件复杂且头疼的事情,代码写的很是不优雅。
当来到 Java 8 函数式编程时代,CompletableFuture 使我们的代码变得很优雅且易懂,今天我来介绍下 CompletableFuture 在我们项目中的实际使用场景。
除了 CompletableFuture 接收线程的返回值之外,还有 Future 和 FutureTask。
Future 用来接收线程池
的返回值
ExecutorService executor = Executors.newCachedThreadPool();
Future<Integer> result = executor.submit(new Callable<Integer>() {
public Integer call() throws Exception {
return new Random().nextInt();
}
});
executor.shutdown();
try {
System.out.println("result:" + result.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
FutureTask 用来接收线程
的返回值
FutureTask<Integer> task = new FutureTask<>(() -> new Random().nextInt());
new Thread(task).start();
try {
System.out.println("result: " + task.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
下面4种是我们常用的静态方法
1、CompletableFuture.runAsync(Runnable runnable);
2、CompletableFuture.runAsync(Runnable runnable, Executor executor);
3、CompletableFuture.supplyAsync(Supplier<U> supplier);
4、CompletableFuture.supplyAsync(Supplier<U> supplier, Executor executor)
返回值
CompletableFuture 不论是 runAsync 还是 supplyAsync 都是有返回结果的,
runAsync默认返回 Void
类型,supplyAsync 会根绝线程中返回的参数决定
CompletableFuture<Void> cf1 = CompletableFuture.runAsync(() -> System.out.println("无返回值"));
CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> "ss")
当线程执行完毕时我们需要拿到返回结果,此时有三种方法可以做到。
get() 方法会抛出经检查的异常,可被捕获,自定义处理或者直接抛出。
而 join() 会抛出未经检查的异常
CompletableFuture.runAsync(() -> { System.out.println("before"); }).thenRun(() -> { System.out.println("after"); }); // output before after CompletableFuture.supplyAsync(() -> "Hello, Completable") .thenApply(String::toUpperCase) .thenAccept(v -> { System.out.println("thenCompose:" + v); }); // output thenCompose:HELLO, COMPLETABLE
thenRun 前一个任务A执行完接着执行后一个任务B,不需要返回值
thenApply 任务A的返回值传递给任务B,任务B将返回值传递给下一个任务C
thenAccept 接收前一个任务B的返回值进行处理,终止操作,不传递给下一个任务
thenAccept、thenApplyAsync,thenRun、thenRunAsync 效果一致
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { print("====thenApply1====="); return 2; }); future.thenApply(res -> { sleep(2000); print("====thenApply2====="); return res + 1; }); future.thenApply(res -> { print("====thenApply3====="); return res + 1; }).thenAccept(System.out::println); sleep(4000); // output [ ForkJoinPool.commonPool-worker-3 ]: ====thenApply1===== [ main ]: ====thenApply2===== [ main ]: ====thenApply3===== 3
thenApply 会阻塞
住当前任务,顺序执行
,且任务在 main 线程
中执行
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { print("====thenApplyAsync1====="); return 2; }); future.thenApplyAsync(res -> { sleep(2000); print("====thenApplyAsync2====="); return res + 1; }); future.thenApplyAsync(res -> { print("====thenApplyAsync3====="); return res + 1; }).thenAccept(System.out::println); sleep(4000); // output [ ForkJoinPool.commonPool-worker-3 ]: ====thenApplyAsync1===== [ ForkJoinPool.commonPool-worker-5 ]: ====thenApplyAsync3===== 3 [ ForkJoinPool.commonPool-worker-3 ]: ====thenApplyAsync2=====
thenApplyAsync 异步执行
,不会阻塞住,且运行在 ForkJoinPool
中
取上一个任务的结果和本次任务的结果进行组合,有返回值
String result3 = CompletableFuture.supplyAsync(() -> {
sleep(1000);
return "Hello";
}).thenCombine(CompletableFuture.supplyAsync(() -> {
sleep(2000);
return "world";
}), (s1, s2) -> s1 + " " + s2).join();
print("thenCombine:" + result3);
// output
[ main ]: thenCombine:Hello world
取上一个任务的结果和本次任务的结果进行组合,无返回值
CompletableFuture<Void> voidCompletableFuture = CompletableFuture.supplyAsync(() -> {
sleep(1000);
return "Hello";
}).thenAcceptBoth(CompletableFuture.supplyAsync(() -> {
sleep(2000);
return "world";
}), (s1, s2) -> {
print(s1 + s2);
});
sleep(3000);
// output
[ ForkJoinPool.commonPool-worker-5 ]: Helloworld
不获取两个任务的返回值
CompletableFuture.supplyAsync(() -> {
sleep(1000);
return "Hello";
}).runAfterBoth(CompletableFuture.supplyAsync(() -> {
sleep(2000);
return "world";
}), () -> {
print("runAfterBoth");
});
sleep(3000);
// output
[ ForkJoinPool.commonPool-worker-5 ]: runAfterBoth
CompletableFuture<String> cfA = CompletableFuture.supplyAsync(() -> { print("cfA"); return "cfaA"; }); CompletableFuture<Integer> cfB = CompletableFuture.supplyAsync(() -> { sleep(2000); print("cfB"); return 123; }); CompletableFuture<String> cfC = CompletableFuture.supplyAsync(() -> { print("cfC"); return "cfC"; }); CompletableFuture<Void> future = CompletableFuture.allOf(cfA, cfB, cfC); //阻塞等待 future.join(); // output [ ForkJoinPool.commonPool-worker-3 ]: cfA [ ForkJoinPool.commonPool-worker-3 ]: cfC [ ForkJoinPool.commonPool-worker-5 ]: cfB
阻塞等待所有任务执行完毕
CompletableFuture<String> cfA = CompletableFuture.supplyAsync(() -> { sleep(1000); print("cfA"); return "cfaA"; }); CompletableFuture<Integer> cfB = CompletableFuture.supplyAsync(() -> { sleep(3000); print("cfB"); return 123; }); CompletableFuture<String> cfC = CompletableFuture.supplyAsync(() -> { print("cfC"); return "cfC"; }); CompletableFuture<Object> anyOf = CompletableFuture.anyOf(cfA, cfB, cfC); //阻塞等待 anyOf.join(); // output [ ForkJoinPool.commonPool-worker-7 ]: cfC
任意一个任务执行完毕就退出
String result4 = CompletableFuture.supplyAsync(() -> {
sleep(100);
return "Hi Boy";
}).applyToEither(CompletableFuture.supplyAsync(() -> {
sleep(300);
return "Hi Girl";
}), (s) -> s.toUpperCase()).join();
print(result4);
// output
[ main ]: HI BOY
XXXEither 表示的意思是两个任务中其中一个执行完成就进行指定的操作,如上面将字符串转换成大写。
acceptEither、acceptEitherAsync,runAfterEither、runAfterEitherAsync,applyToEitherAsync、applyToEither 这些使用方法相似
XXXAsync 表示的异步执行,不阻塞
CompletableFuture<String> result = CompletableFuture.supplyAsync(() -> "hello")
.thenCompose(resultA -> CompletableFuture.supplyAsync(() -> resultA + " world"))
.thenCompose(resultAB -> CompletableFuture.supplyAsync(() -> resultAB + " 1024"));
print(result.join());
// output
[ main ]: hello world 1024
通过接收上一个任务返回值,传递给当前任务
CompletableFuture<String> cfA = CompletableFuture.supplyAsync(() -> { print("cfa..."); return "hello"; }); CompletableFuture<Integer> cfB = CompletableFuture.supplyAsync(() -> { print("123..."); return 123; }); CompletableFuture<String> cfC = CompletableFuture.supplyAsync(() -> { print("cfC..."); return "word!"; }); cfA.thenCombine(cfB, (resultA, resultB) -> { print(resultA + resultB); return resultA + resultB; }).thenCombine(cfC, (resultAB, resultC) -> { print(resultAB + resultC); return resultAB + resultC; }).join(); // output [ ForkJoinPool.commonPool-worker-3 ]: cfa... [ ForkJoinPool.commonPool-worker-5 ]: 123... [ ForkJoinPool.commonPool-worker-7 ]: cfC... [ main ]: hello123 [ main ]: hello123word!
让多个 CompletableFuture 实例形成一个链,沿着这个链接着进行操作
。
首先将 cfA 和 cfB 组合成一个链 cfAB,然后将 cfAB 和 cfC 组合成一个链,类似 Stream 中的流式操作
combine 是把结果进行聚合,但是 compose 更像是把多个已有的 cf 实例组合成一个整体的实例。
CompletableFuture<String> future1 = CompletableFuture
.supplyAsync(() -> "hello")
.thenApply(cfA -> cfA + " world");
CompletableFuture<String> future2 = CompletableFuture
.supplyAsync(() -> "hello")
.thenCompose(cfA -> CompletableFuture.supplyAsync(() -> cfA + " world"));
它们都需要接收一个 Function,这个函数的主要的区别在于 thenApply 中返回一个具体的值,而 thenCompose 返回一个新的 cf 实例。
String result5 = CompletableFuture.supplyAsync(() -> {
sleep(100);
if (true) {
throw new RuntimeException("exception test!");
}
return "Hi Boy";
}).exceptionally(e -> {
print(e.getMessage());
return "Hello world!";
}).thenApply((result) -> result + "resultC").join();
print(result5);
// output
[ ForkJoinPool.commonPool-worker-3 ]: java.lang.RuntimeException: exception test!
[ main ]: Hello world!resultC
尽管在第一个任务中抛出异常,但是通过 .exceptionally() 方法处理了异常,并返回新的结果,这个新的结果将传递给下一个任务。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "resultA") .thenApply(resultA -> resultA + " resultB") // 任务 C 抛出异常 .thenApply(resultB -> { throw new RuntimeException(); }) // 处理任务 C 的返回值或异常 .handle((re, throwable) -> { if (throwable != null) { return "errorResultC"; } return re; }) .thenApply(resultC -> resultC + " resultD"); print(future.join());
上面的代码使用了 handle 方法来处理任务 C 的执行结果,正常情况下throwable 是空的,当抛出异常时 throwable 非空,通过判断 throwable 是否为空来判断是否发生异常
部分例子和思路来源于 https://javadoop.com/post/completable-future 大佬
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。