異步編程的幾種方式,你知道幾種?

作者:Eric Fu
鏈接:https://ericfu.me/several-ways-to-aync/

近期嘗試在搬磚專用語言 Java 上實現異步,起因和過程就不再詳述了,總而言之,心中一萬頭草泥馬奔過。但這個過程也沒有白白浪費,趁機回顧了一下各種異步編程的實現。

這篇文章會涉及到回調、Promise、反應式、async/await、用戶態線程等異步編程的實現方案。如果你熟悉它們中的一兩種,那應該也能很快理解其他幾個。

爲什麼需要異步?

操作系統可以看作是個虛擬機(VM),進程生活在操作系統創造的虛擬世界裏。進程不用知道到底有多少 core 多少內存,只要進程不要索取的太過分,操作系統就假裝有無限多的資源可用。

基於這個思想,線程(Thread)的個數並不受硬件限制:你的程序可以只有一個線程、也可以有成百上千個。操作系統會默默做好調度,讓諸多線程共享有限的 CPU 時間片。這個調度的過程對線程是完全透明的。

那麼,操作系統是怎樣做到在線程無感知的情況下調度呢?答案是上下文切換(Context Switch),簡單來說,操作系統利用軟中斷機制,把程序從任意位置打斷,然後保存當前所有寄存器——包括最重要的指令寄存器 PC 和棧頂指針 SP,還有一些線程控制信息(TCB),整個過程會產生數個微秒的 overhead。

然而作爲一位合格的程序員,你一定也聽說過,線程是昂貴的:

這兩個原因驅使我們儘可能避免創建太多的線程,而異步編程的目的就是消除 IO wait 阻塞——絕大多數時候,這是我們創建一堆線程、甚至引入線程池的罪魁禍首。

Continuation

回調函數知道的人很多,但瞭解 Continuation 的人不多。Continuation 有時被晦澀地翻譯成 “計算續體”,咱們還是直接用單詞好了。

把一個計算過程在中間打斷,剩下的部分用一個對象表示,這就是 Continuation。操作系統暫停一個線程時保存的那些現場數據,也可以看作一個 Continuation。有了它,我們就能在這個點接着剛剛的斷點繼續執行。

打斷一個計算過程聽起來很厲害吧!實際上它每時每刻都在發生——假設函數 f() 中間調用了 g(),那 g() 運行完成時,要返回到 f() 剛剛調用 g() 的地方接着執行。這個過程再自然不過了,以至於所有編程語言(彙編除外)都把它掩藏起來,讓你在編程中感覺不到調用棧的存在。

操作系統用昂貴的軟中斷機制實現了棧的保存和恢復。那有沒有別的方式實現 Continuation 呢?最樸素的想法就是,把所有用得到的信息包成一個函數對象,在調用 g() 的時候一起傳進去,並約定:一旦 g() 完成,就拿着結果去調用這個 Continuation。

這種編程模式被稱爲 Continuation-passing style(CPS):

  1. 把調用者 f() 還未執行的部分包成一個函數對象 cont,一同傳給被調用者 g()

  2. 正常運行 g() 函數體;

  3. g() 完成後,連同它的結果一起回調 cont,從而繼續執行 f() 裏剩餘的代碼。

再拿 Wikipedia 上的定義鞏固一下:

A function written in continuation-passing style takes an extra argument: an explicit "continuation", i.e. a function of one argument. When the CPS function has computed its result value, it "returns" it by calling the continuation function with this value as the argument.

CPS 風格的函數帶一個額外的參數:一個顯式的 Continuation,具體來說就是個僅有一個參數的函數。當 CPS 函數計算完返回值時,它 “返回” 的方式就是拿着返回值調用那個 Continuation。

你應該已經發現了,這也就是回調函數,我只是換了個名字而已。

異步的樸素實現:Callback

光有回調函數其實並沒有卵用。對於純粹的計算工作,Call Stack 就很好,爲何要費時費力用回調來做 Continuation 呢?你說的對,但僅限於沒有 IO 的情況。我們知道 IO 通常要比 CPU 慢上好幾個數量級,在 BIO 中,線程發起 IO 之後只能暫停,然後等待 IO 完成再由操作系統喚醒。

