使用 Go Channel 構建可擴展的併發 worker pool

在本篇博文中,我們將深入探討如何在 Go 中構建一個可擴展的 Worker pool。該實現可有效地管理 Worker pool,以處理大量請求,同時根據負載動態調整 Worker 的數量。我們還會討論可能出現的問題以及如何避免它們。

概覽

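要實現的 worker pool 應該具有以下功能：

- 根據負載（請求通道中待處理的請求數）在最小與最大 worker 數量之間動態增減 worker；
- 請求通道已滿時以非阻塞方式丟棄新請求，而不是阻塞調用方；
- 為每個請求設置處理超時，並在出錯或超時後按配置重試；
- 優雅關閉：先讓 worker 處理完剩餘請求，超時後再通知 worker 提前退出。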

實現

下面是完整的代碼,之後是每個部分的解釋和文檔。

Dispatcher

// interface.go 
package workerpool
import "context"
// WorkerLauncher starts a worker that consumes requests from in and exits
// when it receives a value on stopCh. AddWorker calls LaunchWorker while
// holding the dispatcher's lock, so implementations should return promptly
// and do their processing in a background goroutine.
type WorkerLauncher interface {
	LaunchWorker(in chan Request, stopCh chan struct{})
}

// Dispatcher manages a pool of workers that are all fed from one request
// queue.
type Dispatcher interface {
	// MakeRequest submits a request to the queue.
	MakeRequest(Request)

	// AddWorker starts w and counts it as part of the pool.
	AddWorker(w WorkerLauncher)
	// LaunchWorker starts w as a pool member. The id argument is not used by
	// the dispatcher implementation in this package.
	LaunchWorker(id int, w WorkerLauncher)
	// RemoveWorker asks one worker to stop unless only minWorkers remain.
	RemoveWorker(minWorkers int)
	// ScaleWorkers resizes the pool between minWorkers and maxWorkers based on
	// the queue depth relative to loadThreshold.
	ScaleWorkers(minWorkers, maxWorkers, loadThreshold int)

	// Stop shuts the pool down; ctx bounds how long the graceful phase lasts.
	Stop(ctx context.Context)
}
// struct.go
package workerpool
import "time"
// RequestHandler processes a request payload and reports failure through its
// error result.
type RequestHandler func(interface{}) error

// Request is a single unit of work submitted to the worker pool.
type Request struct {
	// Handler is a per-request handler.
	// NOTE(review): the workers in this package dispatch on Type and do not
	// appear to read this field — confirm before relying on it.
	Handler RequestHandler
	// Type selects the handler to run.
	Type int
	// Data is the payload passed to the handler.
	Data interface{}
	// Timeout is the time limit for one handler attempt.
	Timeout time.Duration
	// Retries is a retry counter.
	// NOTE(review): not read anywhere in the visible code.
	Retries int
	// MaxRetries is the maximum number of retries after a failed attempt.
	MaxRetries int
}
// dispatcher.go
package workerpool
import (
 "context"
 "fmt"
 "sync"
 "time"
)
// ReqHandler is the package-wide handler registry, looked up by
// Request.Type. Only type 1 is registered; its handler ignores the payload
// and always succeeds.
var ReqHandler = map[int]RequestHandler{
	1: func(interface{}) error { return nil },
}
// dispatcher manages a pool of workers and distributes incoming requests among them.
// It is the Dispatcher implementation returned by NewDispatcher. Workers pull
// requests directly from the shared inCh, so a request goes to whichever
// worker receives it first.
type dispatcher struct {
 // inCh is the buffered request queue. Its length is the load metric used by
 // ScaleWorkers, and Stop closes it to signal that no more requests follow.
 inCh        chan Request
 // wg counts running worker goroutines: AddWorker calls Add(1), and workers
 // are expected to call Done on exit (presumably via the same WaitGroup held
 // in Worker.Wg — verify for other WorkerLauncher implementations).
 wg          *sync.WaitGroup
 // mu guards workerCount; every access to workerCount should hold it.
 mu          sync.Mutex
 // workerCount is the number of workers the dispatcher considers running.
 workerCount int
 stopCh      chan struct{} // Each value sent asks one worker to exit; buffered (capacity set in NewDispatcher).
}
// AddWorker starts w and records it in the pool. The WaitGroup and the worker
// count are both incremented before w is launched, so that (assuming w calls
// Done on the same WaitGroup when it exits) the counter can never go
// negative. The lock is held for the whole operation, including the launch.
func (d *dispatcher) AddWorker(w WorkerLauncher) {
	d.mu.Lock()
	defer d.mu.Unlock()

	d.wg.Add(1)
	d.workerCount++
	w.LaunchWorker(d.inCh, d.stopCh)
}
// RemoveWorker asks one worker to exit if more than minWorkers are running.
//
// The stop signal goes to whichever worker next selects on stopCh, not to a
// particular worker. The send is non-blocking: stopCh is buffered, and if its
// buffer is already full we skip this removal (leaving workerCount unchanged)
// instead of blocking while holding d.mu, which would stall every other
// method that takes the lock.
func (d *dispatcher) RemoveWorker(minWorkers int) {
	d.mu.Lock()
	defer d.mu.Unlock()

	if d.workerCount <= minWorkers {
		return
	}
	select {
	case d.stopCh <- struct{}{}:
		d.workerCount--
	default:
		// Buffer full: earlier stop signals are still unconsumed. Try again on
		// a later call rather than block under the lock.
	}
}
// ScaleWorkers periodically compares the queue depth (len(d.inCh)) with
// loadThreshold and resizes the pool by one worker per tick:
//   - load above loadThreshold and fewer than maxWorkers running: add one;
//   - load below 75% of loadThreshold and more than minWorkers running:
//     remove one.
//
// It never returns (there is no stop signal), so run it in its own goroutine.
// NOTE(review): the goroutine therefore outlives Stop; ending it would need a
// context or quit channel added to the dispatcher.
func (d *dispatcher) ScaleWorkers(minWorkers, maxWorkers, loadThreshold int) {
	// A microsecond period spins a CPU core without making scaling decisions
	// meaningfully more timely; a millisecond keeps the loop responsive.
	const pollInterval = time.Millisecond
	ticker := time.NewTicker(pollInterval)
	defer ticker.Stop()

	// The untyped constant 0.75 cannot multiply an int, so compare in float64.
	lowWater := 0.75 * float64(loadThreshold)

	for range ticker.C {
		load := len(d.inCh) // pending requests in the queue

		// workerCount is guarded by d.mu. Take a snapshot instead of holding
		// the lock across AddWorker/RemoveWorker, which lock d.mu themselves.
		d.mu.Lock()
		count := d.workerCount
		d.mu.Unlock()

		switch {
		case load > loadThreshold && count < maxWorkers:
			fmt.Println("Scaling Triggered")
			d.AddWorker(&Worker{
				Wg:         d.wg,
				Id:         count,
				ReqHandler: ReqHandler,
			})
		case float64(load) < lowWater && count > minWorkers:
			fmt.Println("Reducing Triggered")
			d.RemoveWorker(minWorkers)
		}
	}
}
// LaunchWorker starts w as a member of the pool. It behaves exactly like
// AddWorker: the worker count and the WaitGroup are both incremented before
// w is launched. The WaitGroup increment is required because Worker
// goroutines call Done when they exit; without a matching Add the counter
// would go negative and panic.
//
// id is accepted for interface compatibility but is not used; any identity
// the worker needs (such as Worker.Id) is carried by w itself.
func (d *dispatcher) LaunchWorker(id int, w WorkerLauncher) {
	d.AddWorker(w)
}
// MakeRequest enqueues r without blocking. If the request queue is at
// capacity, r is discarded and a message is printed; the caller gets no other
// indication that the request was dropped.
//
// NOTE(review): a send on inCh after Stop has closed it panics, even inside a
// select that has a default case.
func (d *dispatcher) MakeRequest(r Request) {
	select {
	case d.inCh <- r:
		return
	default:
	}
	// Queue full. Logging, buffering elsewhere, or applying back-pressure are
	// alternatives to dropping.
	fmt.Println("Request channel is full. Dropping request.")
}
// Stop shuts the pool down. It closes the request queue, so workers exit once
// they have drained it, and waits for them. If ctx is done first, it sends a
// stop signal for each counted worker and then waits for the workers to
// return from whatever request they are currently processing.
//
// Stop must be called at most once, and MakeRequest must not be called after
// it: closing or sending on the already-closed inCh panics.
func (d *dispatcher) Stop(ctx context.Context) {
	fmt.Println("\nstop called")
	close(d.inCh) // no more requests will be sent

	done := make(chan struct{})
	go func() {
		d.wg.Wait() // wait for all workers to finish
		close(done)
	}()

	select {
	case <-done:
		fmt.Println("All workers stopped gracefully")
		return
	case <-ctx.Done():
		fmt.Println("Timeout reached, forcing shutdown")
	}

	// workerCount is guarded by d.mu.
	d.mu.Lock()
	n := d.workerCount
	d.mu.Unlock()

	// Send without blocking. stopCh is buffered and may already hold signals
	// queued by RemoveWorker; a blocking send into a full buffer whose workers
	// are busy would hang Stop indefinitely.
	for i := 0; i < n; i++ {
		select {
		case d.stopCh <- struct{}{}:
		default:
		}
	}
	<-done
}
// NewDispatcher returns a Dispatcher whose request queue holds up to b
// pending requests. wg is the WaitGroup the dispatcher adds to whenever it
// starts a worker (presumably the same one handed to each Worker — confirm at
// call sites). The stop channel is buffered to maxWorkers so stop signals can
// be queued without a worker being ready to receive them.
func NewDispatcher(b int, wg *sync.WaitGroup, maxWorkers int) Dispatcher {
	d := &dispatcher{wg: wg}
	d.inCh = make(chan Request, b)
	d.stopCh = make(chan struct{}, maxWorkers)
	return d
}

dispatcher 負責管理 worker，並通過一個共享的請求通道把傳入的請求分發給它們（由最先從通道接收到請求的 worker 處理）。它根據當前負載（請求通道中待處理的請求數）動態添加或移除 worker，並確保優雅地關閉所有 worker。

Worker

// worker.go
package workerpool
import (
 "context"
 "fmt"
 "sync"
 "time"
)
// Worker represents a worker that processes requests. Its LaunchWorker
// method runs the processing loop in a separate goroutine.
type Worker struct {
 // Id identifies the worker in log output.
 Id         int
 // Wg has Done called when the worker's goroutine exits; the caller must
 // call Add(1) on it before launching the worker.
 Wg         *sync.WaitGroup
 // ReqHandler maps a Request's Type to the handler that processes it.
 ReqHandler map[int]RequestHandler
}
// LaunchWorker starts the worker's processing loop in a new goroutine and
// returns immediately.
//
// The goroutine receives requests from in and handles each one with
// processRequest. It exits, calling w.Wg.Done, when in is closed and drained
// or when a value arrives on stopCh. When both channels are ready, select
// picks one at random, so a stop signal may be honoured before the queue is
// empty. The caller must call w.Wg.Add(1) before LaunchWorker.
func (w *Worker) LaunchWorker(in chan Request, stopCh chan struct{}) {
	go func() {
		defer w.Wg.Done()
		for {
			select {
			case msg, open := <-in:
				if !open {
					// A closed channel yields zero values forever; without
					// this check the worker would keep processing empty
					// requests once the queue has been closed.
					fmt.Println("Stopping worker:", w.Id)
					return
				}
				// No sleep is needed: when the queue is empty the select
				// blocks, so this loop cannot spin.
				w.processRequest(msg)
			case <-stopCh:
				fmt.Println("Stopping worker:", w.Id)
				return
			}
		}
	}()
}
// processRequest runs the handler registered in w.ReqHandler for msg.Type.
//
// Each attempt is bounded by msg.Timeout (10ms when it is zero or negative).
// After an error or a timeout the handler is retried, for at most
// msg.MaxRetries extra attempts; a negative MaxRetries is treated as zero, so
// the handler always runs at least once. Outcomes are reported only through
// log output.
//
// NOTE(review): msg.Handler is not consulted; dispatch is by msg.Type alone.
// A handler that times out is abandoned rather than cancelled: its goroutine
// keeps running until the handler returns, because RequestHandler takes no
// context.
func (w *Worker) processRequest(msg Request) {
	fmt.Printf("Worker %d processing request: %v\n", w.Id, msg)

	handler, ok := w.ReqHandler[msg.Type]
	if !ok {
		fmt.Println("Handler not implemented: workerID:", w.Id)
		return
	}

	timeout := msg.Timeout
	if timeout <= 0 {
		timeout = 10 * time.Millisecond // default per-attempt timeout
	}
	maxRetries := msg.MaxRetries
	if maxRetries < 0 {
		maxRetries = 0
	}

	// runAttempt calls the handler once. It reports true if the timeout fired
	// first, otherwise false plus the handler's error. The context is scoped
	// to this closure so its timer is released after every attempt; a defer
	// placed directly in the retry loop would only run when processRequest
	// returns.
	runAttempt := func() (bool, error) {
		ctx, cancel := context.WithTimeout(context.Background(), timeout)
		defer cancel()

		// Buffered so the handler goroutine can always deliver its result and
		// exit, even after we have stopped waiting for it.
		result := make(chan error, 1)
		go func() { result <- handler(msg.Data) }()

		select {
		case err := <-result:
			return false, err
		case <-ctx.Done():
			return true, nil
		}
	}

	for attempt := 0; attempt <= maxRetries; attempt++ {
		timedOut, err := runAttempt()
		switch {
		case timedOut:
			fmt.Printf("Worker %d: Timeout processing request: %v\n", w.Id, msg.Data)
		case err != nil:
			fmt.Printf("Worker %d: Error processing request: %v\n", w.Id, err)
		default:
			return // Successfully processed
		}
		// Announce a retry only when another attempt will actually run.
		if attempt < maxRetries {
			fmt.Printf("Worker %d: Retry %d for request %v\n", w.Id, attempt+1, msg.Data)
		}
	}
	fmt.Printf("Worker %d: Failed to process request %v after %d retries\n", w.Id, msg.Data, maxRetries)
}

Worker 結構表示處理請求的 worker。每個 worker 都在自己的 goroutine 中運行，從共享的輸入通道接收請求，同時監聽停止信號通道；當輸入通道被關閉（且已讀空）或收到停止信號時，worker 退出。

Main

// main.go
package main
import (
 "context"
 "fmt"
 "runtime"
 "sync"
 "time"
 wp "workerpool/workerpool"
)
// main builds a worker pool, floods it with requests, and shuts it down.
func main() {
	// Set GOMAXPROCS explicitly to the CPU count (this overrides any
	// GOMAXPROCS environment variable).
	cpus := runtime.NumCPU()
	runtime.GOMAXPROCS(cpus)
	fmt.Printf("Running with %d CPUs\n", cpus)

	// Pool configuration.
	const (
		bufferSize    = 50000
		maxWorkers    = 20
		minWorkers    = 3
		loadThreshold = 40000
		requests      = 50000
	)

	var wg sync.WaitGroup
	dispatcher := wp.NewDispatcher(bufferSize, &wg, maxWorkers)

	// Seed the pool with the minimum number of workers.
	for id := 0; id < minWorkers; id++ {
		fmt.Printf("Starting worker with id %d\n", id)
		dispatcher.AddWorker(&wp.Worker{
			Id:         id,
			Wg:         &wg,
			ReqHandler: wp.ReqHandler,
		})
	}

	// Let the pool grow and shrink in the background.
	go dispatcher.ScaleWorkers(minWorkers, maxWorkers, loadThreshold)

	// Enqueue the workload.
	noop := func(interface{}) error { return nil }
	for n := 0; n < requests; n++ {
		dispatcher.MakeRequest(wp.Request{
			Type:    1,
			Data:    fmt.Sprintf("(Msg_id: %d) -> Hello", n),
			Timeout: 5 * time.Second,
			Handler: noop,
		})
	}

	// Shut down with a 10-second grace period.
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	dispatcher.Stop(ctx)
	fmt.Println("Exiting main!")
}

main.go 初始化調度程序和工作程序,向調度程序發送請求,並優雅地停止調度程序。

我們可以調整上下文超時、緩衝區大小和最小 / 最大 worker 等參數,以最大限度地提高每秒請求數(RPS)並改善應用程序性能。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源：https://mp.weixin.qq.com/s/zmS5L-ZxHNYGo3SylibMWg