雲原生系列 Go 語言篇 - 併發

併發是一個計算機科學用語,將一個進程分割成獨立組件並指明這些組件如何安全共享數據。大部分語言通過庫提供併發,使用的是嘗試通過獲取鎖操作執行系統級共享數據的線程。Go 獨樹一幟。它的主要併發模塊,很多認認爲是 Go 的最著名的特性,基於 CSP(通訊順序過程)。它依據快速排序算法的發明人 Tony Hoare 在 1978 年的論文所描述的併發風格。根據 CSP 實現的模式和標準併發同樣強大,但容易理解多了。

本章中,我們將快速複習支持 Go 併發的核心特性:協程、通道以及select關鍵字。然後我們會學習一些常見的 Go 併發模式,接着我們會學習一些底層技術是更好方法的場景。

何時使用併發

我們先給出一些告誡。併發肯定讓程序受益。Go 新手在嘗試使用併發時,通常會經歷一系列階段:

  1. 它給棒,把所有代碼都塞到協程中。

  2. 程序並沒有變快。對通道添加緩衝。

  3. 通道發生阻塞並出現了死鎖。開始使用超大緩衝的有緩衝通道。

  4. 通道仍出現阻塞。開始使用互斥鎖。

  5. 算了,再也不使用併發了。

人們使用併發是因爲相信併發的程序運行速度更快。可惜有時卻事與願違。更多的併發並不會自動讓程序變快,而且會讓程序更難理解。核心點在於明白併發不是並行。併發是一種更好地組織待解決問題的框架。併發代碼是否並行(同時執行)取決於硬件以及算法是否允許。1967 年,計科的先驅之一 Gene Amdahl,提出了阿姆達爾定律。這是一個在給定了多少任務必須順序執行的情況下,並行處理能在多大程度上提升性能的公式。如果想深入瞭解阿姆達爾定律,可以學習 Clay Breshears 所著的併發的藝術一書。這裏我們只需要知道更多的併發並不表示更快的速度。

廣義來說,所有程序都遵循三步流程:接收數據、處理數據、輸出結果。是否該在程序中使用併發取決於數據在程序中如何流動。有時兩個步驟可以併發執行,因爲相互之間不依賴另一步驟的數據做進一步處理,而其中一步依賴於另一步的輸出時就需要順序執行。在合併多個可獨立運行的操作生成的數據時可使用併發。

另外要重點提一下所運行的任務耗時很短並不值得使用併發。併發是有開銷的,很多常見內存中的算法非常快,通過併發傳遞數據所帶來的開銷遠大於並行運行併發代碼所節省的時間。這也是爲什麼併發操作常用於 I/O,對磁盤或網絡進行數千次讀寫非常緩慢,而它們又是最複雜的內存進程。如果不確定併發是否有益,先編寫順序執行代碼,然後編寫基準測試來與併發實現進行性能比較。(參見編寫測試一章中如何編寫 benchmark 代碼的內容。)

考慮這樣一個例子。假設在編寫一個調用其它三個 web 服務的服務。向其中兩個服務發送數據,接收這兩次調用的結果發送給第三個服務,再返回結果。整個過程必須在 50 毫秒以內,否則會返回錯誤。這就是使用併發的一個好場景,因爲存在彼此不進行交互的 I/O 操作,以及合併結果的代碼,同時對於代碼時長還有限制。在本章結尾,我們就能明白如何實現這段代碼。

協程

協程是 Go 併發模型的核心概念。要理解協程,我們先做幾個名詞解釋。第一個是進程。進程是操作系統中運行中的一個程序實例。操作系統將進程關聯一些資源,比如內存,來確保其它進程不會佔用它們。進程由多個線程組成。線程是由操作系統分配了一定時間的執行單元。進程中的線程共享資源。CPU 根據核數可同時執行一個或多個線程的指令。操作系統的一項任務是調度 CPU 上的線程來保障進程(以及進程中的每個線程)有運行的機會。

協程是由 Go 運行時所管理的輕量進程。在 Go 程序開始運行時,Go 運行時會創建一些線程並啓動一個協程來運行程序。程序所創建的所有協程,包括初始協程,會自動由 Go 運行時調度器分配給這些線程,這就和操作系統在多核 CPU 間調度線程是一樣的。看上去可能是畫蛇添足,因爲底層操作系統已經有管理線程和進程的調度器了,但這麼做其實有如下好處:

這些優點可以讓 Go 程序產生成百上千甚至是幾萬個同步的協程。如果嘗試使用原生線程在語言中啓動幾千線程,程序會像蝸牛一樣慢。

小貼士:如果深入瞭解調度器的原理,可以聽聽 Kavya Joshi 在 GopherCon 2018 上主題爲 The Scheduler Saga 的演講。

通過在函數調用前添加go關鍵字來開啓協助。和其它函數一樣,可以向其傳遞參數來初始化狀態。但是函數的返回值會被忽略。

所有函數都能以協程啓動。這和 JavaScript 不同,JS 中必須由作者使用async關鍵字來聲明函數才能異步運行。但是 Go 中習慣上使用包裹着業務邏輯的閉包來啓動協程。閉包負責併發的登記。比如,閉包從通道中讀取值並傳遞給業務邏輯,業務完全不知道運行於協程之中。然後函數的結果會寫回到其它通道。(我們會在下一節中簡單地概覽通道。)責任的分享使得代碼模塊化、可測試,並將併發維護在你的 API 之外:

func process(val int) int {
    // do something with val
}

func runThingConcurrently(in <-chan int, out chan<- int) {
    go func() {
        for val := range in {
            result := process(val)
            out <- result
        }
    }()
}

