Go 可以無限 Go?回家等通知吧

重複別人所說的話,只需要教育;而要挑戰別人所說的話,則需要頭腦。——瑪麗 · 佩蒂博恩 · 普爾

之前有兩篇文章講過 Golang 的併發控制,相信大家已經熟記於心,掌握其緊隨,但是我們一直沒有說 Go 的 goroutine 是否可以無限制的開闢,以及如何限定其數量,那麼這篇文章我們就來聊聊 Go 是如何控制數量的。

1 不控制數量會引發問題

我們都知道 Goroutine 具備如下兩個特點

那麼 goroutine 是否可以無限開闢呢,如果做一個服務器或者一些高業務的場景,能否隨意的開闢 goroutine 並且放養不管呢?讓他們自生自滅,畢竟有強大的 GC 和優質的調度算法支撐?

可以先看下下面這個問題。

package main
import (
    "fmt"
    "math"
    "runtime"
)
func main() {
    //模擬用戶需求業務的數量
    task_cnt := math.MaxInt64
    for i := 0; i < task_cnt; i++ {
        go func(i int) {
            //... do some busi...
            fmt.Println("go func ", i, " goroutine count = ", runtime.NumGoroutine())
        }(i)
    }
}

結果如下圖所示:

從結果圖可以看出,併發太大,單個 socket 所能處理的併發是有限的

所以,我們迅速的開闢 goroutine(不控制併發的 goroutine 數量) 會在短時間內佔據操作系統的資源 (CPU、內存、文件描述符等)。

這些資源實際上是所有用戶態程序共享的資源,所以大批的 goroutine 最終引發的災難不僅僅是自身,還會關聯其他運行的程序。

所以在編寫邏輯業務的時候,限制 goroutine 是我們必須要重視的問題。

2 一些簡單方法控制 goroutines 數量

2.1 用有 buffer 的 channel 來限制

package main
import (
    "fmt"
    "math"
    "runtime"
)
func busi(ch chan bool, i int) {
    fmt.Println("go func ", i, " goroutine count = ", runtime.NumGoroutine())
    <-ch
}
func main() {
    //模擬用戶需求業務的數量
    task_cnt := math.MaxInt64
    //task_cnt := 10
    ch := make(chan bool, 3)
    for i := 0; i < task_cnt; i++ {
        ch <- true
        go busi(ch, i)
    }
}

結果:

...
go func  352277  goroutine count =  4
go func  352278  goroutine count =  4
go func  352279  goroutine count =  4
go func  352280  goroutine count =  4
go func  352281  goroutine count =  4
go func  352282  goroutine count =  4
...

從結果看,程序並沒有出現崩潰,而是按部就班的順序執行,並且 go 的數量控制在了 3,(4 的原因是因爲還有一個 main goroutine) 那麼從數字上看,是不是在跑的 goroutines 有幾十萬個呢?

這裏我們用了,buffer 爲 3 的 channel, 在寫的過程中,實際上是限制了速度。限制的是

for 循環的速度,因爲這個速度決定了 go 的創建速度,而 go 的結束速度取決於 busi() 函數的執行速度。這樣實際上,我們就能夠保證了,同一時間內運行的 goroutine 的數量與 buffer 的數量一致。從而達到了限定效果。

但是這段代碼有一個小問題,就是如果我們把 go_cnt 的數量變的小一些,會出現打出的結果不正確。

package main
import (
    "fmt"
    //"math"
    "runtime"
)
func busi(ch chan bool, i int) {
    fmt.Println("go func ", i, " goroutine count = ", runtime.NumGoroutine())
    <-ch
}
func main() {
    //模擬用戶需求業務的數量
    //task_cnt := math.MaxInt64
    task_cnt := 10
    ch := make(chan bool, 3)
    for i := 0; i < task_cnt; i++ {
        ch <- true
        go busi(ch, i)
    }
}

結果:

go func  2  goroutine count =  4
go func  3  goroutine count =  4
go func  4  goroutine count =  4
go func  5  goroutine count =  4
go func  6  goroutine count =  4
go func  1  goroutine count =  4
go func  8  goroutine count =  4

是因爲 main 將全部的 go 開闢完之後,就立刻退出進程了。所以想全部 go 都執行,需要在 main 的最後進行阻塞操作。

2.2 使用 sync 同步機制

import (
    "fmt"
    "math"
    "sync"
    "runtime"
)
var wg = sync.WaitGroup{}
func busi(i int) {
    fmt.Println("go func ", i, " goroutine count = ", runtime.NumGoroutine())
    wg.Done()
}
func main() {
    //模擬用戶需求業務的數量
    task_cnt := math.MaxInt64
    for i := 0; i < task_cnt; i++ {
        wg.Add(1)
        go busi(i)
    }
      wg.Wait()
}

很明顯,單純的使用 sync 依然達不到控制 goroutine 的數量,所以最終結果依然是崩潰。

...
go func  7562  goroutine count =  7582
go func  24819  goroutine count =  17985
go func  7685  goroutine count =  7582
go func  24701  goroutine count =  17984
go func  7567  goroutine count =  7582
go func  24711  goroutine count =  17975
//操作系統停止響應

2.3 channel 與 sync 同步組合方式

package main
import (
    "fmt"
    "math"
    "sync"
    "runtime"
)
var wg = sync.WaitGroup{}
func busi(ch chan bool, i int) {
    fmt.Println("go func ", i, " goroutine count = ", runtime.NumGoroutine())
    <-ch
    wg.Done()
}
func main() {
    //模擬用戶需求go業務的數量
    task_cnt := math.MaxInt64
    ch := make(chan bool, 3)
    for i := 0; i < task_cnt; i++ {
        wg.Add(1)
        ch <- true
        go busi(ch, i)
    }
      wg.Wait()
}

結果:

//...
go func  228851  goroutine count =  4
go func  228852  goroutine count =  4
go func  228853  goroutine count =  4
go func  228854  goroutine count =  4
go func  228855  goroutine count =  4
//...

這樣我們程序就不會再造成資源爆炸而崩潰。而且運行 go 的數量控制住了在 buffer 爲 3 的這個範圍內。

2.4 利用無緩衝 channel 與任務發送 / 執行分離方式

package main
import (
    "fmt"
    "math"
    "sync"
    "runtime"
)
var wg = sync.WaitGroup{}
func busi(ch chan int) {
    for t := range ch {
        fmt.Println("go task = ", t, ", goroutine count = ", runtime.NumGoroutine())
        wg.Done()
    }
}
func sendTask(task int, ch chan int) {
    wg.Add(1)
    ch <- task
}
func main() {
    ch := make(chan int)   //無buffer channel
    goCnt := 3              //啓動goroutine的數量
    for i := 0; i < goCnt; i++ {
        //啓動go
        go busi(ch)
    }
    taskCnt := math.MaxInt64 //模擬用戶需求業務的數量
    for t := 0; t < taskCnt; t++ {
        //發送任務
        sendTask(t, ch)
    }
      wg.Wait()
}

結果:

//...
go task =  130069 , goroutine count =  4
go task =  130070 , goroutine count =  4
go task =  130071 , goroutine count =  4
go task =  130072 , goroutine count =  4
...

執行流程大致如下,這裏實際上是將任務的發送和執行做了業務上的分離。使得消息出去,輸入 SendTask 的頻率可設置、執行 Goroutine 的數量也可設置。也就是既控制輸入 (生產),又控制輸出 (消費)。使得可控更加靈活。這也是很多 Go 框架的 Worker 工作池的最初設計思想理念。

3 關注公衆號

微信公衆號:堆棧 future

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/tiQtqz6q1gh2dJAsqYu7lA