当前位置:   article > 正文

Future、FutureTask、CompletableFuture简介_completablefuture和futuretask

completablefuture和futuretask


在这里插入图片描述

Future、FutureTask、CompletableFuture简介

历史背景

  • Future、FutureTask在java1.5版本提出,用于实现异步计算;

  • ListenableFuture由Google Guava工具包提供的Future扩展类,便于异步计算;

  • 紧随ListenableFuture,在JDK1.8提出CompletableFuture,是对Future的拓展,便于异步计算;

Future

Future是一个接口,方法如下:

  • get方法,用于获得异步调用返回值
  • isCancelled方法,用于检查执行是否被取消
  • isDone方法,用于判断是否执行是否完成
  • cancel方法,用于取消执行
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 10L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100));
Future<String> submit = threadPoolExecutor.submit(() -> {
    System.out.println("123");
    return "1";
});
submit.get();
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

FutureTask

实现了RunnableFuture接口(RunnableFuture接口继承Runnable和Future接口),拥有两个构造方法如下;

public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }
public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // ensure visibility of callable
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

因为继承于Runnable所以可以直接交给线程池执行;如果使用FutureTask(Runnable runnable, V result)构造方法可以自己指定返回值

FutureTask<String> hello = new FutureTask<>(() -> {
    System.out.println("hello");
}, "123");
threadPoolExecutor.execute(hello);
assertEquals("123",hello.get());
  • 1
  • 2
  • 3
  • 4
  • 5

FutureTask小结

FutureTask能很好的应用于有返回的异步调用;但是如果出现如下需求时则显得捉襟见肘:

  • 无法手动完成:当调用远程服务时,如果发现远程服务出现问题,你需要将最近一次正常结果返回;这时使用Future就无法满足该需求。
  • 无法添加回调方法:当调用远程服务结束后需要调用其它方法时,如果使用Future,则需要不断循环调用isDone方法判断是否完成;然后调用get获得结果,接着调用其它方法。
  • 无法将多任务合并获得结果:当需要并行调用多个远程服务时,在获得返回结果时需要不断循环调用各future的isDone方法。
  • 没有异常处理:Future API没有提供异常处理方法。

CompletableFuture

CompletableFuture是在Java1.8提出,实现了Future接口和CompletionStage接口。其中CompletionStage中包含多种处理方法用于异步计算、异常处理和计算结合等等,接下来将使用CompletableFuture解决上述问题。

问题一:无法手动完成
// 例一
CompletableFuture<String> completableFuture = new CompletableFuture();
completableFuture.complete("ok");
assertEquals(completableFuture.get(), "ok");
//例二
CompletableFuture<String> completableFuture1 = new CompletableFuture();
new Thread(() -> {
    completableFuture1.complete("ok");
}).start();
assertEquals(completableFuture1.get(), "ok");
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

completableFuture可调用complete方法手动完成,否则get方法将阻塞直到任务完成(计算完成或调用了completableFuture.complete("ok");

小疑问:如果completableFuture调用了complete方法,原有线程还会继续执行吗?执行完成后的结果会修改complete设置的结果吗?

只要线程没结束,会继续执行!执行完毕后不会影响原有结果;

举例如下:

CompletableFuture<String> future = CompletableFuture.supplyAsync(new Supplier<String>() {
    @Override
    public String get() {
        try {
            Thread.sleep(2000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("运行了异步运行方法" + Thread.currentThread().getName());
        return "1231";
    }
});

Thread.sleep(200L);
System.out.println("complete 之前");
future.complete("complete");
System.out.println("complete 之后");
assertEquals(future.get(), "complete");
Thread.sleep(4000L);
assertEquals(future.get(), "complete");
System.out.println("等待sout完成后,再次调用get");
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

运行结果:

complete 之前
complete 之后
运行了异步运行方法ForkJoinPool.commonPool-worker-1
等待sout完成后,再次调用get
  • 1
  • 2
  • 3
  • 4

complete之后日志打印以后,还打印了异步线程的名称,这证明主线程调用sout.complete(“123”)方法后异步线程还在继续运行。但是如果去掉睡眠200毫秒,则会出现supplyAsync异步方法不会运行。

问题二:无法添加回调方法

CompletableFuture的thenApply方法可为异步方法添加一个回调方法,在CompletableFuture.supplyAsync中的异步方法执行完毕后,会立即执行thenApply中的同步函数。

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    System.out.println(Thread.currentThread().getName());
    try {
        Thread.sleep(1000L);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "123";
}).thenApply((string) -> {
    //      Executed in the same thread where the supplyAsync() task is executed
    //      or in the main thread If the supplyAsync() task completes immediately (Remove sleep() call to verify)
    System.out.println(Thread.currentThread().getName());
    return string + "123";
});
System.out.println("start get");
System.out.println(future.get());
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

注意:thenApply中的代码正常情况会被主线程执行,如果异步方法执行时间较长 ,thenApply中的方法会由CompletableFuture线程执行!

问题三:无法将多任务合并获得结果

CompletableFuture的静态方法allOf可以监控多个任务,使用方法如下:

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "234");
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "123");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "345");
CompletableFuture<Void> completableFuture = CompletableFuture.allOf(future, future1,future2);
completableFuture.get();
  • 1
  • 2
  • 3
  • 4
  • 5