通道

協程使用通道進行通訊。與切片和字典一樣,通道也是使用make函數創建的內置類型:

ch := make(chan int)

和字典一樣,通道也是引用類型。在將通道傳遞給函數時,實際是向通道傳遞一個指針。還是與字典和切片一樣,通道的零值是nil

讀、寫和緩衝

使用<-運算符來與通道通信。把<-運算符放到通道變量的左邊來讀取通道,而寫入通道時則放到右邊:

a := <-ch // 讀取ch中的值並賦值給a
ch <- b   // 將b中的值寫入ch

寫入通道的每個值只能進行一次讀取。如果多個協程從同一個通道中讀取,寫入到通道的值只會被其中的一個讀取。

協程很少讀取並寫入同一通道。在將通道賦值給一個變量或字段,或是傳遞給函數時,將箭頭放到chan關鍵字前 (ch <-chan int) 來表示協程僅從通道中進行讀取。將箭頭放到chan關鍵字之後 (ch chan<- int) 來表示協程僅向通道寫入。這樣 Go 編譯器就能保障通道僅由函數讀取或寫入。

默認通道是無緩衝的。每次對打開的無緩衝通道寫入都會導致寫協程暫停,直到另一個協程從同一個通道讀取。類似地,每次對打開的無緩衝通道讀取都會導致讀協程暫停,直到另一個協程對同一個通道寫入。這表示至少要有兩個併發運行的協程才能對無緩衝通道寫入或讀取。

Go 還有帶緩衝通道。這些通道會在不阻塞的情況下緩衝一定數量的寫入。如果緩衝滿了又沒有對通道的讀取,隨後對通道的寫入會暫停寫協程直到有對通道的讀取。就像向滿緩衝通道寫入一樣,讀取空緩衝通道也會阻塞住。

在創建通道時通過指定緩衝容量來創建有緩衝通道:

ch := make(chan int, 10)

內置函數lencap返回緩衝通道的相關信息。使用len找出緩衝中當前有多少值,使用cap來找出最大緩衝尺寸。緩衝的容量無法修改。

注:對lencap傳遞無緩衝通道會返回 0。這可以理解,因爲從定義上來說,無緩衝通道沒有用於存儲值的緩衝。

大部分時候都應當使用無緩衝通道。在何時使用緩衝和無緩衝通道一節中,我們會討論使用緩衝通道的場景。

for-range 和通道

也可以使用for-range循環來讀取通道:

for v := range ch {
    fmt.Println(v)
}

與其它for-range循環不同的是,通道中只聲明瞭一個變量,也就是其值。循環會一直持續,直至通道關閉或是出現了breakreturn語句。

關閉通道

在完成對通道寫入後,可以使用內置的close函數關閉通道:

close(ch)

通道在關閉後,寫入通道或再次關閉通道都會 panic。有趣的是,對關閉的通道讀取卻總是成功的。如果是有緩衝通道且值尚未被讀取,會按順序進行返回。如果是無緩衝通道或是有緩衝通道中沒有值,會返回通道類型的零值。

這就出現和字典相同的問題:在讀取通道時,怎麼區分寫入的就是零值還是因爲通道關閉而返回了零值?因爲 Go 致力於語言的一致性,答案也類似:我們使用逗號 ok 語法來檢測通道是否關閉了:

v, ok := <-ch

如果ok設爲了true,那麼通道是打開的。如若設爲了false,通道就是關閉的。

小貼士:在讀取有可能關閉的通道時,使用逗號 ok 語句來確保通道仍是開啓的。

關閉通道是寫入通道的協程的職責。注意只在協程等待通道關閉時才需要關閉通道(比如使用for-range循環來讀取通道)。因爲通道只是一種變量,Go 運行時可以檢測到其不再使用而進行垃圾回收。

通道是讓 Go 併發模型獨樹一幟的兩大特性之一。它引導我們把代碼看成一系列階段,並讓數據依賴清晰,也就更容易對併發進行推理。其它語言依靠全局共享狀態來在線程間通訊。可變共享狀態不利於理解程序中的數據流動,也讓我們瞭解線程是否是獨立的變得很難。

通道的行爲

通道有多種狀態,每個狀態的讀取、寫入或關閉的行爲都不同。通過表 10-1 來輔助理解。

圖 10-1 通道的行爲

b6tsdn

必須避免導致 Go 程序 panic 的場景。前面提到,標準模式是讓寫協程在沒內容再寫時負責關閉通道。在有多個協程向同一個通道寫入時,問題就變複雜了,因爲向同一個通道反覆調用close會 panic。此外,如果在一個協程中關閉通道,另一個協程在向其寫入時也會 panic。解決這一問題的方法是使用sync.WaitGroup。我們會在使用 WaitGroup 一節中通過案例學習。

nil通道也會很危險,但它也有使用場景。我們會在關閉 select 中的分支一節中講到。

select

select語句是另一個讓 Go 併發模型別具一格的功能。它是 Go 中併發的控制結構,可優雅解決一個常見問題:如果可以執行兩個併發操作,先執行哪一個呢?不能優先其中一個操作,否則可能一些情況永遠不會得到處理。這稱之爲飢餓(starvation)。

select關鍵字允許協程對一組多個通道讀取或寫入。它很像是一個空switch語句:

select {
case v := <-ch:
    fmt.Println(v)
case v := <-ch2:
    fmt.Println(v)
case ch3 <- x:
    fmt.Println("wrote", x)
case <-ch4:
    fmt.Println("got value on ch4, but ignored it")
}