var input = recv_from_socket()  // Block at syscall recv()
var result = calculator.calculate(input)
send_to_socket(result) // Block at syscall send()

而異步 IO 中,進程發起 IO 操作時也會一併輸入回調(也就是 Continuation),這大大解放了生產力——現場無需等待,可以立即返回去做其他事情。一旦 IO 成功後,AIO 的 Event Loop 會調用剛剛設置的回調函數,把剩下的工作完成。這種模式有時也被稱爲 Fire and Forget。

recv_from_socket((input) -> {
    var result = calculator.calculate(input)
    send_to_socket(result) // ignore result
})

就這麼簡單,通過我們自己實現的 Continuation,線程不再受 IO 阻塞,可以自由自在地跑滿 CPU。

一顆語法糖:Promise

回調函數哪裏都好,就是不大好用,以及太醜了。

第一個問題是可讀性大大下降,由於我們繞開操作系統自制 Continuation,所有函數調用都要傳入一個 lambda 表達式,你的代碼看起來就像要起飛一樣,縮進止不住地往右挪(the "Callback Hell")。

第二個問題是各種細節處理起來很麻煩,比如,考慮下異常處理,看來傳一個 Continuation 還不夠,最好再傳個異常處理的 callback。

Promise 是對異步調用結果的一個封裝,在 Java 中它叫作 CompletableFuture (JDK8) 或者 ListenableFuture (Guava)。Promise 有兩層含義:

第一層含義是:我現在還不是真正的結果,但是承諾以後會拿到這個結果。這很容易理解,異步的任務遲早會完成,調用者如果比較蠢萌,他也可以用 Promise.get() 強行要拿到結果,順便阻塞了當前線程,異步變成了同步。

第二層含義是:如果你(調用者)有什麼吩咐,就告訴我好了。這就有趣了,換句話說,回調函數不再是傳給 g(),而是 g() 返回的 Promise,比如之前那段代碼,我們用 Promise 來書寫,看起來順眼了不少。

var promise_input = recv_from_socket()
promise_input.then((input) -> {
    var result = calculator.calculate(input)
    send_to_socket(result) // ignore result
})

Promise 改善了 Callback 的可讀性,也讓異常處理稍稍優雅了些,但終究是顆語法糖。

反應式編程

反應式(Reactive)最早源於函數式編程中的一種模式,隨着微軟發起 ReactiveX 項目並一步步壯大,被移植到各種語言和平臺上。Reactive 最初在 GUI 編程中有廣泛的應用,由於異步調用的高性能,很快也在服務器後端領域遍地開花。

Reactive 可以看作是對 Promise 的極大增強,相比 Promise,反應式引入了流(Flow)的概念。ReactiveX 中的事件流從一個 Observable 對象流出,這個對象可以是一個按鈕,也可以是 Restful API,總之,它能被外界觸發。與 Promise 不同的是,事件可能被觸發多次,所以處理代碼也會被多次調用。

一旦允許調用多次,從數據流動的角度看,事實上模型已經是 Push 而非 Pull。那麼問題來了,如果調用頻率非常高,以至於我們處理速度跟不上了怎麼辦?所以 RX 框架又引入了 Backpressure 機制來進行流控,最簡單的流控方式就是:一旦 buffer 滿,就丟棄掉之後的事件。

ReactiveX 框架的另一個優點是內置了很多好用的算子,比如:merge(Flow 合併),debounce(開關除顫)等等,方便了業務開發。下面是一個 RxJava 的例子:

CPS 變換:Coroutine 與 async/await

無論是反應式還是 Promise,說到底仍然沒有擺脫手工構造 Continuation:開發者要把業務邏輯寫成回調函數。對於線性的邏輯基本可以應付自如,但是如果邏輯複雜一點呢?(比如,考慮下包含循環的情況)

有些語言例如 C#,JavaScript 和 Python 提供了 async/await 關鍵字。與 Reactive 一樣,這同樣出自微軟 C# 語言。在這些語言中,你會感到前所未有的爽感:異步編程終於擺脫了回調函數!唯一要做的只是在異步函數調用時加上 await,編譯器就會自動把它轉化爲協程(Coroutine),而非昂貴的線程。

