singleflight 設計與實現
概述
singleflight
提供了一套函數重複調用時的抑制機制,經常用來限制併發訪問下的重複請求。
例如一個比較常見的場景是使用 singleflight
來限制同一個緩存 key
的重複請求,避免發生 緩存擊穿
時,避免請求全部落到數據庫,減少性能影響和宕機風險。
singleflight 示意圖
示例
參考 擴展閱讀 列表的文章。
內部實現
我們來探究一下 golang.org/x/sync/singleflight
的內部實現,筆者的 Go 版本爲 go1.19 linux/amd64
。注意:這個包屬於擴展包,標準庫的包是 internal/singleflight/singleflight.go
,因爲筆者項目中用到的是擴展包,所以這裏以擴展包實現爲準, 感興趣的讀者可以研究一下標準庫中的包,兩者大同小異。
UML
singleflight UML
errGoexit
errGoexit
錯誤表明用戶函數執行過程中,調用了 runtime.Goexit()
, runtime.Goexit() 會直接終止程序,並在終止前執行所有 defer
.
var errGoexit = errors.New("runtime.Goexit was called")
panicError
panicError
對象實現了標準庫內置的錯誤接口,表示在執行給定函數期間,調用 stack trace
從拋出的 panic
中捕獲 (recover) 到了具體的值。
type panicError struct {
value interface{}
stack []byte
}
func (p *panicError) Error() string {
return fmt.Sprintf("%v\n\n%s", p.value, p.stack)
}
newPanicError 方法
func newPanicError(v interface{}) error {
stack := debug.Stack()
// stack trace 返回的結果中,第一行的格式爲 "goroutine N [status]:"
// 但是發生 panic 時,對應的 goroutine 可能已經不存在了,並且狀態已經發生改變 (這時需要刪除第一行,避免語義誤導性)
if line := bytes.IndexByte(stack[:], '\n'); line >= 0 {
stack = stack[line+1:]
}
return &panicError{value: v, stack: stack}
}
call 對象
call
對象表示正在執行中或已經執行完的 Do 方法的調用信息,這是一個抽象的數據集合,字段涵蓋了調用週期內需要用到的數據。
type call struct {
wg sync.WaitGroup
// 這倆字段在調用 WaitGroup 之前寫入一次
// 並且只能在 WaitGroup 完成後讀取
val interface{}
err error
// forgotten 表明在 call 執行過程中,調用方 (併發的 goroutine) 是否調用了 Forget 方法
// 簡單來說就是,同一個 key 在獲取過程中併發調用了 Forget 方法
forgotten bool
// 返回值被其他 goroutine 複用的次數
dups int
// 等待複用結果的 goroutine 列表
// goroutine 通過 Result channel 等待獲取數據
chans []chan<- Result
}
Group 對象
Group
表示一種類型的工作並形成一個命名空間,在該命名空間中可以抑制重複執行的工作單元,簡單來說,就是一個邏輯分組,通常根據業務場景來劃分,比如 CMS 系統中的首頁、專欄、內容頁都可以作爲單獨的分組。
type Group struct {
// 併發調用時保護 m 字段的互斥鎖
mu sync.Mutex
// key 和回調方法的關係映射
// 採用懶加載初始化
m map[string]*call
}
Result 對象
Result
對象表示 Do 方法返回的結果,可以在 channel
中傳遞 (一般是一個等待獲取數據的 goroutine)。
type Result struct {
// 返回值支支持各種數據類型
Val interface{}
// 返回錯誤值
Err error
// 返回值是否被其他 goroutine 複用
Shared bool
}
Do 方法
Do
方法執行給定的回調函數並返回結果,內部確保一個給定的 key
在同一時間 (多個 goroutine
調用時) 只執行一次,如果出現重複調用,則重複調用方等待原始(第一個)調用完成後,接收並複用相同的結果。第二個返回值 shared
表示是否將返回值 v 賦值給了多個調用者 (返回值是否被複用)。
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
// 操作加鎖
g.mu.Lock()
if g.m == nil {
// 懶加載
g.m = make(map[string]*call)
}
if c, ok := g.m[key]; ok { // 檢測 key 是否存在
// 除了第 1 個搶到鎖的 goroutine,其他的 goroutine 都會執行到這裏
c.dups++ // 複用數量+1
g.mu.Unlock() // 釋放鎖
c.wg.Wait() // 等待 goroutine 執行完(就是第 1 個搶到鎖的 goroutine)
if e, ok := c.err.(*panicError); ok { // 回調函數拋出了 panic error
panic(e)
} else if c.err == errGoexit { // 回調函數調用了 runtime.Goexit
runtime.Goexit()
}
return c.val, c.err, true
}
c := new(call) // 如果沒有對應的 key,創建一個新的結果集
c.wg.Add(1) // 只有第 1 個搶到鎖的 goroutine 可以調用 Add 方法,其他的 goroutine 進入 Wait 等待
g.m[key] = c // 創建一個新的 key
g.mu.Unlock() // 創建新 key 後,就可以解鎖了,其他 goroutine 獲得鎖之後會進入上面的 if 流程
g.doCall(c, key, fn) // 調用 doCall() 執行 fn 函數
return c.val, c.err, c.dups > 0
}
DoChan 方法
DoChan
方法和 Do
方法功能一樣 (同步和異步的區別),但是返回一個只讀 channel
, 當 channel 準備好時,就開始接收數據,返回的 channel 不能關閉。此外,調用方需要根據返回值中的 Err
字段來處理可能發生的錯誤。
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
// 初始化返回值 channel
ch := make(chan Result, 1)
// 操作加鎖
g.mu.Lock()
if g.m == nil {
// 懶加載
g.m = make(map[string]*call)
}
if c, ok := g.m[key]; ok { // 檢測 key 是否存在
// 除了第 1 個搶到鎖的 goroutine,其他的 goroutine 都會執行到這裏
c.dups++ // 複用數量+1
c.chans = append(c.chans, ch) // 追加到等待複用結果的 goroutine 列表
g.mu.Unlock() // 釋放鎖
return ch
}
c := &call{chans: []chan<- Result{ch}} // 如果沒有對應的 key,創建一個新的列表
c.wg.Add(1) // 只有第 1 個搶到鎖的 goroutine 可以調用 Add 方法,其他的 goroutine 進入 Wait 等待
g.m[key] = c // 創建一個新的 key
g.mu.Unlock() // 創建新 key 後,就可以解鎖了,其他 goroutine 獲得鎖之後會進入上面的 if 流程
go g.doCall(c, key, fn) // 調用 doCall() 執行 fn 函數
return ch
}
doCall 方法
doCall
方法負責調用 key
對應的回調方法。
func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
normalReturn := false
recovered := false
// 使用兩個 defer 區分 panic 和 runtime.Goexit
// 避免了回調函數導致的死鎖 (也就是下面的 c.wg.Done() 得不到執行)
// 詳情見: https://golang.org/cl/134395
defer func() {
if !normalReturn && !recovered {
// 說明回調函數內部執行了 runtime.Goexit
c.err = errGoexit
}
c.wg.Done() // 如果這裏得不到執行,那麼調用方的 c.wg.Wait() 就會永久阻塞
g.mu.Lock()
defer g.mu.Unlock()
if !c.forgotten { // 如果已經刪除過該 key,就不需要重複刪除了
delete(g.m, key)
}
// 這裏的 panic 都是爲了第一個搶到的鎖的 goroutine,因爲後續的 goroutine 不會執行到 doCall() 方法
if e, ok := c.err.(*panicError); ok {
// 爲了防止等待的 channels 被永久阻塞,需要確保這種 panic 無法恢復
if len(c.chans) > 0 {
go panic(e)
select {} // 保留這個 goroutine 的調用堆棧,這樣它就會出現在 crash dump(這種小技巧值得學習)
} else {
panic(e)
}
} else if c.err == errGoexit {
// goexit 正在處理,不需要重複調用
} else {
// 正常返回, 依次向等待的 goroutine 發送數據
for _, ch := range c.chans { // c.chans 是切片
ch <- Result{c.val, c.err, c.dups > 0}
}
}
}()
// 使用一個匿名函數來執行回調函數
func() {
defer func() {
// 如果發生 panic,就創建一個 panic error,由調用方處理
if !normalReturn {
// 理想情況下,我們應該根據 stack trace 返回的結果, 確定這是一個 panic 還是一個 runtime.Goexit
// 能夠區分兩者的唯一方法是查看 recover 是否阻止了 goroutine 終止
// 如果 recover 捕獲到了錯誤,說明是 panic
// 如果 recover 沒有捕獲到錯誤,說明是 runtime.Goexit
// 但是當我們知道這一點時,與 panic 相關的 stack trace 信息已經被丟棄了
if r := recover(); r != nil {
c.err = newPanicError(r)
}
}
}()
c.val, c.err = fn()
// 如果代碼執行到這裏,說明調用回調函數沒有發生 panic
// 所以可以將變量 normalReturn 設置爲 true
normalReturn = true
}()
// 如果 normalReturn != true, 說明調用回調函數發生了 panic
// 同時也說明了,發生的 panic 被捕獲 (recover) 到了,而不是直接被 runtime.Goexit 終止程序
// 如果直接被 runtime.Goexit 終止程序,代碼就執行不到這裏了,而是會直接去執行 defer
if !normalReturn {
recovered = true
}
}
Forget 方法
Forget
方法用於刪除指定的 key
, 後續對該 key 調用 Do 方法將執行回調函數,不再複用之前的返回值結果。
func (g *Group) Forget(key string) {
g.mu.Lock()
if c, ok := g.m[key]; ok {
c.forgotten = true
}
delete(g.m, key)
g.mu.Unlock()
}
小結
從應用層面來說,singleflight
是將 多個請求的返回結果相同 + 計算過程次數限制 這一過程抽象出來,將所有細節隱藏在內部實現中,只提供 GET
和 DELETE
兩種類型 API, 簡化了調用方法。
需要注意的有兩點:一是 GROUP
組的劃分,通常根據業務模塊劃分即可,類似緩存的 key
前綴,二是 key
的內存佔用,key
沒有過期機制, 對應的數據會一直佔用內存,對於熱門數據沒有任何問題,但是對於非熱門數據,會增加內存的佔用,可以根據數據的大致有效期設計 延遲刪除
方案。
從內部實現來說,singleflight
的代碼非常簡潔,尤其是 3 個 Do*
方法之間的緊密配合,可以讓我們學習到很多 channel
使用技巧, 還有 doCall
方法中的 雙重 defer 機制、判斷 panic 和 runtime.Goexit,都是非常實用的 Go code style
代碼。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/WayT3afVbzngdNGyvsBZyQ