首頁 > Java > java教程 > Java8 CompletableFuture 如何實作非同步多執行緒程式設計?

Java8 CompletableFuture 如何實作非同步多執行緒程式設計?

WBOY
發布: 2023-04-27 08:22:06
轉載
2391 人瀏覽過

1、一個範例回顧Future

一些業務場景我們需要使用多執行緒非同步執行任務,加快任務執行速度。

JDK5新增了Future接口,用來描述一個非同步計算的結果。

雖然Future 以及相關使用方法提供了異步執行任務的能力,但是對於結果的獲取卻是很不方便,我們必須使用Future.get()的方式阻塞調用線程,或者使用輪詢方式判斷Future.isDone 任務是否結束,再取得結果。

這兩種處理方式都不是很優雅,相關程式碼如下:

    @Test
    public void testFuture() throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        Future<String> future = executorService.submit(() -> {
            Thread.sleep(2000);
            return "hello";
        });
        System.out.println(future.get());
        System.out.println("end");
    }
登入後複製

同時,Future無法解決多個非同步任務需要相互依賴的場景,簡單點說就是,主執行緒需要等待子執行緒任務執行完畢之後在進行執行,這個時候你可能想到了「CountDownLatch」,沒錯確實可以解決,程式碼如下。

這裡定義兩個Future,第一個透過使用者id取得使用者訊息,第二個透過商品id取得商品資訊。

    @Test
    public void testCountDownLatch() throws InterruptedException, ExecutionException {
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        CountDownLatch downLatch = new CountDownLatch(2);
        long startTime = System.currentTimeMillis();
        Future<String> userFuture = executorService.submit(() -> {
            //模拟查询商品耗时500毫秒
            Thread.sleep(500);
            downLatch.countDown();
            return "用户A";
        });
 
        Future<String> goodsFuture = executorService.submit(() -> {
            //模拟查询商品耗时500毫秒
            Thread.sleep(400);
            downLatch.countDown();
            return "商品A";
        });
 
        downLatch.await();
        //模拟主程序耗时时间
        Thread.sleep(600);
        System.out.println("获取用户信息:" + userFuture.get());
        System.out.println("获取商品信息:" + goodsFuture.get());
        System.out.println("总共用时" + (System.currentTimeMillis() - startTime) + "ms");
 
    }
登入後複製

「運行結果」

取得使用者資訊:用戶A
取得商品資訊:商品A
總共用時1110ms

從執行結果可以看出結果都已經獲取,而且如果我們不用非同步操作,執行時間應該是:500 400 600 = 1500,用非同步操作後實際只用1110。

但Java8以後我不在認為這是一種優雅的解決方式,接下來來了解下CompletableFuture的使用。

2、透過CompletableFuture實作上面範例

    @Test
    public void testCompletableInfo() throws InterruptedException, ExecutionException {
        long startTime = System.currentTimeMillis();
 
        //调用用户服务获取用户基本信息
        CompletableFuture<String> userFuture = CompletableFuture.supplyAsync(() ->
                //模拟查询商品耗时500毫秒
        {
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "用户A";
        });
 
        //调用商品服务获取商品基本信息
        CompletableFuture<String> goodsFuture = CompletableFuture.supplyAsync(() ->
                //模拟查询商品耗时500毫秒
        {
            try {
                Thread.sleep(400);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "商品A";
        });
 
        System.out.println("获取用户信息:" + userFuture.get());
        System.out.println("获取商品信息:" + goodsFuture.get());
 
        //模拟主程序耗时时间
        Thread.sleep(600);
        System.out.println("总共用时" + (System.currentTimeMillis() - startTime) + "ms");
    }
登入後複製

運行結果

取得使用者資訊:使用者A
取得商品資訊:商品A
總共用時1112ms

透過CompletableFuture可以很輕鬆的實作CountDownLatch的功能,你以為這就結束了,遠遠不止,CompletableFuture比這要強多了。

例如可以實現:任務1執行完了再執行任務2,甚至任務1執行的結果,作為任務2的入參數等等強大功能,下面就來學學CompletableFuture的API。

3、CompletableFuture建立方式

3.1、常用的4種建立方式

CompletableFuture原始碼中有四個靜態方法用來執行非同步任務

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier){..}
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor){..}
public static CompletableFuture<Void> runAsync(Runnable runnable){..}
public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor){..}
登入後複製

