一文搞懂啥是信號量 Semaphore?

信號量是併發編程中常見的同步機制,在標準庫的併發原語中使用頻繁,比如 Mutex、WaitGroup 等,這些併發原語的實現都有信號量的影子,所以我們很有必要學好弄清楚信號量的實現原理,做到 “知其然,更要知其所以然”,我們纔能有更多的“武器” 去應對實際面臨的業務場景問題。

今天我們就來搞定信號量,通過這篇文章你能掌握:

  1. 信號量是什麼?都有什麼操作?

  2. Go 官方是如何實現信號量的?

  3. 實際場景中如何使用信號量?

  4. 使用信號量應該注意哪些問題?

  5. 實現信號量的其他方式?

信號量是什麼?都有什麼操作?

維基百科上是這樣解釋信號量的:

信號量的概念是計算機科學家 Dijkstra (Dijkstra 算法的發明者)提出來的,廣泛應用在不同的操作系統中。系統中,會給每一個進程一個信號量,代表每個進程當前的狀態,未得到控制權的進程,會在特定的地方被迫停下來,等待可以繼續進行的信號到來。

下文用 G 代表 goroutine。

通俗點解釋就是,信號量通常使用一個整型變量 S 表示一組資源,當 G 完成對此信號量的等待(wait)時,S 就減 1,當 G 完成對此信號量的釋放(release)時,S 就加 1。當計數值爲 0 的時候,G 調用 wait 等待該信號量會阻塞,除非 S 又大於 0,等待的 G 纔會解除阻塞,成功返回。

舉個例子,假如圖書館有 10 本《Go 語言編程之旅》,有 1 萬個人都想讀這本書,“僧多粥少”。所以,圖書館管理員先會讓這 1 萬個人進行登記,按照登記的順序,借閱此書。如果書全部被借走,那麼,其他想看此書的人就需要等待,如果有人還書了,圖書館管理員就會通知下一位同學來借閱這本書。這裏的資源是《Go 語言編程之旅》這十本書,想讀此書的同學就是 goroutine,圖書管理員就是信號量。

從上面的解釋中我們可以得知什麼是信號量,其實信號量就是一種變量或者抽象數據類型,用於控制併發系統中多個進程對公共資源的訪問,訪問具有原子性。信號量主要分爲兩類:

PV 操作

信號量定義有兩個操作 P 和 V,P 操作是減少信號量的計數值,而 V 操作是增加信號量的計數值。

通常初始化時,將信號量 S 指定數值爲 n,就像是一個有 n 個資源的池子。P 操作相當於請求資源,如果資源可用,就立即返回;如果沒有資源或者不夠,那麼,G 會阻塞等待。V 操作會釋放持有的資源,把資源返還給信號量。

信號量的值除了初始化的操作以外,只能由 P/V 操作改變。

我們一般用信號量保護一組資源,比如數據庫連接池、幾個打印機資源等等。如果信號量蛻變成二值信號量,那麼,它的 P/V 就和互斥鎖的 Lock/Unlock 一樣了。

信號量的實現 -- 官方擴展包 Semaphore

在看 Go 源碼時,我們經常能夠看到下面這幾個關於信號量的函數:

func runtime_Semacquire(s *uint32)
func runtime_SemacquireMutex(s *uint32, lifo bool, skipframes int)
func runtime_Semrelease(s *uint32, handoff bool, skipframes int)

這幾個函數就是信號量的 PV 操作,遺憾的是,它是 Go 運行時內部使用的,並沒有封裝暴露成一個對外的信號量併發原語,我們沒有辦法使用。不過沒關係,Go 在它的擴展包中提供了信號量 semaphore,不過這個信號量的類型名並不叫 Semaphore,而是叫 Weighted。

這是一個帶權重的信號量,接下來我們就重點分析一下這個庫。

Weighted 的實現思路:使用互斥鎖 + List 實現的。互斥鎖實現其它字段的保護,而 List 實現了一個等待隊列,等待者的通知是通過 Channel 的通知機制實現的。

Weighted 主要包括兩個結構體和幾個常用方法。

結構體

type Weighted struct {
 size    int64       // 最大資源個數,初始化的時候指定
 cur     int64      // 計數器,當前已使用資源數
 mu      sync.Mutex   // 互斥鎖,對字段保護
 waiters list.List  // 等待者列表,當前處於阻塞等待的請求者 goroutine
}

