深入理解 Golang 併發工具 - Singleflight

前言

前段時間在一個項目裏使用到了分佈式鎖進行共享資源的訪問限制,後來瞭解到 Golang 裏還能夠使用 singleflight 對共享資源的訪問做限制,於是利用空餘時間瞭解,將知識沉澱下來, 並做分享

文章儘量用通俗的語言表達自己的理解,從入門 demo 開始,結合源碼分析 singleflight 的重點方法,最後分享 singleflight 的實際使用方式與需要注意的 “坑 “。

另外需注意, 本文分析的 singleflight 源碼位於 https://cs.opensource.google/go/x/sync/+/036812b2:singleflight/singleflight.go

這一點請區別於 https://github.com/golang/groupcache/blob/master/singleflight/singleflight.go 的實現。

定義

按照官方文檔的定義, singleflight 提供了一個重複的函數調用抑制機制

Package singleflight provides a duplicate function call suppression

用途

通俗的來說就是 singleflight 將相同的併發請求合併成一個請求,進而減少對下層服務的壓力,通常用於解決緩存擊穿的問題

簡單 Demo

var (
    sfKey1 = "key1"
    wg     *sync.WaitGroup
    sf     singleflight.Group
    nums   = 10
)

func getValueService(key string) { //service
   var val string
   wg = &sync.WaitGroup{}
   wg.Add(nums)
   for idx := 0; idx < nums; idx++ { // 模擬多協程同時請求
      go func(idx int) { // 注意for的一個小坑
         defer wg.Done()
         value, _ := getAndSetCacheNoChan(idx, key) //簡化代碼,不處理error
         log.Printf("request %v get value: %v", idx, value)
         val = value
      }(idx)
   }
   wg.Wait()
   log.Println("val: ", val)
   return
}

// getValueBySingleflight 使用singleflight取cacheKey對應的value值
func getValueBySingleflight(idx int, cacheKey string) (string, error) {
   log.Printf("idx %v into-cache...", idx)
   // 調用singleflight的Do()方法
   value, _, _ := sf.Do(cacheKey, func() (ret interface{}, err error) {
      log.Printf("idx %v is-setting-cache", idx)
      // 休眠0.1s以捕獲併發的相同請求
      time.Sleep(100 * time.Millisecond)
      log.Printf("idx %v set-cache-success!", idx)
      return "myValue", nil
   })
   return value.(string), nil
}

看看實際效果

源碼分析

結構

type (
   Group struct { // singleflight實體
      mu sync.Mutex       // 互斥鎖
      m  map[string]*call // 懶加載
   }

   call struct {
      wg sync.WaitGroup
      // 存儲 調用singleflight.Do()方法返回的結果
      val interface{}
      err error

      // 調用singleflight.Forget(key)時將對應的key從Group.m中刪除
      forgotten bool

      // 通俗的理解成singleflight合併的併發請求數
      dups  int
      // 存儲 調用singleflight.DoChan()方法返回的結果
      chans []chan<- Result
   }

   Result struct {
      Val    interface{}
      Err    error
      Shared bool
   }
)

對外暴露的方法

func Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool)   

func DoChan(key string, fn func() (interface{}, error)) <-chan Result) 

// 將key從Group.m中刪除
func Forget(key string)

DoChan()Do()最大的區別是DoChan()屬於異步調用,返回一個 channel,解決同步調用時的阻塞問題

重點方法分析

Do

func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
   g.mu.Lock() // 加互斥鎖
   if g.m == nil { // 懶加載map
      g.m = make(map[string]*call)
   }
   if c, ok := g.m[key]; ok { // 檢查相同的請求已經是否進入過singleflight
      c.dups++
      g.mu.Unlock()
      c.wg.Wait() // 調用waitGroup的wait()方法阻塞住本次調用,等待第一個進入singleflight的請求執行完畢拿到結果,將本次請求喚醒.

      if e, ok := c.err.(*panicError); ok { //如果調用完成,發生error ,將error上拋
         panic(e)
      } else if c.err == errGoexit {
         runtime.Goexit()
      }
      // 返回調用結果
      return c.val, c.err, true
   }
   c := new(call) // 相同的請求第一次進入singleflight
   c.wg.Add(1)
   g.m[key] = c // new一個call實體,放入singleflight.call這個map
   g.mu.Unlock()

   g.doCall(c, key, fn) //實際執行的函數
   return c.val, c.err, c.dups > 0
}

完整流程圖如下

由源碼可以分析出,最後實際執行我們業務邏輯的函數其實是放到了doCall()裏,我們稍後分析這個函數

Forget

再簡單看看 Forget() 函數, 很短

func (g *Group) Forget(key string) {
   g.mu.Lock()
   if c, ok := g.m[key]; ok {
      c.forgotten = true // key的forgotten標誌位記爲true
   }
   delete(g.m, key)  // Group.m中刪除對應的key
   g.mu.Unlock()
}

doCall

