響應式編程的複雜度和簡化
響應式系統不是今天的主題,我們要討論更具體的話題,即響應式代碼的編寫會有哪些複雜度,應該如何簡化。
什麼是響應式編程
什麼是響應式編程,它是一種編程範式?還是一種設計模式?抑或是其他?響應式系統和響應式編程有什麼關係?又比如,響應式編程它適用於什麼場景?解決什麼問題?
微軟於 2011 年率先建設了. Net 上的 Rx 庫,以簡化容易出錯的異步和事件驅動編程,邁出了響應式編程的第一步,隨後業界爲許多編程語言提供了對應的實現。
.Net 上的 Rx 庫地址:https://docs.microsoft.com/en-us/previous-versions/dotnet/reactive-extensions/hh242985(v=vs.103)
什麼是響應式,我們從一個例子開始,
在上面的表格中,建立了單元格之間的關係:A1 = B1 + C1
,建立關係之後 A1 將響應任何對於 B1 和 C1 的變化,毫無疑問,這就是一種響應式行爲。
我覺得這個例子很棒的地方在於,它顯然很簡單,同時又足夠深刻,首先,它充分的體現了響應式的概念,其次,變化發生時,肯定觸發了某些過程的執行,說明背後存在關係的建立和沿着關係傳播的變化,再次,稍微深入一點看,B1 和 C1 的變化可以是一系列的變化,可以很自然的引申到流的概念,最後,它有一個很高級的抽象,對使用方來說,整個過程是聲明式的。
當然,舉例子來說明一個概念的時候,本質上是用一個外延在解釋一個概念的內涵,往往是會將內涵縮小的,所以我們可以嘗試推廣這個外延。列出這個例子的特徵:
描述了一個單元格里的整數等於另外兩個單元格里整數相加,當後者每次發生變化時,變化都會傳播到第一個單元格,並進行求值。
關鍵詞爲整數、等於、相加、變化、每次、傳播、求值。前三個關鍵詞僅僅和例子相關,可以直接去掉。變化可以推廣爲數據,每次可以在邏輯上等價於流的概念,流可以有 0 個、1 個或多個數據,傳播可以推廣爲通信(在這個意義上,函數調用、RPC、socket、MQ 都是通信),求值推廣爲執行一個過程。所以我們可以得出響應式編程的定義:
通過聲明式的通信定義,將數據流與過程組合起來,從而實現數據驅動過程的一種複合編程範式。
時至今日,業界對於響應式的定義仍然是不統一的,因此這是我自己的理解。響應式的基礎概念是數據流,理念是過程的執行是通過響應數據來驅動的,核心是構造數據和過程的響應關係,並且能夠讓數據沿着關係傳播驅動過程,因此響應式編程本質上是一種對通信的抽象,說它是一種編程範式,是因爲它提供一種對於數據與過程組合方式的看法,說它是複合範式而不是基本範式,是因爲它不像 OOP 或者 FP 一樣提供的是對於數據和過程的看法,而是以兩者爲基礎,所以可以有對象響應式和函數響應式。
當我們基於響應式構建系統時,就是響應式系統,響應式系統的構建原則可以參考此處(地址:https://www.reactiveprinciples.org/patterns/communicate-facts.html),總的來說,系統會分割成一個一個的分區,分區內部對狀態進行本地化,分區之間通過通信進行異步解耦,可以通過控制這個通信的過程,實現系統的彈性擴縮容和部分組件失敗的回彈性。
響應式系統不是今天的主題,我們要討論更具體的話題,即響應式代碼的編寫會有哪些複雜度,應該如何簡化。
響應式編程的複雜度
響應式編程的複雜度來自於 4 個方面:
可以有0次、1次或多次數據產生,也就是數據流;
除了數據之外,還有能夠標識錯誤和完成(正常結束);
數據流和數據流、數據流和過程的組合複雜度很高;
在上面的基礎上,需要處理整個過程中線程切換、併發同步、數據緩衝等問題。
爲了支持數據流的概念,可以產生 0 次、1 次或多次數據產生,API 設計需要把數據回調和結果回調分開,通常也會把錯誤回調和完成回調分開,這種接口被稱爲流式接口,一個標準的流式接口設計如下所示:
typealias Func = () -> ()
typealias OnData<Data> = (Data) -> ()
typealias OnError = (Error) -> ()
typealias OnComplete = Func
typealias StreamFunc<Data> = (@escaping OnData<Data>, @escaping OnError, @escaping OnComplete) -> ()
顯然,流式接口是普通異步接口將一次結果向多次結果的推廣,這種推廣同時也增加了邏輯的複雜度。
我們可以通過一個邏輯上簡單的例子來看一下流式接口的使用過程,爲了關注於核心的複雜度,只會體現前 3 個方面,一方面是由於加入第 4 點的話會導致代碼過於冗長混淆關注點,另一方面相信各位對第 4 點本身的複雜度和它引起的衆多問題已經非常熟悉了。
這個例子很簡單,只有三步:
-
假設需要爲一個店鋪提供一個訂單展示頁面,這些訂單來自兩個不同的平臺 “鵝鵝鵝” 和“鴨鴨鴨”,他們各自提供了查詢的接口(listOrders,爲了簡單假設他們提供的模型和接口完全一致);
-
訂單列表需要展示用戶的暱稱等信息,需要通過對應平臺的另外一個接口(queryUserInfo)查詢;
-
由於 SDK 緩存、持久化、網絡請求策略,數據無法一次性獲取,這兩個接口可能存在多次數據回調。
進一步簡化問題,我們忽略變更處理、UI 渲染和用戶交互處理,僅僅考慮數據加載,這需要組合 2 個階段的 4 次接口調用,先分別請求兩個平臺的訂單,使用訂單請求對應平臺的 userInfo,最後合併成完整數據:
// 數據怎麼回調,什麼情況結束,onError和onComplete分別在什麼情況回調,保證有且僅有一次回調
func load(onData : OnData<[OrderObject]>?, onError : OnError?, onComplete : OnComplete?) {
let orderServices = [OrderService("鵝鵝鵝"), OrderService("鴨鴨鴨")]
// 記錄整體請求的完成狀態
var listOrderFinish = false
var queryUserFinish = false
// 記錄各個請求的結果
var listOrderResults = orderServices.map{_ in false}
var queryUserResults = [Bool]()
for (index, orderService) in orderServices.enumerated() {
orderService.listOrders { orders in
// 已結束不處理
if (listOrderFinish) {
return;
}
let index = queryUserResults.count
queryUserResults[index] = false
if let userService = getUserService(site: orderService.site){
let userIds = orders.map { order in
order.userId
}
userService.queryUserInfo(userIds: userIds) { userInfoDict in
if (listOrderFinish && queryUserFinish) {
return;
}
let orderObjects = orders.map { order in
OrderObject(order: order, userInfo: userInfoDict[order.userId])
}
onData?(orderObjects)
} onError: { error in
// 如果是第一個錯誤,直接回調,同時標記爲結束
if (!listOrderFinish || !queryUserFinish) {
listOrderFinish = true
queryUserFinish = true
onError?(error)
}
} onComplete: {
// 外層結束,內層也結束,纔是最終結束
if (!listOrderFinish || !queryUserFinish) {
queryUserResults[index] = true
// 所有都結束,回調
if (listOrderFinish && !queryUserResults.contains(false)) {
listOrderFinish = true
onComplete?()
}
}
}
} else {
let orderObjects = orders.map { order in
OrderObject(order: order)
}
onData?(orderObjects)
queryUserResults[index] = true
// 所有都結束,回調
if (listOrderFinish && !queryUserResults.contains(false)) {
listOrderFinish = true
onComplete?()
}
}
} onError: { error in
// 如果是第一個錯誤,直接回調,同時標記爲結束
if (!listOrderFinish) {
listOrderFinish = true
onError?(error)
}
} onComplete: {
// 注意,即使所有的請求都結束了,也不能回調結束,因爲這裏的結束只是代表Order請求結束,userInfo請求不一定結束
if (!listOrderFinish) {
listOrderResults[index] = true
// 所有都結束,回調
if (!listOrderResults.contains(false)) {
listOrderFinish = true
}
}
}
}
}
在這個接口的實現中,數據回調最簡單,在沒有結束的情況下,多次回調的數據可以直接回調,問題是如何保證錯誤和完成有且僅有一次回調,且結果回調後不再回調數據,即:
什麼時候回調錯誤?
什麼時候回調完成?
如果我們認爲一個接口出錯,就回調錯誤,這是最簡單的錯誤處理,只需要檢查和設置結束狀態,在沒有結束時的第一個錯誤進行回調即可,注意,我們需要在 userInfo 的請求中也做類似的處理,並保證錯誤回調後不再執行任何回調。
完成的回調要比錯誤複雜的多,我們可以來思考一下:
-
首先,我們不能在 listOrders 的 onComplete 裏面取回調完成,因爲這裏不能代表 queryUserInfo 這個接口也完成了;
-
其次,我們也不能簡單的通過所有 queryUserInfo 都完成了就回調完成,因爲 listOrders 在完成前仍然有可能返回新的訂單數據。
也就是說,這裏的完成需要在 queryUserInfo 進行判斷,並且也需要考慮外層請求的完成情況,比普通異步接口的級聯要多了兩個維度。這僅僅是 2 種接口 4 次請求,在真實的編程中,接口數量會多得多,並且需要把第 4 點加進來,線程 / 隊列、併發、同步、緩衝區,還要處理新數據推送響應,再考慮調試、監控、排查,複雜度顯然會繼續大幅增長,保證這個過程的正確性是一件痛苦的事情。
響應式編程的複雜度使用 Rx/Combine 簡化響應式編程
爲了解決這些問題,業界搞出了 Reactive Streams 規範(地址:https://www.reactive-streams.org/),也出現了若干的實現,都以工具庫的形式提供,包括 Rx 系列、Reactor,以及蘋果功能類似的 Combine。作爲一個 iOS 開發,我對 RxSwift 和 Combine 比較瞭解,兩者主要的區別在於 Combine 多了一個 Subscription 的抽象來協調 Publisher 和 Subscriber 之間的行爲,尤其是 Back Pressure 相關的控制,但總的來說,都提供了對於異步數據流的抽象和組合能力,用法上也很類似,這裏以 RxSwift 爲例來重寫上面的過程。
第一步,實現一個將流式函數轉換成 Observable 的工具類,這個是通用的,非常直觀:
func makeObservable<Data>(f : @escaping StreamFunc<Data>) -> Observable<Data> {
Observable<Data>.create { observer in
f { data in
observer.onNext(data)
} _: { error in
observer.onError(error)
} _: {
observer.onCompleted()
}
return Disposables.create()
}
}
第二步,針對這個例子,將 listOrder 和 queryUserInfo 轉換成 StreamFunc 形式,listOrder 本來就是 StreamFunc,對 queryUserInfo 進行偏應用也可以轉換爲 StreamFunc 形式,這是具體接口相關的:
func makeStreamFunc(orders : [Order], userInfoService : UserService?) -> StreamFunc<[OrderObject]> {
if let userInfoService = userInfoService {
// 核心是對queryUserInfo的userIds參數進行偏應用
let userInfoF : StreamFunc<[OrderObject]> = { onData, onError, onComplete in
let userIds = orders.map{$0.userId}
userInfoService.queryUserInfo(userIds: userIds, onData: { userInfoDict in
let orderObjects = orders.map { order in
OrderObject(order: order, userInfo: userInfoDict[order.userId])
}
onData(orderObjects)
}, onError: onError, onComplete: onComplete)
}
return userInfoF
} else {
return { onData, onError, onComplete in
onData(orders.map{OrderObject(order: $0)})
onComplete()
}
}
}
第三步,這樣就可以將 load 方法簡化爲:
func rxLoad() -> Observable<[OrderObject]> {
let orderService = [OrderService("鵝鵝鵝"), OrderService("鴨鴨鴨")]
// 通過map構造Observable,通過flatMap對listOrder和queryUserInfo進行復合
let observables = orderService.map { orderService in
makeObservable(f: orderService.listOrders).flatMap { (orders) -> Observable<[OrderObject]> in
let userLoadF = makeStreamFunc(orders: orders, userInfoService: getUserService(site: orderService.site))
return makeObservable(f: userLoadF)
}
}
// merge兩個平臺的Observable
return Observable.merge(observables)
}
可以看到,第一步是通用的,實際代碼中只需要做第二步和第三步,這就對上面的接口進行了大量的簡化,並且庫以統一的方式處理掉了合併、級聯、多數據返回的複雜邏輯,我們有相當的把握來保證正確性。當然,除了學習成本較高以外,也還是有缺點的,主要是使用方式仍然是異步形式,在部分環節仍然需要處理異步帶來的複雜度:
// 使用方調用
rxLoad().subscribe { orderObjects in
// onNext閉包中處理數據
} onError: { error in
// onError閉包中處理錯誤
} onCompleted: {
// onCompleted閉包中處理完成
} onDisposed: {
}
Rx 確實大大簡化了異步編程,但是還不夠,因爲它的使用仍然是異步形式。
使用 AsyncSequence 簡化響應式編程
迭代器與序列
迭代器是很多語言都有的一個概念,一個迭代器的核心是 next() 函數,每次調用都會返回下一個數據,這些數據構成了一個序列(Sequence),迭代器也意味着序列可以被遍歷。
異步序列
如果讓迭代器的 next() 方法支持異步,就產生了異步序列。Swift 對此提供了一個 AsyncSequence 的協議,並對它提供了語言級別的支持,使得開發者可以以同步的形式遍歷一個異步序列:
for try await data in asyncDataList {
print("async get data : + \(data)")
}
實際上,Swift 在 Combine 中支持了 Publisher 的同步遍歷:
// Combine的同步調用
for try await data in publisher.values {
print("async get publiser value \(data)")
}
不過這個特性需要 iOS15 才能支持,如果說 iOS13 還可以展望的話,iOS15 就是遙遙無期了。
CPS 變換
如果能將流式接口轉換爲異步序列,那麼就可以實現響應式代碼的同步編寫,這個轉換過程可以通過 CPS 變換實現。
CPS 變換全稱 Continuation-Pass-Style,這個概念來自 Lisp 語系,是一種顯式傳遞控制流的編程風格,其傳遞控制流的載體就是 continuation。continuation 可以理解爲當前代碼執行的後續,如果一個函數 f 有一個 continuation 參數,我們就可以把當前的 continuation 傳遞進去,當函數產生結果時,通過 continuation 回到函數 f 外,繼續執行,這種函數調用方式成爲 call/cc(call with current continuation)。
這種變換,稱爲 CPS 變換。
作爲一個類比,我覺得可以將 continuation 理解爲 return 的在兩個方面的推廣形式,首先,continuation 是 first-class 的,可以作爲變量存儲,可以作爲函數的參數和返回值,其次,continuation 可以多次使用,而 return 只能有一次。
響應式編程的同步形式
回頭看最原始的代碼,當我們調用 orderService.listOrders 時,傳進去的 callback,其實就相當於一個弱化版的 continuation。這意味着,如果我們可以將使用 continuation 將數據表示爲 AsyncSequence,那麼就可以將響應式代碼寫成同步形式,從而大幅簡化響應式編程。
Swift 提供了 continuation 的概念,提供了 AsyncStream 和 AsyncThrowingStream 來實現這個過程,對上節 Rx 的實現稍作改動即可。
第一步,實現一個將流式函數轉換成 AsyncThrowingStream 的工具類,這個是通用的:
func makeSequence<Data>(f : StreamFunc<Data>) -> AsyncThrowingStream<Data, Error> {
AsyncThrowingStream<Data, Error>{ continuation in
f { data in
continuation.yield(data)
} _: { error in
continuation.finish(throwing: e)
} _: {
continuation.finish()
}
}
}
第二步,由於 AsyncSequence 還不支持 merge,需要自己實現一個 merge 工具方法來實現多個流的組合,這個也是通用的:
//多個AsyncSequence merge成一個AsyncSequence
func mergeSequence<Seq : AsyncSequence>(seqs : [Seq]) -> AsyncThrowingStream<Seq.Element, Error> {
makeSequence(f: mergeF(fs: seqs.map(makeLoadFunc)))
}
func makeLoadFunc<Seq : AsyncSequence>(ats : Seq) -> StreamFunc<Seq.Element>{
{ onData, onError, onComplete in
Task {
do {
for try await data in ats {
onData(data)
}
onComplete()
} catch {
onError(error)
}
}
}
}
func mergeF<Data>(fs : [StreamFunc<Data>]) -> StreamFunc<Data> {
{ onData, onError, onComplete in
var finish = false
var results = fs.map{_ in false}
for (index, f) in fs.enumerated() {
f { data in
if (!finish) {
onData(data)
}
} _: { e in
// 如果是第一個錯誤,直接回調,同時標記爲結束
if (!finish) {
finish = true
onError(e)
}
} _: {
// 注意,即使所有的請求都結束了,回調成功
if (!finish) {
results[index] = true
// 所有都結束,回調
if (!results.contains(false)) {
finish = true
onComplete()
}
}
}
}
}
}
第三步,將 listOrder 和 queryUserInfo 轉換成 StreamFunc 形式,與 Rx 中的第二步實現完全相同;
第四步,這樣就可以將 load 方法簡化爲:
func asLoad() -> AsyncThrowingStream<[OrderObject], Error> {
let orderService = [OrderService("鵝鵝鵝"), OrderService("鴨鴨鴨")]
// 通過map構造AsyncSequence,通過flatMap對listOrder和queryUserInfo進行復合
let streams = orderService.map { orderService in
makeSequence(f: orderService.listOrders).flatMap { (orders) -> AsyncThrowingStream<[OrderObject], Error> in
makeSequence(f: makeLoadFunc(orders: orders, userInfoService: getUserService(site: orderService.site)))
}
}
// merge兩個平臺的AsyncSequence
return mergeSequence(seqs: streams)
}
可以發現,代碼與 RxSwift 幾乎是完全相同的,所以我們仍然有對於代碼正確性的信心,不同的是,現在使用方也得以獲得同樣的信心:
for try await orderObject in asLoad() {
print("async get orderObject \(orderObject.first?.order.orderId)")
}
總結
同步是編程中的田園世界,而流式接口作爲異步接口最複雜的形態,我們通過 CPS 變換的控制流技術,將流式接口表示爲 AsyncSequence,實現了對異步序列遍歷的同步形式,從而將響應式編程在形式上統一回了田園世界。
上面的第一步和第二步實現了 AsyncSequence 和 StreamFunc 的相互轉換,所以實際上我們證明了它們是同構的,更進一步的,我們可以證明它們與 Rx、Combine 也是同構的。換言之,它們是同一個概念的不同形式,理論上它們的表達能力是等價的,這個概念就是數據流,這個概念在 Rx 中叫做 Observable,在 Combine 中叫做 Publisher。
在實際實現上,Rx 和 Combine 提供了大量的操作符,因此目前它們的能力遠遠強於 AsyncSequence 和 StreamFunc,比如 AsyncSequence 居然不支持 merge。
AsyncSequence 的優勢是可以支持同步寫法,在我看來這個優勢是很大的。看到社區有過 AsyncSequence 替換 Combine 的相關的討論,我認爲邏輯上是講得通的。
AsyncSequence 替換 Combine 的相關討論地址:https://forums.swift.org/t/should-asyncsequence-replace-combine-in-the-future-or-should-they-coexist/53370)
團隊介紹
我們是來自淘寶全域觸達 & 用戶互動客戶端團隊,負責包含 Push、POP 彈層和消息溝通三大觸達場景。全域觸達 & 用戶互動客戶端團隊追求極致的性能、流暢的交互體驗和穩定的觸達效率,用智能化的調控策略爲用戶帶來更好的使用體驗。
作者 | 蔣文豪(四點)
編輯 | 橙子君
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/7jUGbSmR87FkTRqsm20NFQ