一般我們用上面的靜態方法來建立CompletableFuture,這裡也解釋下他們的差異:

  • #「supplyAsync」執行任務,支援傳回值。

  • 「runAsync」執行任務,沒有回傳值。

3.1.1、「supplyAsync方法」

//使用默认内置线程池ForkJoinPool.commonPool(),根据supplier构建执行任务
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
//自定义线程,根据supplier构建执行任务
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
登入後複製

3.1.2、「runAsync方法」

//使用默认内置线程池ForkJoinPool.commonPool(),根据runnable构建执行任务
public static CompletableFuture<Void> runAsync(Runnable runnable) 
//自定义线程,根据runnable构建执行任务
public static CompletableFuture<Void> runAsync(Runnable runnable,  Executor executor)
登入後複製

3.2、結果取得的4種方式

對於結果的取得CompltableFuture類別提供了四種方式

//方式一
public T get()
//方式二
public T get(long timeout, TimeUnit unit)
//方式三
public T getNow(T valueIfAbsent)
//方式四
public T join()
登入後複製

說明:

  • 「get()和get(long timeout , TimeUnit unit)” => 在Future中就已經提供了,後者提供超時處理,如果在指定時間內未獲取結果將拋出超時異常

  • “getNow” => 立即取得結果不阻塞,結果計算已完成將傳回結果或計算過程中的異常,如果未計算完成將傳回設定的valueIfAbsent值

  • #「join」 => 方法裡不會拋出例外

範例

    @Test
    public void testCompletableGet() throws InterruptedException, ExecutionException {
 
        CompletableFuture<String> cp1 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "商品A";
        });
 
        // getNow方法测试 
        System.out.println(cp1.getNow("商品B"));
 
        //join方法测试 
        CompletableFuture<Integer> cp2 = CompletableFuture.supplyAsync((() -> 1 / 0));
        System.out.println(cp2.join());
        System.out.println("-----------------------------------------------------");
        //get方法测试
        CompletableFuture<Integer> cp3 = CompletableFuture.supplyAsync((() -> 1 / 0));
        System.out.println(cp3.get());
    }
登入後複製

「運行結果」:

  • 第一個執行結果為 「商品B」,因為要先睡1秒結果不能立即取得

  • join方法取得結果方法裡不會拋異常,但是執行結果會拋出異常,拋出的異常為CompletionException

  • get方法取得結果方法裡將拋出異常,執行結果拋出的異常為ExecutionException

4、非同步回呼方法

Java8 CompletableFuture 如何實作非同步多執行緒程式設計?

4.1、thenRun/thenRunAsync

通俗點講就是,「做完第一個任務後,再做第二個任務,第二個任務也沒有回傳值」

