Go 併發和協程池

在現代編程語言中,併發已經成爲一種非常明確的需求。如今幾乎每種編程語言都有一些併發的方法。有些語言具有豐富的結構體,可以將負載分配到操作系統的多線程上執行,例如 Java,類似的還有 Ruby。

Golang 具有非常強大的併發模型,稱爲 CSP(communicating sequential processes),它將一個問題分解成更小的順序進程,然後調度這些進程的幾個實例,稱爲 Goroutines。這些 goroutines 之間的通信是通過通道傳遞不可變的消息進行的。

本文,我們將探討如何利用 golang 中的併發,以及如何通過協程池限制 goroutine 的使用。

一個簡單的例子

假設我們有一個外部 API 調用,它需要大約 100 毫秒才能完成。如果我們有 1000 個這樣的調用,並且我們同步地調用,大約需要 100s 才能完成。

//// model/data.go

package model

type SimpleData struct {
    ID int
}

//// basic/basic.go

package basic

import (
    "fmt"
    "github.com/Joker666/goworkerpool/model"
    "time"
)

funcWork(allData []model.SimpleData) {
    start := time.Now()
    for i, _ := range allData {
        Process(allData[i])
    }
    elapsed := time.Since(start)
    fmt.Printf("Took ===============> %s\n", elapsed)
}

funcProcess(data model.SimpleData) {
    fmt.Printf("Start processing %d\n", data.ID)
    time.Sleep(100 * time.Millisecond)
    fmt.Printf("Finish processing %d\n", data.ID)
}

//// main.go

package main

import (
    "fmt"
    "github.com/Joker666/goworkerpool/basic"
    "github.com/Joker666/goworkerpool/model"
    "github.com/Joker666/goworkerpool/worker"
)

func main() {
    // 準備數據
    var allData []model.SimpleData
    for i := 0; i < 1000; i++ {
        data := model.SimpleData{ ID: i }
        allData = append(allData, data)
    }
    fmt.Printf("Start processing all work \n")

    //處理數據
    basic.Work(allData)
}

這裏,我們有一個簡單的模型,它包含一個只有整數值的數據結構。我們同步處理這結構體數組。這顯然不是最佳解決方案,因爲這些任務可以併發處理。讓我們使用 goroutines 和通道將其轉換爲異步過程。

異步處理

////未使用協程池

func NotPooledWork(allData []model.SimpleData) {
    start := time.Now()
    var wg sync.WaitGroup

    dataCh := make(chan model.SimpleData, 100)

    wg.Add(1)
    go func() {
        defer wg.Done()
        for data := range dataCh {
            wg.Add(1)
            go func(data model.SimpleData) {
                defer wg.Done()
                basic.Process(data)
            }(data)
        }
    }()

    for i, _ := range allData {
        dataCh <- allData[i]
    }

    close(dataCh)
    wg.Wait()
    elapsed := time.Since(start)
    fmt.Printf("Took ===============> %s\n", elapsed)
}

//// main.go

// Process
worker.NotPooledWork(allData)

在這裏,我們創建了一個 100 的緩衝通道,並將傳遞給 NoPooledWork 函數的所有數據添加到該通道。由於它是一個緩衝通道,因此在數據被處理之前,它不能輸入超過 100 個數據。這些都在一個 goroutine 中完成。從通道中每提取一個數據,添加一個 goroutine,然後處理。在這裏創建的 goroutine 數量是沒有限制的。它可以處理傳遞的所有任務。理論上,在給定所需資源的情況下,處理儘可能多的數據是可能的。運行以上代碼,我們將在 100 毫秒左右完成 1000 個任務。

存在問題

除非我們有足夠的資源,否則我們在一段時間內所能做的資源分配是有限的。goroutine 對象的最小大小是 2K,但可以達到 1GB。上面的代碼併發地運行所有任務,假設有一百萬個這樣的任務,它會很快耗盡機器的內存和 CPU。我們要麼升級機器,要麼找別的解決辦法。

