使用 Go Channel 構建可擴展的併發 worker pool
在本篇博文中,我們將深入探討如何在 Go 中構建一個可擴展的 Worker pool。該實現可有效地管理 Worker pool,以處理大量請求,同時根據負載動態調整 Worker 的數量。我們還會討論可能出現的問題以及如何避免它們。
概覽
要實現的 worker pool 應該具有以下功能:
- 根據負載情況動態調整 worker 數量;
- 以超時和重試機制處理接收到的請求;
- 能夠優雅地關閉(gracefully shut down)所有 worker。
實現
下面是完整的代碼,之後是每個部分的解釋和文檔。
Dispatcher
// interface.go
package workerpool
import "context"
type (
	// WorkerLauncher starts a worker that consumes Requests from in until it
	// decides to stop (the Worker type in this package stops when in is closed
	// or when a value arrives on stopCh).
	//
	// The dispatcher in this package calls wg.Add(1) before invoking
	// LaunchWorker from AddWorker, so an implementation is expected to call
	// Done on that same WaitGroup when its worker exits.
	WorkerLauncher interface {
		LaunchWorker(in chan Request, stopCh chan struct{})
	}

	// Dispatcher manages a pool of workers: it adds and removes workers,
	// resizes the pool according to queue length, accepts requests, and shuts
	// the pool down.
	Dispatcher interface {
		AddWorker(w WorkerLauncher)
		RemoveWorker(minWorkers int)
		LaunchWorker(id int, w WorkerLauncher)
		ScaleWorkers(minWorkers, maxWorkers, loadThreshold int)
		MakeRequest(Request)
		Stop(ctx context.Context)
	}
)
// struct.go
package workerpool
import "time"
type (
	// RequestHandler processes the payload of a single Request and reports
	// failure through its error result.
	RequestHandler func(interface{}) error

	// Request is one unit of work submitted to the worker pool.
	Request struct {
		// Handler is a per-request handler. NOTE(review): Worker.processRequest
		// looks handlers up by Type in its ReqHandler map and does not call
		// this field.
		Handler RequestHandler
		// Type selects the handler in a worker's ReqHandler map.
		Type int
		// Data is passed to the handler as its only argument.
		Data interface{}
		// Timeout bounds each handler attempt; Worker.processRequest
		// substitutes a default when it is zero.
		Timeout time.Duration
		// Retries is a retry counter. NOTE(review): nothing in the visible
		// code reads it.
		Retries int
		// MaxRetries is the number of retries allowed after the first attempt.
		MaxRetries int
	}
)
// dispatcher.go
package workerpool
import (
"context"
"fmt"
"sync"
"time"
)
// ReqHandler is the default handler table, keyed by Request.Type. Workers
// created by ScaleWorkers are given this map as their ReqHandler.
//
// Type 1 is registered with a handler that ignores its payload and always
// succeeds.
//
// NOTE(review): the map is shared by every worker that receives it, so it
// should not be modified once workers are running.
var ReqHandler = map[int]RequestHandler{
	1: func(interface{}) error { return nil },
}
// dispatcher is the Dispatcher implementation: it hands requests to a pool of
// workers through inCh and grows or shrinks that pool on demand.
type dispatcher struct {
	// inCh is the buffered queue of pending requests; Stop closes it.
	inCh chan Request
	// stopCh carries stop signals; each value received makes one worker exit.
	stopCh chan struct{}
	// wg tracks running workers so Stop can wait for them to exit.
	wg *sync.WaitGroup

	// mu serializes updates to workerCount.
	mu          sync.Mutex
	workerCount int
}
// AddWorker registers one more worker and starts it via w.LaunchWorker.
//
// The WaitGroup is incremented before the launch, so the worker's eventual
// Done can never precede its matching Add. The counter updates and the
// launch all happen while d.mu is held.
func (d *dispatcher) AddWorker(w WorkerLauncher) {
	d.mu.Lock()
	defer d.mu.Unlock()

	d.wg.Add(1)
	d.workerCount++
	w.LaunchWorker(d.inCh, d.stopCh)
}
// RemoveWorker asks one worker to exit if the pool has more than minWorkers
// workers.
//
// The signal is queued on stopCh and taken by whichever worker next selects
// on it, so that worker finishes its current request first. The send is
// non-blocking: if stopCh's buffer is already full, no worker is removed on
// this call and workerCount is left unchanged. Blocking here would hold d.mu
// indefinitely and stall AddWorker and LaunchWorker.
func (d *dispatcher) RemoveWorker(minWorkers int) {
	d.mu.Lock()
	defer d.mu.Unlock()

	if d.workerCount <= minWorkers {
		return
	}

	select {
	case d.stopCh <- struct{}{}:
		d.workerCount--
	default:
		// stopCh is full: stop signals are already queued, so skip this round.
	}
}
// ScaleWorkers polls the request queue on a ticker and resizes the pool by at
// most one worker per tick:
//   - if more than loadThreshold requests are pending and the pool has fewer
//     than maxWorkers workers, a new Worker is added;
//   - if fewer than 75% of loadThreshold requests are pending and the pool has
//     more than minWorkers workers, one worker is asked to stop.
//
// It loops forever (there is no stop signal), so callers run it in its own
// goroutine. NOTE(review): a 1µs tick is effectively a busy poll; a coarser
// interval would use far less CPU.
func (d *dispatcher) ScaleWorkers(minWorkers, maxWorkers, loadThreshold int) {
	ticker := time.NewTicker(time.Microsecond)
	defer ticker.Stop()

	// Compare in float64: multiplying an int variable by the constant 0.75 is
	// a compile error in Go (constant truncated to integer).
	lowWater := 0.75 * float64(loadThreshold)

	for range ticker.C {
		load := len(d.inCh) // current load = requests waiting in the queue

		// workerCount is written under d.mu by AddWorker/RemoveWorker; read it
		// under the same lock to avoid a data race.
		d.mu.Lock()
		count := d.workerCount
		d.mu.Unlock()

		switch {
		case load > loadThreshold && count < maxWorkers:
			fmt.Println("Scaling Triggered")
			d.AddWorker(&Worker{
				Wg:         d.wg,
				Id:         count,
				ReqHandler: ReqHandler,
			})
		case float64(load) < lowWater && count > minWorkers:
			fmt.Println("Reducing Triggered")
			d.RemoveWorker(minWorkers)
		}
	}
}
// LaunchWorker starts w as part of the pool and counts it; it is typically
// used while seeding the initial workers.
//
// It goes through AddWorker so the WaitGroup is incremented before the worker
// starts. The Worker type defers Wg.Done when its goroutine exits. Without
// the matching Add, that Done either drives the counter negative (a panic) or
// lets Stop return before every worker has exited.
//
// id is currently unused; it is kept for interface compatibility.
func (d *dispatcher) LaunchWorker(id int, w WorkerLauncher) {
	d.AddWorker(w)
}
// MakeRequest enqueues r without blocking. When inCh's buffer is full, the
// request is dropped and a message is printed instead.
//
// NOTE: Stop closes inCh, so calling MakeRequest after Stop panics (send on a
// closed channel).
func (d *dispatcher) MakeRequest(r Request) {
	select {
	case d.inCh <- r:
		return
	default:
	}

	// Queue is full. Alternatives would be logging, buffering the request
	// elsewhere, or applying back-pressure to the caller.
	fmt.Println("Request channel is full. Dropping request.")
}
// Stop shuts the pool down and returns once every worker has exited.
//
// It closes inCh, so workers drain whatever is still buffered and then exit.
// If ctx is done first, one stop signal per counted worker is sent on stopCh.
// A worker only sees the signal between requests, so in-flight handlers are
// not interrupted, and Stop still waits for all workers to exit.
//
// Stop must be called at most once: a second call panics when closing inCh.
func (d *dispatcher) Stop(ctx context.Context) {
	fmt.Println("\nstop called")
	close(d.inCh) // no more requests will be sent

	done := make(chan struct{})
	go func() {
		d.wg.Wait() // wait for all workers to finish
		close(done)
	}()

	select {
	case <-done:
		fmt.Println("All workers stopped gracefully")
		return
	case <-ctx.Done():
		fmt.Println("Timeout reached, forcing shutdown")
	}

	// workerCount is modified under d.mu by AddWorker/RemoveWorker.
	d.mu.Lock()
	remaining := d.workerCount
	d.mu.Unlock()

	// stopCh may already hold signals queued by RemoveWorker. A plain send
	// could then block forever, so stop sending once every worker is gone.
signal:
	for i := 0; i < remaining; i++ {
		select {
		case d.stopCh <- struct{}{}:
		case <-done:
			break signal
		}
	}

	<-done
}
// NewDispatcher builds a Dispatcher with two buffered channels. The request
// queue holds up to b pending requests. The stop channel can queue up to
// maxWorkers stop signals without blocking the sender.
//
// wg is shared with the workers: AddWorker calls Add on it and Stop waits on
// it. No workers are started here.
func NewDispatcher(b int, wg *sync.WaitGroup, maxWorkers int) Dispatcher {
	d := new(dispatcher)
	d.wg = wg
	d.inCh = make(chan Request, b)
	d.stopCh = make(chan struct{}, maxWorkers)
	return d
}
dispatcher 負責管理 worker,並在他們之間分配傳入的請求。它可以根據當前負載動態添加或移除 worker,並確保優雅地關閉所有 worker。
- AddWorker:向池中添加一個新的 worker 並增加 worker 計數。worker 啓動後即開始處理請求。
- RemoveWorker:如果 worker 數量超過最低要求,則通過 stopCh 通道發出停止信號,從池中移除一個 worker。
- ScaleWorkers:根據負載(輸入通道中待處理的請求數)動態調整 worker 數量。如果負載超過閾值,且 worker 數量少於允許的最大值,則添加一個新的 worker;如果負載低於閾值的 75%,且 worker 數量多於最低要求,則移除一個 worker。
- LaunchWorker:啓動一個 worker 並增加 worker 計數,通常用於初始化。
- MakeRequest:向輸入通道添加一個請求。如果通道已滿,則丟棄該請求並打印一條提示信息。
- Stop:優雅地停止所有 worker。它會關閉輸入通道,並等待所有 worker 處理完剩餘的請求;如果超時,則通過 stopCh 向各個 worker 發出停止信號,並等待它們退出。
Worker
// worker.go
package workerpool
import (
"context"
"fmt"
"sync"
"time"
)
// Worker is a WorkerLauncher whose LaunchWorker runs a single goroutine that
// processes Requests one at a time.
type Worker struct {
	// Id identifies the worker in log output.
	Id int
	// Wg is the WaitGroup the worker's goroutine calls Done on when it exits.
	// Presumably it is the same WaitGroup the dispatcher called Add on;
	// TODO confirm at call sites.
	Wg *sync.WaitGroup
	// ReqHandler maps Request.Type to the handler that processes it.
	ReqHandler map[int]RequestHandler
}
// LaunchWorker starts the worker's goroutine and returns immediately.
//
// The goroutine handles one request at a time from in. It exits when in is
// closed (after any buffered requests have been drained) or when it receives
// a value on stopCh. When both are ready, select picks one at random, so a
// stop signal may win over pending requests.
//
// Wg.Done is deferred, so the caller must have called Wg.Add(1) beforehand.
func (w *Worker) LaunchWorker(in chan Request, stopCh chan struct{}) {
	go func() {
		defer w.Wg.Done()
		for {
			select {
			case msg, open := <-in:
				if !open {
					// A closed channel yields zero values forever; exit
					// instead of processing them.
					fmt.Println("Stopping worker:", w.Id)
					return
				}
				// No sleep is needed: select blocks until in or stopCh is
				// ready, so this loop never spins.
				w.processRequest(msg)
			case <-stopCh:
				fmt.Println("Stopping worker:", w.Id)
				return
			}
		}
	}()
}
// processRequest runs msg through the handler registered for msg.Type.
//
// Each attempt runs the handler in its own goroutine, bounded by msg.Timeout
// (10ms when zero). A failed or timed-out attempt is retried until
// msg.MaxRetries retries have been used, i.e. at most MaxRetries+1 attempts.
//
// A timed-out handler is abandoned, not cancelled: its goroutine keeps running
// until the handler returns.
func (w *Worker) processRequest(msg Request) {
	fmt.Printf("Worker %d processing request: %v\n", w.Id, msg)

	handler, ok := w.ReqHandler[msg.Type]
	if !ok || handler == nil {
		// A nil entry is treated like a missing one; calling it would panic.
		fmt.Println("Handler not implemented: workerID:", w.Id)
		return
	}
	if msg.Timeout == 0 {
		msg.Timeout = 10 * time.Millisecond // Default timeout
	}

	for attempt := 0; attempt <= msg.MaxRetries; attempt++ {
		if attempt > 0 {
			// Logged just before each retry runs, numbered from 1.
			fmt.Printf("Worker %d: Retry %d for request %v\n", w.Id, attempt, msg.Data)
		}
		if w.tryOnce(handler, msg) {
			return // Successfully processed
		}
	}
	fmt.Printf("Worker %d: Failed to process request %v after %d retries\n", w.Id, msg.Data, msg.MaxRetries)
}

// tryOnce makes one handler attempt bounded by msg.Timeout and reports whether
// it succeeded. Errors and timeouts are logged.
func (w *Worker) tryOnce(handler RequestHandler, msg Request) bool {
	ctx, cancel := context.WithTimeout(context.Background(), msg.Timeout)
	// Deferred per attempt, so each timer is released as soon as the attempt
	// ends rather than piling up until processRequest returns.
	defer cancel()

	// Buffered so an abandoned handler goroutine can still deliver its result
	// and exit instead of blocking forever.
	result := make(chan error, 1)
	go func() { result <- handler(msg.Data) }()

	select {
	case err := <-result:
		if err == nil {
			return true
		}
		fmt.Printf("Worker %d: Error processing request: %v\n", w.Id, err)
	case <-ctx.Done():
		fmt.Printf("Worker %d: Timeout processing request: %v\n", w.Id, msg.Data)
	}
	return false
}
Worker 結構表示處理請求的 Worker。每個 Worker 都在自己的 goroutine 中運行,並在一個通道上監聽傳入的請求。
- LaunchWorker:在一個單獨的 goroutine 中運行 worker。worker 會持續處理接收到的請求,直到輸入通道關閉或收到停止信號。
- processRequest:處理單個請求。如果處理出錯或超時,它會重試該請求,最多重試 MaxRetries 次;若請求未設置 Timeout,則默認使用 10 毫秒的超時。
Main
// main.go
package main
import (
"context"
"fmt"
"runtime"
"sync"
"time"
wp "workerpool/workerpool"
)
func main() {
	// Pin GOMAXPROCS to the CPU count (on most systems this is already the
	// runtime default).
	cpus := runtime.NumCPU()
	runtime.GOMAXPROCS(cpus)
	fmt.Printf("Running with %d CPUs\n", cpus)

	// Pool configuration.
	const (
		bufferSize    = 50000 // capacity of the request queue
		maxWorkers    = 20
		minWorkers    = 3
		loadThreshold = 40000 // queue length above which the pool grows
		requests      = 50000 // number of requests to submit
	)

	var wg sync.WaitGroup
	dispatcher := wp.NewDispatcher(bufferSize, &wg, maxWorkers)

	// Seed the pool with the minimum number of workers.
	for id := 0; id < minWorkers; id++ {
		fmt.Printf("Starting worker with id %d\n", id)
		dispatcher.AddWorker(&wp.Worker{
			Wg:         &wg,
			Id:         id,
			ReqHandler: wp.ReqHandler,
		})
	}

	// Resize the pool in the background while requests are submitted.
	go dispatcher.ScaleWorkers(minWorkers, maxWorkers, loadThreshold)

	noop := func(result interface{}) error { return nil }
	for i := 0; i < requests; i++ {
		dispatcher.MakeRequest(wp.Request{
			Data:    fmt.Sprintf("(Msg_id: %d) -> Hello", i),
			Handler: noop,
			Type:    1,
			Timeout: 5 * time.Second,
		})
	}

	// Give the workers up to 10s to drain the queue before stop signals are sent.
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	dispatcher.Stop(ctx)

	fmt.Println("Exiting main!")
}
main.go 初始化調度程序和工作程序,向調度程序發送請求,並優雅地停止調度程序。
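- 將 GOMAXPROCS 設置爲可用 CPU 的數量(在大多數系統上,這本來就是運行時的默認值)。
- 初始化調度程序(dispatcher),啓動初始的 minWorkers 個 worker,並在後臺 goroutine 中運行擴縮容邏輯(ScaleWorkers)。
- 向 dispatcher 發送請求。
- 在給定的超時時間(10 秒)內優雅地停止調度程序。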
我們可以調整上下文超時、緩衝區大小和最小 / 最大 worker 等參數,以最大限度地提高每秒請求數(RPS)並改善應用程序性能。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/zmS5L-ZxHNYGo3SylibMWg