func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
   normalReturn := false
   recovered := false

    //使用雙重defer來區分error的類型: panic && runtime.error
   defer func() { 
      if !normalReturn && !recovered {
        // fn()發生了panic且fn()中的panic沒有被recover掉
        // errGoexit連接runtime.Goexit錯誤
         c.err = errGoexit 
      }

      c.wg.Done()
      g.mu.Lock()
      defer g.mu.Unlock()
      if !c.forgotten { // 檢查key是否調用了Forget()
         delete(g.m, key)
      }

      if e, ok := c.err.(*panicError); ok {
         // 如果返回的是 panic 錯誤,爲了避免channel被永久阻塞,我們需要確保這個panic無法被recover
         if len(c.chans) > 0 {
            go panic(e)  // panic無法被恢復
            select {} // 阻塞本goroutine以確保本goroutine出現在奔潰堆棧中
         } else {
            panic(e)
         }
      } else {
         // 將結果正常地返回
         for _, ch := range c.chans {
            ch <- Result{c.val, c.err, c.dups > 0}
         }
      }
   }()

   func() {
      defer func() {
         if !normalReturn {
            // 表示fn()發生了panic()
            // 此時與panic相關的堆棧已經被丟棄(調用的fn()) ,無法通過堆棧跟蹤去確定error類型
            if r := recover(); r != nil {
               c.err = newPanicError(r) //new一個新的自定義panic err,往第一個defer拋
            }
         }
      }()
     // 執行我們實際的業務邏輯,並將業務方法的返回值賦給singleflight.call
      c.val, c.err = fn()
      // 如果fn()發生panic,normalReturn無法被賦值爲true,而是進入doCall()的第二個defer()
      normalReturn = true
   }()
   // 如果normalResult爲false時,表示fn()發生了panic
   // 但是執行到了這一步,表示fn()中的panic被recover了
   if !normalReturn {
      recovered = true // recovered標誌位置爲true
   }
}

由以上分析可以得出幾個重要的結論

  1. singleflight 主要使用 sync.Mutex 和 sync.WaitGroup 進行併發控制

  2. 對於 key 相同的請求, singleflight 只會處理的一個進入的請求,後續的請求都會使用 waitGroup.wait() 將請求阻塞

  3. 使用雙重 defer() 區分了 panic 和 runtime.Goexit 錯誤,如果返回的是一個 panic 錯誤,group.c.chans 會發生阻塞,那麼需要拋出這個 panic 且確保其無法被 recover

實際使用

分享一段實際項目中使用 singleflight 結合本地緩存的代碼模版

func (srv Service) getDataBySingleFlight(ctx  context.Context) (entity.List, error) {
    // 1. 從localCache查
    resData, err := local_cache.Get(ctx, key)
    if err != nil {
       log.Fatalln()
       return resData, err
    }
    if resData != nil {
       return resData, nil
    }
    // 2. localCache無數據,從redis查
    resData, err = srv.rdsRepo.Get()
    if err != nil && err != redis.Nil {
       // redis錯誤
       log.Fatalln()
       return resData, err
    } else if redis.Nil == err {
           // redis無數據 ,查db
           resData, err, _ = singleFlight.Do(key, func() (interface{}, error) {
           // 構建db查詢條件
          searchConn := entity.SearchInfo{}
           //  建議休眠0.1s 捕獲0.1s內的重複請求
          time.Sleep(100 * time.Millisecond)
           // 4. 查db
          data, err := srv.dBRepo.GetByConn(ctx, searchConn)
          if err != nil {
             log.Fatalln()
             return data, err
          }
           // 5. 回寫localCache && redisCache
          err = local_cache.Set(ctx, data)
          if err != nil {
             log.Fatalln()
          }
          err = srv.rdsRepo.Set(ctx, data)
          if err != nil {
             log.Fatalln()
          }
      // 返回db數據,回寫cache的error不上拋
      return data, nil
   })
   return resData, err
}
return resData, nil

弊端與解決方案

singleflight 當然不是解決問題的銀彈,在使用的過程中有一些 “坑” 需要我們注意

解決方案:使用 singleflight 的DoChan()方法,在 service 層使用 channel+select 做超時控制

func enterGetAndSetCacheWithChan(ctx context.Context, key string) (str string, err error) {
   tag := "enterGetAndSetCacheWithChan"
   sonCtx, _ := context.WithTimeout(ctx, 2 * time.Second)
   val := ""
   nums := 10 //協程數
   wg = &sync.WaitGroup{}
   wg.Add(nums)
   for idx := 0; idx < nums; idx++ {
      go func() {
         defer wg.Done()
         val, err = getAndSetCacheWithChan(sonCtx, idx, key)
         if err != nil {
            log.Printf("err:[%+v]", err)
            return
         }
         str = val
      }()
   }
   wg.Wait()
   log.Printf("tag:[%s] val:[%s]", tag, val)
   return
}

func getAndSetCacheWithChan(ctx context.Context, idx int, cacheKey string) (string, error) {
   tag := "getAndSetCacheWithChan"
   log.Printf("tag: %s ;idx %d into-cache...", tag, idx)
   ch := sf.DoChan(cacheKey, func() (ret interface{}, err error) { // do的入參key,可以直接使用緩存的key,這樣同一個緩存,只有一個協程會去讀DB
      log.Printf("idx %v is-setting-cache", idx)
      time.Sleep(100 * time.Millisecond)
      log.Printf("idx %v set-cache-success!", idx)
      return "myValue", nil
   })
   for { // 選擇 context + select 超時控制
      select {
      case <-ctx.Done():
         return "", errors.New("ctx-timeout") // 根據業務邏輯選擇上拋 error
      case data, _ := <-ch:
         return data.Val.(string), nil
      default:
      }
   }
}

解決方案:根據實際情況,結合下游服務調用耗時與下游實際能支持的 QPS 等數據,對 key 做定時 Forget()。

go func() {
    time.Sleep(100 * time.Millisecond)
    g.Forget(key)
}()

最後,衷心希望本文能夠對各位讀者有一定的幫助。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/z4VewPYNe7y1fz6xQEzziA