如果只需要任意一个任务结束就返回?

CompletableFuture的静态方法anyO可以监控任意一个CompletableFuture完成,并立即执行后续方法,代码如下:

//public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) 
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "234");
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "123");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "345");
CompletableFuture<Object> objectCompletableFuture = CompletableFuture.anyOf(future, future1, future2);
completableFuture.get();
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

当任何一个CompletableFuture完成后,completableFuture.get(),都会立刻返回结果,否则主线程会一直阻塞

问题四:没有异常处理

CompletableFuture处理异常可采用handle方法来处理,该方法无论是否异常都会调用,使用方法如下:

CompletableFuture<String> handle = CompletableFuture.runAsync(() -> {
    System.out.println("123");
}).handle(new BiFunction<Void, Throwable, String>() {

    @Override
    public String apply(Void unused, Throwable throwable) {
        if (null == throwable) {
            return "ok";
        } else {
            return "error";
        }
    }
});
assertEquals("ok",handle.get());
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

如果只需要在发生异常时调用?

可采用exceptionally方法来处理,使用方法如下:

CompletableFuture<String> exceptionally2 = CompletableFuture.supplyAsync(() -> {
    int i = 1 / 0;
    return "123";
}).exceptionally(new Function<Throwable, String>() {
    @Override
    public String apply(Throwable throwable) {
        return "error";
    }
});
assertEquals(exceptionally2.get(), "error");
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
其他功能点
runAsync方法和supplyAsync方法有何区别?

runAsync是不带返回值的异步方法;supplyAsync是带返回值的异步方法。

异步调用时,可以指定线程池吗?默认使用的什么线程?

默认使用的```ForkJoinPool.commonPool()``线程池;也可以在supplyAsync或runAsync中指定线程池。

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor)
public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor)
  • 1
  • 2
如果需要等待两个任务完成,然后运行其它方法,如何处理?

首先可以使用allof方法,此外CompletableFuture还提供了thenCombine方法,该方法可以等待两个异步方法结束后立刻运行传入的函数,具体使用方法如下:

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "123");
CompletableFuture<String> other = CompletableFuture.supplyAsync(() -> "456");
CompletableFuture<String> future = future1.thenCombine(other, (a, b) -> {
    return a + b;
});
assertEquals(future.get(), "123456");
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
如果回调方法不需要返回值,使用哪个方法?

thenAccept()可接受一个入参,没有返回值;thenRun()不接受入参,也没有返回值;示例代码如下:

CompletableFuture<Void> voidCompletableFuture1 = CompletableFuture.supplyAsync(() -> {
    return "123";
}).thenRun(new Runnable() {
    @Override
    public void run() {

    }
});
CompletableFuture<Void> voidCompletableFuture = CompletableFuture.supplyAsync(() -> {
    return "123";
}).thenAccept(new Consumer<String>() {
    @Override
    public void accept(String s) {
        System.out.println(s);
    }
});
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
thenApply可以添加回调方法,thenCompose也可以添加回调方法,他们有什么区别?

具体详见代码如下,thenCompose接受的函数的返回值必须是CompletionStage的子类,而thenApply接受的函数的返回值可以是任意类型,但是thenApply会将该返回值再次包装到一个新的CompletableFuture中,所以下面两个代码都返回的CompletableFuture类型的数据。

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "123").thenCompose(new Function<String, CompletionStage<String>>() {
    @Override
    public CompletionStage<String> apply(String s) {
        return CompletableFuture.supplyAsync(() -> {
            return "456";
        });
    }
});
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "123").thenApply(new Function<String, String>() {
    @Override
    public String apply(String s) {
        return "456";
    }
});
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

总结

  • CompletableFuture初始化使用supplyAsync、runAsync或其他静态方法
  • 如果回调函数不需要返回值且接受传参使用thenAccept方法
  • 如果回调函数不需要返回值不接受传参使用thenRun方法
  • 如果结合两个有依赖的futures使用thenCompose方法
  • 如果结合两个独立的futures使用thenCombine方法
  • 如果需要结合多个futures使用allof方法
  • 如果回调函数不在主线程运行,则调用各自的 Async方法 例如thenApplyAsync、thenAcceptAsync、thenCombineAsync等
  • 使用手动调用complete,CompletableFuture中的异步方法还会运行;且在complete调用后会立即运行该CompletableFuture接下来的操作

参考资料

completablefuture指导

https://www.baeldung.com/java-completablefuture

更加详细的举例

https://www.callicoder.com/java-8-completablefuture-tutorial/

关于listenableFuture的讲解

https://michaelbfullan.com/the-joys-of-guava-listenablefuture/

客官且慢,点赞、收藏+关注 谢谢~

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/人工智能uu/article/detail/828773
推荐阅读
相关标签
  

闽ICP备14008679号