Golang 設計模式之觀察者模式

1 原理介紹

本期基於 go 語言和大家探討設計模式中的觀察者模式. 觀察者模式適用於多對一的訂閱 / 發佈場景.

在上述場景中,我們瞭解到核心對象有兩類,一類是 “觀察者”,一類是 “被觀察的事物”,且兩者間在數量上存在多對一的映射關係.

在具體作編程實現時,上述場景的實現思路可以是百花齊放的,而觀察者模式只是爲我們提供了一種相對規範的設計實現思路,其遵循的核心宗旨是實現 “觀察者” 與“被觀察對象”之間的解耦,並將其設計爲通用的模塊,便於後續的擴展和複用.

學習設計模式時,我們腦海中需要中需要明白,教條是相對刻板的,而場景和問題則是靈活多變的,在工程實踐中,我們避免生搬硬套,要做到因地制宜,隨機應變.

2 代碼實踐

2.1 核心角色

在觀察者模式中,核心的角色包含三類:

觀察者模式的核心就在於建立了 EventBus 的角色. 由於 EventBus 模塊的誕生,實現了觀察者與具體被觀察事物之間的解耦:

三類角色組織生成的 UML 類圖如下所示:

對應的代碼實現示例展示如下:

type Event struct {
    Topic string
    Val   interface{}
}


type Observer interface {
    OnChange(ctx context.Context, e *Event) error
}


type EventBus interface {
    Subscribe(topic string, o Observer)
    Unsubscribe(topic string, o Observer)
    Publish(ctx context.Context, e *Event)
}
type BaseEventBus struct {
    mux       sync.RWMutex
    observers map[string]map[Observer]struct{}
}


func NewBaseEventBus() BaseEventBus {
    return BaseEventBus{
        observers: make(map[string]map[Observer]struct{}),
    }
}


func (b *BaseEventBus) Subscribe(topic string, o Observer) {
    b.mux.Lock()
    defer b.mux.Unlock()
    _, ok := b.observers[topic]
    if !ok {
        b.observers[topic] = make(map[Observer]struct{})
    }
    b.observers[topic][o] = struct{}{}
}


func (b *BaseEventBus) Unsubscribe(topic string, o Observer) {
    b.mux.Lock()
    defer b.mux.Unlock()
    delete(b.observers[topic], o)
}

2.2 同步模式

在同步模式下,EventBus 在接受到變更事件 Event 時,會根據事件類型 Topic 匹配到對應的觀察者列表 observers,然後採用串行遍歷的方式分別調用 Observer.OnChange 方法對每個觀察者進行通知,並對處理流程中遇到的錯誤進行聚合,放到 handleErr 方法中進行統一的後處理.

type SyncEventBus struct {
    BaseEventBus
}


func NewSyncEventBus() *SyncEventBus {
    return &SyncEventBus{
        BaseEventBus: NewBaseEventBus(),
    }
}


func (s *SyncEventBus) Publish(ctx context.Context, e *Event) {
    s.mux.RLock()
    subscribers := s.observers[e.Topic]
    s.mux.RUnlock()


    errs := make(map[Observer]error)
    for subscriber := range subscribers {
        if err := subscriber.OnChange(ctx, e); err != nil {
            errs[subscriber] = err
        }
    }


    s.handleErr(ctx, errs)
}

此處對 handleErr 方法的實現邏輯進行建立了簡化,在真實的實踐場景中,可以針對遇到的錯誤建立更完善的後處理流程,如採取重試或告知之類的操作.

func (s *SyncEventBus) handleErr(ctx context.Context, errs map[Observer]error) {
    for o, err := range errs {
        // 處理 publish 失敗的 observer
        fmt.Printf("observer: %v, err: %v", o, err)
    }
}

2.3 異步模式

在異步模式的實現中,通過 AsyncEventBus 實現了 EventBus 的異步通知版本,對應類圖如下:

在異步模式下,會在 EventBus 啓動之初,異步啓動一個守護協程,負責對接收到的錯誤進行後處理.

