使用 Go 實現一個簡單的事件總線模式

事件驅動架構是計算機科學中一種高度可擴展的範例。它允許我們可以多方系統異步處理事件。

事件總線是發佈 / 訂閱模式的實現,其中發佈者發佈數據,並且感興趣的訂閱者可以監聽這些數據並基於這些數據作出處理。這使發佈者與訂閱者松耦合。發佈者將數據事件發佈到事件總線,總線負責將它們發送給訂閱者。

傳統的實現事件總線的方法會涉及到使用回調。訂閱者通常實現接口,然後事件總線通過接口傳播數據。

使用 go 的併發模型,我們知道在大多數地方可以使用 channel 來替代回調。在本文中,我們將重點介紹如何使用 channel 來實現事件總線。

我們專注於基於主題(topic)的事件。發佈者發佈到主題,訂閱者可以收聽它們。

定義數據結構

爲了實現事件總線,我們需要定義要傳遞的數據結構。我們可以使用 struct 簡單地創建一個新的數據類型。我們定義一個 DataEvent 的結構體如下:

type DataEvent struct {
   Data interface{}
   Topic string
}

在這裏,我們已經將基礎數據定義爲接口,這意味着它可以是任何值。我們還將主題定義爲結構的成員。訂閱者可能會收聽多個主題,因此,我們通過主題來讓訂閱者可以區分不同的事件的做法是不錯的。

介紹 channels

現在我們已經爲事件總線定義了我們主要的數據結構,我們還需要一種方法來傳遞它。爲此,我們可以定義一個可以傳播 DataEventDataChannel 類型。

// DataChannel 是一個能接收 DataEvent 的 channel
type DataChannel chan DataEvent

// DataChannelSlice 是一個包含 DataChannels 數據的切片
type DataChannelSlice [] DataChannel

DataChannelSlice 的創建是爲了保留 DataChannel 的切片並輕鬆引用它們。

事件總線

// EventBus 存儲有關訂閱者感興趣的特定主題的信息
type EventBus struct {
   subscribers map[string]DataChannelSlice
   rm sync.RWMutex
}

EventBussubscribers,這是一個包含 DataChannelSlices 的 map。我們使用互斥鎖來保護併發訪問的讀寫。

通過使用 map 和定義 topics ,它允許我們輕鬆地組織事件。主題被視爲 map 的鍵。當有人發佈它時,我們可以通過鍵輕鬆找到主題,然後將事件傳播到 channel 中以進行進一步處理。

訂閱主題

對於訂閱主題,使用 channel。它就像傳統方法中的回調一樣。當發佈者向主題發佈數據時,channel將接收數據。

func (eb *EventBus)Subscribe(topic string, ch DataChannel)  {
   eb.rm.Lock()
   if prev, found := eb.subscribers[topic]; found {
      eb.subscribers[topic] = append(prev, ch)
   } else {
      eb.subscribers[topic] = append([]DataChannel{}, ch)
   }
   eb.rm.Unlock()
}

簡單地說,我們將訂閱者添加到 channel 切片中然後給該結構加鎖,最後在操作後將其解鎖。

發佈主題

要發佈事件,發佈者需要提供廣播給訂閱者所需要的主題和數據。

func (eb *EventBus) Publish(topic string, data interface{}) {
   eb.rm.RLock()
   if chans, found := eb.subscribers[topic]; found {
      // 這樣做是因爲切片引用相同的數組,即使它們是按值傳遞的
      // 因此我們正在使用我們的元素創建一個新切片,從而能正確地保持鎖定
      channels := append(DataChannelSlice{}, chans...)
      go func(data DataEvent, dataChannelSlices DataChannelSlice) {
         for _, ch := range dataChannelSlices {
            ch <- data
         }
      }(DataEvent{Data: data, Topic: topic}, channels)
   }
   eb.rm.RUnlock()
}

在此方法中,首先我們檢查主題是否存在任何訂閱者。然後我們只是簡單地遍歷與主題相關的 channel 切片並把事件發佈給它們。

請注意,我們在發佈方法中使用了 goroutine 來避免阻塞發佈者

開始

首先,我們需要創建一個事件總線的實例。在實際場景中,你可以從包中導出單個 EventBus使其像單例一樣運行

var eb = &EventBus{
   subscribers: map[string]DataChannelSlice{},
}

爲了測試新創建的事件總線,我們將創建一個以隨機間隔時間發佈到指定主題的方法。

func publisTo(topic string, data string)  {
   for {
      eb.Publish(topic, data)
      time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
   }
}

接下來,我們需要一個可以收聽主題的 main 函數。它使用輔助方法打印出事件的數據。

func printDataEvent(ch string, data DataEvent)  {
   fmt.Printf("Channel: %s; Topic: %s; DataEvent: %v\n", ch, data.Topic, data.Data)
}
func main()  {
   ch1 := make(chan DataEvent)
   ch2 := make(chan DataEvent)
   ch3 := make(chan DataEvent)
   eb.Subscribe("topic1", ch1)
   eb.Subscribe("topic2", ch2)
   eb.Subscribe("topic2", ch3)
   go publisTo("topic1""Hi topic 1")
   go publisTo("topic2""Welcome to topic 2")
   for {
      select {
      case d := <-ch1:
         go printDataEvent("ch1", d)
      case d := <-ch2:
         go printDataEvent("ch2", d)
      case d := <-ch3:
         go printDataEvent("ch3", d)
      }
   }
}

