Go 併發編程 — Semaphore 信號量

信號量 是一個同步對象,用於保持在 0 至指定最大值之間的一個計數值。當線程完成一次對該 semaphore 對象的等待(wait)時,該計數值減一;當線程完成一次對 semaphore 對象的釋放(release)時,計數值加一。當計數值爲 0,則線程等待該 semaphore 對象不再能成功直至該 semaphore 對象變成 signaled 狀態。semaphore 對象的計數值大於 0,爲 signaled 狀態;計數值等於 0,爲 nonsignaled 狀態。

簡單的理解,信號量是一種同步手段,就是一個計數值,信號量定義了 2 個操作 P 和 V,P 操作(Wait)減少信號量的計數值,V 操作(Signal)增加信號量的計數值。

初始化信號量相當於指定數量爲 n 的資源,它就像是一個有 n 個資源的池子,P 操作相當於請求資源,如果資源可用,就立即返回;如果沒有資源或者不夠,那麼,它可以不斷嘗試或者阻塞等待。V 操作會釋放自己持有的資源,把資源返還給信號量。信號量的值除了初始化的操作以外,只能由 P/V 操作改變。

Golang 拓展包說明

Go 標準庫中並沒有提供開箱即用的信號量包,擴展包golang.org/x/sync/semaphore提供了一種帶權重的信號量實現方式。

數據結構

Weighted 結構就是信號量,之所以叫 Weighted,是因爲是一個帶權重的信號量。主要是變量有 信號量資源總數(size)、當前已申請資源數(cur)、鎖實例和 waiters。

type Weighted struct {
   size    int64      // 信號量資源總數
   cur     int64      // 當前已申請資源數
   mu      sync.Mutex // 鎖
   waiters list.List  // 等待者,鏈表存儲
}
複製代碼

方法

案例

下面來看一下官方提供的 example 案例吧,我修改了一些代碼,主要是 worker 執行內容上做了簡單的調整,但是思路是一樣的。這裏創建和 CPU 核數一樣多的 Worker,去處理 32 個任務,最終將任務結果輸出。

sem.Acquire(ctx, 1) 就是信號量的 P 操作,1 就是請求信號量的資源數量,可以同時請求多個。sem.Release(1) 就類型信號量的 V 操作,1 代表增加信號量的資源數量,可以同時增加多個。

在輸出前的 sem.Acquire(ctx, int64(maxWorkers)) 語句值得說一下,這樣在請求 最大核數 的信號量資源得話,如果成功的話,就代表之前的 worker 工作全部做完。就可以正常輸出了,我們在實際使用信號量時也可以進行使用。

package main

import (
   "context"
   "fmt"
   "golang.org/x/sync/semaphore"
   "log"
   "runtime"
   "time"
)

func main() {
   ctx := context.TODO()

   var (
      maxWorkers = runtime.GOMAXPROCS(0)                    // 獲取CPU核數作爲worker數量
      sem        = semaphore.NewWeighted(int64(maxWorkers)) // 信號量
      out        = make([]int, 32)                          // 輸出
   )

   for i := range out {

      // 如果沒有worker可用,會阻塞在這裏,直到某個worker被釋放
      if err := sem.Acquire(ctx, 1); err != nil {
         log.Printf("Failed to acquire semaphore: %v", err)
         break
      }

      // 啓動 worker goruntine
      go func(i int) {
         defer sem.Release(1)
         time.Sleep(time.Second * 1) // 模擬耗時操作
         out[i] = i
      }(i)

   }

   // 請求所有的worker,這樣能確保前面的worker都執行完
   if err := sem.Acquire(ctx, int64(maxWorkers)); err != nil {
      log.Printf("Failed to acquire semaphore: %v", err)
   }

   fmt.Println(out)
}
複製代碼

輸出結果:

[0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31]
複製代碼

源碼分析

如果有足夠的資源,就直接申請成功;不成功就創建 waiter,然後調用 select 變成等待者,等待 waiterready 被喚醒,就獲得資源了。如果 waiter 被取消了,需要刪除該 waiter ,如果刪除的鏈表頭,調用 notifyWaiters 來嘗試喚醒其他的 waiter 。這段代碼裏面有 2select,大家看的時候注意一下。

