Golang 實現協程池

Go 協程池

解決的問題:

項目地址 https://github.com/gofish2020/easygpool 歡迎 Fork & Star

邏輯圖

代碼解析

整個代碼只有 500 行,簡單易學

定義協程池結構體

/*
Pool:用來管理協程對象 *goWorker
*/
type Pool struct {
	// 協程對象最大個數
	capactiy int32
	// 活躍的協程對象個數
	running atomic.Int32

	// 阻塞模式
	waiting atomic.Int32 // 阻塞等待的數量
	cond    *sync.Cond

	// Pool是否關閉
	closed chanstruct{}
	// 配置信息
	option *Options

	// 回收對象
	victim sync.Pool
	// 操作local對象的鎖
	lock sync.Locker
	// 協程對象緩存
	local goWorkerCache

	// 判斷goPurge是否已經結束
	purgeDone int32
	// 等待協程對象停止(包括協程池 和用戶協程)
	allRunning *wait.Wait
}

構建協程池對象

// capacity: -1表示不限定 >0表示最大的協程個數
func NewPool(capacity int32, opts ...Option) *Pool {

	option := new(Options)
	for _, opt := range opts {
		opt(option)
	}

	//如果啓用清理,清理間隔的值需要 >0
	if !option.DisablePurge {
		if option.MaxIdleTime <= 0 {
			option.MaxIdleTime = maxIdleTime
		}
	}

	// 日誌打印器
	if option.Logger == nil {
		option.Logger = defaultLogger
	}

	p := Pool{
		capactiy:   capacity,
		closed:     make(chanstruct{}),
		lock:       syncx.NewSpinLock(),
		option:     option,
		allRunning: &wait.Wait{},
		local:      newCacheStack(),
	}

	p.victim.New = func() any {
		return newGoWorker(&p)
	}
	p.cond = sync.NewCond(p.lock)

	go p.goPurge()
	return &p
}
type Options struct {

	// 禁用清理
	DisablePurge bool

	// 協程池中的協程如果長時間不執行任務 超過 MaxIdleTime 就會被清理掉
	MaxIdleTime time.Duration

	// NonBlocking = true非阻塞模式
	// NonBlocking = false 阻塞模式,下面的MaxBlockingTasks參數纔有作用
	NonBlocking bool

	// 阻塞模式:當NonBlocking=false的時候,最多阻塞的協程數 MaxBlockingTasks, MaxBlockingTasks = 0表示可以無限阻塞
	MaxBlockingTasks int32

	// 日誌打印器
	Logger Logger

	// 協程panic的時候調用
	PanicHandler func(any)
}
// spinlock 本質就是一個uint32的正整數
type spinLock uint32

const maxBackoff = 16

func (sl *spinLock) Lock() {
	backoff := 1
	for !atomic.CompareAndSwapUint32((*uint32)(sl), 0, 1) {
		// 這個的意思就是暫時讓出時間片 1次 2次 4次 8次 16次,backoff 數值越大,讓出的時間片的次數也就越多(最大爲16)
		for i := 0; i < backoff; i++ {
			runtime.Gosched()
		}
		if backoff < maxBackoff {
			backoff <<= 1// backoff *= 2
		}
	}
}

// 修改爲0
func (sl *spinLock) Unlock() {
	atomic.StoreUint32((*uint32)(sl), 0)
}

// 自旋鎖
func NewSpinLock() sync.Locker {
	returnnew(spinLock)
}

向協程池提交任務

// 外部提交任務
func (p *Pool) Submit(task TaskFunc) error {

	if p.IsClosed() {
		return ErrPoolClosed
	}

	w, err := p.getGoWorker()
	if err != nil {
		return err
	}
	w.exec(task)
	returnnil
}
func (p *Pool) getGoWorker() (*goWorker, error) {
	// 上鎖(避免併發問題)
	p.allRunning.Add(1)
	p.lock.Lock()
	deferfunc() {
		p.lock.Unlock()
		p.allRunning.Add(-1)
	}()

	for {

		if p.IsClosed() {
			returnnil, ErrPoolClosed
		}

		// 1.從local緩衝中獲取
		if w := p.local.detach(); w != nil {
			return w, nil
		}

		// 2. 從victim中獲取(新建一個)
		if p.Cap() == -1 || p.Cap() > p.running.Load() { // 說明還有容量
			raw := p.victim.Get()
			w, ok := raw.(*goWorker)
			if !ok {
				returnnil, errors.New("victim cache data is wrong type")
			}
			w.start()
			return w, nil
		}

		//3. 執行到這裏,說明沒有空閒的協程對象
		//  非阻塞模式 or 阻塞模式(但是阻塞的太多了)
		if p.option.NonBlocking || (p.option.MaxBlockingTasks != 0 && p.Waiting() >= p.option.MaxBlockingTasks) {
			returnnil, ErrPoolOverload
		}

		//4. 阻塞等待
		p.waiting.Add(1)
		p.cond.Wait() // 這裏會對p.lock解鎖,然後阻塞等待;被喚醒後,又對p.lock上鎖
		p.waiting.Add(-1)
	}

}