select中的每個case爲對通道的讀取或寫入。如果某一case可進行讀取或寫入,那麼case的內容體就會執行。和switch一樣,select中的每個case有其獨立代碼塊。

如果有多條分支存在通道可讀取或寫入會怎麼樣呢?select算法很簡單:它隨機可執行的分支,順序並不重要。這與switch語言截然不同,後者總是選擇第一個解析爲true的分支。它還利落地解決了飢餓問題,沒有哪個case有優先級,全部同時進行檢測。

select隨機選擇的另一個好處是防止了最常見的死鎖歸因:以不一致的順序獲取鎖。如果有兩個協程都訪問同樣的兩個通道,必須在兩個協程中以同樣的順序進行訪問,否則會造成死鎖。這意味着兩者都不能繼續執行,因爲都在等待另一個。如果 Go 應用中的每個協程都出現了死鎖,Go 運行時會殺死程序(見例 10-1)。

例 10-1 死鎖協程

func main() {
    ch1 := make(chan int)
    ch2 := make(chan int)
    go func() {
        v := 1
        ch1 <- v
        v2 := <-ch2
        fmt.Println(v, v2)
    }()
    v := 2
    ch2 <- v
    v2 := <-ch1
    fmt.Println(v, v2)
}

在 The Go Playground 中運行這段程序,會得到如下錯誤:

fatal error: all goroutines are asleep - deadlock!

別忘了我們的main運行於啓動時 Go 運行時所開啓的協程。我們開啓協程必須在讀取到ch1之後才能繼續,而主協程必須在讀取到ch2之後才能繼續。

如果將主協程訪問的通道放到select中,就可以避免死鎖(見例 10-2)。

例 10-2 使用select來避免死鎖

func main() {
    ch1 := make(chan int)
    ch2 := make(chan int)
    go func() {
        v := 1
        ch1 <- v
        v2 := <-ch2
        fmt.Println(v, v2)
    }()
    v := 2
    var v2 int
    select {
    case ch2 <- v:
    case v2 = <-ch1:
    }
    fmt.Println(v, v2)
}

在 The Go Playground 中運行程序得到的輸出如下:

2 1

因爲select會檢測有沒有分支可以繼續,這就避免了死鎖。我們所開啓的協程將值 1 寫入ch1,因此主協程中從ch1中將值讀入v2可以執行。

因爲常常配合使用,這種組合通常被稱爲for-select循環。在使用for-select循環時,必須包含退出循環的方式。我們會在 done 通道模式一節中學習一種方法。

switch語句一樣,select語句可以加default分支。還是和switch一樣,在沒有分支的通道可讀取或寫入時會使用default分支。如果希望對通道實現非阻塞讀或寫,對select使用default。以下代碼在ch中無值可以讀取時不會等待,它會立即執行default內容體:

select {
case v := <-ch:
    fmt.Println("read from ch:", v)
default:
    fmt.Println("no value written to ch")
}

我們會在背壓(backpressure) 一節使用到default

注:在for-select循環中添加default分支通常都是有問題的。每次循環各分支中沒有內容可以讀寫時就會觸發該分支。這會讓for循環持續運行,耗費大量的 CPU。

併發實踐和模式

既然已經講解了 Go 爲併發所提供的基礎工具,我們就來學習一些併發的最佳實踐和模式吧。

保持 API 無併發

併發是一種實現細節,好的 API 設計應當儘可能隱藏實現細節。這樣在修改代碼時無需修改其調用方式。

在實踐中,這意味着永遠不要在 API 的類型、函數及方法中暴露通道或互斥鎖(我們會在何時用互斥鎖替換通道中討論互斥鎖)。如果暴露了通道,就將通道管理的職責交給 API 的使用者了。這表示使用者要關心通道是否有緩衝、是否關閉或是 nil。還有可能因訪問通道或互斥鎖的順序出問題而導致死鎖。

注:這並不是說不能將通道作爲函數參數或結構體參數。只是說不應導出。

這一規則也有一些例外。如果 API 是一個帶有併發幫助函數的庫(比如time.After,我們會在如何讓代碼超時一節中使用),通道就會是 API 的一部分。

協程、for 循環及各種變量

大部分時候,用於啓動協程的閉包沒有任何參數。它是通過聲明它的環境中捕獲變量。有一個通用場景這種方法不適用,也就是嘗試從獲取for循環的索引或值時。以下代碼包含一個隱藏的 bug:

func main() {
    a := []int{2, 4, 6, 8, 10}
    ch := make(chan int, len(a))
    for _, v := range a {
        go func() {
            ch <- v * 2
        }()
    }
    for i := 0; i < len(a); i++ {
        fmt.Println(<-ch)
    }
}

我們爲a中的每個開啓一個協程。看起來我們爲每個協程傳遞了不同的值,但運行代碼得到的結果卻是:

20
20
20
20
20

每個協程對ch所寫入的都是 20 的原因是,每個協程的閉包獲取的是同一個變量。for循環中的索引和值變量在每次迭代中是複用的。最後一次對v所賦的值是 10。運行協程時,這就是對協程可見的值。這一問題不只是對for循環,只要協程依賴的變量的值有可能發生變化,就必須將值傳遞給協程。有兩種實現方式。第一種是在循環內遮蔽該值:

for _, v := range a {
    v := v
    go func() {
        ch <- v * 2
    }()
}

如果希望避免遮蔽,讓代碼流更爲清晰,也可以把值作爲參數傳遞給協程:

for _, v := range a {
    go func(val int) {
        ch <- val * 2
    }(v)
}