魔法的背後是 CPS 變換,CPS 變換把普通函數轉換成一個 CPS 的函數,即 Continuation 也能作爲一個調用參數。函數不僅能從頭運行,還能根據 Continuation 的指示繼續某個點(比如調用 IO 的地方)運行。

可以看到,函數已經不再是一個函數了,而是變成一個狀態機。每次 call 它、或者它 call 其他異步函數時,狀態機都會做一些計算和狀態輪轉。說好的 Continuation 在哪呢?就是對象自己(this)啊。

CPS 變換實現非常複雜,尤其是考慮到 try-catch 之後。但是沒關係,複雜性都在編譯器裏,用戶只要學兩個關鍵詞即可。這個特性非常優雅,比 Java 那個廢柴的 CompletableFuture 不知道高到哪去了

JVM 上也有一個實現:electronicarts/ea-async,原理和 C# 的 async/await 類似,在編譯期修改 Bytecode 實現 CPS 變換。

終極方案:用戶態線程

有了 async/await,代碼已經簡潔很多了,基本上和同步代碼無異。是否有可能讓異步代碼和同步代碼完全一樣呢?聽起來就像免費午餐,但是的確可以做到!

用戶態線程的代表是 Golang。JVM 上也有些實現,比如 Quasar,不過因爲 JDBC、Spring 這些周邊生態(它們佔據了大部分 IO 操作)的缺失基本沒有什麼用。

關注公衆號 Java 技術棧,在後臺回覆:面試,可以獲取我整理的 Java 多線程系列面試題和答案,非常齊全。

用戶態線程是把操作系統提供的線程機制完全拋棄,換句話說,不去用這個 VM 的虛擬化機制。比如硬件有 8 個核心,那就創建 8 個系統線程,然後把 N 個用戶線程調度到這 8 個系統線程上跑。N 個用戶線程的調度在用戶進程裏實現,由於一切都在進程內部,切換代價要遠遠小於操作系統 Context Switch。

另一方面,所有可能阻塞系統級線程的事情,例如 sleep()recv() 等,用戶態線程一定不能碰,否則它一旦阻塞住也就帶着那 8 個系統線程中的一個阻塞了。Go Runtime 接管了所有這樣的系統調用,並用一個統一的 Event loop 來輪詢和分發。

另外,由於用戶態線程很輕量,我們完全沒必要再用線程池,如果需要開線程就直接創建。比如 Java 中的 WebServer 幾乎一定有個線程池,而 Go 可以給每個請求開闢一個 goroutine 去處理。併發編程從未如此美好!

總結

以上方案中,Promise、Reactive 本質上還是回調函數,只是框架的存在一定程度上降低了開發者的心智負擔。而 async/await 和用戶態線程的解決方案要優雅和徹底的多,前者通過編譯期的 CPS 變換幫用戶創造出 CPS 式的函數調用;後者則繞開操作系統、重新實現一套線程機制,一切調度工作由 Runtime 接管。

不知道是不是因爲歷史包袱太重,Java 語言本身提供的異步編程支持弱得可憐,即便是 CompletableFuture 還是在 Java 8 才引入,其後果就是很多庫都沒有異步的支持。雖然 Quasar 在沒有語言級支持的情況下引入了 CPS 變換,但是由於缺少周邊生態的支持,實際很難用在項目中。

最後,關注公衆號 Java 技術棧,在後臺回覆:面試,可以獲取我整理的 Java 多線程系列面試題和答案,非常齊全。

References

  1. https://blog.tsunanet.net/2010/11/how-long-does-it-take-to-make-context.html

  2. http://reactivex.io/

  3. https://zhuanlan.zhihu.com/p/25964339

  4. http://docs.paralleluniverse.co/quasar/

  5. http://morsmachine.dk/go-scheduler

  6. https://medium.com/@ThatGuyTinus/callbacks-vs-promises-vs-async-await-f65ed7c2b9b4

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/LByOV2FzsvyBcmLUjlRncw