Golang 實現協程池
Go 協程池
解決的問題:
-
當需要創建大量的
goroutine
的時候,如果不限定goroutine
的數量,將是對程序的巨大災難 -
使用完的
goroutinue
可以複用繼續執行下一個任務(而不是立即銷燬),如果每次都是創建新goroutinue
執行任務,頻繁的創建銷燬goroutinue
導致利用率低下
項目地址 https://github.com/gofish2020/easygpool 歡迎 Fork & Star
邏輯圖
代碼解析
整個代碼只有 500 行,簡單易學
定義協程池結構體
/*
Pool:用來管理協程對象 *goWorker
*/
type Pool struct {
// 協程對象最大個數
capactiy int32
// 活躍的協程對象個數
running atomic.Int32
// 阻塞模式
waiting atomic.Int32 // 阻塞等待的數量
cond *sync.Cond
// Pool是否關閉
closed chanstruct{}
// 配置信息
option *Options
// 回收對象
victim sync.Pool
// 操作local對象的鎖
lock sync.Locker
// 協程對象緩存
local goWorkerCache
// 判斷goPurge是否已經結束
purgeDone int32
// 等待協程對象停止(包括協程池 和用戶協程)
allRunning *wait.Wait
}
-
capacity: -1 表示不限制協程中協程數量
-
waiting:表示等待協程池中有空閒的協程對象前,外部阻塞的協程數量(阻塞模式的時候,纔會操作該值)
-
cond: 條件變量,用於阻塞等待協程池中協程對象
-
victim:表示釋放的協程對象
-
lock: 目的在於操作
local
前上鎖,因爲local
不是併發安全的 -
local:表示緩存的協程對象
-
allRunning:爲了實現關閉時的超時等待,對執行的協程計數(包括:協程池中的協程 和 外部的調用協程)
構建協程池對象
// capacity: -1表示不限定 >0表示最大的協程個數
func NewPool(capacity int32, opts ...Option) *Pool {
option := new(Options)
for _, opt := range opts {
opt(option)
}
//如果啓用清理,清理間隔的值需要 >0
if !option.DisablePurge {
if option.MaxIdleTime <= 0 {
option.MaxIdleTime = maxIdleTime
}
}
// 日誌打印器
if option.Logger == nil {
option.Logger = defaultLogger
}
p := Pool{
capactiy: capacity,
closed: make(chanstruct{}),
lock: syncx.NewSpinLock(),
option: option,
allRunning: &wait.Wait{},
local: newCacheStack(),
}
p.victim.New = func() any {
return newGoWorker(&p)
}
p.cond = sync.NewCond(p.lock)
go p.goPurge()
return &p
}
option *Options
是可選的配置項
type Options struct {
// 禁用清理
DisablePurge bool
// 協程池中的協程如果長時間不執行任務 超過 MaxIdleTime 就會被清理掉
MaxIdleTime time.Duration
// NonBlocking = true非阻塞模式
// NonBlocking = false 阻塞模式,下面的MaxBlockingTasks參數纔有作用
NonBlocking bool
// 阻塞模式:當NonBlocking=false的時候,最多阻塞的協程數 MaxBlockingTasks, MaxBlockingTasks = 0表示可以無限阻塞
MaxBlockingTasks int32
// 日誌打印器
Logger Logger
// 協程panic的時候調用
PanicHandler func(any)
}
lock
是自定義實現的輕量自旋鎖
// spinlock 本質就是一個uint32的正整數
type spinLock uint32
const maxBackoff = 16
func (sl *spinLock) Lock() {
backoff := 1
for !atomic.CompareAndSwapUint32((*uint32)(sl), 0, 1) {
// 這個的意思就是暫時讓出時間片 1次 2次 4次 8次 16次,backoff 數值越大,讓出的時間片的次數也就越多(最大爲16)
for i := 0; i < backoff; i++ {
runtime.Gosched()
}
if backoff < maxBackoff {
backoff <<= 1// backoff *= 2
}
}
}
// 修改爲0
func (sl *spinLock) Unlock() {
atomic.StoreUint32((*uint32)(sl), 0)
}
// 自旋鎖
func NewSpinLock() sync.Locker {
returnnew(spinLock)
}
向協程池提交任務
-
檢測協程池是否已經關閉
p.IsClosed()
-
核心函數
p.getGoWorker()
獲取一個協程對象執行task TaskFunc
// 外部提交任務
func (p *Pool) Submit(task TaskFunc) error {
if p.IsClosed() {
return ErrPoolClosed
}
w, err := p.getGoWorker()
if err != nil {
return err
}
w.exec(task)
returnnil
}
-
在
getGoWorker()
函數內部 -
先上鎖保證操作
local
的併發安全 -
從
local
中獲取一個協程對象 -
如果沒有獲取到,正在運行的協程對象數量沒有超過容量
capacity
的前提下,創建新的協程對象 -
如果都沒有成功,在阻塞模式下,外部的調用
Submit
的協程就會被阻塞在p.cond.Wait()
,等待空閒協程對象 -
如果是非阻塞模式,直接返回
ErrPoolOverload
超載錯誤
func (p *Pool) getGoWorker() (*goWorker, error) {
// 上鎖(避免併發問題)
p.allRunning.Add(1)
p.lock.Lock()
deferfunc() {
p.lock.Unlock()
p.allRunning.Add(-1)
}()
for {
if p.IsClosed() {
returnnil, ErrPoolClosed
}
// 1.從local緩衝中獲取
if w := p.local.detach(); w != nil {
return w, nil
}
// 2. 從victim中獲取(新建一個)
if p.Cap() == -1 || p.Cap() > p.running.Load() { // 說明還有容量
raw := p.victim.Get()
w, ok := raw.(*goWorker)
if !ok {
returnnil, errors.New("victim cache data is wrong type")
}
w.start()
return w, nil
}
//3. 執行到這裏,說明沒有空閒的協程對象
// 非阻塞模式 or 阻塞模式(但是阻塞的太多了)
if p.option.NonBlocking || (p.option.MaxBlockingTasks != 0 && p.Waiting() >= p.option.MaxBlockingTasks) {
returnnil, ErrPoolOverload
}
//4. 阻塞等待
p.waiting.Add(1)
p.cond.Wait() // 這裏會對p.lock解鎖,然後阻塞等待;被喚醒後,又對p.lock上鎖
p.waiting.Add(-1)
}
}
這裏額外說一點:p.cond.Wait()
: 在getGoWorker()
函數入口處,我們執行了 p.lock.Lock()
函數,只有在 getGoWorker
退出的時候,纔會釋放鎖p.lock.Unlock()
。
那一旦阻塞,導致鎖沒有釋放,其他的協程調用getGoWorker()
函數,不就直接阻塞在 p.lock.Lock()
處了,而不是 p.cond.Wait()
???
這個要看下p.cond.Wait()
的內部代碼
// NewCond returns a new Cond with Locker l.
func NewCond(l Locker) *Cond {
return &Cond{L: l}
}
func (c *Cond) Wait() {
c.checker.check()
t := runtime_notifyListAdd(&c.notify)
c.L.Unlock()
runtime_notifyListWait(&c.notify, t)
c.L.Lock()
}
可以看到內部有對 c.L.Unlock
鎖解鎖,runtime_notifyListWait(&c.notify, t)
阻塞等待,當有信號以後,會執行c.L.Lock()
再上鎖。所以在p.cond.Wait()
的時候,【鎖其實處於解鎖的狀態】。
Ps:c.L
變量的賦值在 p.cond = sync.NewCond(p.lock)
用協程對象執行任務
-
獲取到協程對象後,就要開始執行任務了
w.exec(task)
。 其實這裏用到了生產者消費者模型,執行任務就是gw.taskChan <- task
, 將task
保存到chan
中(生產者). -
在
goWorker
協程對象的start
函數會啓動一個goroutine
,監視gw.taskChan
的任務(消費者) -
任務執行完成以後,會將當前的
goWorker
協程對象,歸還到local
中,等待其他的調度;如果任務執行中出現了panic
orlocal
已滿,則會將goWorker
協程對象,放入到victim
中
type TaskFunc func()
type goWorker struct {
taskChan chan TaskFunc
pool *Pool
// 最後一次使用的時間
lastUsedTime time.Time
}
func newGoWorker(pool *Pool) *goWorker {
gw := goWorker{}
gw.pool = pool
gw.taskChan = make(chan TaskFunc)
return &gw
}
// 啓動協程(1.等待任務 2.任務處理完成,自動回收到local中)
func (gw *goWorker) start() {
gw.pool.running.Add(1)
gw.pool.allRunning.Add(1)
gofunc() {
deferfunc() {
gw.pool.running.Add(-1)
gw.pool.allRunning.Add(-1)
gw.pool.victim.Put(gw) // 執行失敗,回收到victim中
if p := recover(); p != nil {
handler := gw.pool.option.PanicHandler
if handler != nil {
handler(p)
} else {
gw.pool.option.Logger.Printf("*goWorker panic err:%+v\n stack info: %s\n", p, debug.Stack())
}
}
gw.pool.cond.Signal()
}()
for task := range gw.taskChan { // 阻塞等待任務
if task == nil {
return
}
task()
// 執行成功,自動回收到local中
if !gw.pool.recycle(gw) {
return
}
}
}()
}
func (gw *goWorker) stop() {
gw.taskChan <- nil
}
func (gw *goWorker) exec(task TaskFunc) {
gw.taskChan <- task
}
動態調整協程對象數量
調整後,記得發送p.cond.Broadcast()
喚醒阻塞等待的任務
// 調整容量
func (p *Pool) Tune(capacity int32) {
oldCap := p.Cap()
if capacity < 0 {
capacity = -1
}
if oldCap == capacity {
return
}
atomic.StoreInt32(&p.capactiy, capacity)
if p.Cap() < 0 { // 調整爲無限容量
p.cond.Broadcast()
return
}
// 調整的容量大於之前的容量(說明有更多的空閒)
if oldCap > 0 && p.Cap() > oldCap {
p.cond.Broadcast()
}
}
關閉協程池
分爲阻塞關閉 和 阻塞超時關閉 allRunning: 表示 【協程池中的協程】 + 【阻塞等待中的用戶協程】
func (p *Pool) Close() {
close(p.closed)
p.lock.Lock()
p.local.reset()
p.lock.Unlock()
p.cond.Broadcast()
p.allRunning.Wait()
}
func (p *Pool) CloseWithTimeOut(timeout time.Duration) error {
close(p.closed)
p.lock.Lock()
p.local.reset()
p.lock.Unlock()
p.cond.Broadcast()
p.allRunning.WaitWithTimeOut(timeout)
if p.Running() != 0 || p.Waiting() != 0 || (!p.option.DisablePurge && atomic.LoadInt32(&p.purgeDone) == 0) {
return ErrTimeout
}
returnnil
}
這裏我們自己實現了一個WaitGroup
,擁有超時等待的功能
/*
對系統 WaitGroup的封裝
*/
type Wait struct {
wait sync.WaitGroup
}
func (w *Wait) Add(delta int) {
w.wait.Add(delta)
}
func (w *Wait) Done() {
w.wait.Done()
}
func (w *Wait) Wait() {
w.wait.Wait()
}
// 超時等待
func (w *Wait) WaitWithTimeOut(timeout time.Duration) bool {
ch := make(chanstruct{})
gofunc() {
deferclose(ch)
w.Wait()
}()
select {
case <-ch:
returnfalse// 正常
case <-time.After(timeout):
returntrue// 超時
}
}
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/6ZkUt9UtvVDXZktMKiBuJg