賞析 Singleflight 設計
前言
哈嘍,大家好,我是
asong
。今天想與大家分享一下singleflight
這個庫,singleflight
僅僅只有 100 多行卻可以做到防止緩存擊穿,有點厲害哦!所以本文我們就一起來看一看他是怎麼設計的~。注意:本文基於 https://pkg.go.dev/golang.org/x/sync/singleflight 進行分析。
緩存擊穿
什麼是緩存擊穿
平常在高併發系統中,會出現大量的請求同時查詢一個
key
的情況,假如此時這個熱key
剛好失效了,就會導致大量的請求都打到數據庫上面去,這種現象就是緩存擊穿。緩存擊穿和緩存雪崩有點像,但是又有一點不一樣,緩存雪崩是因爲大面積的緩存失效,打崩了 DB,而緩存擊穿則是指一個 key 非常熱點,在不停的扛着高併發,高併發集中對着這一個點進行訪問,如果這個 key 在失效的瞬間,持續的併發到來就會穿破緩存,直接請求到數據庫,就像一個完好無損的桶上鑿開了一個洞,造成某一時刻數據庫請求量過大,壓力劇增!
如何解決
-
方法一
我們簡單粗暴點,直接讓熱點數據永遠不過期,定時任務定期去刷新數據就可以了。不過這樣設置需要區分場景,比如某寶首頁可以這麼做。
-
方法二
爲了避免出現緩存擊穿的情況,我們可以在第一個請求去查詢數據庫的時候對他加一個互斥鎖,其餘的查詢請求都會被阻塞住,直到鎖被釋放,後面的線程進來發現已經有緩存了,就直接走緩存,從而保護數據庫。但是也是由於它會阻塞其他的線程,此時系統吞吐量會下降。需要結合實際的業務去考慮是否要這麼做。
-
方法三
方法三就是 singleflight 的設計思路,也會使用互斥鎖,但是相對於方法二的加鎖粒度會更細,這裏先簡單總結一下 singleflight 的設計原理,後面看源碼在具體分析。
singleflightd 的設計思路就是將一組相同的請求合併成一個請求,使用
map
存儲,只會有一個請求到達 mysql,使用sync.waitgroup
包進行同步,對所有的請求返回相同的結果。
截屏 2021-07-14 下午 8.30.56
源碼賞析
已經迫不及待了,直奔主題吧,下面我們一起來看看singleflight
是怎麼設計的。
數據結構
singleflight
的結構定義如下:
type Group struct {
mu sync.Mutex // 互斥鎖,保證併發安全
m map[string]*call // 存儲相同的請求,key是相同的請求,value保存調用信息。
}
Group
結構還是比較簡單的,只有兩個字段,m
是一個map
,key
是相同請求的標識,value
是用來保存調用信息,這個map
是懶加載,其實就是在使用時纔會初始化;mu
是互斥鎖,用來保證m
的併發安全。m
存儲調用信息也是單獨封裝了一個結構:
type call struct {
wg sync.WaitGroup
// 存儲返回值,在wg done之前只會寫入一次
val interface{}
// 存儲返回的錯誤信息
err error
// 標識別是否調用了Forgot方法
forgotten bool
// 統計相同請求的次數,在wg done之前寫入
dups int
// 使用DoChan方法使用,用channel進行通知
chans []chan<- Result
}
// Dochan方法時使用
type Result struct {
Val interface{} // 存儲返回值
Err error // 存儲返回的錯誤信息
Shared bool // 標示結果是否是共享結果
}
Do 方法
// 入參:key:標識相同請求,fn:要執行的函數
// 返回值:v: 返回結果 err: 執行的函數錯誤信息 shard: 是否是共享結果
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
// 代碼塊加鎖
g.mu.Lock()
// map進行懶加載
if g.m == nil {
// map初始化
g.m = make(map[string]*call)
}
// 判斷是否有相同請求
if c, ok := g.m[key]; ok {
// 相同請求次數+1
c.dups++
// 解鎖就好了,只需要等待執行結果了,不會有寫入操作了
g.mu.Unlock()
// 已有請求在執行,只需要等待就好了
c.wg.Wait()
// 區分panic錯誤和runtime錯誤
if e, ok := c.err.(*panicError); ok {
panic(e)
} else if c.err == errGoexit {
runtime.Goexit()
}
return c.val, c.err, true
}
// 之前沒有這個請求,則需要new一個指針類型
c := new(call)
// sync.waitgroup的用法,只有一個請求運行,其他請求等待,所以只需要add(1)
c.wg.Add(1)
// m賦值
g.m[key] = c
// 沒有寫入操作了,解鎖即可
g.mu.Unlock()
// 唯一的請求該去執行函數了
g.doCall(c, key, fn)
return c.val, c.err, c.dups > 0
}
這裏是唯一有疑問的應該是區分panic
和runtime
錯誤部分吧,這個與下面的docall
方法有關聯,看完docall
你就知道爲什麼了。
docall
// doCall handles the single call for a key.
func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
// 標識是否正常返回
normalReturn := false
// 標識別是否發生panic
recovered := false
defer func() {
// 通過這個來判斷是否是runtime導致直接退出了
if !normalReturn && !recovered {
// 返回runtime錯誤信息
c.err = errGoexit
}
c.wg.Done()
g.mu.Lock()
defer g.mu.Unlock()
// 防止重複刪除key
if !c.forgotten {
delete(g.m, key)
}
// 檢測是否出現了panic錯誤
if e, ok := c.err.(*panicError); ok {
// 如果是調用了dochan方法,爲了channel避免死鎖,這個panic要直接拋出去,不能recover住,要不就隱藏錯誤了
if len(c.chans) > 0 {
go panic(e) // 開一個寫成panic
select {} // 保持住這個goroutine,這樣可以將panic寫入crash dump
} else {
panic(e)
}
} else if c.err == errGoexit {
// runtime錯誤不需要做任何時,已經退出了
} else {
// 正常返回的話直接向channel寫入數據就可以了
for _, ch := range c.chans {
ch <- Result{c.val, c.err, c.dups > 0}
}
}
}()
// 使用匿名函數目的是recover住panic,返回信息給上層
func() {
defer func() {
if !normalReturn {
// 發生了panic,我們recover住,然後把錯誤信息返回給上層
if r := recover(); r != nil {
c.err = newPanicError(r)
}
}
}()
// 執行函數
c.val, c.err = fn()
// fn沒有發生panic
normalReturn = true
}()
// 判斷執行函數是否發生panic
if !normalReturn {
recovered = true
}
}
這裏來簡單描述一下爲什麼區分panic
和runtime
錯誤,不區分的情況下如果調用出現了恐慌,但是鎖沒有被釋放,會導致使用相同 key 的所有後續調用都出現了死鎖,具體可以查看這個issue
:https://github.com/golang/go/issues/33519。
Dochan 和 Forget 方法
//異步返回
// 入參數:key:標識相同請求,fn:要執行的函數
// 出參數:<- chan 等待接收結果的channel
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
// 初始化channel
ch := make(chan Result, 1)
g.mu.Lock()
// 懶加載
if g.m == nil {
g.m = make(map[string]*call)
}
// 判斷是否有相同的請求
if c, ok := g.m[key]; ok {
//相同請求數量+1
c.dups++
// 添加等待的chan
c.chans = append(c.chans, ch)
g.mu.Unlock()
return ch
}
c := &call{chans: []chan<- Result{ch}}
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()
// 開一個寫成調用
go g.doCall(c, key, fn)
// 返回這個channel等待接收數據
return ch
}
// 釋放某個 key 下次調用就不會阻塞等待了
func (g *Group) Forget(key string) {
g.mu.Lock()
if c, ok := g.m[key]; ok {
c.forgotten = true
}
delete(g.m, key)
g.mu.Unlock()
}
注意事項
因爲我們在使用singleflight
時需要自己寫執行函數,所以如果我們寫的執行函數一直循環住了,就會導致我們的整個程序處於循環的狀態,積累越來越多的請求,所以在使用時,還是要注意一點的,比如這個例子:
result, err, _ := d.singleGroup.Do(key, func() (interface{}, error) {
for{
// TODO
}
}
不過這個問題一般也不會發生,我們在日常開發中都會使用context
控制超時。
總結
好啦,這篇文章就到這裏啦。因爲最近我在項目中也使用singleflight
這個庫,所以就看了一下源碼實現,真的是厲害,這麼短的代碼就實現了這麼重要的功能,我怎麼就想不到呢。。。。所以說還是要多讀一些源碼庫,真的能學到好多,真是應了那句話:你知道的越多,不知道的就越多!
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/JUkxGbx1Ufpup3Hx08tI2w