Java8 異步非阻塞做法:CompletableFuture 兩萬字詳解!


CompletableFuture 實現了 CompletionStage 接口和 Future 接口,前者是對後者的一個擴展,增加了異步回調、流式處理、多個 Future 組合處理的能力,使 Java 在處理多任務的協同工作時更加順暢便利。

一、創建異步任務

1、Future.submit

通常的線程池接口類 ExecutorService,其中 execute 方法的返回值是 void,即無法獲取異步任務的執行狀態,3 個重載的 submit 方法的返回值是 Future,可以據此獲取任務執行的狀態和結果,示例如下:

@Test
public void test3() throws Exception {
    // 創建異步執行任務:
    ExecutorService executorService= Executors.newSingleThreadExecutor();
    Future<Double> cf = executorService.submit(()->{
        System.out.println(Thread.currentThread()+" start,time->"+System.currentTimeMillis());
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
        }
        if(false){
            throw new RuntimeException("test");
        }else{
            System.out.println(Thread.currentThread()+" exit,time->"+System.currentTimeMillis());
            return 1.2;
        }
    });
    System.out.println("main thread start,time->"+System.currentTimeMillis());
    //等待子任務執行完成,如果已完成則直接返回結果
    //如果執行任務異常,則get方法會把之前捕獲的異常重新拋出
    System.out.println("run result->"+cf.get());
    System.out.println("main thread exit,time->"+System.currentTimeMillis());
}

執行結果如下:

子線程是異步執行的,主線程休眠等待子線程執行完成,子線程執行完成後喚醒主線程,主線程獲取任務執行結果後退出。

很多博客說使用不帶等待時間限制的 get 方法時,如果子線程執行異常了會導致主線程長期阻塞,這其實是錯誤的,子線程執行異常時其異常會被捕獲,然後修改任務的狀態爲異常結束並喚醒等待的主線程,get 方法判斷任務狀態發生變更,就終止等待了,並拋出異常。將上述用例中 if(false) 改成 if(true) ,執行結果如下:

get 方法拋出異常導致主線程異常終止。

2、supplyAsync / runAsync

supplyAsync 表示創建帶返回值的異步任務的,相當於ExecutorService submit(Callable<T> task) 方法,runAsync 表示創建無返回值的異步任務,相當於ExecutorService submit(Runnable task)方法,這兩方法的效果跟 submit 是一樣的,測試用例如下:

@Test
public void test2() throws Exception {
    // 創建異步執行任務,有返回值
    CompletableFuture<Double> cf = CompletableFuture.supplyAsync(()->{
        System.out.println(Thread.currentThread()+" start,time->"+System.currentTimeMillis());
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
        }
        if(true){
            throw new RuntimeException("test");
        }else{
            System.out.println(Thread.currentThread()+" exit,time->"+System.currentTimeMillis());
            return 1.2;
        }
    });
    System.out.println("main thread start,time->"+System.currentTimeMillis());
    //等待子任務執行完成
    System.out.println("run result->"+cf.get());
    System.out.println("main thread exit,time->"+System.currentTimeMillis());
}

Test
public void test4() throws Exception {
    // 創建異步執行任務,無返回值
    CompletableFuture cf = CompletableFuture.runAsync(()->{
        System.out.println(Thread.currentThread()+" start,time->"+System.currentTimeMillis());
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
        }
        if(false){
            throw new RuntimeException("test");
        }else{
            System.out.println(Thread.currentThread()+" exit,time->"+System.currentTimeMillis());
        }
    });
    System.out.println("main thread start,time->"+System.currentTimeMillis());
    //等待子任務執行完成
    System.out.println("run result->"+cf.get());
    System.out.println("main thread exit,time->"+System.currentTimeMillis());
}

這兩方法各有一個重載版本,可以指定執行異步任務的 Executor 實現,如果不指定,默認使用ForkJoinPool.commonPool(),如果機器是單核的,則默認使用 ThreadPerTaskExecutor,該類是一個內部類,每次執行 execute 都會創建一個新線程。測試用例如下:

