当前位置:   article > 正文

CompletableFuture的cancel和handleAsync的一个小坑_completablefuture.cancel

completablefuture.cancel

环境

  • Ubuntu 22.04
  • Java 17.0.3.1
  • IntelliJ IDEA 2022.1.3

背景

最近遇到了一个CompletableFuture的一个小坑,记录一下,顺便复习一下CompletableFuture。

简单的说,就是调用CompletableFuture的 cancel() 方法之后,貌似没有触发 handleAsync() 所指定的回调方法。经过一番测试和研究,最终发现是我自己的问题。

测试1

首先写了一个最简单的测试程序:通过CompletableFuture的 supplyAsync() 方法运行一个task runTask() ,并通过CompletableFuture的 handleAsync() 方法添加一个回调函数 handleResult() ,用于处理task的运行结果。代码如下:

public class Test0524 {
    private String runTask() {
        System.out.println(Thread.currentThread().getName() + ": runTask start " + System.currentTimeMillis());

        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            System.out.println("runTask interrupted! " + System.currentTimeMillis());
            e.printStackTrace();
        }

        System.out.println(Thread.currentThread().getName() + ": runTask end " + System.currentTimeMillis());

        return "task result: success";
    }

    public void test() {
        System.out.println(Thread.currentThread().getName() + ": test start " + System.currentTimeMillis());

        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> runTask());

        future.handleAsync((result, throwable) -> handleResult(result, throwable));

        try {
            Thread.sleep(4000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }

        System.out.println(Thread.currentThread().getName() + ": test end " + System.currentTimeMillis());
    }

    private String handleResult(String result, Throwable throwable) {
        System.out.println(Thread.currentThread().getName() + ": handleResult start " + System.currentTimeMillis());

        System.out.println(Thread.currentThread().getName() + ": result = " + result + ", throwable = " + throwable);

        System.out.println(Thread.currentThread().getName() + ": handleResult end " + System.currentTimeMillis());

        return "handle result: success";
    }

    public static void main(String[] args) {
        Test0524 obj = new Test0524();

        obj.test();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48

注意:在task里sleep了3秒钟,以模拟task需要做一些事情。同时,在主线程里sleep了4秒钟,以确保主线程在task之后结束。如果主线程比task早结束,则整个程序随着主线程一起结束了。

运行程序,结果如下:

main: test start 1684930792470
ForkJoinPool.commonPool-worker-1: runTask start 1684930792495
ForkJoinPool.commonPool-worker-1: runTask end 1684930795496
ForkJoinPool.commonPool-worker-2: handleResult start 1684930795498
ForkJoinPool.commonPool-worker-2: result = task result: success, throwable = null
ForkJoinPool.commonPool-worker-2: handleResult end 1684930795510
main: test end 1684930796496
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

可见,主线程、task、result handler分别为3个不同的线程,这是因为使用了 supplyAsync()handleAsync() 。顾名思义,它们都是以“Async”结尾,表示异步线程。

本次测试没有问题,程序工作正常。

测试2

在task运行的过程中,把task cancel掉。

修改 test() 方法如下:

    public void test() {
        System.out.println(Thread.currentThread().getName() + ": test start " + System.currentTimeMillis());

        CompletableFuture<String> future;
        future = CompletableFuture.supplyAsync(() -> runTask());

        future.handleAsync((result, throwable) -> handleResult(result, throwable));

        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }

        System.out.println(Thread.currentThread().getName() + ": cancel start " + System.currentTimeMillis());
        boolean cancel = future.cancel(true);
        System.out.println(Thread.currentThread().getName() + ": cancel end " + System.currentTimeMillis() + ", result = " + cancel);

        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }

        System.out.println(Thread.currentThread().getName() + ": test end " + System.currentTimeMillis());
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

注意:因为task持续运行3秒钟,所以在主线程里先sleep了2秒钟,以确保task运行起来,然后cancel task,最后再sleep 2秒钟,确保task有充足的时间运行结束。

运行程序,结果如下:

main: test start 1684931916879
ForkJoinPool.commonPool-worker-1: runTask start 1684931916899
main: cancel start 1684931918901
ForkJoinPool.commonPool-worker-2: handleResult start 1684931918902
ForkJoinPool.commonPool-worker-2: result = null, throwable = java.util.concurrent.CancellationException
ForkJoinPool.commonPool-worker-2: handleResult end 1684931918914
main: cancel end 1684931918902, result = true
ForkJoinPool.commonPool-worker-1: runTask end 1684931919900
main: test end 1684931920917
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

从结果可见, cancel() 操作立即触发了 handleAsync() 回调方法,二者只差1毫秒。

然而,我们可以看到,实际上task并没有受到影响,它仍然在运行。尽管给 cancel() 方法的参数(表示 mayInterruptIfRunning )传了true值,但是并没有给task发中断信号(如果有中断信号,会被task的sleep方法捕获)。

JDK文档里是这样说明的:

mayInterruptIfRunning – this value has no effect in this implementation because interrupts are not used to control processing.

所以,实际的行为是:在运行的task上调用 cancel() 方法时,task运行本身并不受影响,而会立即触发其 handle() / handleAsync() 所指定的回调方法。当然,task运行结束后,就不再触发回调方法了。

handle() / handleAsync() 的回调方法有两个参数:一个是task的运行结果,另一个是error(Throwable)。正常情况下error是null,异常情况下运行结果是null。

从运行结果可见,cancel task之后,Throwable是 CancellationException ,而运行结果是null。

测试3

task运行过程中出现异常。

修改 runTask() 方法如下:

    private String runTask() {
        System.out.println(Thread.currentThread().getName() + ": runTask start " + System.currentTimeMillis());

        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            System.out.println("runTask interrupted! " + System.currentTimeMillis());
            e.printStackTrace();
        }

        throw new RuntimeException("runTask exception!");
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

