深入理解 Go 語言 — 併發控制
Go 語言以其優雅的併發模型而聞名。我們一起考慮這樣一種場景,協程 A 在執行過程中需要創建子協程 A1、A2、A3…An,協程 A 創建完子協程後就等待子協程退出,爲了處理這三種場景,Go 提供了三種解決方案,並且這三種方案各有優劣:
-
Channel:使用 channel 控制子協程,優點是實現簡單,清晰易懂。
-
WaitGroup:使用信號量機制控制子協程,優點是子協程個數可以動態調整。
-
Context:使用上下文控制子協程,優點是對子協程派生出來的孫子協程可以控制。
Go 語言其併發控制主要依靠 Channel、WaitGroup 和 Context 這三種機制,這三種解決方案的優點、缺點是相對而言的,要結合實際應用場景進行選擇。
在這篇文章中,我們將深入探討這三種併發控制機制的工作原理和使用方法。
一、Channel 的併發控制
1.1 Channel 定義和使用
Channel 是 Go 語言中的一種類型,可以用來傳遞類型化的數據。我們可以通過 make 函數來創建一個 Channel:
ch := make(chan int)
在這個例子中,我們創建了一個可以傳遞 int 類型數據的 Channel。
Channel 支持兩種操作:發送和接收。我們可以使用 <- 操作符來進行這兩種操作:
ch <- 1 // 發送操作,將 1 發送到 Channel ch 中
value := <-ch // 接收操作,從 Channel ch 中接收數據,並存儲到變量 value 中
1.2 Channel 併發控制實例
Channel 一般用於協程之間的通信,不過 Channel 也可以用於併發控制,比如主協程啓動 N 個子協程,主協程等待所有的子協程退出後再繼續後續的流程,這種場景可以使用 Channel 輕鬆實現。
下面的程序通過創建 N 個 Channel 來管理 N 個子協程,每個子協程都有一個 Channel 用於跟父協程通信,父協程創建完所有的子協程後等待所有的的子協程結束。
package main
import (
"fmt"
"time"
)
func Process(ch chan int) {
//Do some work...
time.Sleep(time.Second)
// non-buffered channel 發送端,在發送時會一直阻塞,直到接收端已經接收了數據,否則會一直阻塞下去
// 管道中寫入一個元素表示當前協程已結束,
ch <- 1
}
func main() {
// 創建一個10個元素的切片,元素類型爲channel
channels := make([]chan int, 10)
for i := 0; i < 10; i++ {
// 切片中放入一個 channel
channels[i] = make(chan int)
// 啓動協程,傳一個管道用於通信
go Process(channels[i])
}
// 遍歷切片,等待子協程結束
for i, ch := range channels {
// non-buffered channel 接收端也會一直阻塞,直接發送端已經發送了數據,否則會一直阻塞下去。
<-ch
fmt.Println("Routine ", i, " quit!")
}
}
// 打印輸出
Routine 0 quit!
Routine 1 quit!
Routine 2 quit!
Routine 3 quit!
Routine 4 quit!
Routine 5 quit!
Routine 6 quit!
Routine 7 quit!
Routine 8 quit!
Routine 9 quit!
1.3 小節
我們總結一下使用 channel 控制子協程的優點是實現簡單,缺點是當需要大量創建子協程時就需要有相同數量的 channel,而且這樣實現的話對於子協程繼續派生出來的孫子協程是不方便控制的。
接下來我們繼續看起來比 channel 優雅一下的 WaitGroup、Context ,它們在開源組件中使用的頻率更高一些。
二、WaitGroup 的併發控制
2.1 WaitGroup 的定義和使用
WaitGroup 是 Go 開發中經常使用的併發控制技術。WaitGroup 可理解爲 Wait-Goroutine-Group,即等待一組 goroutine 執行結束。
WaitGroup 是 Go 語言中的一個類型,它提供了一種等待一組 goroutines 完成的簡單方式。我們可以通過 Add(n)
方法來增加 WaitGroup 的計數,通過 Done()
方法來減少 WaitGroup 的計數,通過 Wait()
方法來阻塞當前 goroutine,直到 WaitGroup 的計數變爲零。
var wg sync.WaitGroup
wg.Add(1)
go func() {
// 業務在子協程執行
wg.Done()
}()
wg.Wait()
在這個例子中,我們增加了 WaitGroup 的計數,然後啓動了一個新的 goroutine 來執行任務。當任務完成後,我們調用 Done 方法來減少 WaitGroup 的計數。主 goroutine 通過 Wait 方法等待任務完成。
package main
import (
"fmt"
"sync"
"time"
)
var wg sync.WaitGroup
func main() {
// 創建 10 個子協程並且 wg.Add(1)
for i := 0; i < 10; i++ {
wg.Add(1)
go exec(i)
}
wg.Wait()
fmt.Println("main exit!")
}
func exec(i int) {
// 使用 defer 保證子協程發生異常的時候也會正常執行 wg.Done()
defer func() {
wg.Done()
}()
// 類比執行業務
time.Sleep(time.Second * 2)
fmt.Printf("%d exec! \n", i)
}
// 打印輸出,順序每次隨機
5 exec!
6 exec!
7 exec!
9 exec!
8 exec!
0 exec!
2 exec!
1 exec!
4 exec!
3 exec!
main exit!
2.2 WaitGroup 的原理
基礎知識
信號量是 UNIX 提供的一種保護共享資源的機制,作用是防止多個線程同時訪問某個資源。簡單理解:
-
如果信號量 > 0 ,表示資源可用,獲取信號量時系統會自動將信號量減 1。
-
如果信號量 = 0,表示資源暫不可用,獲取信號量是,當前的線程會進入睡眠狀態,當信號量變成 > 0 時將會被喚醒。
信號量 s 只能由兩種特殊的操作來處理,這兩種操作稱爲 P 和 V。
P(s):如果 s 是非零的,則 P 將 s 減 1,並且立即返回。如果 s 爲零,那麼就掛起這個線程,直到 s 變爲非零,等到另一個執行 V(s) 操作的線程喚醒該線程。在喚醒之後,P 操作將 s 減 1,並將控制返回給調用者。
V(s):V 操作將 s 加 1。如果有任何線程阻塞在 P 操作等待 s 變爲非零,那麼 V 操作會喚醒這些線程中的一個,然後該線程將 s 減 1,完成它的 P 操作。
WaitGroup 的實現依賴於 Go 語言的原子操作和內存模型,WaitGroup 實現中也使用了信號量。WaitGroup 內部有一個計數器,當我們調用 Add 方法時,計數器會增加;當我們調用 Done 方法時,計數器會減少;當我們調用 Wait 方法時,如果計數器不爲零,當前 goroutine 會被阻塞。
在 Go 的底層信號量函數中:
runtime_Semacquire(s *uint32)
函數會阻塞 goroutine 直到信號量 s 的值大於 0,然後原子性地減這個值,即 P 操作。
runtime_Semrelease(s *uint32, lifo bool, skipframes int)
函數原子性增加信號量的值,然後通知被 runtime_Semacquire
阻塞的 goroutine,即 V 操作。
數據結構
src/sync/waitgroup.go:WaitGroup
中定義:
// A WaitGroup waits for a collection of goroutines to finish.
// The main goroutine calls Add to set the number of
// goroutines to wait for. Then each of the goroutines
// runs and calls Done when finished. At the same time,
// Wait can be used to block until all goroutines have finished.
//
// A WaitGroup must not be copied after first use.
//
// In the terminology of the Go memory model, a call to Done
// “synchronizes before” the return of any Wait call that it unblocks.
type WaitGroup struct {
noCopy noCopy
// 64-bit value: high 32 bits are counter, low 32 bits are waiter count.
// 64-bit atomic operations require 64-bit alignment, but 32-bit
// compilers only guarantee that 64-bit fields are 32-bit aligned.
// For this reason on 32 bit architectures we need to check in state()
// if state1 is aligned or not, and dynamically "swap" the field order if
// needed.
state1 uint64
state2 uint32
}
因爲 WaitGroup 需要等待一組操作完成之後再執行,所以需要等待所有操作完成之後才能繼續執行。爲了實現這個功能,WaitGroup 使用一個計數器 counter 來記錄還有多少個操作沒有完成,如果 counter 的值爲 0,則表示所有操作已經完成。
WaitGroup 在所有任務都完成之後,需要喚醒所有處於等待的協程,此時記錄有多少個協程處於等待狀態。爲實現這個功能,WaitGroup 使用了一個等待計數器 waiter 來記錄當前有多少個協程正在等待操作完成。
WaitGroup 對於計數器和等待計數器的實現,是通過一個 64 位無符號整數來實現的,也就是 WaitGroup 結構體中的 state1,其中高 32 位保存了任務計數器 counter 的值,低 32 位保存了等待計數器 waiter 的值。當我們創建一個 WaitGroup 實例時,該實例的任務計數器和等待計數器都會被初始化爲 0。
而且,等待協程需要等待所有任務完成之後才能繼續執行,所以等待協程在任務未完成時會被阻塞。當任務全部完成後,會自動被喚醒。WaitGroup 使用 state2 用於實現信號量機制。通過調用 runtime_Semacquire()
和 runtime_Semrelease()
函數,可以在不阻塞線程的情況下進行等待和通知操作。
接下來繼續介紹 Add()
,Done()
和 Wait()
方法的具體實現。
Add(delta int)
Add()
做了兩件事:
-
把 delta 值累加到 counter 當中,因爲 delta 可以是負值,也就是說 counter 可以變成 0 或 負值。
-
當 counter = 0 時,根據 waiter 數值釋放等量的信號量,把等待的 goroutine 全部喚醒;如果 counter < 0,則觸發 panic 。
// Add adds delta, which may be negative, to the WaitGroup counter.
// If the counter becomes zero, all goroutines blocked on Wait are released.
// If the counter goes negative, Add panics.
//
// Note that calls with a positive delta that occur when the counter is zero
// must happen before a Wait. Calls with a negative delta, or calls with a
// positive delta that start when the counter is greater than zero, may happen
// at any time.
// Typically this means the calls to Add should execute before the statement
// creating the goroutine or other event to be waited for.
// If a WaitGroup is reused to wait for several independent sets of events,
// new Add calls must happen after all previous Wait calls have returned.
// See the WaitGroup example.
func (wg *WaitGroup) Add(delta int) {
// delta 的值可以爲負數,Done方法便是通過Add(-1)來實現的
// statep: 爲state1的地址 semap: 爲state2的地址
statep, semap := wg.state()
if race.Enabled {
_ = *statep // trigger nil deref early
if delta < 0 {
// Synchronize decrements with Wait.
race.ReleaseMerge(unsafe.Pointer(wg))
}
race.Disable()
defer race.Enable()
}
// 高 32 位的值 加上 delta,增加任務計數器的值
state := atomic.AddUint64(statep, uint64(delta)<<32)
// v: 取高32位數據,獲取到待完成任務數
v := int32(state >> 32)
// 取低32位數據,獲取到等待線程的值
w := uint32(state)
if race.Enabled && delta > 0 && v == int32(delta) {
// The first increment must be synchronized with Wait.
// Need to model this as a read, because there can be
// several concurrent wg.counter transitions from 0.
race.Read(unsafe.Pointer(semap))
}
if v < 0 {
panic("sync: negative WaitGroup counter")
}
if w != 0 && delta > 0 && v == int32(delta) {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
// v > 0: 說明還有待完成的任務數,此時不應該喚醒等待協程
// w = 0: 說明沒有協程在等待,此時可以直接退出
if v > 0 || w == 0 {
return
}
// This goroutine has set counter to 0 when waiters > 0.
// Now there can't be concurrent mutations of state:
// - Adds must not happen concurrently with Wait,
// - Wait does not increment waiters if it sees counter == 0.
// Still do a cheap sanity check to detect WaitGroup misuse.
if *statep != state {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
// Reset waiters count to 0.
// 此時 v = 0,所有任務都完成了,喚醒等待協程
*statep = 0
for ; w != 0; w-- {
runtime_Semrelease(semap, false, 0)
}
}
Wait()
Wait()
方法也做兩件事:一是累加 waiter,二是阻塞等待信號量。
注意:這裏還用到了 CAS 算法,保證了有多個 goroutine 同時執行 Wait()
方法也能正確累加 waiter。
// Wait blocks until the WaitGroup counter is zero.
func (wg *WaitGroup) Wait() {
// statep: 爲 state1 的地址 semap: 爲 state2 的地址
statep, semap := wg.state()
if race.Enabled {
_ = *statep // trigger nil deref early
race.Disable()
}
for {
// 加載state1的值
state := atomic.LoadUint64(statep)
// v: 取高32位數據,獲取到待完成任務數
v := int32(state >> 32)
w := uint32(state)
// 沒有任務待執行,全部都完成了
if v == 0 {
// Counter is 0, no need to wait.
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(wg))
}
return
}
// Increment waiters count.
// 增加 waiter 計數器的值
if atomic.CompareAndSwapUint64(statep, state, state+1) {
if race.Enabled && w == 0 {
// Wait must be synchronized with the first Add.
// Need to model this is as a write to race with the read in Add.
// As a consequence, can do the write only for the first waiter,
// otherwise concurrent Waits will race with each other.
race.Write(unsafe.Pointer(semap))
}
// 等待被喚醒
runtime_Semacquire(semap)
if *statep != 0 {
panic("sync: WaitGroup is reused before previous Wait has returned")
}
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(wg))
}
return
}
}
}
Done()
Done()
方法只完成一件事情:即把 counter 減 1。前面分析 Add()
方法可以接受負值,所以這裏 Done()
方法只調用了 Add(-1)
。實際上這裏正是最後一個完成的 goroutine 把等待的 waiter 喚醒的(感嘆設計非常巧妙)。
// Done decrements the WaitGroup counter by one.
func (wg *WaitGroup) Done() {
wg.Add(-1)
}
2.3 小結
三、Context 的併發控制
3.1 Context 的定義
context 是 Go 1.7 版本引入的一個新特性,它主要用於限制和傳遞可跨 API 邊界的請求級變量,取消信號和超時時間,幷包含一個併發安全的 map 用於攜帶數據。
context 翻譯成 “上下文”,context 與 WaitGroup 最大的不同點是:context 對於派生的 goroutine 有更強的控制能力,它可以控制多級的 goroutine。
context 的 API 非常簡單, 標準庫實現上乾淨、獨立,接下來我們會從具體的使用場景和源碼分析兩個角度進行分析。
3.2 使用場景
場景一: 請求鏈路傳值
package main
import (
"context"
"fmt"
)
func exec1(ctx context.Context) {
ctx = context.WithValue(ctx, "k1", "v1")
exec2(ctx)
}
func exec2(ctx context.Context) {
fmt.Println(ctx.Value("k1").(string))
}
func main() {
ctx := context.Background()
exec1(ctx)
}
我們在 exec1
通過 WithValue(parent Context, key, val interface{}) Context
,賦值 k1 爲 v1,在其下層函數 exec2
通過ctx.Value(key interface{}) interface{}
獲取 k1 的值,非常簡單。
擴展:如果我們是在 exec2 裏賦值,在 exec1 裏面能夠拿到這個值嗎?
答案是不能,context 只能自上而下攜帶值,這個是需要特別注意的一點。
場景二: 取消操作
主動取消
package main
import (
"context"
"errors"
"fmt"
"sync"
"time"
)
func exec(ctx context.Context, wg *sync.WaitGroup) error {
defer wg.Done()
respC := make(chan int)
// 處理邏輯
go func() {
time.Sleep(time.Second * 5)
respC <- 10
}()
// 取消機制
select {
case <-ctx.Done():
fmt.Println("cancel")
return errors.New("cancel")
case r := <-respC:
fmt.Println(r)
return nil
}
}
func main() {
wg := new(sync.WaitGroup)
ctx, cancel := context.WithCancel(context.Background())
wg.Add(1)
go exec(ctx, wg)
time.Sleep(time.Second * 2)
// 觸發取消
cancel()
// 等待goroutine退出
wg.Wait()
}
超時取消
package main
import (
"context"
"fmt"
"time"
)
func exec(ctx context.Context) {
execCtx, execCancel := context.WithTimeout(ctx, time.Second*4)
defer execCancel()
resp := make(chan struct{}, 1)
// 處理邏輯
go func() {
// 處理耗時
time.Sleep(time.Second * 10)
resp <- struct{}{}
}()
// 超時機制
select {
//case <-ctx.Done():
// fmt.Println("ctx timeout")
// fmt.Println(ctx.Err())
case <-execCtx.Done():
fmt.Println("execCtx timeout")
fmt.Println(execCtx.Err())
case v := <-resp:
fmt.Println("exec function handle done")
fmt.Printf("result: %v\n", v)
}
fmt.Println("exec finish")
return
}
func main() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
defer cancel()
exec(ctx)
}
注意:對於有多個超時時間的處理,可以把上述超時取消例子中的註釋打開,會觀察到,當處理兩個 ctx 時,時間短的會優先觸發,這種情況下,如果只判定一個 context 的 Done() 是可以的,但是一定要保證調用到兩個cancel
函數。
execCtx timeout:
ctx timeout:
3.3 Context 的實現原理
context 實際上只定義了接口,凡是實現了該接口的類都可以稱爲一種 context,官方中實現了幾種常用的 context 分別用於不同的場景。
Context 接口定義
// A Context carries a deadline, a cancellation signal, and other values across
// API boundaries.
//
// Context's methods may be called by multiple goroutines simultaneously.
type Context interface {
// Deadline returns the time when work done on behalf of this context
// should be canceled. Deadline returns ok==false when no deadline is
// set. Successive calls to Deadline return the same results.
Deadline() (deadline time.Time, ok bool)
// Done returns a channel that's closed when work done on behalf of this
// context should be canceled. Done may return nil if this context can
// never be canceled. Successive calls to Done return the same value.
// The close of the Done channel may happen asynchronously,
// after the cancel function returns.
Done() <-chan struct{}
// If Done is not yet closed, Err returns nil.
// If Done is closed, Err returns a non-nil error explaining why:
// Canceled if the context was canceled
// or DeadlineExceeded if the context's deadline passed.
// After Err returns a non-nil error, successive calls to Err return the same error.
Err() error
// Value returns the value associated with this context for key, or nil
// if no value is associated with key. Successive calls to Value with
// the same key returns the same result.
Value(key any) any
}
context 接口只定義了 4 個方法:
- Deadline() (deadline time.Time, ok bool)
該方法返回一個 deadline 和 標識是否已設置 deadline 的 bool 值,如果沒有設置 deadline,則 ok == false,此時 deadline 爲一個初始值的 time.Time 值。
- Done() <-chan struct{}
Done() 方法返回一個用於探測 context 是否取消的 channel。當 context 取消時會自動將該 channel 關閉。
需要注意:對於不支持取消的 context ,該方法可能會返回 nil,例如 context.Background()
- Err() error
Err() 方法描述 context 關閉的原因,關閉原因由 context 實現控制,不需要用戶設置,比如 Deadline context,關閉原因可能是因爲 deadline ,也可能是被提前主動關閉了。
因 deadline 關閉:context deadline exceeded。
因 主動 關閉:context canceled。
- Value(key any) any
有一種 context 它不是用於控制呈樹狀分佈的 goroutine,而是用於在樹狀分佈的 goroutine 之間傳遞信息。
Value() 方法就是用於此種類型的 context,該方法根據 key 值查詢 map 中的 value。
3.4 到底有幾種 context?
既然 context 都需要實現 Context 接口,那麼包括不直接可見(非導出)的結構體,一共有幾種 context 呢?
答案是 4 種。
emptyCtx,context 之源頭
定義如下:
type emptyCtx int
func (*emptyCtx) Deadline() (deadline time.Time, ok bool) {
return
}
func (*emptyCtx) Done() <-chan struct{} {
return nil
}
func (*emptyCtx) Err() error {
return nil
}
func (*emptyCtx) Value(key any) any {
return nil
}
func (e *emptyCtx) String() string {
switch e {
case background:
return "context.Background"
case todo:
return "context.TODO"
}
return "unknown empty Context"
}
如何創建根 Context
context 包中定義了一個公用的 emptyCtx 全局變量,名爲 background,可以使用 context.Background()
獲取它,還有一個 todo,可以使用 context.TODO()
,實現代碼如下:
var (
background = new(emptyCtx)
todo = new(emptyCtx)
)
// Background returns a non-nil, empty Context. It is never canceled, has no
// values, and has no deadline. It is typically used by the main function,
// initialization, and tests, and as the top-level Context for incoming
// requests.
func Background() Context {
return background
}
// TODO returns a non-nil, empty Context. Code should use context.TODO when
// it's unclear which Context to use or it is not yet available (because the
// surrounding function has not yet been extended to accept a Context
// parameter).
func TODO() Context {
return todo
}
Background 和 TODO 是一模一樣的,官方說:background 它通常由主函數、初始化和測試使用,並作爲傳入請求的頂級上下文;TODO 是當不清楚要使用哪個 Context 或尚不可用時,代碼應使用 context.TODO,後續在在進行替換掉,歸根結底就是語義不同而已。
context 包還提供四個方法創建不同的類型的 context,使用四個方法如果沒有父 context ,則都需要傳入 background ,即 background 作爲其父節點:
WithCancel()
WithDeadline()
WithTimeOut()
WithValue()
context 包中除了 emptyCtx 以外,還有 cancelCtx,timerCtx 和 ValueCtx,共計四種。它們之間的關係如下:
WithValue 的實現
// WithValue returns a copy of parent in which the value associated with key is
// val.
//
// Use context Values only for request-scoped data that transits processes and
// APIs, not for passing optional parameters to functions.
func WithValue(parent Context, key, val any) Context {
if parent == nil {
panic("cannot create context from nil parent")
}
if key == nil {
panic("nil key")
}
if !reflectlite.TypeOf(key).Comparable() {
panic("key is not comparable")
}
return &valueCtx{parent, key, val}
}
valueCtx 類
valueCtx 目的就是爲 Context 攜帶鍵值對,因爲它採用匿名接口的繼承實現方式,它會繼承父 Context,也就相當於嵌入 Context 當中。
// A valueCtx carries a key-value pair. It implements Value for that key and
// delegates all other calls to the embedded Context.
type valueCtx struct {
Context
key, val any
}
func (c *valueCtx) String() string {
return contextName(c.Context) + ".WithValue(type " +
reflectlite.TypeOf(c.key).String() +
", val " + stringify(c.val) + ")"
}
func (c *valueCtx) Value(key any) any {
if c.key == key {
return c.val
}
return value(c.Context, key)
}
func value(c Context, key any) any {
for {
switch ctx := c.(type) {
case *valueCtx:
if key == ctx.key {
return ctx.val
}
c = ctx.Context
case *cancelCtx:
if key == &cancelCtxKey {
return c
}
c = ctx.Context
case *timerCtx:
if key == &cancelCtxKey {
return &ctx.cancelCtx
}
c = ctx.Context
case *emptyCtx:
return nil
default:
return c.Value(key)
}
}
}
用圖來理解:我們在調用 Context 中的 Value 方法時會層層向上調用直到最終的根節點,中間要是找到了 key 就會返回,否會就會找到最終的 emptyCtx 返回 nil。
WithCancel 的實現
這個函數執行步驟如下:
-
創建一個 cancelCtx 對象,作爲子 context
-
然後調用 propagateCancel 構建父子 context 之間的關係,這樣當父 context 被取消時,子 context 也會被取消。
-
返回子 context 對象和子樹取消函數。
// WithCancel returns a copy of parent with a new Done channel. The returned
// context's Done channel is closed when the returned cancel function is called
// or when the parent context's Done channel is closed, whichever happens first.
//
// Canceling this context releases resources associated with it, so code should
// call cancel as soon as the operations running in this Context complete.
func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
if parent == nil {
panic("cannot create context from nil parent")
}
c := newCancelCtx(parent)
propagateCancel(parent, &c)
return &c, func() { c.cancel(true, Canceled) }
}
我們再分析一下 cancelCtx 這個類。
cancelCtx 類
// A cancelCtx can be canceled. When canceled, it also cancels any children
// that implement canceler.
type cancelCtx struct {
Context
mu sync.Mutex // protects following fields
done atomic.Value // of chan struct{}, created lazily, closed by first cancel call
children map[canceler]struct{} // set to nil by the first cancel call
err error // set to non-nil by the first cancel call
}
-
mu:是一個互斥鎖,用來保證併發安全的,所以 context 是併發安全的。
-
done:用來做 context 的取消通知信號,之前的版本使用的是 chan struct{} 類型,現在用 atomic.Value 做鎖優化。
-
children:key 是接口類型 canceler,目的就是存儲實現當前 canceler 接口的子節點,當根節點發生取消時,遍歷子節點發送取消信號。
-
error:當 context 取消時存儲取消信息。
propagateCancel 方法(傳播 Cancel)
// propagateCancel arranges for child to be canceled when parent is.
func propagateCancel(parent Context, child canceler) {
done := parent.Done()
if done == nil {
// 當前父 context 從來不會被取消,是一個空節點,直接返回即可。
return // parent is never canceled
}
// 提前判斷一個父context是否被取消,如果取消了也不需要構建關聯了,把當前子節點取消掉並返回
select {
case <-done:
// parent is already canceled
child.cancel(false, parent.Err())
return
default:
}
// 這裏是找到可以"取消"的context
if p, ok := parentCancelCtx(parent); ok {
p.mu.Lock()
if p.err != nil {
// parent has already been canceled
child.cancel(false, p.err)
} else {
// 將當前節點掛到父節點的 childrn map 中,外面調用 cancel 時可以層層取消
if p.children == nil {
p.children = make(map[canceler]struct{})
}
p.children[child] = struct{}{}
}
p.mu.Unlock()
} else {
// 沒有找到可"取消"的父節點掛載,那麼需要開一個 goroutine
atomic.AddInt32(&goroutines, +1)
go func() {
select {
case <-parent.Done():
child.cancel(false, parent.Err())
case <-child.Done():
}
}()
}
}
cancel 方法
最後我們再來分析一下返回的 cancel 方法是如何實現,該方法會關閉上下文中的 Channel 並向所有的子上下文同步取消信號:
並且通過源碼我們可以知道 cancel 方法是可以被重複調用,是冪等的。
// cancel closes c.done, cancels each of c's children, and, if
// removeFromParent is true, removes c from its parent's children.
func (c *cancelCtx) cancel(removeFromParent bool, err error) {
// 取消時傳入的 error 信息不能爲 nil, context 定義了默認 error:var Canceled = errors.New("context canceled"
if err == nil {
panic("context: internal error: missing cancel error")
}
c.mu.Lock()
if c.err != nil {
c.mu.Unlock()
return // 已經有錯誤信息,說明當前節點已經被取消過了,直接返回
}
c.err = err
// 用來關閉 channel,通知其他協程
d, _ := c.done.Load().(chan struct{})
if d == nil {
c.done.Store(closedchan)
} else {
close(d)
}
// 當前節點向下取消,遍歷它的所有子節點,然後執行取消
for child := range c.children {
// NOTE: acquiring the child's lock while holding parent's lock.
child.cancel(false, err)
}
// 子節點置空
c.children = nil
c.mu.Unlock()
// 把當前節點從父節點中移除,只有在外部父節點調用時纔會傳true,其他都是傳 false,內部調用都會因爲c.children = nil被剔除出去
if removeFromParent {
removeChild(c.Context, c)
}
}
withDeadline、WithTimeout 的實現
先看 WithTimeout 方法,它內部就是調用的 WithDeadline 方法:
// WithTimeout returns WithDeadline(parent, time.Now().Add(timeout)).
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) {
return WithDeadline(parent, time.Now().Add(timeout))
}
所以我們進一步來看 withDeadline 的實現方式:
// WithDeadline returns a copy of the parent context with the deadline adjusted
// to be no later than d. If the parent's deadline is already earlier than d,
// WithDeadline(parent, d) is semantically equivalent to parent. The returned
// context's Done channel is closed when the deadline expires, when the returned
// cancel function is called, or when the parent context's Done channel is
// closed, whichever happens first.
//
// Canceling this context releases resources associated with it, so code should
// call cancel as soon as the operations running in this Context complete.
func WithDeadline(parent Context, d time.Time) (Context, CancelFunc) {
// 不能爲空 context 創建衍生 context
if parent == nil {
panic("cannot create context from nil parent")
}
// 判斷當父 context 的結束時間早於要設置的時間,則不需要再去單獨處理子節點的定時器了
if cur, ok := parent.Deadline(); ok && cur.Before(d) {
// The current deadline is already sooner than the new one.
return WithCancel(parent)
}
// 創建一個timerCtx對象
c := &timerCtx{
cancelCtx: newCancelCtx(parent),
deadline: d,
}
// 將當前節點掛到父節點上
propagateCancel(parent, c)
// 獲取過期時間
dur := time.Until(d)
// 當前時間已經過期了則直接取消
if dur <= 0 {
c.cancel(true, DeadlineExceeded) // deadline has already passed
return c, func() { c.cancel(false, Canceled) }
}
c.mu.Lock()
defer c.mu.Unlock()
// 如果沒被取消,則直接添加一個定時器,定時去取消
if c.err == nil {
c.timer = time.AfterFunc(dur, func() {
c.cancel(true, DeadlineExceeded)
})
}
return c, func() { c.cancel(true, Canceled) }
}
從上面可以發現 withDeadline 相較於 withCancel 方法也就多了一個定時器去定時調用 cancel 方法,這個 cancel 方法在 timerCtx 類中進行了重寫,我們先來看一下 timerCtx 類,它是基於 cancelCtx 的,多了兩個字段:
timerCtx 類
// A timerCtx carries a timer and a deadline. It embeds a cancelCtx to
// implement Done and Err. It implements cancel by stopping its timer then
// delegating to cancelCtx.cancel.
type timerCtx struct {
cancelCtx
timer *time.Timer // Under cancelCtx.mu.
deadline time.Time
}
並且 timerCtx 也實現了 cancel 方法,我們發現內部也是調用了 cancelCtx 的 cancel 方法執行取消:
func (c *timerCtx) cancel(removeFromParent bool, err error) {
// 調用 cancelCtx 的 cancel 方法取消掉子節點context
c.cancelCtx.cancel(false, err)
// 把 父 context 移除放到了這做
if removeFromParent {
// Remove this timerCtx from its parent cancelCtx's children.
removeChild(c.cancelCtx.Context, c)
}
c.mu.Lock()
// 停掉定時器,釋放資源
if c.timer != nil {
c.timer.Stop()
c.timer = nil
}
c.mu.Unlock()
}
至此,源碼部分我們終於看完了,回顧一下,是不是有很多感想?
結語
Channel、WaitGroup 和 Context 是 Go 語言併發控制的三種主要機制,它們各有各的應用場景和優缺點。Channel 提供了一種在 goroutines 之間傳遞數據和同步的手段,WaitGroup 提供了一種等待一組 goroutines 完成的方式,而 Context 則提供了一種跨 API 邊界操作和處理超時或取消請求的方式。理解並熟練掌握這三種機制,是高效使用 Go 語言進行併發編程的關鍵。
同時,我們也要注意,雖然這三種機制能夠幫助我們控制併發,但是並不能避免所有的併發問題,例如數據競態等問題。因此,我們在編寫併發程序時,還需要採用其它的同步機制,例如鎖,以及遵循一些併發編程的最佳實踐,例如避免在 goroutines 之間共享狀態,使用不可變的數據結構等。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/tP2balhaOUO-Vg9_96N4Iw