@Test
 public void test2() throws Exception {
     ForkJoinPool pool=new ForkJoinPool();
     // 創建異步執行任務:
     CompletableFuture<Double> cf = CompletableFuture.supplyAsync(()->{
         System.out.println(Thread.currentThread()+" start,time->"+System.currentTimeMillis());
         try {
             Thread.sleep(2000);
         } catch (InterruptedException e) {
         }
         if(true){
             throw new RuntimeException("test");
         }else{
             System.out.println(Thread.currentThread()+" exit,time->"+System.currentTimeMillis());
             return 1.2;
         }
     },pool);
     System.out.println("main thread start,time->"+System.currentTimeMillis());
     //等待子任務執行完成
     System.out.println("run result->"+cf.get());
     System.out.println("main thread exit,time->"+System.currentTimeMillis());
 }

@Test
 public void test4() throws Exception {
     ExecutorService executorService= Executors.newSingleThreadExecutor();
     // 創建異步執行任務:
     CompletableFuture cf = CompletableFuture.runAsync(()->{
         System.out.println(Thread.currentThread()+" start,time->"+System.currentTimeMillis());
         try {
             Thread.sleep(2000);
         } catch (InterruptedException e) {
         }
         if(false){
             throw new RuntimeException("test");
         }else{
             System.out.println(Thread.currentThread()+" exit,time->"+System.currentTimeMillis());
         }
     },executorService);
     System.out.println("main thread start,time->"+System.currentTimeMillis());
     //等待子任務執行完成
     System.out.println("run result->"+cf.get());
     System.out.println("main thread exit,time->"+System.currentTimeMillis());
 }

二、異步回調

1、thenApply / thenApplyAsync

thenApply 表示某個任務執行完成後執行的動作,即回調方法,會將該任務的執行結果即方法返回值作爲入參傳遞到回調方法中,測試用例如下:

@Test
public void test5() throws Exception {
    ForkJoinPool pool=new ForkJoinPool();
    // 創建異步執行任務:
    CompletableFuture<Double> cf = CompletableFuture.supplyAsync(()->{
        System.out.println(Thread.currentThread()+" start job1,time->"+System.currentTimeMillis());
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
        }
        System.out.println(Thread.currentThread()+" exit job1,time->"+System.currentTimeMillis());
        return 1.2;
    },pool);
    //cf關聯的異步任務的返回值作爲方法入參,傳入到thenApply的方法中
    //thenApply這裏實際創建了一個新的CompletableFuture實例
    CompletableFuture<String> cf2=cf.thenApply((result)->{
        System.out.println(Thread.currentThread()+" start job2,time->"+System.currentTimeMillis());
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
        }
        System.out.println(Thread.currentThread()+" exit job2,time->"+System.currentTimeMillis());
        return "test:"+result;
    });
    System.out.println("main thread start cf.get(),time->"+System.currentTimeMillis());
    //等待子任務執行完成
    System.out.println("run result->"+cf.get());
    System.out.println("main thread start cf2.get(),time->"+System.currentTimeMillis());
    System.out.println("run result->"+cf2.get());
    System.out.println("main thread exit,time->"+System.currentTimeMillis());
}

其執行結果如下:

job1 執行結束後,將 job1 的方法返回值作爲入參傳遞到 job2 中並立即執行 job2。

thenApplyAsync 與 thenApply 的區別在於,前者是將 job2 提交到線程池中異步執行,實際執行 job2 的線程可能是另外一個線程,後者是由執行 job1 的線程立即執行 job2,即兩個 job 都是同一個線程執行的。將上述測試用例中 thenApply 改成 thenApplyAsync 後,執行結果如下:

從輸出可知,執行 job1 和 job2 是兩個不同的線程。thenApplyAsync 有一個重載版本,可以指定執行異步任務的 Executor 實現,如果不指定,默認使用ForkJoinPool.commonPool()

下述的多個方法,每個方法都有兩個以 Async 結尾的方法,一個使用默認的 Executor 實現,一個使用指定的 Executor 實現,不帶 Async 的方法是由觸發該任務的線程執行該任務,帶 Async 的方法是由觸發該任務的線程將任務提交到線程池,執行任務的線程跟觸發任務的線程不一定是同一個。

