Go 運行時的併發原語
這篇文章我們來了解一下隱藏在 Go 運行時中的一些併發原語, 因爲運行時是底座和包循環依賴等原因,運行時中很少使用標準庫中的併發原語,它有自己的併發原語。
mutex
在 runtime/runtime2.go[1] 定義了一個互斥鎖,它的定義如下:
type mutex struct {
lockRankStruct
key uintptr
}
它可是運行時中的大紅人了,在很多數據結構中都被廣泛的使用,凡事涉及到併發訪問的地方都會用到它,你在runtime2.go
文件中就能看到多處使用它的地方,因爲很多地方都在使用它,我就不一一列舉了在 runtime 這個文件夾中搜mutex
這個關鍵子就都搜出來了。
舉一個大家常用來底層分析的數據結構channel
爲例,channel 的數據結構定義如下:
type hchan struct {
qcount uint
dataqsiz uint
buf unsafe.Pointer
elemsize uint16
closed uint32
elemtype *_type
sendx uint
recvx uint
recvq waitq
sendq waitq
lock mutex
}
最後哪個字段lock mutex
就是使用的這個互斥鎖。因爲一個通道在發送和接收的時候都會涉及到對通道的修改,在多發送者或者接收者情況下,需要使用互斥鎖來保護。
這個互斥鎖的使用需要調用幾個函數。
-
lockInit: 需要初始化這個鎖,比如在 channel 的實現中,有如下的初始化代碼:
lockInit(&c.lock, lockRankHchan)
, 它將 lock 初始化 (lockInit) 時設置鎖的等級(rank)。如果不明確去初始化一個鎖, 那麼可以在調用 lock 自身的時候通過 lockWithRank 指定這個鎖的等級。這個等級在啓用GOEXPERIMENT=staticlockranking
用來加強鎖的靜態分析。 -
lock: 加鎖,在不同的操作系統下有不同的實現。如 channel 使用這個代碼進行加鎖:
lock(&c.lock)
-
unlock: 解鎖,在不同的操作系統下有不同的實現。如 channel 使用這個代碼進行解鎖:
unlock(&c.lock)
我在 Go 運行時中的 Mutex[2] 中詳細介紹了它,這裏就不再贅述了。
rwmutex
運行時中還實現了讀寫鎖 rwmutex[3]。這個讀寫鎖完全是從sync.RWMutex
中拷貝過來的,只是將sync.RWMutex
中的sync
包替換成了runtime
包,因爲sync
包依賴了runtime
包,所以不能直接使用。
你看它的數據結構定義和sync.RWMutex
幾乎是一樣的:
type rwmutex struct {
rLock mutex // protects readers, readerPass, writer
readers muintptr // list of pending readers
readerPass uint32 // number of pending readers to skip readers list
wLock mutex // serializes writers
writer muintptr // pending writer waiting for completing readers
readerCount atomic.Int32 // number of pending readers
readerWait atomic.Int32 // number of departing readers
readRank lockRank // semantic lock rank for read locking
writeRank lockRank // semantic lock rank for write locking
}
mutex
和rwmutex
會直接阻塞M
。
gopark/goready
在其它編程語言中,會直接提供park
和unpark
的功能,比如 rust, 提供對併發單元的更底層的控制。
park
就是停止一會,很形象,就是暫時讓併發單元阻塞,不再參與調度,直到unpark
它,它纔會重新參與調度。
Go 運行時並沒有直接提供park
和unpark
的功能,它提供了gopark
和goready
的功能,它們的實現在 runtime/proc.go[4]。
gopark
會將 goroutine 放到等待隊列中,從調度器的運行隊列中移出去,等待被喚醒。goready
會將 goroutine 放到可運行隊列中,加入到調度器的運行隊列,等待被調度。
note
note
實現一次性的通知機制。
note
的數據結構如下:
type note struct {
key uintptr
}
可以使用notesleep
和notewakeup
進行休眠和喚醒。
就像mutex
一樣,notesleep
會阻塞M
,notewakeup
會喚醒一個M
,並且不會重新調度G
和P
, 而notetsleepg
就像一個阻塞的系統調用一樣,允許P
選擇另外一個G
運行。noteclear
用來重置note
總結一下, 上面幾種同步原語阻塞的角色如下:
filelock
"filelock"(文件鎖)通常是指在計算機系統中使用的一種機制,用於確保對文件的獨佔性訪問,以防止多個進程或線程同時修改文件而導致數據不一致或損壞。
一些應用程序經常利用文件鎖,來控制只有一個實例在運行,在 linux 環境下非常常見,比如 mysql 等。
在不同的操作系統和編程語言中,文件鎖的實現方式可能會有所不同。一般而言,文件鎖可以分爲兩種主要類型:
-
共享鎖(Shared Lock):多個進程或線程可以同時獲取共享鎖,允許它們同時讀取文件,但阻止其他進程或線程獲取獨佔鎖進行寫操作。
-
獨佔鎖(Exclusive Lock):只允許一個進程或線程獲取獨佔鎖,阻止其他進程或線程同時進行讀或寫操作。
文件鎖的代碼在 cmd/go/internal/lockedfile[5] 中, 我們以 Linux 爲例,看看它的實現:
type lockType int16
const (
readLock lockType = syscall.LOCK_SH
writeLock lockType = syscall.LOCK_EX
)
func lock(f File, lt lockType) (err error) {
for {
err = syscall.Flock(int(f.Fd()), int(lt))
if err != syscall.EINTR {
break
}
}
if err != nil {
return &fs.PathError{
Op: lt.String(),
Path: f.Name(),
Err: err,
}
}
return nil
}
func unlock(f File) error {
return lock(f, syscall.LOCK_UN)
}
可以看到它實際是調用系統調用syscall.Flock
實現的。
這不屬於運行時內定義的同步原語,但是它給我們提供了一個實現文件鎖的思路,它甚至還封裝了一個Mutex
供我們使用。如果有類似的需求,我們可以參考它的實現。
semaphore
不太清楚 Go 爲啥不在運行時或者標準庫 sync 中實現信號量,而是在擴展包中去實現,信號量可以說是一個非常廣泛使用的同步原語了。
雖然沒有在運行時中沒有明確實現,但是運行時中的 runtime/sema.go[6] 提供了與信號量相近功能,而且sync.Mutex
嚴重依賴它。
這個實現旨在提供一個可以在其他同步原語爭用的情況下使用的睡眠和喚醒原語,因此,它的目標與 Linux 的 futex 相同,但語義要簡單得多。Go 團隊說你不要將這些視爲信號量,而是將它們視爲一種實現睡眠和喚醒的方式,以確保每個睡眠都與單個喚醒配對, 這是有歷史原因,這些從貝爾實驗室出來的大佬,對於先前他們在 Plan 9 中的一些想法一脈相承的繼承下來,這個設計可以參見 Mullender 和 Cox 的 Plan 9 中的信號量 [7]。
比如sync.Mutex
睡眠和喚醒的函數其實就是這裏實現的:
//go:linkname sync_runtime_Semacquire sync.runtime_Semacquire
func sync_runtime_Semacquire(addr *uint32) {
semacquire1(addr, false, semaBlockProfile, 0, waitReasonSemacquire)
}
//go:linkname poll_runtime_Semacquire internal/poll.runtime_Semacquire
func poll_runtime_Semacquire(addr *uint32) {
semacquire1(addr, false, semaBlockProfile, 0, waitReasonSemacquire)
}
//go:linkname sync_runtime_Semrelease sync.runtime_Semrelease
func sync_runtime_Semrelease(addr *uint32, handoff bool, skipframes int) {
semrelease1(addr, handoff, skipframes)
}
//go:linkname sync_runtime_SemacquireMutex sync.runtime_SemacquireMutex
func sync_runtime_SemacquireMutex(addr *uint32, lifo bool, skipframes int) {
semacquire1(addr, lifo, semaBlockProfile|semaMutexProfile, skipframes, waitReasonSyncMutexLock)
}
// RWMutex使用的一些函數
...
atomic
atomic[8] 提供原子操作,獨立於sync/atomic
,僅供運行時使用。
在大多數平臺上,編譯器能夠識別此包中定義的函數,並用平臺特定的內部函數替換它們。在其他平臺上,提供了通用的實現。除非另有說明,在此包中定義的操作在處理它們所操作的值時對線程是有序一致的 (sequentially consistent)。更具體地說,在一個線程上按特定順序發生的操作,將始終被另一個線程觀察到以完全相同的順序發生。
因爲和特定的 CPU 架構有關,它的實現針對不同的 CPU 架構,由不同的指令實現而成,而且基本使用匯編實現,比如 AMD64 下的 Cas 實現,使用了LOCK
+ CMPXCHGL
指令:
TEXT ·Cas(SB),NOSPLIT,$0-17
MOVQ ptr+0(FP), BX
MOVL old+8(FP), AX
MOVL new+12(FP), CX
LOCK
CMPXCHGL CX, 0(BX)
SETEQ ret+16(FP)
RET
其實sync/atomic
下的實現,也是調用這裏的實現,否則維護兩套代碼就太麻煩了,而且可能出現不一致的現象。你看sync/atomic/asm.s
:
...
TEXT ·CompareAndSwapInt64(SB),NOSPLIT,$0
JMP runtime∕internal∕atomic·Cas64(SB)
TEXT ·CompareAndSwapUint64(SB),NOSPLIT,$0
JMP runtime∕internal∕atomic·Cas64(SB)
...
它也是調用untime∕internal∕atomic
下對應的函數。
singleflight
singleflight
特別適合大併發情況下許多請求做同一件事情的場景,這個時候只處理一個請求就可以了,其它請求等待那一個請求的結果,這樣對下游的壓力大大減少,比如在讀取 cache 的時候。
因爲它在特定場景下很有用,Go 的擴展庫中也同樣實現了它。
它沒有定義在運行時中,而是定義在 internal/singleflight[9] 中。
比如在包net
中,我們查找一臺主機的 IP 地址時,如果併發的請求,對資源是很大的浪費,這個時候我們只讓一個請求處理就好了:
type Resolver struct {
...
// lookupGroup merges LookupIPAddr calls together for lookups for the same
// host. The lookupGroup key is the LookupIPAddr.host argument.
// The return values are ([]IPAddr, error).
lookupGroup singleflight.Group
}
func (r *Resolver) lookupIPAddr(ctx context.Context, network, host string) ([]IPAddr, error) {
...
ch := r.getLookupGroup().DoChan(lookupKey, func() (any, error) {
return testHookLookupIP(lookupGroupCtx, resolverFunc, network, host)
})
...
}
參考資料
[1] runtime/runtime2.go: https://github.com/golang/go/blob/master/src/runtime/runtime2.go#L164
[2] Go 運行時中的 Mutex: https://colobu.com/2020/12/06/mutex-in-go-runtime/
[3] rwmutex: https://github.com/golang/go/blob/master/src/runtime/rwmutex.go
[4] runtime/proc.go: https://github.com/golang/go/blob/master/src/runtime/proc.go#L385
[5] cmd/go/internal/lockedfile: https://github.com/golang/go/tree/8db131082d08e497fd8e9383d0ff7715e1bef478/src/cmd/go/internal/lockedfile
[6] runtime/sema.go: https://github.com/golang/go/blob/master/src/runtime/sema.go
[7] Plan 9 中的信號量: https://swtch.com/semaphore.pdf
[8] atomic: https://github.com/golang/go/tree/master/src/runtime/internal/atomic
[9] internal/singleflight: https://github.com/golang/go/tree/master/src/internal/singleflight
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/Lhw_VFL8UHD9edfbKt_QjQ?poc_token=HOa-nGWj0tob576QGv2jzE0M-L1rCsFivoxZ0p8f