使用 channel 控制併發數量
協程
goroutine 是輕量級線程,調度由 Go 運行時進行管理的。Go 語言的併發控制主要使用關鍵字 go 開啓協程 goroutine。Go 協程(Goroutine)之間通過信道(channel)進行通信,簡單的說就是多個協程之間通信的管道。信道可以防止多個協程訪問共享內存時發生資源爭搶的問題。語法格式:
// 普通函數創建 goroutine
go 函數名(參數列表)
//匿名函數創建 goroutine
go func(參數列表){
//函數體
}(調用參數列表)
協程可以開啓多少個?是否有限制呢?
func testRoutine() {
var wg sync.WaitGroup
for i := 0; i < math.MaxInt32; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
fmt.Printf("併發數量:%d/n", i)
time.Sleep(time.Second)
}(i)
}
wg.Wait()
}
以上代碼開啓了 math.MaxInt32 個協程的併發,執行後可以看到結果直接 panic:“panic: too many concurrent operations on a single file or socket (max 1048575)”。整個併發操作超出了系統最大值。
控制協程數量
sync 同步機制
使用 sync.WaitGroup 啓動指定數量的協程 goroutine。
func testRoutine() {
var wg = sync.WaitGroup{}
taskCount := 5 // 指定併發數量
for i := 0; i < taskCount; i++ {
wg.Add(1)
go func(i int) {
fmt.Println("go func ", i)
wg.Done()
}(i)
}
wg.Wait()
}
如果 taskcount 設置的很大超出了限制的,則其還是沒有控制到併發數量。可以優化下設計,類似池的設計思想,通過允許最大連接數控制量,當超出了數量就需要等待釋放,有空閒的連接的時候纔可以繼續執行。
func testRoutine() {
task_chan := make(chan bool, 3) //100 爲 channel長度
wg := sync.WaitGroup{}
defer close(task_chan)
for i := 0; i < math.MaxInt; i++ {
wg.Add(1)
fmt.Println("go func ", i)
task_chan <- true
go func() {
<-task_chan
defer wg.Done()
}()
}
wg.Wait()
}
-
創建緩衝區大小爲 3 的 channel,在沒有被接收的情況下,至多發送 3 個消息則被阻塞。通過 channel 控制每次併發的數量。
-
開啓協程前,設置 task_chan <- true,若緩存區滿了則阻塞
-
協程任務執行完成後就釋放緩衝區
-
等待所有的併發都處理結束後則函數結束。其實可以不使用 sync.WaitGroup。因使用 channel 控制併發處理的任務數量可以不用使用等待併發處理結束。
參考資料:
- boilingfrog.github.io/2021/04/14 / 控制 goroutine 的數量 /
轉自:
https://juejin.cn/post/7134345224511291429
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/HKN1a_QbwnChJ0CYH-mgqQ