Go 實現後臺任務調度系統

一、背景

平常我們在開發 API 的時候,前端傳遞過來的大批數據需要經過後端處理,如果後端處理的速度快,前端響應就快,反之則很慢,影響用戶體驗。針對這種場景我們一般都是後臺異步處理,不需要前端等待所有的都執行完才返回。爲了解決這一問題,需要我們自己實現後臺任務調度系統。

二、任務調度器實現

poll.go

package poller

import (
 "context"
 "fmt"
 "log"
 "sync"
 "time"
)

type Poller struct {
 routineGroup *goroutineGroup // 併發控制
 workerNum    int // 記錄同時在運行的最大goroutine數

 sync.Mutex
 ready  chan struct{} // 某個goroutine已經準備好了
 metric *metric // 統計當前在運行中的goroutine數量
}

func NewPoller(workerNum int) *Poller {
 return &Poller{
  routineGroup: newRoutineGroup(),
  workerNum:    workerNum,
  ready:        make(chan struct{}, 1),
  metric:       newMetric(),
 }
}

// 調度器
func (p *Poller) schedule() {
 p.Lock()
 defer p.Unlock()
 if int(p.metric.BusyWorkers()) >= p.workerNum {
  return
 }

 select {
 case p.ready <- struct{}{}: // 只要滿足當前goroutine數量小於最大goroutine數量 那麼就通知poll去調度goroutine執行任務
 default:
 }
}

func (p *Poller) Poll(ctx context.Context) error {
 for {
  // step01
  p.schedule() // 調度

  select {
  case <-p.ready: // goroutine準備好之後 這裏就會有消息
  case <-ctx.Done():
   return nil
  }

 LOOP:
  for {
   select {
   case <-ctx.Done():
    break LOOP
   default:
    // step02
    task, err := p.fetch(ctx) // 獲取任務
    if err != nil {
     log.Println("fetch task error:", err.Error())
     break
    }
    fmt.Println(task)
    p.metric.IncBusyWorker() // 當前正在運行的goroutine+1
    // step03
    p.routineGroup.Run(func() { // 執行任務
     if err := p.execute(ctx, task); err != nil {
      log.Println("execute task error:", err.Error())
     }
    })
    break LOOP
   }
  }
 }
}

func (p *Poller) fetch(ctx context.Context) (string, error) {
 time.Sleep(1000 * time.Millisecond)
 return "task", nil
}

func (p *Poller) execute(ctx context.Context, task string) error {
 defer func() {
  p.metric.DecBusyWorker() // 執行完成之後 goroutine數量-1
  p.schedule() // 重新調度下一個goroutine去執行任務 這一步是必須的
 }()
 return nil
}

metric.go

package poller

import "sync/atomic"

type metric struct {
 busyWorkers uint64
}

func newMetric() *metric {
 return &metric{}
}

func (m *metric) IncBusyWorker() uint64 {
 return atomic.AddUint64(&m.busyWorkers, 1)
}

func (m *metric) DecBusyWorker() uint64 {
 return atomic.AddUint64(&m.busyWorkers, ^uint64(0))
}

func (m *metric) BusyWorkers() uint64 {
 return atomic.LoadUint64(&m.busyWorkers)
}

goroutine_group.go

package poller

import "sync"

type goroutineGroup struct {
 waitGroup sync.WaitGroup
}

func newRoutineGroup() *goroutineGroup {
 return new(goroutineGroup)
}

func (g *goroutineGroup) Run(fn func()) {
 g.waitGroup.Add(1)

 go func() {
  defer g.waitGroup.Done()
  fn()
 }()
}

func (g *goroutineGroup) Wait() {
 g.waitGroup.Wait()
}

三、測試

package main

import (
 "context"
 "fmt"
 "ta/poller"
 "go.uber.org/goleak"
 "testing"
)

func TestMain(m *testing.M)  {
 fmt.Println("start")
 goleak.VerifyTestMain(m)
}

func TestPoller(t *testing.T) {
 producer := poller.NewPoller(5)
 producer.Poll(context.Background())
}

結果:

四、總結

大家用別的方式也可以實現,核心要點就是控制併發節奏,防止大量請求打到task service,在這裏起到核心作用的就是schedule,它控制着整個任務系統的調度。同時還封裝了WaitGroup,這在大多數開源代碼中都比較常見,大家可以去嘗試。另外就是test case一定得跟上,防止goroutine泄漏。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/hTyG-gvsqvXJDlJtHF6vNQ