Go 的三種擴展原語之 — SingleFlight
概述
singleflight.Group
是 Go 語言擴展包的另一種同步原語,它能夠在一個服務中抑制對下游的多次重複請求。一個比較常見的使用場景是,我們使用 Redis 對數據庫中的數據進行緩存,發生緩存擊穿時,大量請求會打到數據庫上進而影響服務的尾延時。
而 singleflight.Group
能夠有效地解決這個問題,它能夠限制對同一個鍵值對的多次重複請求,減少對下游的瞬時流量。
在資源的獲取非常昂貴時(例如訪問緩存、數據庫),就很適合使用 singleflight.Group
優化服務。它的使用方法如下:
type service struct {
requestGroup singleflight.Group
}
func (s *service) handleRequest(ctx context.Context, request Request) (Response, error) {
v, err, _ := requestGroup.Do(request.Hash(), func() (interface{}, error) {
rows, err := // select * from tables
if err != nil {
return nil, err
}
return rows, nil
})
if err != nil {
return nil, err
}
return Response{
rows: rows,
}, nil
}
因爲請求的哈希在業務上一般表示相同的請求,所以上述代碼使用它作爲請求的鍵。當然,我們也可以選擇其他的字段作爲 singleflight.Group.Do
方法的第一個參數減少重複的請求。
結構體
singleflight.Group
結構體由一個互斥鎖 sync.Mutex
和一個映射表組成,每一個 singleflight.call
結構體都保存了當前調用對應的信息:
type Group struct {
mu sync.Mutex
m map[string]*call
}
type call struct {
wg sync.WaitGroup
val interface{}
err error
dups int
chans []chan<- Result
}
singleflight.call
結構體中的 val
和 err
字段都只會在執行傳入的函數時賦值一次並在 sync.WaitGroup.Wait
返回時被讀取。
dups
和 chans
兩個字段分別存儲了抑制的請求數量以及用於同步結果的 Channel。
接口
singleflight.Group
求的方法:
這兩個方法在功能上沒有太多的區別,只是在接口的表現上稍有不同。
每次調用 singleflight.Group.Do
方法時都會獲取互斥鎖,隨後判斷是否已經存在鍵對應的 singleflight.call
:
-
當不存在對應的
singleflight.call
時: -
初始化一個新的
singleflight.call
指針 -
增加
sync.WaitGroup
持有的計數器 -
將
singleflight.call
指針添加到映射表 -
釋放持有的互斥鎖
-
阻塞地調用
singleflight.Group.doCall
方法等待結果的返回 -
當存在對應的
singleflight.call
時: -
增加
dups
計數器,它表示當前重複的調用次數 -
釋放持有的互斥鎖
-
通過
sync.WaitGroup.Wait
等待請求的返回
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
g.mu.Lock()
if g.m == nil {
g.m = make(map[string]*call)
}
if c, ok := g.m[key]; ok {
c.dups++
g.mu.Unlock()
c.wg.Wait()
return c.val, c.err, true
}
c := new(call)
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()
g.doCall(c, key, fn)
return c.val, c.err, c.dups > 0
}
因爲 val
和 err
兩個字段都只會在 singleflight.Group.doCall
方法中賦值,所以當 singleflight.Group.doCall
和 sync.WaitGroup.Wait
返回時,函數調用的結果和錯誤都會返回給 singleflight.Group.Do
的調用方。
func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
c.val, c.err = fn()
c.wg.Done()
g.mu.Lock()
delete(g.m, key)
for _, ch := range c.chans {
ch <- Result{c.val, c.err, c.dups > 0}
}
g.mu.Unlock()
}
-
行傳入的函數
fn
,該函數的返回值會賦值給c.val
和c.err
-
調用
sync.WaitGroup.Done
方法通知所有等待結果的Goroutine
— 當前函數已經執行完成,可以從call
結構體中取出返回值並返回了 -
獲取持有的互斥鎖並通過管道將信息同步給使用
singleflight.Group.DoChan
方法的Goroutine
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
ch := make(chan Result, 1)
g.mu.Lock()
if g.m == nil {
g.m = make(map[string]*call)
}
if c, ok := g.m[key]; ok {
c.dups++
c.chans = append(c.chans, ch)
g.mu.Unlock()
return ch
}
c := &call{chans: []chan<- Result{ch}}
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()
go g.doCall(c, key, fn)
return ch
}
singleflight.Group.Do
和 singleflight.Group.DoChan
分別提供了同步和異步的調用方式,這讓我們使用起來也更加靈活。
小結
當我們需要減少對下游的相同請求時,可以使用 singleflight.Group
來增加吞吐量和服務質量,不過在使用的過程中我們也需要注意以下的幾個問題:
-
singleflight.Group.Do
和singleflight.Group.DoChan
一個用於同步阻塞調用傳入的函數,一個用於異步調用傳入的參數並通過 Channel 接收函數的返回值 -
singleflight.Group.Forget
可以通知singleflight.Group
在持有的映射表中刪除某個鍵,接下來對該鍵的調用就不會等待前面的函數返回了 -
一旦調用的函數返回了錯誤,所有在等待的
Goroutine
也都會接收到同樣的錯誤
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/Kc5na-C1yb9lWjj2lXA-dw