2、thenAccept / thenRun

thenAccept 同 thenApply 接收上一個任務的返回值作爲參數,但是無返回值;thenRun 的方法沒有入參,也買有返回值,測試用例如下:

@Test
public void test6() throws Exception {
    ForkJoinPool pool=new ForkJoinPool();
    // 創建異步執行任務:
    CompletableFuture<Double> cf = CompletableFuture.supplyAsync(()->{
        System.out.println(Thread.currentThread()+" start job1,time->"+System.currentTimeMillis());
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
        }
        System.out.println(Thread.currentThread()+" exit job1,time->"+System.currentTimeMillis());
        return 1.2;
    },pool);
    //cf關聯的異步任務的返回值作爲方法入參,傳入到thenApply的方法中
    CompletableFuture cf2=cf.thenApply((result)->{
        System.out.println(Thread.currentThread()+" start job2,time->"+System.currentTimeMillis());
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
        }
        System.out.println(Thread.currentThread()+" exit job2,time->"+System.currentTimeMillis());
        return "test:"+result;
    }).thenAccept((result)-> { //接收上一個任務的執行結果作爲入參,但是沒有返回值
        System.out.println(Thread.currentThread()+" start job3,time->"+System.currentTimeMillis());
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
        }
        System.out.println(result);
        System.out.println(Thread.currentThread()+" exit job3,time->"+System.currentTimeMillis());
    }).thenRun(()->{ //無入參,也沒有返回值
        System.out.println(Thread.currentThread()+" start job4,time->"+System.currentTimeMillis());
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
        }
        System.out.println("thenRun do something");
        System.out.println(Thread.currentThread()+" exit job4,time->"+System.currentTimeMillis());
    });
    System.out.println("main thread start cf.get(),time->"+System.currentTimeMillis());
    //等待子任務執行完成
    System.out.println("run result->"+cf.get());
    System.out.println("main thread start cf2.get(),time->"+System.currentTimeMillis());
    //cf2 等待最後一個thenRun執行完成
    System.out.println("run result->"+cf2.get());
    System.out.println("main thread exit,time->"+System.currentTimeMillis());
}

其執行結果如下:

3、 exceptionally

exceptionally 方法指定某個任務執行異常時執行的回調方法,會將拋出異常作爲參數傳遞到回調方法中,如果該任務正常執行則會 exceptionally 方法返回的 CompletionStage 的 result 就是該任務正常執行的結果,測試用例如下:

@Test
public void test2() throws Exception {
    ForkJoinPool pool=new ForkJoinPool();
    // 創建異步執行任務:
    CompletableFuture<Double> cf = CompletableFuture.supplyAsync(()->{
        System.out.println(Thread.currentThread()+"job1 start,time->"+System.currentTimeMillis());
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
        }
        if(true){
            throw new RuntimeException("test");
        }else{
            System.out.println(Thread.currentThread()+"job1 exit,time->"+System.currentTimeMillis());
            return 1.2;
        }
    },pool);
    //cf執行異常時,將拋出的異常作爲入參傳遞給回調方法
    CompletableFuture<Double> cf2= cf.exceptionally((param)->{
         System.out.println(Thread.currentThread()+" start,time->"+System.currentTimeMillis());
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
        }
        System.out.println("error stack trace->");
        param.printStackTrace();
        System.out.println(Thread.currentThread()+" exit,time->"+System.currentTimeMillis());
         return -1.1;
    });
    //cf正常執行時執行的邏輯,如果執行異常則不調用此邏輯
    CompletableFuture cf3=cf.thenAccept((param)->{
        System.out.println(Thread.currentThread()+"job2 start,time->"+System.currentTimeMillis());
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
        }
        System.out.println("param->"+param);
        System.out.println(Thread.currentThread()+"job2 exit,time->"+System.currentTimeMillis());
    });
    System.out.println("main thread start,time->"+System.currentTimeMillis());
    //等待子任務執行完成,此處無論是job2和job3都可以實現job2退出,主線程才退出,如果是cf,則主線程不會等待job2執行完成自動退出了
    //cf2.get時,沒有異常,但是依然有返回值,就是cf的返回值
    System.out.println("run result->"+cf2.get());
    System.out.println("main thread exit,time->"+System.currentTimeMillis());
}

