Go 語言併發模式代碼模板

前言

文章代碼量較多,如果是初次閱讀,建議瞭解每種模式的基礎用法即可,工作中有實際的應用開發場景時,再回來研究代碼細節

經典模式 (pipeline + selector)

這裏有一個來自 Go 官方博客的例子,通過管道篩選質數並打印。

質數篩選器

第一個版本

package main

import "fmt"

// 生成數字併發送到通道
func generate(ch chan int) {
    for i := 2; ; i++ {
        ch <- i
    }
}

// 通過參數質數過濾 in 通道傳遞的數字
func filter(in, out chan int, prime int) {
    for {
        i := <-in 
        if i%prime != 0 {
            out <- i 
        }
    }
}

func main() {
    ch := make(chan int) 
    go generate(ch)      
    
    for {
        // 不斷生成數字和篩選管道
        prime := <-ch
        fmt.Print(prime, " ")
        ch1 := make(chan int)
        go filter(ch, ch1, prime)
        ch = ch1
    }
}

第二個版本

第二個版本在之前的基礎上進行了改進:sievegeneratefilter 改爲工廠函數,創建通道並返回,而且使用了協程的 lambda 函數。 main 函數變得更加短小清晰:調用 sieve() 返回包含質數的通道,然後打印即可。

package main

import (
    "fmt"
)

func generate() chan int {
    ch := make(chan int)
    go func() {
        for i := 2; ; i++ {
            ch <- i
        }
    }()
    return ch
}

func filter(in chan int, prime int) chan int {
    out := make(chan int)
    go func() {
        for {
            if i := <-in; i%prime != 0 {
                out <- i
            }
        }
    }()
    return out
}

func sieve() chan int {
    out := make(chan int)
    go func() {
        ch := generate()
        for {
            prime := <-ch
            ch = filter(ch, prime)
            out <- prime
        }
    }()
    return out
}

func main() {
    primes := sieve()
    for {
        fmt.Println(<-primes)
    }
}

質數篩選器動圖

FanIn FanOut

Fan-In: 1 個 goroutine 從多個通道讀取數據 (一對多)
Fan-Out: 多個 goroutine 從 1 個通道讀取數據 (多對一)

圖片來源: https://jguer.space/posts/go-fanout-context/

官網上有一個經典用例 質數求和,同時用到了這兩種模式。

package main

import (
    "math"
    "sync"
    "time"
)

func echo(numbs []int) <-chan int {
    out := make(chan int)
    go func() {
        for _, n := range numbs {
            out <- n
        }
        close(out)
    }()
    return out
}

// 求和函數
func sum(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        res := 0
        for n := range in {
            res += n
        }
        out <- res
        close(out)
    }()
    return out
}

func makeRange(min, max int) []int {
    a := make([]int, max-min+1)
    for i := range a {
        a[i] = min + i
    }
    return a
}

func isPrime(value int) bool {
    for i := 2; i <= int(math.Floor(float64(value)/2)); i++ {
        if value%i == 0 {
            return false
        }
    }
    return value > 1
}

func prime(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            if isPrime(n) {
                out <- n
            }
        }
        close(out)
    }()
    return out
}

func merge(cs []<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)
    wg.Add(len(cs))

    for _, c := range cs {
        go func(c <-chan int) {
            for n := range c {
                out <- n
            }
            wg.Done()
        }(c)
    }

    go func() {
        wg.Wait()
        close(out)
    }()

    return out
}

func main() {
    nums := makeRange(1, 10)
    in := echo(nums)

    // Fan-Out
    var cs [4]<-chan int
    for i := range cs {
        cs[i] = sum(prime(in))
    }

    // Fan-In
    out := sum(merge(cs[:]))
    println(<-out)

    time.Sleep(time.Second) // 等待 out 通道關閉
}

Pipeline

管道 (pipeline) 是由通道連接的一系列階段,其中每個階段都是一組運行相同功能的 goroutine,例如 Linux 中的管道命令:

$ ps –ef | grep systemd | awk '{print $2}'

圖片來源: https://medium.com/@eric.g.yuan/go-concurrency-patterns-pipeline-2845d84bd92d

package main

import "fmt"

func main() {
    // 數值生成器管道
    generator := func(done <-chan int, nums []int) <-chan int {
        intStream := make(chan int)

        go func() {
            defer close(intStream)

            for _, n := range nums {
                select {
                case <-done:
                    return
                case intStream <- n:
                }
            }
        }()

        return intStream
    }

    // 數值相乘管道
    multiply := func(done <-chan int, intStream <-chan int, multiplier int) <-chan int {
        multiplyStream := make(chan int)

        go func() {
            defer close(multiplyStream)

            for i := range intStream {
                select {
                case <-done:
                    return
                case multiplyStream <- i * multiplier:
                }
            }

        }()

        return multiplyStream
    }

    // 數值相加管道
    add := func(done <-chan int, intStream <-chan int, addition int) <-chan int {
        addedStream := make(chan int)

        go func() {
            defer close(addedStream)

            for i := range intStream {
                select {
                case <-done:
                    return
                case addedStream <- i + addition:
                }
            }

        }()

        return addedStream
    }

    done := make(chan int)
    defer close(done)

    intStream := generator(done[]int{1, 2, 3})
    // 執行了類似 Linux 中的管道命令: generator | multiply | add
    pipeline := add(done, multiply(done, intStream, 2), 1)

    for p := range pipeline {
        fmt.Println(p)
    }
}

