RxJS 入門及應用

RxJS 爲什麼被稱爲是函數響應式編程?

函數式編程(Functional Programming)

函數式編程的特點是允許把函數當成一個實參或返回值,主要思想是想將複雜的運算分解成一系列嵌套的函數,逐層推導,不斷漸進,直至完成運算。常用的數組方法(map,filter 等)就運用了函數式編程的思想。

const arr = [4,1,5,2,3];
const newArr = arr
  .sort((a,b) => a-b)
  .filter(value => value>2);
console.log(newArr);   // [3,4,5]

函數式編程還有一個最重要的特性,那就是純淨性 Purity(純函數)

純函數是不會產生副作用的函數,其中輸出完全由輸入決定,也就是說無論調用多少次,調用f(x)都會得到相同的結果。

函數副作用:指當調用函數時,除了返回函數值之外,還對主調用函數產生附加的影響。例如修改全局變量(函數外的變量),修改參數或改變外部存儲。

純函數滿足以下兩個條件

①函數的執行過程完全由輸入參數決定,不會受除參數之外的任何數據的影響。

②函數不會修改任何外部狀態,比如修改全局變量或傳入的參數對象。

通常如果創建一個非純函數,在這個函數之外使用了共享變量的代碼,會使應用狀態混亂難以維護。

var count = 0;
var button = document.querySelector('button');
button.addEventListener('click', 
  () => console.log(`Clicked ${++count} times`)
);

而使用 RxJS,可以將應用狀態隔離出來,不會被外部環境影響也不會影響外部環境。

import { fromEvent, scan } from 'rxjs'; // 將事件轉換成 observable 序列。
var button = document.querySelector('button');
const example = fromEvent(button, 'click').pipe(
  scan(count => count + 1, 0); //工作原理與數組的 reduce 類似,隨着時間的推移進行歸併
);
example.subscribe(count => console.log(`Clicked ${count} times`));

響應式編程(Reactive Programming)

wiki 百科中的解釋:

在計算中,響應式編程反應式編程(Reactive programming)是一種面向數據流和變化傳播的聲明式編程範式。這意味着可以在編程語言中很方便地表達靜態或動態的數據流,而相關的計算模型會自動將變化的值通過數據流進行傳播。

什麼是數據流?

數據流(data stream)是數據在系統內傳播的路徑,表示在一定時間範圍內發生的一系列事件。

任何東西都可以是一個 Stream:變量、用戶輸入、網絡響應、定時器、數據結構等等。

什麼是變化傳播?

在數據流傳播的過程中,可能會有一些事件去組合、創建、過濾這些 Streams,從一箇舊的 stream 映射成一個新的 stream。我們不需要去輪詢變化,而是對事件進行監聽,在執行一個事件後,會自動做出相應的響應,這就是變化傳播。

前端框架與 rxjs 的結合

UI = f(data);
data = g(origin data)
UI = f(g(origin data))

RxJS 是用來幹什麼的?

RxJS 是一個用於處理異步事件流的庫,通過使用 observable 序列來編寫異步和基於事件的程序,實際應用場景就是把請求封裝成 observerable,通過一些基本操作符(map、filter 等等)將返回的數據處理並且 catch 錯誤,將異步事件作爲集合來處理。RxJS 實際上是將開發過程中遇到的異步(多爲異步,同步也可以)操作看爲一個事件流,RxJS 內部封裝了對一個事件流的操作符(創建、轉換、組合、過濾、錯誤異常處理等),組合使用這些操作符來以更便利的方式來管理事件。

爲什麼用 RxJS,摘自知乎回答:

思考一下,異步的本質是什麼?

異步操作和同步操作最大的區別就是異步有時序。

我們可以把同步操作理解爲:數據 + 函數

那麼異步操作就是:數據 + 函數 + 時序

Rx 就是把時序抽離成一根時間軸,在這根時間軸上進行同步操作,而異步相關的時序處理就交給 Rx 提供的各種 operator 操作符。

