golang 的 channels 行爲

【導讀】除了 GMP 外,Go 語言的 channel 還有什麼角度切入能讓我們在實際編程中更好地利用它?本文梳理了 channel 的用法。

簡介

當我第一次使用 Go 的 channels 工作的時候,我犯了一個錯誤,把 channels 考慮爲一個數據結構。我把 channels 看作爲 goroutines 之間提供自動同步訪問的隊列。這種結構上的理解導致我寫了很多不好且結構複雜的併發代碼。

隨着時間的推移,我認識到最好的方式是忘記 channels 是數據結構,轉而關注它的行爲。所以現在談論到 channels,我只考慮一件事情:signaling(信號)。一個 channel 允許一個 goroutine 給另外一個發特定事件的信號。信號是使用 channel 做一切事情的核心。將 channel 看作是一種信號機制,可以讓你寫出明確定義和精確行爲的更好代碼。

爲了理解信號怎樣工作,我們必須理解以下三個特性:

這三個特性共同構成了圍繞信號的設計哲學,在討論這些特性之後,我將提供一系列代碼示例,這些示例將演示使用這些屬性的信號。

交付保證

交付保證基於一個問題:“我是否需要保證由特定的 goroutine 發送的信號已經被接收?”

換句話說,我們可以給出清單 1 的示例:

清單 1

01 go func() {
02     p := <-ch // Receive
03 }()
04
05 ch <- "paper" // Send

發送的 goroutine 是否需要保證在第五行中發送給 channel 的 paper,在繼續執行前, 會被第二行的 goroutine 接收。

基於這個問題的答案,你將知道使用兩種類型的 channels 中的哪種:無緩衝有緩衝。每個 channel 圍繞交付保證提供不同的行爲。

保證很重要,並且如果你不這樣認爲,我有很多東西兜售給你。當然,我想開個玩笑,當你的生活沒有保障的時候你不會害怕嗎?在編寫併發代碼時,對是否需要一項保證有很強的理解是至關重要的。隨着繼續,你將學會如何做決策。

狀態

一個 channel 的行爲直接被它當前的狀態所影響。一個 channel 的狀態是:nilopenclosed

下面的清單 2 展示了怎樣聲明或把一個 channel 放進這三個狀態。

清單 2

// ** nil channel

// A channel is in a nil state when it is declared to its zero value
var ch chan string

// A channel can be placed in a nil state by explicitly setting it to nil.
ch = nil


// ** open channel

// A channel is in a open state when it’s made using the built-in function make.
ch := make(chan string)    


// ** closed channel

// A channel is in a closed state when it’s closed using the built-in function close.
close(ch)

狀態決定了怎樣 send(發送)和 receive(接收)操作行爲。

信號通過一個 channel 發送和接收。不要說讀和寫,因爲 channels 不執行 I/O。

當一個 channel 是 nil 狀態,任何試圖在 channel 的發送或接收都將會被阻塞。當一個 channel 是在 open 狀態,信號可以被髮送和接收。當一個 channel 被置爲 closed 狀態,信號將不在被髮送,但是依然可以接收信號。

這些狀態將在你遭遇不同的情況的時候可以提供不同的行爲。當結合狀態交付保證,作爲你設計選擇的結果,你可以分析你承擔的成本 / 收益。你也可以僅僅通過讀代碼快速發現錯誤,因爲你懂得 channel 將表現出什麼行爲。

有數據和無數據

最後的信號特性需要考慮你是否需要信號有數據或者無數據。

在一個 channel 中有數據的信號被執行一個發送。

清單 3

01 ch <- "paper"

當你的信號有數據,它通常是因爲:

無數據信號通過關閉一個 channel。

清單 4

01 close(ch)

當信號沒有數據的時候,它通常是因爲:

這些規則也有例外,但這些都是主要的用例,並且我們將在本文中重點討論這些問題。我認爲這些規則例外的情況是最初的代碼味道。

無數據信號的一個好處是一個單獨的 goroutine 可以立刻給很多 goroutines 信號。有數據的信號通常是在 goroutines 之間一對一的交換數據。

有數據信號

當你使用有數據信號的時候,依賴於你需要保證的類型,有三個 channel 配置選項可以選擇。

這三個 channel 選項是:Unbuffered, Buffered >1Buffered =1

緩衝大小絕對不能是一個隨機數字,它必須是爲一些定義好的約束而計算出來的。在計算中沒有無窮大,無論是空間還是時間,所有的東西都必須要有良好的定義約束。

無數據信號

