Go 併發編程 —— I-O 聚合優化(動畫講解)

背景提要

在存儲系統中,在確保功能不受損的前提下,儘量的減少讀寫 I/O 的次數是優化的一個重要方向,也就是聚合 I/O 的場景。讀寫操作雖然都有聚合 I/O 的需求,但各自的重點和實現方法卻有所不同。接下來,我們將分別探討讀和寫請求的聚合優化方法。

讀請求的聚合

以讀操作中,緩存優化是一種常見的優化手段。具體做法是將讀取的數據存儲在內存中,並通過一個唯一的 Key 來索引這些數據。當讀請求來到時,如果該 Key 在緩存中沒有命中,那麼就需要從後端存儲獲取。用戶請求直接穿透到後端存儲,如果併發很大,這可能是一個很大的風險。

例如,對於 Key:“test”,如果緩存中沒有相應的數據,並且突然出現大量併發讀取請求,每個請求都會發現緩存未命中。如果這些請求全部直接訪問後端存儲,可能會給後端存儲帶來巨大壓力。

爲了應對這種情況,我們其實可以只允許一個讀請求去後端讀取數據,而其他併發請求則等待這個請求的結果。這就是讀請求聚合的基本原理。

在 Go 語言中,可以使用 singleflight 這類第三方庫完成上述需求。singleflight 的設計理念是 “單一請求執行”,即針對同一個 Key,在多個併發請求中只允許一個請求訪問後端。

01 - 讀請求聚合的使用姿勢


下面是一個使用 singleflight 的示例,展現瞭如何通過傳入特定的 Key 和閉包函數來聚合併發請求。

package main

import (
  // ...
 "golang.org/x/sync/singleflight"
)

func main() {
   var g singleflight.Group
   var wg sync.WaitGroup

   // 模擬多個 goroutine 併發請求相同的資源
   for i := 0; i < 5; i++ {
      wg.Add(1)
      go func(idx int) {
          defer wg.Done()
          v, err, shared := g.Do("objectkey", func() (interface{}, error) {
              fmt.Printf("協程ID:%v 正在執行...\n", idx)
              time.Sleep(2 * time.Second)
              return "objectvalue", nil
          })
          if err != nil {
              log.Fatalf("err:%v", err)
          }
          fmt.Printf("協程ID:%v 請求結果: %v, 是否共享結果: %v\n", idx, v, shared)
      }(i)
   }
   wg.Wait()
}

在這個例子中,多個 Goroutine 併發地請求 Key 爲 “objectkey” 的資源。通過 singleflight,我們確保只有一個 Goroutine 去執行實際的數據加載操作,而其他請求則等待這個操作的結果。接下來,我們將探討 singleflight 的原理。

02 - singleflight 的原理


singleflight 庫提供了一個 Group 結構體,用於管理不同的請求,意圖在內部實現聚合的效果。定義如下:

type Group struct {
   mu sync.Mutex       // 互斥鎖,包含下面的映射表
   m  map[string]*call // 正在執行請求的映射表
}

Group 結構的核心就是這個 map 結構。每個正在執行的請求被封裝在 call 結構中,定義如下:

type call struct {
   wg sync.WaitGroup // 用於同步併發的請求
   val interface{}   // 用於存放執行的結果
   err error         // 存放執行的結果
   dups  int         // 用於計數聚合的請求
    // ...其他字段用於處理特殊情況和提高容錯性
}

Group 結構的 Do 方法實現了聚合去重的核心邏輯,代碼實現如下所示:

func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
   g.mu.Lock()
   if g.m == nil {
      g.m = make(map[string]*call)
   }
   // 用 map 結構,來判斷是否已經有對應 Key 正在執行的請求
   if c, ok := g.m[key]; ok {
      c.dups++
      // 如果有對應 Key 的請求正在執行,那麼等待結果即可。
      g.mu.Unlock()
      c.wg.Wait()
      // ...
      return c.val, c.err, true
   }
   // 創建一個代表執行請求的結構,和 Key 關聯起來,存入map中
   c := new(call)
   c.wg.Add(1)
   g.m[key] = c
   g.mu.Unlock()
   g.doCall(c, key, fn) // 真正執行請求
   return c.val, c.err, c.dups > 0
}

func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
    defer func() {
      // ...省略異常處理
      c.wg.Done()
    }()
    func() {
        // 真正執行請求
         c.val, c.err = fn()
    }()
    // ...
}

