赞
踩
CompletableFuture实现了CompletionStage接口和Future接口,前者是对后者的一个扩展,增加了异步回调、流式处理、多个Future组合处理的能力,使Java在处理多任务的协同工作时更加顺畅便利。
@Test public void test3() throws Exception { //创建异步执行任务 ExecutorService executorService = Executors.newSingleThreadExecutor(); Future<Double> cf = executorService.submit(() -> { System.out.println(Thread.currentThread() + "start,time->" + System.currentTimeMillis()); try { Thread.sleep(2000); } catch (InterruptedException e) { } if (false) { throw new RuntimeException("test"); } else { System.out.println(Thread.currentThread() + "exit,time->" + System.currentTimeMillis()); return 1.2; } }); System.out.println("main thread start,time->"+System.currentTimeMillis()); //等待子任务执行完成,如果已完成则直接返回结果 //如果执行任务异常,则get方法会把之前捕获的异常重新抛出 System.out.println("run result->"+cf.get()); System.out.println("main thread exit,time->"+System.currentTimeMillis()); }
@Test public void test2() throws Exception { // 创建异步执行任务,有返回值 CompletableFuture<Double> cf = CompletableFuture.supplyAsync(()->{ System.out.println(Thread.currentThread()+" start,time->"+System.currentTimeMillis()); try { Thread.sleep(2000); } catch (InterruptedException e) { } if(true){ throw new RuntimeException("test"); }else{ System.out.println(Thread.currentThread()+" exit,time->"+System.currentTimeMillis()); return 1.2; } }); System.out.println("main thread start,time->"+System.currentTimeMillis()); //等待子任务执行完成 System.out.println("run result->"+cf.get()); System.out.println("main thread exit,time->"+System.currentTimeMillis()); } @Test public void test4() throws Exception { // 创建异步执行任务,无返回值 CompletableFuture cf = CompletableFuture.runAsync(()->{ System.out.println(Thread.currentThread()+" start,time->"+System.currentTimeMillis()); try { Thread.sleep(2000); } catch (InterruptedException e) { } if(false){ throw new RuntimeException("test"); }else{ System.out.println(Thread.currentThread()+" exit,time->"+System.currentTimeMillis()); } }); System.out.println("main thread start,time->"+System.currentTimeMillis()); //等待子任务执行完成 System.out.println("run result->"+cf.get()); System.out.println("main thread exit,time->"+System.currentTimeMillis()); }
thenApply 表示某个任务执行完成后执行的动作,即回调方法,会将该任务的执行结果即方法返回值作为入参传递到回调方法中
@Test public void test5() throws Exception { ForkJoinPool pool=new ForkJoinPool(); // 创建异步执行任务: CompletableFuture<Double> cf = CompletableFuture.supplyAsync(()->{ System.out.println(Thread.currentThread()+" start job1,time->"+System.currentTimeMillis()); try { Thread.sleep(2000); } catch (InterruptedException e) { } System.out.println(Thread.currentThread()+" exit job1,time->"+System.currentTimeMillis()); return 1.2; },pool); //cf关联的异步任务的返回值作为方法入参,传入到thenApply的方法中 //thenApply这里实际创建了一个新的CompletableFuture实例 CompletableFuture<String> cf2=cf.thenApply((result)->{ System.out.println(Thread.currentThread()+" start job2,time->"+System.currentTimeMillis()); try { Thread.sleep(2000); } catch (InterruptedException e) { } System.out.println(Thread.currentThread()+" exit job2,time->"+System.currentTimeMillis()); return "test:"+result; }); System.out.println("main thread start cf.get(),time->"+System.currentTimeMillis()); //等待子任务执行完成 System.out.println("run result->"+cf.get()); System.out.println("main thread start cf2.get(),time->"+System.currentTimeMillis()); System.out.println("run result->"+cf2.get()); System.out.println("main thread exit,time->"+System.currentTimeMillis()); }
thenAccept 同 thenApply 接收上一个任务的返回值作为参数,但是无返回值;
thenRun 的方法没有入参,也买有返回值
@Test public void test6() throws Exception { ForkJoinPool pool=new ForkJoinPool(); // 创建异步执行任务: CompletableFuture<Double> cf = CompletableFuture.supplyAsync(()->{ System.out.println(Thread.currentThread()+" start job1,time->"+System.currentTimeMillis()); try { Thread.sleep(2000); } catch (InterruptedException e) { } System.out.println(Thread.currentThread()+" exit job1,time->"+System.currentTimeMillis()); return 1.2; },pool); //cf关联的异步任务的返回值作为方法入参,传入到thenApply的方法中 CompletableFuture cf2=cf.thenApply((result)->{ System.out.println(Thread.currentThread()+" start job2,time->"+System.currentTimeMillis()); try { Thread.sleep(2000); } catch (InterruptedException e) { } System.out.println(Thread.currentThread()+" exit job2,time->"+System.currentTimeMillis()); return "test:"+result; }).thenAccept((result)-> { //接收上一个任务的执行结果作为入参,但是没有返回值 System.out.println(Thread.currentThread()+" start job3,time->"+System.currentTimeMillis()); try { Thread.sleep(2000); } catch (InterruptedException e) { } System.out.println(result); System.out.println(Thread.currentThread()+" exit job3,time->"+System.currentTimeMillis()); }).thenRun(()->{ //无入参,也没有返回值 System.out.println(Thread.currentThread()+" start job4,time->"+System.currentTimeMillis()); try { Thread.sleep(2000); } catch (InterruptedException e) { } System.out.println("thenRun do something"); System.out.println(Thread.currentThread()+" exit job4,time->"+System.currentTimeMillis()); }); System.out.println("main thread start cf.get(),time->"+System.currentTimeMillis()); //等待子任务执行完成 System.out.println("run result->"+cf.get()); System.out.println("main thread start cf2.get(),time->"+System.currentTimeMillis()); //cf2 等待最后一个thenRun执行完成 System.out.println("run result->"+cf2.get()); System.out.println("main thread exit,time->"+System.currentTimeMillis()); }
exceptionally方法指定某个任务执行异常时执行的回调方法,会将抛出异常作为参数传递到回调方法中,如果该任务正常执行则会exceptionally方法返回的CompletionStage的result就是该任务正常执行的结果
@Test public void test2() throws Exception { ForkJoinPool pool=new ForkJoinPool(); // 创建异步执行任务: CompletableFuture<Double> cf = CompletableFuture.supplyAsync(()->{ System.out.println(Thread.currentThread()+"job1 start,time->"+System.currentTimeMillis()); try { Thread.sleep(2000); } catch (InterruptedException e) { } if(true){ throw new RuntimeException("test"); }else{ System.out.println(Thread.currentThread()+"job1 exit,time->"+System.currentTimeMillis()); return 1.2; } },pool); //cf执行异常时,将抛出的异常作为入参传递给回调方法 CompletableFuture<Double> cf2= cf.exceptionally((param)->{ System.out.println(Thread.currentThread()+" start,time->"+System.currentTimeMillis()); try { Thread.sleep(2000); } catch (InterruptedException e) { } System.out.println("error stack trace->"); param.printStackTrace(); System.out.println(Thread.currentThread()+" exit,time->"+System.currentTimeMillis()); return -1.1; }); //cf正常执行时执行的逻辑,如果执行异常则不调用此逻辑 CompletableFuture cf3=cf.thenAccept((param)->{ System.out.println(Thread.currentThread()+"job2 start,time->"+System.currentTimeMillis()); try { Thread.sleep(2000); } catch (InterruptedException e) { } System.out.println("param->"+param); System.out.println(Thread.currentThread()+"job2 exit,time->"+System.currentTimeMillis()); }); System.out.println("main thread start,time->"+System.currentTimeMillis()); //等待子任务执行完成,此处无论是job2和job3都可以实现job2退出,主线程才退出,如果是cf,则主线程不会等待job2执行完成自动退出了 //cf2.get时,没有异常,但是依然有返回值,就是cf的返回值 System.out.println("run result->"+cf2.get()); System.out.println("main thread exit,time->"+System.currentTimeMillis()); }
whenComplete是当某个任务执行完成后执行的回调方法,会将执行结果或者执行期间抛出的异常传递给回调方法:
如果是正常执行则异常为null,回调方法对应的CompletableFuture的result和该任务一致;
如果该任务正常执行,则get方法返回执行结果;
如果是执行异常,则get方法抛出异常
跟whenComplete基本一致,区别在于handle的回调方法有返回值,且handle方法返回的CompletableFuture的result是回调方法的执行结果或者回调方法执行期间抛出的异常,与原始CompletableFuture的result无关了。
这三个方法都是将两个CompletableFuture组合起来,只有这两个都正常执行完了才会执行某个任务
区别在于:
thenCombine会将两个任务的执行结果作为方法入参传递到指定方法中,且该方法有返回值;
thenAcceptBoth同样将两个任务的执行结果作为方法入参,但是无返回值;
runAfterBoth没有入参,也没有返回值。
注意两个任务中只要有一个执行异常,则将该异常信息作为指定任务的执行结果。
这三个方法都是将两个CompletableFuture组合起来,只要其中一个执行完了就会执行某个任务
其区别在于:
applyToEither会将已经执行完成的任务的执行结果作为方法入参,并有返回值;
acceptEither同样将已经执行完成的任务的执行结果作为方法入参,但是没有返回值;
runAfterEither没有方法入参,也没有返回值。
注意两个任务中只要有一个执行异常,则将该异常信息作为指定任务的执行结果。
thenCompose方法会在某个任务执行完成后,将该任务的执行结果作为方法入参然后执行指定的方法,该方法会返回一个新的CompletableFuture实例:
如果该CompletableFuture实例的result不为null,则返回一个基于该result的新的CompletableFuture实例;
如果该CompletableFuture实例为null,则,然后执行这个新任务
allOf返回的CompletableFuture是多个任务都执行完成后才会执行,只有有一个任务执行异常,则返回的CompletableFuture执行get方法时会抛出异常,如果都是正常执行,则get返回null。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。