singleflight 設計與實現

概述

singleflight 提供了一套函數重複調用時的抑制機制,經常用來限制併發訪問下的重複請求。

例如一個比較常見的場景是使用 singleflight 來限制同一個緩存 key 的重複請求,避免發生 緩存擊穿 時,避免請求全部落到數據庫,減少性能影響和宕機風險。

singleflight 示意圖

示例

參考 擴展閱讀 列表的文章。

內部實現

我們來探究一下 golang.org/x/sync/singleflight 的內部實現,筆者的 Go 版本爲 go1.19 linux/amd64。注意:這個包屬於擴展包,標準庫的包是 internal/singleflight/singleflight.go,因爲筆者項目中用到的是擴展包,所以這裏以擴展包實現爲準, 感興趣的讀者可以研究一下標準庫中的包,兩者大同小異。

UML

singleflight UML

errGoexit

errGoexit 錯誤表明用戶函數執行過程中,調用了 runtime.Goexit(), runtime.Goexit() 會直接終止程序,並在終止前執行所有 defer.

var errGoexit = errors.New("runtime.Goexit was called")

panicError

panicError 對象實現了標準庫內置的錯誤接口,表示在執行給定函數期間,調用 stack trace 從拋出的 panic 中捕獲 (recover) 到了具體的值。

type panicError struct {
 value interface{}
 stack []byte
}
 
func (p *panicError) Error() string {
 return fmt.Sprintf("%v\n\n%s", p.value, p.stack)
}

newPanicError 方法

func newPanicError(v interface{}) error {
 stack := debug.Stack()

 // stack trace 返回的結果中,第一行的格式爲 "goroutine N [status]:"
 // 但是發生 panic 時,對應的 goroutine 可能已經不存在了,並且狀態已經發生改變 (這時需要刪除第一行,避免語義誤導性)
 if line := bytes.IndexByte(stack[:]'\n'); line >= 0 {
  stack = stack[line+1:]
 }
 return &panicError{value: v, stack: stack}
}

call 對象

call 對象表示正在執行中或已經執行完的 Do 方法的調用信息,這是一個抽象的數據集合,字段涵蓋了調用週期內需要用到的數據。

type call struct {
 wg sync.WaitGroup

 // 這倆字段在調用 WaitGroup 之前寫入一次
 // 並且只能在 WaitGroup 完成後讀取
 val interface{}
 err error

 // forgotten 表明在 call 執行過程中,調用方 (併發的 goroutine) 是否調用了 Forget 方法
 // 簡單來說就是,同一個 key 在獲取過程中併發調用了 Forget 方法
 forgotten bool

    // 返回值被其他 goroutine 複用的次數
 dups  int
 // 等待複用結果的 goroutine 列表
 //     goroutine 通過 Result channel 等待獲取數據 
 chans []chan<- Result
}

Group 對象

Group 表示一種類型的工作並形成一個命名空間,在該命名空間中可以抑制重複執行的工作單元,簡單來說,就是一個邏輯分組,通常根據業務場景來劃分,比如 CMS 系統中的首頁、專欄、內容頁都可以作爲單獨的分組。

type Group struct {
 // 併發調用時保護 m 字段的互斥鎖
 mu sync.Mutex  
 // key 和回調方法的關係映射
 //     採用懶加載初始化
 m  map[string]*call 
}

Result 對象

Result 對象表示 Do 方法返回的結果,可以在 channel 中傳遞 (一般是一個等待獲取數據的 goroutine)。

type Result struct {
 // 返回值支支持各種數據類型
 Val    interface{}
 // 返回錯誤值
 Err    error
 // 返回值是否被其他 goroutine 複用
 Shared bool
}

Do 方法

Do 方法執行給定的回調函數並返回結果,內部確保一個給定的 key 在同一時間 (多個 goroutine 調用時) 只執行一次,如果出現重複調用,則重複調用方等待原始(第一個)調用完成後,接收並複用相同的結果。第二個返回值 shared 表示是否將返回值 v 賦值給了多個調用者 (返回值是否被複用)。

func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
 // 操作加鎖
 g.mu.Lock()
 if g.m == nil {
        // 懶加載
  g.m = make(map[string]*call)
 }
 
 if c, ok := g.m[key]; ok {  // 檢測 key 是否存在
        // 除了第 1 個搶到鎖的 goroutine,其他的 goroutine 都會執行到這裏
  
  c.dups++        // 複用數量+1
  g.mu.Unlock()   // 釋放鎖
  c.wg.Wait()     // 等待 goroutine 執行完(就是第 1 個搶到鎖的 goroutine)
  
  if e, ok := c.err.(*panicError); ok {   // 回調函數拋出了 panic error
   panic(e)
  } else if c.err == errGoexit {  // 回調函數調用了 runtime.Goexit
   runtime.Goexit()
  }
  return c.val, c.err, true
 }
 
 c := new(call)  // 如果沒有對應的 key,創建一個新的結果集
 c.wg.Add(1)     // 只有第 1 個搶到鎖的 goroutine 可以調用 Add 方法,其他的 goroutine 進入 Wait 等待
 g.m[key] = c    // 創建一個新的 key
 g.mu.Unlock()   // 創建新 key 後,就可以解鎖了,其他 goroutine 獲得鎖之後會進入上面的 if 流程

 g.doCall(c, key, fn)    // 調用 doCall() 執行 fn 函數
 return c.val, c.err, c.dups > 0
}

DoChan 方法

DoChan 方法和 Do 方法功能一樣 (同步和異步的區別),但是返回一個只讀 channel, 當 channel 準備好時,就開始接收數據,返回的 channel 不能關閉。此外,調用方需要根據返回值中的 Err 字段來處理可能發生的錯誤。

