Go singlefight 源碼詳解
寫在前面
通俗的來說就是 singleflight 將相同的併發請求合併成一個請求,進而減少對下層服務的壓力,通常用於解決緩存擊穿的問題。
詳解
基礎結構
golang.org/x/sync/singleflight
singleflight 結構體:
type call struct {
wg sync.WaitGroup
// 這些字段在 WaitGroup 結束前寫入一次
// 只有在 WaitGroup 結束後纔會被讀取。
val interface{}
err error
// 這些字段在 WaitGroup 結束前使用 singleflight 互斥鎖進行讀寫
// 在 WaitGroup 結束後讀取但不寫入。
dups int
chans []chan<- Result
}
Group 代表分成多個工作組,形成一個命名空間,在這個命名空間中,各工作單元可以重複執行。
type Group struct {
mu sync.Mutex // 互斥鎖
m map[string]*call // 懶加載
}
Result 保存 Do 方法的結果,以便在通道上傳遞。做異步處理。
type Result struct {
Val interface{}
Err error
Shared bool
}
簡單 demo
func TestSingleFightExample(t *testing.T) {
var group singleflight.Group
// 模擬一個併發請求
for i := 0; i < 5; i++ {
go func(i int) {
key := "example"
tmp := i // 將tmp放進去
val, err, _ := group.Do(key, func() (interface{}, error) {
// 模擬一次耗時操作
time.Sleep(time.Second)
return fmt.Sprintf("result_%d", tmp), nil
})
if err != nil {
fmt.Println("Error:", err)
}
fmt.Println("Value:", val)
}(i)
}
// 等待所有請求完成
time.Sleep(3 * time.Second)
}
結果:這是一個很隨機的過程,0~4 都有可能,主要看哪個協程最先進來。
Value: result_2
Value: result_2
Value: result_2
Value: result_2
Value: result_2
Do 執行函數:對同一個 key 多次調用的時候,在第一次調用沒有執行完的時候, 只會執行一次 fn,其他的調用會阻塞住等待這次調用返回, shared 表示fn的結果是否被共享
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool)
DoChan 和 Do 類似,只是 DoChan 返回一個 channel,也就是同步與異步的區別
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result
Forget:用於通知 Group 刪除某個 key 這樣後面繼續這個 key 的調用的時候就不會在阻塞等待了
func (g *Group) Forget(key string){
g.mu.Lock()
if c, ok := g.m[key]; ok {
c.forgotten = true
}
delete(g.m, key)
g.mu.Unlock()
}
singleflight 的本質是對某次函數調用的複用,只執行1次
,並將執行期間相同的函數返回相同的結果。由此產生一個問題,如果實際執行的函數出了問題,比如超時,則在此期間的所有調用都會超時,由此需要一些額外的方法來控制。
在一些對可用性要求極高的場景下,往往需要一定的請求飽和度來保證業務的最終成功率。一次請求還是多次請求,對於下游服務而言並沒有太大區別,此時使用 singleflight 只是爲了降低請求的數量級,那麼使用 Forget()
提高下游請求的併發。
常見面試題
singleflight 是什麼?什麼時候用的?
緩存失效,合併請求的時候用的,這樣我們就可以減少對 DB 的請求壓力。
如果這個 goruntine 超時怎麼辦?
singleflight 內部使用 waitGroup 來讓同一個 key 的除了第一個請求的後續所有請求都阻塞。直到第一個請求執行 func 返回後,其他請求才會返回。 這意味着,如果 func 執行需要很長時間,那麼後面的所有請求都會被一直阻塞。 這時候我們可以使用 DoChan 結合ctx + select
做超時控制
func TestSingleFightTimeout(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
go doFly(ctx)
time.Sleep(2 * time.Second)
cancel() // 2秒後超時
}
func doFly(ctx context.Context) {
var g singleflight.Group
key := "example"
// 使用 DoChan 結合 select 做超時控制
result := g.DoChan(key, func() (interface{}, error) {
time.Sleep(5 * time.Second) // 模擬超時
return "result", nil
})
select {
case r := <-result:
fmt.Println("r", r.Val)
case <-ctx.Done():
fmt.Println("done")
return
}
}
結果輸出:
done
上述代碼中,我們將主進程先sleep 2
秒,然後再進行cancel
,那麼此時我們將會讓DoChan
這個方法 time.Sleep 5 秒模擬超時。那麼我們會發現函數過了 2 秒之後就會輸出done
。
doChan 方法具體是怎麼實現的?
在 DoChan 方法中,有一個 go g.doCall(c, key, fn)
的操作,當一個 goroutine 來執行,並通過channel
來返回數據,這樣外部可以自定義超時邏輯,防止因爲 fn 的阻塞,導致大量請求都被阻塞。
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
ch := make(chan Result, 1)
g.mu.Lock()
if g.m == nil {
g.m = make(map[string]*call)
}
if c, ok := g.m[key]; ok { // 如果沒有這個key
c.dups++
c.chans = append(c.chans, ch)
g.mu.Unlock()
return ch
}
c := &call{chans: []chan<- Result{ch}} // 構造異步返回結構體,可以接參數進行超時
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()
go g.doCall(c, key, fn) // 異步執行
return ch
}
如果請求失敗了怎麼辦?
如果第一個請求失敗了,那麼後續所有等待的請求都會返回同一個 error。但實際上可以根據下游能支撐的 rps 定時 forget 這個 key
,讓更多的請求能有機會走到後續邏輯。
go func() {
time.Sleep(100 * time.Millisecond)
g.Forget(key)
}()
比如 1 秒內有 100 個請求過來,正常是第一個請求能執行 queryDB,後續 99 個都會阻塞。增加這個 Forget 之後,每 100ms 就能有一個請求執行 queryDB,相當於是多了幾次嘗試的機會,相對的也給 DB 造成了更大的壓力,需要根據具體場景進去取捨。 因爲有可能前幾次是因爲 DB 的抖動導致的查詢失敗,重試之後就能實現了。
參考鏈接
[1] https://pkg.go.dev/golang.org/x/sync/singleflight
[2] https://www.lixueduan.com/posts/go/singleflight
[3] https://juejin.cn/post/7093859835694809125
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/_DvY8P0wmq4J9KGqFmnNKg