計算機科學家很久以前就已經考慮過這個問題,並提出了一個聰明的解決方案,稱爲線程池或 WorkerPool。這個想法是爲了讓有限的計算資源來處理這些任務。一旦一個 woker 完成了一個任務,它就開始處理下一個任務。因此,任務一直等待被處理。這減少了 CPU 和內存的突發增長,隨着時間的推移分配任務。

WorkerPool 解決方案

/// worker/pooled.go

func PooledWork(allData []model.SimpleData) {
    start := time.Now()
    var wg sync.WaitGroup
    workerPoolSize := 100

    dataCh := make(chan model.SimpleData, workerPoolSize)

    for i := 0; i < workerPoolSize; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()

            for data := range dataCh {
                basic.Process(data)
            }
        }()
    }

    for i, _ := range allData {
        dataCh <- allData[i]
    }

    close(dataCh)
    wg.Wait()
    elapsed := time.Since(start)
    fmt.Printf("Took ===============> %s\n", elapsed)
}

//// main.go

// Process
worker.PooledWork(allData)

這裏使用 100 個 worker,創建 100 個 goroutine 來處理任務。我們可以把通道看作隊列,每個 goroutine 都是一個消費者。多個 goroutines 可以監聽同一個 channel,但是 channel 上的每個數據只被處理一次。

這是一個不錯的解決方案,執行上面的代碼可以在 1s 完成所有的任務。

錯誤處理

我們還沒有完全完成。上面的代碼看起來像一個完整的解決方案,但其實不是。在這裏未處理錯誤。我們創建一個需要處理錯誤的場景,看看如何處理。

//// worker/pooledError.go

func PooledWorkError(allData []model.SimpleData) {
    start := time.Now()
    var wg sync.WaitGroup
    workerPoolSize := 100

    dataCh := make(chan model.SimpleData, workerPoolSize)
    errors := make(chan error, 1000)

    for i := 0; i < workerPoolSize; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()

            for data := range dataCh {
                process(data, errors)
            }
        }()
    }

    for i, _ := range allData {
        dataCh <- allData[i]
    }

    close(dataCh)

    wg.Add(1)
    go func() {
        defer wg.Done()
        for {
            select {
            case err := <-errors:
                fmt.Println("finished with error:", err.Error())
            case <-time.After(time.Second * 1):
                fmt.Println("Timeout: errors finished")
                return
            }
        }
    }()

    defer close(errors)
    wg.Wait()
    elapsed := time.Since(start)
    fmt.Printf("Took ===============> %s\n", elapsed)
}

func process(data model.SimpleData, errors chan<- error) {
    fmt.Printf("Start processing %d\n", data.ID)
    time.Sleep(100 * time.Millisecond)
    if data.ID % 29 == 0 {
        errors <- fmt.Errorf("error on job %v", data.ID)
    } else {
        fmt.Printf("Finish processing %d\n", data.ID)
    }
}

//// main.go

// Process
worker.PooledWorkError(allData)

我們修改了 process 方法來處理一些隨機錯誤。因此,爲了在併發模型中處理錯誤,需要錯誤通道來保存錯誤數據。在所有任務完成處理後,我們檢查 errors 通道以查找錯誤。error 對象保存任務的 ID,以便在需要時可以再次處理這些 ID。

這比完全不處理錯誤的解決方案更好。但它還不是最完整的,在下一篇文章,我們將討論如何創建一個健壯的專用 WorkerPool,並在有限的 goroutine 數量下處理併發任務。

練習

如果多個任務處理失敗,我們並不總是希望繼續處理任務。可能我們有其他問題要先解決。作爲練習,您可以嘗試改進最後一個解決方案,當有兩個以上的任務導致錯誤時,停止處理任務。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/GfPPWuFR5WGatrAN7A_a6A