Go 併發編程 — Semaphore 信號量
信號量 是一個同步對象,用於保持在 0 至指定最大值之間的一個計數值。當線程完成一次對該 semaphore 對象的等待(wait)時,該計數值減一;當線程完成一次對 semaphore 對象的釋放(release)時,計數值加一。當計數值爲 0,則線程等待該 semaphore 對象不再能成功直至該 semaphore 對象變成 signaled 狀態。semaphore 對象的計數值大於 0,爲 signaled 狀態;計數值等於 0,爲 nonsignaled 狀態。
簡單的理解,信號量是一種同步手段,就是一個計數值,信號量定義了 2 個操作 P 和 V,P 操作(Wait)減少信號量的計數值,V 操作(Signal)增加信號量的計數值。
初始化信號量相當於指定數量爲 n 的資源,它就像是一個有 n 個資源的池子,P 操作相當於請求資源,如果資源可用,就立即返回;如果沒有資源或者不夠,那麼,它可以不斷嘗試或者阻塞等待。V 操作會釋放自己持有的資源,把資源返還給信號量。信號量的值除了初始化的操作以外,只能由 P/V 操作改變。
Golang 拓展包說明
Go 標準庫中並沒有提供開箱即用的信號量包,擴展包golang.org/x/sync/semaphore
提供了一種帶權重的信號量實現方式。
數據結構
Weighted
結構就是信號量,之所以叫 Weighted
,是因爲是一個帶權重的信號量。主要是變量有 信號量資源總數(size)、當前已申請資源數(cur)、鎖實例和 waiters。
type Weighted struct {
size int64 // 信號量資源總數
cur int64 // 當前已申請資源數
mu sync.Mutex // 鎖
waiters list.List // 等待者,鏈表存儲
}
複製代碼
方法
-
Acquire 方法:相當於
P
操作,可以一次獲取多個資源,如果沒有足夠多的資源,調用者就會被阻塞,第一個參數是Context
,相當於可以通過Context
增加超時或者cancel
的機制。如果是正常獲取了資源,就返回nil
;否則,就返回ctx.Err()
。 -
Release 方法:相當於
V
操作,可以將n
個資源釋放,返還給信號量。 -
TryAcquire 方法:嘗試獲取
n
個資源,但是它不會阻塞,要麼成功獲取n
個資源,返回true
,要麼一個也不獲取,返回false
。
案例
下面來看一下官方提供的 example
案例吧,我修改了一些代碼,主要是 worker
執行內容上做了簡單的調整,但是思路是一樣的。這裏創建和 CPU
核數一樣多的 Worker
,去處理 32 個任務,最終將任務結果輸出。
sem.Acquire(ctx, 1)
就是信號量的 P
操作,1
就是請求信號量的資源數量,可以同時請求多個。sem.Release(1)
就類型信號量的 V
操作,1
代表增加信號量的資源數量,可以同時增加多個。
在輸出前的 sem.Acquire(ctx, int64(maxWorkers))
語句值得說一下,這樣在請求 最大核數 的信號量資源得話,如果成功的話,就代表之前的 worker
工作全部做完。就可以正常輸出了,我們在實際使用信號量時也可以進行使用。
package main
import (
"context"
"fmt"
"golang.org/x/sync/semaphore"
"log"
"runtime"
"time"
)
func main() {
ctx := context.TODO()
var (
maxWorkers = runtime.GOMAXPROCS(0) // 獲取CPU核數作爲worker數量
sem = semaphore.NewWeighted(int64(maxWorkers)) // 信號量
out = make([]int, 32) // 輸出
)
for i := range out {
// 如果沒有worker可用,會阻塞在這裏,直到某個worker被釋放
if err := sem.Acquire(ctx, 1); err != nil {
log.Printf("Failed to acquire semaphore: %v", err)
break
}
// 啓動 worker goruntine
go func(i int) {
defer sem.Release(1)
time.Sleep(time.Second * 1) // 模擬耗時操作
out[i] = i
}(i)
}
// 請求所有的worker,這樣能確保前面的worker都執行完
if err := sem.Acquire(ctx, int64(maxWorkers)); err != nil {
log.Printf("Failed to acquire semaphore: %v", err)
}
fmt.Println(out)
}
複製代碼
輸出結果:
[0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31]
複製代碼
源碼分析
- Acquire 方法
如果有足夠的資源,就直接申請成功;不成功就創建 waiter
,然後調用 select
變成等待者,等待 waiter
的 ready
被喚醒,就獲得資源了。如果 waiter
被取消了,需要刪除該 waiter
,如果刪除的鏈表頭,調用 notifyWaiters
來嘗試喚醒其他的 waiter
。這段代碼裏面有 2
個 select
,大家看的時候注意一下。
這裏 s.size-s.cur >= n && s.waiters.Len() == 0
既判斷了富餘的資源數量,也判斷了是否有等待者。說明在有等待者的情況下,即使有富餘的資源可以被新等待者申請也不能進行申請,需要進行排隊等待。
func (s *Weighted) Acquire(ctx context.Context, n int64) error {
s.mu.Lock() // 加鎖
// 沒有人在等待,並且有富餘資源可以被申請,就增加已申請資源數,並退出
if s.size-s.cur >= n && s.waiters.Len() == 0 {
s.cur += n // 增加已申請資源數
s.mu.Unlock()
return nil
}
// 不能申請比size大的資源數量
if n > s.size {
s.mu.Unlock()
<-ctx.Done()
return ctx.Err()
}
ready := make(chan struct{}) // 創建ready chan
w := waiter{n: n, ready: ready} // 創建等待者,等待者有等待資源數量和ready chan
elem := s.waiters.PushBack(w) // 添加到鏈表的尾部
s.mu.Unlock() // 解鎖
// 阻塞
select {
case <-ctx.Done(): // context的Done被取消
err := ctx.Err()
s.mu.Lock() // 加鎖
select {
case <-ready: // 被喚醒了
err = nil
default:
isFront := s.waiters.Front() == elem // 是否是第一個等待者
s.waiters.Remove(elem) // 刪除取消的等待者
if isFront && s.size > s.cur { // 如果是第一個等待者,並且size大於cur,通知其他等待者
s.notifyWaiters()
}
}
s.mu.Unlock() // 解鎖
return err
case <-ready: // 被喚醒,代表獲取到資源了
return nil
}
}
複製代碼
- Release 方法
Release
邏輯比較簡單,就是將 cur
的值減去 n
,然後嘗試通知其他等待者。
func (s *Weighted) Release(n int64) {
s.mu.Lock() // 加鎖
s.cur -= n // 釋放n個佔用的資源
if s.cur < 0 { // 如果當前可用資源小於0,就panic,可能n值傳的不對
s.mu.Unlock()
panic("semaphore: released more than held")
}
s.notifyWaiters() // 嘗試通知其他等待者
s.mu.Unlock() // 解鎖
}
複製代碼
- 內部的 notifyWaiters 方法
循環檢測可以被喚醒的 waiter
,如果沒有等待者就退出,獲取第一個等待的 waiter
,資源數量不夠則不進行分配,繼續等待;夠的話就增加已獲得的資源數量,溢出等待者,並喚醒 waiter
。這個方法沒有加鎖的原因是因爲調用這個函數前都進行加鎖處理了。
func (s *Weighted) notifyWaiters() {
for { // 循環,一次調用可能會喚醒多個waiter
next := s.waiters.Front() // 獲取鏈表第一個waiter
// 沒有等待者,退出
if next == nil {
break
}
w := next.Value.(waiter) // 獲取waiter
// 當前資源數量還不足以分配給該waiter,退出,waiter繼續等待
if s.size-s.cur < w.n {
break
}
s.cur += w.n // 資源數夠分配,增加已獲得的資源數量
s.waiters.Remove(next) // 移除等待者
close(w.ready) // close 該 waiter 的 ready 的 chan,喚醒該 waiter
}
}
複製代碼
- TryAcquire 方法
這個方法的實現也比較簡單,可以看下注釋。
func (s *Weighted) TryAcquire(n int64) bool {
s.mu.Lock() // 加鎖
success := s.size-s.cur >= n && s.waiters.Len() == 0 // 資源數量夠,並且沒有等待者,代表可以進行申請
if success {
s.cur += n // 增加已申請資源數量
}
s.mu.Unlock() // 解鎖
return success
}
複製代碼
總結
Weighted semaphore
實現的是先等待先獲得的資源申請方式,實現的是公平模式- 注意請求多少資源,就釋放多少資源
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://juejin.cn/post/7095701003172839432