一個例子,給你講透典型的 Go 併發控制

Go 中可以使用一個go關鍵字讓程序異步執行

一個比較常見的場景:逐個異步調用多個函數,或者循環中異步調用

func main() {
 go do1()
 go do2()
 go do3()
}

// 或者

func main() {
 for i := range []int{1,2,3}{
  go do(i)
 }
}

如果瞭解 Go 併發機制,就知道main在其他 goroutine 運行完成之前就已經結束了,所以上面代碼的運行結果是不符合預期的。我們需要使用一種叫做併發控制的手段,來保證程序正確運行

爲了更容易理解,我們虛擬一個🌰

已知有一個現成的函數search,能夠按照關鍵詞執行搜索

期望實現一個新的函數coSearch能夠進行批量查詢

package main

import (
 "context"
 "errors"
 "fmt"
 "sync"
)

func search(ctx context.Context, word string) (string, error) {
 if word == "Go" {
  return "", errors.New("error: Go") // 模擬結果
 }
 return fmt.Sprintf("result: %s", word), nil // 模擬結果
}

func coSearch(ctx context.Context, words []string) (results []string, err error) {
 //tbd

 return
}

func main() {
 words := []string{"Go""Rust""PHP""JavaScript""Java"}
 results, err := coSearch(context.Background(), words)
 if err != nil {
  fmt.Println(err)
  return
 }

 fmt.Println(results)
}

可以先暫停想想該如何實現coSearch函數

併發控制基礎

sync.WaitGroup是 Go 標準庫中用來控制併發的結構,這裏放一個使用WaitGroup實現coSearch的示例

package main

import (
 "context"
 "errors"
 "fmt"
 "sync"
)

func search(ctx context.Context, word string) (string, error) {
 if word == "Go" {
  return "", errors.New("error: Go") // 模擬結果
 }
 return fmt.Sprintf("result: %s", word), nil // 模擬結果
}

func coSearch(ctx context.Context, words []string) ([]string, error) {
 var (
  wg      = sync.WaitGroup{}
  once    = sync.Once{}
  results = make([]string, len(words))
  err     error
 )

 for i, word := range words {
  wg.Add(1)

  go func(word string, i int) {
   defer wg.Done()

   result, e := search(ctx, word)
   if e != nil {
    once.Do(func() {
     err = e
    })

    return
   }

   results[i] = result
  }(word, i)
 }

 wg.Wait()

 return results, err
}

func main() {
 words := []string{"Go""Rust""PHP""JavaScript""Java"}
 results, err := coSearch(context.Background(), words)
 if err != nil {
  fmt.Println(err)
  return
 }

 fmt.Println(results)
}

上面的代碼中有非常多的細節,來逐個聊一聊

🌲 sync.WaitGroup{}併發控制

sync.WaitGroup{}的用法非常簡潔

對於coSearch來說,等待所有 goroutine 運行完成,也就完成了函數的任務,返回最終的結果

var (
    wg      = sync.WaitGroup{}
    //...省略其他代碼
)

for i, word := range words {
    wg.Add(1)

    go func(word string, i int) {
        defer wg.Done()
  //...省略其他代碼
    }(word, i)
}

wg.Wait()

🌲 for循環中的 goroutine!

這是一個 Go 經典錯誤,如果 goroutine 中使用了for迭代的變量,所有 goroutine 都會獲得最後一次循環的值。例如下面的示例,並不會輸出 "a", "b", "c" 而是輸出 "c", "c", "c"

func main() {
    done := make(chan bool)

    values := []string{"a""b""c"}
    for _, v := range values {
        go func() {
            fmt.Println(v)
            done <- true
        }()
    }

    // wait for all goroutines to complete before exiting
    for _ = range values {
        <-done
    }
}

正確的做法就是像上文示例一樣,將迭代的變量賦值給函數參數,或者賦值給新的變量

for i, word := range words {
 // ...
    go func(word string, i int) {
        // fmt.Println(word, i)
    }(word, i)
}

for i, word := range words {
    i, word := i, word
    go func() {
        // fmt.Println(word, i)
    }()
}

由於這個錯誤實在太常見,從 Go 1.22 開始 Go 已經修正了這個經典的錯誤:Fixing For Loops in Go 1.22。

不過 Go 1.22 默認不會開啓修正,需要設置環境變量GOEXPERIMENT=loopvar纔會 開啓

🌲  併發安全

簡單理解:當多個 goroutine 對同一個內存區域進行讀寫時,就會產生併發安全的問題,它會導致程序運行的結果不符合預期

上面的示例把最終的結果放入了results = make([]string, len(words))中。雖然我們在 goroutine 中併發的對於results變量進行寫入,但因爲每一個 goroutine 都寫在了獨立的位置,且沒有任何讀取的操作,因此results[i] = result是併發安全的

results = [ xxxxxxxx,    xxxxxxxx,    xxxxxxxx,    .... ]
                ^            ^            ^       
                |            |            |       
           goroutine1   goroutine2    goroutine3

這也意味着如果使用results = append(results, result)的方式併發賦值,因爲會涉及到 slice 的擴容等操作,所以並不是併發安全的,需要利用sync.Mutex{}進行加鎖

如果想盡可能的提高程序的併發性能,推薦使用 results[i] = result這種方式賦值

🌲 sync.Once{}單次賦值

示例coSearch中,會返回第一個出錯的searcherrorerr是一個全局變量,在併發 goroutine 中賦值是併發不安全的操作

