Go 協程池 -2-: 如何實現協程池
線程池的出現,是因爲頻繁地創建和銷燬線程開銷比較大。通過線程池,一個線程不僅僅是處理一個任務就被銷燬,而是可以處理多個任務,任務被處理完時才被銷燬。下圖是 Java ExecutorService 類的結構:
Java ExecutorService
協程池的作用是一樣的,實現原理當然也一樣。一個協程池的數據結構,在邏輯上必須 3 類數據:
-
協程池的容量
-
task pool 任務池
-
worker pool 協程池
task pool 比較容易實現,通過一個加鎖的鏈表就可以實現。
對於 worker pool,由於協程對用戶不暴露任何 ID 和管理的 API,啓動後無法從外部主動管理,無法進行池化。我們能做的只有記錄並限制協程的數量。
最常用的方法是創建一個緩衝區大小爲 N 的 channel,創建協程時,就向 channel 發送一條數據;協程退出時,就消費一條數據;通過 len 檢查協程的數量;
另一種方法是通過一個原子變量進行控制,創建協程時,將原子變量加一;協程退出時,將原子變量減一;通過 Load 檢查協程的數量;我們後面也採用這種方式。
除了這 3 塊數據,還有一點比較關鍵:活躍的協程能夠不斷從 task pool 獲取新的任務,以達到重複利用的效果,因此協程需要獲得 task pool 的一個指針。
基於上面提到的理念,我們先把數據結構創建出來:
type pool struct {
cap int32 // 容量
workerCount int32 // worker數量
taskHead *task // task pool 頭指針
taskTail *task // task pool 尾指針
taskLock sync.Mutex // task lock
}
type worker struct {
p *pool // 指向pool,需要獲取task
}
type task struct {
f func() // 要執行的函數
next *task // 指向下一個task
}
在使用層面上,pool 需要對外暴露一個方法,以實現類似於 go func(... 的功能。在我們的定義裏,它應該包含兩部分邏輯: 1) 將任務添加到 task pool, 供 worker 消費; 2) 按需創建新的 worker。其邏輯可以這樣寫:
func (p *pool) Go(f func()) {
t := &task{f: f}
p.taskLock.Lock()
if p.taskHead == nil {
p.taskHead = t
p.taskTail = t
} else {
p.taskTail.next = t
p.taskTail = t;
}
p.taskLock.Unlock()
// 創建新worker
if (atomic.LoadInt32(&p.workerCount) < p.cap) {
atomic.AddInt32(&p.workerCount, 1)
w := &worker{pool: p}
w.run() // run 待實現
}
}
注意,這裏會校驗協程的數量,並保證不會超過指定的容量。
worker 的 run 方法中,首先創建一個協程,在協程裏不斷消費 task pool 裏的 task:
func (w *worker) run() {
go func() {
for {
var t *task
w.pool.taskLock.Lock()
// 從 task pool 獲取 task
if w.pool.taskHead != nil {
t = w.pool.taskHead
w.pool.taskHead = t.next;
}
// 如果沒有任何task,則關閉該worker
if t == nil {
atomic.AddInt32(&w.pool.workerCount, -1)
w.pool.taskLock.Unlock()
return
}
w.pool.taskLock.Unlock()
// 執行函數
t.f()
}
}
}
在 for 循環裏,有三步操作:
-
1. 從 task pool 獲取 task
-
2. 如果獲取不到 task,則 worker 直接退出,退出前將 workerCount 計數減一
-
3. 獲取到 task 以後,執行該 task
至此,一個最基本的協程池的核心邏輯都有了。
在實際生產環境的線程池中,在內存上做了很大的優化。因爲 task 和 worker 對象的創建和銷燬非常頻繁,會頻繁地觸發 GC,可以通過 sync.Pool 去管理,以減少不必要的分配和銷燬。
還有一塊涉及到異常處理的優化,比如在啓動 goroutine 時設置 defer function 去捕獲異常並打印函數調用棧:
defer func() {
if r := recover(); r != nil {
msg := fmt.Sprintf("GOPOOL: panic %v: %s", r, debug.Stack())
logger.Errorf(msg)
}
}()
到這裏,協程池就叨完了,完整的代碼參考 bytedance/gopkg: Github 下的 util/gopool 目錄。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/ktMff61A9vvqeRGkGwQnwg