CompletableFuture是1.8引入的新特性,一些比較複雜的非同步計算場景,尤其是需要串連多個非同步運算單元的場景,可以考慮使用CompletableFuture 來實作。
在現實世界中,我們需要解決的複雜問題都是要分為若干步驟。就像我們的程式碼一樣,在一個複雜的邏輯方法中,會呼叫多個方法來一步一步實作。
設想如下場景,植樹節要進行植樹,分為下面幾個步驟:
#挖洞10 分鐘
拿樹苗5 分鐘
種樹苗20 分鐘
#澆水5 分鐘
40 * 100 = 4000 分鐘。這種方式對應到程序,就是單執行緒同步執行。
前文說到我們不可能通過100個線程並發來執行任務,所以一般情況下我們都會使用線程池,這和上面的設計思想不謀而合。使用線程池後,由於第四種方式把步驟拆的更細,提高了並發的可能性。因此速度會比第二種方式更快。那麼和第三種比起來,哪種比較快呢?如果執行緒數可以無窮大,這兩個方法所能達到的最短時間是一樣的,都是 35 分鐘。不過在線程有限的情況下,第四種方式對線程的使用率會更高,因為每個步驟都可以並行執行(參與種樹的人完成自己的工作後,都可以去幫助其他人),線程的調度更為靈活,所以執行緒池中的執行緒很難閒下來,一直保持在運作中。是的,誰都不能偷懶。而第三種由於只能並發在 plantTree 方法及挖坑和拿樹苗,所以不如第四種方式靈活
上文講了這麼多,主要是要說明 CompletableFuture 出現的原因。他用來把複雜任務拆解為一個個銜接的非同步執行步驟,從而提升整體的效率。我們回一下小節題目:誰都不能偷懶。沒錯,這就是 CompletableFuture 要達到的效果,透過計算單元的抽象,讓執行緒能夠有效率的並發參與每一個步驟。同步的程式碼透過 CompletableFuture 可以完全改造為非同步程式碼。下面我們就來看看如何使用 CompletableFuture。
CompletableFuture 實作了 Future 介面並且實作了 CompletionStage 介面。 Future 介面我們已經很熟悉了,而CompletionStage 介面定了非同步運算步驟之間的規範,這樣確保一步一步能夠銜接上。 CompletionStage 定義了38 個 public 的方法用於非同步計算步驟間的銜接。接下來我們會挑選一些常用的,相對使用頻率較高的方法,來看看如何使用。
如果你已經知道 CompletableFuture 的計算結果,可以使用靜態方法 completedFuture。傳入計算結果,聲明CompletableFuture 物件。呼叫 get 方法時會立即傳回傳入的計算結果,不會被阻塞,如下程式碼:
public static void main(String[] args) throws Exception{ CompletableFuture<String> completableFuture = CompletableFuture.completedFuture("Hello World"); System.out.println("result is " + completableFuture.get()); } // result is Hello World
是不是覺得這種用法沒有什麼意義?既然知道計算結果了,直接使用就好了,為什麼還要透過 CompletableFuture 包裝呢?這是因為非同步計算單元需要透過 CompletableFuture 進行銜接,所以有的時候我們即使已經知道計算結果,也需要包裝為 CompletableFuture,才能融入到非同步計算的流程之中。
這是我們最常用的方式。把需要非同步計算的邏輯封裝為一個計算單元,交由 CompletableFuture 運作。如下面的程式碼:
public static void main(String[] args) throws Exception { CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "挖坑完成"); System.out.println("result is " + completableFuture.get()); } // result is 挖坑完成
這裡我們使用了 CompletableFuture 的 supplyAsync 方法,以 lambda 表達式的方式向其傳遞了一個 supplier 介面的實作。
可見 completableFuture.get()
拿到的計算結果就是你傳入函數執行後 return 的值。那如果你有需要非同步運算的邏輯,那就可以放到 supplyAsync 傳入的函數體中。這段函數是如何被非同步執行的呢?如果你跟入程式碼可以看到其實 supplyAsync 是透過 Executor,也就是線程池來運行這段函數的。 completableFuture 預設使用的是ForkJoinPool,當然你也可以透過為 supplyAsync 指定其他 Excutor,透過第二個參數傳入 supplyAsync 方法。
supplyAsync 使用場景非常多,舉個簡單的例子,主程式需要呼叫多個微服務的介面請求數據,那麼就可以啟動多個CompletableFuture,呼叫supplyAsync,函數體中是關於不同介面的調用邏輯。這樣不同的介面請求就可以非同步同時運行,最後再等全部介面返回時,執行後面的邏輯。
supplyAsync 接收的函數是有傳回值的。有些情況我們只是一段計算過程,並不需要回傳值。這就像 Runnable 的run 方法,並沒有回傳值。這個情況我們可以使用 runAsync方法,如下面的程式碼:
public static void main(String[] args) throws Exception { CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> System.out.println("挖坑完成")); completableFuture.get(); } // 挖坑完成
runAsync 接收 runnable 介面的函數。所以並無回傳值。栗子中的邏輯只是列印“挖坑完成”。
當我們透過supplyAsync 完成了非同步計算,傳回CompletableFuture,此時可以繼續對傳回結果進行加工,如下面的程式碼:
public static void main(String[] args) throws Exception { CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "挖坑完成") .thenApply(s -> s + ", 并且归还铁锹") .thenApply(s -> s + ", 全部完成。"); System.out.println("result is " + completableFuture.get()); } // result is 挖坑完成, 并且归还铁锹, 全部完成。
在呼叫supplyAsync 後,我們兩次鍊式呼叫thenApply 方法。 s 是前一步 supplyAsync 回傳的計算結結果,我們對結算結果進行了兩次再加工。我們可以透過 thenApply 不斷對計算結果進行加工處理。如果想要非同步運行 thenApply 的邏輯,可以使用 thenApplyAsync。使用方法相同,只不過會透過執行緒池異步運行。
这种场景你可以使用thenApply。这个方法可以让你处理上一步的返回结果,但无返回值。参照如下代码:
public static void main(String[] args) throws Exception { CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> "挖坑完成") .thenAccept(s -> System.out.println(s + ", 并且归还铁锹")); completableFuture.get(); }
这里可以看到 thenAccept 接收的函数没有返回值,只有业务逻辑。处理后返回 CompletableFuture 类型对象。
此时你可以使用 thenRun 方法,他接收 Runnable 的函数,没有输入也没有输出,仅仅是在异步计算结束后回调一段逻辑,比如记录 log 等。参照下面代码:
public static void main(String[] args) throws Exception { CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> "挖坑完成") .thenAccept(s -> System.out.println(s + ", 并且归还铁锹")) .thenRun(() -> System.out.println("挖坑工作已经全部完成")); completableFuture.get(); } // 挖坑完成, 并且归还铁锹 // 挖坑工作已经全部完成
可以看到在 thenAccept 之后继续调用了 thenRun,仅仅是打印了日志而已
我们可以把两个 CompletableFuture 组合起来使用,如下面的代码:
public static void main(String[] args) throws Exception { CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "挖坑完成") .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + ", 并且归还铁锹")); System.out.println("result is " + completableFuture.get()); } // result is 挖坑完成, 并且归还铁锹
thenApply 和 thenCompose 的关系就像 stream中的 map 和 flatmap。从上面的例子来看,thenApply 和thenCompose 都可以实现同样的功能。但是如果你使用一个第三方的库,有一个API返回的是CompletableFuture 类型,那么你就只能使用 thenCompose方法。
如果你有两个异步操作互相没有依赖,但是第三步操作依赖前两部计算的结果,那么你可以使用 thenCombine 方法来实现,如下面代码:
public static void main(String[] args) throws Exception { CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "挖坑完成") .thenCombine(CompletableFuture.supplyAsync(() -> ", 拿树苗完成"), (x, y) -> x + y + "植树完成"); System.out.println("result is " + completableFuture.get()); } // result is 挖坑完成, 拿树苗完成植树完成
挖坑和拿树苗可以同时进行,但是第三步植树则祖尧前两步完成后才能进行。
可以看到符合我们的预期。使用场景之前也提到过。我们调用多个微服务的接口时,可以使用这种方式进行组合。处理接口调用间的依赖关系。 当你需要两个 Future 的结果,但是不需要再加工后向下游传递计算结果时,可以使用 thenAcceptBoth,用法一样,只不过接收的函数没有返回值。
假如我们对微服务接口的调用不止两个,并且还有一些其它可以异步执行的逻辑。主流程需要等待这些所有的异步操作都返回时,才能继续往下执行。此时我们可以使用 CompletableFuture.allOf 方法。它接收 n 个 CompletableFuture,返回一个 CompletableFuture。对其调用 get 方法后,只有所有的 CompletableFuture 全完成时才会继续后面的逻辑。我们看下面示例代码:
public static void main(String[] args) throws Exception { CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("挖坑完成"); }); CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> { try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("取树苗完成"); }); CompletableFuture<Void> future3 = CompletableFuture.runAsync(() -> { try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("取肥料完成"); }); CompletableFuture.allOf(future1, future2, future3).get(); System.out.println("植树准备工作完成!"); } // 挖坑完成 // 取肥料完成 // 取树苗完成 // 植树准备工作完成!
在异步计算链中的异常处理可以采用 handle 方法,它接收两个参数,第一个参数是计算及过,第二个参数是异步计算链中抛出的异常。使用方法如下:
public static void main(String[] args) throws Exception { CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> { if (1 == 1) { throw new RuntimeException("Computation error"); } return "挖坑完成"; }).handle((result, throwable) -> { if (result == null) { return "挖坑异常"; } return result; }); System.out.println("result is " + completableFuture.get()); } // result is 挖坑异常
代码中会抛出一个 RuntimeException,抛出这个异常时 result 为 null,而 throwable 不为null。根据这些信息你可以在 handle 中进行处理,如果抛出的异常种类很多,你可以判断 throwable 的类型,来选择不同的处理逻辑。
以上是java多執行緒怎麼透過CompletableFuture組裝異步計算單元的詳細內容。更多資訊請關注PHP中文網其他相關文章!