這裏 s.size-s.cur >= n && s.waiters.Len() == 0 既判斷了富餘的資源數量,也判斷了是否有等待者。說明在有等待者的情況下,即使有富餘的資源可以被新等待者申請也不能進行申請,需要進行排隊等待。

func (s *Weighted) Acquire(ctx context.Context, n int64) error {
   s.mu.Lock() // 加鎖
   
   // 沒有人在等待,並且有富餘資源可以被申請,就增加已申請資源數,並退出
   if s.size-s.cur >= n && s.waiters.Len() == 0 {
      s.cur += n // 增加已申請資源數
      s.mu.Unlock()
      return nil
   }

   // 不能申請比size大的資源數量
   if n > s.size {
      s.mu.Unlock()
      <-ctx.Done()
      return ctx.Err()
   }

   ready := make(chan struct{}) // 創建ready chan
   w := waiter{n: n, ready: ready} // 創建等待者,等待者有等待資源數量和ready chan
   elem := s.waiters.PushBack(w) // 添加到鏈表的尾部
   s.mu.Unlock() // 解鎖

   // 阻塞
   select {
   case <-ctx.Done(): // context的Done被取消
      err := ctx.Err()
      s.mu.Lock()  // 加鎖
      select {
      case <-ready: // 被喚醒了
         err = nil 
      default:
         isFront := s.waiters.Front() == elem  // 是否是第一個等待者
         s.waiters.Remove(elem) // 刪除取消的等待者
         if isFront && s.size > s.cur {  // 如果是第一個等待者,並且size大於cur,通知其他等待者
            s.notifyWaiters()
         }
      }
      s.mu.Unlock() // 解鎖
      return err

   case <-ready: // 被喚醒,代表獲取到資源了
      return nil
   }
}
複製代碼

Release 邏輯比較簡單,就是將 cur 的值減去 n ,然後嘗試通知其他等待者。

func (s *Weighted) Release(n int64) {
   s.mu.Lock()  // 加鎖
   s.cur -= n  // 釋放n個佔用的資源  
   if s.cur < 0 {  // 如果當前可用資源小於0,就panic,可能n值傳的不對
      s.mu.Unlock()
      panic("semaphore: released more than held")
   }
   s.notifyWaiters() // 嘗試通知其他等待者
   s.mu.Unlock() // 解鎖
}
複製代碼

循環檢測可以被喚醒的 waiter ,如果沒有等待者就退出,獲取第一個等待的 waiter ,資源數量不夠則不進行分配,繼續等待;夠的話就增加已獲得的資源數量,溢出等待者,並喚醒 waiter。這個方法沒有加鎖的原因是因爲調用這個函數前都進行加鎖處理了。

func (s *Weighted) notifyWaiters() {
   for { // 循環,一次調用可能會喚醒多個waiter
      next := s.waiters.Front() // 獲取鏈表第一個waiter
      
      // 沒有等待者,退出
      if next == nil {
         break 
      }

      w := next.Value.(waiter) // 獲取waiter

      // 當前資源數量還不足以分配給該waiter,退出,waiter繼續等待
      if s.size-s.cur < w.n {
         break
      }

      s.cur += w.n // 資源數夠分配,增加已獲得的資源數量
      s.waiters.Remove(next) // 移除等待者
      close(w.ready) // close 該 waiter 的 ready 的 chan,喚醒該 waiter
   }
}
複製代碼

這個方法的實現也比較簡單,可以看下注釋。

func (s *Weighted) TryAcquire(n int64) bool {
   s.mu.Lock() // 加鎖
   success := s.size-s.cur >= n && s.waiters.Len() == 0 // 資源數量夠,並且沒有等待者,代表可以進行申請
   if success {
      s.cur += n // 增加已申請資源數量
   }
   s.mu.Unlock() // 解鎖
   return success
}
複製代碼

總結

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://juejin.cn/post/7095701003172839432