注意:task的sleep时间改为1秒,表示在task cancel之前就出现异常。

运行程序,结果如下:

main: test start 1684933211432
ForkJoinPool.commonPool-worker-1: runTask start 1684933211450
ForkJoinPool.commonPool-worker-2: handleResult start 1684933212454
ForkJoinPool.commonPool-worker-2: result = null, throwable = java.util.concurrent.CompletionException: java.lang.RuntimeException: runTask exception!
ForkJoinPool.commonPool-worker-2: handleResult end 1684933212487
main: cancel start 1684933213452
main: cancel end 1684933213453, result = false
main: test end 1684933215484
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

可见:

  1. task抛出异常后,立即触发了 handleAsync() 的回调方法,task是异常结束的,结果为null,Throwable为 CompletionException (caused by RuntimeException )。
  2. task在cancel之前已经结束了,所以cancel task无效, cancel() 方法返回值为false。

测试4

同测试3,唯一区别是先cancel,然后task运行过程中才出现异常。

runTask() 方法里的sleep时间改为3秒。

运行程序,结果如下:

main: test start 1684933790852
ForkJoinPool.commonPool-worker-1: runTask start 1684933790878
main: cancel start 1684933792880
ForkJoinPool.commonPool-worker-2: handleResult start 1684933792884
ForkJoinPool.commonPool-worker-2: result = null, throwable = java.util.concurrent.CancellationException
main: cancel end 1684933792883, result = true
ForkJoinPool.commonPool-worker-2: handleResult end 1684933792904
main: test end 1684933794904
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

可见,cancel操作立即触发了 handleAsync() 的回调方法。当然,实际上task还在运行,并且抛出了异常,但是没有被捕获处理。

测试5

前面几个测试,经多次运行,结果都是期望的,没有问题。

那为什么我的代码中,感觉cancel task之后,并没有触发 handleAsync() 的回调方法呢。因为系统比较复杂,不方便调试,我的判断依据是看log,在回调方法中有一些记录log的逻辑,而查看log时,只看到了cancel的log,并没有找到回调方法的log,所以我感觉是回调方法没有被触发。

经过好几天的测试和分析,我始终没有找到原因,甚至开始怀疑是JDK的问题,今天才终于发现,还是我自己的问题。囧……

直接说结论,因为 handleAsync() 是启动异步线程,所以如果在回调方法中出现异常,并没有被捕获。

说白了就是,在回调方法中,出现了异常,没有捕获,所以线程一下子挂了。

那么出现了什么异常了?为什么正常结束时,回调方法里就没有异常了呢,说来惭愧,因为cancel task,导致task结果是null,而我在回调方法里使用了类似 result.getXxx() 的代码,所以出现异常,而正常结束时task结果不是null,所以就没有异常了。

handleResult() 方法最前面添加一行代码:

        System.out.println(result.length());
  • 1

运行程序,结果如下:

main: test start 1684934679760
ForkJoinPool.commonPool-worker-1: runTask start 1684934679788
main: cancel start 1684934681789
main: cancel end 1684934681792, result = true
main: test end 1684934683830
  • 1
  • 2
  • 3
  • 4
  • 5

看,是不是感觉cancel操作没有触发 handleAsync() 的回调方法?

当然,本例是十分简化的模型,看起来一目了然,而在实际项目中,情况要复杂百倍,花费了我好几天时间,才最终找到原因。

测试6

在回调方法中捕获异常。

找到问题原因,接下来就好办了,只需在回调方法中捕获异常。修改 handleResult() 方法如下:

    private String handleResult(String result, Throwable throwable) {
        try {
            System.out.println(result.length());

            System.out.println(Thread.currentThread().getName() + ": handleResult start " + System.currentTimeMillis());

            System.out.println(Thread.currentThread().getName() + ": result = " + result + ", throwable = " + throwable);

            throwable.printStackTrace();

            System.out.println(Thread.currentThread().getName() + ": handleResult end " + System.currentTimeMillis());

            return "handle result: success";
        } catch (Exception e) {
            System.out.println("handleResult exception!");
            e.printStackTrace();
            throw e;
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

运行程序,结果如下:

main: test start 1684935436681
ForkJoinPool.commonPool-worker-1: runTask start 1684935436699
main: cancel start 1684935438700
handleResult exception!
java.lang.NullPointerException: Cannot invoke "String.length()" because "result" is null
	at com.example.test0524.Test0524.handleResult(Test0524.java:51)
	at com.example.test0524.Test0524.lambda$test$1(Test0524.java:28)
	at java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:934)
	at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:911)
	at java.base/java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:483)
	at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
	at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
	at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
	at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
	at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
main: cancel end 1684935438703, result = true
main: test end 1684935440732
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

OK,现在一切都正常了。

测试7

思考:如果把 handleAsync() 替换成 handle() ,也就是用同步线程,会怎么样呢?

修改 test() 方法,把 handleAsync 替换为 handle

修改 handleResult() 方法,去除try…catch块,在最前面加上: System.out.println(Thread.currentThread().getName() + "====================");

运行程序,结果如下:

main: test start 1684936772747
ForkJoinPool.commonPool-worker-1: runTask start 1684936772769
main: cancel start 1684936774770
main====================
main: cancel end 1684936774776, result = true
main: test end 1684936776796
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

结论:回调方法和主线程使用的是同一个线程,但是如果回调方法有异常,不会被捕获,而且不影响主线程的逻辑。所以还是得捕获回调方法的异常。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/繁依Fanyi0/article/detail/824221
推荐阅读
相关标签
  

闽ICP备14008679号