小貼士:在協程使用的變量值會發生變化時,可以把值作爲參數傳遞給協程:

一定要清理好協程

在啓動協程函數時,必須要保證它最終會退出。與變量不同,Go 運行時無法監測到協程是否不再使用。如果協程不退出,調度器仍然會定期給它時間,什麼工作也不做,這會拖慢程序。這稱爲協程泄漏(goroutine leak)。

協程是否會退出可能並不那麼明顯。比如,使用協程作爲生成器:

func countTo(max int) <-chan int {
    ch := make(chan int)
    go func() {
        for i := 0; i < max; i++ {
            ch <- i
        }
        close(ch)
    }()
    return ch
}

func main() {
    for i := range countTo(10) {
        fmt.Println(i)
    }
}

注:這只是一個簡短示例,不要使用協程生成數字列表。操作太過簡單,違反了我們 “何時使用併發” 的指導方針。

在這個常見用例中,我們使用所有值的地方協程退出了。但如果循環過早退出,協程就會一直阻塞,等待從通道中讀取值:

func main() {
    for i := range countTo(10) {
        if i > 5 {
            break
        }
        fmt.Println(i)
    }
}

done 通道模式

done 通道模式提供了一種發送信號通知協程停止進程的方式。它使用一個通道來發送退出信號。我們來看向多個函數發送相同數據、但只需要最快函數的結果的示例:

func searchData(s string, searchers []func(string) []string) []string {
    done := make(chan struct{})
    result := make(chan []string)
    for _, searcher := range searchers {
        go func(searcher func(string) []string) {
            select {
            case result <- searcher(s):
            case <-done:
            }
        }(searcher)
    }
    r := <-result
    close(done)
    return r
}

在我的函數中,聲明瞭一個名稱爲done的通道,包含struct{}類型的數據。我們使用了空結構體類型,因爲其值並不重要,我們不會向該通道寫入,只是會關閉它。我們爲每個傳入的搜索函數開啓一個協程。worker 協程中的select語句會等待對result通道的寫入(在searcher函數返回之時)或是對done通道的讀取。回顧下讀取開啓的通道會等待有數據可讀並且讀取已關閉通知總是會返回通道的零值。這意味着從done讀取的分支會在關閉done前保持等待狀態。在searchData中,我們讀取第一個寫入result的值,然後關閉done。這會向協程發送信息讓其退出,防止協程泄漏。

有時希望根據調用棧中前面函數中的內容來停止協程。在上下文一章中,我們會學習如何使用上下文來告知一個或多個協程該關閉了。

使用 cancel 函數來終止協程

我們也可以使用 done 通道來實現函數一章中所看到的一種模式:與通道一起返回撤銷函數。我們回到前面的countTo示例來了解是如何使用的。撤銷函數必須在for循環之後調用:

func countTo(max int) (<-chan int, func()) {
    ch := make(chan int)
    done := make(chan struct{})
    cancel := func() {
        close(done)
    }
    go func() {
        for i := 0; i < max; i++ {
            select {
            case <-done:
                return
            case ch<-i:

            }
        }
        close(ch)
    }()
    return ch, cancel
}

func main() {
    ch, cancel := countTo(10)
    for i := range ch {
        if i > 5 {
            break
        }
        fmt.Println(i)
    }
    cancel()
}

countTo函數創建了兩個通道,一個返回數據,另一個發出完成的信息。這裏沒有直接返回完成通道,而是創建一個關閉完成通道的閉包並返回該閉包。通過閉包來撤銷讓我們可以在需要時執行一些額外的清理工作。

何時使用緩衝和無緩衝通道

掌握 Go 併發最複雜的一項技術是決定何時使用緩衝通道。默認,通道是無緩衝的,這很容易理解:一個協程寫入並等待另一個協程接收,就像是接力賽中的接力棒一樣。緩衝通道就更復雜了。需要選擇大小,因爲緩衝通道中的緩衝是有限度的。恰當的使用緩衝通道意味着我們必須處理緩衝滿了寫入協程等待讀取的阻塞情況。那怎樣算是恰當地使用緩衝通道呢?

緩衝通道的場景很微妙。可以一句話總結如下:

緩衝通道用於的場景是知道要啓動多少個協程、希望限定啓動的協程的數量或是限定排隊處理任務的數量。

緩衝通道可很好處理的任務有從一組所啓動的協程中收集數據或是希望限制併發的使用。它們有助於管理系統中排隊的任務數量、防止服務來不及處理而崩潰。下面有一些示例可展示其使用場景。

第一個例子中,我們處理通道上的前 10 條結果。這時我們啓動 10 個協程,每個協程將結果寫入到緩衝通道上:

func processChannel(ch chan int) []int {
    const conc = 10
    results := make(chan int, conc)
    for i := 0; i < conc; i++ {
        go func() {
            v := <- ch
            results <- process(v)
        }()
    }
    var out []int
    for i := 0; i < conc; i++ {
        out = append(out, <-results)
    }
    return out
}

我們確切地知道所啓動的協程數量,並且希望每個協程在完成任務後退出。這表示我們可以爲每個啓動協程創建一個帶一個空間的緩衝通道,並讓每個協程無阻塞地寫入到這個協程。可以遍歷這個緩衝通道,讀取其中寫入的值。讀取完所有值後,返回結果,我們知道不會產生協程泄漏。

背壓(backpressure)

另一項可通過緩衝通道實現的技術是背壓機制。這有些反直覺,但在組件限定了希望執行的工作量後系統的性能會整體變好。我們可以使用緩衝通道和select語句來限定系統中同步請求的數量:

