Go 事件驅動編程:實現一個簡單的事件總線
前言
在當今微服務和分佈式系統盛行的背景下,事件驅動架構(Event-Driven Architecture
,EDA
)扮演着一個至關重要的角色,此架構的設計使得服務間可以通過事件進行同步或異步通信,替代了傳統的直接接口調用。基於事件的交互方式,促進了服務之間的松耦合,提高系統的可擴展性。
發佈 - 訂閱模式是實現事件驅動架構的模式之一,它允許系統的不同組件或服務發佈事件,而其他組件或服務可以訂閱這些事件並根據事件內容進行響應。相信大部分開發者都接觸過這一模式,常見的技術實現有消息隊列(MQ
)和 Redis
發佈 / 訂閱(PUB/SUB
)功能等。
在 Go
語言中,我們可以利用其強大的 channel
和併發機制來實現發佈 - 訂閱模式。本文將深入探討如何在 Go
中實現一個簡單的事件總線,這是發佈 - 訂閱模式的具體實現。
準備好了嗎?準備一杯你最喜歡的咖啡或茶,隨着本文一探究竟吧。
事件總線
事件總線是發佈 - 訂閱模式的具體實現,它作爲發佈者和訂閱者的中間件,管理着事件傳遞與分發,確保事件從發佈者順利地傳達到訂閱者。
事件總線的優勢主要包括:
-
解耦:服務間不需要直接通信,而是通過事件進行交互,減少服務間的依賴。
-
異步處理:事件可以被異步處理,提高系統的響應性和性能。
-
可擴展性:新的訂閱者可以輕鬆訂閱事件,不需要修改現有的發佈者代碼。
-
錯誤隔離:事件處理的失敗不會直接影響其他服務的正常運行。
事件總線的代碼實現
接下來將介紹如何在 Go
語言中實現一個簡單的事件總線,它包含以下關鍵功能:
-
發佈:允許系統的各個服務發送事件。
-
訂閱:允許感興趣的服務訂閱接收特定類型的事件。
-
取消訂閱:允許各個服務將本身已訂閱的事件刪除。
項目源碼地址:https://github.com/chenmingyong0423/go-eventbus
事件數據結構定義
type Event struct {
Payload any
}
Event
是一個封裝事件的結構體,其中 Payload
爲事件的上下文信息,類型是 any
。
事件總線定義
type (
EventChan chan Event
)
type EventBus struct {
mu sync.RWMutex
subscribers map[string][]EventChan
}
func NewEventBus() *EventBus {
return &EventBus{
subscribers: make(map[string][]EventChan),
}
}
EventChan
是一個類型別名,定義爲傳遞 Event
結構體的通道 chan Event
。
EventBus
爲事件總線的定義,它包含兩個屬性:
-
mu
:一個讀寫互斥鎖(sync.RWMutex
),用於保證下面subscribers
的併發讀寫安全。 -
subscribers
:一個映射,鍵爲字符串類型,表示訂閱的主題;值爲EventChan
切片類型。該屬性用於存儲各個主體的所有訂閱者,每個訂閱者通過EventChan
接收事件。
NewEventBus
函數用於創建一個新的 EventBus
事件總線。
事件總線的方法實現
事件總線實現了三個方法,分別爲發佈事件(Publish
)和訂閱事件(Subscribe
)以及取消訂閱事件(Unsubscribe
)。
Publish 發佈事件
func (eb *EventBus) Publish(topic string, event Event) {
eb.mu.RLock()
defer eb.mu.RUnlock()
// 複製一個新的訂閱者列表,避免在發佈事件時修改訂閱者列表
subscribers := append([]EventChan{}, eb.subscribers[topic]...)
gofunc() {
for _, subscriber := range subscribers {
subscriber <- event
}
}()
}
Publish
方法用於發佈事件。該方法接收兩個參數:topic
(主題)和 event
(封裝事件的對象)。
在 Publish
方法的實現中,首先通過 mu
屬性獲取讀鎖,以確保接下來的 subscribers
寫操作是協程安全的。然後複製一份當前主題的訂閱者列表 subscribers
。接下來開啓一個新 goroutine
,在這個 goroutine
中遍歷複製的訂閱者列表,將事件通過通道發送給所有訂閱者。完成這些操作後,釋放讀鎖。
爲什麼會複製一個新的訂閱者列表?
答:複製訂閱者列表是爲了在發送事件時保持數據的一致性和穩定性。由於向通道發送數據的操作是在一個新的 goroutine
中進行的,在發送數據時,讀鎖已經被釋放,原來的訂閱者列表可能會由於添加或刪除訂閱者而發生變化。如果直接使用原來的訂閱者列表,可能會發生預料之外的錯誤(如向一個已經關閉的通道發送數據會產生 panic
)。
Subscribe 訂閱事件
func (eb *EventBus) Subscribe(topic string) EventChan {
eb.mu.Lock()
defer eb.mu.Unlock()
ch := make(EventChan)
eb.subscribers[topic] = append(eb.subscribers[topic], ch)
return ch
}
Subscribe
方法用於訂閱特定主題的事件。該方法有接收一個 topic
參數,表示希望訂閱的主題。通過此方法,可以獲得一個 EventChan
通道,用於接收該主題的事件。
在 Subscribe
方法的實現中,首先通過 mu
屬性獲取寫鎖,以保證接下來的 subscribers
讀寫操作是協程安全的;接着創建一個新的 EventChan
通道 ch
,將其添加到相應主題的訂閱者切片中。完成這些操作後,釋放寫鎖。
Unsubscribe 取消訂閱事件
func (eb *EventBus) Unsubscribe(topic string, ch EventChan) {
eb.mu.Lock()
defer eb.mu.Unlock()
if subscribers, ok := eb.subscribers[topic]; ok {
for i, subscriber := range subscribers {
if ch == subscriber {
eb.subscribers[topic] = append(subscribers[:i], subscribers[i+1:]...)
close(ch)
// 清空通道
forrange ch {
}
return
}
}
}
}
Unsubscribe
方法用於取消訂閱事件。該方法接收兩個參數:topic
(已訂閱的主題)和 ch
(被頒發的通道)。
在 Unsubscribe
方法裏,首先通過 mu
屬性獲取寫鎖,以保證接下來的 subscribers
讀寫操作是協程安全的;然後檢查 topic
主題是否存在對應的訂閱者。如果存在,遍歷該主題的訂閱者切片,找到與 ch
相匹配的通道,將其從訂閱者切片裏移除並關閉該通道。然後清空通道。完成這些操作後,釋放寫鎖。
使用示例
// https://github.com/chenmingyong0423/blog/blob/master/tutorial-code/go/eventbus/main.go
package main
import (
"fmt"
"time"
"github.com/chenmingyong0423/go-eventbus"
)
func main() {
eventBus := eventbus.NewEventBus()
// 訂閱 post 主題事件
subscribe := eventBus.Subscribe("post")
gofunc() {
for event := range subscribe {
fmt.Println(event.Payload)
}
}()
eventBus.Publish("post", eventbus.Event{Payload: map[string]any{
"postId": 1,
"title": "Go 事件驅動編程:實現一個簡單的事件總線",
"author": "陳明勇",
}})
// 不存在訂閱者的 topic
eventBus.Publish("pay", eventbus.Event{Payload: "pay"})
time.Sleep(time.Second * 2)
// 取消訂閱 post 主題事件
eventBus.Unsubscribe("post", subscribe)
}
擴展建議
本文實現的事件總線較爲簡單,如果要增強時間總線的靈活性,可靠性和易用性等方面,我們可以考慮擴展它,以下是一些建議:
-
事件持久化:實現時間的持久化存儲功能,確保系統崩潰後可以恢復未處理的事件。
-
通配符和模式匹配訂閱:允許使用通配符或正則表達式來訂閱一組相關主題,而不是單個具體的主題。
-
負載均衡和消息分發策略:在多個訂閱者之間分配事件,實現負載均衡。
-
插件支持:支持通過插件來擴展功能,如日誌記錄、消息過濾、轉換等。
小結
本文深入探討了在 Go
語言中實現簡單事件總線的過程。通過利用 Go
語言的強大特性,如 channel
和併發機制,我們可以輕鬆地實現發佈 - 訂閱模式。
文章從事件總線的優勢開始,介紹了其解耦、異步處理、可擴展性和錯誤隔離等特點。然後詳細解釋瞭如何定義事件數據結構和事件總線結構,並實現了發佈、訂閱和取消訂閱事件的方法。最後,提出了一些可能的擴展方向,如事件持久化、通配符訂閱、負載均衡和插件支持,以增強事件總線的靈活性和功能性。
通過閱讀本文,你可以學會在 Go
語言中實現一個簡單但功能強大的事件總線,並根據可能的需求進行擴展。
項目源碼地址:https://github.com/chenmingyong0423/go-eventbus
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/7LgEB79u6orPmW-44KSBpg