我們創建了三個可以訂閱主題的 channels 訂閱者(ch1,ch2,ch3)。其中 ch2 和 ch3 這兩個監聽同一事件。

我們使用 select 語句從最快返回的 channel 中獲取數據。然後它使用另一個 goroutine 打印輸出數據。用 goroutine 也不是必需的。但在某些情況下,你必須對事件進行一些繁重的操作處理。爲了防止阻塞 select,我們使用了 goroutine。

示例輸出將如下所示

Channel: ch1; Topic: topic1; DataEvent: Hi topic 1
Channel: ch2; Topic: topic2; DataEvent: Welcome to topic 2
Channel: ch3; Topic: topic2; DataEvent: Welcome to topic 2
Channel: ch3; Topic: topic2; DataEvent: Welcome to topic 2
Channel: ch2; Topic: topic2; DataEvent: Welcome to topic 2
Channel: ch1; Topic: topic1; DataEvent: Hi topic 1
Channel: ch3; Topic: topic2; DataEvent: Welcome to topic 2
...

你可以看到事件總線通過 channel 分發事件。

基於簡單 channel 的事件總線的源代碼。

完整的代碼

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

type DataEvent struct {
    Data  interface{}
    Topic string
}

// DataChannel 是一個能接收 DataEvent 的 channel
type DataChannel chan DataEvent

// DataChannelSlice 是一個包含 DataChannels 數據的切片
type DataChannelSlice []DataChannel

// EventBus 存儲有關訂閱者感興趣的特定主題的信息
type EventBus struct {
    subscribers map[string]DataChannelSlice
    rm          sync.RWMutex
}

func (eb *EventBus) Publish(topic string, data interface{}) {
    eb.rm.RLock()
    if chans, found := eb.subscribers[topic]; found {
        // 這樣做是因爲切片引用相同的數組,即使它們是按值傳遞的
        // 因此我們正在使用我們的元素創建一個新切片,從而正確地保持鎖定
        channels := append(DataChannelSlice{}, chans...)
        go func(data DataEvent, dataChannelSlices DataChannelSlice) {
            for _, ch := range dataChannelSlices {
                ch <- data
            }
        }(DataEvent{Data: data, Topic: topic}, channels)
    }
    eb.rm.RUnlock()
}

func (eb *EventBus) Subscribe(topic string, ch DataChannel) {
    eb.rm.Lock()
    if prev, found := eb.subscribers[topic]; found {
        eb.subscribers[topic] = append(prev, ch)
    } else {
        eb.subscribers[topic] = append([]DataChannel{}, ch)
    }
    eb.rm.Unlock()
}

var eb = &EventBus{
    subscribers: map[string]DataChannelSlice{},
}

func printDataEvent(ch string, data DataEvent) {
    fmt.Printf("Channel: %s; Topic: %s; DataEvent: %v\n", ch, data.Topic, data.Data)
}

func publisTo(topic string, data string) {
    for {
        eb.Publish(topic, data)
        time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
    }
}

func main() {
    ch1 := make(chan DataEvent)
    ch2 := make(chan DataEvent)
    ch3 := make(chan DataEvent)

    eb.Subscribe("topic1", ch1)
    eb.Subscribe("topic2", ch2)
    eb.Subscribe("topic2", ch3)

    go publisTo("topic1""Hi topic 1")
    go publisTo("topic2""Welcome to topic 2")

    for {
        select {
        case d := <-ch1:
            go printDataEvent("ch1", d)
        case d := <-ch2:
            go printDataEvent("ch2", d)
        case d := <-ch3:
            go printDataEvent("ch3", d)
        }
    }
}

使用 channel 取代回調的理由

傳統的回調方式要求實現某種接口。

例如,

type Subscriber interface {
   onData(event Event)
}

使用回調的話,如果你想訂閱一個事件,你需要實現該接口,以便事件總線可以傳播它。

type MySubscriber struct {
}
func (m MySubscriber) onData(event Event)  {
   // 處理事件
}

channel 允許你在沒有接口的情況下在一個簡單的函數中註冊訂閱者。

func main() {
   ch1 := make(chan DataEvent)
   eb.Subscribe("topic1", ch1)
   fmt.Println((<-ch1).Data)
   ...
}

結論

本文的目的是指出編寫事件總線的不同實現方法。

這可能不是理想的解決方案。

例如,channel 被阻塞直到有人消費它們。這有一定的侷限性。

我已經使用切片來存儲主題的所有訂閱者。這用於簡化文章。這需要用 SET 替換,以至於列表中不存在重複的訂閱者。

傳統的回調方法可以使用提供的相同的原理去簡單地實現。你可以輕鬆地在 goroutine 中進行異步裝飾發佈事件。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/WbkhgMCI3WIGgGfLvgxarg