所以問題就很簡單了,如果你的應用是一個時序密集的應用,那麼使用 Rx 能幫你理清複雜的異步邏輯。反之,如果異步操作之間沒有太多的聯繫,時序分散, 則不那麼需要使用 Rx。

RxJS 中解決異步事件管理的基本概念

1. Observable(可觀察對象)

將一個數據流看作一個可觀察對象,表示這個數據流變化傳播過程中發生的一些事件的集合。

vnbpnj

拉取推送是兩種不同的協議,用來描述數據生產者 (Producer) 與數據消費者 (Consumer) 如何通信。

  1. 拉取體系

js 中每個函數 function 都屬於拉取體系,函數來生產數據,消費者通過調用該函數的代碼來從函數中獲取單個返回值來對該函數進行消費,而迭代器 Iterator 則是消費者調用 iterator.next() 來獲取多個返回值進行消費。

拉取的過程中,生產者是一個被動的過程,在消費者請求調用自己時才產生數據,消費者是一個主動的過程,消費者自己來決定何時調用生產者來獲取收據。

  1. 推送體系

在如今的 js 中,Promise 是最常見的推送體系,Promise 作爲生產者,將解析過的resolved值傳給消費者註冊過的一個回調函數。

推送的過程中,生產者是一個主動的過程,在生產者獲取resolved值的時候,生產者可以決定何時把值推送給消費者,而消費者並不知道什麼時候可以從生產者這裏獲取到值。在 RxJS 中,observable也屬於推送體系,並且可以推送一個或多個值。

上面這些術語有些抽象,舉個🌰更容易理解什麼是 Observable

Function
function foo() {
    console.log('Hello')
    return 'world';
}

const x = foo();
console.log(x);

const y = foo();
console.log(y);
Observable
const foo = Observable.create(function (observer) {
    console.log('Hello');
    observer.next('world');
});
// .subscribe()類似於調用函數
foo.subscribe(function (x) {
    console.log(x);
});
foo.subscribe(function (y) {
    console.log(y);
});
// 控制檯輸出是相同的: 
'Hello'
'world'
'Hello'
'world'

Observable 可以隨着時間推移返回(推送)多個值 ,這一點是函數做不到的。

Function
function foo() {
    return 'Hello';
    return 'world'; // 永遠不會執行
}

const a = foo();
console.log(a);

//控制檯輸出
'Hello'
Observable
const foo = Observable.create(function (observer) {
    observer.next('Hello');
    observer.next('world');
});

foo.subscribe(function (x) {
    console.log(x);
});

// 控制檯輸出
'Hello'
'world'

// 也可以異步推送一些值
const foo = Observable.create(function (observer) {
    observer.next('Hello')
    setTimeout(() => {
        observer.next('rxjs');
    },0)
    observer.next('world');
});

// 控制檯輸出
'Hello'
'world'
'rxjs'

1.1 創建 Observable

Observable 可以使用 Observable.create 來創建,但通常我們使用創建操作符 [1] 來創建 Observable。

1.2   訂閱 Observable

訂閱 Observable 像是調用函數,並提供接收數據的回調函數。

observable.subscribe(value => {
 // do something
})

不同觀察者通過 subscribe 調用同一 observable 數據不共享。

每一次調用,等於重新執行一遍函數。

1.3 執行 Observable

Observable 執行可以傳遞三種類型的值:

  1. Next:推送一個值,可以是任意類型;

  2. Error:推送一個錯誤或者異常;

  3. Complete:推送一個「已完成」的消息,表明不會再發送任何值;

next() 方法中的值代表要推送給觀察者的實際數據,可以執行多次;

error() 和 complete() 會在 Observable 執行期間至多執行一次,並且只會執行其中一個;

