Go 實現 Master-Worker 任務模式

簡介

將一組任務分別調度到多個 goroutine 上進行處理,處理之後將結果返回到主 goroutine 中進行合併整理, 減少一個 goroutine 處理任務的壓力。你可能會說,每個任務開一個 goroutine 不是更快,快確實快。但是一個 goroutine 初始的棧大小是 2k,並且在需要的時候可以擴展到 1GB,大量的 goroutine 還是很耗資源的。同時,大量的 goroutine 對於調度和垃圾回收的耗時還是會有影響的,因此,goroutine 並不是越多越好。舉個栗子,常見的數據庫連接池,總不能每個請求打過來都連接一次吧,當併發量小的時候無所謂,高併發的時候就會達到連接上限,拒絕服務。此時就需要對連接進行池化,當需要連接數據庫的時候從預先生成的連接池中拿,用完後返還,與本文講的是同樣的道理。

設計思路

代碼


代碼量不多,註釋很詳細,別的就不囉嗦了。

// Filename: main.go
func main() {
 pool := workerPool.NewPool(10, 50)
 defer pool.Release()

 // 10個任務
 pool.WaitTotal(10)

 for i := 0; i < 10; i++ {
  count := i
  pool.AddTask(func() {
   fmt.Printf(" Worker! Number %d\n", count)
   defer pool.Done()
  })
 }

 // 等待任務處理完畢
 pool.Wait()
}
// Filename: pool.go
type Pool struct {
 // 工作worker數量
 workerNum int
 // 任務隊列(全局公用)
 taskQueue chan Task
 // 調度器
 dispatcher *dispatcher

 wg sync.WaitGroup
}

func NewPool(workerNum int, taskQueueLen int) *Pool {
 taskQueue := make(chan Task, taskQueueLen)
 workerQueue := make(chan *worker, workerNum)

 return &Pool{
  taskQueue:  taskQueue,
  dispatcher: newDispatcher(workerQueue, taskQueue),
 }
}

// 投遞任務
func (p *Pool) AddTask(task Task) {
 p.taskQueue <- task
}

// wg Done
func (p *Pool) Done() {
 p.wg.Done()
}

// wg add total
func (p *Pool) WaitTotal(total int) {
 p.wg.Add(total)
}

// wg wait
func (p *Pool) Wait() {
 p.wg.Wait()
}

// release resources
func (p *Pool) Release() {
 p.dispatcher.stop <- struct{}{}
 <-p.dispatcher.stop
}
// Filename: dispatcher.go
// 調度器
type dispatcher struct {
 // 空閒的worker隊列
 workerQueue chan *worker
 // 待處理的任務隊隊列
 taskQueue chan Task
 // 停止
 stop chan struct{}
}

func newDispatcher(workerQueue chan *worker, taskQueue chan Task) *dispatcher {
 d := &dispatcher{
  workerQueue: workerQueue,
  taskQueue:   taskQueue,
  stop:        make(chan struct{}),
 }

 for i := 0; i < cap(d.workerQueue); i++ {
  // 生產worker
  worker := newWorker(d.workerQueue)
  // 讓worker監控自己的任務隊列,有就處理,沒就阻塞
  worker.start()
 }

 // 從任務隊列拿任務,拿到後從worker隊列找一個空閒worker,投遞給它
 go d.dispatcher()
 return d
}

// 從任務隊列讀數據投遞到worker工作隊列
func (d *dispatcher) dispatcher() {
 for {
  select {
  // 獲取任務
  case task := <-d.taskQueue:
   // 獲取可用的worker,如果沒有了這裏會阻塞等待
   worker := <-d.workerQueue
   // 拿到worker 投遞任務給它。worker處理完會自動塞回worker隊列
   worker.taskQueue <- task
  case <-d.stop:
   // 任務結束,通知所有的worker停止
   for i := 0; i < cap(d.workerQueue); i++ {
    // 如果所有的worker都在忙,還沒出完,這裏會阻塞等處理完畢
    worker := <-d.workerQueue
    // 通知worker退出
    worker.stop <- struct{}{}

    // 這句啥意思?阻塞等待worker退出,worker退出後會再發一個消息,確保一定退出
    <-worker.stop
   }

   // 同上一句註釋
   d.stop <- struct{}{}
   return
  }
 }
}
// Filename: worker.go
type worker struct {
 // 全局公用worker隊列
 workerQueue chan *worker
 // 自己獨有的工作隊列,由調度器投遞任務進來
 taskQueue chan Task
 stop      chan struct{}
}

func newWorker(workerQueue chan *worker) *worker {
 return &worker{
  workerQueue: workerQueue,
  taskQueue:   make(chan Task),
  stop:        make(chan struct{}),
 }
}

func (w *worker) start() {
 go func() {
  var task Task
  for {
   // 活兒幹完了 把空閒的worker放回去,這裏不要忘了,不然就沒worker可用了...
   w.workerQueue <- w

   select {
   case task = <-w.taskQueue:
    task() // 閉包
   case <-w.stop:
    w.stop <- struct{}{}
    return
   }
  }
 }()
}
// Filename: task.go
type Task func()
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/WBA5R45DdscMklABQMnrGg