Golang 事件系統 Event Bus
最近在學習開源項目Grafana
的代碼,發現作者實現了一個事件總線的機制,在項目裏面大量應用,效果也非常好,代碼也比較簡單,介紹給大家看看。
源碼文件地址:grafana/bus.go at main · grafana/grafana · GitHub
1. 註冊和調用
在這個項目裏面隨處可見這種寫法:
func ValidateOrgAlert(c *models.ReqContext) {
id := c.ParamsInt64(":alertId")
query := models.GetAlertByIdQuery{Id: id}
if err := bus.Dispatch(&query); err != nil {
c.JsonApiErr(404, "Alert not found", nil)
return
}
if c.OrgId != query.Result.OrgId {
c.JsonApiErr(403, "You are not allowed to edit/view alert", nil)
return
}
}
關鍵是bus.Dispatch(&query)
這段代碼,它的參數是一個結構體GetAlertByIdQuery
,內容如下:
type GetAlertByIdQuery struct {
Id int64
Result *Alert
}
根據名字可以看出這個方法就是通過 Id 去查詢 Alert,其中Alert
結構體就是結果對象,這裏就不貼出來了。
通過查看源碼可以得知,Dispatch 背後是調用了GetAlertById
這個方法,然後把結果賦值到 query 參數的 Result 中返回。
func GetAlertById(query *models.GetAlertByIdQuery) error {
alert := models.Alert{}
has, err := x.ID(query.Id).Get(&alert)
if !has {
return fmt.Errorf("could not find alert")
}
if err != nil {
return err
}
query.Result = &alert
return nil
}
問題來了,這是怎麼實現的呢?Dispatch 到底做了哪些操作?這樣做有什麼好處?
下面我來一一解答:
首先,在 Dispatch 之前,你需要先註冊這個方法,也就是調用AddHandler
,在這個項目裏面可以看到 init 函數里面有大量這樣的代碼:
func init() {
bus.AddHandler("sql", SaveAlerts)
bus.AddHandler("sql", HandleAlertsQuery)
bus.AddHandler("sql", GetAlertById)
...
}
其實這個方法的邏輯也很簡單,所謂註冊也就是把通過一個 map 把函數名和對應的函數做一個映射關係保存起來,當我們 Dispatch 的時候其實就是通過參數名查找之前註冊過的函數,然後通過反射調用該函數。
Bus 結構體裏面有幾個 map 成員,在這個項目裏面作者定義了 3 種不同類型的 handler,一種是普通的 handler,也就是剛纔展示的那種,第二種是帶上下文的 handler,還有一種則是事件訂閱用到的 handler,我們給一個事件註冊多個監聽者,當事件觸發的時候會依次調用多個監聽函數,其實就是一個觀察者模式。
// InProcBus defines the bus structure
type InProcBus struct {
handlers map[string]HandlerFunc
handlersWithCtx map[string]HandlerFunc
listeners map[string][]HandlerFunc
txMng TransactionManager
}
下面就看看具體的源碼,AddHandler
方法內容如下:
func (b *InProcBus) AddHandler(handler HandlerFunc) {
handlerType := reflect.TypeOf(handler)
queryTypeName := handlerType.In(0).Elem().Name() // 獲取函數第一個參數的名稱,在上面例子裏面就是GetAlertByIdQuery
b.handlers[queryTypeName] = handler
}
Dispatch 方法的源碼如下:
func (b *InProcBus) Dispatch(msg Msg) error {
var msgName = reflect.TypeOf(msg).Elem().Name()
withCtx := true
handler := b.handlersWithCtx[msgName] // 根據參數名查找註冊過的函數,先查找帶Ctx的handler
if handler == nil {
withCtx = false
handler = b.handlers[msgName]
if handler == nil {
return ErrHandlerNotFound
}
}
var params = []reflect.Value{}
if withCtx {
// 如果查找到的handler是帶Ctx的就給個默認的Background的Ctx
params = append(params, reflect.ValueOf(context.Background()))
}
params = append(params, reflect.ValueOf(msg))
ret := reflect.ValueOf(handler).Call(params) // 通過反射機制調用函數
err := ret[0].Interface()
if err == nil {
return nil
}
return err.(error)
}
對於AddHandlerCtx
和DispatchCtx
這個 2 個方法基本上是一樣的,只不過多了一個上下文參數,可以拿來做超時控制或者其它用途。
2. 訂閱和發佈
除此之外,還有 2 個方法AddEventListener
和Publish
,即事件的訂閱和發佈。
func (b *InProcBus) AddEventListener(handler HandlerFunc) {
handlerType := reflect.TypeOf(handler)
eventName := handlerType.In(0).Elem().Name()
_, exists := b.listeners[eventName]
if !exists {
b.listeners[eventName] = make([]HandlerFunc, 0)
}
b.listeners[eventName] = append(b.listeners[eventName], handler)
}
查看源碼可以得知,可以給一個事件註冊多個 handler 函數,而 Publish 的時候則是依次調用註冊的函數,邏輯也不復雜。
func (b *InProcBus) Publish(msg Msg) error {
var msgName = reflect.TypeOf(msg).Elem().Name()
var listeners = b.listeners[msgName]
var params = make([]reflect.Value, 1)
params[0] = reflect.ValueOf(msg)
for _, listenerHandler := range listeners {
ret := reflect.ValueOf(listenerHandler).Call(params)
e := ret[0].Interface()
if e != nil {
err, ok := e.(error)
if ok {
return err
}
return fmt.Errorf("expected listener to return an error, got '%T'", e)
}
}
return nil
}
這裏面有一點不好,所有訂閱函數的調用是順序的,並沒有使用協程,所以如果註冊了很多個函數,這樣效率也不高啊。
3. 好處
可能有人會好奇,爲什麼明明可以直接調用函數就行,爲啥非得繞個彎子,整這麼複雜?
況且,每次調用都得使用反射機制,性能也不行。
我覺得主要有以下幾點:
-
這種寫法邏輯清晰,解耦
-
方便單元測試
-
性能不是最大考量,雖然說反射會降低性能
轉自: wangbjun.site/2021/coding/golang/event-bus.html
Go 開發大全
參與維護一個非常全面的 Go 開源技術資源庫。日常分享 Go, 雲原生、k8s、Docker 和微服務方面的技術文章和行業動態。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/cTZKPHYHqF1_bJsJPSwgkw