Generate

生成者模式 (Generate) 像 yield 一樣,生成一系列連續的值。

package main

func Count(start int, end int) chan int {
    ch := make(chan int)

    go func(ch chan int) {
        for i := start; i <= end; i++ {
            ch <- i
        }

        close(ch)
    }(ch)

    return ch
}

func main() {
    for i := range Count(1, 5) {
        println(i)
    }
}

// $ go run main.go
// 1
// 2
// 3
// 4
// 5

生產者 / 消費者

package main

import "fmt"

// 生產者
func produce(start, count int, out chan<- int) {
    for i := 0; i < count; i++ {
        out <- start
        start = start + count
    }
    close(out)
}

// 消費者
func consume(in <-chan int, done chan<- bool) {
    for num := range in {
        fmt.Printf("%d\n", num)
    }
    done <- true
}

func main() {
    numChan := make(chan int)
    done := make(chan bool)
    go produce(0, 10, numChan)
    go consume(numChan, done)

    <-done
}

// $ go run ma1in.go
// 輸出如下
// 0
// 10
// 20
// 30
// 40
// 50
// 60
// 70
// 80
// 90

信號量模式

信號量 是一種同步原語,對數量有限的資源的訪問進行控制。

接口

type Interface interface {
    Acquire() error
    Release() error
}

資源信號量

package main

var (
    ErrNoTickets      = errors.New("semaphore: could not aquire semaphore")
    ErrIllegalRelease = errors.New("semaphore: can't release the semaphore without acquiring it first")
)

type implementation struct {
    sem     chan struct{}
    timeout time.Duration
}

// Acquire 請求資源
func (s *implementation) Acquire() error {
    select {
    case s.sem <- struct{}{}:
        return nil
    case <-time.After(s.timeout):
        return ErrNoTickets
    }
}

// Release 釋放資源
func (s *implementation) Release() error {
    select {
    case _ = <-s.sem:
        return nil
    case <-time.After(s.timeout):
        return ErrIllegalRelease
    }

    return nil
}

func New(tickets int, timeout time.Duration) Interface {
    return &implementation{
        sem:     make(chan struct{}, tickets),
        timeout: timeout,
    }
}

超時信號

tickets, timeout := 1, 3*time.Second
s := semaphore.New(tickets, timeout)

if err := s.Acquire(); err != nil {
    panic(err)
}

// do something

if err := s.Release(); err != nil {
    panic(err)
}

無超時信號 (非阻塞)

tickets, timeout := 0, 0
s := semaphore.New(tickets, timeout)

if err := s.Acquire(); err != nil {
    if err != semaphore.ErrNoTickets {
        panic(err)
    }
    
    os.Exit(1)
}

流式計算

π 的計算公式

package main

import (
    "fmt"
    "math"
    "runtime"
)

const NCPU = 2

func main() {
    runtime.GOMAXPROCS(2)
    fmt.Println(CalculatePi(5000))
}

func CalculatePi(end int) float64 {
    ch := make(chan float64)

    for i := 0; i < NCPU; i++ {
        // 啓動 2 個 goroutine
        // 1 個 goroutine 計算 k (0 -> 2500)
        // 1 個 goroutine 計算 k (2500 -> 5000)
        go term(ch, i*end/NCPU, (i+1)*end/NCPU)
    }

    result := 0.0
    for i := 0; i < NCPU; i++ {
        result += <-ch
    }
    return result
}

// 計算公式展開式
func term(ch chan float64, start, end int) {
    result := 0.0
    for i := start; i < end; i++ {
        x := float64(i)
        result += 4 * (math.Pow(-1, x) / (2.0*x + 1.0))
    }
    ch <- result
}

// $ go run ma1in.go
// 輸出如下
// 3.1413926535917938
*/

簡單 Master/Worker

對於任何可以建模爲 Master-Worker 的問題,各個 Worker 通道和 Master 通信,如果系統是分佈式部署的,各個工作節點充當 Worker, 中央節點Master 和 Worker 之間使用 RPC 等協議進行通信。

package main

func main() {
    pending, done := make(chan *Task), make(chan *Task)
    go sendWork(pending)       
    
    for i := 0; i < N; i++ {   
        go Worker(pending, done)
    }
    
    consumeWork(done)          
}

func Worker(in, out chan *Task) {
    for {
        t := <-in
        process(t)
        out <- t
    }
}

Futures 模式

Futures 模式是指在使用某一個值之前需要先對其進行計算。這時可以在另一個 goroutine 進行該值的計算,到該值真正使用時就已經計算完畢了。 Futures 模式通過閉包和通道可以很容易實現,類似於生成器,不同地方在於 Futures 需要返回一個值。

