Go 併發控制:singleflight 詳解
singleflight
是 Go 官方擴展庫 x 中提供的擴展併發原語,能夠將多個併發請求合併爲一個,降低服務端壓力。本文就來介紹下它的用法和實現原理。
請求合併
singleflight
主要用於抑制重複的併發調用,從而避免對同一資源進行重複操作,提升系統性能。
比如,當我們有多個 goroutine 併發調用一個同一個函數時,singleflight
能夠實現只讓一個 goroutine 發起調用,其他 goroutine 則阻塞等待,當發起調用的 goroutine 返回後,singleflight
將結果同時返回給所有 goroutine。這樣我們就減少了大量的併發調用,避免重複操作。
這也是 singleflight
提供的唯一能力——請求合併。
在 Go 後端開發中,我們很容易想到,高併發場景下緩存失效時大量請求落到 DB 的場景,正是 singleflight
的用武之地。
如下圖所示:
左側圖(1)中,當大量請求過來讀取 Redis 緩存時,它們同時發現緩存失效,那麼所有請求都會繼續向下請求 MySQL 讀取數據。
右側圖(2)中,當所有請求都去 MySQL 讀取數據時,我們可以使用 singleflight
合併這些請求,只保留一個請求去調用 MySQL 讀取數據,然後將結果返回給所有請求。
這就是 singleflight
的典型應用場景。
現在,請你思考下 singleflight
和 sync.Once
有什麼區別呢?我會在後文中揭曉答案。
NOTE:
如果你對
sync.Once
不熟悉,可以閱讀我的另一篇文章《Go 併發控制:sync.Once 詳解》。
SingleFlight 使用示例
知道了 singleflight
作用,想必你已經躍躍欲試要動手實踐了。廢話不多說,咱們直接看效果。
singleflight
使用示例代碼如下:
package main
import (
"fmt"
"strconv"
"sync"
"time"
"golang.org/x/sync/singleflight"
)
var (
cache = make(map[string]*User) // 模擬緩存
mu sync.RWMutex // 保護緩存
requestGroup singleflight.Group // SingleFlight 實例
)
type User struct {
Id int64
Name string
Email string
}
// GetUserFromDB 模擬從數據庫獲取數據
func GetUserFromDB(username string) *User {
fmt.Printf("Querying DB for key: %s\n", username)
time.Sleep(1 * time.Second) // 模擬耗時操作
id, _ := strconv.Atoi(username[len(username)-3:])
fakeUser := &User{
Id: int64(id),
Name: username,
Email: username + "@jianghushinian.cn",
}
return fakeUser
}
// GetUser 獲取數據,先從緩存讀取,若沒有命中,則從數據庫查詢
func GetUser(key string) *User {
// 先嚐試從緩存獲取
mu.RLock()
val, ok := cache[key]
mu.RUnlock()
if ok {
return val
}
fmt.Printf("User %s not in cache\n", key)
// 緩存未命中,使用 SingleFlight 防止重複查詢
result, _, _ := requestGroup.Do(key, func() (interface{}, error) {
// 模擬從數據庫獲取數據
val := GetUserFromDB(key)
// 存入緩存
mu.Lock()
cache[key] = val
mu.Unlock()
return val, nil
})
return result.(*User)
}
func main() {
var wg sync.WaitGroup
keys := []string{"user_123", "user_123", "user_456"}
// 第一輪併發查詢,緩存中還沒有數據,使用 SingleFlight 減少 DB 查詢
for _, key := range keys {
wg.Add(1)
go func(k string) {
defer wg.Done()
fmt.Printf("Get user for key: %s -> %+v\n", k, GetUser(k))
}(key)
}
time.Sleep(2 * time.Second)
fmt.Println("===================================")
// 第二輪併發查詢,緩存中有數據,直接讀取緩存,不會查詢 DB
for _, key := range keys {
wg.Add(1)
go func(k string) {
defer wg.Done()
fmt.Printf("Get user for key: %s -> %+v\n", k, GetUser(k))
}(key)
}
wg.Wait()
}
簡單解釋下這個示例程序,我們想要模擬的就是高併發場景下緩存失效時大量請求落到 DB 的場景。
在 main
函數中,首先聲明瞭 sync.WaitGroup
變量來控制併發,keys
表示我們要併發查詢的用戶,這裏以 username
作爲查詢的 key
。接着遍歷這些 keys
並開啓新的 goroutine 來併發的查詢 User
信息。
GetUser
會先嚐試從緩存讀取數據,若沒有命中,再去數據庫中查詢。從數據庫獲取數據需要調用 GetUserFromDB
函數,不過 GetUser
中並沒有直接去調用它,而是使用 singleflight
實例對象 requestGroup.Do
方法來調用。Do
方法接收兩個參數,一個字符串類型的 key
和一個函數 fn
,對於同一個 key
,在併發情況下,只有一個 fn
正在執行。而 requestGroup.Do
返回的 result
就是函數 fn
的第一個返回值。在函數 fn
內部調用了 GetUserFromDB
並將從 DB 查詢到的數據存入緩存 cache
中。
我們在 main
函數中共發起了兩輪併發查詢用戶信息的請求。第一輪時,緩存 cache
爲空,所以請求會落在 DB,第二輪時,緩存 cache
中有數據,所以請求直接讀取緩存,不會查詢 DB。
執行示例代碼,得到如下輸出:
$ go run main.go
User user_456 not in cache
Querying DB for key: user_456
User user_123 not in cache
Querying DB for key: user_123
User user_123 not in cache
Get user for key: user_123 -> &{Id:123 Name:user_123 Email:user_123@jianghushinian.cn}
Get user for key: user_456 -> &{Id:456 Name:user_456 Email:user_456@jianghushinian.cn}
Get user for key: user_123 -> &{Id:123 Name:user_123 Email:user_123@jianghushinian.cn}
===================================
Get user for key: user_123 -> &{Id:123 Name:user_123 Email:user_123@jianghushinian.cn}
Get user for key: user_123 -> &{Id:123 Name:user_123 Email:user_123@jianghushinian.cn}
Get user for key: user_456 -> &{Id:456 Name:user_456 Email:user_456@jianghushinian.cn}
可以發現,第一輪併發請求中,fmt.Printf("User %s not in cache\n", key)
的日誌打印了 3 次,說明緩存確實爲空。fmt.Printf("Querying DB for key: %s\n", username)
日誌打印了 2 次,說明 singleflight
生效了,因爲 3 個併發請求中,有 2 個 key
是一樣的 user_123
,所以 singleflight
合併了請求。
第二輪併發請求發起時,緩存中已經存在數據,所以只會打印 fmt.Printf("Get user for key: %s -> %+v\n", k, GetUser(k))
的日誌信息。
現在你應該對 singleflight
有一個比較直觀的認識了。不過,我在這裏講解的並不夠詳細,如果完全沒接觸過 singleflight
這個概念,可能會有一些疑惑。沒關係,接下來我將對 singleflight
源碼進行講解,相信看過源碼後,你心中的疑惑就都能解開了。畢竟,源碼之下無祕密。
SingleFlight 源碼解析
singleflight
源碼中有兩個核心結構體:
// call is an in-flight or completed singleflight.Do call
type call struct {
wg sync.WaitGroup // in-flight 併發控制
// These fields are written once before the WaitGroup is done
// and are only read after the WaitGroup is done.
val interface{} // 記錄 fn 返回值
err error // 記錄 fn 返回的 error
// These fields are read and written with the singleflight
// mutex held before the WaitGroup is done, and are read but
// not written after the WaitGroup is done.
dups int // 記錄從緩存中獲取 fn 返回值的次數
chans []chan<- Result // 提供給 DoChan 方法用於傳遞 fn 的返回值
}
// Group represents a class of work and forms a namespace in
// which units of work can be executed with duplicate suppression.
type Group struct {
mu sync.Mutex // protects m
m map[string]*call // lazily initialized
}
其中 Group
代表 singleflight
對象,它有兩個字段,mu
是一個互斥鎖,用於保護 m
的併發訪問。m
是一個 map
,會被延遲初始化,m
的鍵就是調用 singleflight.Do
時傳遞的第一個參數 key
,m
的值是一個 *call
對象。
call
代表一個正在執行(in-flight
)或已完成(completed
)的 fn
函數的調用,也就是說,它會記錄我們在調用 singleflight.Do
時傳遞的第二個參數 fn
的完整生命週期。
Group
對象提供了三個公有方法,簽名如下:
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool)
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result
func (g *Group) Forget(key string)
-
Do
方法我們見過了,它接收一個key
和一個函數fn
,對於同一個key
,在併發情況下,只有一個fn
正在執行,其他請求會阻塞等待。函數fn
無參數,有兩個返回值value
和error
。當fn
執行完成並返回,Do
方法會返回fn
的執行結果value
和error
,即Do
方法返回值的前兩個。而Do
方法的最後一個返回值shared
,則表示返回值v
是否共享給了多給調用方,即在fn
執行時,有其他併發請求過來,不過它們並沒有真正執行,而是等待這個fn
的返回結果。 -
DoChan
方法其實和Do
方法類似,只不過返回值變成了一個channel
。併發情況下對DoChan
的調用不會阻塞等待第一個fn
執行完成,而是直接返回channel
,等fn
執行完成後,會將結果Result
通過這個channel
返回。 -
Forget
告知Group
忘記一個key
,在調用Forget
之後,再次調用Do
/DoChan
方法將不再等待前一個未完成的fn
執行結果,而是當作一個新的請求來處理。
DoChan
方法的返回值中的 Result
類型,其實就是對 Do
方法返回的三個值的封裝,方便在 channel
中傳遞。
Result
類型定義如下:
// Result holds the results of Do, so they can be passed
// on a channel.
type Result struct {
Val interface{}
Err error
Shared bool
}
現在我們對 Group
對象提供的三個方法源碼依次進行講解。
singleflight.Do
我們先看 Do
方法的實現:
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
g.mu.Lock() // 加鎖,保證併發安全
if g.m == nil {
g.m = make(map[string]*call) // 延遲初始化 m
}
if c, ok := g.m[key]; ok { // 如果 key 已經在 map 中,即非第一個請求會進入到這個代碼塊
c.dups++
g.mu.Unlock()
c.wg.Wait()
if e, ok := c.err.(*panicError); ok {
panic(e)
} else if c.err == errGoexit {
runtime.Goexit()
}
return c.val, c.err, true
}
c := new(call) // 當前 key 對應的第一個請求會創建一個 call 對象
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()
g.doCall(c, key, fn) // 真正去執行 fn 的方法
return c.val, c.err, c.dups > 0
}
Do
方法內部首先會進行加鎖操作,保證所有對 m
的操作併發安全。
Group
對象的 m
屬性延遲到調用 Do
方法時才被初始化,所以 Group
對象其實無需實例化即可直接使用。
如果 key
不在 m
中,說明是這個 key
的第一個請求,會爲其創建一個 call
對象,並保存到 m
中。然後就交給 Group.doCall
來處理 fn
的調用了。並且 call
對象使用了 sync.WaitGroup
來控制併發調用。
如果 key
在 m
中,則說明不是這個 key
的第一個請求,那麼就可以調用 c.wg.Wait()
等待第一個請求完成,然後直接從 call
對象的 val
和 err
屬性中拿到 fn
的返回值。在這裏並沒執行當前請求的 fn
,call
對象上的結果是當前 key
的第一個請求返回的,所以就實現了類似 “緩存” 的效果,有效合併了多次請求調用。
此外,在這裏有兩處錯誤類型判斷,c.err.(*panicError)
和 c.err == errGoexit
。
其中 panicError
定義如下:
type panicError struct {
value interface{} // 記錄 fn 函數的 panic 信息
stack []byte // 記錄發生 panic 時的異常堆棧信息
}
// Error implements error interface.
func (p *panicError) Error() string {
return fmt.Sprintf("%v\n\n%s", p.value, p.stack)
}
func (p *panicError) Unwrap() error {
err, ok := p.value.(error)
if !ok {
return nil
}
return err
}
當同一個 key
的第一個請求函數 fn
調用發生了 panic
,就會在 c.err
中保存一個 *panicError
對象,那麼後續的併發請求過來,也要重新觸發 panic
。
另一個錯誤 errGoexit
定義如下:
var errGoexit = errors.New("runtime.Goexit was called")
這是一個典型的 Sentinel error
,用於標記在用戶提供的 fn
函數內部調用了 runtime.Goexit()
來退出 goroutine,後續的併發請求過來,也要重新調用 runtime.Goexit()
。
NOTE:
runtime.Goexit
用於終止當前 goroutine(其他正在運行的協程不受影響,程序繼續正常運行),不會繼續執行後續代碼。並且在退出前會執行當前 goroutine 的所有defer
語句,確保資源被正確釋放。此外runtime.Goexit()
不會引發panic
,因此無法通過recover
捕獲。
那麼現在 Do
方法的工作流程就清晰了:
- 請求
Do(key, fn) (v, err, shared)
被調用
-
如果
key
不存在:創建一個新的call
,執行用戶函數fn
。 -
如果
key
已存在:等待現有操作fn
調用完成,複用其結果。
fn
函數完成後
-
直接返回
fn
的執行結果。 -
或者喚醒等待的重複請求,返回
fn
的執行結果。
singleflight.DoChan
接下來再看 DoChan
方法的實現:
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
ch := make(chan Result, 1) // 構造一個 channel 用於傳遞 fn 的執行結果
g.mu.Lock() // 加鎖,保證併發安全
if g.m == nil {
g.m = make(map[string]*call) // 延遲初始化 m
}
if c, ok := g.m[key]; ok { // 如果 key 已經在 map 中
c.dups++
c.chans = append(c.chans, ch)
g.mu.Unlock()
return ch
}
c := &call{chans: []chan<- Result{ch}} // 創建一個 call 對象,並初始化 chans 字段
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()
go g.doCall(c, key, fn) // 開啓新的 goroutine 來執行 fn
return ch // 返回 channel 對象
}
可以發現,DoChan
方法的內部邏輯與 Do
方法類似,只不過它不會阻塞等待第一個請求執行完成,而是啓動新的 goroutine 調用 doCall
來執行 fn
,並返回一個 channel
對象。
那麼也就是說,Do
方法和 DoChan
方法的核心邏輯其實都是在 doCall
方法中了。
singleflight.doCall
doCall
方法的實現:
func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
normalReturn := false // fn 是否正常返回
recovered := false // fn 是否產生 panic
// 使用 double-defer 來區分 panic 或 runtime.Goexit
defer func() {
// 如果條件成立,則說明給定的函數 fn 內部調用了 runtime.Goexit
if !normalReturn && !recovered {
c.err = errGoexit
}
g.mu.Lock()
defer g.mu.Unlock()
c.wg.Done() // 通知阻塞等待的其他請求可以獲取 fn 執行結果了
if g.m[key] == c { // fn 執行完成,從 m 中刪除 key 記錄
delete(g.m, key)
}
if e, ok := c.err.(*panicError); ok {
if len(c.chans) > 0 {
go panic(e) // 爲了防止等待 channel 的 goroutine 被永久阻塞,需要確保這個 panic 無法被 recover
select {} // 保持當前 goroutine 不退出
} else {
panic(e)
}
} else if c.err == errGoexit {
// 當前 goroutine 正在執行 runtime.Goexit 退出流程,這裏無需特殊處理
} else {
// 進入此代碼塊,說明 fn 正常返回
for _, ch := range c.chans {
ch <- Result{c.val, c.err, c.dups > 0}
}
}
}()
func() {
defer func() {
if !normalReturn {
if r := recover(); r != nil { // 進入此代碼塊,說明 fn 觸發了 panic
c.err = newPanicError(r)
}
}
}()
c.val, c.err = fn()
normalReturn = true
}()
if !normalReturn {
recovered = true
}
}
這個方法有點長,不過整體脈絡是清晰的,我們拆成幾個小的邏輯代碼段來分析它。
函數在最開始處初始化兩個變量:
normalReturn := false
recovered := false
normalReturn
如果爲 true
,則說明 fn
正常返回。
recovered
如果爲 true
,則說明 fn
執行期間發生了 panic
。
然後是一大段延遲執行的 defer
語句,我們先跳過它,直接來看下面的匿名立即執行函數邏輯:
func() {
defer func() {
if !normalReturn {
if r := recover(); r != nil {
c.err = newPanicError(r)
}
}
}()
c.val, c.err = fn()
normalReturn = true
}()
這裏之所以使用一個立即執行函數,是爲了執行 defer
語句。函數內主要邏輯就是調用 fn
函數,並將其結果保存到 *call
對象 c.val
和 c.err
兩個屬性中。
fn
執行成功,則標記 normalReturn
爲 true
,表明 fn
正常返回,執行期間沒有發生 panic
或調用 runtime.Goexit()
。
如果 fn
內發生 panic
,則會被 defer
中的 recover
捕獲到,並使用 panic
信息創建一個 *panicError
對象保存到 c.err
屬性中。
newPanicError
函數實現如下:
func newPanicError(v interface{}) error {
stack := debug.Stack()
// The first line of the stack trace is of the form "goroutine N [status]:"
// but by the time the panic reaches Do the goroutine may no longer exist
// and its status will have changed. Trim out the misleading line.
if line := bytes.IndexByte(stack[:], '\n'); line >= 0 {
stack = stack[line+1:]
}
return &panicError{value: v, stack: stack}
}
這裏代碼很簡單,使用 debug.Stack()
獲取當前 goroutine 的調用棧信息,然後截掉第一行 goroutine N [status]:
格式的堆棧內容,再構造一個 *panicError
對象並返回。
NOTE:
debug.Stack
是對runtime.Stack
的一個高層次的封裝,直接返回當前 goroutine 的調用棧信息。
回憶下在 Do
函數中有一個錯誤類型斷言 c.err.(*panicError)
,錯誤信息就是在這裏通過調用 newPanicError
創建並賦值給 c.err
的。
匿名函數執行完成後,代碼邏輯走到這裏:
if !normalReturn {
recovered = true
}
如果此時 normalReturn
爲 false
,則執行 fn
時必然出現了 panic
,所以記錄 recovered
值爲 true
。
這裏之所以能這樣斷定 fn
中出現 panic
,是因爲這段邏輯與匿名的立即執行函數在同一個 goroutine 中,如果 c.val, c.err = fn()
這行執行成功,內部肯定沒有發生 panic
或調用 runtime.Goexit()
,那麼 normalReturn = true
也必然會執行成功。而如果 normalReturn
爲 false
,則有可能發生 panic
或調用 runtime.Goexit()
。但是如果調用 runtime.Goexit()
,那麼當前 goroutine 會立即終止,所以代碼根本就不會執行到此處。既然代碼能夠執行到此處,且 normalReturn
爲 false
,就只剩一種可能,fn
中發生了 panic
。
doCall
方法最後一行代碼已經執行完成,接下來就要執行到頂部的 defer
函數中了:
// 使用 double-defer 來區分 panic 或 runtime.Goexit
defer func() {
// 如果條件成立,則說明給定的函數 fn 內部調用了 runtime.Goexit
if !normalReturn && !recovered {
c.err = errGoexit
}
g.mu.Lock()
defer g.mu.Unlock()
c.wg.Done() // 通知阻塞等待的其他請求可以獲取 fn 執行結果了
if g.m[key] == c { // fn 執行完成,從 m 中刪除 key 記錄
delete(g.m, key)
}
if e, ok := c.err.(*panicError); ok {
if len(c.chans) > 0 {
go panic(e) // 爲了防止等待 channel 的 goroutine 被永久阻塞,需要確保這個 panic 無法被 recover
select {} // 保持當前 goroutine 不退出
} else {
panic(e)
}
} else if c.err == errGoexit {
// 當前 goroutine 正在執行 runtime.Goexit 退出流程,這裏無需特殊處理
} else {
// 進入此代碼塊,說明 fn 正常返回
for _, ch := range c.chans {
ch <- Result{c.val, c.err, c.dups > 0}
}
}
}()
在 defer
函數中首先對 fn
函數的執行結果進行了判斷,如果沒有正常退出,且未發生 panic
,則說明一定是調用了 runtime.Goexit()
。
所以,這也是爲什麼 doCall
方法中共計使用了兩個 defer
語句,就是爲了對 fn
的三種可能執行結果進行判別。
c.wg.Done()
通知阻塞等待的其他請求可以獲取 fn
函數的執行結果了。
當 fn
執行完成,立即從 Group.m
中刪除 fn
函數所對應的 key
。所以,singleflight
只保證併發情況下,合併多個請求。如果這一輪併發結束,下次相同 key
發來的請求,fn
函數會依然會執行。所以看到此處,我想你應該能 Get 到 singleflight
與 sync.Once
的不同之處了。
接下來的邏輯就有點意思了,如果 c.err
中記錄的 error
是 *panicError
類型,則說明 fn
函數發生了 panic
。那麼此時需要重新觸發 panic
,讓調用方感知到。這又分兩種情況,如果 len(c.chans) > 0
成立,則說明用戶調用了 DoChan
方法,此時爲了防止調用方用來等待 channel
的 goroutine 被永久阻塞,需要確保這個 panic
不能被 recover
,所以啓動了一個新的 goroutine 來執行 panic(e)
,select {}
則是用來保持當前 goroutine 不被退出。另一種情況則是用戶調用了 Do
方法,那麼直接執行 panic(e)
即可。
NOTE:
recover
只能捕獲當前 goroutine 中的panic
,我在另一篇文章《Go 錯誤處理指北:Defer、Panic、Recover 三劍客》中進行了詳細講解。
如果 c.err == errGoexit
成立,則說明 fn
函數內容調用了 runtime.Goexit()
,那麼無需特殊處理,當前 goroutine 會繼續執行退出操作。
最終代碼進入 else
邏輯,說明 fn 正常返回,如果用戶調用了 DoChan
方法,則 c.chans
有值,將 fn
執行結果包裝成 Result
並通過 channel
通知給所有等待者。
至此,singleflight
最核心的方法 doCall
就執行完成了。
我們來梳理下 doCall
方法的工作流程:
-
調用
fn
函數,執行fn
的邏輯包裹在嵌套的匿名函數中,並處理可能產生的panic
或runtime.Goexit
。 -
處理返回結果,在
defer
方法中,區分了fn
函數的正常返回、panic
和runtime.Goexit
三種可能執行結果,並設置對應的狀態和錯誤信息。 -
分發
fn
函數的執行結果或錯誤信息,如果用戶調用了Do
方法,可以從*call
對象的c.val
和c.err
兩個屬性中拿到結果,如果用戶調用了DoChan
方法,最終會將結果廣播到所有等待的channel
。
doCall
方法代碼量不大,不過其中中有兩處關鍵點值得注意:
- 雙層
defer
設計(double-defer
)
-
第一層
defer
用於捕獲panic
。 -
第二層
defer
則用於處理runtime.Goexit
和資源釋放。
- 對於
panic
的處理
-
*panicError
中包含了錯誤值和堆棧信息,便於調試。 -
通過 goroutine 執行
panic(e)
保證不會阻塞等待channel
的調用者。
singleflight.Forget
現在還剩下最後方法沒有分析了,Forget
方法源碼如下:
func (g *Group) Forget(key string) {
g.mu.Lock()
delete(g.m, key)
g.mu.Unlock()
}
一目瞭然,Forget
方法用於調用方主動告知 Group
忘記一個 key
。
Forget
方法適用場景如下:
-
長時間未完成的調用,比如某個函數執行時間過長,但業務上已經不再需要結果,此時可以通過
Forget
主動移除key
。 -
錯誤請求的清理,如果某次調用由於邏輯錯誤進入了無效狀態,直接
Forget
該調用,可以避免fn
執行結果後續被誤用。 -
重試機制,在某些場景下,你希望對同一個
key
發起新的調用,而不是複用之前的結果。
不過,還是建議慎使用 Forget
,有需要時再使用。因爲如果調用時間較短且結果重要,頻繁使用 Forget
可能導致資源浪費,singleflight
也就失去了意義。
SingleFlight 適用場景
現在我們對 singleflight
的源碼進行了解析,那麼 singleflight
的適用場景也就清晰了。
singleflight
典型使用場景如下:
- 緩存擊穿
-
問題: 緩存中的某個熱點鍵過期,導致大量請求同時訪問後端數據庫,增加系統壓力。
-
解決: 使用
singleflight
確保在緩存重建過程中,只有一個請求會訪問數據庫,其他請求等待結果返回。
- 遠程服務調用
-
問題: 多個併發請求訪問同一個遠程服務時,可能造成不必要的重複調用,浪費帶寬和計算資源。
-
解決: 使用
singleflight
使相同的請求合併爲一次。
- 定時任務去重
-
問題: 在分佈式系統中,多個節點可能同時執行定時任務,導致重複任務執行。
-
解決: 使用
singleflight
確保只有一個節點執行任務,其他節點共享結果。
- 消息去重
-
問題: 消息隊列中可能存在重複消息的消費問題。
-
解決: 在消費端使用
singleflight
,確保對相同消息的處理只執行一次。
- 分佈式鎖優化
-
問題: 多個節點同時搶鎖時,可能會發起大量重複的加鎖嘗試。
-
解決: 使用
singleflight
降低對分佈式鎖的訪問壓力,只允許一個請求實際去嘗試加鎖。
SingleFlight
的核心作用是抑制重複的併發調用,在併發場景中,多次相同請求(由同一個 key
標識)過來時,讓它們共享第一個調用的結果,而不是重複執行。這在讀操作中尤其常見,而對於寫操作,合併的需求和行爲需要更慎重的對待。
關於 SingleFlight
你認爲還有那些使用場景可以分享出來,大家一起探討學習。
總結
singleflight
主要用於抑制重複的併發調用,從而避免對同一資源進行重複操作,提升系統性能。所以 singleflight
適用於可以合併請求的操作。
singleflight
提供了三個公有方法 Do
、DoChan
和 Forget
,Do
和 DoChan
兩個方法作用相同都用來合併請求,二者的核心邏輯在 doCall
方法中。Forget
方法則用於調用方主動告知 Group
忘記一個 key
。
singleflight
典型使用場景有緩存擊穿、遠程服務調用、任務去重、消息去重、分佈式鎖優化等。
我在前文中留過一個思考題,singleflight
和 sync.Once
有什麼區別,現在你有答案了嗎?
singleflight
只用在併發場景下,同時有多個重複的請求,才能夠合併請求。而當請求結束,就會執行 delete(g.m, key)
刪除 key
,下一次請求過來 fn
依然被重新執行。
sync.Once
則始終保證函數 f
只被調用一次。
二者雖然看起來功能類似,但它們的實現原理和適用場景各不相同。
此外,其實在 Go 源碼中的 internal
包下,也有一個 SingleFlight
的實現,與擴展庫 x 中的實現思路相同,代碼更加簡單,感興趣的讀者可以跳轉過去查看其源碼實現。
本文示例源碼我都放在了 GitHub 中,歡迎點擊查看。
希望此文能對你有所啓發。
延伸閱讀
-
Go Wiki: X-Repositories:https://go.dev/wiki/X-Repositories
-
singleflight Documentation:https://pkg.go.dev/golang.org/x/sync@v0.9.0/singleflight
-
singleflight GitHub 源碼:https://github.com/golang/sync/tree/v0.9.0/singleflight
-
Go 源碼中 internal 包下的 singleflight:https://github.com/golang/go/tree/go1.23.0/src/internal/singleflight
-
Go 併發控制:sync.Once 詳解:https://jianghushinian.cn/2024/11/11/sync-once/
-
Go 錯誤處理指北:Defer、Panic、Recover 三劍客:https://jianghushinian.cn/2024/10/13/go-error-guidelines-defer-panic-recover/
-
本文 GitHub 示例代碼:https://github.com/jianghushinian/blog-go-example/tree/main/x/sync/singleflight
聯繫我
-
公衆號:Go 編程世界
-
微信:jianghushinian
-
郵箱:jianghushinian007@outlook.com
-
博客:https://jianghushinian.cn
-
GitHub:https://github.com/jianghushinian
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/3BJObWvlzsetMHb7cSR8jg