範例

    @Test
    public void testCompletableThenRunAsync() throws InterruptedException, ExecutionException {
        long startTime = System.currentTimeMillis();
 
        CompletableFuture<Void> cp1 = CompletableFuture.runAsync(() -> {
            try {
                //执行任务A
                Thread.sleep(600);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
 
        });
 
        CompletableFuture<Void> cp2 = cp1.thenRun(() -> {
            try {
                //执行任务B
                Thread.sleep(400);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
 
        // get方法测试
        System.out.println(cp2.get());
 
        //模拟主程序耗时时间
        Thread.sleep(600);
        System.out.println("总共用时" + (System.currentTimeMillis() - startTime) + "ms");
    }
 
    //运行结果
    /**
     *  null
     *  总共用时1610ms
     */
登入後複製

「thenRun 和thenRunAsync有什麼差別呢?」

如果你執行第一個任務的時候,傳入了一個自訂執行緒池:

  • 呼叫thenRun方法執行第二個任務時,則第二個任務和第一個任務是共用同一個執行緒池。

  • 呼叫thenRunAsync執行第二個任務時,則第一個任務使用的是你自己傳入的執行緒池,第二個任務使用的是ForkJoin執行緒池。

說明: 後面介紹的thenAccept和thenAcceptAsync,thenApply和thenApplyAsync等,它們之間的差異也是這個。

4.2、thenAccept/thenAcceptAsync

第一個任務執行完成後,執行第二個回呼方法任務,會將該任務的執行結果,作為入參 ,傳遞到回呼方法中,但是回呼方法是沒有回傳值的。

範例

#
    @Test
    public void testCompletableThenAccept() throws ExecutionException, InterruptedException {
        long startTime = System.currentTimeMillis();
        CompletableFuture<String> cp1 = CompletableFuture.supplyAsync(() -> {
            return "dev";
 
        });
        CompletableFuture<Void> cp2 = cp1.thenAccept((a) -> {
            System.out.println("上一个任务的返回结果为: " + a);
        });
 
        cp2.get();
    }
登入後複製

4.3、 thenApply/thenApplyAsync

表示第一个任务执行完成后,执行第二个回调方法任务,会将该任务的执行结果,作为入参,传递到回调方法中,并且回调方法是有返回值的。

示例

    @Test
    public void testCompletableThenApply() throws ExecutionException, InterruptedException {
        CompletableFuture<String> cp1 = CompletableFuture.supplyAsync(() -> {
            return "dev";
 
        }).thenApply((a) -> {
            if (Objects.equals(a, "dev")) {
                return "dev";
            }
            return "prod";
        });
 
        System.out.println("当前环境为:" + cp1.get());
 
        //输出: 当前环境为:dev
    }
登入後複製

5、异常回调

当CompletableFuture的任务不论是正常完成还是出现异常它都会调用「whenComplete」这回调函数。

  • 「正常完成」:whenComplete返回结果和上级任务一致,异常为null;

  • 「出现异常」:whenComplete返回结果为null,异常为上级任务的异常;

即调用get()时,正常完成时就获取到结果,出现异常时就会抛出异常,需要你处理该异常。

下面来看看示例

5.1、只用whenComplete

    @Test
    public void testCompletableWhenComplete() throws ExecutionException, InterruptedException {
        CompletableFuture<Double> future = CompletableFuture.supplyAsync(() -> {
 
            if (Math.random() < 0.5) {
                throw new RuntimeException("出错了");
            }
            System.out.println("正常结束");
            return 0.11;
 
        }).whenComplete((aDouble, throwable) -> {
            if (aDouble == null) {
                System.out.println("whenComplete aDouble is null");
            } else {
                System.out.println("whenComplete aDouble is " + aDouble);
            }
            if (throwable == null) {
                System.out.println("whenComplete throwable is null");
            } else {
                System.out.println("whenComplete throwable is " + throwable.getMessage());
            }
        });
        System.out.println("最终返回的结果 = " + future.get());
    }
登入後複製

正常完成,没有异常时:

正常结束
whenComplete aDouble is 0.11
whenComplete throwable is null
最终返回的结果 = 0.11

出现异常时:get()会抛出异常

whenComplete aDouble is null
whenComplete throwable is java.lang.RuntimeException: 出错了

java.util.concurrent.ExecutionException: java.lang.RuntimeException: 出错了
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)

5.2、whenComplete + exceptionally示例

    @Test
    public void testWhenCompleteExceptionally() throws ExecutionException, InterruptedException {
        CompletableFuture<Double> future = CompletableFuture.supplyAsync(() -> {
            if (Math.random() < 0.5) {
                throw new RuntimeException("出错了");
            }
            System.out.println("正常结束");
            return 0.11;
 
        }).whenComplete((aDouble, throwable) -> {
            if (aDouble == null) {
                System.out.println("whenComplete aDouble is null");
            } else {
                System.out.println("whenComplete aDouble is " + aDouble);
            }
            if (throwable == null) {
                System.out.println("whenComplete throwable is null");
            } else {
                System.out.println("whenComplete throwable is " + throwable.getMessage());
            }
        }).exceptionally((throwable) -> {
            System.out.println("exceptionally中异常:" + throwable.getMessage());
            return 0.0;
        });
 
        System.out.println("最终返回的结果 = " + future.get());
    }
登入後複製

当出现异常时,exceptionally中会捕获该异常,给出默认返回值0.0。

whenComplete aDouble is null
whenComplete throwable is java.lang.RuntimeException: 出错了
exceptionally中异常:java.lang.RuntimeException: 出错了
最终返回的结果 = 0.0

6、多任务组合回调

Java8 CompletableFuture 如何實作非同步多執行緒程式設計?

6.1、AND组合关系

thenCombine / thenAcceptBoth / runAfterBoth都表示:「当任务一和任务二都完成再执行任务三」

区别在于:

  • 「runAfterBoth」 不会把执行结果当做方法入参,且没有返回值

  • 「thenAcceptBoth」: 会将两个任务的执行结果作为方法入参,传递到指定方法中,且无返回值

  • 「thenCombine」:会将两个任务的执行结果作为方法入参,传递到指定方法中,且有返回值

示例

    @Test
    public void testCompletableThenCombine() throws ExecutionException, InterruptedException {
        //创建线程池
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        //开启异步任务1
        CompletableFuture<Integer> task = CompletableFuture.supplyAsync(() -> {
            System.out.println("异步任务1,当前线程是:" + Thread.currentThread().getId());
            int result = 1 + 1;
            System.out.println("异步任务1结束");
            return result;
        }, executorService);
 
        //开启异步任务2
        CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("异步任务2,当前线程是:" + Thread.currentThread().getId());
            int result = 1 + 1;
            System.out.println("异步任务2结束");
            return result;
        }, executorService);
 
        //任务组合
        CompletableFuture<Integer> task3 = task.thenCombineAsync(task2, (f1, f2) -> {
            System.out.println("执行任务3,当前线程是:" + Thread.currentThread().getId());
            System.out.println("任务1返回值:" + f1);
            System.out.println("任务2返回值:" + f2);
            return f1 + f2;
        }, executorService);
 
        Integer res = task3.get();
        System.out.println("最终结果:" + res);
    }
登入後複製

「运行结果」

异步任务1,当前线程是:17
异步任务1结束
异步任务2,当前线程是:18
异步任务2结束
执行任务3,当前线程是:19
任务1返回值:2
任务2返回值:2
最终结果:4

6.2、OR组合关系

applyToEither / acceptEither / runAfterEither 都表示:「两个任务,只要有一个任务完成,就执行任务三」

区别在于:

  • 「runAfterEither」:不会把执行结果当做方法入参,且没有返回值

  • 「acceptEither」: 会将已经执行完成的任务,作为方法入参,传递到指定方法中,且无返回值

  • 「applyToEither」:会将已经执行完成的任务,作为方法入参,传递到指定方法中,且有返回值

示例

    @Test
    public void testCompletableEitherAsync() {
        //创建线程池
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        //开启异步任务1
        CompletableFuture<Integer> task = CompletableFuture.supplyAsync(() -> {
            System.out.println("异步任务1,当前线程是:" + Thread.currentThread().getId());
 
            int result = 1 + 1;
            System.out.println("异步任务1结束");
            return result;
        }, executorService);
 
        //开启异步任务2
        CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("异步任务2,当前线程是:" + Thread.currentThread().getId());
            int result = 1 + 2;
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("异步任务2结束");
            return result;
        }, executorService);
 
        //任务组合
        task.acceptEitherAsync(task2, (res) -> {
            System.out.println("执行任务3,当前线程是:" + Thread.currentThread().getId());
            System.out.println("上一个任务的结果为:" + res);
        }, executorService);
    }