在事物發生變更時,EventBus 的 Publish 方法會被調用,此時 EventBus 會併發調用 Observer.OnChange 方法對每個觀察者進行通知,在這個過程中遇到的錯誤會通過 channel 統一彙總到 handleErr 的守護協程中進行處理.

type observerWithErr struct {
    o   Observer
    err error
}




type AsyncEventBus struct {
    BaseEventBus
    errC chan *observerWithErr
    ctx  context.Context
    stop context.CancelFunc
}




func NewAsyncEventBus() *AsyncEventBus {
    aBus := AsyncEventBus{
        BaseEventBus: NewBaseEventBus(),
    }
    aBus.ctx, aBus.stop = context.WithCancel(context.Background())
    // 處理處理錯誤的異步守護協程
    go aBus.handleErr()
    return &aBus
}


func (a *AsyncEventBus) Stop() {
    a.stop()
}


func (a *AsyncEventBus) Publish(ctx context.Context, e *Event) {
    a.mux.RLock()
    subscribers := a.observers[e.Topic]
defer a.mux.RUnlock()
    for subscriber := range subscribers {
        // shadow
        subscriber := subscriber
        go func() {
            if err := subscriber.OnChange(ctx, e); err != nil {
                select {
                case <-a.ctx.Done():
                case a.errC <- &observerWithErr{
                    o:   subscriber,
                    err: err,
                }:
                }
            }
        }()
    }
}


func (a *AsyncEventBus) handleErr() {
    for {
        select {
        case <-a.ctx.Done():
            return
        case resp := <-a.errC:
            // 處理 publish 失敗的 observer
            fmt.Printf("observer: %v, err: %v", resp.o, resp.err)
        }
    }
}

2.4 使用示例

下面分別給出同步和異步模式下觀察者模式的使用示例:

func Test_syncEventBus(t *testing.T) {
    observerA := NewBaseObserver("a")
    observerB := NewBaseObserver("b")
    observerC := NewBaseObserver("c")
    observerD := NewBaseObserver("d")


    sbus := NewSyncEventBus()
    topic := "order_finish"
    sbus.Subscribe(topic, observerA)
    sbus.Subscribe(topic, observerB)
    sbus.Subscribe(topic, observerC)
    sbus.Subscribe(topic, observerD)


    sbus.Publish(context.Background()&Event{
        Topic: topic,
        Val:   "order_id: xxx",
    })
}

異步測試代碼:

func Test_asyncEventBus(t *testing.T) {
    observerA := NewBaseObserver("a")
    observerB := NewBaseObserver("b")
    observerC := NewBaseObserver("c")
    observerD := NewBaseObserver("d")


    abus := NewAsyncEventBus()
    defer abus.Stop()


    topic := "order_finish"
    abus.Subscribe(topic, observerA)
    abus.Subscribe(topic, observerB)
    abus.Subscribe(topic, observerC)
    abus.Subscribe(topic, observerD)


    abus.Publish(context.Background()&Event{
        Topic: topic,
        Val:   "order_id: xxx",
    })


    <-time.After(time.Second)
}

3 工程案例

本章和大家一起梳理一下在工程實踐中對觀察者模式的使用場景.

3.1 MQ 發佈 / 訂閱

大家耳熟能詳的消息隊列就是對觀察者模式的一種實踐,大家可以採用類比的方式在 MQ (Message Queue)架構中代入觀察者模式中的每一類角色:

3.2 ETCD 監聽回調

另一個踐行了觀察者模式的工程案例是基於 golang 編寫的分佈式 kv 存儲組件 etcd.

etcd 提供了作用於指定數據範圍的監聽回調功能,能在對應數據狀態發生變更時,將變更通知傳達到每個訂閱者的手中,在這個過程中:

想了解有關 etcd watch 機制的更多內容,可以閱讀我之前發表的文章:

etcd watch 機制源碼解析——服務端篇 以及 etcd watch 機制源碼解析——客戶端篇.

4 總結

本文和大家一起探討了設計模式中的觀察者模式:

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/QOXh86eX8z5Ts4O1pky44g