Java Async

JDK8 - CompletableFuture 非同步處理簡介

劉明凱 Matt Liu 2019/11/11 15:26:50
4180

前言

在編寫非同步處理的程式時,若遇到較複雜的連續組合或者Exception Handling等等的狀況,

很容易就會寫出可讀性不佳的程式碼。在JDK 8中,java.util.concurret新增了CompletableFuture,

它提供了許多強大的功能讓非同步處理的調用變得更加便利且清晰。

 

使用

1.創建非同步任務

我們可以透過CompletableFuture提供的 runAsync 及 supplyAsync 兩個靜態方法來創建非同步的任務

而兩個方法主要的差別是:

runAsync - 沒有回傳值

supplyAsync - 有回傳值

public static CompletableFuture<Void> runAsync(Runnable runnable)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

如果我們在調用runAsync及supplyAsync時,若沒有特別指定調用的執行緒池的話,

預設則會使用 ForkJoinPool ,當然我們也可以使用自訂的執行緒池。

如果在方法中帶入我們自訂的Executor,CompletableFuture將會調用我們自定義的執行緒池去創建非同步任務,

並且可以使用 get() 來取得它的回傳結果。

以下是簡單的例子:

CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
			LOGGER.info("supplyAsync START");
			LOGGER.info("say Hello World");
			return "Hello World";
		});

		LOGGER.info(hello.get());

 

沒有指定 Executor的話,則會取用預設的Executor,若我們額外設定Executor的話,它將會取用我們所定義的

CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
			LOGGER.info("supplyAsync START");
			LOGGER.info("say Hello World");
			return "Hello World";
		}, sampleExecutor);

這樣我們就可以讓 CompletableFuture 使用自定義的Executor執行非同步任務。

 

2.同時執行多個非同步任務

上面已經成功利用 CompletableFuture 創建非同步任務了,

當然也可以同時創建多個非同步的任務來同時執行,

所以利用簡單的範例來測試是否同時使用多個 CompletableFuture 時,可以同時執行不同任務。

	CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
			try {
				LOGGER.info("supplyAsync 1 Sleep");
				Thread.sleep(500);
				LOGGER.info("supplyAsync 1 wake up");
			} catch (InterruptedException e) {
			}
			LOGGER.info("say Hello");
			return "Hello";
		}, sampleExecutor);
	CompletableFuture<String> world = CompletableFuture.supplyAsync(() -> {
			try {
				LOGGER.info("supplyAsync 2 Sleep");
				Thread.sleep(500);
				LOGGER.info("supplyAsync 2 wake up");
			} catch (InterruptedException e) {
			}
			LOGGER.info("say world");
			return "World";
		}, sampleExecutor);

		LOGGER.info("All OK");

執行後可以看到,CompletableFuture 使用了兩個 sample-thread執行,並且同時正在執行自己的任務,

也發現到主執行緒也同時執行自己的工作,因此如果遇到需要等待子執行緒結束的時候再繼續執行的話,

我們將可以利用CompletableFuture 的allOf以及join的功能來等待它們結束。

		CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
			try {
				LOGGER.info("supplyAsync 1 Sleep");
				Thread.sleep(500);
				LOGGER.info("supplyAsync 1 wake up");
			} catch (InterruptedException e) {
			}
			LOGGER.info("say Hello");
			return "Hello";
		}, sampleExecutor);
		
		CompletableFuture<String> world = CompletableFuture.supplyAsync(() -> {
			try {
				LOGGER.info("supplyAsync 2 Sleep");
				Thread.sleep(500);
				LOGGER.info("supplyAsync 2 wake up");
			} catch (InterruptedException e) {
			}
			LOGGER.info("say world");
			return "World";
		}, sampleExecutor);
		
		
		CompletableFuture allOf = CompletableFuture.allOf(hello, world);
		allOf.join();
		

		LOGGER.info("All OK");

加上allOf及join之後,就可以使主執行緒等待allOf裡面所帶入的非同步任務結束後再繼續執行,

主執行緒就不會在子執行緒還在執行的時候就搶跑了。

 

3.whenComplete

我們也可以利用CompletableFuture裡的whenComplete來使非同步任務結束之後執行他後續的程式,

像下方簡單的範例可以看到,在whenComplete裡的程式會等到supplyAsync結束之後,

將supplyAsync回傳的內容代入whenComplete裡,並繼續處理後面的程序。

CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
			LOGGER.info("say Hello");
			return "Hello";
		}, sampleExecutor).whenComplete((ok , e) -> {
			if(e != null ) {
				LOGGER.error("e : ", e);
			}
			LOGGER.info("whenComplete : " + ok);
			
		});

 

可以看到whenComplete會在supplyAsync結束後取得其回傳值並開始後續的作業,

若希望在完成後繼續以Aysnc執行任務的話,也可以使用whenCompleteAsync來繼續執行,

這樣就可以在完成第一個非同步任務之後繼續處理後續的任務,也不會使程式碼看起來太過雜亂且複雜。

 

結論

CompletableFuture 實現了Future<T>, CompletionStage<T>後,

更進一步的整合及擴展其功能使非同步處理更加得容易。

像是completeExceptionally、whenComplete等等,許多功能讓開發變得更加好控制,並且讓程式更加得簡潔易讀,

也可以結合ThreadPoolTaskExecutor來自行定義專案內的執行緒池,在非同步處理上相當好用且強大。

劉明凱 Matt Liu