Golang 設計模式之觀察者模式
1 原理介紹
本期基於 go 語言和大家探討設計模式中的觀察者模式. 觀察者模式適用於多對一的訂閱 / 發佈場景.
-
• ” 多 “:指的是有多名觀察者
-
• ” 一 “:指的是有一個被觀察事物
-
• ” 訂閱 “:指的是觀察者時刻關注着事物的動態
-
• ” 發佈 “:指的是事物狀態發生變化時是透明公開的,能夠正常進入到觀察者的視線
在上述場景中,我們瞭解到核心對象有兩類,一類是 “觀察者”,一類是 “被觀察的事物”,且兩者間在數量上存在多對一的映射關係.
在具體作編程實現時,上述場景的實現思路可以是百花齊放的,而觀察者模式只是爲我們提供了一種相對規範的設計實現思路,其遵循的核心宗旨是實現 “觀察者” 與“被觀察對象”之間的解耦,並將其設計爲通用的模塊,便於後續的擴展和複用.
學習設計模式時,我們腦海中需要中需要明白,教條是相對刻板的,而場景和問題則是靈活多變的,在工程實踐中,我們避免生搬硬套,要做到因地制宜,隨機應變.
2 代碼實踐
2.1 核心角色
在觀察者模式中,核心的角色包含三類:
-
• Observer:觀察者. 指的是關注事物動態的角色
-
• Event:事物的變更事件. 其中 Topic 標識了事物的身份以及變更的類型,Val 是變更詳情
-
• EventBus:事件總線. 位於觀察者與事物之間承上啓下的代理層. 負責維護管理觀察者,並且在事物發生變更時,將情況同步給每個觀察者.
觀察者模式的核心就在於建立了 EventBus 的角色. 由於 EventBus 模塊的誕生,實現了觀察者與具體被觀察事物之間的解耦:
三類角色組織生成的 UML 類圖如下所示:
對應的代碼實現示例展示如下:
type Event struct {
Topic string
Val interface{}
}
type Observer interface {
OnChange(ctx context.Context, e *Event) error
}
type EventBus interface {
Subscribe(topic string, o Observer)
Unsubscribe(topic string, o Observer)
Publish(ctx context.Context, e *Event)
}
type BaseEventBus struct {
mux sync.RWMutex
observers map[string]map[Observer]struct{}
}
func NewBaseEventBus() BaseEventBus {
return BaseEventBus{
observers: make(map[string]map[Observer]struct{}),
}
}
func (b *BaseEventBus) Subscribe(topic string, o Observer) {
b.mux.Lock()
defer b.mux.Unlock()
_, ok := b.observers[topic]
if !ok {
b.observers[topic] = make(map[Observer]struct{})
}
b.observers[topic][o] = struct{}{}
}
func (b *BaseEventBus) Unsubscribe(topic string, o Observer) {
b.mux.Lock()
defer b.mux.Unlock()
delete(b.observers[topic], o)
}
2.2 同步模式
在同步模式下,EventBus 在接受到變更事件 Event 時,會根據事件類型 Topic 匹配到對應的觀察者列表 observers,然後採用串行遍歷的方式分別調用 Observer.OnChange 方法對每個觀察者進行通知,並對處理流程中遇到的錯誤進行聚合,放到 handleErr 方法中進行統一的後處理.
type SyncEventBus struct {
BaseEventBus
}
func NewSyncEventBus() *SyncEventBus {
return &SyncEventBus{
BaseEventBus: NewBaseEventBus(),
}
}
func (s *SyncEventBus) Publish(ctx context.Context, e *Event) {
s.mux.RLock()
subscribers := s.observers[e.Topic]
s.mux.RUnlock()
errs := make(map[Observer]error)
for subscriber := range subscribers {
if err := subscriber.OnChange(ctx, e); err != nil {
errs[subscriber] = err
}
}
s.handleErr(ctx, errs)
}
此處對 handleErr 方法的實現邏輯進行建立了簡化,在真實的實踐場景中,可以針對遇到的錯誤建立更完善的後處理流程,如採取重試或告知之類的操作.
func (s *SyncEventBus) handleErr(ctx context.Context, errs map[Observer]error) {
for o, err := range errs {
// 處理 publish 失敗的 observer
fmt.Printf("observer: %v, err: %v", o, err)
}
}
2.3 異步模式
在異步模式的實現中,通過 AsyncEventBus 實現了 EventBus 的異步通知版本,對應類圖如下:
在異步模式下,會在 EventBus 啓動之初,異步啓動一個守護協程,負責對接收到的錯誤進行後處理.
在事物發生變更時,EventBus 的 Publish 方法會被調用,此時 EventBus 會併發調用 Observer.OnChange 方法對每個觀察者進行通知,在這個過程中遇到的錯誤會通過 channel 統一彙總到 handleErr 的守護協程中進行處理.
type observerWithErr struct {
o Observer
err error
}
type AsyncEventBus struct {
BaseEventBus
errC chan *observerWithErr
ctx context.Context
stop context.CancelFunc
}
func NewAsyncEventBus() *AsyncEventBus {
aBus := AsyncEventBus{
BaseEventBus: NewBaseEventBus(),
}
aBus.ctx, aBus.stop = context.WithCancel(context.Background())
// 處理處理錯誤的異步守護協程
go aBus.handleErr()
return &aBus
}
func (a *AsyncEventBus) Stop() {
a.stop()
}
func (a *AsyncEventBus) Publish(ctx context.Context, e *Event) {
a.mux.RLock()
subscribers := a.observers[e.Topic]
defer a.mux.RUnlock()
for subscriber := range subscribers {
// shadow
subscriber := subscriber
go func() {
if err := subscriber.OnChange(ctx, e); err != nil {
select {
case <-a.ctx.Done():
case a.errC <- &observerWithErr{
o: subscriber,
err: err,
}:
}
}
}()
}
}
func (a *AsyncEventBus) handleErr() {
for {
select {
case <-a.ctx.Done():
return
case resp := <-a.errC:
// 處理 publish 失敗的 observer
fmt.Printf("observer: %v, err: %v", resp.o, resp.err)
}
}
}
2.4 使用示例
下面分別給出同步和異步模式下觀察者模式的使用示例:
func Test_syncEventBus(t *testing.T) {
observerA := NewBaseObserver("a")
observerB := NewBaseObserver("b")
observerC := NewBaseObserver("c")
observerD := NewBaseObserver("d")
sbus := NewSyncEventBus()
topic := "order_finish"
sbus.Subscribe(topic, observerA)
sbus.Subscribe(topic, observerB)
sbus.Subscribe(topic, observerC)
sbus.Subscribe(topic, observerD)
sbus.Publish(context.Background(), &Event{
Topic: topic,
Val: "order_id: xxx",
})
}
異步測試代碼:
func Test_asyncEventBus(t *testing.T) {
observerA := NewBaseObserver("a")
observerB := NewBaseObserver("b")
observerC := NewBaseObserver("c")
observerD := NewBaseObserver("d")
abus := NewAsyncEventBus()
defer abus.Stop()
topic := "order_finish"
abus.Subscribe(topic, observerA)
abus.Subscribe(topic, observerB)
abus.Subscribe(topic, observerC)
abus.Subscribe(topic, observerD)
abus.Publish(context.Background(), &Event{
Topic: topic,
Val: "order_id: xxx",
})
<-time.After(time.Second)
}
3 工程案例
本章和大家一起梳理一下在工程實踐中對觀察者模式的使用場景.
3.1 MQ 發佈 / 訂閱
大家耳熟能詳的消息隊列就是對觀察者模式的一種實踐,大家可以採用類比的方式在 MQ (Message Queue)架構中代入觀察者模式中的每一類角色:
-
• EventBus:對應的是消息隊列組件,爲整個通信架構提供了分佈式解耦、流量削峯等能力
-
• Event:對應的是消息隊列中的一條消息,有明確的主題 topic,由生產者 producer 提供
-
• Observer:對應的是消費者 consumer,對指定事物的動態(topic)進行訂閱,並在消費到對應的變更事件後執行對應的處理邏輯
3.2 ETCD 監聽回調
另一個踐行了觀察者模式的工程案例是基於 golang 編寫的分佈式 kv 存儲組件 etcd.
etcd 提供了作用於指定數據範圍的監聽回調功能,能在對應數據狀態發生變更時,將變更通知傳達到每個訂閱者的手中,在這個過程中:
-
• EventBus:對應的是 etcd 服務端的 watchableStore 監聽器存儲模塊,該模塊會負責存儲用戶創建的一系列監聽器 watcher,並建立由監聽數據 key 到監聽器集合 watcherGroup 之間的映射關係. 當任意存儲數據發生變化時,etcd 的數據存儲模塊會在一個統一的切面中調用通知方法,將這一信息傳達到 watchableStore 模塊,watchableStore 則會將變更數據與監聽數據 key 之間進行 join,最終得到一個需要執行回調操作的 watchers 組合,順沿 watcher 中的路徑,向訂閱者發送通知消息
-
• Event:對應的是一條 etcd 狀態機的數據變更事件,由 etcd 使用方在執行一條寫數據操作時觸發,在寫操作真正生效後,變更事件會被傳送到 watchableStore 模塊執行回調處理
-
• Observer:使用 etcd watch 功能對指定範圍數據建立監聽回調機制的使用方,在 etcd 服務端 watchableStore 模塊會建立監聽器實體 watcher 作爲自身的代理,當變更事件真的發生後,watchableStore 會以 watcher 作爲起點,沿着返回路徑一路將變更事件發送到使用方手中.
想了解有關 etcd watch 機制的更多內容,可以閱讀我之前發表的文章:
etcd watch 機制源碼解析——服務端篇 以及 etcd watch 機制源碼解析——客戶端篇.
4 總結
本文和大家一起探討了設計模式中的觀察者模式:
-
• 觀察者模式適用於多對一的訂閱 / 發佈場景,其實現思路是在觀察者與被觀察對象之間添加收口了發佈訂閱功能的中間層,核心宗旨是實現 “觀察者” 與“被觀察對象”之間的解耦
-
• 通過 UML 類圖結合具體代碼示例,對觀察者模式進行實踐. 根據變更事件的通知模式,觀察者模式可以分爲同步和異步兩種模型
-
• 本文給出兩個踐行了觀察者模式的工程案例,一個是 Message Queue 的發佈訂閱模式,一個是 ETCD 服務端對 watch 功能的實現思路
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/QOXh86eX8z5Ts4O1pky44g