深入理解 go singleflight

【導讀】本文整理了 go 語言緩存插件 singleflight 的用法和使用場景。

前言

最近從 java 轉到 go,來公司第一個開發工作就是對一個資源請求去重複,最終發現這個 singleflight 這個好東西,分享一下。

singleflight 使用場景

  1. 緩存擊穿:緩存在某個時間點過期的時候,恰好在這個時間點對這個 Key 有大量的併發請求過來,這些請求發現緩存過期一般都會從後端 DB 加載數據並回設到緩存,這個時候大併發的請求可能會瞬間把後端 DB 壓垮。
  1. 請求資源去重複

singleflight 簡介

singleflightgolang.org/x/sync/singleflight 項目下,對外提供了以下幾個方法

//Do方法,傳入key,以及回調函數,如果key相同,fn方法只會執行一次,同步等待
//返回值v:表示fn執行結果
//返回值err:表示fn的返回的err
//返回值shared:表示是否是真實fn返回的還是從保存的map[key]返回的,也就是共享的
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
//DoChan方法類似Do方法,只是返回的是一個chan
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
//設計Forget 控制key關聯的值是否失效,默認以上兩個方法只要fn方法執行完成後,內部維護的fn的值也刪除(即併發結束後就失效了)
func (g *Group) Forget(key string)

singleflight 的使用

從 singleflight 的 test 探尋最簡單用法

func TestDo(t *testing.T) {
    var g Group
    // key 可以理解資源的id
    v, err, _ := g.Do("key", func() (interface{}, error) {
    // do what you want
        return "bar", nil
    })
    if got, want := fmt.Sprintf("%v (%T)", v, v)"bar (string)"; got != want {
        t.Errorf("Do = %v; want %v", got, want)
    }
    if err != nil {
        t.Errorf("Do error = %v", err)
    }
}

驗證併發重複請求

func process(g *Group, t *testing.T, ch chan int, key string) {
    for count := 0; count < 10; count++ {
        v, err, shared := g.Do(key, func() (interface{}, error) {
            time.Sleep(1000 * time.Millisecond)
            return "bar", nil
        })
        t.Log("v = ", v, " err = ", err, " shared =", shared, " ch :", ch, "g ", len(g.m))
        if got, want := fmt.Sprintf("%v (%T)", v, v)"bar (string)"; got != want {
            t.Errorf("Do = %v; want %v", got, want)
        }
        if err != nil {
            t.Errorf("Do error = %v", err)
        }
    }
    ch <- 1
}

func TestDo1(t *testing.T) {
    var g Group
    channels := make([]chan int, 10)
    key := "key"
    for i := 0; i < 10; i++ {
        channels[i] = make(chan int)
        go process(&g, t, channels[i], key)
    }
    for i, ch := range channels {
        <-ch
        fmt.Println("routine ", i, "quit!")
    }
}

singleflight 的原理

call


call 用來表示一個正在執行或已完成的函數調用。

// call is an in-flight or completed singleflight.Do call
type call struct {
    wg sync.WaitGroup

    // These fields are written once before the WaitGroup is done
    // and are only read after the WaitGroup is done.
    //val和err用來記錄fn發放執行的返回值
    val interface{}
    err error

    // forgotten indicates whether Forget was called with this call's key
    // while the call was still in flight.
    // 用來標識fn方法執行完成之後結果是否立馬刪除還是保留在singleflight中
    forgotten bool

    // These fields are read and written with the singleflight
    // mutex held before the WaitGroup is done, and are read but
    // not written after the WaitGroup is done.
    //dups 用來記錄fn方法執行的次數
    dups  int
    //用來記錄DoChan中調用次數以及需要返回的數據
    chans []chan<- Result
}

Group

Group 可以看做是任務的分類。

// Group represents a class of work and forms a namespace in which
// units of work can be executed with duplicate suppression.
type Group struct {
    mu sync.Mutex       // protects m
    m  map[string]*call // lazily initialized
}

Do 函數

// Do executes and returns the results of the given function, making
// sure that only one execution is in-flight for a given key at a
// time. If a duplicate comes in, the duplicate caller waits for the
// original to complete and receives the same results.
// The return value shared indicates whether v was given to multiple callers.
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
    g.mu.Lock()
    if g.m == nil {
        g.m = make(map[string]*call)
    }
    if c, ok := g.m[key]; ok {
        c.dups++
        g.mu.Unlock()
        c.wg.Wait()
        return c.val, c.err, true
    }
    c := new(call)
    // 設置forgotten = true, doCall時 不再調用delete(g.m, key)
    // c.forgotten = true
    c.wg.Add(1)
    g.m[key] = c
    g.mu.Unlock()

    g.doCall(c, key, fn)
    return c.val, c.err, c.dups > 0
}

// doCall handles the single call for a key.
func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
    c.val, c.err = fn()
    c.wg.Done()

    g.mu.Lock()
    if !c.forgotten {
        delete(g.m, key)
    }
    for _, ch := range c.chans {
        ch <- Result{c.val, c.err, c.dups > 0}
    }
    g.mu.Unlock()
}

在 Do 方法中是通過 waitgroup 來控制的,主要流程如下:

  1. 在 Group 中設置了一個 map,如果 key 不存在,則實例化 call(用來保存值信息),並將 key=>call 的對應關係存入 map 中通過 mutex 保證了併發安全

  2. 如果已經在調用中則 key 已經存在 map,則 wg.Wait

  3. 在 fn 執行結束之後(在 doCall 方法中執行)執行 wg.Done

  4. 卡在第 2 步的方法得到執行,返回結果

其他的 DoChan 方法也是類似的邏輯,只是返回的是一個 chan。

轉自:

jianshu.com/p/ec59beec138b

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/p2rlkCRRj7h7-aojTidQbg