其輸出如下:

拋出異常後,只有 cf2 執行了,cf3 沒有執行。將上述示例中的 if(true) 改成 if(false),其輸出如下:

cf2 沒有指定,其 result 就是 cf 執行的結果,理論上 cf2.get 應該立即返回的,此處是等待了 cf3,即 job2 執行完成後才返回,具體原因且待下篇源碼分析時再做探討。

4、whenComplete

whenComplete 是當某個任務執行完成後執行的回調方法,會將執行結果或者執行期間拋出的異常傳遞給回調方法,如果是正常執行則異常爲 null,回調方法對應的 CompletableFuture 的 result 和該任務一致,如果該任務正常執行,則 get 方法返回執行結果,如果是執行異常,則 get 方法拋出異常。測試用例如下:

@Test
public void test10() throws Exception {
    // 創建異步執行任務:
    CompletableFuture<Double> cf = CompletableFuture.supplyAsync(()->{
        System.out.println(Thread.currentThread()+"job1 start,time->"+System.currentTimeMillis());
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
        }
        if(false){
            throw new RuntimeException("test");
        }else{
            System.out.println(Thread.currentThread()+"job1 exit,time->"+System.currentTimeMillis());
            return 1.2;
        }
    });
    //cf執行完成後會將執行結果和執行過程中拋出的異常傳入回調方法,如果是正常執行的則傳入的異常爲null
    CompletableFuture<Double> cf2=cf.whenComplete((a,b)->{
        System.out.println(Thread.currentThread()+"job2 start,time->"+System.currentTimeMillis());
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
        }
        if(b!=null){
            System.out.println("error stack trace->");
            b.printStackTrace();
        }else{
            System.out.println("run succ,result->"+a);
        }
        System.out.println(Thread.currentThread()+"job2 exit,time->"+System.currentTimeMillis());
    });
    //等待子任務執行完成
    System.out.println("main thread start wait,time->"+System.currentTimeMillis());
    //如果cf是正常執行的,cf2.get的結果就是cf執行的結果
    //如果cf是執行異常,則cf2.get會拋出異常
    System.out.println("run result->"+cf2.get());
    System.out.println("main thread exit,time->"+System.currentTimeMillis());
}

執行結果如下:

將上述示例中的 if(false) 改成 if(true),其輸出如下:

5、handle

跟 whenComplete 基本一致,區別在於 handle 的回調方法有返回值,且 handle 方法返回的 CompletableFuture 的 result 是回調方法的執行結果或者回調方法執行期間拋出的異常,與原始 CompletableFuture 的 result 無關了。測試用例如下:

 @Test
public void test10() throws Exception {
    // 創建異步執行任務:
    CompletableFuture<Double> cf = CompletableFuture.supplyAsync(()->{
        System.out.println(Thread.currentThread()+"job1 start,time->"+System.currentTimeMillis());
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
        }
        if(true){
            throw new RuntimeException("test");
        }else{
            System.out.println(Thread.currentThread()+"job1 exit,time->"+System.currentTimeMillis());
            return 1.2;
        }
    });
    //cf執行完成後會將執行結果和執行過程中拋出的異常傳入回調方法,如果是正常執行的則傳入的異常爲null
    CompletableFuture<String> cf2=cf.handle((a,b)->{
        System.out.println(Thread.currentThread()+"job2 start,time->"+System.currentTimeMillis());
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
        }
        if(b!=null){
            System.out.println("error stack trace->");
            b.printStackTrace();
        }else{
            System.out.println("run succ,result->"+a);
        }
        System.out.println(Thread.currentThread()+"job2 exit,time->"+System.currentTimeMillis());
        if(b!=null){
            return "run error";
        }else{
            return "run succ";
        }
    });
    //等待子任務執行完成
    System.out.println("main thread start wait,time->"+System.currentTimeMillis());
    //get的結果是cf2的返回值,跟cf沒關係了
    System.out.println("run result->"+cf2.get());
    System.out.println("main thread exit,time->"+System.currentTimeMillis());
}