Observable.create(observer => {
    try {
        observer.next(1);
        observer.next(2);
        observer.complete();
        observer.next(3); // 前面已經通知觀察者已經完成,所以這個值不會發送
    } catch (e) {
        observer.error(e); // 捕獲到異常發送一個錯誤
    }
})

1.4   銷燬 Observable 執行

Observable 的執行可能會是無限的,通常觀察者希望在一個有限的時間裏終止 Observable 執行,以避免浪費計算資源和內存消耗。

類似於清除定時器,var timer = setInterval(() => {},1000); clearInterval(timer);

// 調用subscribe時,觀察者會被附加到新創建的Observable執行中,
// 會返回一個對象,即Subscription(訂閱)
var subscription = observable.subscribe();
// Subscription表示正在進行中的執行,調用unsubscribe()來取消observable執行;
subscription.unsubscribe();

2. Observer (觀察者)

Observer(觀察者)是一組回調函數的集合,每一個回調函數對應 Observable 發送通知的類型:nexterrorcomplete

const observer = {
    next: () => {}, // 觀察者接收到next()消息執行的回調函數
    error: () => {}, // 觀察者接收到error()消息執行的回調函數
    complete: () => {}, // 接收到complete()消息執行的回調函數
}
// observer中的觀察者可能是部分的,沒有提供某個回調,observable還是可以執行的。
// 方法1:將observer觀察者傳入subscribe
observable.subscribe(observer)
// 方法2:subscribe按順序(next,error,complete)傳入三個回調函數
observable.subscribe((value) => {},(error) => {}, () => {})

3.  Subscription (訂閱)

Subscription 是一個可清理資源的對象,代表 Observable 的執行。

基本用處就是使用 unsubscribe 來釋放資源或取消 Observable 的執行。

4. Subject (主體)

引入一個新的概念,Cold Observable / Hot Observable。

Observable 對象就是一個數據流,在一個時間範圍內推送一系列數據。

在只存在一個 observer 的情況下很簡單,但是對於存在多個 observer 的場景,會變得複雜。

假設一個場景:

兩個 observer 觀察者 A 和 B 訂閱同一個 Observable 對象,但他們不是同時訂閱,第一個觀察者 A 訂閱 N 秒後,第二個觀察者 B 才訂閱這個 Observable 對象。並且在這 N 秒期間, Observable 已經推送了一些數據,那麼第二個觀察者 B 應不應該收到已經被推送給第一個觀察者 A 的那些數據呢?

Selection 1 :已經推送給觀察者 A 的值就不給 B 了,B 只從訂閱那一時間點接收 Observable 推送的數據就行了。

Selection 2:已經推送給觀察者 A 的值還是要給 B,B 訂閱時從頭開始獲取 Observable 推送的數據。

RxJS 考慮到這兩種不同的場景,讓 Observable 支持這兩種不同的需求,Selection 1 這樣的 Observable 就是 Hot Observable,而 Selection 2 這樣的 Observable 就是 Cold Observable。

RxJS Subject 是一種特殊類型的 Observable,允許將值多播給多個觀察者(每個已訂閱的觀察者從訂閱時間點開始接收當前 Observable 推送的值,非獨立),而普通的 Observable 是單播的(每個已訂閱的觀察者是獨立執行 Observable 的)。

對於多個訂閱 Subject 的觀察者,subscribe 不會重新從頭髮送值,他只是將觀察者註冊到觀察者列表中,後續有新值發送的時候,將值多播給觀察者列表中的所有觀察者。

RxJS 的四種不同類型 Subject

1F0JLj

4.1.1   BehaviorSubject

BS 有一個 “當前值” 的概念,它保存了發送給觀察者的最後一個值(當前值),當有新的觀察者訂閱時,會立即接收到“當前值”;

而如果用 Subject,在觀察者訂閱時,之前已發送的值不會再發給觀察者包括最近的一個值,後續再有值發送的時候,新註冊的觀察者纔會接收到新的值。

var subject = new BehaviorSubject(0); // 0是初始值

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});