//...省略其他代碼
go func(word string, i int) {
    defer wg.Done()

    result, e := search(ctx, word)
    if e != nil && err == nil {
        err = e

        return
    }

    results[i] = result
}(word, i)
//...省略其他代碼

對於全局變量的賦值比較常規做法就是利用sync.Mutex{}進行加鎖。但示例的邏輯爲單次賦值,我們剛好可以利用同在sync庫的sync.Once{}來簡化代碼

sync.Once{}功能如其名,將我們要執行的邏輯放到它的Do()方法中,無論多少併發都只會執行一次

//...省略其他代碼
go func(word string, i int) {
    defer wg.Done()

    result, e := search(ctx, word)
    if e != nil {
        once.Do(func() {
            err = e
        })

        return
    }

    results[i] = result
}(word, i)
//...省略其他代碼

Further more

上面的示例coSearch已經是一個比較完善的函數了,但我們還可以做得更多

🌲 goroutine 數量控制

coSearch入參的數組可能非常大,如果不加以控制可能導致我們的服務器資源耗盡,我們需要控制併發的數量

利用帶緩衝 channel 可以實現

tokens := make(chan struct{}, 10)

for i, word := range words {
    tokens <- struct{}{} // 新增
    wg.Add(1)

    go func(word string, i int) {
        defer func() {
            wg.Done()
            <-tokens  // 新增
        }()

        result, e := search(ctx, word)
        if e != nil {
            once.Do(func() {
                err = e
            })

            return
        }

        results[i] = result
    }(word, i)
}

wg.Wait()

如上,代碼中創建了 10 個緩衝區的 channel,當 channel 被填滿時,繼續寫入會被阻塞;當 goroutine 運行完成之後,除了原有的wg.Done(),我們需要從 channel 讀取走一個數據,來允許新的 goroutine 運行

通過這種方式,我們控制了coSearch最多隻能運行 10 個 goroutine,當超過 10 個時需要等待前面運行的 goroutine 結束

🌲 context.Context

併發執行的 goroutine 只要有一個出錯,其他 goroutine 就可以停止,沒有必要繼續執行下去了。如何把取消的事件傳導到其他 goroutine 呢?context.Context就是用來傳遞類似上下文信息的結構

ctx, cancel := context.WithCancelCause(ctx) // 新增
defer cancel(nil) // 新增

for i, word := range words {
    tokens <- struct{}{}
    wg.Add(1)

    go func(word string, i int) {
        defer func() {
            wg.Done()
            <-tokens
        }()

        result, e := search(ctx, word)
        if e != nil {
            once.Do(func() {
                err = e
                cancel(e) // 新增
            })

            return
        }

        results[i] = result
    }(word, i)
}

wg.Wait()

完整的代碼

最終完成的效果如下

package main

import (
 "context"
 "errors"
 "fmt"
 "sync"
)

func search(ctx context.Context, word string) (string, error) {
 select {
 case <-ctx.Done():
  return "", ctx.Err()
 default:
  if word == "Go" || word == "Java" {
   return "", errors.New("Go or Java")
  }
  return fmt.Sprintf("result: %s", word), nil // 模擬結果
 }
}

func coSearch(ctx context.Context, words []string) ([]string, error) {
 ctx, cancel := context.WithCancelCause(ctx)
 defer cancel(nil)

 var (
  wg   = sync.WaitGroup{}
  once = sync.Once{}

  results = make([]string, len(words))
  tokens  = make(chan struct{}, 2)

  err error
 )

 for i, word := range words {
  tokens <- struct{}{}
  wg.Add(1)

  go func(word string, i int) {
   defer func() {
    wg.Done()
    <-tokens
   }()

   result, e := search(ctx, word)
   if e != nil {
    once.Do(func() {
     err = e
     cancel(e)
    })

    return
   }

   results[i] = result
  }(word, i)
 }

 wg.Wait()

 return results, err
}

併發控制庫 errgroup

可以看到要實現一個較爲完備的併發控制,需要做的工作非常多。不過 Go 官方團隊爲大家準備了 golang.org/x/sync/errgroup

errgroup提供的能力和上文的示例類似,實現方式也類似,包含併發控制,錯誤傳遞,context.Context傳遞等

package main

import (
 "context"
 "fmt"
 "sync"

 "golang.org/x/sync/errgroup"
)

func coSearch(ctx context.Context, words []string) ([]string, error) {
 g, ctx := errgroup.WithContext(ctx)
 g.SetLimit(10)
 
 results := make([]string, len(words))

 for i, word := range words {
  i, word := i, word

  g.Go(func() error {
   result, err := search(ctx, word)
   if err != nil {
    return err
   }

   results[i] = result
   return nil
  })
 }

 err := g.Wait()

 return results, err
}

errgroup的用法也很簡單

利用golang.org/x/sync/errgroup大幅簡化了進行併發控制的邏輯,真是一個併發控制的利器啊!

總結

本篇從基礎的sync.WaitGroup{}庫出發,涉及到了併發安全、sync.Once等內容。最後介紹了併發控制的利器:golang.org/x/sync/errgroup

雖然使用 Go 語言能夠非常簡單的編寫併發程序,但其中要注意的細節非常多,忽略這些細節不僅沒有提升程序運行的效率,還會產生錯誤的結果


本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/qxs6ikRy_Km5JlJesYmH4w