無數據信號主要用於取消,它允許一個 goroutine 發送信號給另外一個來取消它們正在做的事情。取消可以被有緩衝和無緩衝的 channels 實現,但是在沒有數據發送的情況下使用緩衝 channel 會更好。

內建的函數 close 被用於無數據信號。正如上面狀態章節所解釋的那樣,你依然可以在 channel 關閉的時候接收信號。實際上,在一個關閉的 channel 上的任何接收都不會被阻塞,並且接收操作將一直返回。

在大多數情況下,你想使用標準的庫 context 包來實現無數據信號。context 包使用一個無緩衝 channel 傳遞信號以及內建函數close發送無數據信號。

如果你選擇使用你自己的 channel 而不是 context包來取消,你的 channel 應該是chan struct{} 類型,這是一種零空間的慣用方式,用來表示一個信號僅僅用於信號傳遞。

場景

有了這些特性,更進一步理解它們在實踐中怎樣工作的最好方式就是運行一系列的代碼場景。當我在讀寫 channel 基礎代碼的時候,我喜歡把 goroutines 想像成人。這個形象對我非常有幫助,我將把它用作下面的輔助工具。

有數據信號 - 保證 - 無緩衝 Channels

當你需要知道一個被髮送的信號已經被接收的時候,有兩種情況需要考慮。它們是 等待任務等待結果

場景 1 - 等待任務

考慮一下作爲一名經理,需要僱傭一名新員工。在本場景中,你想你的新員工執行一個任務,但是他們需要等待直到你準備好。這是因爲在他們開始前你需要遞給他們一份報告。

清單 5

01 func waitForTask() {
02     ch := make(chan string)
03
04     go func() {
05         p := <-ch
06
07         // Employee performs work here.
08
09         // Employee is done and free to go.
10     }()
11
12     time.Sleep(time.Duration(rand.Intn(500)) * time.Millisecond)
13
14     ch <- "paper"
15 }

在清單 5 的第 2 行,一個帶有屬性的無緩衝 channel 被創建,string 數據將與信號一起被髮送。在第 4 行,一名員工被僱傭並在開始工作前,被告訴等待你的信號【在第 5 行】。第 5 行是一個 channel 接收,引起員工阻塞直到等到你發送的報告。一旦報告被員工接收,員工將執行工作並在完成的時候可以離開。

你作爲經理正在併發的與你的員工工作。因此在第 4 行你僱傭員工之後,你發現你自己需要做什麼來解鎖並且發信號給員工(第 12 行)。值得注意的是,不知道要花費多長的時間來準備這份報告(paper)。

最終你準備好給員工發信號,在第 14 行,你執行一個有數據信號,數據就是那份報告。由於一個無緩衝的 channel 被使用,你得到一個保證就是一旦你操作完成,員工就已經接收到了這份報告。接收發生在發送之前。

技術上你所知道的一切就是在你的 channel 發送操作完成的同時員工接收到了這份報告。在兩個 channel 操作之後,調度器可以選擇執行它想要執行的任何語句。下一行被執行的代碼是被你還是員工是不確定的。這意味着使用 print 語句會欺騙你關於事件的執行順序。

場景 2 - 等待結果

在下一個場景中,事情是相反的。這時你想你的員工一被僱傭就立即執行他們的任務。然後你需要等待他們工作的結果。你需要等待是因爲在你繼續前你需要他們發來的報告。

清單 6

01 func waitForResult() {
02     ch := make(chan string)
03
04     go func() {
05         time.Sleep(time.Duration(rand.Intn(500)) * time.Millisecond)
06
07         ch <- "paper"
08
09         // Employee is done and free to go.
10     }()
11
12     p := <-ch
13 }

成本 / 收益

無緩衝 channel 提供了信號被髮送就會被接收的保證,這很好,但是沒有任何東西是沒有代價的。這個成本就是保證是未知的延遲。在等待任務場景中,員工不知道你要花費多長時間發送你的報告。在等待結果場景中,你不知道員工會花費多長時間把報告發送給你。

在以上兩個場景中,未知的延遲是我們必須面對的,因爲它需要保證。沒有這種保證行爲,邏輯就不會起作用。

有數據信號 - 無保證 - 緩衝 Channels > 1

場景 1 - 扇出(Fan Out)

扇出模式允許你拋出明確定義數量的員工在同時工作的問題上。由於你每個任務都有一個員工,你很明確的知道你會接收多少個報告。你可能需要確保你的盒子有適量的空間來接收所有的報告。這就是你員工的收益,不需要等待你來提交他們的報告。但是他們確實需要輪流把報告放進你的盒子,如果他們幾乎同一時間到達盒子。

