Go 實現後臺任務調度系統
一、背景
平常我們在開發 API 的時候,前端傳遞過來的大批數據需要經過後端處理,如果後端處理的速度快,前端響應就快,反之則很慢,影響用戶體驗。針對這種場景我們一般都是後臺異步處理,不需要前端等待所有的都執行完才返回。爲了解決這一問題,需要我們自己實現後臺任務調度系統。
二、任務調度器實現
poll.go
package poller
import (
"context"
"fmt"
"log"
"sync"
"time"
)
type Poller struct {
routineGroup *goroutineGroup // 併發控制
workerNum int // 記錄同時在運行的最大goroutine數
sync.Mutex
ready chan struct{} // 某個goroutine已經準備好了
metric *metric // 統計當前在運行中的goroutine數量
}
func NewPoller(workerNum int) *Poller {
return &Poller{
routineGroup: newRoutineGroup(),
workerNum: workerNum,
ready: make(chan struct{}, 1),
metric: newMetric(),
}
}
// 調度器
func (p *Poller) schedule() {
p.Lock()
defer p.Unlock()
if int(p.metric.BusyWorkers()) >= p.workerNum {
return
}
select {
case p.ready <- struct{}{}: // 只要滿足當前goroutine數量小於最大goroutine數量 那麼就通知poll去調度goroutine執行任務
default:
}
}
func (p *Poller) Poll(ctx context.Context) error {
for {
// step01
p.schedule() // 調度
select {
case <-p.ready: // goroutine準備好之後 這裏就會有消息
case <-ctx.Done():
return nil
}
LOOP:
for {
select {
case <-ctx.Done():
break LOOP
default:
// step02
task, err := p.fetch(ctx) // 獲取任務
if err != nil {
log.Println("fetch task error:", err.Error())
break
}
fmt.Println(task)
p.metric.IncBusyWorker() // 當前正在運行的goroutine+1
// step03
p.routineGroup.Run(func() { // 執行任務
if err := p.execute(ctx, task); err != nil {
log.Println("execute task error:", err.Error())
}
})
break LOOP
}
}
}
}
func (p *Poller) fetch(ctx context.Context) (string, error) {
time.Sleep(1000 * time.Millisecond)
return "task", nil
}
func (p *Poller) execute(ctx context.Context, task string) error {
defer func() {
p.metric.DecBusyWorker() // 執行完成之後 goroutine數量-1
p.schedule() // 重新調度下一個goroutine去執行任務 這一步是必須的
}()
return nil
}
metric.go
package poller
import "sync/atomic"
type metric struct {
busyWorkers uint64
}
func newMetric() *metric {
return &metric{}
}
func (m *metric) IncBusyWorker() uint64 {
return atomic.AddUint64(&m.busyWorkers, 1)
}
func (m *metric) DecBusyWorker() uint64 {
return atomic.AddUint64(&m.busyWorkers, ^uint64(0))
}
func (m *metric) BusyWorkers() uint64 {
return atomic.LoadUint64(&m.busyWorkers)
}
goroutine_group.go
package poller
import "sync"
type goroutineGroup struct {
waitGroup sync.WaitGroup
}
func newRoutineGroup() *goroutineGroup {
return new(goroutineGroup)
}
func (g *goroutineGroup) Run(fn func()) {
g.waitGroup.Add(1)
go func() {
defer g.waitGroup.Done()
fn()
}()
}
func (g *goroutineGroup) Wait() {
g.waitGroup.Wait()
}
三、測試
package main
import (
"context"
"fmt"
"ta/poller"
"go.uber.org/goleak"
"testing"
)
func TestMain(m *testing.M) {
fmt.Println("start")
goleak.VerifyTestMain(m)
}
func TestPoller(t *testing.T) {
producer := poller.NewPoller(5)
producer.Poll(context.Background())
}
結果:
四、總結
大家用別的方式也可以實現,核心要點就是控制併發節奏,防止大量請求打到task service
,在這裏起到核心作用的就是schedule
,它控制着整個任務系統的調度。同時還封裝了WaitGroup
,這在大多數開源代碼中都比較常見,大家可以去嘗試。另外就是test case
一定得跟上,防止goroutine
泄漏。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/hTyG-gvsqvXJDlJtHF6vNQ