其執行結果如下:

將上述示例中的 if(true) 改成 if(false),其輸出如下:

三、組合處理

1、thenCombine / thenAcceptBoth / runAfterBoth

這三個方法都是將兩個 CompletableFuture 組合起來,只有這兩個都正常執行完了纔會執行某個任務,區別在於,thenCombine 會將兩個任務的執行結果作爲方法入參傳遞到指定方法中,且該方法有返回值;

thenAcceptBoth 同樣將兩個任務的執行結果作爲方法入參,但是無返回值;runAfterBoth 沒有入參,也沒有返回值。注意兩個任務中只要有一個執行異常,則將該異常信息作爲指定任務的執行結果。測試用例如下:

@Test
public void test7() throws Exception {
    ForkJoinPool pool=new ForkJoinPool();
    // 創建異步執行任務:
    CompletableFuture<Double> cf = CompletableFuture.supplyAsync(()->{
        System.out.println(Thread.currentThread()+" start job1,time->"+System.currentTimeMillis());
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
        }
        System.out.println(Thread.currentThread()+" exit job1,time->"+System.currentTimeMillis());
        return 1.2;
    });
    CompletableFuture<Double> cf2 = CompletableFuture.supplyAsync(()->{
        System.out.println(Thread.currentThread()+" start job2,time->"+System.currentTimeMillis());
        try {
            Thread.sleep(1500);
        } catch (InterruptedException e) {
        }
        System.out.println(Thread.currentThread()+" exit job2,time->"+System.currentTimeMillis());
        return 3.2;
    });
    //cf和cf2的異步任務都執行完成後,會將其執行結果作爲方法入參傳遞給cf3,且有返回值
    CompletableFuture<Double> cf3=cf.thenCombine(cf2,(a,b)->{
        System.out.println(Thread.currentThread()+" start job3,time->"+System.currentTimeMillis());
        System.out.println("job3 param a->"+a+",b->"+b);
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
        }
        System.out.println(Thread.currentThread()+" exit job3,time->"+System.currentTimeMillis());
        return a+b;
    });

    //cf和cf2的異步任務都執行完成後,會將其執行結果作爲方法入參傳遞給cf3,無返回值
    CompletableFuture cf4=cf.thenAcceptBoth(cf2,(a,b)->{
        System.out.println(Thread.currentThread()+" start job4,time->"+System.currentTimeMillis());
        System.out.println("job4 param a->"+a+",b->"+b);
        try {
            Thread.sleep(1500);
        } catch (InterruptedException e) {
        }
        System.out.println(Thread.currentThread()+" exit job4,time->"+System.currentTimeMillis());
    });

    //cf4和cf3都執行完成後,執行cf5,無入參,無返回值
    CompletableFuture cf5=cf4.runAfterBoth(cf3,()->{
        System.out.println(Thread.currentThread()+" start job5,time->"+System.currentTimeMillis());
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
        }
        System.out.println("cf5 do something");
        System.out.println(Thread.currentThread()+" exit job5,time->"+System.currentTimeMillis());
    });

    System.out.println("main thread start cf.get(),time->"+System.currentTimeMillis());
    //等待子任務執行完成
    System.out.println("cf run result->"+cf.get());
    System.out.println("main thread start cf5.get(),time->"+System.currentTimeMillis());
    System.out.println("cf5 run result->"+cf5.get());
    System.out.println("main thread exit,time->"+System.currentTimeMillis());
}

其運行結果如下:

job1 和 job2 幾乎同時運行,job2 比 job1 先執行完成,等 job1 退出後,job3 和 job4 幾乎同時開始運行,job4 先退出,等 job3 執行完成,job5 開始了,等 job5 執行完成後,主線程退出。

2、applyToEither / acceptEither / runAfterEither

這三個方法都是將兩個 CompletableFuture 組合起來,只要其中一個執行完了就會執行某個任務,其區別在於 applyToEither 會將已經執行完成的任務的執行結果作爲方法入參,並有返回值;

