Go 協程池 -2-: 如何實現協程池

線程池的出現,是因爲頻繁地創建和銷燬線程開銷比較大。通過線程池,一個線程不僅僅是處理一個任務就被銷燬,而是可以處理多個任務,任務被處理完時才被銷燬。下圖是 Java ExecutorService 類的結構:

Java ExecutorService

協程池的作用是一樣的,實現原理當然也一樣。一個協程池的數據結構,在邏輯上必須 3 類數據:

  1. 協程池的容量

  2. task pool 任務池

  3. worker pool 協程池

task pool 比較容易實現,通過一個加鎖的鏈表就可以實現。

對於 worker pool,由於協程對用戶不暴露任何 ID 和管理的 API,啓動後無法從外部主動管理,無法進行池化。我們能做的只有記錄並限制協程的數量。

最常用的方法是創建一個緩衝區大小爲 N 的 channel,創建協程時,就向 channel 發送一條數據;協程退出時,就消費一條數據;通過 len 檢查協程的數量;

另一種方法是通過一個原子變量進行控制,創建協程時,將原子變量加一;協程退出時,將原子變量減一;通過 Load 檢查協程的數量;我們後面也採用這種方式。

除了這 3 塊數據,還有一點比較關鍵:活躍的協程能夠不斷從 task pool 獲取新的任務,以達到重複利用的效果,因此協程需要獲得 task pool 的一個指針。

基於上面提到的理念,我們先把數據結構創建出來:

type pool struct {
  cap int32 // 容量
  workerCount int32 // worker數量

  taskHead *task  // task pool 頭指針 
  taskTail *task  // task pool 尾指針
  taskLock  sync.Mutex // task lock
}

type worker struct {
  p *pool // 指向pool,需要獲取task 
}

type task struct {
  f   func()  // 要執行的函數
  next *task // 指向下一個task
}

在使用層面上,pool 需要對外暴露一個方法,以實現類似於 go func(... 的功能。在我們的定義裏,它應該包含兩部分邏輯: 1) 將任務添加到 task pool, 供 worker 消費; 2) 按需創建新的 worker。其邏輯可以這樣寫:

func (p *pool) Go(f func()) {
  t := &task{f: f}
  p.taskLock.Lock()
  if p.taskHead == nil {
    p.taskHead = t
    p.taskTail = t
  } else {
    p.taskTail.next = t
    p.taskTail = t;
  }
  p.taskLock.Unlock()

  // 創建新worker
  if (atomic.LoadInt32(&p.workerCount) < p.cap) {
    atomic.AddInt32(&p.workerCount, 1)
    w := &worker{pool: p}
    w.run() // run 待實現
  }
}

注意,這裏會校驗協程的數量,並保證不會超過指定的容量。

worker 的 run 方法中,首先創建一個協程,在協程裏不斷消費 task pool 裏的 task:

func (w *worker) run() {
  go func() {
    for {
      var t *task
      w.pool.taskLock.Lock()

      // 從 task pool 獲取 task
      if w.pool.taskHead != nil {
        t = w.pool.taskHead
        w.pool.taskHead = t.next;
      }

      // 如果沒有任何task,則關閉該worker
      if t == nil {
        atomic.AddInt32(&w.pool.workerCount, -1)
        w.pool.taskLock.Unlock()
        return
      }
      w.pool.taskLock.Unlock()

      // 執行函數
      t.f()
    }
  }
}

在 for 循環裏,有三步操作:

  1. 1. 從 task pool 獲取 task

  2. 2. 如果獲取不到 task,則 worker 直接退出,退出前將 workerCount 計數減一

  3. 3. 獲取到 task 以後,執行該 task

至此,一個最基本的協程池的核心邏輯都有了。

在實際生產環境的線程池中,在內存上做了很大的優化。因爲 task 和 worker 對象的創建和銷燬非常頻繁,會頻繁地觸發 GC,可以通過 sync.Pool 去管理,以減少不必要的分配和銷燬。

還有一塊涉及到異常處理的優化,比如在啓動 goroutine 時設置 defer function 去捕獲異常並打印函數調用棧:

defer func() {
  if r := recover(); r != nil {
    msg := fmt.Sprintf("GOPOOL: panic %v: %s", r, debug.Stack())
    logger.Errorf(msg)
  }
}()

到這裏,協程池就叨完了,完整的代碼參考 bytedance/gopkg: Github 下的 util/gopool 目錄。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/ktMff61A9vvqeRGkGwQnwg