type PressureGauge struct {
    ch chan struct{}
}

func New(limit int) *PressureGauge {
    ch := make(chan struct{}, limit)
    for i := 0; i < limit; i++ {
        ch <- struct{}{}
    }
    return &PressureGauge{
        ch: ch,
    }
}

func (pg *PressureGauge) Process(f func()) error {
    select {
    case <-pg.ch:
        f()
        pg.ch <- struct{}{}
        return nil
    default:
        return errors.New("no more capacity")
    }
}

在這段代碼中,我們創建了一個帶緩衝通道結構體,具有一些 “令牌” 和一個函數。每次協程希望使用函數時,它會調用Processselect嘗試從通道讀取令牌。如果可以讀取則運行函數,並將令牌返回給緩衝通道。如果無法讀取到令牌,則運行default分支,就會返回錯誤。下面有一個快速示例對內置的 HTTP 服務器使用這段代碼(我們會在標準庫一章學習到如何使用 HTTP 服務器):

func doThingThatShouldBeLimited() string {
    time.Sleep(2 * time.Second)
    return "done"
}

func main() {
    pg := New(10)
    http.HandleFunc("/request", func(w http.ResponseWriter, r *http.Request) {
        err := pg.Process(func() {
            w.Write([]byte(doThingThatShouldBeLimited()))
        })
        if err != nil {
            w.WriteHeader(http.StatusTooManyRequests)
            w.Write([]byte("Too many requests"))
        }
    })
    http.ListenAndServe(":8080", nil)
}

關閉 select 中的分支

在需要從多個併發源中合併數據時,select關鍵字可完美勝任。但需要適當地處理關閉的通道。如果select中的一個分支在讀取關閉的通道,總是會成功,返回的是零值。每次選取一個分支時,需要檢測值是有效的並跳過分支。如果讀取出現問題,程序會浪費大量時間讀取垃圾值。

這時,我們依賴這樣的錯誤:讀取一個nil通道。前面學到過,讀取或寫入nil通道會導致代碼永遠掛起。雖然在由 bug 引發時會很糟糕,但我們可以使用nil通道來讓select中的case無效。在監測到通道關閉時,將通道變量設置爲nil。關聯的分支就無法運行,因爲從nil通道讀取不會返回任何值:

// in和in2都是通道, done是完結channel.
for {
    select {
    case v, ok := <-in:
        if !ok {
            in = nil // 這一分支永遠不再會成功!
            continue
        }
        // 處理從in中讀取的v
    case v, ok := <-in2:
        if !ok {
            in2 = nil // 這一分支永遠不再會成功!
            continue
        }
        // 處理從in2中讀取的v
    case <-done:
        return
    }
}

如何讓代碼超時

大部分交互程序需要在一定時間內返回響應。Go 併發可以做的一個任務是管理請求(或請求的一部分)要運行多長時間。其它語言在 promise 和 future 之上引入了額外的特性來添加這一功能,但 Go 的超時語句展示瞭如何通過已有功能構建複雜的特性。我們來一窺究竟:

func timeLimit() (int, error) {
    var result int
    var err error
    done := make(chan struct{})
    go func() {
        result, err = doSomeWork()
        close(done)
    }()
    select {
    case <-done:
        return result, err
    case <-time.After(2 * time.Second):
        return 0, errors.New("work timed out")
    }
}

在需要對 Go 中的操作進行限時時,就會看到這一模式的變體。這裏的select有兩個分支。第一個分支使用了前面學過的完結通道模式。我們使用協程閉包來對resulterr賦值,並關閉done通道。如果done通道先關閉了,對done的讀取成功並返回該值。

第二個通道由time包中的After函數返回。在傳遞完指定的time.Duration之後會寫入一個值。(我們會在標準庫一章中講到time包)。在doSomeWork完成前讀取到這個值時,timeLimit會返回超時錯誤。

注:如果在協程完成處理前退出timeLimit,協程會繼續運行。我們只是不再對其(最終)返回的結果進行處理。如果希望停止不再等待的協程的任務,可使用上下文撤銷。在上下文一章中會進行討論。

使用 WaitGroup

有時一個協程需要等待多個協程先完成任務。如果等待的是單個協程,可以使用之前學習的完結通道模式。但如果等待的是多個協程,就需要使用WaitGroup,它位於標準庫的sync包中。下面是一個簡單示例,可在 The Go Playground 中運行:

func main() {
    var wg sync.WaitGroup
    wg.Add(3)
    go func() {
        defer wg.Done()
        doThing1()
    }()
    go func() {
        defer wg.Done()
        doThing2()
    }()
    go func() {
        defer wg.Done()
        doThing3()
    }()
    wg.Wait()
}

sync.WaitGroup聲明時無需進行初始化,因爲其零值也是有用的。sync.WaitGroup有三個方法:Add用於增加所等待的協程數;Done用於減少其計數器,在協程完成時調用;Wait等待協程直到計數器變爲 0。Add通常只調用一次,傳遞的是要啓動的協程數。Done在協程內調用。要保證即使協程崩潰也會被調用,我們使用了defer

讀者會注意到我們沒有顯式傳遞sync.WaitGroup。有兩個原因。其一是必須保證所有使用sync.WaitGroup的地方都使用的是同一個實例。如傳將sync.WaitGroup傳遞給協程函數而又沒使用指針,那麼函數得到的就是一個拷貝,Done就不會減少原始sync.WaitGroup的計算器。通過使用閉包來獲取sync.WaitGroup,就能保證所有的協程都指向同一個實例。

