Go 語言控制協程 -goroutine- 的併發數量,有哪些好的解決方法

在使用協程併發處理某些任務時, 其併發數量往往因爲各種因素的限制不能無限的增大. 例如網絡請求、數據庫查詢等等。

從運行效率角度考慮,在相關服務可以負載的前提下(限制最大併發數),儘可能高的併發。

在 Go 語言中,可以使用一些方法來控制協程(goroutine)的併發數量,以防止併發過多導致資源耗盡或性能下降。以下是一些常見的方法:

1. 使用信號量(Semaphore):

可以使用 Go 語言中的 channel 來實現簡單的信號量,限制併發數量。

package main

import (
    "fmt"
    "sync"
)

func worker(id int, sem chan struct{}) {
    sem <- struct{}{} // 佔用一個信號量
    defer func() {
        <-sem // 釋放信號量
    }()

    // 執行工作任務
    fmt.Printf("Worker %d: Working...\n", id)
}

func main() {
    concurrency := 3
    sem := make(chan struct{}, concurrency)
    var wg sync.WaitGroup

    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            worker(id, sem)
        }(i)
    }

    wg.Wait()
    close(sem)
}

在上述例子中,sem 是一個有緩衝的 channel,通過控制 channel 中元素的數量,實現了一個簡單的信號量機制。

2. 使用協程池:

可以創建一個固定數量的協程池,將任務分發給這些協程執行。

package main

import (
    "fmt"
    "sync"
)

func worker(id int, jobs <-chan int, results chan<- int) {
    for j := range jobs {
        fmt.Printf("Worker %d: Working on job %d\n", id, j)
        results <- j * 2
    }
}

func main() {
    const numJobs = 5
    const numWorkers = 3

    jobs := make(chan int, numJobs)
    results := make(chan int, numJobs)

    var wg sync.WaitGroup

    // 啓動協程池
    for i := 0; i < numWorkers; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            worker(id, jobs, results)
        }(i)
    }

    // 提交任務
    for j := 1; j <= numJobs; j++ {
        jobs <- j
    }

    close(jobs)

    // 等待所有工作完成
    go func() {
        wg.Wait()
        close(results)
    }()

    // 處理結果
    for result := range results {
        fmt.Println("Result:", result)
    }
}

在上述例子中,jobs 通道用於存儲任務,results 通道用於存儲處理結果。通過創建固定數量的工作協程,可以有效地控制併發數量。

3. 使用 golang.org/x/sync/semaphore 包:

Go 1.16 引入了 golang.org/x/sync/semaphore 包,它提供了一個更爲靈活的信號量實現。

package main

import (
    "fmt"
    "golang.org/x/sync/semaphore"
    "sync"
)

func worker(id int, sem *semaphore.Weighted) {
    sem.Acquire(semaphore.WithWeight(1))
    defer sem.Release(1)

    // 執行工作任務
    fmt.Printf("Worker %d: Working...\n", id)
}

func main() {
    concurrency := 3
    sem := semaphore.NewWeighted(int64(concurrency))
    var wg sync.WaitGroup

    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            worker(id, sem)
        }(i)
    }

    wg.Wait()
}

在上述例子中,使用 golang.org/x/sync/semaphore 包創建了一個帶權重的信號量,控制協程的併發數量。

選擇哪種方法取決於具體的應用場景和需求。使用信號量是一種簡單而靈活的方法,而協程池則更適用於需要批量處理任務的情況。golang.org/x/sync/semaphore 包提供了一個標準庫外的更靈活的信號量實現。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/lE-zlxyAboVHdSTfPhBFQw