Go 併發機制解密:Goroutine 調度

Goroutine 是 Go 編程語言中一個極具特色的設計,也是其併發能力的核心亮點之一。Goroutine 本質上是一種協程(Coroutine),是實現並行計算的關鍵。使用 Goroutine 非常簡單,只需通過 go 關鍵字即可啓動一個協程,協程會以異步方式運行。程序無需等待 Goroutine 完成即可繼續執行後續代碼。

go func() // 使用 go 關鍵字啓動一個協程

II. Goroutine 的內部原理

概念介紹

併發(Concurrency)

在單個 CPU 上,可以同時執行多個任務。在極短的時間內,CPU 會在任務之間快速切換(例如,先執行一小段程序 A,然後迅速切換到程序 B)。從宏觀上看,這種任務的時間上有重疊,似乎是同時執行的,但從微觀上看,實際上是順序執行的。這種現象稱爲併發。

並行(Parallelism)

當系統擁有多個 CPU 時,每個 CPU 可以同時運行任務,且各自不需要爭奪資源。多個任務真正同時運行,這種現象稱爲並行。

進程(Process)

當 CPU 在多個程序之間切換時,如果不保存之前程序的狀態(即上下文),直接切換到下一個程序,那麼之前程序的一系列狀態會丟失。爲了解決這個問題,引入了進程的概念。進程爲程序執行分配所需的資源,因此進程是程序運行的基本資源單位(也可以看作程序執行的實體)。例如,運行一個文本編輯器時,該進程會管理所有資源,如文本緩衝區的內存空間、文件操作資源等。

線程(Thread)

CPU 在多個進程之間切換時,由於需要進入內核模式並讀取用戶模式數據,切換開銷較大。隨着進程數量增加,CPU 調度會消耗大量資源。爲了解決這一問題,引入了線程的概念。線程本身消耗的資源很少,它們共享進程內的資源。線程的調度開銷比進程小得多。例如,在一個 Web 服務器應用中,可以使用多個線程同時處理不同的客戶端請求,這些線程共享服務器進程的資源(如網絡連接和內存緩存)。

協程(Coroutine)

協程擁有自己的寄存器上下文和棧。當協程被調度切換時,會保存當前的寄存器上下文和棧;當切換回來時,則恢復之前保存的上下文和棧。因此,協程可以保留上一次調用的狀態(即所有局部狀態的特定組合)。每次重新進入協程時,相當於返回到上次調用時的狀態,即邏輯流程中上次退出的位置。

線程和進程的操作由系統接口觸發,最終由系統執行;而協程的操作由用戶程序自身執行。Goroutine 就是一種協程。


調度模型簡介

Goroutine 的強大併發能力通過 GPM 調度模型實現。以下是 Goroutine 調度模型的核心結構:

調度器中的四個重要結構

  1. M(Machine)
    表示內核級線程。每個 M 對應一個線程,Goroutine 運行在 M 上。例如,當一個 Goroutine 被啓動以執行復雜計算時,該 Goroutine 會被分配到一個 M 上執行。M 是一個較大的結構,包含小對象內存緩存(mcache)、當前正在執行的 Goroutine、隨機數生成器等信息。

  2. G(Goroutine)
    表示 Goroutine。它有自己的棧,用於存儲函數調用信息,還有一個指令指針,用於指定執行位置。此外,G 還包含其他信息(如等待的通道信息),這些信息用於調度。例如,當一個 Goroutine 等待從通道接收數據時,該信息會存儲在 G 結構中。

  3. P(Processor)
    全稱爲 Processor,主要用於執行 Goroutine。可以將其視爲任務分發器。P 維護一個 Goroutine 隊列,存儲需要由其執行的所有 Goroutine。例如,當創建多個 Goroutine 時,這些 Goroutine 會被添加到 P 的隊列中等待調度。

  4. Sched(Scheduler)
    表示調度器。可以看作是中央調度中心,維護 M 和 G 的隊列,以及調度器的一些狀態信息,確保整個系統的高效調度。


調度的實現

調度模型圖