其二是出於設計原因。還記得我們應將併發保留在 API 之外吧。在前面的通道里我們看到,通常的模式是使用包含業務邏輯的閉包啓動協程。閉包管理併發的問題而函數提供算法。

我們再來看一個更真實的示例。前面提到在多個協程寫入同一個通道時,我們需要確保所寫入的通道只會關閉一次。sync.WaitGroup就很能勝任這一要求。我們來看併發處理通道中值、將結果收集到切片再返回切片的函數是如何工作的:

func processAndGather(in <-chan int, processor func(int) int, num int) []int {
    out := make(chan int, num)
    var wg sync.WaitGroup
    wg.Add(num)
    for i := 0; i < num; i++ {
        go func() {
            defer wg.Done()
            for v := range in {
                out <- processor(v)
            }
        }()
    }
    go func() {
        wg.Wait()
        close(out)
    }()
    var result []int
    for v := range out {
        result = append(result, v)
    }
    return result
}

在這個例子中,我們啓動了監控協程等待所有處理的協程退出。在都退出時,監控協程會對輸出通道調用close。在out關閉及緩衝爲空時for-range通道循環會退出。最後,函數返回處理所得到值。

雖然WaitGroup很方便,在調配協程時不應將其作爲首選。僅在所有工作協程退出後需要進行清理時(比如關閉寫入的通道)才使用它。

GOLANG.ORG/X 和 ERRGROUP

Go 作者維護了一些補充標準庫的工具。整體稱爲golang.org/x包,包含有一個ErrGroup類型,構建於WaitGroup之上用於創建一組在其中之一出現問題就停止處理的協程。閱讀ErrGroup文檔瞭解更多內容。

代碼精確地只運行一次

init 函數:能免則免中我們講到,init應保留用於初始化有效的不可變包級狀態。但有時我們希望進行懶加載,或是有些代碼要求在程序運行後只初始化一次。這通常是因爲初始化相對較慢,甚至是並不是每次運行時都需要。sync包有一個方便的類型Once,實現了這一功能。我們來快速看看如何使用:

type SlowComplicatedParser interface {
    Parse(string) string
}

var parser SlowComplicatedParser
var once sync.Once

func Parse(dataToParse string) string {
    once.Do(func() {
        parser = initParser()
    })
    return parser.Parse(dataToParse)
}

func initParser() SlowComplicatedParser {
    // 在這裏做各種配置和加載
}

我們聲明瞭兩個包級變量,parser的類型爲Compl⁠ica⁠tedParseronce的類型爲sync.Once。類似sync.WaitGroup,我們不需要配置sync.Once的實例(這稱爲讓零值有價值)。還是類似sync.WaitGroup,我們必須保證不生成sync.Once的拷貝,因爲每個拷貝都使用其自身的狀態來表明是否已使用。通常不應在函數內聲明sync.Once實例,因爲每次函數調用會創建新實例,並不會記錄之前的調用。

在本例,我們希望確保parser只初始化了一次,因我們在傳遞給onceDo方法內設置了parser的值。如果Parse調用了多次,once.Do不會反覆執行閉包。

組合併發工具

我們回到本章第一節中的示例。有一個函數調用三個 web 服務。我們向其中兩個服務發送數據,然後接收這兩個調用的結果發送給第三個服務,返回結果 。整個過程要小於 50 毫秒,否則返回錯誤。

先從調用的函數開始:

func GatherAndProcess(ctx context.Context, data Input) (COut, error) {
    ctx, cancel := context.WithTimeout(ctx, 50*time.Millisecond)
    defer cancel()
    p := processor{
        outA: make(chan AOut, 1),
        outB: make(chan BOut, 1),
        inC:  make(chan CIn, 1),
        outC: make(chan COut, 1),
        errs: make(chan error, 2),
    }
    p.launch(ctx, data)
    inputC, err := p.waitForAB(ctx)
    if err != nil {
        return COut{}, err
    }
    p.inC <- inputC
    out, err := p.waitForC(ctx)
    return out, err
}

首先我們設置了 50 毫秒超時的上下文。在沒上下文時,使用其計時器而不是調用time.After。使用上下文計時器的一個好處是它讓我們可以考慮調用該函數的函數所設定的超時。我們會在上下文一章討論上下文,並在其中的計時器一節詳細講解超時的使用。現在讀者只需要知道超時後會取消上下文。上下文的Done會返回上下文撤銷時返回值的通道,取消可以是超時或顯式調用上下文的取消方法。

在創建上下文之後,我們使用defer來確保會調用上下文的cancel函數。在上下文一章中撤銷一節中會講到,必須調用這一函數,否則會出現資源泄漏。

然後會通過一系列用於與協程通訊的通道來填充processor實例。每個通道都有緩衝,因此執行寫入的協程可以完成寫入不等待讀取就退出。(errs通道緩衝大小爲 2,因爲寫入時可能會產生兩個錯誤。)

processor結構如下:

type processor struct {
    outA chan AOut
    outB chan BOut
    outC chan COut
    inC  chan CIn
    errs chan error
}

接着,我們對processor調用launch方法來開啓三個協程:一個用於調用getResultA,一個調用getResultB,還有一個調用getResultC

func (p *processor) launch(ctx context.Context, data Input) {
    go func() {
        aOut, err := getResultA(ctx, data.A)
        if err != nil {
            p.errs <- err
            return
        }
        p.outA <- aOut
    }()
    go func() {
        bOut, err := getResultB(ctx, data.B)
        if err != nil {
            p.errs <- err
            return
        }
        p.outB <- bOut
    }()
    go func() {
        select {
        case <-ctx.Done():
            return
        case inputC := <-p.inC:
            cOut, err := getResultC(ctx, inputC)
            if err != nil {
                p.errs <- err
                return
            }
            p.outC <- cOut
        }
    }()
}