通過上述代碼,singleflight 的 Group 結構體利用 map 記錄了正在執行的請求,關聯了請求的 Key 和執行體。當新的請求到來時,先檢查是否有相同 Key 的正在執行的請求,如果有,則等待起結果,從而避免重複執行相同的請求。

動畫示意圖:

對於讀操作,singleflight 通過這種方式有效地減少了重複工作。然而,對於寫操作,處理邏輯會有所不同,它需要額外的機制來保證數據落盤的時序。

寫請求的聚合

我們先回憶一下寫操作的姿勢。首先通過 Write 系統調用來寫入數據,默認情況下此時數據可能僅駐留在 PageCache 中,爲了確保數據安全落盤,此時我們需要手動調用一次 Sync 系統調用。

然而,Sync 操作的成本相當大,並且它除了數據,還會同步元數據等其他信息到磁盤上。對於性能影響巨大。並且,在機械盤的場景下,串行化的執行 Sync 是更好的實踐。

因此,我們面臨的一個問題是:如果在不犧牲數據安全性的前提下,能否減少 Sync 的次數呢?

對於同一個文件的寫操作,合併 Sync 操作是可行的。

文件的 Sync 會將當前時刻文件在內存中的全部數據一次性同步到磁盤。無論之前執行過多少次 Write 調用,一次 Sync 就能全部刷盤。這正是聚合寫請求以優化性能的關鍵所在。

01 - 寫聚合的原理


假設對同一個文件寫了三次數據,每一次都是 Write+Sync 的操作。那麼在合適的時機,三次 Sync 調用可以優化成一次。如下圖所示:

請求 C 的 Sync 操作是在所有請求的 Write 之後才發起的,所以它必定能保證在此之前的所有變更的數據都安全落盤。這就是寫操作聚合的根本原理。

接下來我們來思考兩個問題。

問題一:有童鞋可能會問,讀寫聚合優化感覺有一點相似?那能否用 singleflight 聚合寫操作呢?

例如,當併發調用 Sync 的時候,如果發現有正在執行的 Sync,能否共享這次 Sync 請求呢?

答案是:不可以。使用 singleflight 來優化寫無法保證數據的安全性。

我們必須要保證的是,Sync 操作一定要在 Write 完成之後發起。只要兩者存在併發的可能性,那麼 Sync 就不能保證攜帶了這次 Write 操作的數據,也就無法保證安全性。

示意圖:

還是以上面的圖爲例來說明,當請求 B 完成 Write 操作後,看到請求 A 已經發起了 Sync 操作。此時它是無法判斷請求 A 的 Sync 操作是否包含了請求 B 的數據。從圖示我們也很清晰的看到,請求 B 的 Write 和請求 A 的 Sync 在時間上存在重疊。

因此,當 Write 完成後,如果發現有一個 Sync 正在執行,我們不能簡單地複用這個 Sync。我們需要啓動一個新的 Sync 操作。

問題二:那麼聚合的時機在哪裏呢?

對於讀請求的聚合,其時機相對直觀:一旦發現有針對同一個 Key 的請求,就可以等待這次的結果並複用該結果。但寫請求的聚合時機則不是,它的聚合時機是在等待中遇到 “志同道合 “的請求。

讓我們通過一個具體例子來說明(注意,以下所有的請求都是針對相同的文件):

  1. t0 時刻:A 執行了 Write,並嘗試發起 Sync,由於此時沒有其他請求在執行,A 便執行真正的 Sync 操作。

  2. t1 時刻:B 執行了 Write,發現已經有請求在 Sync 了(即 A),因此進入等待狀態,直到 A 完成。

  3. t2 時刻:C 執行了 Write,發現已經有請求在 Sync 了(即 A),因此進入等待狀態,直到 A 完成。

  4. t3 時刻:D 執行了 Write,發現已經有請求在 Sync 了(即 A),因此進入等待狀態,直到 A 完成。

  5. t4 時刻:A 的 Sync 操作終於完成。A 隨即通知 B、C、D 三位,告知它們可以進行 Sync 請求了。

  6. t5 時刻:從 B、C、D 中選擇一個來執行一次 Sync 操作。假設 B 被選中,則 C、D 請求則等待 B 完成 Sync 即可。B 發起的 Sync 操作一定包含了 B,C,D 三者寫的數據,確保了安全性。

  7. t6:B 的 Sync 操作完成,C、D 被通知操作已完成。如此一來,B、C、D 三者的數據都確保落盤。

正如上述所演示,寫操作的聚合是在等待前一次 Sync 操作完成期間收集到的請求。本來需要 4 次 Sync 操作,現在僅需 2 次 Sync 就可以確保數據的安全性。

