Go singlefight 源碼詳解

寫在前面

通俗的來說就是 singleflight 將相同的併發請求合併成一個請求,進而減少對下層服務的壓力,通常用於解決緩存擊穿的問題。

詳解

基礎結構

golang.org/x/sync/singleflight singleflight 結構體:

type call struct {
 wg sync.WaitGroup

 // 這些字段在 WaitGroup 結束前寫入一次
 // 只有在 WaitGroup 結束後纔會被讀取。
 val interface{}
 err error

 // 這些字段在 WaitGroup 結束前使用 singleflight 互斥鎖進行讀寫
 // 在 WaitGroup 結束後讀取但不寫入。
 dups  int
 chans []chan<- Result
}

Group 代表分成多個工作組,形成一個命名空間,在這個命名空間中,各工作單元可以重複執行。

type Group struct {
 mu sync.Mutex       // 互斥鎖
 m  map[string]*call // 懶加載
}

Result 保存 Do 方法的結果,以便在通道上傳遞。做異步處理。

type Result struct {
 Val    interface{}
 Err    error
 Shared bool
}

簡單 demo

func TestSingleFightExample(t *testing.T) {
 var group singleflight.Group
 // 模擬一個併發請求
 for i := 0; i < 5; i++ {
  go func(i int) {
   key := "example"
   tmp := i // 將tmp放進去
   val, err, _ := group.Do(key, func() (interface{}, error) {
    // 模擬一次耗時操作
    time.Sleep(time.Second)
    return fmt.Sprintf("result_%d", tmp), nil
   })
   if err != nil {
    fmt.Println("Error:", err)
   }
   fmt.Println("Value:", val)
  }(i)
 }
 // 等待所有請求完成
 time.Sleep(3 * time.Second)
}

結果:這是一個很隨機的過程,0~4 都有可能,主要看哪個協程最先進來。

Value: result_2
Value: result_2
Value: result_2
Value: result_2
Value: result_2

Do 執行函數:對同一個 key 多次調用的時候,在第一次調用沒有執行完的時候, 只會執行一次 fn,其他的調用會阻塞住等待這次調用返回, shared 表示fn的結果是否被共享

func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool)

DoChan 和 Do 類似,只是 DoChan 返回一個 channel,也就是同步與異步的區別

func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result

Forget:用於通知 Group 刪除某個 key 這樣後面繼續這個 key 的調用的時候就不會在阻塞等待了

func (g *Group) Forget(key string){
 g.mu.Lock()
 if c, ok := g.m[key]; ok {
  c.forgotten = true
 }
 delete(g.m, key)
 g.mu.Unlock()
}

singleflight 的本質是對某次函數調用的複用,只執行1次,並將執行期間相同的函數返回相同的結果。由此產生一個問題,如果實際執行的函數出了問題,比如超時,則在此期間的所有調用都會超時,由此需要一些額外的方法來控制。

在一些對可用性要求極高的場景下,往往需要一定的請求飽和度來保證業務的最終成功率。一次請求還是多次請求,對於下游服務而言並沒有太大區別,此時使用 singleflight 只是爲了降低請求的數量級,那麼使用 Forget() 提高下游請求的併發。

常見面試題

singleflight 是什麼?什麼時候用的?

緩存失效,合併請求的時候用的,這樣我們就可以減少對 DB 的請求壓力。

如果這個 goruntine 超時怎麼辦?

singleflight 內部使用 waitGroup 來讓同一個 key 的除了第一個請求的後續所有請求都阻塞。直到第一個請求執行 func 返回後,其他請求才會返回。 這意味着,如果 func 執行需要很長時間,那麼後面的所有請求都會被一直阻塞。 這時候我們可以使用 DoChan 結合ctx + select做超時控制

func TestSingleFightTimeout(t *testing.T) {
 ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
 go doFly(ctx)
 time.Sleep(2 * time.Second)
 cancel() // 2秒後超時
}

func doFly(ctx context.Context) {
 var g singleflight.Group
 key := "example"
 // 使用 DoChan 結合 select 做超時控制
 result := g.DoChan(key, func() (interface{}, error) {
  time.Sleep(5 * time.Second) // 模擬超時
  return "result", nil
 })
 select {
 case r := <-result:
  fmt.Println("r", r.Val)
 case <-ctx.Done():
  fmt.Println("done")
  return
 }
}

結果輸出:

done

上述代碼中,我們將主進程先sleep 2秒,然後再進行cancel,那麼此時我們將會讓DoChan這個方法 time.Sleep 5 秒模擬超時。那麼我們會發現函數過了 2 秒之後就會輸出done

doChan 方法具體是怎麼實現的?

在 DoChan 方法中,有一個 go g.doCall(c, key, fn) 的操作,當一個 goroutine 來執行,並通過channel 來返回數據,這樣外部可以自定義超時邏輯,防止因爲 fn 的阻塞,導致大量請求都被阻塞。

func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
 ch := make(chan Result, 1)
 g.mu.Lock()
 if g.m == nil {
  g.m = make(map[string]*call)
 }
 if c, ok := g.m[key]; ok { // 如果沒有這個key
  c.dups++
  c.chans = append(c.chans, ch)
  g.mu.Unlock()
  return ch
 }
 c := &call{chans: []chan<- Result{ch}} // 構造異步返回結構體,可以接參數進行超時
 c.wg.Add(1)
 g.m[key] = c
 g.mu.Unlock()

 go g.doCall(c, key, fn) // 異步執行

 return ch
}

如果請求失敗了怎麼辦?

如果第一個請求失敗了,那麼後續所有等待的請求都會返回同一個 error。但實際上可以根據下游能支撐的 rps 定時 forget 這個 key,讓更多的請求能有機會走到後續邏輯。

go func() {
       time.Sleep(100 * time.Millisecond)
       g.Forget(key)
   }()

比如 1 秒內有 100 個請求過來,正常是第一個請求能執行 queryDB,後續 99 個都會阻塞。增加這個 Forget 之後,每 100ms 就能有一個請求執行 queryDB,相當於是多了幾次嘗試的機會,相對的也給 DB 造成了更大的壓力,需要根據具體場景進去取捨。 因爲有可能前幾次是因爲 DB 的抖動導致的查詢失敗,重試之後就能實現了。

參考鏈接

[1] https://pkg.go.dev/golang.org/x/sync/singleflight

[2] https://www.lixueduan.com/posts/go/singleflight

[3] https://juejin.cn/post/7093859835694809125

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/_DvY8P0wmq4J9KGqFmnNKg