Go 併發和協程池 -2-

Goroutines 和 channel 結構體使 golang 成爲強大的併發語言。在第一部分中,我們探討了如何構建一個協程池來優化 golang 的併發性能,即限制資源利用率。但那只是用一個簡單的例子來演示我們如何去做。

在本文,我們將構建一個健壯的解決方案,以便我們可以在任何應用程序中使用這個解決方案。在網上也有一些解決方案,使用了調度程序和複雜的結構。事實上,不需要那麼複雜,我們可以使用一個共享的渠道分配任務的處理。看看本文是怎麼實現的。

代碼結構

這裏,我們創建了一個通用的 workerpool 包,它可以根據設置的併發度完成任務處理。

workerpool
├── pool.go
├── task.go
└── worker.go

workerpool 是根目錄,Task 是要處理的任務,worker 是一個簡單的處理任務的函數。pool 是處理 worker 的創建和管理。

代碼實現

// workerpool/task.go
package workerpool
import (
    "fmt"
)
type Task struct {
    Err  error
    Data interface{}
    f    func(interface{}) error
}
func NewTask(f func(interface{}) error, data interface{}) *Task {
    return &Task{f: f, Data: data}
}
func process(workerID int, task *Task) {
    fmt.Printf("Worker %d processes task %v\n", workerID, task.Data)
    task.Err = task.f(task.Data)
}

Task 是一個簡單的結構體,包含任務需要的所有字段,f 是處理任務的函數、data 是函數參數。process 是執行具體任務的函數。我們將任務返回的錯誤存儲在任務的 Err 字段裏面。下面看看 worker 是如何處理任務的。

// workerpool/worker.go
package workerpool
import (
    "fmt"
    "sync"
)
// Worker handles all the work
type Worker struct {
    ID       int
    taskChan chan *Task
}
// NewWorker returns new instance of worker
func NewWorker(channel chan *Task, ID int) *Worker {
    return &Worker{
        ID:       ID,
        taskChan: channel,
    }
}
// Start starts the worker
func (wr *Worker) Start(wg *sync.WaitGroup) {
    fmt.Printf("Starting worker %d\n", wr.ID)
    wg.Add(1)
    go func() {
        defer wg.Done()
        for task := range wr.taskChan {
            process(wr.ID, task)
        }
    }()
}

Worker 也是一個簡單的結構體。包含一個等待處理的任務通道,以及 worker ID。Start 方法遍歷 taskChan,並啓動一個 goroutine 來處理任務。每個 worker 都併發地執行通道里面的任務。

Worker Pool

Task 和 Worker 結構體和相應的方法實現了,但還缺少如何創建 Worker 以及向通道中發送任務的內容。

// workerpoo/pool.go
package workerpool
import (
    "fmt"
    "sync"
    "time"
)
// Pool is the worker pool
type Pool struct {
    Tasks   []*Task
    concurrency   int
    collector     chan *Task
    wg            sync.WaitGroup
}
// NewPool initializes a new pool with the given tasks and
// at the given concurrency.
func NewPool(tasks []*Task, concurrency int) *Pool {
    return &Pool{
        Tasks:       tasks,
        concurrency: concurrency,
        collector:   make(chan *Task, 1000),
    }
}
// Run runs all work within the pool and blocks until it's
// finished.
func (p *Pool) Run() {
    for i := 1; i <= p.concurrency; i++ {
        worker := NewWorker(p.collector, i)
        worker.Start(&p.wg)
    }
    for i := range p.Tasks {
        p.collector <- p.Tasks[i]
    }
    close(p.collector)
    p.wg.Wait()
}

workerpool 保存它需要處理的所有任務,並以併發數作爲輸入,生成數量相同的 goroutines 併發地完成任務。它有一個帶緩存的通道 collector,被所有的 worker 共享。
因此在運行 workerpool 時,會創建指定數量的 worker 來共同處理 collector 通道中的任務。使用 waitgroup 來同步所有的 worker,下面來試試效果。

// main.go
package main
import (
    "fmt"
    "time"
    "github.com/Joker666/goworkerpool/workerpool"
)
func main() {
    var allTask []*workerpool.Task
    for i := 1; i <= 100; i++ {
        task := workerpool.NewTask(func(data interface{}) error {
            taskID := data.(int)
            time.Sleep(100 * time.Millisecond)
            fmt.Printf("Task %d processed\n", taskID)
            return nil
        }, i)
        allTask = append(allTask, task)
    }
    pool := workerpool.NewPool(allTask, 5)
    pool.Run()
}