假設我們有一個矩陣類型,我們需要計算兩個矩陣 A 和 B 乘積的逆,首先我們通過函數 Inverse(M) 分別對其進行求逆運算,再將結果相乘。

func InverseProduct(a Matrix, b Matrix) {
    a_inv_future := InverseFuture(a)   
    b_inv_future := InverseFuture(b)   
    a_inv := <-a_inv_future
    b_inv := <-b_inv_future
    return Product(a_inv, b_inv)
}

InverseFuture 函數以 goroutine 的形式起了一個閉包,該閉包會將矩陣求逆結果放入到 future 通道中:

func InverseFuture(a Matrix) chan Matrix {
    future := make(chan Matrix)
    go func() {
        future <- Inverse(a)
    }()
    return future
}

當開發一個計算密集型庫時,使用 Futures 模式設計 API 接口是很有意義的。在你的包使用 Futures 模式,且能保持友好的 API 接口。 此外,Futures 可以通過一個異步的 API 暴露出來。這樣就可以用最小的成本將包中的並行計算移到用戶代碼中。

限制併發請求處理數量

使用帶緩衝區的通道很容易實現,其緩衝區容量就是同時處理請求的最大數量。程序中超過 MAXREQS 的請求將不會被同時處理, 因爲當 sem 通道表示緩衝區已滿時,handle 函數會阻塞且不再處理其他請求,直到某個請求從 sem 通道中被移除。

package main

const MAXREQS = 50

var sem = make(chan int, MAXREQS)

type Request struct {
    a, b   int
    replyc chan int
}

func process(r *Request) {
    // do something
}

// 一進一出,一來一回,很巧妙
func handle(r *Request) {
    sem <- 1 // doesn't matter what we put in it
    process(r)
    <-sem // one empty place in the buffer: the next request can start
}

func server(service chan *Request) {
    for {
        request := <-service
        go handle(request)
    }
}

func main() {
    service := make(chan *Request)
    go server(service)
}

狀態模式

假設我們需要處理一些數量巨大且互不相關的數據項,它們從一個 in 通道被傳遞進來,當我們處理完以後又要將它們放入另一個 out 通道, 就像一個工廠流水線一樣。處理每個數據項也可能包含許多步驟:Preprocess(預處理) / StepA(步驟A) / StepB(步驟B) / ... / PostProcess(後處理)

讓每一個處理步驟作爲一個 goroutine 獨立工作,每一個步驟從上一步的輸出通道中獲得輸入數據。 這種方式僅有極少數時間會被浪費,而大部分時間所有的步驟都在一直執行中。

單純從流程描述的話,很像設計模式裏面的 “狀態模式”,核心是下一個數據依賴於上一個數據處理完成,通道的緩衝區大小可以調整優化。

func ParallelProcessData (in <-chan *Data, out chan<- *Data) {
    preOut := make(chan *Data, 100)
    stepAOut := make(chan *Data, 100)
    stepBOut := make(chan *Data, 100)
    stepCOut := make(chan *Data, 100)
    
    go PreprocessData(in, preOut)
    go ProcessStepA(preOut,StepAOut)
    go ProcessStepB(StepAOut,StepBOut)
    go ProcessStepC(StepBOut,StepCOut)
    go PostProcessData(StepCOut,out)
}

鏈式調用

package main

// 反向執行過程很像遞歸
func f(left, right chan int) {
    left <- 1 + <-right
}

func main() {
    leftmost := make(chan int)
    var left, right chan int = nil, leftmost

    for i := 0; i < 100000; i++ {
        left, right = right, make(chan int)
        go f(left, right)
    }

    right <- 0      // bang!
    x := <-leftmost // wait for completion
    println(x)      // 100000
}

當循環完成之後,一個 0 被寫入到 最右邊 的通道里,於是 100000 個 goroutine 開始順序執行。

Reference

引用鏈接

[1] Github Design-Patterns: https://github.com/duanbiaowu/go-examples-for-beginners/tree/master/patterns/concurrency
[2] Go Concurrency Patterns: https://go.dev/talks/2012/concurrency.slide
[3] Go Concurrency Patterns: Pipelines and cancellation: https://go.dev/blog/pipelines
[4] Visualizing Concurrency in Go: https://divan.github.io/posts/go_concurrency_visualize/
[5] Go Language Patterns: http://www.golangpatterns.info/
[6] tmrts/go-patterns: https://github.com/tmrts/go-patterns
[7] Concurrency in Go: https://book.douban.com/subject/26994591/
[8] Rethinking Classical Concurrency Patterns: https://drive.google.com/file/d/1nPdvhB0PutEJzdCq5ms6UI58dp50fcAN/view
[9] Go Concurrency: https://blogtitle.github.io/categories/concurrency/
[10] visualizing-concurrency-go: https://www.cloudbees.com/blog/visualizing-concurrency-go
[11] 代碼的未來: https://book.douban.com/subject/24536403/

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/431YMKwRjaStanqA6juePA