func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
 // 初始化返回值 channel
 ch := make(chan Result, 1)
 // 操作加鎖
 g.mu.Lock()
 if g.m == nil {
        // 懶加載
  g.m = make(map[string]*call)
 }
 
 if c, ok := g.m[key]; ok {  // 檢測 key 是否存在
        // 除了第 1 個搶到鎖的 goroutine,其他的 goroutine 都會執行到這裏

        c.dups++        // 複用數量+1
        c.chans = append(c.chans, ch)   // 追加到等待複用結果的 goroutine 列表
        g.mu.Unlock()   // 釋放鎖
  return ch
 }
 
 c := &call{chans: []chan<- Result{ch}}  // 如果沒有對應的 key,創建一個新的列表
 c.wg.Add(1)     // 只有第 1 個搶到鎖的 goroutine 可以調用 Add 方法,其他的 goroutine 進入 Wait 等待
 g.m[key] = c    // 創建一個新的 key
 g.mu.Unlock()   // 創建新 key 後,就可以解鎖了,其他 goroutine 獲得鎖之後會進入上面的 if 流程

 go g.doCall(c, key, fn) // 調用 doCall() 執行 fn 函數

 return ch
}

doCall 方法

doCall 方法負責調用 key 對應的回調方法。

func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
 normalReturn := false
 recovered := false

 // 使用兩個 defer 區分 panic 和 runtime.Goexit 
 //    避免了回調函數導致的死鎖 (也就是下面的 c.wg.Done() 得不到執行)
 // 詳情見: https://golang.org/cl/134395
 defer func() {
  if !normalReturn && !recovered {
   // 說明回調函數內部執行了 runtime.Goexit
   c.err = errGoexit
  }

  c.wg.Done() // 如果這裏得不到執行,那麼調用方的 c.wg.Wait() 就會永久阻塞
  
  g.mu.Lock()
  defer g.mu.Unlock()
  
  if !c.forgotten { // 如果已經刪除過該 key,就不需要重複刪除了
   delete(g.m, key)
  }

  // 這裏的 panic 都是爲了第一個搶到的鎖的 goroutine,因爲後續的 goroutine 不會執行到 doCall() 方法
  if e, ok := c.err.(*panicError); ok {
   // 爲了防止等待的 channels 被永久阻塞,需要確保這種 panic 無法恢復
   if len(c.chans) > 0 {
    go panic(e)
    select {} // 保留這個 goroutine 的調用堆棧,這樣它就會出現在 crash dump(這種小技巧值得學習)
   } else {
    panic(e)
   }
  } else if c.err == errGoexit {
   // goexit 正在處理,不需要重複調用
  } else {
   // 正常返回, 依次向等待的 goroutine 發送數據
   for _, ch := range c.chans {    // c.chans 是切片
    ch <- Result{c.val, c.err, c.dups > 0}
   }
  }
 }()

 // 使用一個匿名函數來執行回調函數
 func() {
  defer func() {
   // 如果發生 panic,就創建一個 panic error,由調用方處理
   if !normalReturn {
    // 理想情況下,我們應該根據 stack trace 返回的結果, 確定這是一個 panic 還是一個 runtime.Goexit
    // 能夠區分兩者的唯一方法是查看 recover 是否阻止了 goroutine 終止
    //     如果 recover 捕獲到了錯誤,說明是 panic
    //     如果 recover 沒有捕獲到錯誤,說明是 runtime.Goexit
    // 但是當我們知道這一點時,與 panic 相關的 stack trace 信息已經被丟棄了
    if r := recover(); r != nil {
     c.err = newPanicError(r)
    }
   }
  }()

  c.val, c.err = fn()
  
  // 如果代碼執行到這裏,說明調用回調函數沒有發生 panic
  // 所以可以將變量 normalReturn 設置爲 true
  normalReturn = true
 }()
 
 // 如果 normalReturn != true, 說明調用回調函數發生了 panic
 // 同時也說明了,發生的 panic 被捕獲 (recover) 到了,而不是直接被 runtime.Goexit 終止程序
 //    如果直接被 runtime.Goexit 終止程序,代碼就執行不到這裏了,而是會直接去執行 defer
 if !normalReturn {
  recovered = true
 }
}

Forget 方法

Forget 方法用於刪除指定的 key, 後續對該 key 調用 Do 方法將執行回調函數,不再複用之前的返回值結果。

func (g *Group) Forget(key string) {
 g.mu.Lock()
 if c, ok := g.m[key]; ok {
  c.forgotten = true
 }
 delete(g.m, key)
 g.mu.Unlock()
}

小結

從應用層面來說,singleflight 是將 多個請求的返回結果相同 + 計算過程次數限制 這一過程抽象出來,將所有細節隱藏在內部實現中,只提供 GETDELETE 兩種類型 API, 簡化了調用方法。

需要注意的有兩點:一是 GROUP 組的劃分,通常根據業務模塊劃分即可,類似緩存的 key 前綴,二是 key 的內存佔用,key 沒有過期機制, 對應的數據會一直佔用內存,對於熱門數據沒有任何問題,但是對於非熱門數據,會增加內存的佔用,可以根據數據的大致有效期設計 延遲刪除 方案。

從內部實現來說,singleflight 的代碼非常簡潔,尤其是 3 個 Do* 方法之間的緊密配合,可以讓我們學習到很多 channel 使用技巧, 還有 doCall 方法中的 雙重 defer 機制、判斷 panic 和 runtime.Goexit,都是非常實用的 Go code style 代碼。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/WayT3afVbzngdNGyvsBZyQ