getResultAgetResultB的協程差不多。它們分別調用各自的方法。如果返回了錯誤,將錯誤寫入p.errs通道。如果返回了有效值,將值寫入通道中(getResultA的結果寫入p.outAgetResultB的結果寫入p.outB)。

因爲只有在getResultAgetResultB成功並且在 50 毫秒內完成才調用getResultC,第三個協程稍顯複雜。它包含帶兩個分支的select。第一個在上下文撤銷時觸發。第二個在調用getResultC的數據存在時觸發。如果數據存在,函數進行了調用,這個邏輯與前兩個協程的邏輯類似。

在協程啓動後,我們調用processorwaitForAB方法:

func (p *processor) waitForAB(ctx context.Context) (CIn, error) {
    var inputC CIn
    count := 0
    for count < 2 {
        select {
        case a := <-p.outA:
            inputC.A = a
            count++
        case b := <-p.outB:
            inputC.B = b
            count++
        case err := <-p.errs:
            return CIn{}, err
        case <-ctx.Done():
            return CIn{}, ctx.Err()
        }
    }
    return inputC, nil
}

這使用for-select循環來對CIn實例同時也是的getResultC參數inputC賦值。共 4 個分支。前兩個讀取前兩個協程所寫入的通道並對inputC的字段賦值。如果這兩個分支都執行了,我們會退出for-select循環並返回inputC的值,和nil錯誤。

後兩個分支處理錯誤條件。如果p.errs通道中寫入了錯誤,就返回該錯誤。如果上下文被撤銷了,我們返回表示請求被撤銷的錯誤。

回到GatherAndProcess,我們執行了一個標準的nil錯誤檢測。如果正常,將inputC的值寫入p.inC通道,然後調用processorwaitForC方法:

func (p *processor) waitForC(ctx context.Context) (COut, error) {
    select {
    case out := <-p.outC:
        return out, nil
    case err := <-p.errs:
        return COut{}, err
    case <-ctx.Done():
        return COut{}, ctx.Err()
    }
}

這個方法包含一個select。如果getResultC成功完成,我們從p.outC通道讀取輸出並返回。如果getResultC返回錯誤,我們從p.errs讀取錯誤並返回。最後,如果上下文被撤銷了,我們返回一個相應的錯誤。在waitForC完成後,GatherAndProcess將結果返回給其調用者。

如果確定getResultC的作者會做正確的事,代碼可進行簡化。因爲上下文傳遞給了getResultC,該函數可以考慮超時進行寫入,在超時後返回錯誤。這樣,我們可以在GatherAndProcess中直接調用getResultC。這就可以去掉processor中的inCoutClaunch中的一個協程以及整個waitForC方法。總的原則是在程序正確的情況下使用盡量少的併發。

通過使用協程、通道和select語句架構代碼,我們分成了不同的步驟,允許各部分以任意順序運行和完成,並且在各部分間清晰地交的數據。此外我們還保障了程序的任意部分不會掛起,並且恰當地處理了函數本身及調用歷史中其它函數的超時。如果不相信這是實現併發更好的方法,請嘗試使用其它語言進行實現。可能會驚訝於其實現難度。

何時用互斥鎖替換通道

如在其它編程語言中調配跨線程數據訪問,可能會使用互斥鎖(*mutex-*mutual exclusion 的縮寫)。互斥鎖的任務是限制一些代碼的併發執行或是訪問同一塊數據。所保護的部分稱爲關鍵段(critical section)。

Go 作者們設計通道和select來管理併發有很多很好的原因。互斥鎖的主要問題是它模糊了程序內的數據流。數據通過一系列通道從一個協程傳入另一個協程時,數據流是清晰的。對值的訪問在一段時間內會本地化某個協程中。在使用互斥鎖保護一個值時,無法表明哪個協程當前擁有值的所有權,因爲對值的訪問由所有併發進程共享。這就很難理解處理順序。Go 社區中有一個描述這一哲學的名言:“通過通信共享內存,而不是通過共享內存來通信”。

話雖如此,有時使用互斥鎖會更爲清晰,所以 Go 標準庫包含了適用這些場景的互斥鎖實現。最常見的情況是協程讀取或寫入一個共享值,但不對值進行處理。我們以多玩家遊戲的內存計分板爲例。首先看如何使用通道實現。下面是一個可使用協程啓動管理計分板的函數:

func scoreboardManager(in <-chan func(map[string]int), done <-chan struct{}) {
    scoreboard := map[string]int{}
    for {
        select {
        case <-done:
            return
        case f := <-in:
            f(scoreboard)
        }
    }
}

該函數聲明瞭一個字典,然後監聽通道中讀取或修改字典的函數,以及一個確定何時關閉的通道。我們創建類型和將值寫入字典的方法:

type ChannelScoreboardManager chan func(map[string]int)

func NewChannelScoreboardManager() (ChannelScoreboardManager, func()) {
    ch := make(ChannelScoreboardManager)
    done := make(chan struct{})
    go scoreboardManager(ch, done)
    return ch, func() {
        close(done)
    }
}

func (csm ChannelScoreboardManager) Update(name string, val int) {
    csm <- func(m map[string]int) {
        m[name] = val
    }
}

更新方法非常簡潔,只是傳遞一個將值放入字典的函數。但怎麼讀取計分板呢?我們需要返回一個值。這意味着使用完結模式等待傳入ScoreboardManager的函數完成運行:

func (csm ChannelScoreboardManager) Read(name string) (int, bool) {
    var out int
    var ok bool
    done := make(chan struct{})
    csm <- func(m map[string]int) {
        out, ok = m[name]
        close(done)
    }
    <-done
    return out, ok
}

雖然代碼運行正常,但這很笨重並且一次只能有一個讀取器。更好的方法是使用互斥鎖。標準庫中有兩個互斥鎖實現,都位於sync包中。第一個名爲Mutex,它有兩個方法LockUnlock。只要另一個協程處於關鍵段調用Lock會導致當前協程暫停。在清楚了關鍵段後,當前協程會獲取到鎖,關鍵段中的代碼會執行。調用Mutex中的Unlock方法標誌着關鍵段的終結。

第二種互斥鎖的實現名爲RWMutex,它讓我們獲取讀鎖和寫鎖。關鍵段中一次只能獲取一個 writer,但讀鎖是共享的,關鍵段中一次可獲取多個 reader。寫鎖通過LockUnlock方法來管理,而讀鎖由RLockRUnlock方法管理。

在獲取互斥鎖時,必須要確保你會釋放鎖。在調用LockRLock後使用defer語句來調用Unlock

type MutexScoreboardManager struct {
    l          sync.RWMutex
    scoreboard map[string]int
}

func NewMutexScoreboardManager() *MutexScoreboardManager {
    return &MutexScoreboardManager{
        scoreboard: map[string]int{},
    }
}

func (msm *MutexScoreboardManager) Update(name string, val int) {
    msm.l.Lock()
    defer msm.l.Unlock()
    msm.scoreboard[name] = val
}

func (msm *MutexScoreboardManager) Read(name string) (int, bool) {
    msm.l.RLock()
    defer msm.l.RUnlock()
    val, ok := msm.scoreboard[name]
    return val, ok
}

我們已經看到互斥鎖的實現了,請在使用時仔細考慮你的選擇。Katherine Cox-Buday 傑出的《Go 語言併發之道》中有一個決策樹,可幫助我們決定該使用通道還是互斥鎖:

因爲計分板是結構體中的一個字段,沒有對計分板的傳輸,使用互斥鎖在情理之中。這裏使用互斥鎖很好,因爲數據在內存中存儲。如果數據存儲在外部服務中,比如在 HTTP 服務器或數據庫中,不要使用互斥鎖來守衛對系統的訪問。

互斥鎖要求我們做更多的管理。比如,必須正確地配對加鎖和解鎖,否則程序可能會死鎖。我們示例在同一個方法中獲取並釋放了鎖。另一個問題是 Go 中互斥鎖並不是可重入的(reentrant)。如果一個協程嘗試重複獲取同一個鎖,會出現死鎖,等待它自己釋放鎖。這與 Java 這類語言不同,它們的鎖是可重入的。

不可重入鎖讓遞歸調用自己的函數獲取鎖變得麻煩。必須在遞歸函數調用前釋放鎖。總之,在持有鎖時注意函數的調用,因爲不知道在這些調用中會獲取哪些鎖。如果函數調用了另一個嘗試獲取同一把鎖的函數,協程就會死鎖。

sync.WaitGroupsync.Once一樣,不要拷貝互斥鎖。如果將它們傳入函數或以結構體中的一個字段進行訪問,必須通過指針。如果拷貝了互斥鎖,其鎖無法共享。

警告:不要嘗試用多個協程訪問同一個變量,除非先獲得到了該變量的互斥鎖。它可能會導致難以追蹤的奇怪錯誤。參見編寫測試一章中的通過競爭檢測查找併發問題來學習如何監測這些問題。

SYNC.MAP - 這是不你以爲的字典

在查看sync包時,會發現一個名爲的Map的類型。它提供了 Go 內置的map的併發安全版本。因其實現中所做的權衡,sync.Map僅適用於特定場景:

此外,因爲 Go 早期沒有泛型,sync.Map使用interface{}作爲其鍵和值的類型,編譯器無法幫助我們確定所使用的正確的數據類型。

因爲有這些限制,在極少數場景中我們需要在多個協程間共享字典,使用由sync.RWMutex保護的內置map

Atomic - 你可能用不上

除了互斥鎖,Go 提供了其它方式可保持跨線程的數據一致性。sync/atomic包提供了對內置到現代 CPU 中原子變量運算的訪問,用於增加、交換、加載、存儲或比較交換(CAS)一個能裝到單個寄存器中的值。

如果需要壓榨出最後一點性能,並且是編寫併發代碼的專家,你會樂於見到 Go 包含對原子運算的支持。對於剩下的人,請使用協程和互斥鎖管理併發需求。

在哪裏深入學習併發

這裏我們講解了一些簡單併發模式,但還有很多其它知識。事實上,可以寫一整本書來講解正確實現 Go 中各種併發模式,所幸 Katherine Cox-Buday 就寫了這樣一本書。前面在討論該決定使用通道還是互斥鎖時已經提到了這本書,《Go 語言併發之道》,它對於與 Go 和併發相關的知識都是很好的讀物。可以閱讀這本書學習更多知識。

小結

本章中,我們講解了併發並學習了爲什麼 Go 的方式比其它的傳統併發機制更簡單。在講解過程中,我們還說明了什麼時候該使用併發以及一些併發規則和模式。下一章中,我們會快速學習 Go 的標準庫,它全面擁抱現代計算機的 “內置電池” 價值觀。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/80C2mM6RffavsuQ50akBeg