再次假設你是經理,但是這次你僱傭一個團隊的員工,你有一個單獨的任務,你想每個員工都執行它。作爲每個單獨的員工完成他們的任務,他們需要給你提供一張報告放進你桌子上的盒子裏面。

清單 7

01 func fanOut() {
02     emps := 20
03     ch := make(chan string, emps)
04
05     for e := 0; e < emps; e++ {
06         go func() {
07             time.Sleep(time.Duration(rand.Intn(200)) * time.Millisecond)
08             ch <- "paper"
09         }()
10     }
11
12     for emps > 0 {
13         p := <-ch
14         fmt.Println(p)
15         emps--
16     }
17 }

在清單 7 的第 3 行,一個帶有屬性的有緩衝 channel 被創建,string 數據將與信號一起被髮送。這時,由於在第 2 行聲明的 emps 變量,將創建有 20 個緩衝的 channel。

在第 5 行和第 10 行之間,20 個員工被僱傭,並且他們立即開始工作。在第 7 行你不知道每個員工將花費多長時間。這時在第 8 行,員工發送他們的報告,但這一次發送不會阻塞等待接收。因爲在盒子裏爲每位員工準備的空間,在 channel 上的發送僅僅與其他在同一時間想發送他們報告的員工競爭。

在 12 行和 16 行之間的代碼全部是你的操作。在這裏你等待 20 個員工來完成他們的工作並且發送報告。在 12 行,你在一個循環中,在 13 行你被阻塞在一個 channel 等待接收你的報告。一旦報告接收完成,報告在 14 被打印,並且本地的計數器變量被消耗來表明一個員工意見完成了他的工作。

場景 2 - Drop

Drop 模式允許你在你的員工在滿負荷的時候丟掉工作。這有利於繼續接受客戶端的工作,並且從不施加壓力或者是這項工作可接受的延遲。這裏的關鍵是知道你什麼時候是滿負荷的,因此你不承擔或過度承諾你將嘗試完成的工作量。通常集成測試或度量可以幫助你確定這個數字。

假設你是經理,你僱傭了單個員工來完成工作。你有一個單獨的任務想員工去執行。當員工完成他們任務時,你不在乎知道他們已經完成了。最重要的是你能或不能把新工作放入盒子。如果你不能執行發送,這時你知道你的盒子滿了並且員工是滿負荷的。這時候,新工作需要丟棄以便讓事情繼續進行。

清單 8

01 func selectDrop() {
02     const cap = 5
03     ch := make(chan string, cap)
04
05     go func() {
06         for p := range ch {
07             fmt.Println("employee : received :", p)
08         }
09     }()
10
11     const work = 20
12     for w := 0; w < work; w++ {
13         select {
14             case ch <- "paper":
15                 fmt.Println("manager : send ack")
16             default:
17                 fmt.Println("manager : drop")
18         }
19     }
20
21     close(ch)
22 }

在清單 8 的第 3 行,一個有屬性的有緩衝 channel 被創建,string 數據將與信號一起被髮送。由於在第 2 行聲明的cap 常量,這時創建了有 5 個緩衝的 channel。

從第 5 行到第 9 行,一個單獨的員工被僱傭來處理工作,一個 for range被用於循環處理 channel 的接收。每次一份報告被接收,在第 7 行被處理。

在第 11 行和 19 行之間,你嘗試發送 20 分報告給你的員工。這時一個 select語句在第 14 行的第一個case被用於執行發送。因爲default從句被用於第 16 行的select語句。如果發送被堵塞,是因爲緩衝中沒有多餘的空間,通過執行第 17 行發送被丟棄。

最後在第 21 行,內建函數close被調用來關閉 channel。這將發送沒有數據的信號給員工表明他們已經完成,並且一旦他們完成分派給他們的工作可以立即離開。

成本 / 收益

有緩衝的 channel 緩衝大於 1 提供無保證發送的信號被接收到。離開保證是有好處的,在兩個 goroutine 之間通信可以降低或者是沒有延遲。在扇出場景,這有一個有緩衝的空間用於存放員工將被髮送的報告。在 Drop 場景,緩衝是測量能力的,如果容量滿,工作被丟棄以便工作繼續。

在兩個選擇中,這種缺乏保證是我們必須面對的,因爲延遲降低非常重要。0 到最小延遲的要求不會給系統的整體邏輯造成問題。