在高併發的場景下,這種聚合方式的效益尤爲顯著。下面,我們將探討這種策略的具體代碼實現。

02 - 寫聚合的代碼實現


實現寫操作聚合的關鍵在於確保數據安全的時序前提下進行聚合。以下是一種典型和實現方式,它是對 sync.Cond 和 sync.Once 的巧妙應用。首先,我們定義一個負責聚合的結構體,如下:

// SyncJob 用於管理一個文件的 Sync 任務
type SyncJob struct {
   *sync.Cond                         // 聚合 Sync 的關鍵
   holding    int32                   // 記錄聚合的個數
   lastErr    error                   // 記錄執行 Sync 結果
   syncPoint  *sync.Once              // 確保同一時間只有一個 Sync 執行
   syncFunc   func(interface{}) error // 實際執行 Sync 的函數
}

// SyncJob 的構建函數
func NewSyncJob(fn func(interface{}) error) *SyncJob {
   return &SyncJob{
      Cond:      sync.NewCond(&sync.Mutex{}),
      syncFunc:  fn,
      syncPoint: &sync.Once{},
   }
}

接下來,我們爲 SyncJob 定義一個執行聚合的方法,如下:

func (s *SyncJob) Do(job interface{}) error {
 s.L.Lock()
 if s.holding > 0 {
  // 如果有請求在前面,則等待前一次請求完成。
    // 等待的過程中,會有"志同道合"之人
  s.Wait()
 }
 // 準備要下發請求了,增加計數
 s.holding += 1
 syncPoint := s.syncPoint
 s.L.Unlock()

 // "志同道合"的人一起來到這裏,此時已經滿足 Write 和 Sync 的時序關係。
  // 使用 sync.Once 確保只有請求者執行同步操作。
 syncPoint.Do(func() {
  // 執行實際的 Sync 操作
  s.lastErr = s.syncFunc(job)

  s.L.Lock()
    // holding 展示本批次有多少個請求
    fmt.Printf("holding:%v\n", s.holding)
  // 本次請求執行完成,重置計數器,準備下一輪聚合
  s.holding = 0
  s.syncPoint = &sync.Once{}
  // 喚醒下一批的請求
  s.Broadcast()
  s.L.Unlock()
 })
 return s.lastErr
}

在這裏,我們使用了一個 Go 的 sync.Cond 來阻塞和通知等待中的請求,並通過 sync.Once 確保同步操作同一時間、同一批只有一個在執行。

現在讓我們來看看這段代碼的實際運行效果:

func main() {
 file, err := os.OpenFile("hello.txt", os.O_RDWR, 0700)
 if err != nil {
  log.Fatal(err)
 }
 defer file.Close()

 // 初始化 Sync 聚合服務
 syncJob := NewSyncJob(func(interface{}) error {
  fmt.Printf("do sync...\n")
    time.Sleep(time.Second())
  return file.Sync()
 })

 wg := sync.WaitGroup{}
 for i := 0; i < 10; i++ {
  wg.Add(1)
  go func() {
   defer wg.Done()
   // 執行寫操作 write ...
   fmt.Printf("write...\n")
   // 觸發 sync 操作
   syncJob.Do(file)
  }()
 }
 wg.Wait()
}

通過上述代碼,我們講對文件寫入操作後的 Sync 調用進行有效的聚合。童鞋們可以多次運行程序,觀察其行爲。可以通過觀察打印的 holding 字段獲悉每一批聚合的請求是多少個。

思考:從效果來講,上面的代碼無論怎麼跑,最少要執行兩次 Sync。你知道是爲什麼嗎?

動畫示意圖:

總結

上面介紹了讀寫聚合優化的兩種實現。讀和寫的聚合是有區別的。

  1. 讀操作,核心是一個 map,只要有相同 Key 的讀取正在執行,那麼等待這份正在執行的請求的結果也是符合預期的。同步等待則用的是 sync.WaitGroup 來實現。

  2. 寫操作,核心是要先保證數據安全性。它必須保證 Sync 操作在 Write 操作之後。因此當發現有正在執行的 Sync 操作,那麼就等待這次完成,然後必須重新開啓一輪的 Sync 操作,等待的過程也是聚合的時機。我們可以使用 sync.Cond(或者 Channel )來實現阻塞和喚醒,使用 sync.Once 來保證同一時間單個執行。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/DBjO5jKnQHy0m2gIWVdgdw