Golang 事件系統 Event Bus

最近在學習開源項目Grafana的代碼,發現作者實現了一個事件總線的機制,在項目裏面大量應用,效果也非常好,代碼也比較簡單,介紹給大家看看。

源碼文件地址:grafana/bus.go at main · grafana/grafana · GitHub

1. 註冊和調用

在這個項目裏面隨處可見這種寫法:

func ValidateOrgAlert(c *models.ReqContext) {
    id := c.ParamsInt64(":alertId")
    
    query := models.GetAlertByIdQuery{Id: id}

    if err := bus.Dispatch(&query); err != nil {
        c.JsonApiErr(404, "Alert not found", nil)
        return
    }

    if c.OrgId != query.Result.OrgId {
        c.JsonApiErr(403, "You are not allowed to edit/view alert", nil)
        return
    }
}

關鍵是bus.Dispatch(&query)這段代碼,它的參數是一個結構體GetAlertByIdQuery,內容如下:

type GetAlertByIdQuery struct {
    Id int64
    Result *Alert
}

根據名字可以看出這個方法就是通過 Id 去查詢 Alert,其中Alert結構體就是結果對象,這裏就不貼出來了。

通過查看源碼可以得知,Dispatch 背後是調用了GetAlertById這個方法,然後把結果賦值到 query 參數的 Result 中返回。

func GetAlertById(query *models.GetAlertByIdQuery) error {
    alert := models.Alert{}
    has, err := x.ID(query.Id).Get(&alert)
    if !has {
        return fmt.Errorf("could not find alert")
    }
    if err != nil {
        return err
    }
    query.Result = &alert
    return nil
}

問題來了,這是怎麼實現的呢?Dispatch 到底做了哪些操作?這樣做有什麼好處?

下面我來一一解答:

首先,在 Dispatch 之前,你需要先註冊這個方法,也就是調用AddHandler,在這個項目裏面可以看到 init 函數里面有大量這樣的代碼:

func init() {
    bus.AddHandler("sql", SaveAlerts)
    bus.AddHandler("sql", HandleAlertsQuery)
    bus.AddHandler("sql", GetAlertById)
    ...
}

其實這個方法的邏輯也很簡單,所謂註冊也就是把通過一個 map 把函數名和對應的函數做一個映射關係保存起來,當我們 Dispatch 的時候其實就是通過參數名查找之前註冊過的函數,然後通過反射調用該函數。

Bus 結構體裏面有幾個 map 成員,在這個項目裏面作者定義了 3 種不同類型的 handler,一種是普通的 handler,也就是剛纔展示的那種,第二種是帶上下文的 handler,還有一種則是事件訂閱用到的 handler,我們給一個事件註冊多個監聽者,當事件觸發的時候會依次調用多個監聽函數,其實就是一個觀察者模式。

// InProcBus defines the bus structure
type InProcBus struct {
    handlers        map[string]HandlerFunc
    handlersWithCtx map[string]HandlerFunc
    listeners       map[string][]HandlerFunc
    txMng           TransactionManager
}

下面就看看具體的源碼,AddHandler方法內容如下:

func (b *InProcBus) AddHandler(handler HandlerFunc) {
    handlerType := reflect.TypeOf(handler)
    queryTypeName := handlerType.In(0).Elem().Name() // 獲取函數第一個參數的名稱,在上面例子裏面就是GetAlertByIdQuery
    b.handlers[queryTypeName] = handler
}

Dispatch 方法的源碼如下:

func (b *InProcBus) Dispatch(msg Msg) error {
    var msgName = reflect.TypeOf(msg).Elem().Name()

    withCtx := true
    handler := b.handlersWithCtx[msgName] // 根據參數名查找註冊過的函數,先查找帶Ctx的handler
    if handler == nil {
        withCtx = false
        handler = b.handlers[msgName]
        if handler == nil {
            return ErrHandlerNotFound
        }
    }
    var params = []reflect.Value{}
    if withCtx {
     // 如果查找到的handler是帶Ctx的就給個默認的Background的Ctx
        params = append(params, reflect.ValueOf(context.Background()))
    }
    params = append(params, reflect.ValueOf(msg))

    ret := reflect.ValueOf(handler).Call(params) // 通過反射機制調用函數
    err := ret[0].Interface()
    if err == nil {
        return nil
    }
    return err.(error)
}

對於AddHandlerCtxDispatchCtx這個 2 個方法基本上是一樣的,只不過多了一個上下文參數,可以拿來做超時控制或者其它用途。

2. 訂閱和發佈

除此之外,還有 2 個方法AddEventListenerPublish,即事件的訂閱和發佈。

func (b *InProcBus) AddEventListener(handler HandlerFunc) {
    handlerType := reflect.TypeOf(handler)
    eventName := handlerType.In(0).Elem().Name()
    _, exists := b.listeners[eventName]
    if !exists {
        b.listeners[eventName] = make([]HandlerFunc, 0)
    }
    b.listeners[eventName] = append(b.listeners[eventName], handler)
}

查看源碼可以得知,可以給一個事件註冊多個 handler 函數,而 Publish 的時候則是依次調用註冊的函數,邏輯也不復雜。

func (b *InProcBus) Publish(msg Msg) error {
    var msgName = reflect.TypeOf(msg).Elem().Name()
    var listeners = b.listeners[msgName]

    var params = make([]reflect.Value, 1)
    params[0] = reflect.ValueOf(msg)

    for _, listenerHandler := range listeners {
        ret := reflect.ValueOf(listenerHandler).Call(params)
        e := ret[0].Interface()
        if e != nil {
            err, ok := e.(error)
            if ok {
                return err
            }
            return fmt.Errorf("expected listener to return an error, got '%T'", e)
        }
    }
    return nil
}

這裏面有一點不好,所有訂閱函數的調用是順序的,並沒有使用協程,所以如果註冊了很多個函數,這樣效率也不高啊。

3. 好處

可能有人會好奇,爲什麼明明可以直接調用函數就行,爲啥非得繞個彎子,整這麼複雜?

況且,每次調用都得使用反射機制,性能也不行。

我覺得主要有以下幾點:

  1. 這種寫法邏輯清晰,解耦

  2. 方便單元測試

  3. 性能不是最大考量,雖然說反射會降低性能

轉自: wangbjun.site/2021/coding/golang/event-bus.html

Go 開發大全

參與維護一個非常全面的 Go 開源技術資源庫。日常分享 Go, 雲原生、k8s、Docker 和微服務方面的技術文章和行業動態。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/cTZKPHYHqF1_bJsJPSwgkw