每個字段的含義見代碼註釋,其中 waiters 存儲的數據是 waiter 對象,waiter 數據結構如下:

type waiter struct {
 n     int64        // 調用者申請的資源數
 ready chan<- struct{}  // 當調用者可以獲取到信號量資源時, close chan,調用者便會收到通知,成功返回
}

字段含義見註釋。

這裏提下初始化資源數方法 NewWeighted,很簡單:

// 創建資源數爲 n 的信號量
func NewWeighted(n int64) *Weighted {
 w := &Weighted{size: n}
 return w
}

方法

  1. 阻塞獲取資源的方法 -- Acquire(),源碼如下:
func (s *Weighted) Acquire(ctx context.Context, n int64) error {
 s.mu.Lock()
 // 有可用資源,直接返回
 if s.size-s.cur >= n && s.waiters.Len() == 0 {
  s.cur += n
  s.mu.Unlock()
  return nil
 }

 // 程序執行到這裏說明無足夠資源使用

 if n > s.size {
  s.mu.Unlock()
  <-ctx.Done()
  return ctx.Err()
 }

 // 資源不足,構造 waiter,將其加入到等待隊列
 // ready channel 用於通知阻塞的調用者有資源可用,由釋放資源的 goroutine 負責 close,起到消息通知的作用
 ready := make(chan struct{})
 w := waiter{n: n, ready: ready}
 elem := s.waiters.PushBack(w)      // 加入到等待隊列
 s.mu.Unlock()

 // 調用者陷入 select 阻塞,除非收到外部 ctx 的取消信號或者被通知有資源可用
 select {
 case <-ctx.Done():     // 收到外面的控制信號
  err := ctx.Err()
  s.mu.Lock()
  select {
  case <-ready:    // 再次確認是否可能是被喚醒的,如果被喚醒了則忽略控制信號,返回 nil 表示成功
   err = nil
  default:      // 收到控制信息且還沒有獲取到資源,就直接將原來添加的 waiter 刪除掉
   isFront := s.waiters.Front() == elem     // 當前 waiter 是否是鏈表頭元素
   s.waiters.Remove(elem)     // 刪除 waiter
   if isFront && s.size > s.cur {    // 如果是鏈表頭元素且有資源可用則嘗試喚醒鏈表第一個等待的 waiter
    s.notifyWaiters()
   }
  }
  s.mu.Unlock()
  return err
 case <-ready:      // 消息通知,請求資源的 goroutine 被釋放資源的 goroutine 喚醒了
  return nil
 }
}

詳細說明可以看註釋,Acquire() 相當於 P 操作,可以一次獲取多個資源,如果沒有足夠多的資源,調用者就會被阻塞。可以通過第一個參數 Context 增加超時或者 cancel 的機制。如果正常獲取了資源,就返回 nil;否則,就返回 ctx.Err(),信號量不改變。

  1. 非阻塞獲取資源的方法 -- TryAcquire,源碼如下:
func (s *Weighted) TryAcquire(n int64) bool {
 s.mu.Lock()
 success := s.size-s.cur >= n && s.waiters.Len() == 0
 if success {
  s.cur += n
 }
 s.mu.Unlock()
 return success
}

這個方法比較簡單,非阻塞地獲取指定數量的資源,如果當前沒有空閒資源,就直接返回 false。

  1. 通知等待者 notifyWaiters,源碼如下:
func (s *Weighted) notifyWaiters() {
 for {
  next := s.waiters.Front()     // 獲取隊頭元素
  if next == nil {        // 隊列裏沒有元素
   break
  }

  w := next.Value.(waiter)
  if s.size-s.cur < w.n {       // 資源不滿足請求者的要求
   break
  }

  s.cur += w.n              // 增加已用資源
  s.waiters.Remove(next)
  close(w.ready)         // 關閉 ready channel,用於通知調用者 goroutine 已經獲取到資源,繼續運行
 }
}

通過 for 循環從鏈表頭部開始依次遍歷鏈表中的所有 waiter,並更新計數器 weighted.cur,同時將其從鏈表中移除,直到遇到空閒資源小於 waiter.n 爲止。

