Go 每日一庫之 rxgo
簡介
ReactiveX,簡稱爲 Rx,是一個異步編程的 API。與 callback(回調)、promise(JS 提供這種方式)和 deferred(Python 的 twisted 網絡編程庫就是使用這種方式)這些異步編程方式有所不同,Rx 是基於事件流的。這裏的事件可以是系統中產生或變化的任何東西,在代碼中我們一般用對象表示。在 Rx 中,事件流被稱爲 Observable(可觀察的)。事件流需要被 Observer(觀察者)處理纔有意義。想象一下,我們日常作爲一個 Observer,一個重要的工作就是觀察 BUG 的事件流。每次發現一個 BUG,我們都需要去解決它。
Rx 僅僅只是一個 API 規範的定義。Rx 有多種編程語言實現,RxJava/RxJS/Rx.NET/RxClojure/RxSwift
。RxGo 是 Rx 的 Go 語言實現。藉助於 Go 語言簡潔的語法和強大的併發支持(goroutine、channel),Rx 與 Go 語言的結合非常完美。
pipelines (官方博客:https://blog.golang.org/pipelines) 是 Go 基礎的併發編程模型。其中包含,fan-in——多個 goroutine 產生數據,一個 goroutine 處理數據,fan-out——一個 goroutine 產生數據,多個 goroutine 處理數據,fan-inout——多個 goroutine 產生數據,多個 goroutine 處理數據。它們都是通過 channel 連接。RxGo 的實現就是基於 pipelines 的理念,並且提供了方便易用的包裝和強大的擴展。
快速使用
本文代碼使用 Go Modules。
創建目錄並初始化:
$ mkdir rxgo && cd rxgo
$ go mod init github.com/darjun/go-daily-lib/rxgo
安裝rxgo
庫:
$ go get -u github.com/reactivex/rxgo/v2
編碼:
package main
import (
"fmt"
"github.com/reactivex/rxgo/v2"
)
func main() {
observable := rxgo.Just(1, 2, 3, 4, 5)()
ch := observable.Observe()
for item := range ch {
fmt.Println(item.V)
}
}
使用 RxGo 的一般流程如下:
- 使用相關的 Operator 創建 Observable,Operator 就是用來創建 Observable 的。這些術語都比較難貼切地翻譯,而且英文也很好懂,就不強行翻譯了;
- 中間各個階段可以使用過濾操作篩選出我們想要的數據,使用轉換操作對數據進行轉換;
- 調用 Observable 的
Observe()
方法,該方法返回一個<- chan rxgo.Item
。然後for range
遍歷即可。
GitHub 上一張圖很形象地描繪了這個過程:
- 首先使用
Just
創建一個僅有若干固定數據的 Observable; - 使用
Map()
方法執行轉換(將圓形轉爲方形); - 使用
Filter()
方法執行過濾(過濾掉黃色的方形)。
看懂了這張圖片,就能瞭解 RxGo 工作的基本流程了。
上面是簡單的示例,沒有過濾、轉換操作的使用。
運行:
$ go run main.go
1
2
3
4
5
關於上面的示例,需要注意:
Just
使用柯里化(currying)讓它可以在第一個參數中接受多個數據,在第二個參數中接受多個選項定製行爲。柯里化是函數化編程的思想,簡單來說就是通過在函數中返回函數,以此來減少每個函數的參數個數。例如:
func add(value int) func (int) int {
return func (a int) int {
return value + a
}
}
fmt.Prinlnt(add(5)(10)) // 15
由於 Go 不支持多個可變參數,Just
通過柯里化迂迴地實現了這個功能:
// rxgo/factory.go
func Just(items ...interface{}) func(opts ...Option) Observable {
return func(opts ...Option) Observable {
return &ObservableImpl{
iterable: newJustIterable(items...)(opts...),
}
}
}
實際上rxgo.Item
還可以包含錯誤。所以在使用時,我們應該做一層判斷:
func main() {
observable := rxgo.Just(1, 2, errors.New("unknown"), 3, 4, 5)()
ch := observable.Observe()
for item := range ch {
if item.Error() {
fmt.Println("error:", item.E)
} else {
fmt.Println(item.V)
}
}
}
運行:
$ go run main.go
1
2
error: unknown
3
4
5
我們使用item.Error()
檢查是否出現錯誤。然後使用item.V
訪問數據,item.E
訪問錯誤。
除了使用for range
之外,我們還可以調用 Observable 的ForEach()
方法來實現遍歷。ForEach()
接受 3 個回調函數:
NextFunc
:類型爲func (v interface {})
,處理數據;ErrFunc
:類型爲func (err error)
,處理錯誤;CompletedFunc
:類型爲func ()
,Observable 完成時調用。
有點Promise
那味了。使用ForEach()
,可以將上面的示例改寫爲:
func main() {
observable := rxgo.Just(1, 2, 3, 4, 5)()
<-observable.ForEach(func(v interface{}) {
fmt.Println("received:", v)
}, func(err error) {
fmt.Println("error:", err)
}, func() {
fmt.Println("completed")
})
}
運行:
$ go run main.go
received: 1
received: 2
received: 3
received: 4
received: 5
completed
ForEach()
實際上是異步執行的,它返回一個接收通知的 channel。當 Observable 數據發送完畢時,該 channel 會關閉。所以如果要等待ForEach()
執行完成,我們需要使用<-
。上面的示例中如果去掉<-
,可能就沒有輸出了,因爲主 goroutine 結束了,整個程序就退出了。
創建 Observable
上面使用最簡單的方式創建 Observable:直接調用Just()
方法傳入一系列數據。下面再介紹幾種創建 Observable 的方式。
Create
傳入一個[]rxgo.Producer
的切片,其中rxgo.Producer
的類型爲func(ctx context.Context, next chan<- Item)
。我們可以在代碼中調用rxgo.Of(value)
生成數據,rxgo.Error(err)
生成錯誤,然後發送到next
通道中:
func main() {
observable := rxgo.Create([]rxgo.Producer{func(ctx context.Context, next chan<- rxgo.Item) {
next <- rxgo.Of(1)
next <- rxgo.Of(2)
next <- rxgo.Of(3)
next <- rxgo.Error(errors.New("unknown"))
next <- rxgo.Of(4)
next <- rxgo.Of(5)
}})
ch := observable.Observe()
for item := range ch {
if item.Error() {
fmt.Println("error:", item.E)
} else {
fmt.Println(item.V)
}
}
}
當然,分成兩個rxgo.Producer
也是一樣的效果:
observable := rxgo.Create([]rxgo.Producer{func(ctx context.Context, next chan<- rxgo.Item) {
next <- rxgo.Of(1)
next <- rxgo.Of(2)
next <- rxgo.Of(3)
next <- rxgo.Error(errors.New("unknown"))
}, func(ctx context.Context, next chan<- rxgo.Item) {
next <- rxgo.Of(4)
next <- rxgo.Of(5)
}})
FromChannel
FromChannel
可以直接從一個已存在的<-chan rxgo.Item
對象中創建 Observable:
func main() {
ch := make(chan rxgo.Item)
go func() {
for i := 1; i <= 5; i++ {
ch <- rxgo.Of(i)
}
close(ch)
}()
observable := rxgo.FromChannel(ch)
for item := range observable.Observe() {
fmt.Println(item.V)
}
}
注意:
通道需要手動調用close()
關閉,上面Create()
方法內部rxgo
自動幫我們執行了這個步驟。
Interval
Interval
以傳入的時間間隔生成一個無窮的數字序列,從 0 開始:
func main() {
observable := rxgo.Interval(rxgo.WithDuration(5 * time.Second))
for item := range observable.Observe() {
fmt.Println(item.V)
}
}
上面的程序啓動後,第 5s 輸出 0,第 10s 輸出 1,…,而且不會停止。
我們可以用time.Ticker
實現相同的功能:
func main() {
t := time.NewTicker(5 * time.Second)
var count int
for range t.C {
fmt.Println(count)
count++
}
}
Range
Range
可以生成一個範圍內的數字:
func main() {
observable := rxgo.Range(0, 3)
for item := range observable.Observe() {
fmt.Println(item.V)
}
}
上面代碼依次輸出 0,1,2,3。
Repeat
在已存在的 Observable 對象上調用Repeat
,可以實現每隔指定時間,重複一次該序列,一共重複指定次數:
func main() {
observable := rxgo.Just(1, 2, 3)().Repeat(
3, rxgo.WithDuration(1*time.Second),
)
for item := range observable.Observe() {
fmt.Println(item.V)
}
}
運行上面的代碼,立即輸出 1,2,3,然後等待 1s,又輸出一次 1,2,3,然後又等待 1s,最後又輸出一次 1,2,3。
Start
可以給Start
方法傳入[]rxgo.Supplier
作爲參數,它可以包含任意數量的rxgo.Supplier
類型。rxgo.Supplier
的底層類型爲:
// rxgo/types.go
var Supplier func(ctx context.Context) rxgo.Item
Observable 內部會依次調用這些rxgo.Supplier
生成rxgo.Item
:
func Supplier1(ctx context.Context) rxgo.Item {
return rxgo.Of(1)
}
func Supplier2(ctx context.Context) rxgo.Item {
return rxgo.Of(2)
}
func Supplier3(ctx context.Context) rxgo.Item {
return rxgo.Of(3)
}
func main() {
observable := rxgo.Start([]rxgo.Supplier{Supplier1, Supplier2, Supplier3})
for item := range observable.Observe() {
fmt.Println(item.V)
}
}
Observable 分類
根據數據在何處生成,Observable 被分爲 Hot 和 Cold 兩種類型(類比熱啓動和冷啓動)。數據在其它地方生成的被成爲 Hot Observable。相反,在 Observable 內部生成數據的就是 Cold Observable。
使用上面介紹的方法創建的實際上都是 Hot Observable。
func main() {
ch := make(chan rxgo.Item)
go func() {
for i := 0; i < 3; i++ {
ch <- rxgo.Of(i)
}
close(ch)
}()
observable := rxgo.FromChannel(ch)
for item := range observable.Observe() {
fmt.Println(item.V)
}
for item := range observable.Observe() {
fmt.Println(item.V)
}
}
上面創建的是 Hot Observable。但是有個問題,第一次Observe()
消耗了所有的數據,第二個就沒有數據輸出了。
而 Cold Observable 就不會有這個問題,因爲它創建的流是獨立於每個觀察者的。即每次調用Observe()
都創建一個新的 channel。我們使用Defer()
方法創建 Cold Observable,它的參數與Create()
方法一樣。
func main() {
observable := rxgo.Defer([]rxgo.Producer{func(_ context.Context, ch chan<- rxgo.Item) {
for i := 0; i < 3; i++ {
ch <- rxgo.Of(i)
}
}})
for item := range observable.Observe() {
fmt.Println(item.V)
}
for item := range observable.Observe() {
fmt.Println(item.V)
}
}
輸出:
$ go run main.go
0
1
2
0
1
2
可連接的 Observable
可連接的(Connectable)Observable 對普通的 Observable 進行了一層組裝。調用它的Observe()
方法時並不會立刻產生數據。使用它,我們可以等所有的觀察者都準備就緒了(即調用了Observe()
方法)之後,再調用其Connect()
方法開始生成數據。我們通過兩個示例比較使用普通的 Observable 和可連接的 Observable 有何不同。
普通的:
func main() {
ch := make(chan rxgo.Item)
go func() {
for i := 1; i <= 3; i++ {
ch <- rxgo.Of(i)
}
close(ch)
}()
observable := rxgo.FromChannel(ch)
observable.DoOnNext(func(i interface{}) {
fmt.Printf("First observer: %d\n", i)
})
time.Sleep(3 * time.Second)
fmt.Println("before subscribe second observer")
observable.DoOnNext(func(i interface{}) {
fmt.Printf("Second observer: %d\n", i)
})
time.Sleep(3 * time.Second)
}
上例中我們使用DoOnNext()
方法來註冊觀察者。由於DoOnNext()
方法是異步執行的,所以爲了等待結果輸出,在最後增加了一行time.Sleep
。運行:
$ go run main.go
First observer: 1
First observer: 2
First observer: 3
before subscribe second observer
由輸出可以看出,註冊第一個觀察者之後就開始產生數據了。
我們通過在創建 Observable 的方法中指定rxgo.WithPublishStrategy()
選項就可以創建可連接的 Observable:
func main() {
ch := make(chan rxgo.Item)
go func() {
for i := 1; i <= 3; i++ {
ch <- rxgo.Of(i)
}
close(ch)
}()
observable := rxgo.FromChannel(ch, rxgo.WithPublishStrategy())
observable.DoOnNext(func(i interface{}) {
fmt.Printf("First observer: %d\n", i)
})
time.Sleep(3 * time.Second)
fmt.Println("before subscribe second observer")
observable.DoOnNext(func(i interface{}) {
fmt.Printf("Second observer: %d\n", i)
})
observable.Connect(context.Background())
time.Sleep(3 * time.Second)
}
運行輸出:
$ go run main.go
before subscribe second observer
Second observer: 1
First observer: 1
First observer: 2
First observer: 3
Second observer: 2
Second observer: 3
上面是等兩個觀察者都註冊之後,並且手動調用了 Observable 的Connect()
方法才產生數據。而且可連接的 Observable 有一個特性:它是冷啓動的!!!,即每個觀察者都會收到一份相同的拷貝。
轉換 Observable
rxgo 提供了很多轉換函數,可以修改經過它的rxgo.Item
,然後再發送給下一個階段。
Map
Map()
方法簡單修改它收到的rxgo.Item
然後發送到下一個階段(轉換或過濾)。Map()
接受一個類型爲func (context.Context, interface{}) (interface{}, error)
的函數。第二個參數就是rxgo.Item
中的數據,返回轉換後的數據。如果出錯,則返回錯誤。
func main() {
observable := rxgo.Just(1, 2, 3)()
observable = observable.Map(func(_ context.Context, i interface{}) (interface{}, error) {
return i.(int)*2 + 1, nil
}).Map(func(_ context.Context, i interface{}) (interface{}, error) {
return i.(int)*3 + 2, nil
})
for item := range observable.Observe() {
fmt.Println(item.V)
}
}
上例中每個數字經過兩個Map
,第一個Map
執行2 * i + 1
,第二個Map
執行3 * i + 2
。即對於每個數字來說,最終進行的變換爲3 * (2 * i + 1) + 2
。運行:
$ go run main.go
11
17
23
Marshal
Marshal
對經過它的數據進行一次Marshal
。這個Marshal
可以是json.Marshal/proto.Marshal
,甚至我們自己寫的Marshal
函數。它接受一個類型爲func(interface{}) ([]byte, error)
的函數用於對數據進行處理。
type User struct {
Name string `json:"name"`
Age int `json:"age"`
}
func main() {
observable := rxgo.Just(
User{
Name: "dj",
Age: 18,
},
User{
Name: "jw",
Age: 20,
},
)()
observable = observable.Marshal(json.Marshal)
for item := range observable.Observe() {
fmt.Println(string(item.V.([]byte)))
}
}
由於Marshal
操作返回的是[]byte
類型,我們需要進行類型轉換之後再輸出。
Unmarshal
既然有Marshal
,也就有它的相反操作Unmarshal
。Unmarshal
用於將一個[]byte
類型轉換爲相應的結構體或其他類型。與Marshal
不同,Unmarshal
需要知道轉換的目標類型,所以需要提供一個函數用於生成該類型的對象。然後將[]byte
數據Unmarshal
到該對象中。Unmarshal
接受兩個參數,參數一是類型爲func([]byte, interface{}) error
的函數,參數二是func () interface{}
用於生成實際類型的對象。我們拿上面的例子中生成的 JSON 字符串作爲數據,將它們重新Unmarshal
爲User
對象:
type User struct {
Name string `json:"name"`
Age int `json:"age"`
}
func main() {
observable := rxgo.Just(
`{"name":"dj","age":18}`,
`{"name":"jw","age":20}`,
)()
observable = observable.Map(func(_ context.Context, i interface{}) (interface{}, error) {
return []byte(i.(string)), nil
}).Unmarshal(json.Unmarshal, func() interface{} {
return &User{}
})
for item := range observable.Observe() {
fmt.Println(item.V)
}
}
由於Unmarshaller
接受[]byte
類型的參數,我們在Unmarshal
之前加了一個Map
用於將string
轉爲[]byte
。運行:
$ go run main.go
&{dj 18}
&{jw 20}
Buffer
Buffer
按照一定的規則收集接收到的數據,然後一次性發送出去(作爲切片),而不是收到一個發送一個。有 3 種類型的Buffer
:
BufferWithCount(n)
:每收到n
個數據發送一次,最後一次可能少於n
個;BufferWithTime(n)
:發送在一個時間間隔n
內收到的數據;BufferWithTimeOrCount(d, n)
:收到n
個數據,或經過d
時間間隔,發送當前收到的數據。
BufferWithCount
:
func main() {
observable := rxgo.Just(1, 2, 3, 4)()
observable = observable.BufferWithCount(3)
for item := range observable.Observe() {
fmt.Println(item.V)
}
}
運行:
$ go run main.go
[1 2 3]
[4]
注意,最後一組只有一個。
BufferWithTime
:
func main() {
ch := make(chan rxgo.Item, 1)
go func() {
i := 0
for range time.Tick(time.Second) {
ch <- rxgo.Of(i)
i++
}
}()
observable := rxgo.FromChannel(ch).BufferWithTime(rxgo.WithDuration(3 * time.Second))
for item := range observable.Observe() {
fmt.Println(item.V)
}
}
每 3s 發送一次:
$ go run main.go
[0 1 2]
[3 4 5]
[6 7 8]
...
BufferWithTimeOrCount
:
func main() {
ch := make(chan rxgo.Item, 1)
go func() {
i := 0
for range time.Tick(time.Second) {
ch <- rxgo.Of(i)
i++
}
}()
observable := rxgo.FromChannel(ch).BufferWithTimeOrCount(rxgo.WithDuration(3*time.Second), 2)
for item := range observable.Observe() {
fmt.Println(item.V)
}
}
上面 3s 可以收集 3 個數據,但是設置了收集 2 個就發送。所以,運行輸出爲:
$ go run main.go
[0 1]
[2 3]
[4 5]
...
GroupBy
GroupBy
根據傳入一個 Hash 函數,爲每個不同的結果分別創建新的 Observable。換句話說,GroupBy
生成一個數據類型爲 Observable 的 Observable。
func main() {
count := 3
observable := rxgo.Range(0, 10).GroupBy(count, func(item rxgo.Item) int {
return item.V.(int) % count
}, rxgo.WithBufferedChannel(10))
for subObservable := range observable.Observe() {
fmt.Println("New observable:")
for item := range subObservable.V.(rxgo.Observable).Observe() {
fmt.Printf("item: %v\n", item.V)
}
}
}
上面根據每個數模 3 的餘數將整個流分爲 3 組。運行:
$ go run main.go
New observable:
item: 0
item: 3
item: 6
item: 9
New observable:
item: 1
item: 4
item: 7
item: 10
New observable:
item: 2
item: 5
item: 8
注意rxgo.WithBufferedChannel(10)
的使用,由於我們的數字是連續生成的,依次爲 0->1->2->…->9->10。而 Observable 默認是惰性的,即由Observe()
驅動。內層的Observe()
在返回一個 0 之後就等待下一個數,但是下一個數 1 不在此 Observable 中。所以會陷入死鎖。使用rxgo.WithBufferedChannel(10)
,設置它們之間的連接 channel 緩衝區大小爲 10,這樣即使我們未取出 channel 裏面的數字,上游還是能發送數字進來。
並行操作
默認情況下,這些轉換操作都是串行的,即只有一個 goroutine 負責執行轉換函數。我們也可以使用rxgo.WithPool(n)
選項設置運行n
個 goroutine,或者rxgo.WitCPUPool()
選項設置運行與邏輯 CPU 數量相等的 goroutine。
func main() {
observable := rxgo.Range(1, 100)
observable = observable.Map(func(_ context.Context, i interface{}) (interface{}, error) {
time.Sleep(time.Duration(rand.Int31()))
return i.(int)*2 + 1, nil
}, rxgo.WithCPUPool())
for item := range observable.Observe() {
fmt.Println(item.V)
}
}
由於是並行,所以輸出順序就不確定了。爲了讓不確定性更明顯一點,我在代碼中加了一行time.Sleep
。
過濾 Observable
Observable 中發送過來的數據並不一定都是我們需要的,我們要把不想要的過濾掉。
Filter
Filter()
接受一個類型爲func (i interface{}) bool
的參數,通過的數據使用這個函數斷言,返回true
的將發送給下一個階段。否則,丟棄。
func main() {
observable := rxgo.Range(1, 10)
observable = observable.Filter(func(i interface{}) bool {
return i.(int)%2 == 0
})
for item := range observable.Observe() {
fmt.Println(item.V)
}
}
上面過濾掉奇數,最後只剩下偶數:
$ go run main.go
2
4
6
8
10
ElementAt
ElementAt()
只發送指定索引的數據,如ElementAt(2)
只發送索引爲 2 的數據,即第 3 個數據。
func main() {
observable := rxgo.Just(0, 1, 2, 3, 4)().ElementAt(2)
for item := range observable.Observe() {
fmt.Println(item.V)
}
}
上面代碼輸出 2。
Debounce
Debounce()
比較有意思,它收到數據後還會等待指定的時間間隔,後續間隔內沒有收到其他數據纔會發送剛開始的數據。
func main() {
ch := make(chan rxgo.Item)
go func() {
ch <- rxgo.Of(1)
time.Sleep(2 * time.Second)
ch <- rxgo.Of(2)
ch <- rxgo.Of(3)
time.Sleep(2 * time.Second)
close(ch)
}()
observable := rxgo.FromChannel(ch).Debounce(rxgo.WithDuration(1 * time.Second))
for item := range observable.Observe() {
fmt.Println(item.V)
}
}
上面示例,先收到 1,然後 2s 內沒收到數據,所以發送 1。接着收到了數據 2,由於馬上又收到了 3,所以 2 不會發送。收到 3 之後 2s 內沒有收到數據,發送了 3。所以最後輸出爲 1,3。
Distinct
Distinct()
會記錄它發送的所有數據,它不會發送重複的數據。由於數據格式多樣,Distinct()
要求我們提供一個函數,根據原數據返回一個唯一標識碼(有點類似哈希值)。基於這個標識碼去重。
func main() {
observable := rxgo.Just(1, 2, 2, 3, 3, 4, 4)().
Distinct(func(_ context.Context, i interface{}) (interface{}, error) {
return i, nil
})
for item := range observable.Observe() {
fmt.Println(item.V)
}
}
依次輸出 1,2,3,4,沒有重複。
Skip
Skip
可以跳過前若干個數據。
func main() {
observable := rxgo.Just(1, 2, 3, 4, 5)().Skip(2)
for item := range observable.Observe() {
fmt.Println(item.V)
}
}
Take
Take
只取前若干個數據。
func main() {
observable := rxgo.Just(1, 2, 3, 4, 5)().Take(2)
for item := range observable.Observe() {
fmt.Println(item.V)
}
}
選項
rxgo 提供的大部分方法的最後一個參數是一個可變長的選項類型。這是 Go 中特有的、經典的選項設計模式。我們前面已經使用了:
rxgo.WithBufferedChannel(10)
:設置 channel 的緩存大小;rxgo.WithPool(n)/rxgo.WithCpuPool()
:使用多個 goroutine 執行轉換操作;rxgo.WithPublishStrategy()
:使用發佈策略,即創建可連接的 Observable。
除此之外,rxgo 還提供了很多其他選項。留待大家自行探索了。
總結
rxgo 讓基於 pipelines 的併發編程變得更容易!
大家如果發現好玩、好用的 Go 語言庫,歡迎到 Go 每日一庫 GitHub 上提交 issue😄
參考
- rxgo GitHub:https://github.com/jordan-wright/rxgo
- Go 每日一庫 GitHub:https://github.com/darjun/go-daily-lib
我
歡迎關注我的微信公衆號【GoUpUp】,共同學習,一起進步~
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://darjun.github.io/2020/10/11/godailylib/rxgo/