Go 實現 Master-Worker 任務模式
簡介
將一組任務分別調度到多個 goroutine 上進行處理,處理之後將結果返回到主 goroutine 中進行合併整理, 減少一個 goroutine 處理任務的壓力。你可能會說,每個任務開一個 goroutine 不是更快,快確實快。但是一個 goroutine 初始的棧大小是 2k,並且在需要的時候可以擴展到 1GB,大量的 goroutine 還是很耗資源的。同時,大量的 goroutine 對於調度和垃圾回收的耗時還是會有影響的,因此,goroutine 並不是越多越好。舉個栗子,常見的數據庫連接池,總不能每個請求打過來都連接一次吧,當併發量小的時候無所謂,高併發的時候就會達到連接上限,拒絕服務。此時就需要對連接進行池化,當需要連接數據庫的時候從預先生成的連接池中拿,用完後返還,與本文講的是同樣的道理。
設計思路
代碼
代碼量不多,註釋很詳細,別的就不囉嗦了。
// Filename: main.go
func main() {
pool := workerPool.NewPool(10, 50)
defer pool.Release()
// 10個任務
pool.WaitTotal(10)
for i := 0; i < 10; i++ {
count := i
pool.AddTask(func() {
fmt.Printf(" Worker! Number %d\n", count)
defer pool.Done()
})
}
// 等待任務處理完畢
pool.Wait()
}
// Filename: pool.go
type Pool struct {
// 工作worker數量
workerNum int
// 任務隊列(全局公用)
taskQueue chan Task
// 調度器
dispatcher *dispatcher
wg sync.WaitGroup
}
func NewPool(workerNum int, taskQueueLen int) *Pool {
taskQueue := make(chan Task, taskQueueLen)
workerQueue := make(chan *worker, workerNum)
return &Pool{
taskQueue: taskQueue,
dispatcher: newDispatcher(workerQueue, taskQueue),
}
}
// 投遞任務
func (p *Pool) AddTask(task Task) {
p.taskQueue <- task
}
// wg Done
func (p *Pool) Done() {
p.wg.Done()
}
// wg add total
func (p *Pool) WaitTotal(total int) {
p.wg.Add(total)
}
// wg wait
func (p *Pool) Wait() {
p.wg.Wait()
}
// release resources
func (p *Pool) Release() {
p.dispatcher.stop <- struct{}{}
<-p.dispatcher.stop
}
// Filename: dispatcher.go
// 調度器
type dispatcher struct {
// 空閒的worker隊列
workerQueue chan *worker
// 待處理的任務隊隊列
taskQueue chan Task
// 停止
stop chan struct{}
}
func newDispatcher(workerQueue chan *worker, taskQueue chan Task) *dispatcher {
d := &dispatcher{
workerQueue: workerQueue,
taskQueue: taskQueue,
stop: make(chan struct{}),
}
for i := 0; i < cap(d.workerQueue); i++ {
// 生產worker
worker := newWorker(d.workerQueue)
// 讓worker監控自己的任務隊列,有就處理,沒就阻塞
worker.start()
}
// 從任務隊列拿任務,拿到後從worker隊列找一個空閒worker,投遞給它
go d.dispatcher()
return d
}
// 從任務隊列讀數據投遞到worker工作隊列
func (d *dispatcher) dispatcher() {
for {
select {
// 獲取任務
case task := <-d.taskQueue:
// 獲取可用的worker,如果沒有了這裏會阻塞等待
worker := <-d.workerQueue
// 拿到worker 投遞任務給它。worker處理完會自動塞回worker隊列
worker.taskQueue <- task
case <-d.stop:
// 任務結束,通知所有的worker停止
for i := 0; i < cap(d.workerQueue); i++ {
// 如果所有的worker都在忙,還沒出完,這裏會阻塞等處理完畢
worker := <-d.workerQueue
// 通知worker退出
worker.stop <- struct{}{}
// 這句啥意思?阻塞等待worker退出,worker退出後會再發一個消息,確保一定退出
<-worker.stop
}
// 同上一句註釋
d.stop <- struct{}{}
return
}
}
}
// Filename: worker.go
type worker struct {
// 全局公用worker隊列
workerQueue chan *worker
// 自己獨有的工作隊列,由調度器投遞任務進來
taskQueue chan Task
stop chan struct{}
}
func newWorker(workerQueue chan *worker) *worker {
return &worker{
workerQueue: workerQueue,
taskQueue: make(chan Task),
stop: make(chan struct{}),
}
}
func (w *worker) start() {
go func() {
var task Task
for {
// 活兒幹完了 把空閒的worker放回去,這裏不要忘了,不然就沒worker可用了...
w.workerQueue <- w
select {
case task = <-w.taskQueue:
task() // 閉包
case <-w.stop:
w.stop <- struct{}{}
return
}
}
}()
}
// Filename: task.go
type Task func()
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/WBA5R45DdscMklABQMnrGg