Go 併發和協程池
在現代編程語言中,併發已經成爲一種非常明確的需求。如今幾乎每種編程語言都有一些併發的方法。有些語言具有豐富的結構體,可以將負載分配到操作系統的多線程上執行,例如 Java,類似的還有 Ruby。
Golang 具有非常強大的併發模型,稱爲 CSP(communicating sequential processes),它將一個問題分解成更小的順序進程,然後調度這些進程的幾個實例,稱爲 Goroutines。這些 goroutines 之間的通信是通過通道傳遞不可變的消息進行的。
本文,我們將探討如何利用 golang 中的併發,以及如何通過協程池限制 goroutine 的使用。
一個簡單的例子
假設我們有一個外部 API 調用,它需要大約 100 毫秒才能完成。如果我們有 1000 個這樣的調用,並且我們同步地調用,大約需要 100s 才能完成。
//// model/data.go
package model
type SimpleData struct {
ID int
}
//// basic/basic.go
package basic
import (
"fmt"
"github.com/Joker666/goworkerpool/model"
"time"
)
funcWork(allData []model.SimpleData) {
start := time.Now()
for i, _ := range allData {
Process(allData[i])
}
elapsed := time.Since(start)
fmt.Printf("Took ===============> %s\n", elapsed)
}
funcProcess(data model.SimpleData) {
fmt.Printf("Start processing %d\n", data.ID)
time.Sleep(100 * time.Millisecond)
fmt.Printf("Finish processing %d\n", data.ID)
}
//// main.go
package main
import (
"fmt"
"github.com/Joker666/goworkerpool/basic"
"github.com/Joker666/goworkerpool/model"
"github.com/Joker666/goworkerpool/worker"
)
func main() {
// 準備數據
var allData []model.SimpleData
for i := 0; i < 1000; i++ {
data := model.SimpleData{ ID: i }
allData = append(allData, data)
}
fmt.Printf("Start processing all work \n")
//處理數據
basic.Work(allData)
}
這裏,我們有一個簡單的模型,它包含一個只有整數值的數據結構。我們同步處理這結構體數組。這顯然不是最佳解決方案,因爲這些任務可以併發處理。讓我們使用 goroutines 和通道將其轉換爲異步過程。
異步處理
////未使用協程池
func NotPooledWork(allData []model.SimpleData) {
start := time.Now()
var wg sync.WaitGroup
dataCh := make(chan model.SimpleData, 100)
wg.Add(1)
go func() {
defer wg.Done()
for data := range dataCh {
wg.Add(1)
go func(data model.SimpleData) {
defer wg.Done()
basic.Process(data)
}(data)
}
}()
for i, _ := range allData {
dataCh <- allData[i]
}
close(dataCh)
wg.Wait()
elapsed := time.Since(start)
fmt.Printf("Took ===============> %s\n", elapsed)
}
//// main.go
// Process
worker.NotPooledWork(allData)
在這裏,我們創建了一個 100 的緩衝通道,並將傳遞給 NoPooledWork 函數的所有數據添加到該通道。由於它是一個緩衝通道,因此在數據被處理之前,它不能輸入超過 100 個數據。這些都在一個 goroutine 中完成。從通道中每提取一個數據,添加一個 goroutine,然後處理。在這裏創建的 goroutine 數量是沒有限制的。它可以處理傳遞的所有任務。理論上,在給定所需資源的情況下,處理儘可能多的數據是可能的。運行以上代碼,我們將在 100 毫秒左右完成 1000 個任務。
存在問題
除非我們有足夠的資源,否則我們在一段時間內所能做的資源分配是有限的。goroutine 對象的最小大小是 2K,但可以達到 1GB。上面的代碼併發地運行所有任務,假設有一百萬個這樣的任務,它會很快耗盡機器的內存和 CPU。我們要麼升級機器,要麼找別的解決辦法。
計算機科學家很久以前就已經考慮過這個問題,並提出了一個聰明的解決方案,稱爲線程池或 WorkerPool。這個想法是爲了讓有限的計算資源來處理這些任務。一旦一個 woker 完成了一個任務,它就開始處理下一個任務。因此,任務一直等待被處理。這減少了 CPU 和內存的突發增長,隨着時間的推移分配任務。
WorkerPool 解決方案
/// worker/pooled.go
func PooledWork(allData []model.SimpleData) {
start := time.Now()
var wg sync.WaitGroup
workerPoolSize := 100
dataCh := make(chan model.SimpleData, workerPoolSize)
for i := 0; i < workerPoolSize; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for data := range dataCh {
basic.Process(data)
}
}()
}
for i, _ := range allData {
dataCh <- allData[i]
}
close(dataCh)
wg.Wait()
elapsed := time.Since(start)
fmt.Printf("Took ===============> %s\n", elapsed)
}
//// main.go
// Process
worker.PooledWork(allData)
這裏使用 100 個 worker,創建 100 個 goroutine 來處理任務。我們可以把通道看作隊列,每個 goroutine 都是一個消費者。多個 goroutines 可以監聽同一個 channel,但是 channel 上的每個數據只被處理一次。
這是一個不錯的解決方案,執行上面的代碼可以在 1s 完成所有的任務。
錯誤處理
我們還沒有完全完成。上面的代碼看起來像一個完整的解決方案,但其實不是。在這裏未處理錯誤。我們創建一個需要處理錯誤的場景,看看如何處理。
//// worker/pooledError.go
func PooledWorkError(allData []model.SimpleData) {
start := time.Now()
var wg sync.WaitGroup
workerPoolSize := 100
dataCh := make(chan model.SimpleData, workerPoolSize)
errors := make(chan error, 1000)
for i := 0; i < workerPoolSize; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for data := range dataCh {
process(data, errors)
}
}()
}
for i, _ := range allData {
dataCh <- allData[i]
}
close(dataCh)
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case err := <-errors:
fmt.Println("finished with error:", err.Error())
case <-time.After(time.Second * 1):
fmt.Println("Timeout: errors finished")
return
}
}
}()
defer close(errors)
wg.Wait()
elapsed := time.Since(start)
fmt.Printf("Took ===============> %s\n", elapsed)
}
func process(data model.SimpleData, errors chan<- error) {
fmt.Printf("Start processing %d\n", data.ID)
time.Sleep(100 * time.Millisecond)
if data.ID % 29 == 0 {
errors <- fmt.Errorf("error on job %v", data.ID)
} else {
fmt.Printf("Finish processing %d\n", data.ID)
}
}
//// main.go
// Process
worker.PooledWorkError(allData)
我們修改了 process 方法來處理一些隨機錯誤。因此,爲了在併發模型中處理錯誤,需要錯誤通道來保存錯誤數據。在所有任務完成處理後,我們檢查 errors 通道以查找錯誤。error 對象保存任務的 ID,以便在需要時可以再次處理這些 ID。
這比完全不處理錯誤的解決方案更好。但它還不是最完整的,在下一篇文章,我們將討論如何創建一個健壯的專用 WorkerPool,並在有限的 goroutine 數量下處理併發任務。
練習
如果多個任務處理失敗,我們並不總是希望繼續處理任務。可能我們有其他問題要先解決。作爲練習,您可以嘗試改進最後一個解決方案,當有兩個以上的任務導致錯誤時,停止處理任務。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/GfPPWuFR5WGatrAN7A_a6A