這裏額外說一點:p.cond.Wait() : 在getGoWorker() 函數入口處,我們執行了 p.lock.Lock()函數,只有在 getGoWorker退出的時候,纔會釋放鎖p.lock.Unlock()

那一旦阻塞,導致鎖沒有釋放,其他的協程調用getGoWorker() 函數,不就直接阻塞在 p.lock.Lock()處了,而不是 p.cond.Wait()???

這個要看下p.cond.Wait()的內部代碼

// NewCond returns a new Cond with Locker l.
func NewCond(l Locker) *Cond {
	return &Cond{L: l}
}

func (c *Cond) Wait() {
	c.checker.check()
	t := runtime_notifyListAdd(&c.notify)
	c.L.Unlock()
	runtime_notifyListWait(&c.notify, t)
	c.L.Lock()
}

可以看到內部有對 c.L.Unlock鎖解鎖,runtime_notifyListWait(&c.notify, t)阻塞等待,當有信號以後,會執行c.L.Lock()再上鎖。所以在p.cond.Wait()的時候,【鎖其實處於解鎖的狀態】

Ps:c.L 變量的賦值在 p.cond = sync.NewCond(p.lock)

用協程對象執行任務

type TaskFunc func()

type goWorker struct {
	taskChan chan TaskFunc
	pool     *Pool

	// 最後一次使用的時間
	lastUsedTime time.Time
}

func newGoWorker(pool *Pool) *goWorker {
	gw := goWorker{}
	gw.pool = pool
	gw.taskChan = make(chan TaskFunc)
	return &gw
}

// 啓動協程(1.等待任務 2.任務處理完成,自動回收到local中)
func (gw *goWorker) start() {
	gw.pool.running.Add(1)
	gw.pool.allRunning.Add(1)
	gofunc() {
		deferfunc() {
			gw.pool.running.Add(-1)
			gw.pool.allRunning.Add(-1)
			gw.pool.victim.Put(gw) // 執行失敗,回收到victim中

			if p := recover(); p != nil {
				handler := gw.pool.option.PanicHandler
				if handler != nil {
					handler(p)
				} else {
					gw.pool.option.Logger.Printf("*goWorker panic err:%+v\n stack info: %s\n", p, debug.Stack())
				}
			}
			gw.pool.cond.Signal()
		}()

		for task := range gw.taskChan { // 阻塞等待任務
			if task == nil {
				return
			}
			task()
			// 執行成功,自動回收到local中
			if !gw.pool.recycle(gw) {
				return
			}
		}
	}()
}

func (gw *goWorker) stop() {
	gw.taskChan <- nil
}

func (gw *goWorker) exec(task TaskFunc) {
	gw.taskChan <- task
}

動態調整協程對象數量

調整後,記得發送p.cond.Broadcast()喚醒阻塞等待的任務

// 調整容量
func (p *Pool) Tune(capacity int32) {
	oldCap := p.Cap()
	if capacity < 0 {
		capacity = -1
	}
	if oldCap == capacity {
		return
	}

	atomic.StoreInt32(&p.capactiy, capacity)

	if p.Cap() < 0 { // 調整爲無限容量
		p.cond.Broadcast()
		return
	}

	// 調整的容量大於之前的容量(說明有更多的空閒)
	if oldCap > 0 && p.Cap() > oldCap {
		p.cond.Broadcast()
	}
}

關閉協程池

分爲阻塞關閉阻塞超時關閉 allRunning: 表示 【協程池中的協程】 + 【阻塞等待中的用戶協程】

func (p *Pool) Close() {
	close(p.closed)

	p.lock.Lock()
	p.local.reset()
	p.lock.Unlock()
	p.cond.Broadcast()

	p.allRunning.Wait()
}

func (p *Pool) CloseWithTimeOut(timeout time.Duration) error {
	close(p.closed)

	p.lock.Lock()
	p.local.reset()
	p.lock.Unlock()
	p.cond.Broadcast()

	p.allRunning.WaitWithTimeOut(timeout)

	if p.Running() != 0 || p.Waiting() != 0 || (!p.option.DisablePurge && atomic.LoadInt32(&p.purgeDone) == 0) {
		return ErrTimeout
	}
	returnnil
}

這裏我們自己實現了一個WaitGroup,擁有超時等待的功能

/*
對系統 WaitGroup的封裝
*/
type Wait struct {
	wait sync.WaitGroup
}

func (w *Wait) Add(delta int) {
	w.wait.Add(delta)
}

func (w *Wait) Done() {
	w.wait.Done()
}

func (w *Wait) Wait() {
	w.wait.Wait()
}

// 超時等待
func (w *Wait) WaitWithTimeOut(timeout time.Duration) bool {

	ch := make(chanstruct{})
	gofunc() {
		deferclose(ch)
		w.Wait()
	}()

	select {
	case <-ch:
		returnfalse// 正常
	case <-time.After(timeout):
		returntrue// 超時
	}
}
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/6ZkUt9UtvVDXZktMKiBuJg