有數據信號 - 延遲保證 - 緩衝 1 的 channel

場景 1 - 等待任務

清單 9

01 func waitForTasks() {
02     ch := make(chan string, 1)
03
04     go func() {
05         for p := range ch {
06             fmt.Println("employee : working :", p)
07         }
08     }()
09
10     const work = 10
11     for w := 0; w < work; w++ {
12         ch <- "paper"
13     }
14
15     close(ch)
16 }

在清單 9 的第 2 行,一個帶有屬性的一個緩衝大小的 channel 被創建,string 數據將與信號一起被髮送。在第 4 行和第 8 行之間,一個員工被僱傭來處理工作。for range被用於循環處理 channel 的接收。在第 6 行每次一份報告被接收就被處理。

在第 10 行和 13 行之間,你開始發送你的任務給員工。如果你的員工可以跑的和你發送的一樣快,你們之間的延遲會降低。但是每次發送你成功執行,你需要保證你提交的最後一份工作正在被進行。

在最後的第 15 行,內建函數close 被調用關閉 channel,這將會發送無數據信號給員工告知他們工作已經完成,可以離開了。儘管如此,你提交的最後一份工作將在 for range中斷前被接收。

無數據信號 - Context

在最後這個場景中,你將看到從 Context 包中使用 Context 值怎樣取消一個正在運行的 goroutine。這所有的工作是通過改變一個已經關閉的無緩衝 channel 來執行一個無數據信號。

最後一次你是經理,你僱傭了一個單獨的員工來完成工作,這次你不會等待員工未知的時間完成他的工作。你分配了一個截止時間,如果你的員工沒有按時完成工作,你將不會等待。

清單 10

01 func withTimeout() {
02     duration := 50 * time.Millisecond
03
04     ctx, cancel := context.WithTimeout(context.Background(), duration)
05     defer cancel()
06
07     ch := make(chan string, 1)
08
09     go func() {
10         time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
11         ch <- "paper"
12     }()
13
14     select {
15     case p := <-ch:
16         fmt.Println("work complete", p)
17
18     case <-ctx.Done():
19         fmt.Println("moving on")
20     }
21 }

在清單 10 的第 2 行,一個時間值被聲明,它代表了員工將花費多長時間完成他們的工作。這個值被用在第 4 行來創建一個 50 毫秒超時的 context.Context 值。context 包的 WithTimeout 函數返回一個 Context 值和一個取消函數。

context包創建一個 goroutine,一旦時間值到期,將關閉與Context 值關聯的無緩衝 channels。不管事情如何發生,你需要負責調用cancel 函數。這將清理被Context創建的東西。cancel被調用不止一次是可以的。

在第 5 行,一旦函數中斷,cancel函數被 deferred 執行。在第 7 行,1 個緩衝的 channels 被創建,它被用於被員工發送他們工作的結果給你。在第 09 行和 12 行,員工被僱傭兵立即投入工作,你不需要指定員工花費多長時間完成他們的工作。

在第 14 行和 20 行之間,你使用 select 語句來在兩個 channels 接收。在第 15 行的接收,你等待員工發送他們的結果。在第 18 行的接收,你等待看context 包是否正在發送信號 50 毫秒的時間到了。無論你首先收到哪個信號,都將有一個被處理。

這個算法的一個重要方面是使用一個緩衝的 channels。如果員工沒有按時完成,你將離開而不會給員工任何通知。對於員工而言,在第 11 行他將一直髮送他的報告,你在或者不在那裏接收,他都是盲目的。如果你使用一個無緩衝 channels,如果你離開,員工將一直阻塞在那嘗試你給發送報告。這會引起 goroutine 泄漏。因此一個緩衝的 channels 用來防止這個問題發生。

總結

當使用 channels(或併發) 時,在保證,channel 狀態和發送過程中信號屬性是非常重要的。它們將幫助你實現你併發程序需要的更好的行爲以及你寫的算法。它們將幫助你找出 bug 和聞出潛在的壞代碼。

在本文中,我分享了一些程序示例來展示信號屬性工作在不同的場景中。凡事都有例外,但是這些模式是非常良好的開端。

作爲總結回顧下這些要點,何時,如何有效地思考和使用 channels:

語言機制

設計哲學

轉自:

segmentfault.com/a/1190000014524388

Go 開發大全

參與維護一個非常全面的 Go 開源技術資源庫。日常分享 Go, 雲原生、k8s、Docker 和微服務方面的技術文章和行業動態。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/wCCY_UR4wW4B0HW6zWhRuQ