登入後複製

运行结果

//通过结果可以看出,异步任务2都没有执行结束,任务3获取的也是1的执行结果
异步任务1,当前线程是:17
异步任务1结束
异步任务2,当前线程是:18
执行任务3,当前线程是:19
上一个任务的结果为:2

注意

如果把上面的核心线程数改为1也就是

ExecutorService executorService = Executors.newFixedThreadPool(1);

运行结果就是下面的了,会发现根本没有执行任务3,显然是任务3直接被丢弃了。

异步任务1,当前线程是:17
异步任务1结束
异步任务2,当前线程是:17

6.3、多任务组合

  • 「allOf」:等待所有任务完成

  • 「anyOf」:只要有一个任务完成

示例

allOf:等待所有任务完成

    @Test
    public void testCompletableAallOf() throws ExecutionException, InterruptedException {
        //创建线程池
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        //开启异步任务1
        CompletableFuture<Integer> task = CompletableFuture.supplyAsync(() -> {
            System.out.println("异步任务1,当前线程是:" + Thread.currentThread().getId());
            int result = 1 + 1;
            System.out.println("异步任务1结束");
            return result;
        }, executorService);
 
        //开启异步任务2
        CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("异步任务2,当前线程是:" + Thread.currentThread().getId());
            int result = 1 + 2;
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("异步任务2结束");
            return result;
        }, executorService);
 
        //开启异步任务3
        CompletableFuture<Integer> task3 = CompletableFuture.supplyAsync(() -> {
            System.out.println("异步任务3,当前线程是:" + Thread.currentThread().getId());
            int result = 1 + 3;
            try {
                Thread.sleep(4000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("异步任务3结束");
            return result;
        }, executorService);
 
        //任务组合
        CompletableFuture<Void> allOf = CompletableFuture.allOf(task, task2, task3);
 
        //等待所有任务完成
        allOf.get();
        //获取任务的返回结果
        System.out.println("task结果为:" + task.get());
        System.out.println("task2结果为:" + task2.get());
        System.out.println("task3结果为:" + task3.get());
    }
登入後複製

anyOf: 只要有一个任务完成

    @Test
    public void testCompletableAnyOf() throws ExecutionException, InterruptedException {
        //创建线程池
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        //开启异步任务1
        CompletableFuture<Integer> task = CompletableFuture.supplyAsync(() -> {
            int result = 1 + 1;
            return result;
        }, executorService);
 
        //开启异步任务2
        CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> {
            int result = 1 + 2;
            return result;
        }, executorService);
 
        //开启异步任务3
        CompletableFuture<Integer> task3 = CompletableFuture.supplyAsync(() -> {
            int result = 1 + 3;
            return result;
        }, executorService);
 
        //任务组合
        CompletableFuture<Object> anyOf = CompletableFuture.anyOf(task, task2, task3);
        //只要有一个有任务完成
        Object o = anyOf.get();
        System.out.println("完成的任务的结果:" + o);
    }