acceptEither 同樣將已經執行完成的任務的執行結果作爲方法入參,但是沒有返回值;runAfterEither 沒有方法入參,也沒有返回值。注意兩個任務中只要有一個執行異常,則將該異常信息作爲指定任務的執行結果。測試用例如下:

@Test
public void test8() throws Exception {
    // 創建異步執行任務:
    CompletableFuture<Double> cf = CompletableFuture.supplyAsync(()->{
        System.out.println(Thread.currentThread()+" start job1,time->"+System.currentTimeMillis());
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
        }
        System.out.println(Thread.currentThread()+" exit job1,time->"+System.currentTimeMillis());
        return 1.2;
    });
    CompletableFuture<Double> cf2 = CompletableFuture.supplyAsync(()->{
        System.out.println(Thread.currentThread()+" start job2,time->"+System.currentTimeMillis());
        try {
            Thread.sleep(1500);
        } catch (InterruptedException e) {
        }
        System.out.println(Thread.currentThread()+" exit job2,time->"+System.currentTimeMillis());
        return 3.2;
    });
    //cf和cf2的異步任務都執行完成後,會將其執行結果作爲方法入參傳遞給cf3,且有返回值
    CompletableFuture<Double> cf3=cf.applyToEither(cf2,(result)->{
        System.out.println(Thread.currentThread()+" start job3,time->"+System.currentTimeMillis());
        System.out.println("job3 param result->"+result);
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
        }
        System.out.println(Thread.currentThread()+" exit job3,time->"+System.currentTimeMillis());
        return result;
    });

    //cf和cf2的異步任務都執行完成後,會將其執行結果作爲方法入參傳遞給cf3,無返回值
    CompletableFuture cf4=cf.acceptEither(cf2,(result)->{
        System.out.println(Thread.currentThread()+" start job4,time->"+System.currentTimeMillis());
        System.out.println("job4 param result->"+result);
        try {
            Thread.sleep(1500);
        } catch (InterruptedException e) {
        }
        System.out.println(Thread.currentThread()+" exit job4,time->"+System.currentTimeMillis());
    });

    //cf4和cf3都執行完成後,執行cf5,無入參,無返回值
    CompletableFuture cf5=cf4.runAfterEither(cf3,()->{
        System.out.println(Thread.currentThread()+" start job5,time->"+System.currentTimeMillis());
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
        }
        System.out.println("cf5 do something");
        System.out.println(Thread.currentThread()+" exit job5,time->"+System.currentTimeMillis());
    });

    System.out.println("main thread start cf.get(),time->"+System.currentTimeMillis());
    //等待子任務執行完成
    System.out.println("cf run result->"+cf.get());
    System.out.println("main thread start cf5.get(),time->"+System.currentTimeMillis());
    System.out.println("cf5 run result->"+cf5.get());
    System.out.println("main thread exit,time->"+System.currentTimeMillis());
}

其運行結果如下:

job1 和 job2 同時開始運行,job2 先執行完成,然後 job4 開始執行,理論上 job3 和 job4 應該同時開始運行,但是此時只有 job4 開始執行了,job3 是等待 job1 執行完成後纔開始執行,job4 先於 job3 執行完成,然後 job5 開始執行,等 job5 執行完成後,主線程退出。上述差異且到下篇源碼分析時再做探討。

3、thenCompose

thenCompose 方法會在某個任務執行完成後,將該任務的執行結果作爲方法入參然後執行指定的方法,該方法會返回一個新的 CompletableFuture 實例,如果該 CompletableFuture 實例的 result 不爲 null,則返回一個基於該 result 的新的 CompletableFuture 實例;如果該 CompletableFuture 實例爲 null,則,然後執行這個新任務,測試用例如下:

