深入瞭解 Go 語言與併發編程
作者:fitchguo,騰訊 IEG 後臺開發工程師
併發編程,可以說一直都是開發者們關注最多的主題之一。而 Golang 作爲一個出道就自帶 “高併發” 光環的編程語言,其併發編程的實現原理肯定是值得我們深入探究的。
Go 併發編程模型在底層是由操作系統所提供的線程庫支撐的,這裏先簡要介紹一下線程實現模型的相關概念。
線程的實現模型
線程的實現模型主要有 3 個,分別是:用戶級線程模型、內核級線程模型和兩級線程模型。它們之間最大的差異在於用戶線程與內核調度實體(KSE)之間的對應關係上。內核調度實體就是可以被操作系統內核調度器調度的對象,也稱爲內核級線程,是操作系統內核的最小調度單元。
用戶級線程模型
用戶線程與 KSE 爲多對一(N:1)的映射關係。此模型下的線程由用戶級別的線程庫全權管理,線程庫存儲在進程的用戶空間之中,這些線程的存在對於內核來說是無法感知的,所以這些線程也不是內核調度器調度的對象。一個進程中所有創建的線程都只和同一個 KSE 在運行時動態綁定,內核的所有調度都是基於用戶進程的。
對於線程的調度則是在用戶層面完成的,相較於內核調度不需要讓 CPU 在用戶態和內核態之間切換,這種實現方式相比內核級線程模型可以做的很輕量級,對系統資源的消耗會小很多,上下文切換所花費的代價也會小得多。許多語言實現的協程庫基本上都屬於這種方式。但是,此模型下的多線程並不能真正的併發運行。例如,如果某個線程在 I/O 操作過程中被阻塞,那麼其所屬進程內的所有線程都被阻塞,整個進程將被掛起。
內核級線程模型
用戶線程與 KSE 爲一對一(1:1) 的映射關係。此模型下的線程由內核負責管理,應用程序對線程的創建、終止和同步都必須通過內核提供的系統調用來完成,內核可以分別對每一個線程進行調度。所以,一對一線程模型可以真正的實現線程的併發運行,大部分語言實現的線程庫基本上都屬於這種方式。但是,此模型下線程的創建、切換和同步都需要花費更多的內核資源和時間,如果一個進程包含了大量的線程,那麼它會給內核的調度器造成非常大的負擔,甚至會影響到操作系統的整體性能。
兩級線程模型
用戶線程與 KSE 爲多對多(N:M)的映射關係。兩級線程模型吸收前兩種線程模型的優點並且儘量規避了它們的缺點,區別於用戶級線程模型,兩級線程模型中的進程可以與多個內核線程 KSE 關聯,也就是說一個進程內的多個線程可以分別綁定一個自己的 KSE,這點和內核級線程模型相似;其次,又區別於內核級線程模型,它的進程裏的線程並不與 KSE 唯一綁定,而是可以多個用戶線程映射到同一個 KSE,當某個 KSE 因爲其綁定的線程的阻塞操作被內核調度出 CPU 時,其關聯的進程中其餘用戶線程可以重新與其他 KSE 綁定運行。所以,兩級線程模型既不是用戶級線程模型那種完全靠自己調度的也不是內核級線程模型完全靠操作系統調度的,而是一種自身調度與系統調度協同工作的中間態,即用戶調度器實現用戶線程到 KSE 的調度,內核調度器實現 KSE 到 CPU 上的調度。
Go 的併發機制
在 Go 的併發編程模型中,不受操作系統內核管理的獨立控制流不叫用戶線程或線程,而稱爲 Goroutine。Goroutine 通常被認爲是協程的 Go 實現,實際上 Goroutine 並不是傳統意義上的協程,傳統的協程庫屬於用戶級線程模型,而 Goroutine 結合 Go 調度器的底層實現上屬於兩級線程模型。
Go 搭建了一個特有的兩級線程模型。由 Go 調度器實現 Goroutine 到 KSE 的調度,由內核調度器實現 KSE 到 CPU 上的調度。Go 的調度器使用 G、M、P 三個結構體來實現 Goroutine 的調度,也稱之爲 GMP 模型。
GMP 模型
G:表示 Goroutine。每個 Goroutine 對應一個 G 結構體,G 存儲 Goroutine 的運行堆棧、狀態以及任務函數,可重用。當 Goroutine 被調離 CPU 時,調度器代碼負責把 CPU 寄存器的值保存在 G 對象的成員變量之中,當 Goroutine 被調度起來運行時,調度器代碼又負責把 G 對象的成員變量所保存的寄存器的值恢復到 CPU 的寄存器
M:OS 底層線程的抽象,它本身就與一個內核線程進行綁定,每個工作線程都有唯一的一個 M 結構體的實例對象與之對應,它代表着真正執行計算的資源,由操作系統的調度器調度和管理。M 結構體對象除了記錄着工作線程的諸如棧的起止位置、當前正在執行的 Goroutine 以及是否空閒等等狀態信息之外,還通過指針維持着與 P 結構體的實例對象之間的綁定關係
P:表示邏輯處理器。對 G 來說,P 相當於 CPU 核,G 只有綁定到 P(在 P 的 local runq 中)才能被調度。對 M 來說,P 提供了相關的執行環境 (Context),如內存分配狀態(mcache),任務隊列(G) 等。它維護一個局部 Goroutine 可運行 G 隊列,工作線程優先使用自己的局部運行隊列,只有必要時纔會去訪問全局運行隊列,這可以大大減少鎖衝突,提高工作線程的併發性,並且可以良好的運用程序的局部性原理
一個 G 的執行需要 P 和 M 的支持。一個 M 在與一個 P 關聯之後,就形成了一個有效的 G 運行環境(內核線程 + 上下文)。每個 P 都包含一個可運行的 G 的隊列(runq)。該隊列中的 G 會被依次傳遞給與本地 P 關聯的 M,並獲得運行時機。
M 與 KSE 之間總是一一對應的關係,一個 M 僅能代表一個內核線程。M 與 KSE 之間的關聯非常穩固,一個 M 在其生命週期內,會且僅會與一個 KSE 產生關聯,而 M 與 P、P 與 G 之間的關聯都是可變的,M 與 P 也是一對一的關係,P 與 G 則是一對多的關係。
G
運行時,G 在調度器中的地位與線程在操作系統中差不多,但是它佔用了更小的內存空間,也降低了上下文切換的開銷。它是 Go 語言在用戶態提供的線程,作爲一種粒度更細的資源調度單元,使用得當,能夠在高併發的場景下更高效地利用機器的 CPU。
g 結構體部分源碼(src/runtime/runtime2.go):
type g struct {
stack stack // Goroutine的棧內存範圍[stack.lo, stack.hi)
stackguard0 uintptr // 用於調度器搶佔式調度
m *m // Goroutine佔用的線程
sched gobuf // Goroutine的調度相關數據
atomicstatus uint32 // Goroutine的狀態
...
}
type gobuf struct {
sp uintptr // 棧指針
pc uintptr // 程序計數器
g guintptr // gobuf對應的Goroutine
ret sys.Uintewg // 系統調用的返回值
...
}
gobuf 中保存的內容會在調度器保存或恢復上下文時使用,其中棧指針和程序計數器會用來存儲或恢復寄存器中的值,改變程序即將執行的代碼。
atomicstatus 字段存儲了當前 Goroutine 的狀態,Goroutine 主要可能處於以下幾種狀態:
Goroutine 的狀態遷移是一個十分複雜的過程,觸發狀態遷移的方法也很多。這裏主要介紹一下比較常見的五種狀態_Grunnable、_Grunning、_Gsyscall、_Gwaiting 和_Gpreempted。
可以將這些不同的狀態聚合成三種:等待中、可運行、運行中,運行期間會在這三種狀態來回切換:
-
等待中:Goroutine 正在等待某些條件滿足,例如:系統調用結束等,包括_Gwaiting、_Gsyscall 和_Gpreempted 幾個狀態
-
可運行:Goroutine 已經準備就緒,可以在線程運行,如果當前程序中有非常多的 Goroutine,每個 Goroutine 就可能會等待更多的時間,即_Grunnable
-
運行中:Goroutine 正在某個線程上運行,即_Grunning
G 常見的狀態轉換圖:
G 狀態轉換
進入死亡狀態的 G 可以重新初始化並使用。
M
Go 語言併發模型中的 M 是操作系統線程。調度器最多可以創建 10000 個線程,但是最多隻會有 GOMAXPROCS(P 的數量)個活躍線程能夠正常運行。在默認情況下,運行時會將 GOMAXPROCS 設置成當前機器的核數,我們也可以在程序中使用 runtime.GOMAXPROCS 來改變最大的活躍線程數。
例如,對於一個四核的機器,runtime 會創建四個活躍的操作系統線程,每一個線程都對應一個運行時中的 runtime.m 結構體。在大多數情況下,我們都會使用 Go 的默認設置,也就是線程數等於 CPU 數,默認的設置不會頻繁觸發操作系統的線程調度和上下文切換,所有的調度都會發生在用戶態,由 Go 語言調度器觸發,能夠減少很多額外開銷。
m 結構體源碼(部分):
type m struct {
g0 *g // 一個特殊的goroutine,執行一些運行時任務
gsignal *g // 處理signal的G
curg *g // 當前M正在運行的G的指針
p puintptr // 正在與當前M關聯的P
nextp puintptr // 與當前M潛在關聯的P
oldp puintptr // 執行系統調用之前使用線程的P
spinning bool // 當前M是否正在尋找可運行的G
lockedg *g // 與當前M鎖定的G
}
g0 表示一個特殊的 Goroutine,由 Go 運行時系統在啓動之處創建,它會深度參與運行時的調度過程,包括 Goroutine 的創建、大內存分配和 CGO 函數的執行。curg 是在當前線程上運行的用戶 Goroutine。
P
調度器中的處理器 P 是線程和 Goroutine 的中間層,它能提供線程需要的上下文環境,也會負責調度線程上的等待隊列,通過處理器 P 的調度,每一個內核線程都能夠執行多個 Goroutine,它能在 Goroutine 進行一些 I/O 操作時及時讓出計算資源,提高線程的利用率。
P 的數量等於 GOMAXPROCS,設置 GOMAXPROCS 的值只能限制 P 的最大數量,對 M 和 G 的數量沒有任何約束。當 M 上運行的 G 進入系統調用導致 M 被阻塞時,運行時系統會把該 M 和與之關聯的 P 分離開來,這時,如果該 P 的可運行 G 隊列上還有未被運行的 G,那麼運行時系統就會找一個空閒的 M,或者新建一個 M 與該 P 關聯,滿足這些 G 的運行需要。因此,M 的數量很多時候都會比 P 多。
p 結構體源碼(部分):
type p struct {
// p 的狀態
status uint32
// 對應關聯的 M
m muintptr
// 可運行的Goroutine隊列,可無鎖訪問
runqhead uint32
runqtail uint32
runq [256]guintptr
// 緩存可立即執行的G
runnext guintptr
// 可用的G列表,G狀態等於Gdead
gFree struct {
gList
n int32
}
...
}
P 可能處於的狀態如下:
調度器
兩級線程模型中的一部分調度任務會由操作系統之外的程序承擔。在 Go 語言中,調度器就負責這一部分調度任務。調度的主要對象就是 G、M 和 P 的實例。每個 M(即每個內核線程)在運行過程中都會執行一些調度任務,他們共同實現了 Go 調度器的調度功能。
g0 和 m0
運行時系統中的每個 M 都會擁有一個特殊的 G,一般稱爲 M 的 g0。M 的 g0 不是由 Go 程序中的代碼間接生成的,而是由 Go 運行時系統在初始化 M 時創建並分配給該 M 的。M 的 g0 一般用於執行調度、垃圾回收、棧管理等方面的任務。M 還會擁有一個專用於處理信號的 G,稱爲 gsignal。
除了 g0 和 gsignal 之外,其他由 M 運行的 G 都可以視爲用戶級別的 G,簡稱用戶 G,g0 和 gsignal 可稱爲系統 G。Go 運行時系統會進行切換,以使每個 M 都可以交替運行用戶 G 和它的 g0。這就是前面所說的 “每個 M 都會運行調度程序” 的原因。
除了每個 M 都擁有屬於它自己的 g0 外,還存在一個 runtime.g0。runtime.g0 用於執行引導程序,它運行在 Go 程序擁有的第一個內核線程之中,這個線程也稱爲 runtime.m0,runtime.m0 的 g0 就是 runtime.g0。
核心元素的容器
上面講了 Go 的線程實現模型中的 3 個核心元素——G、M 和 P,下面看看承載這些元素實例的容器:
和 G 相關的四個容器值得我們特別注意,任何 G 都會存在於全局 G 列表中,其餘四個容器只會存放當前作用域內的、具有某個狀態的 G。兩個可運行的 G 列表中的 G 都擁有幾乎平等的運行機會,只不過不同時機的調度會把 G 放在不同的地方,例如,從 Gsyscall 狀態轉移出來的 G 都會被放入調度器的可運行 G 隊列,而剛剛被初始化的 G 都會被放入本地 P 的可運行 G 隊列。此外,這兩個可運行 G 隊列之間也會互相轉移 G,例如,本地 P 的可運行 G 隊列已滿時,其中一半的 G 會被轉移到調度器的可運行 G 隊列中。
調度器的空閒 M 列表和空閒 P 列表用於存放暫時不被使用的元素實例。運行時系統需要時,會從中獲取相應元素的實例並重新啓用它。
調度循環
調用 runtime.schedule 進入調度循環:
func schedule() {
_g_ := getg()
top:
var gp *g
var inheritTime bool
if gp == nil {
// 爲了公平,每調用schedule函數61次就要從全局可運行G隊列中獲取
if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
lock(&sched.lock)
gp = globrunqget(_g_.m.p.ptr(), 1)
unlock(&sched.lock)
}
}
// 從P本地獲取G任務
if gp == nil {
gp, inheritTime = runqget(_g_.m.p.ptr())
}
// 運行到這裏表示從本地運行隊列和全局運行隊列都沒有找到需要運行的G
if gp == nil {
// 阻塞地查找可用G
gp, inheritTime = findrunnable()
}
// 執行G任務函數
execute(gp, inheritTime)
}
runtime.schedule 函數會從下面幾個地方查找待執行的 Goroutine:
-
爲了保證公平,當全局運行隊列中有待執行的 Goroutine 時,通過 schedtick 保證有一定幾率會從全局的運行隊列中查找對應的 Goroutine
-
從處理器本地的運行隊列中查找待執行的 Goroutine
-
如果前兩種方法都沒有找到 G,會通過 findrunnable 函數去其他 P 裏面去 “偷” 一些 G 來執行,如果 “偷” 不到,就阻塞查找直到有可運行的 G
接下來由 runtime.execute 執行獲取的 Goroutine:
func execute(gp *g, inheritTime bool) {
_g_ := getg()
// 將G綁定到當前M上
_g_.m.curg = gp
gp.m = _g_.m
// 將g正式切換爲_Grunning狀態
casgstatus(gp, _Grunnable, _Grunning)
gp.waitsince = 0
// 搶佔信號
gp.preempt = false
gp.stackguard0 = gp.stack.lo + _StackGuard
if !inheritTime {
// 調度器調度次數增加1
_g_.m.p.ptr().schedtick++
}
...
// gogo完成從g0到gp的切換
gogo(&gp.sched)
}
當開始執行 execute 後,G 會被切換到_Grunning 狀態,並將 M 和 G 進行綁定,最終調用 runtime.gogo 將 Goroutine 調度到當前線程上。runtime.gogo 會從 runtime.gobuf 中取出 runtime.goexit 的程序計數器和待執行函數的程序計數器,並將:
-
runtime.goexit 的程序計數器被放到棧 SP 上
-
待執行函數的程序計數器被放到了寄存器 BX 上
MOVL gobuf_sp(BX), SP // 將runtime.goexit函數的PC恢復到SP中
MOVL gobuf_pc(BX), BX // 獲取待執行函數的程序計數器
JMP BX // 開始執行
當 Goroutine 中運行的函數返回時,程序會跳轉到 runtime.goexit 所在位置,最終在當前線程的 g0 的棧上調用 runtime.goexit0 函數,該函數會將 Goroutine 轉換爲_Gdead 狀態、清理其中的字段、移除 Goroutine 和線程的關聯並調用 runtime.gfput 將 G 重新加入處理器的 Goroutine 空閒列表 gFree 中:
func goexit0(gp *g) {
_g_ := getg()
// 設置當前G狀態爲_Gdead
casgstatus(gp, _Grunning, _Gdead)
// 清理G
gp.m = nil
...
gp.writebuf = nil
gp.waitreason = 0
gp.param = nil
gp.labels = nil
gp.timer = nil
// 解綁M和G
dropg()
...
// 將G扔進gfree鏈表中等待複用
gfput(_g_.m.p.ptr(), gp)
// 再次進行調度
schedule()
}
最後 runtime.goexit0 會重新調用 runtime.schedule 觸發新一輪的 Goroutine 調度,調度器從 runtime.schedule 開始,最終又回到 runtime.schedule,這就是 Go 語言的調度循環。
Channel
Go 中經常被人提及的一個設計模式:不要通過共享內存的方式進行通信,而是應該通過通信的方式共享內存。Goroutine 之間會通過 channel 傳遞數據,作爲 Go 語言的核心數據結構和 Goroutine 之間的通信方式,channel 是支撐 Go 語言高性能併發編程模型的重要結構。
channel 在運行時的內部表示是 runtime.hchan,該結構體中包含了用於保護成員變量的互斥鎖,從某種程度上說,channel 是一個用於同步和通信的有鎖隊列。hchan 結構體源碼:
type hchan struct {
qcount uint // 循環列表元素個數
dataqsiz uint // 循環隊列的大小
buf unsafe.Pointer // 循環隊列的指針
elemsize uint16 // chan中元素的大小
closed uint32 // 是否已close
elemtype *_type // chan中元素類型
sendx uint // chan的發送操作處理到的位置
recvx uint // chan的接收操作處理到的位置
recvq waitq // 等待接收數據的Goroutine列表
sendq waitq // 等待發送數據的Goroutine列表
lock mutex // 互斥鎖
}
type waitq struct { // 雙向鏈表
first *sudog
last *sudog
}
waitq 中連接的是一個 sudog 雙向鏈表,保存的是等待中的 Goroutine。
創建 chan
使用 make 關鍵字來創建管道,make(chan int, 3) 會調用到 runtime.makechan 函數中:
const (
maxAlign = 8
hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1))
)
func makechan(t *chantype, size int) *hchan {
elem := t.elem
// 計算需要分配的buf空間大小
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
var c *hchan
switch {
case mem == 0:
// chan的大小或者elem的大小爲0,不需要創建buf
c = (*hchan)(mallocgc(hchanSize, nil, true))
// Race detector uses this location for synchronization.
c.buf = c.raceaddr()
case elem.ptrdata == 0:
// elem不含指針,分配一塊連續的內存給hchan數據結構和buf
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// elem包含指針,單獨分配buf
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
// 更新hchan的elemsize、elemtype、dataqsiz字段
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
return c
}
上述代碼根據 channel 中收發元素的類型和緩衝區的大小初始化 runtime.hchan 和緩衝區:
-
若緩衝區所需大小爲 0,就只會爲 hchan 分配一段內存
-
若緩衝區所需大小不爲 0 且 elem 不包含指針,會爲 hchan 和 buf 分配一塊連續的內存
-
若緩衝區所需大小不爲 0 且 elem 包含指針,會單獨爲 hchan 和 buf 分配內存
發送數據到 chan
發送數據到 channel,ch <- i 會調用到 runtime.chansend 函數中,該函數包含了發送數據的全部邏輯:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
if c == nil {
// 對於非阻塞的發送,直接返回
if !block {
return false
}
// 對於阻塞的通道,將goroutine掛起
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}
// 加鎖
lock(&c.lock)
// channel已關閉,panic
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
...
}
block 表示當前的發送操作是否是阻塞調用。如果 channel 爲空,對於非阻塞的發送,直接返回 false,對於阻塞的發送,將 goroutine 掛起,並且永遠不會返回。對 channel 加鎖,防止多個線程併發修改數據,如果 channel 已關閉,報錯並中止程序。
runtime.chansend 函數的執行過程可以分爲以下三個部分:
-
當存在等待的接收者時,通過 runtime.send 直接將數據發送給阻塞的接收者
-
當緩衝區存在空餘空間時,將發送的數據寫入緩衝區
-
當不存在緩衝區或緩衝區已滿時,等待其他 Goroutine 從 channel 接收數據
直接發送
如果目標 channel 沒有被關閉且 recvq 隊列中已經有處於讀等待的 Goroutine,那麼 runtime.chansend 會從接收隊列 recvq 中取出最先陷入等待的 Goroutine 並直接向它發送數據,注意,由於有接收者在等待,所以如果有緩衝區,那麼緩衝區一定是空的:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
// 從recvq中取出一個接收者
if sg := c.recvq.dequeue(); sg != nil {
// 如果接收者存在,直接向該接收者發送數據,繞過buf
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
...
}
直接發送會調用 runtime.send 函數:
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
...
if sg.elem != nil {
// 直接把要發送的數據copy到接收者的棧空間
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
// 設置對應的goroutine爲可運行狀態
goready(gp, skip+1)
}
sendDirect 方法調用 memmove 進行數據的內存拷貝。goready 方法將等待接收數據的 Goroutine 標記成可運行狀態(Grunnable)並把該 Goroutine 發到發送方所在的處理器的 runnext 上等待執行,該處理器在下一次調度時會立刻喚醒數據的接收方。注意,只是放到了 runnext 中,並沒有立刻執行該 Goroutine。
發送到緩衝區
如果緩衝區未滿,則將數據寫入緩衝區:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
// 如果緩衝區沒有滿,直接將要發送的數據複製到緩衝區
if c.qcount < c.dataqsiz {
// 找到buf要填充數據的索引位置
qp := chanbuf(c, c.sendx)
...
// 將數據拷貝到buf中
typedmemmove(c.elemtype, qp, ep)
// 數據索引前移,如果到了末尾,又從0開始
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
// 元素個數加1,釋放鎖並返回
c.qcount++
unlock(&c.lock)
return true
}
...
}
找到緩衝區要填充數據的索引位置,調用 typedmemmove 方法將數據拷貝到緩衝區中,然後重新設值 sendx 偏移量。
阻塞發送
當 channel 沒有接收者能夠處理數據時,向 channel 發送數據會被下游阻塞,使用 select 關鍵字可以向 channel 非阻塞地發送消息:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
// 緩衝區沒有空間了,對於非阻塞調用直接返回
if !block {
unlock(&c.lock)
return false
}
// 創建sudog對象
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
// 將sudog對象入隊
c.sendq.enqueue(mysg)
// 進入等待狀態
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
...
}
對於非阻塞的調用會直接返回,對於阻塞的調用會創建 sudog 對象並將 sudog 對象加入發送等待隊列。調用 gopark 將當前 Goroutine 轉入 waiting 狀態。調用 gopark 之後,在使用者看來向該 channel 發送數據的代碼語句會被阻塞。
發送數據整個流程大致如下:
注意,發送數據的過程中包含幾個會觸發 Goroutine 調度的時機:
-
發送數據時發現從 channel 上存在等待接收數據的 Goroutine,立刻設置處理器的 runnext 屬性,但是並不會立刻觸發調度
-
發送數據時並沒有找到接收方並且緩衝區已經滿了,這時會將自己加入 channel 的 sendq 隊列並調用 gopark 觸發 Goroutine 的調度讓出處理器的使用權
從 chan 接收數據
從 channel 獲取數據最終調用到 runtime.chanrecv 函數:
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
if c == nil {
// 如果c爲空且是非阻塞調用,直接返回
if !block {
return
}
// 阻塞調用直接等待
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
···
lock(&c.lock)
// 如果c已經關閉,並且c中沒有數據,返回
if c.closed != 0 && c.qcount == 0 {
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
···
}
當從一個空 channel 接收數據時,直接調用 gopark 讓出處理器使用權。如果當前 channel 已被關閉且緩衝區中沒有數據,直接返回。
runtime.chanrecv 函數的具體執行過程可以分爲以下三個部分:
-
當存在等待的發送者時,通過 runtime.recv 從阻塞的發送者或者緩衝區中獲取數據
-
當緩衝區存在數據時,從 channel 的緩衝區中接收數據
-
當緩衝區中不存在數據時,等待其他 Goroutine 向 channel 發送數據
直接接收
當 channel 的 sendq 隊列中包含處於發送等待狀態的 Goroutine 時,調用 runtime.recv 直接從這個發送者那裏提取數據。注意,由於有發送者在等待,所以如果有緩衝區,那麼緩衝區一定是滿的。
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
// 從發送者隊列獲取數據
if sg := c.sendq.dequeue(); sg != nil {
// 發送者隊列不爲空,直接從發送者那裏提取數據
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
...
}
主要看一下 runtime.recv 的實現:
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
// 如果是無緩衝區chan
if c.dataqsiz == 0 {
if ep != nil {
// 直接從發送者拷貝數據
recvDirect(c.elemtype, sg, ep)
}
// 有緩衝區chan
} else {
// 獲取buf的存放數據指針
qp := chanbuf(c, c.recvx)
// 直接從緩衝區拷貝數據給接收者
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// 從發送者拷貝數據到緩衝區
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
gp := sg.g
gp.param = unsafe.Pointer(sg)
// 設置對應的goroutine爲可運行狀態
goready(gp, skip+1)
}
該函數會根據緩衝區的大小分別處理不同的情況:
-
如果 channel 不存在緩衝區
-
直接從發送者那裏提取數據
-
如果 channel 存在緩衝區
-
將緩衝區中的數據拷貝到接收方的內存地址
-
將發送者數據拷貝到緩衝區,並喚醒發送者
無論發生哪種情況,運行時都會調用 goready 將等待發送數據的 Goroutine 標記成可運行狀態(Grunnable)並將當前處理器的 runnext 設置成發送數據的 Goroutine,在調度器下一次調度時將阻塞的發送方喚醒。
從緩衝區接收
如果 channel 緩衝區中有數據且發送者隊列中沒有等待發送的 Goroutine 時,直接從緩衝區中 recvx 的索引位置取出數據:
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
// 如果緩衝區中有數據
if c.qcount > 0 {
qp := chanbuf(c, c.recvx)
// 從緩衝區複製數據到ep
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
// 接收數據的指針前移
c.recvx++
// 環形隊列,如果到了末尾,再從0開始
if c.recvx == c.dataqsiz {
c.recvx = 0
}
// 緩衝區中現存數據減一
c.qcount--
unlock(&c.lock)
return true, true
}
...
}
阻塞接收
當 channel 的發送隊列中不存在等待的 Goroutine 並且緩衝區中也不存在任何數據時,從管道中接收數據的操作會被阻塞,使用 select 關鍵字可以非阻塞地接收消息:
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
// 非阻塞,直接返回
if !block {
unlock(&c.lock)
return false, false
}
// 創建sudog
gp := getg()
mysg := acquireSudog()
···
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
// 將sudog添加到等待接收隊列中
c.recvq.enqueue(mysg)
// 阻塞Goroutine,等待被喚醒
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
...
}
如果是非阻塞調用,直接返回。阻塞調用會將當前 Goroutine 封裝成 sudog,然後將 sudog 添加到等待接收隊列中,調用 gopark 讓出處理器的使用權並等待調度器的調度。
注意,接收數據的過程中包含幾個會觸發 Goroutine 調度的時機:
-
當 channel 爲空時
-
當 channel 的緩衝區中不存在數據並且 sendq 中也不存在等待的發送者時
關閉 chan
關閉通道會調用到 runtime.closechan 方法:
func closechan(c *hchan) {
// 校驗邏輯
...
lock(&c.lock)
// 設置chan已關閉
c.closed = 1
var glist gList
// 獲取所有接收者
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
gp := sg.g
gp.param = nil
glist.push(gp)
}
// 獲取所有發送者
for {
sg := c.sendq.dequeue()
...
}
unlock(&c.lock)
// 喚醒所有glist中的goroutine
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
}
將 recvq 和 sendq 兩個隊列中的 Goroutine 加入到 gList 中,並清除所有 sudog 上未被處理的元素。最後將所有 glist 中的 Goroutine 加入調度隊列,等待被喚醒。注意,發送者在被喚醒之後會 panic。
總結一下發送 / 接收 / 關閉操作可能引發的結果:
Goroutine 和 channel 的實現共同支撐起了 Go 語言的併發機制。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/obFUsRnppgEsGkoo08nWeQ