Go 併發和協程池 -2-
Goroutines 和 channel 結構體使 golang 成爲強大的併發語言。在第一部分中,我們探討了如何構建一個協程池來優化 golang 的併發性能,即限制資源利用率。但那只是用一個簡單的例子來演示我們如何去做。
在本文,我們將構建一個健壯的解決方案,以便我們可以在任何應用程序中使用這個解決方案。在網上也有一些解決方案,使用了調度程序和複雜的結構。事實上,不需要那麼複雜,我們可以使用一個共享的渠道分配任務的處理。看看本文是怎麼實現的。
代碼結構
這裏,我們創建了一個通用的 workerpool 包,它可以根據設置的併發度完成任務處理。
workerpool
├── pool.go
├── task.go
└── worker.go
workerpool 是根目錄,Task 是要處理的任務,worker 是一個簡單的處理任務的函數。pool 是處理 worker 的創建和管理。
代碼實現
// workerpool/task.go
package workerpool
import (
"fmt"
)
type Task struct {
Err error
Data interface{}
f func(interface{}) error
}
func NewTask(f func(interface{}) error, data interface{}) *Task {
return &Task{f: f, Data: data}
}
func process(workerID int, task *Task) {
fmt.Printf("Worker %d processes task %v\n", workerID, task.Data)
task.Err = task.f(task.Data)
}
Task 是一個簡單的結構體,包含任務需要的所有字段,f 是處理任務的函數、data 是函數參數。process 是執行具體任務的函數。我們將任務返回的錯誤存儲在任務的 Err 字段裏面。下面看看 worker 是如何處理任務的。
// workerpool/worker.go
package workerpool
import (
"fmt"
"sync"
)
// Worker handles all the work
type Worker struct {
ID int
taskChan chan *Task
}
// NewWorker returns new instance of worker
func NewWorker(channel chan *Task, ID int) *Worker {
return &Worker{
ID: ID,
taskChan: channel,
}
}
// Start starts the worker
func (wr *Worker) Start(wg *sync.WaitGroup) {
fmt.Printf("Starting worker %d\n", wr.ID)
wg.Add(1)
go func() {
defer wg.Done()
for task := range wr.taskChan {
process(wr.ID, task)
}
}()
}
Worker 也是一個簡單的結構體。包含一個等待處理的任務通道,以及 worker ID。Start 方法遍歷 taskChan,並啓動一個 goroutine 來處理任務。每個 worker 都併發地執行通道里面的任務。
Worker Pool
Task 和 Worker 結構體和相應的方法實現了,但還缺少如何創建 Worker 以及向通道中發送任務的內容。
// workerpoo/pool.go
package workerpool
import (
"fmt"
"sync"
"time"
)
// Pool is the worker pool
type Pool struct {
Tasks []*Task
concurrency int
collector chan *Task
wg sync.WaitGroup
}
// NewPool initializes a new pool with the given tasks and
// at the given concurrency.
func NewPool(tasks []*Task, concurrency int) *Pool {
return &Pool{
Tasks: tasks,
concurrency: concurrency,
collector: make(chan *Task, 1000),
}
}
// Run runs all work within the pool and blocks until it's
// finished.
func (p *Pool) Run() {
for i := 1; i <= p.concurrency; i++ {
worker := NewWorker(p.collector, i)
worker.Start(&p.wg)
}
for i := range p.Tasks {
p.collector <- p.Tasks[i]
}
close(p.collector)
p.wg.Wait()
}
workerpool 保存它需要處理的所有任務,並以併發數作爲輸入,生成數量相同的 goroutines 併發地完成任務。它有一個帶緩存的通道 collector,被所有的 worker 共享。
因此在運行 workerpool 時,會創建指定數量的 worker 來共同處理 collector 通道中的任務。使用 waitgroup 來同步所有的 worker,下面來試試效果。
// main.go
package main
import (
"fmt"
"time"
"github.com/Joker666/goworkerpool/workerpool"
)
func main() {
var allTask []*workerpool.Task
for i := 1; i <= 100; i++ {
task := workerpool.NewTask(func(data interface{}) error {
taskID := data.(int)
time.Sleep(100 * time.Millisecond)
fmt.Printf("Task %d processed\n", taskID)
return nil
}, i)
allTask = append(allTask, task)
}
pool := workerpool.NewPool(allTask, 5)
pool.Run()
}
在這裏,我們創建了 100 個任務,並使用 5 個作爲併發處理它們。看看輸出:
Worker 3 processes task 98
Task 92 processed
Worker 2 processes task 99
Task 98 processed
Worker 5 processes task 100
Task 99 processed
Task 100 processed
Took ===============> 2.0056295s
花了 2 秒完成 100 個任務的處理,如果將併發改爲 10 的話處理時間大概 1 秒左右。
我們已經爲 workerpool 實現了一個健壯的解決方案,可以處理併發性,存儲錯誤到任務中。這個包是通用的,沒有與特定的實現耦合。
進一步擴展: 在後臺處理任務
實際上,我們可以進一步擴展我們的解決方案,這樣,worker 就可以一直在後臺等待新的任務,我們可以把新的任務發送給它們處理。爲此,Task 保持原樣,但是我們需要稍微修改一下 Worker。
// workerpool/worker.go// Worker handles all the worktype Worker struct { ID int taskChan chan *Task quit chan bool}// NewWorker returns new instance of workerfunc NewWorker(channel chan *Task, ID int) *Worker { return &Worker{ ID: ID, taskChan: channel, quit: make(chan bool), }}....// StartBackground starts the worker in background waitingfunc (wr *Worker) StartBackground() { fmt.Printf("Starting worker %d\n", wr.ID) for { select { case task := <-wr.taskChan: process(wr.ID, task) case <-wr.quit: return } }}// Stop quits the workerfunc (wr *Worker) Stop() { fmt.Printf("Closing worker %d\n", wr.ID) go func() { wr.quit <- true }()}
我們添加 quit 通道和兩個方法到 Worker 結構體中。StartBackgorund 將啓動一個無限循環從 taskChan 中讀取任務並處理。如果從 quit 通道中讀取到數據就退出函數。Stop 方法向 quit 中寫數據。因此 Pool 也需要做相應的改動。
// workerpool/pool.go
type Pool struct {
Tasks []*Task
Workers []*Worker
concurrency int
collector chan *Task
runBackground chan bool
wg sync.WaitGroup
}
// AddTask adds a task to the pool
func (p *Pool) AddTask(task *Task) {
p.collector <- task
}
// RunBackground runs the pool in background
func (p *Pool) RunBackground() {
go func() {
for {
fmt.Print("⌛ Waiting for tasks to come in ...\n")
time.Sleep(10 * time.Second)
}
}()
for i := 1; i <= p.concurrency; i++ {
worker := NewWorker(p.collector, i)
p.Workers = append(p.Workers, worker)
go worker.StartBackground()
}
for i := range p.Tasks {
p.collector <- p.Tasks[i]
}
p.runBackground = make(chan bool)
<-p.runBackground //阻塞等待所有worker停止
}
// Stop stops background workers
func (p *Pool) Stop() {
for i := range p.Workers {
p.Workers[i].Stop()
}
p.runBackground <- true
}
Pool 結構體包含一個 runBackground 通道通知 worker 後臺執行,AddTask 方法可以隨時向任務通道 collector 中添加任務。RunBackground 方法通過一個 goroutine 在後臺保持執行。如果任務通道沒有數據就會等待。
如果有一個真實的場景,它將與 HTTP 服務器一起運行並處理任務。我們會用一個無限循環來複制類似的行爲,符合某個條件,就會停止。
// main.go
...
pool := workerpool.NewPool(allTask, 5)
go func() {
for {
taskID := rand.Intn(100) + 20
if taskID%7 == 0 {
pool.Stop()
}
time.Sleep(time.Duration(rand.Intn(5)) * time.Second)
task := workerpool.NewTask(func(data interface{}) error {
taskID := data.(int)
time.Sleep(100 * time.Millisecond)
fmt.Printf("Task %d processed\n", taskID)
return nil
}, taskID)
pool.AddTask(task)
}
}()
pool.RunBackground()
當我們運行 main 函數時,我們會看到一個隨機的任務被插入,而 workers 在後臺運行,其中一個 worker 會讀取到這個任務。當它符合停止的條件時,它將最終停止。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/EFWcv13s6pkaipDp2BdbQw