拒絕 Go 代碼臃腫,其實在這幾塊可以用下觀察者模式

今天學習一下用 Go 實現觀察者模式,觀察者模式主要是用來實現事件驅動編程。事件驅動編程的應用還是挺廣的,除了我們都知道的能夠用來解耦:用戶修改密碼後,給用戶發短信進行風險提示之類的典型場景,在微服務架構實現最終一致性、實現事件源(A + ES)這些都會用到。

我們先來簡單學習一下用 Go 實現觀察者設計模式,給怎麼實現事件驅動編程、事件源這些模式做個鋪墊。主要也是我也老沒看設計模式了,一起再複習一下。以前看的設計模式教程都是 Java 的,這次用 Go 實現一番。

觀察者模式

咱們先來看一下觀察者模式的概念,我儘量加一些自己的理解,讓它變成咱們都能理解的大俗話:

概念

觀察者模式 (Observer Pattern),定義對象間的一種一對多依賴關係,使得每當一個對象狀態發生改變時,其相關依賴對象皆得到通知,依賴對象在收到通知後,可自行調用自身的處理程序,實現想要乾的事情,比如更新自己的狀態。

發佈者對觀察者唯一瞭解的是它實現了某個接口(觀察者接口)。這種鬆散耦合的設計最大限度地減少了對象之間的相互依賴,因此使我們能夠構建靈活的系統。

我的理解

上面這段話看完,相信幾乎對於理解觀察者模式能起到的作用微乎其微,類似於現實職場里加班對項目進度起到的作用一樣,加班的時候誰還沒打過幾把王者榮耀,嘿。下面我用自己的理解再給你們嘮一下。

觀察者模式也經常被叫做發佈 - 訂閱(Publish/Subscribe)模式、上面說的定義對象間的一種一對多依賴關係,一 - 指的是發佈變更的主體對象,多 - 指的是訂閱變更通知的訂閱者對象。

發佈的狀態變更信息會被包裝到一個對象裏,這個對象被稱爲事件,事件一般用英語過去式的語態來命名,比如用戶註冊時,用戶模塊在用戶創建好後發佈一個事件 UserCreated 或者 UserWasCreated 都行,這樣從名字上就能看出,這是一個已經發生過的事件。

事件發佈給訂閱者的過程,其實就是遍歷一下已經註冊的事件訂閱者,逐個去調用訂閱者實現的觀察者接口方法,比如叫 handleEvent 之類的方法,這個方法的參數一般就是當前的事件對象。

至於很多人會好奇的,事件的處理是不是異步的?主要看我們的需求是什麼,一般情況下是同步的,即發佈事件後,觸發事件的方法會阻塞等到全部訂閱者返回後再繼續,當然也可以讓訂閱者的處理異步執行,完全看我們的需求。

大部分場景下其實是同步執行的,單體架構會在一個數據庫事務裏持久化因爲主體狀態變更,而需要更改的所有實體類。

微服務架構下常見的做法是有一個事件存儲,訂閱者接到事件通知後,會把事件先存到事件存儲裏,這兩步也需要在一個事務裏完成才能保證最終一致性,後面會再有其他線程把事件從事件存儲裏搞到消息設施裏,發給其他服務,從而在微服務架構下實現各個位於不同服務的實體間的最終一致性。

所以觀察者模式,從程序效率上看,大多數情況下沒啥提升,更多的是達到一種程序結構上的解耦,讓代碼不至於那麼難維護。

Go 實現觀察者模式

說了這麼多,我們再看下用 Go 怎麼實現最簡單的觀察者模式:

package main

import "fmt"

// Subject 接口,它相當於是發佈者的定義
type Subject interface {
 Subscribe(observer Observer)
 Notify(msg string)
}

// Observer 觀察者接口
type Observer interface {
 Update(msg string)
}

// Subject 實現
type SubjectImpl struct {
 observers []Observer
}

// Subscribe 添加觀察者(訂閱者)
func (sub *SubjectImpl) Subscribe(observer Observer) {
 sub.observers = append(sub.observers, observer)
}


// Notify 發佈通知
func (sub *SubjectImpl) Notify(msg string) {
 for _, o := range sub.observers {
  o.Update(msg)
 }
}

// Observer1 Observer1
type Observer1 struct{}

// Update 實現觀察者接口
func (Observer1) Update(msg string) {
 fmt.Printf("Observer1: %s\n", msg)
}

// Observer2 Observer2
type Observer2 struct{}