在這裏,我們創建了 100 個任務,並使用 5 個作爲併發處理它們。看看輸出:

Worker 3 processes task 98
Task 92 processed
Worker 2 processes task 99
Task 98 processed
Worker 5 processes task 100
Task 99 processed
Task 100 processed
Took ===============> 2.0056295s

花了 2 秒完成 100 個任務的處理,如果將併發改爲 10 的話處理時間大概 1 秒左右。
我們已經爲 workerpool 實現了一個健壯的解決方案,可以處理併發性,存儲錯誤到任務中。這個包是通用的,沒有與特定的實現耦合。

進一步擴展: 在後臺處理任務

實際上,我們可以進一步擴展我們的解決方案,這樣,worker 就可以一直在後臺等待新的任務,我們可以把新的任務發送給它們處理。爲此,Task 保持原樣,但是我們需要稍微修改一下 Worker。

// workerpool/worker.go// Worker handles all the worktype Worker struct {    ID       int    taskChan chan *Task    quit     chan bool}// NewWorker returns new instance of workerfunc NewWorker(channel chan *Task, ID int) *Worker {    return &Worker{        ID:       ID,        taskChan: channel,        quit:     make(chan bool),    }}....// StartBackground starts the worker in background waitingfunc (wr *Worker) StartBackground() {    fmt.Printf("Starting worker %d\n", wr.ID)    for {        select {        case task := <-wr.taskChan:            process(wr.ID, task)        case <-wr.quit:            return        }    }}// Stop quits the workerfunc (wr *Worker) Stop() {    fmt.Printf("Closing worker %d\n", wr.ID)    go func() {        wr.quit <- true    }()}

我們添加 quit 通道和兩個方法到 Worker 結構體中。StartBackgorund 將啓動一個無限循環從 taskChan 中讀取任務並處理。如果從 quit 通道中讀取到數據就退出函數。Stop 方法向 quit 中寫數據。因此 Pool 也需要做相應的改動。

// workerpool/pool.go
type Pool struct {
    Tasks   []*Task
    Workers []*Worker
    concurrency   int
    collector     chan *Task
    runBackground chan bool
    wg            sync.WaitGroup
}
// AddTask adds a task to the pool
func (p *Pool) AddTask(task *Task) {
    p.collector <- task
}
// RunBackground runs the pool in background
func (p *Pool) RunBackground() {
    go func() {
        for {
            fmt.Print("⌛ Waiting for tasks to come in ...\n")
            time.Sleep(10 * time.Second)
        }
    }()
    for i := 1; i <= p.concurrency; i++ {
        worker := NewWorker(p.collector, i)
        p.Workers = append(p.Workers, worker)
        go worker.StartBackground()
    }
    for i := range p.Tasks {
        p.collector <- p.Tasks[i]
    }
    p.runBackground = make(chan bool)
    <-p.runBackground //阻塞等待所有worker停止
}
// Stop stops background workers
func (p *Pool) Stop() {
    for i := range p.Workers {
        p.Workers[i].Stop()
    }
    p.runBackground <- true
}

Pool 結構體包含一個 runBackground 通道通知 worker 後臺執行,AddTask 方法可以隨時向任務通道 collector 中添加任務。RunBackground 方法通過一個 goroutine 在後臺保持執行。如果任務通道沒有數據就會等待。

如果有一個真實的場景,它將與 HTTP 服務器一起運行並處理任務。我們會用一個無限循環來複制類似的行爲,符合某個條件,就會停止。

// main.go
...
pool := workerpool.NewPool(allTask, 5)
go func() {
    for {
        taskID := rand.Intn(100) + 20
        if taskID%7 == 0 {
            pool.Stop()
        }
        time.Sleep(time.Duration(rand.Intn(5)) * time.Second)
        task := workerpool.NewTask(func(data interface{}) error {
            taskID := data.(int)
            time.Sleep(100 * time.Millisecond)
            fmt.Printf("Task %d processed\n", taskID)
            return nil
        }, taskID)
        pool.AddTask(task)
    }
}()
pool.RunBackground()

當我們運行 main 函數時,我們會看到一個隨機的任務被插入,而 workers 在後臺運行,其中一個 worker 會讀取到這個任務。當它符合停止的條件時,它將最終停止。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/EFWcv13s6pkaipDp2BdbQw