響應式流的核心機制 —— 背壓機制

一、響應式流是什麼?

Reactive Streams 是 2013 年底由 Netflix、Lightbend 和 Pivotal(Spring 背後的公司)的工程師發起的一項計劃,響應式流旨在爲無阻塞異步流處理提供一個標準。它旨在解決處理元素流的問題——如何將元素流從發佈者傳遞到訂閱者,而不需要發佈者阻塞,或訂閱者有無限制的緩衝區或丟棄。

響應式流模型存在兩種基本的實現機制。一種就是傳統開發模式下的 “拉” 模式,即消費者主動從生產者拉取元素;而另一種就是 “推” 模式,在這種模式下,生產者將元素推送給消費者。相較於 “拉” 模式,“推”模式下的數據處理的資源利用率更好,下圖所示的就是一種典型的推模式處理流程。


上圖中,數據流的生產者會持續地生成數據並推送給消費者。這裏就引出了流量控制問題,即如果數據的生產者和消費者處理數據的速度是不一致的,我們應該如何確保系統的穩定性呢?

二、流量控制

2.1 生產者生產數據的速率小於消費者的場景

這種場景對於消費者來說沒啥壓力,正常消費就好了,這裏也就不需要所謂的流量控制了。

2.2 生產者生產數據的速率大於消費者的場景

生產者生產數據的速率大於消費者的場景,應該是我們業務中經常遇到的場景了,這種場景由於消費者處理不過來導致崩潰,業界通常的做法是在生產者與消費者之間加一個隊列做緩衝。我們知道隊列具有存儲與轉發的功能,所以可以用它來進行一定的流量控制。


如何對於流量進行很好的控制?這就轉變到了如何設計好一個隊列了,目前 Java 業界主流的隊列有以下三種:

2.2.1 無界隊列

見名知意,無界隊列在原則上是擁有無線大小容量的隊列,可以存放生產者產生的所有消息。

2.2.2 有界丟棄隊列

爲了避免上面無界隊列的弊端,有界丟棄隊列採用的是如果隊列滿了,就會採用丟棄後面傳入的值,這裏可以設置一些丟棄策略,比如說按照優先級或先進先出等。

2.2.3 有界阻塞隊列

像一些支付金融級別的場景,是不允許丟數據的,所以我們引出有界阻塞隊列,我們會在隊列消息數量達到上限後阻塞生產者,而不是直接丟棄消息。

所以,無論從回彈性、彈性還是即時響應性出發,上述的隊列都不是響應式流的上佳解決辦法。

三、背壓機制

上面說的那幾種隊列純 “推” 模式下的數據流量會有很多不可控制的因素,並不能直接應用,而是需要在 “推” 模式和 “拉” 模式之間考慮一定的平衡性,從而優雅地實現流量控制。這就需要引出響應式系統中非常重要的一個概念——背壓機制(Backpressure)。

什麼是背壓?簡單來說就是下游能夠向上遊反饋流量請求的機制。通過前面的分析,我們知道如果消費者消費數據的速度趕不上生產者生產數據的速度時,它就會持續消耗系統的資源,直到這些資源被消耗殆盡。

這個時候,就需要有一種機制使得消費者可以根據自身當前的處理能力通知生產者來調整生產數據的速度,這種機制就是背壓。採用背壓機制,消費者會根據自身的處理能力來請求數據,而生產者也會根據消費者的能力來生產數據,從而在兩者之間達成一種動態的平衡,確保系統的即時響應性。

四、響應式流規範

有了背壓機制,我們再來看下響應式流是如何基於這種機制去設計的一套規範,規範詳情請參考:Reactive Streams

Java API 的響應式流只定義了四個核心接口:

4.1 Publisher<T>

Publisher 代表的就是一種可以生產無限數據的發佈者,接口如下:

public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}

可以看到,Publisher 裏的 subscribe 方法傳入的是 Subscriber 接口,其實這裏用的是回調,Publisher 根據收到的請求向當前訂閱者 Subscriber 發送元素。

4.2 Subscriber<T>

Subscriber 代表的是一種可以從發佈者那裏訂閱並接收元素的訂閱者,接口如下:

public interface Subscriber<T> {
    public void onSubscribe(Subscription s);
    public void onNext(T t);
    public void onError(Throwable t);
    public void onComplete();
}

Subscriber 接口定義的這組方法構成了數據流請求和處理的基本流程,其中,onSubscribe() 從命名上看就是一個回調方法,當發佈者的 subscribe() 方法被調用時就會觸發這個回調。而在該方法中有一個參數 Subscription,可以把這個 Subscription 看作是一種用於訂閱的上下文對象。Subscription 對象中包含了這次回調中訂閱者想要向發佈者請求的數據個數。

當訂閱關係已經建立,那麼發佈者就可以調用訂閱者的 onNext() 方法向訂閱者發送一個數據。這個過程是持續不斷的,直到所發送的數據已經達到 Subscription 對象中所請求的數據個數。這時候 onComplete() 方法就會被觸發,代表這個數據流已經全部發送結束。而一旦在這個過程中出現了異常,那麼就會觸發 onError() 方法,我們可以通過這個方法捕獲到具體的異常信息進行處理,而數據流也就自動終止了。

4.3 Subscription

Subscription 代表的就是一種訂閱上下文對象,它在訂閱者和發佈者之間進行傳輸,從而在兩者之間形成一種契約關係,接口如下:

public interface Subscription {
    public void request(long n);
    public void cancel();
}

這裏的 request() 方法用於請求 n 個元素,訂閱者可以通過不斷調用該方法來向發佈者請求數據;而 cancel() 方法顯然是用來取消這次訂閱。請注意,Subscription 對象是確保生產者和消費者針對數據處理速度達成一種動態平衡的基礎,也是流量控制中實現背壓機制的關鍵所在

4.4 Processor<T,R>

Processor 代表的就是訂閱者和發佈者的處理階段,Processor 接口繼承了 Publisher 和 Subscriber 接口。它用於轉換髮布者——訂閱者管道中的元素。Processor 訂閱類型 T 的數據元素,接收並轉換爲類型 R 的數據,併發布變換後的數據。接口如下:

public interface Processor<T,R> extends Subscriber<T>, Publisher<R> {
}

下圖顯示了處理者在發佈者——訂閱和管道中作爲轉換器的作用,可以擁有多個處理者。

五、總結

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/0tKvQKYSeX-A0uIFqZ_pVg