Go 的三種擴展原語之 — SingleFlight

概述

singleflight.Group 是 Go 語言擴展包的另一種同步原語,它能夠在一個服務中抑制對下游的多次重複請求。一個比較常見的使用場景是,我們使用 Redis 對數據庫中的數據進行緩存,發生緩存擊穿時,大量請求會打到數據庫上進而影響服務的尾延時。

singleflight.Group 能夠有效地解決這個問題,它能夠限制對同一個鍵值對的多次重複請求,減少對下游的瞬時流量。

在資源的獲取非常昂貴時(例如訪問緩存、數據庫),就很適合使用 singleflight.Group 優化服務。它的使用方法如下:

type service struct {
    requestGroup singleflight.Group
}

func (s *service) handleRequest(ctx context.Context, request Request) (Response, error) {
    v, err, _ := requestGroup.Do(request.Hash(), func() (interface{}, error) {
        rows, err := // select * from tables
        if err != nil {
            return nil, err
        }
        return rows, nil
    })
    if err != nil {
        return nil, err
    }
    return Response{
        rows: rows,
    }, nil
}

因爲請求的哈希在業務上一般表示相同的請求,所以上述代碼使用它作爲請求的鍵。當然,我們也可以選擇其他的字段作爲 singleflight.Group.Do 方法的第一個參數減少重複的請求。

結構體

singleflight.Group 結構體由一個互斥鎖 sync.Mutex 和一個映射表組成,每一個 singleflight.call 結構體都保存了當前調用對應的信息:

type Group struct {
 mu sync.Mutex
 m  map[string]*call
}

type call struct {
 wg sync.WaitGroup

 val interface{}
 err error

 dups  int
 chans []chan<- Result
}

singleflight.call 結構體中的 val 和 err 字段都只會在執行傳入的函數時賦值一次並在 sync.WaitGroup.Wait 返回時被讀取。

dups 和 chans 兩個字段分別存儲了抑制的請求數量以及用於同步結果的 Channel。

接口

singleflight.Group求的方法:

這兩個方法在功能上沒有太多的區別,只是在接口的表現上稍有不同。

每次調用 singleflight.Group.Do 方法時都會獲取互斥鎖,隨後判斷是否已經存在鍵對應的 singleflight.call

func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
    g.mu.Lock()
    if g.m == nil {
            g.m = make(map[string]*call)
    }
    if c, ok := g.m[key]; ok {
            c.dups++
            g.mu.Unlock()
            c.wg.Wait()
            return c.val, c.err, true
    }
    c := new(call)
    c.wg.Add(1)
    g.m[key] = c
    g.mu.Unlock()

    g.doCall(c, key, fn)
    return c.val, c.err, c.dups > 0
}

因爲 val 和 err 兩個字段都只會在 singleflight.Group.doCall 方法中賦值,所以當 singleflight.Group.doCall 和 sync.WaitGroup.Wait 返回時,函數調用的結果和錯誤都會返回給 singleflight.Group.Do 的調用方。

func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
 c.val, c.err = fn()
 c.wg.Done()

 g.mu.Lock()
 delete(g.m, key)
 for _, ch := range c.chans {
  ch <- Result{c.val, c.err, c.dups > 0}
 }
 g.mu.Unlock()
}
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
 ch := make(chan Result, 1)
 g.mu.Lock()
 if g.m == nil {
  g.m = make(map[string]*call)
 }
 if c, ok := g.m[key]; ok {
  c.dups++
  c.chans = append(c.chans, ch)
  g.mu.Unlock()
  return ch
 }
 c := &call{chans: []chan<- Result{ch}}
 c.wg.Add(1)
 g.m[key] = c
 g.mu.Unlock()

 go g.doCall(c, key, fn)

 return ch
}

singleflight.Group.Do 和 singleflight.Group.DoChan 分別提供了同步和異步的調用方式,這讓我們使用起來也更加靈活。

小結

當我們需要減少對下游的相同請求時,可以使用 singleflight.Group 來增加吞吐量和服務質量,不過在使用的過程中我們也需要注意以下的幾個問題:

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/Kc5na-C1yb9lWjj2lXA-dw