// Update 實現觀察者接口
func (Observer2) Update(msg string) {
 fmt.Printf("Observer2: %s\n", msg)
}

func main(){
 sub := &SubjectImpl{}
 sub.Subscribe(&Observer1{})
 sub.Subscribe(&Observer2{})
 sub.Notify("Hello")
}

這就是 Go 實現觀察者模式的代碼,實際應用的時候,一般會定義個事件總線 EventBus 或者事件分發器 Event Dispatcher,來管理事件和訂閱者間的關係,以及分發事件,它們兩個就是名不一樣,角色定位一樣。

下面看一下用 Go 怎麼實現事件總線。

Go 實現事件總線

下面我們實現一個支持以下功能的事件總線

  1. 異步不阻塞

  2. 支持任意參數值

這個代碼不是我自己寫的,出處見代碼註釋首行。

代碼

// 代碼來自https://lailin.xyz/post/observer.html
package eventbus

import (
 "fmt"
 "reflect"
 "sync"
)

// Bus Bus
type Bus interface {
 Subscribe(topic string, handler interface{}) error
 Publish(topic string, args ...interface{})
}

// AsyncEventBus 異步事件總線
type AsyncEventBus struct {
 handlers map[string][]reflect.Value
 lock     sync.Mutex
}

// NewAsyncEventBus new
func NewAsyncEventBus() *AsyncEventBus {
 return &AsyncEventBus{
  handlers: map[string][]reflect.Value{},
  lock:     sync.Mutex{},
 }
}

// Subscribe 訂閱
func (bus *AsyncEventBus) Subscribe(topic string, f interface{}) error {
 bus.lock.Lock()
 defer bus.lock.Unlock()

 v := reflect.ValueOf(f)
 if v.Type().Kind() != reflect.Func {
  return fmt.Errorf("handler is not a function")
 }

 handler, ok := bus.handlers[topic]
 if !ok {
  handler = []reflect.Value{}
 }
 handler = append(handler, v)
 bus.handlers[topic] = handler

 return nil
}

// Publish 發佈
// 這裏異步執行,並且不會等待返回結果
func (bus *AsyncEventBus) Publish(topic string, args ...interface{}) {
 handlers, ok := bus.handlers[topic]
 if !ok {
  fmt.Println("not found handlers in topic:", topic)
  return
 }

 params := make([]reflect.Value, len(args))
 for i, arg := range args {
  params[i] = reflect.ValueOf(arg)
 }

 for i := range handlers {
  go handlers[i].Call(params)
 }
}

單測

package eventbus

import (
 "fmt"
 "testing"
 "time"
)

func sub1(msg1, msg2 string) {
 time.Sleep(1 * time.Microsecond)
 fmt.Printf("sub1, %s %s\n", msg1, msg2)
}

func sub2(msg1, msg2 string) {
 fmt.Printf("sub2, %s %s\n", msg1, msg2)
}
func TestAsyncEventBus_Publish(t *testing.T) {
 bus := NewAsyncEventBus()
 bus.Subscribe("topic:1", sub1)
 bus.Subscribe("topic:1", sub2)
 bus.Publish("topic:1""test1""test2")
 bus.Publish("topic:1""testA""testB")
 time.Sleep(1 * time.Second)
}

毫不意外這個事件總線,只是個例子,咱也不能在項目開發裏使用,這篇文章咱們先搞清概念,我其實前兩天關注了下,沒有發現什麼好用的事件分發、事件總線的三方庫,好在實現起來也不難,後面我準備自己寫一個能用的到時候分享給大家,最起碼是在學習、練習項目裏能使用的吧。

總結

今天給大家用大白話瞎嘮了一下觀察者模式的原理和實際怎麼應用,感覺文章的精髓主要在前半部分,可能有的不你還不能理解,後面我會再通過後續文章逐一解釋,其實這些都是事件驅動和事件源這些模式裏的基礎內容。

至於這次給出的代碼,其實沒啥實戰意義,就是大家先了解一下。Go 裏邊關於事件驅動之類的內容,感覺不多,有 Spring 使用經驗的可以先看看 Spring 提供的 @EventListener 註解,需要訂閱者異步執行可以配合 @Async 註解使用,至於我上面說的需要保證事件發佈的主體和訂閱者的原子化的話,則是通過 @Transitional 和 @TransactionalEventListener 結合使用來實現。

網管叨 bi 叨 分享軟件開發和系統架構設計基礎、Go 語言和 Kubernetes。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/4NqjkXVqFPamEc_QsyRipA