如圖所示,有兩個物理線程 M,每個 M 綁定一個處理器 P,並運行一個 Goroutine。

當某個操作系統線程(如 M0)被阻塞時(如下圖所示),P 會切換到另一個線程(如 M1)。M1 可能是新創建的,也可能是從線程緩存中取出的。

線程阻塞切換圖

當 M0 返回時,它需要嘗試獲取一個 P 來運行 Goroutine。如果無法獲取 P,它會將 Goroutine 放入全局運行隊列,並進入休眠狀態(進入線程緩存)。所有 P 會定期檢查全局運行隊列,並運行其中的 Goroutine;否則,全局運行隊列中的 Goroutine 將永遠無法執行。


III. Goroutine 的使用

基本用法

設置 Goroutine 的運行 CPU 數量。Go 的最新版本默認會自動設置。

num := runtime.NumCPU() // 獲取主機的邏輯 CPU 數量
runtime.GOMAXPROCS(num) // 根據主機 CPU 數量設置 Goroutine 的最大併發級別

使用示例

示例 1:簡單的 Goroutine 計算

package main

import (
    "fmt"
    "time"
)

func cal(a int, b int) {
    c := a + b
    fmt.Printf("%d + %d = %d\n", a, b, c)
}

func main() {
    for i := 0; i < 10; i++ {
        go cal(i, i+1) // 啓動 10 個 Goroutine 進行計算
    }
    time.Sleep(time.Second * 2) // 等待所有任務完成
}

運行結果:

8 + 9 = 17
9 + 10 = 19
4 + 5 = 9
...

Goroutine 異常捕獲

當啓動多個 Goroutine 時,如果其中一個發生異常且未處理,整個程序會終止。因此,建議在每個 Goroutine 的函數中添加異常處理。可以使用 recover 函數捕獲異常。

package main

import (
    "fmt"
    "time"
)

func addele(a []int, i int) {
    deferfunc() {
        if err := recover(); err != nil {
            fmt.Println("add ele fail")
        }
    }()
    a[i] = i
    fmt.Println(a)
}

func main() {
    Arry := make([]int, 4)
    for i := 0; i < 10; i++ {
        go addele(Arry, i)
    }
    time.Sleep(time.Second * 2)
}

運行結果:

add ele fail
[0 0 0 0]
[0 1 0 0]
...

Goroutine 的同步

由於 Goroutine 是異步執行的,主程序可能在 Goroutine 完成前退出。爲確保所有 Goroutine 完成後再退出,Go 提供了 sync 包和 channel 來解決同步問題。

示例 1:使用 sync.WaitGroup 同步 Goroutine

package main

import (
    "fmt"
    "sync"
)

func cal(a int, b int, n *sync.WaitGroup) {
    c := a + b
    fmt.Printf("%d + %d = %d\n", a, b, c)
    defer n.Done()
}

func main() {
    var go_sync sync.WaitGroup
    for i := 0; i < 10; i++ {
        go_sync.Add(1)
        go cal(i, i+1, &go_sync)
    }
    go_sync.Wait()
}

運行結果:

9 + 10 = 19
2 + 3 = 5
...

Goroutine 間的通信

Goroutine 本質上是協程,可以通過 channel 實現通信或數據共享。

示例:使用 channel 模擬生產者 - 消費者模式

package main

import (
    "fmt"
    "sync"
)

func Productor(mychan chan int, data int, wait *sync.WaitGroup) {
    mychan <- data
    fmt.Println("product data:", data)
    wait.Done()
}

func Consumer(mychan chan int, wait *sync.WaitGroup) {
    a := <-mychan
    fmt.Println("consumer data:", a)
    wait.Done()
}

func main() {
    datachan := make(chanint, 100)
    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go Productor(datachan, i, &wg)
    }
    for j := 0; j < 10; j++ {
        wg.Add(1)
        go Consumer(datachan, &wg)
    }
    wg.Wait()
}

運行結果:

product data: 0
consumer data: 0
...
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/7Cr4lyKd6qCLgEIzs_CmIg