@Test
public void test9() throws Exception {
    // 創建異步執行任務:
    CompletableFuture<Double> cf = CompletableFuture.supplyAsync(()->{
        System.out.println(Thread.currentThread()+" start job1,time->"+System.currentTimeMillis());
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
        }
        System.out.println(Thread.currentThread()+" exit job1,time->"+System.currentTimeMillis());
        return 1.2;
    });
    CompletableFuture<String> cf2= cf.thenCompose((param)->{
        System.out.println(Thread.currentThread()+" start job2,time->"+System.currentTimeMillis());
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
        }
        System.out.println(Thread.currentThread()+" exit job2,time->"+System.currentTimeMillis());
        return CompletableFuture.supplyAsync(()->{
            System.out.println(Thread.currentThread()+" start job3,time->"+System.currentTimeMillis());
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
            }
            System.out.println(Thread.currentThread()+" exit job3,time->"+System.currentTimeMillis());
            return "job3 test";
        });
    });
    System.out.println("main thread start cf.get(),time->"+System.currentTimeMillis());
    //等待子任務執行完成
    System.out.println("cf run result->"+cf.get());
    System.out.println("main thread start cf2.get(),time->"+System.currentTimeMillis());
    System.out.println("cf2 run result->"+cf2.get());
    System.out.println("main thread exit,time->"+System.currentTimeMillis());
}

其輸出如下:

job1 執行完成後 job2 開始執行,等 job2 執行完成後會把 job3 返回,然後執行 job3,等 job3 執行完成後,主線程退出。

4、allOf / anyOf

allOf 返回的 CompletableFuture 是多個任務都執行完成後纔會執行,只有有一個任務執行異常,則返回的 CompletableFuture 執行 get 方法時會拋出異常,如果都是正常執行,則 get 返回 null。

@Test
public void test11() throws Exception {
    // 創建異步執行任務:
    CompletableFuture<Double> cf = CompletableFuture.supplyAsync(()->{
        System.out.println(Thread.currentThread()+" start job1,time->"+System.currentTimeMillis());
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
        }
        System.out.println(Thread.currentThread()+" exit job1,time->"+System.currentTimeMillis());
        return 1.2;
    });
    CompletableFuture<Double> cf2 = CompletableFuture.supplyAsync(()->{
        System.out.println(Thread.currentThread()+" start job2,time->"+System.currentTimeMillis());
        try {
            Thread.sleep(1500);
        } catch (InterruptedException e) {
        }
        System.out.println(Thread.currentThread()+" exit job2,time->"+System.currentTimeMillis());
        return 3.2;
    });
    CompletableFuture<Double> cf3 = CompletableFuture.supplyAsync(()->{
        System.out.println(Thread.currentThread()+" start job3,time->"+System.currentTimeMillis());
        try {
            Thread.sleep(1300);
        } catch (InterruptedException e) {
        }
          throw new RuntimeException("test");
        System.out.println(Thread.currentThread()+" exit job3,time->"+System.currentTimeMillis());
        return 2.2;
    });
    //allof等待所有任務執行完成才執行cf4,如果有一個任務異常終止,則cf4.get時會拋出異常,都是正常執行,cf4.get返回null
    //anyOf是隻有一個任務執行完成,無論是正常執行或者執行異常,都會執行cf4,cf4.get的結果就是已執行完成的任務的執行結果
    CompletableFuture cf4=CompletableFuture.allOf(cf,cf2,cf3).whenComplete((a,b)->{
       if(b!=null){
           System.out.println("error stack trace->");
           b.printStackTrace();
       }else{
           System.out.println("run succ,result->"+a);
       }
    });

    System.out.println("main thread start cf4.get(),time->"+System.currentTimeMillis());
    //等待子任務執行完成
    System.out.println("cf4 run result->"+cf4.get());
    System.out.println("main thread exit,time->"+System.currentTimeMillis());
}

其輸出如下:

主線程等待最後一個 job1 執行完成後退出。anyOf 返回的 CompletableFuture 是多個任務只要其中一個執行完成就會執行,其 get 返回的是已經執行完成的任務的執行結果,如果該任務執行異常,則拋出異常。將上述測試用例中 allOf 改成 anyOf 後,其輸出如下:

轉自:孫大聖 666

鏈接:https://blog.csdn.net/qq_31865983/article/details/106137777

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/W3qlMyPc_DZhhy86u18ivw