登入後複製

7、CompletableFuture使用有哪些注意点

Java8 CompletableFuture 如何實作非同步多執行緒程式設計?

CompletableFuture 使我们的异步编程更加便利的、代码更加优雅的同时,我们也要关注下它,使用的一些注意点。

7.1、Future需要获取返回值,才能获取异常信息

    @Test
    public void testWhenCompleteExceptionally() {
        CompletableFuture<Double> future = CompletableFuture.supplyAsync(() -> {
            if (1 == 1) {
                throw new RuntimeException("出错了");
            }
            return 0.11;
        });
 
        //如果不加 get()方法这一行,看不到异常信息
        //future.get();
    }
登入後複製

Future需要获取返回值,才能获取到异常信息。如果不加 get()/join()方法,看不到异常信息。

小伙伴们使用的时候,注意一下哈,考虑是否加try...catch...或者使用exceptionally方法。

7.2、CompletableFuture的get()方法是阻塞的

CompletableFuture的get()方法是阻塞的,如果使用它来获取异步调用的返回值,需要添加超时时间。

//反例
 CompletableFuture.get();
//正例
CompletableFuture.get(5, TimeUnit.SECONDS);
登入後複製

7.3、不建议使用默认线程池

CompletableFuture代码中又使用了默认的「ForkJoin线程池」,处理的线程个数是电脑「CPU核数-1」。在大量请求过来的时候,处理逻辑复杂的话,响应会很慢。一般建议使用自定义线程池,优化线程池配置参数。

7.4、自定义线程池时,注意饱和策略

CompletableFuture的get()方法是阻塞的,我们一般建议使用future.get(5, TimeUnit.SECONDS)。并且一般建议使用自定义线程池。

但是如果线程池拒绝策略是DiscardPolicy或者DiscardOldestPolicy,当线程池饱和时,会直接丢弃任务,不会抛弃异常。因此建议,CompletableFuture线程池策略最好使用AbortPolicy,然后耗时的异步线程,做好线程池隔离哈。

以上是Java8 CompletableFuture 如何實作非同步多執行緒程式設計?的詳細內容。更多資訊請關注PHP中文網其他相關文章!

相關標籤:
來源:yisu.com
本網站聲明
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn
熱門教學
更多>
最新下載
更多>
網站特效
網站源碼
網站素材
前端模板