響應式編程的複雜度和簡化

響應式系統不是今天的主題,我們要討論更具體的話題,即響應式代碼的編寫會有哪些複雜度,應該如何簡化。 

什麼是響應式編程

什麼是響應式編程,它是一種編程範式?還是一種設計模式?抑或是其他?響應式系統和響應式編程有什麼關係?又比如,響應式編程它適用於什麼場景?解決什麼問題?

微軟於 2011 年率先建設了. Net 上的 Rx 庫,以簡化容易出錯的異步和事件驅動編程,邁出了響應式編程的第一步,隨後業界爲許多編程語言提供了對應的實現。

.Net 上的 Rx 庫地址:https://docs.microsoft.com/en-us/previous-versions/dotnet/reactive-extensions/hh242985(v=vs.103)

什麼是響應式,我們從一個例子開始,

在上面的表格中,建立了單元格之間的關係:A1 = B1 + C1,建立關係之後 A1 將響應任何對於 B1 和 C1 的變化,毫無疑問,這就是一種響應式行爲。

我覺得這個例子很棒的地方在於,它顯然很簡單,同時又足夠深刻,首先,它充分的體現了響應式的概念,其次,變化發生時,肯定觸發了某些過程的執行,說明背後存在關係的建立和沿着關係傳播的變化,再次,稍微深入一點看,B1 和 C1 的變化可以是一系列的變化,可以很自然的引申到流的概念,最後,它有一個很高級的抽象,對使用方來說,整個過程是聲明式的。

當然,舉例子來說明一個概念的時候,本質上是用一個外延在解釋一個概念的內涵,往往是會將內涵縮小的,所以我們可以嘗試推廣這個外延。列出這個例子的特徵:

描述了一個單元格里的整數等於另外兩個單元格里整數相加,當後者每次發生變化時,變化都會傳播到第一個單元格,並進行求值。

關鍵詞爲整數、等於、相加、變化、每次、傳播、求值。前三個關鍵詞僅僅和例子相關,可以直接去掉。變化可以推廣爲數據,每次可以在邏輯上等價於流的概念,流可以有 0 個、1 個或多個數據,傳播可以推廣爲通信(在這個意義上,函數調用、RPC、socket、MQ 都是通信),求值推廣爲執行一個過程。所以我們可以得出響應式編程的定義:

通過聲明式的通信定義,將數據流與過程組合起來,從而實現數據驅動過程的一種複合編程範式。

時至今日,業界對於響應式的定義仍然是不統一的,因此這是我自己的理解。響應式的基礎概念是數據流,理念是過程的執行是通過響應數據來驅動的,核心是構造數據和過程的響應關係,並且能夠讓數據沿着關係傳播驅動過程,因此響應式編程本質上是一種對通信的抽象,說它是一種編程範式,是因爲它提供一種對於數據與過程組合方式的看法,說它是複合範式而不是基本範式,是因爲它不像 OOP 或者 FP 一樣提供的是對於數據和過程的看法,而是以兩者爲基礎,所以可以有對象響應式和函數響應式。