subject.next(1);
subject.next(2);

subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

subject.next(3);
// 輸出:
observerA: 0 //line3 :A訂閱時立即收到當前值(初始值)0
observerA: 1 //line7 : BS推送新的值1,訂閱者A接收到值1
observerA: 2 //line8 : BS推送新的值2,訂閱者A接收到值2
observerB: 2 //line 10 : B訂閱時立即收到變化後的當前值2
observerA: 3 //line 14:BS推送新的值3,訂閱者A和B一起收到值3
observerB: 3
4.1.2 AsyncSubject

AS 只有當 Observable 執行完成時【執行 complete()】,纔會將執行的最後一個值發送給觀察者

var subject = new Rx.AsyncSubject();

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});

subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);

subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

subject.next(5);
subject.complete();
// 輸出:
// line 17 執行complete()後兩個訂閱者A和B纔會收到最後的一個值(5)
observerA: 5
observerB: 5
4.1.3 ReplaySubject

RS 類似 BS,它可以發送舊值給新的觀察者,還可以記錄 Observable 的執行的一部分,將 Observable 執行過程中的多個值回放給新的觀察者。

var subject = new ReplaySubject(3); // 爲新的訂閱者緩衝3個值

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});

subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);

subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

subject.next(5);
// 輸出
observerA: 1 // line 7: RS推送值1,訂閱者A收到值1
observerA: 2 // line 8: RS推送值2,訂閱者A收到值2
observerA: 3 // Line 9: RS推送值3,訂閱者A收到值3
observerA: 4 // line 10: RS推送值4,訂閱者A收到值4
observerB: 2 // line 12: 新的訂閱者訂閱RS
observerB: 3 // 訂閱時按順序收到了RS緩衝的三個值
observerB: 4
observerA: 5 // line 16:RS推送值5,觀察者A和B收到值5
observerB: 5

Tips
//RS除了可以指定緩衝數量,還可以指定時間(單位毫秒)來確定多久之前的值要記錄
var subject = new ReplaySubject(3,500) 記錄3個值500ms前

5. Operators (操作符)

操作符是允許複雜的異步代碼以聲明式的方式進行輕鬆組合的基礎代碼單元。

操作符本質就是一個純函數,當操作符被調用時,不會改變已經存在的 Observable 實例,會基於當前 Observable 創建一個新的 Observable。

一個 Observable 對象代表的是一個數據流,實際場景中,產生 Observable 對象並不是每次都通過直接調用 Observable 構造函數來創造數據流對象。於現實中複雜的問題,並不會創造一個數據流之後就直接通過 subscribe 接上一個 Observer,往往需要對這個數據流做一系列處理,然後才交給 Observer。就像一個管道,數據從管道的一段流入,途徑管道各個環節,當數據到達 Observer 的時候,已經被管道操作過,有的數據已經被中途過濾拋棄掉了,有的數據已經被改變了原來的形態,而且最後的數據可能來自多個數據源,最後 Observer 只需要處理能夠走到終點的數據,而這個數據管道就是 pipe。

而對於每一個操作符,鏈接的就是上游 (upstream) 和下游 (downstream) 

Marble diagrams (彈珠圖)

爲了能夠解釋流是如何變化的,文字通常不足以能夠描述清楚,我們常常使用彈珠圖來對流的時序變化(操作符的運行方式)進行描述。

一個彈珠圖🌰:

參考資料

https://cn.rx.js.org/manual/overview.html

https://rxjs-cn.github.io/learn-rxjs-operators/

參考資料

[1] 創建操作符: https://rxjs-cn.github.io/learn-rxjs-operators/operators/creation/

❤️ 謝謝支持

以上便是本次分享的全部內容,希望對你有所幫助 ^_^

歡迎關注公衆號 ELab 團隊 收穫大廠一手好文章~

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/zfWkDmbAQFwq7Rufx4eeMQ