Go 併發編程 —— I-O 聚合優化(動畫講解)
背景提要
在存儲系統中,在確保功能不受損的前提下,儘量的減少讀寫 I/O 的次數是優化的一個重要方向,也就是聚合 I/O 的場景。讀寫操作雖然都有聚合 I/O 的需求,但各自的重點和實現方法卻有所不同。接下來,我們將分別探討讀和寫請求的聚合優化方法。
讀請求的聚合
以讀操作中,緩存優化是一種常見的優化手段。具體做法是將讀取的數據存儲在內存中,並通過一個唯一的 Key 來索引這些數據。當讀請求來到時,如果該 Key 在緩存中沒有命中,那麼就需要從後端存儲獲取。用戶請求直接穿透到後端存儲,如果併發很大,這可能是一個很大的風險。
例如,對於 Key:“test”,如果緩存中沒有相應的數據,並且突然出現大量併發讀取請求,每個請求都會發現緩存未命中。如果這些請求全部直接訪問後端存儲,可能會給後端存儲帶來巨大壓力。
爲了應對這種情況,我們其實可以只允許一個讀請求去後端讀取數據,而其他併發請求則等待這個請求的結果。這就是讀請求聚合的基本原理。
在 Go 語言中,可以使用 singleflight 這類第三方庫完成上述需求。singleflight 的設計理念是 “單一請求執行”,即針對同一個 Key,在多個併發請求中只允許一個請求訪問後端。
01 - 讀請求聚合的使用姿勢
下面是一個使用 singleflight 的示例,展現瞭如何通過傳入特定的 Key 和閉包函數來聚合併發請求。
package main
import (
// ...
"golang.org/x/sync/singleflight"
)
func main() {
var g singleflight.Group
var wg sync.WaitGroup
// 模擬多個 goroutine 併發請求相同的資源
for i := 0; i < 5; i++ {
wg.Add(1)
go func(idx int) {
defer wg.Done()
v, err, shared := g.Do("objectkey", func() (interface{}, error) {
fmt.Printf("協程ID:%v 正在執行...\n", idx)
time.Sleep(2 * time.Second)
return "objectvalue", nil
})
if err != nil {
log.Fatalf("err:%v", err)
}
fmt.Printf("協程ID:%v 請求結果: %v, 是否共享結果: %v\n", idx, v, shared)
}(i)
}
wg.Wait()
}
在這個例子中,多個 Goroutine 併發地請求 Key 爲 “objectkey” 的資源。通過 singleflight,我們確保只有一個 Goroutine 去執行實際的數據加載操作,而其他請求則等待這個操作的結果。接下來,我們將探討 singleflight 的原理。
02 - singleflight 的原理
singleflight 庫提供了一個 Group 結構體,用於管理不同的請求,意圖在內部實現聚合的效果。定義如下:
type Group struct {
mu sync.Mutex // 互斥鎖,包含下面的映射表
m map[string]*call // 正在執行請求的映射表
}
Group 結構的核心就是這個 map 結構。每個正在執行的請求被封裝在 call 結構中,定義如下:
type call struct {
wg sync.WaitGroup // 用於同步併發的請求
val interface{} // 用於存放執行的結果
err error // 存放執行的結果
dups int // 用於計數聚合的請求
// ...其他字段用於處理特殊情況和提高容錯性
}
Group 結構的 Do 方法實現了聚合去重的核心邏輯,代碼實現如下所示:
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
g.mu.Lock()
if g.m == nil {
g.m = make(map[string]*call)
}
// 用 map 結構,來判斷是否已經有對應 Key 正在執行的請求
if c, ok := g.m[key]; ok {
c.dups++
// 如果有對應 Key 的請求正在執行,那麼等待結果即可。
g.mu.Unlock()
c.wg.Wait()
// ...
return c.val, c.err, true
}
// 創建一個代表執行請求的結構,和 Key 關聯起來,存入map中
c := new(call)
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()
g.doCall(c, key, fn) // 真正執行請求
return c.val, c.err, c.dups > 0
}
func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
defer func() {
// ...省略異常處理
c.wg.Done()
}()
func() {
// 真正執行請求
c.val, c.err = fn()
}()
// ...
}
通過上述代碼,singleflight 的 Group 結構體利用 map 記錄了正在執行的請求,關聯了請求的 Key 和執行體。當新的請求到來時,先檢查是否有相同 Key 的正在執行的請求,如果有,則等待起結果,從而避免重複執行相同的請求。
動畫示意圖:
對於讀操作,singleflight 通過這種方式有效地減少了重複工作。然而,對於寫操作,處理邏輯會有所不同,它需要額外的機制來保證數據落盤的時序。
寫請求的聚合
我們先回憶一下寫操作的姿勢。首先通過 Write 系統調用來寫入數據,默認情況下此時數據可能僅駐留在 PageCache 中,爲了確保數據安全落盤,此時我們需要手動調用一次 Sync 系統調用。
然而,Sync 操作的成本相當大,並且它除了數據,還會同步元數據等其他信息到磁盤上。對於性能影響巨大。並且,在機械盤的場景下,串行化的執行 Sync 是更好的實踐。
因此,我們面臨的一個問題是:如果在不犧牲數據安全性的前提下,能否減少 Sync 的次數呢?
對於同一個文件的寫操作,合併 Sync 操作是可行的。
文件的 Sync 會將當前時刻文件在內存中的全部數據一次性同步到磁盤。無論之前執行過多少次 Write 調用,一次 Sync 就能全部刷盤。這正是聚合寫請求以優化性能的關鍵所在。
01 - 寫聚合的原理
假設對同一個文件寫了三次數據,每一次都是 Write+Sync 的操作。那麼在合適的時機,三次 Sync 調用可以優化成一次。如下圖所示:
請求 C 的 Sync 操作是在所有請求的 Write 之後才發起的,所以它必定能保證在此之前的所有變更的數據都安全落盤。這就是寫操作聚合的根本原理。
接下來我們來思考兩個問題。
問題一:有童鞋可能會問,讀寫聚合優化感覺有一點相似?那能否用 singleflight 聚合寫操作呢?
例如,當併發調用 Sync 的時候,如果發現有正在執行的 Sync,能否共享這次 Sync 請求呢?
答案是:不可以。使用 singleflight 來優化寫無法保證數據的安全性。
我們必須要保證的是,Sync 操作一定要在 Write 完成之後發起。只要兩者存在併發的可能性,那麼 Sync 就不能保證攜帶了這次 Write 操作的數據,也就無法保證安全性。
示意圖:
還是以上面的圖爲例來說明,當請求 B 完成 Write 操作後,看到請求 A 已經發起了 Sync 操作。此時它是無法判斷請求 A 的 Sync 操作是否包含了請求 B 的數據。從圖示我們也很清晰的看到,請求 B 的 Write 和請求 A 的 Sync 在時間上存在重疊。
因此,當 Write 完成後,如果發現有一個 Sync 正在執行,我們不能簡單地複用這個 Sync。我們需要啓動一個新的 Sync 操作。
問題二:那麼聚合的時機在哪裏呢?
對於讀請求的聚合,其時機相對直觀:一旦發現有針對同一個 Key 的請求,就可以等待這次的結果並複用該結果。但寫請求的聚合時機則不是,它的聚合時機是在等待中遇到 “志同道合 “的請求。
讓我們通過一個具體例子來說明(注意,以下所有的請求都是針對相同的文件):
-
t0 時刻:A 執行了 Write,並嘗試發起 Sync,由於此時沒有其他請求在執行,A 便執行真正的 Sync 操作。
-
t1 時刻:B 執行了 Write,發現已經有請求在 Sync 了(即 A),因此進入等待狀態,直到 A 完成。
-
t2 時刻:C 執行了 Write,發現已經有請求在 Sync 了(即 A),因此進入等待狀態,直到 A 完成。
-
t3 時刻:D 執行了 Write,發現已經有請求在 Sync 了(即 A),因此進入等待狀態,直到 A 完成。
-
t4 時刻:A 的 Sync 操作終於完成。A 隨即通知 B、C、D 三位,告知它們可以進行 Sync 請求了。
-
t5 時刻:從 B、C、D 中選擇一個來執行一次 Sync 操作。假設 B 被選中,則 C、D 請求則等待 B 完成 Sync 即可。B 發起的 Sync 操作一定包含了 B,C,D 三者寫的數據,確保了安全性。
-
t6:B 的 Sync 操作完成,C、D 被通知操作已完成。如此一來,B、C、D 三者的數據都確保落盤。
正如上述所演示,寫操作的聚合是在等待前一次 Sync 操作完成期間收集到的請求。本來需要 4 次 Sync 操作,現在僅需 2 次 Sync 就可以確保數據的安全性。
在高併發的場景下,這種聚合方式的效益尤爲顯著。下面,我們將探討這種策略的具體代碼實現。
02 - 寫聚合的代碼實現
實現寫操作聚合的關鍵在於確保數據安全的時序前提下進行聚合。以下是一種典型和實現方式,它是對 sync.Cond 和 sync.Once 的巧妙應用。首先,我們定義一個負責聚合的結構體,如下:
// SyncJob 用於管理一個文件的 Sync 任務
type SyncJob struct {
*sync.Cond // 聚合 Sync 的關鍵
holding int32 // 記錄聚合的個數
lastErr error // 記錄執行 Sync 結果
syncPoint *sync.Once // 確保同一時間只有一個 Sync 執行
syncFunc func(interface{}) error // 實際執行 Sync 的函數
}
// SyncJob 的構建函數
func NewSyncJob(fn func(interface{}) error) *SyncJob {
return &SyncJob{
Cond: sync.NewCond(&sync.Mutex{}),
syncFunc: fn,
syncPoint: &sync.Once{},
}
}
接下來,我們爲 SyncJob 定義一個執行聚合的方法,如下:
func (s *SyncJob) Do(job interface{}) error {
s.L.Lock()
if s.holding > 0 {
// 如果有請求在前面,則等待前一次請求完成。
// 等待的過程中,會有"志同道合"之人
s.Wait()
}
// 準備要下發請求了,增加計數
s.holding += 1
syncPoint := s.syncPoint
s.L.Unlock()
// "志同道合"的人一起來到這裏,此時已經滿足 Write 和 Sync 的時序關係。
// 使用 sync.Once 確保只有請求者執行同步操作。
syncPoint.Do(func() {
// 執行實際的 Sync 操作
s.lastErr = s.syncFunc(job)
s.L.Lock()
// holding 展示本批次有多少個請求
fmt.Printf("holding:%v\n", s.holding)
// 本次請求執行完成,重置計數器,準備下一輪聚合
s.holding = 0
s.syncPoint = &sync.Once{}
// 喚醒下一批的請求
s.Broadcast()
s.L.Unlock()
})
return s.lastErr
}
在這裏,我們使用了一個 Go 的 sync.Cond 來阻塞和通知等待中的請求,並通過 sync.Once 確保同步操作同一時間、同一批只有一個在執行。
- 其實在這個場景下,從代碼實現來講,sync.Cond 也可以使用 Go 的 Channel 來實現相同的效果,用 Ch← 來阻塞,用 close(Ch) 來通知。效果是一樣的,感興趣的童鞋可以改造試試。
現在讓我們來看看這段代碼的實際運行效果:
func main() {
file, err := os.OpenFile("hello.txt", os.O_RDWR, 0700)
if err != nil {
log.Fatal(err)
}
defer file.Close()
// 初始化 Sync 聚合服務
syncJob := NewSyncJob(func(interface{}) error {
fmt.Printf("do sync...\n")
time.Sleep(time.Second())
return file.Sync()
})
wg := sync.WaitGroup{}
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
// 執行寫操作 write ...
fmt.Printf("write...\n")
// 觸發 sync 操作
syncJob.Do(file)
}()
}
wg.Wait()
}
通過上述代碼,我們講對文件寫入操作後的 Sync 調用進行有效的聚合。童鞋們可以多次運行程序,觀察其行爲。可以通過觀察打印的 holding 字段獲悉每一批聚合的請求是多少個。
思考:從效果來講,上面的代碼無論怎麼跑,最少要執行兩次 Sync。你知道是爲什麼嗎?
動畫示意圖:
總結
上面介紹了讀寫聚合優化的兩種實現。讀和寫的聚合是有區別的。
-
讀操作,核心是一個 map,只要有相同 Key 的讀取正在執行,那麼等待這份正在執行的請求的結果也是符合預期的。同步等待則用的是 sync.WaitGroup 來實現。
-
寫操作,核心是要先保證數據安全性。它必須保證 Sync 操作在 Write 操作之後。因此當發現有正在執行的 Sync 操作,那麼就等待這次完成,然後必須重新開啓一輪的 Sync 操作,等待的過程也是聚合的時機。我們可以使用 sync.Cond(或者 Channel )來實現阻塞和喚醒,使用 sync.Once 來保證同一時間單個執行。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/DBjO5jKnQHy0m2gIWVdgdw