當我們基於響應式構建系統時,就是響應式系統,響應式系統的構建原則可以參考此處(地址:https://www.reactiveprinciples.org/patterns/communicate-facts.html),總的來說,系統會分割成一個一個的分區,分區內部對狀態進行本地化,分區之間通過通信進行異步解耦,可以通過控制這個通信的過程,實現系統的彈性擴縮容和部分組件失敗的回彈性。

響應式系統不是今天的主題,我們要討論更具體的話題,即響應式代碼的編寫會有哪些複雜度,應該如何簡化。

響應式編程的複雜度

響應式編程的複雜度來自於 4 個方面:

可以有0次、1次或多次數據產生,也就是數據流;

除了數據之外,還有能夠標識錯誤和完成(正常結束);

數據流和數據流、數據流和過程的組合複雜度很高;

在上面的基礎上,需要處理整個過程中線程切換、併發同步、數據緩衝等問題。

爲了支持數據流的概念,可以產生 0 次、1 次或多次數據產生,API 設計需要把數據回調和結果回調分開,通常也會把錯誤回調和完成回調分開,這種接口被稱爲流式接口,一個標準的流式接口設計如下所示:

typealias Func = () -> ()
typealias OnData<Data> = (Data) -> ()
typealias OnError = (Error) -> ()
typealias OnComplete = Func
typealias StreamFunc<Data> = (@escaping OnData<Data>, @escaping OnError, @escaping OnComplete) -> ()

顯然,流式接口是普通異步接口將一次結果向多次結果的推廣,這種推廣同時也增加了邏輯的複雜度。

我們可以通過一個邏輯上簡單的例子來看一下流式接口的使用過程,爲了關注於核心的複雜度,只會體現前 3 個方面,一方面是由於加入第 4 點的話會導致代碼過於冗長混淆關注點,另一方面相信各位對第 4 點本身的複雜度和它引起的衆多問題已經非常熟悉了。

這個例子很簡單,只有三步:

  1. 假設需要爲一個店鋪提供一個訂單展示頁面,這些訂單來自兩個不同的平臺 “鵝鵝鵝” 和“鴨鴨鴨”,他們各自提供了查詢的接口(listOrders,爲了簡單假設他們提供的模型和接口完全一致);  

  2. 訂單列表需要展示用戶的暱稱等信息,需要通過對應平臺的另外一個接口(queryUserInfo)查詢;  

  3. 由於 SDK 緩存、持久化、網絡請求策略,數據無法一次性獲取,這兩個接口可能存在多次數據回調。 

進一步簡化問題,我們忽略變更處理、UI 渲染和用戶交互處理,僅僅考慮數據加載,這需要組合 2 個階段的 4 次接口調用,先分別請求兩個平臺的訂單,使用訂單請求對應平臺的 userInfo,最後合併成完整數據:

// 數據怎麼回調,什麼情況結束,onError和onComplete分別在什麼情況回調,保證有且僅有一次回調
func load(onData : OnData<[OrderObject]>?, onError : OnError?, onComplete : OnComplete?) {
    let orderServices = [OrderService("鵝鵝鵝"), OrderService("鴨鴨鴨")]
    // 記錄整體請求的完成狀態
    var listOrderFinish = false
    var queryUserFinish = false
    // 記錄各個請求的結果
    var listOrderResults = orderServices.map{_ in false}
    var queryUserResults = [Bool]()
    for (index, orderService) in orderServices.enumerated() {
        orderService.listOrders { orders in
            // 已結束不處理
            if (listOrderFinish) {
                return;
            }
            let index = queryUserResults.count
            queryUserResults[index] = false
            if let userService = getUserService(site: orderService.site){
                let userIds = orders.map { order in
                    order.userId
                }
                userService.queryUserInfo(userIds: userIds) { userInfoDict in
                    if (listOrderFinish && queryUserFinish) {
                        return;
                    }
                    let orderObjects = orders.map { order in
                        OrderObject(order: order, userInfo: userInfoDict[order.userId])
                    }
                    onData?(orderObjects)
                } onError: { error in
                    // 如果是第一個錯誤,直接回調,同時標記爲結束
                    if (!listOrderFinish || !queryUserFinish) {
                        listOrderFinish = true
                        queryUserFinish = true
                        onError?(error)
                    }
                } onComplete: {
                    // 外層結束,內層也結束,纔是最終結束
                    if (!listOrderFinish || !queryUserFinish) {
                        queryUserResults[index] = true
                        // 所有都結束,回調
                        if (listOrderFinish && !queryUserResults.contains(false)) {
                            listOrderFinish = true
                            onComplete?()
                        }
                    }
                }
            } else {
                let orderObjects = orders.map { order in
                    OrderObject(order: order)
                }
                onData?(orderObjects)
                queryUserResults[index] = true
                // 所有都結束,回調
                if (listOrderFinish && !queryUserResults.contains(false)) {
                    listOrderFinish = true
                    onComplete?()
                }
            }
        } onError: { error in
            // 如果是第一個錯誤,直接回調,同時標記爲結束
            if (!listOrderFinish) {
                listOrderFinish = true
                onError?(error)
            }
        } onComplete: {
            // 注意,即使所有的請求都結束了,也不能回調結束,因爲這裏的結束只是代表Order請求結束,userInfo請求不一定結束
            if (!listOrderFinish) {
                listOrderResults[index] = true
                // 所有都結束,回調
                if (!listOrderResults.contains(false)) {
                    listOrderFinish = true
                }
            }
        }
    }
}

在這個接口的實現中,數據回調最簡單,在沒有結束的情況下,多次回調的數據可以直接回調,問題是如何保證錯誤和完成有且僅有一次回調,且結果回調後不再回調數據,即: 

什麼時候回調錯誤?
什麼時候回調完成?

如果我們認爲一個接口出錯,就回調錯誤,這是最簡單的錯誤處理,只需要檢查和設置結束狀態,在沒有結束時的第一個錯誤進行回調即可,注意,我們需要在 userInfo 的請求中也做類似的處理,並保證錯誤回調後不再執行任何回調。

完成的回調要比錯誤複雜的多,我們可以來思考一下:

  1. 首先,我們不能在 listOrders 的 onComplete 裏面取回調完成,因爲這裏不能代表 queryUserInfo 這個接口也完成了;  

  2. 其次,我們也不能簡單的通過所有 queryUserInfo 都完成了就回調完成,因爲 listOrders 在完成前仍然有可能返回新的訂單數據。

也就是說,這裏的完成需要在 queryUserInfo 進行判斷,並且也需要考慮外層請求的完成情況,比普通異步接口的級聯要多了兩個維度。這僅僅是 2 種接口 4 次請求,在真實的編程中,接口數量會多得多,並且需要把第 4 點加進來,線程 / 隊列、併發、同步、緩衝區,還要處理新數據推送響應,再考慮調試、監控、排查,複雜度顯然會繼續大幅增長,保證這個過程的正確性是一件痛苦的事情。

響應式編程的複雜度使用 Rx/Combine 簡化響應式編程

爲了解決這些問題,業界搞出了 Reactive Streams 規範(地址:https://www.reactive-streams.org/),也出現了若干的實現,都以工具庫的形式提供,包括 Rx 系列、Reactor,以及蘋果功能類似的 Combine。作爲一個 iOS 開發,我對 RxSwift 和 Combine 比較瞭解,兩者主要的區別在於 Combine 多了一個 Subscription 的抽象來協調 Publisher 和 Subscriber 之間的行爲,尤其是 Back Pressure 相關的控制,但總的來說,都提供了對於異步數據流的抽象和組合能力,用法上也很類似,這裏以 RxSwift 爲例來重寫上面的過程。

第一步,實現一個將流式函數轉換成 Observable 的工具類,這個是通用的,非常直觀:

func makeObservable<Data>(f : @escaping StreamFunc<Data>) -> Observable<Data> {
    Observable<Data>.create { observer in
        f { data in
            observer.onNext(data)
        } _: { error in
            observer.onError(error)
        } _: {
            observer.onCompleted()
        }
        return Disposables.create()
    }
}

第二步,針對這個例子,將 listOrder 和 queryUserInfo 轉換成 StreamFunc 形式,listOrder 本來就是 StreamFunc,對 queryUserInfo 進行偏應用也可以轉換爲 StreamFunc 形式,這是具體接口相關的:

func makeStreamFunc(orders : [Order], userInfoService : UserService?) -> StreamFunc<[OrderObject]> {
    if let userInfoService = userInfoService {
        // 核心是對queryUserInfo的userIds參數進行偏應用
        let userInfoF : StreamFunc<[OrderObject]> = { onData, onError, onComplete in
            let userIds = orders.map{$0.userId}
            userInfoService.queryUserInfo(userIds: userIds, onData: { userInfoDict in
                let orderObjects = orders.map { order in
                    OrderObject(order: order, userInfo: userInfoDict[order.userId])
                }
                onData(orderObjects)
            }, onError: onError, onComplete: onComplete)
        }
        return userInfoF
    } else {
        return { onData, onError, onComplete in
            onData(orders.map{OrderObject(order: $0)})
            onComplete()
        }
    }
}

第三步,這樣就可以將 load 方法簡化爲:

func rxLoad() -> Observable<[OrderObject]> {
    let orderService = [OrderService("鵝鵝鵝"), OrderService("鴨鴨鴨")]
    // 通過map構造Observable,通過flatMap對listOrder和queryUserInfo進行復合
    let observables = orderService.map { orderService in
        makeObservable(f: orderService.listOrders).flatMap { (orders) -> Observable<[OrderObject]> in
            let userLoadF = makeStreamFunc(orders: orders, userInfoService: getUserService(site: orderService.site))
            return makeObservable(f: userLoadF)
        }
    }
    // merge兩個平臺的Observable
    return Observable.merge(observables)
}

可以看到,第一步是通用的,實際代碼中只需要做第二步和第三步,這就對上面的接口進行了大量的簡化,並且庫以統一的方式處理掉了合併、級聯、多數據返回的複雜邏輯,我們有相當的把握來保證正確性。當然,除了學習成本較高以外,也還是有缺點的,主要是使用方式仍然是異步形式,在部分環節仍然需要處理異步帶來的複雜度:

// 使用方調用
rxLoad().subscribe { orderObjects in
    // onNext閉包中處理數據
} onError: { error in
    // onError閉包中處理錯誤
} onCompleted: {
    // onCompleted閉包中處理完成
} onDisposed: {
}

Rx 確實大大簡化了異步編程,但是還不夠,因爲它的使用仍然是異步形式。

使用 AsyncSequence 簡化響應式編程

迭代器與序列

迭代器是很多語言都有的一個概念,一個迭代器的核心是 next() 函數,每次調用都會返回下一個數據,這些數據構成了一個序列(Sequence),迭代器也意味着序列可以被遍歷。

異步序列

如果讓迭代器的 next() 方法支持異步,就產生了異步序列。Swift 對此提供了一個 AsyncSequence 的協議,並對它提供了語言級別的支持,使得開發者可以以同步的形式遍歷一個異步序列:

for try await data in asyncDataList {
    print("async get data : + \(data)")
}

實際上,Swift 在 Combine 中支持了 Publisher 的同步遍歷:

// Combine的同步調用
for try await data in publisher.values {
    print("async get publiser value \(data)")
}

不過這個特性需要 iOS15 才能支持,如果說 iOS13 還可以展望的話,iOS15 就是遙遙無期了。

CPS 變換

如果能將流式接口轉換爲異步序列,那麼就可以實現響應式代碼的同步編寫,這個轉換過程可以通過 CPS 變換實現。

CPS 變換全稱 Continuation-Pass-Style,這個概念來自 Lisp 語系,是一種顯式傳遞控制流的編程風格,其傳遞控制流的載體就是 continuation。continuation 可以理解爲當前代碼執行的後續,如果一個函數 f 有一個 continuation 參數,我們就可以把當前的 continuation 傳遞進去,當函數產生結果時,通過 continuation 回到函數 f 外,繼續執行,這種函數調用方式成爲 call/cc(call with current continuation)。

這種變換,稱爲 CPS 變換。

作爲一個類比,我覺得可以將 continuation 理解爲 return 的在兩個方面的推廣形式,首先,continuation 是 first-class 的,可以作爲變量存儲,可以作爲函數的參數和返回值,其次,continuation 可以多次使用,而 return 只能有一次。

響應式編程的同步形式

回頭看最原始的代碼,當我們調用 orderService.listOrders 時,傳進去的 callback,其實就相當於一個弱化版的 continuation。這意味着,如果我們可以將使用 continuation 將數據表示爲 AsyncSequence,那麼就可以將響應式代碼寫成同步形式,從而大幅簡化響應式編程。

Swift 提供了 continuation 的概念,提供了 AsyncStream 和 AsyncThrowingStream 來實現這個過程,對上節 Rx 的實現稍作改動即可。

第一步,實現一個將流式函數轉換成 AsyncThrowingStream 的工具類,這個是通用的:

func makeSequence<Data>(f : StreamFunc<Data>) -> AsyncThrowingStream<Data, Error> {
    AsyncThrowingStream<Data, Error>{ continuation in
        f { data in
            continuation.yield(data)
        } _: { error in
            continuation.finish(throwing: e)
        } _: {
            continuation.finish()
        }
    }
}

第二步,由於 AsyncSequence 還不支持 merge,需要自己實現一個 merge 工具方法來實現多個流的組合,這個也是通用的:

//多個AsyncSequence merge成一個AsyncSequence
func mergeSequence<Seq : AsyncSequence>(seqs : [Seq]) -> AsyncThrowingStream<Seq.Element, Error> {
    makeSequence(f: mergeF(fs: seqs.map(makeLoadFunc)))
}
func makeLoadFunc<Seq : AsyncSequence>(ats : Seq) -> StreamFunc<Seq.Element>{
    { onData, onError, onComplete in
        Task {
            do {
                for try await data in ats {
                    onData(data)
                }
                onComplete()
            } catch {
                onError(error)
            }
        }
    }
}
func mergeF<Data>(fs : [StreamFunc<Data>]) -> StreamFunc<Data> {
    { onData, onError, onComplete in
        var finish = false
        var results = fs.map{_ in false}
        for (index, f) in fs.enumerated() {
            f { data in
                if (!finish) {
                    onData(data)
                }
            } _: { e in
                // 如果是第一個錯誤,直接回調,同時標記爲結束
                if (!finish) {
                    finish = true
                    onError(e)
                }
            } _: {
                // 注意,即使所有的請求都結束了,回調成功
                if (!finish) {
                    results[index] = true
                    // 所有都結束,回調
                    if (!results.contains(false)) {
                        finish = true
                        onComplete()
                    }
                }
            }
        }
    }
}

第三步,將 listOrder 和 queryUserInfo 轉換成 StreamFunc 形式,與 Rx 中的第二步實現完全相同; 

第四步,這樣就可以將 load 方法簡化爲:

func asLoad() -> AsyncThrowingStream<[OrderObject], Error> {
    let orderService = [OrderService("鵝鵝鵝"), OrderService("鴨鴨鴨")]
    // 通過map構造AsyncSequence,通過flatMap對listOrder和queryUserInfo進行復合
    let streams = orderService.map { orderService in
        makeSequence(f: orderService.listOrders).flatMap { (orders) -> AsyncThrowingStream<[OrderObject], Error> in
            makeSequence(f: makeLoadFunc(orders: orders, userInfoService: getUserService(site: orderService.site)))
        }
    }
    // merge兩個平臺的AsyncSequence
    return mergeSequence(seqs: streams)
}

可以發現,代碼與 RxSwift 幾乎是完全相同的,所以我們仍然有對於代碼正確性的信心,不同的是,現在使用方也得以獲得同樣的信心:

for try await orderObject in asLoad() {
    print("async get orderObject \(orderObject.first?.order.orderId)")
}

總結

同步是編程中的田園世界,而流式接口作爲異步接口最複雜的形態,我們通過 CPS 變換的控制流技術,將流式接口表示爲 AsyncSequence,實現了對異步序列遍歷的同步形式,從而將響應式編程在形式上統一回了田園世界。

上面的第一步和第二步實現了 AsyncSequence 和 StreamFunc 的相互轉換,所以實際上我們證明了它們是同構的,更進一步的,我們可以證明它們與 Rx、Combine 也是同構的。換言之,它們是同一個概念的不同形式,理論上它們的表達能力是等價的,這個概念就是數據流,這個概念在 Rx 中叫做 Observable,在 Combine 中叫做 Publisher。

在實際實現上,Rx 和 Combine 提供了大量的操作符,因此目前它們的能力遠遠強於 AsyncSequence 和 StreamFunc,比如 AsyncSequence 居然不支持 merge。

AsyncSequence 的優勢是可以支持同步寫法,在我看來這個優勢是很大的。看到社區有過 AsyncSequence 替換 Combine 的相關的討論,我認爲邏輯上是講得通的。

AsyncSequence 替換 Combine 的相關討論地址:https://forums.swift.org/t/should-asyncsequence-replace-combine-in-the-future-or-should-they-coexist/53370)

團隊介紹

我們是來自淘寶全域觸達 & 用戶互動客戶端團隊,負責包含 Push、POP 彈層和消息溝通三大觸達場景。全域觸達 & 用戶互動客戶端團隊追求極致的性能、流暢的交互體驗和穩定的觸達效率,用智能化的調控策略爲用戶帶來更好的使用體驗。

作者 | 蔣文豪(四點)

編輯 | 橙子君

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/7jUGbSmR87FkTRqsm20NFQ