仔細分析,我們會發現,notifyWaiters 方法是按照 FIFO 方式喚醒調用者。這樣做的目的是爲了避免調用者出現 “餓死” 的情況,當釋放 10 個資源的時候,如果第一個等待者需要 11 個資源,那麼,隊列中的所有等待者都會繼續等待,即使有的等待者只需要 1 個資源,否則的話,資源可能總是被那些請求資源數小的調用者獲取,這樣一來,請求資源數巨大的調用者,就沒有機會獲得資源了。

  1. 釋放佔用的資源 -- Release(),源碼如下:
func (s *Weighted) Release(n int64) {
 s.mu.Lock()
 s.cur -= n       // 釋放佔用資源數
 if s.cur < 0 {
  s.mu.Unlock()
  panic("semaphore: released more than held")
 }
 s.notifyWaiters()   // 喚醒等待請求資源的 goroutine
 s.mu.Unlock()
}

Release() 相當於 V 操作,可以將 n 個資源釋放,返還給信號量。

怎麼用?

現在我們知道了信號量的實現原理,針對實際業務場景中又該如何使用呢?我們舉個 worker pool 的例子,也是官網提供的:考拉茲猜想。

“考拉茲猜想” 說的是:對於每一個正整數,如果它是奇數,則對它乘 3 再加 1,如果它是偶數,則對它除以 2,如此循環,最終都能夠得到 1。

我們的例子需要實現的是,對於給出的正整數,計算循環多少次之後能得到 1,代碼如下:

func main() {
 var (
  maxWorkers = runtime.GOMAXPROCS(0)    // worker 數量
  sem        = semaphore.NewWeighted(int64(maxWorkers))  // 信號量
  out        = make([]int, 32)    // 任務數
 )
 ctx := context.TODO()

 for i := range out {
  if err := sem.Acquire(ctx, 1); err != nil {
   log.Printf("Failed to acquire semaphore: %v", err)
   break
  }

  go func(i int) {
   defer sem.Release(1)
   out[i] = collatzSteps(i + 1)
  }(i)
 }

 // 等待所有的任務執行完成,也可以通過 WaitGroup 實現
 if err := sem.Acquire(ctx, int64(maxWorkers)); err != nil {
  log.Printf("Failed to acquire semaphore: %v", err)
 }

 fmt.Println(out)
}

func collatzSteps(n int) (steps int) {
 if n <= 0 {
  panic("nonpositive input")
 }

 for ; n > 1; steps++ {
  if steps < 0 {
   panic("too many steps")
  }

  if n%2 == 0 {
   n /= 2
   continue
  }

  const maxInt = int(^uint(0) >> 1)
  if n > (maxInt-1)/3 {
   panic("overflow")
  }
  n = 3*n + 1
 }

 return steps
}

上面的代碼創建數量與 CPU 核數相同的 worker,假設是 4, 相當於池子裏只有 4 個資源可用,每個 worker 處理完一個整數,才能處理下一個,相當於控制住了併發數量。

輸出:

[0 1 7 2 5 8 16 3 19 6 14 9 9 17 17 4 12 20 20 7 7 15 15 10 23 10 111 18 18 18 106 5]

如何正確使用信號量?

閱讀完源碼之後,會發現使用 semaphore 過程中一不小心就會導致錯誤,比如:如果請求的資源數比最大的資源數還大,那麼,調用者可能永遠被阻塞;調用 Release() 方法時,可以傳遞任意的整數。但如果傳遞一個比請求到的數量大的錯誤的數值,程序就會 panic;如果傳遞一個負數,會導致資源永久被持有,等等。

使用時有哪些常犯的錯誤:

使用一項技術,保證不出錯的前提是正確地使用它,對於信號量來說也是一樣,所以使用信號量是應該格外小心,確保正確地傳遞參數,請求多少資源,就釋放多少資源

總結

本篇文章詳細介紹了什麼是信號量,什麼是 PV 操作,官方擴展包 semaphore 實現原理,剖析了實際場景中的例子,以及使用信號量時的注意事項,相信你已經掌握了信號量。

除了官方擴展包 semaphore 的實現方式外,還有別的辦法可以實現信號量,你還知道哪些方式可以實現嗎?

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/5zedcA-3lYw5qqie7qUR9A