Go 語言併發模式代碼模板
前言
文章代碼量較多,如果是初次閱讀,建議瞭解每種模式的基礎用法即可,工作中有實際的應用開發場景時,再回來研究代碼細節。
經典模式 (pipeline + selector)
這裏有一個來自 Go 官方博客的例子,通過管道篩選質數並打印。
質數篩選器
第一個版本
package main
import "fmt"
// 生成數字併發送到通道
func generate(ch chan int) {
for i := 2; ; i++ {
ch <- i
}
}
// 通過參數質數過濾 in 通道傳遞的數字
func filter(in, out chan int, prime int) {
for {
i := <-in
if i%prime != 0 {
out <- i
}
}
}
func main() {
ch := make(chan int)
go generate(ch)
for {
// 不斷生成數字和篩選管道
prime := <-ch
fmt.Print(prime, " ")
ch1 := make(chan int)
go filter(ch, ch1, prime)
ch = ch1
}
}
第二個版本
第二個版本在之前的基礎上進行了改進:sieve
, generate
, filter
改爲工廠函數,創建通道並返回,而且使用了協程的 lambda
函數。 main
函數變得更加短小清晰:調用 sieve()
返回包含質數的通道,然後打印即可。
package main
import (
"fmt"
)
func generate() chan int {
ch := make(chan int)
go func() {
for i := 2; ; i++ {
ch <- i
}
}()
return ch
}
func filter(in chan int, prime int) chan int {
out := make(chan int)
go func() {
for {
if i := <-in; i%prime != 0 {
out <- i
}
}
}()
return out
}
func sieve() chan int {
out := make(chan int)
go func() {
ch := generate()
for {
prime := <-ch
ch = filter(ch, prime)
out <- prime
}
}()
return out
}
func main() {
primes := sieve()
for {
fmt.Println(<-primes)
}
}
質數篩選器動圖
FanIn FanOut
Fan-In: 1 個 goroutine 從多個通道讀取數據 (一對多)
Fan-Out: 多個 goroutine 從 1 個通道讀取數據 (多對一)
圖片來源: https://jguer.space/posts/go-fanout-context/
官網上有一個經典用例 質數求和
,同時用到了這兩種模式。
package main
import (
"math"
"sync"
"time"
)
func echo(numbs []int) <-chan int {
out := make(chan int)
go func() {
for _, n := range numbs {
out <- n
}
close(out)
}()
return out
}
// 求和函數
func sum(in <-chan int) <-chan int {
out := make(chan int)
go func() {
res := 0
for n := range in {
res += n
}
out <- res
close(out)
}()
return out
}
func makeRange(min, max int) []int {
a := make([]int, max-min+1)
for i := range a {
a[i] = min + i
}
return a
}
func isPrime(value int) bool {
for i := 2; i <= int(math.Floor(float64(value)/2)); i++ {
if value%i == 0 {
return false
}
}
return value > 1
}
func prime(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
if isPrime(n) {
out <- n
}
}
close(out)
}()
return out
}
func merge(cs []<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
wg.Add(len(cs))
for _, c := range cs {
go func(c <-chan int) {
for n := range c {
out <- n
}
wg.Done()
}(c)
}
go func() {
wg.Wait()
close(out)
}()
return out
}
func main() {
nums := makeRange(1, 10)
in := echo(nums)
// Fan-Out
var cs [4]<-chan int
for i := range cs {
cs[i] = sum(prime(in))
}
// Fan-In
out := sum(merge(cs[:]))
println(<-out)
time.Sleep(time.Second) // 等待 out 通道關閉
}
Pipeline
管道 (pipeline)
是由通道連接的一系列階段,其中每個階段都是一組運行相同功能的 goroutine
,例如 Linux
中的管道命令:
$ ps –ef | grep systemd | awk '{print $2}'
圖片來源: https://medium.com/@eric.g.yuan/go-concurrency-patterns-pipeline-2845d84bd92d
package main
import "fmt"
func main() {
// 數值生成器管道
generator := func(done <-chan int, nums []int) <-chan int {
intStream := make(chan int)
go func() {
defer close(intStream)
for _, n := range nums {
select {
case <-done:
return
case intStream <- n:
}
}
}()
return intStream
}
// 數值相乘管道
multiply := func(done <-chan int, intStream <-chan int, multiplier int) <-chan int {
multiplyStream := make(chan int)
go func() {
defer close(multiplyStream)
for i := range intStream {
select {
case <-done:
return
case multiplyStream <- i * multiplier:
}
}
}()
return multiplyStream
}
// 數值相加管道
add := func(done <-chan int, intStream <-chan int, addition int) <-chan int {
addedStream := make(chan int)
go func() {
defer close(addedStream)
for i := range intStream {
select {
case <-done:
return
case addedStream <- i + addition:
}
}
}()
return addedStream
}
done := make(chan int)
defer close(done)
intStream := generator(done, []int{1, 2, 3})
// 執行了類似 Linux 中的管道命令: generator | multiply | add
pipeline := add(done, multiply(done, intStream, 2), 1)
for p := range pipeline {
fmt.Println(p)
}
}
Generate
生成者模式 (Generate)
像 yield
一樣,生成一系列連續的值。
package main
func Count(start int, end int) chan int {
ch := make(chan int)
go func(ch chan int) {
for i := start; i <= end; i++ {
ch <- i
}
close(ch)
}(ch)
return ch
}
func main() {
for i := range Count(1, 5) {
println(i)
}
}
// $ go run main.go
// 1
// 2
// 3
// 4
// 5
生產者 / 消費者
package main
import "fmt"
// 生產者
func produce(start, count int, out chan<- int) {
for i := 0; i < count; i++ {
out <- start
start = start + count
}
close(out)
}
// 消費者
func consume(in <-chan int, done chan<- bool) {
for num := range in {
fmt.Printf("%d\n", num)
}
done <- true
}
func main() {
numChan := make(chan int)
done := make(chan bool)
go produce(0, 10, numChan)
go consume(numChan, done)
<-done
}
// $ go run ma1in.go
// 輸出如下
// 0
// 10
// 20
// 30
// 40
// 50
// 60
// 70
// 80
// 90
信號量模式
信號量
是一種同步原語,對數量有限的資源的訪問進行控制。
接口
type Interface interface {
Acquire() error
Release() error
}
資源信號量
package main
var (
ErrNoTickets = errors.New("semaphore: could not aquire semaphore")
ErrIllegalRelease = errors.New("semaphore: can't release the semaphore without acquiring it first")
)
type implementation struct {
sem chan struct{}
timeout time.Duration
}
// Acquire 請求資源
func (s *implementation) Acquire() error {
select {
case s.sem <- struct{}{}:
return nil
case <-time.After(s.timeout):
return ErrNoTickets
}
}
// Release 釋放資源
func (s *implementation) Release() error {
select {
case _ = <-s.sem:
return nil
case <-time.After(s.timeout):
return ErrIllegalRelease
}
return nil
}
func New(tickets int, timeout time.Duration) Interface {
return &implementation{
sem: make(chan struct{}, tickets),
timeout: timeout,
}
}
超時信號
tickets, timeout := 1, 3*time.Second
s := semaphore.New(tickets, timeout)
if err := s.Acquire(); err != nil {
panic(err)
}
// do something
if err := s.Release(); err != nil {
panic(err)
}
無超時信號 (非阻塞)
tickets, timeout := 0, 0
s := semaphore.New(tickets, timeout)
if err := s.Acquire(); err != nil {
if err != semaphore.ErrNoTickets {
panic(err)
}
os.Exit(1)
}
流式計算
π 的計算公式
package main
import (
"fmt"
"math"
"runtime"
)
const NCPU = 2
func main() {
runtime.GOMAXPROCS(2)
fmt.Println(CalculatePi(5000))
}
func CalculatePi(end int) float64 {
ch := make(chan float64)
for i := 0; i < NCPU; i++ {
// 啓動 2 個 goroutine
// 1 個 goroutine 計算 k (0 -> 2500)
// 1 個 goroutine 計算 k (2500 -> 5000)
go term(ch, i*end/NCPU, (i+1)*end/NCPU)
}
result := 0.0
for i := 0; i < NCPU; i++ {
result += <-ch
}
return result
}
// 計算公式展開式
func term(ch chan float64, start, end int) {
result := 0.0
for i := start; i < end; i++ {
x := float64(i)
result += 4 * (math.Pow(-1, x) / (2.0*x + 1.0))
}
ch <- result
}
// $ go run ma1in.go
// 輸出如下
// 3.1413926535917938
*/
簡單 Master/Worker
對於任何可以建模爲 Master-Worker
的問題,各個 Worker
通道和 Master
通信,如果系統是分佈式部署的,各個工作節點充當 Worker
, 中央節點Master
和 Worker
之間使用 RPC
等協議進行通信。
package main
func main() {
pending, done := make(chan *Task), make(chan *Task)
go sendWork(pending)
for i := 0; i < N; i++ {
go Worker(pending, done)
}
consumeWork(done)
}
func Worker(in, out chan *Task) {
for {
t := <-in
process(t)
out <- t
}
}
Futures 模式
Futures
模式是指在使用某一個值之前需要先對其進行計算。這時可以在另一個 goroutine
進行該值的計算,到該值真正使用時就已經計算完畢了。 Futures
模式通過閉包和通道可以很容易實現,類似於生成器,不同地方在於 Futures
需要返回一個值。
假設我們有一個矩陣類型,我們需要計算兩個矩陣 A 和 B 乘積的逆,首先我們通過函數 Inverse(M)
分別對其進行求逆運算,再將結果相乘。
func InverseProduct(a Matrix, b Matrix) {
a_inv_future := InverseFuture(a)
b_inv_future := InverseFuture(b)
a_inv := <-a_inv_future
b_inv := <-b_inv_future
return Product(a_inv, b_inv)
}
InverseFuture
函數以 goroutine
的形式起了一個閉包,該閉包會將矩陣求逆結果放入到 future
通道中:
func InverseFuture(a Matrix) chan Matrix {
future := make(chan Matrix)
go func() {
future <- Inverse(a)
}()
return future
}
當開發一個計算密集型庫時,使用 Futures
模式設計 API 接口是很有意義的。在你的包使用 Futures
模式,且能保持友好的 API 接口。 此外,Futures
可以通過一個異步的 API 暴露出來。這樣就可以用最小的成本將包中的並行計算移到用戶代碼中。
限制併發請求處理數量
使用帶緩衝區的通道很容易實現,其緩衝區容量就是同時處理請求的最大數量。程序中超過 MAXREQS
的請求將不會被同時處理, 因爲當 sem
通道表示緩衝區已滿時,handle
函數會阻塞且不再處理其他請求,直到某個請求從 sem
通道中被移除。
package main
const MAXREQS = 50
var sem = make(chan int, MAXREQS)
type Request struct {
a, b int
replyc chan int
}
func process(r *Request) {
// do something
}
// 一進一出,一來一回,很巧妙
func handle(r *Request) {
sem <- 1 // doesn't matter what we put in it
process(r)
<-sem // one empty place in the buffer: the next request can start
}
func server(service chan *Request) {
for {
request := <-service
go handle(request)
}
}
func main() {
service := make(chan *Request)
go server(service)
}
狀態模式
假設我們需要處理一些數量巨大且互不相關的數據項,它們從一個 in
通道被傳遞進來,當我們處理完以後又要將它們放入另一個 out
通道, 就像一個工廠流水線一樣。處理每個數據項也可能包含許多步驟:Preprocess(預處理) / StepA(步驟A) / StepB(步驟B) / ... / PostProcess(後處理)
。
讓每一個處理步驟作爲一個 goroutine
獨立工作,每一個步驟從上一步的輸出通道中獲得輸入數據。 這種方式僅有極少數時間會被浪費,而大部分時間所有的步驟都在一直執行中。
單純從流程描述的話,很像設計模式裏面的 “狀態模式”,核心是下一個數據依賴於上一個數據處理完成,通道的緩衝區大小可以調整優化。
func ParallelProcessData (in <-chan *Data, out chan<- *Data) {
preOut := make(chan *Data, 100)
stepAOut := make(chan *Data, 100)
stepBOut := make(chan *Data, 100)
stepCOut := make(chan *Data, 100)
go PreprocessData(in, preOut)
go ProcessStepA(preOut,StepAOut)
go ProcessStepB(StepAOut,StepBOut)
go ProcessStepC(StepBOut,StepCOut)
go PostProcessData(StepCOut,out)
}
鏈式調用
package main
// 反向執行過程很像遞歸
func f(left, right chan int) {
left <- 1 + <-right
}
func main() {
leftmost := make(chan int)
var left, right chan int = nil, leftmost
for i := 0; i < 100000; i++ {
left, right = right, make(chan int)
go f(left, right)
}
right <- 0 // bang!
x := <-leftmost // wait for completion
println(x) // 100000
}
當循環完成之後,一個 0 被寫入到 最右邊
的通道里,於是 100000
個 goroutine
開始順序執行。
Reference
-
• Github Design-Patterns[1]
-
• Go Concurrency Patterns[2]
-
• Go Concurrency Patterns: Pipelines and cancellation[3]
-
• Visualizing Concurrency in Go[4]
-
• Go Language Patterns[5]
-
• tmrts/go-patterns[6]
-
• Concurrency in Go[7]
-
• Rethinking Classical Concurrency Patterns[8]
-
• Go Concurrency[9]
-
• visualizing-concurrency-go[10]
-
• 代碼的未來 [11]
引用鏈接
[1]
Github Design-Patterns: https://github.com/duanbiaowu/go-examples-for-beginners/tree/master/patterns/concurrency
[2]
Go Concurrency Patterns: https://go.dev/talks/2012/concurrency.slide
[3]
Go Concurrency Patterns: Pipelines and cancellation: https://go.dev/blog/pipelines
[4]
Visualizing Concurrency in Go: https://divan.github.io/posts/go_concurrency_visualize/
[5]
Go Language Patterns: http://www.golangpatterns.info/
[6]
tmrts/go-patterns: https://github.com/tmrts/go-patterns
[7]
Concurrency in Go: https://book.douban.com/subject/26994591/
[8]
Rethinking Classical Concurrency Patterns: https://drive.google.com/file/d/1nPdvhB0PutEJzdCq5ms6UI58dp50fcAN/view
[9]
Go Concurrency: https://blogtitle.github.io/categories/concurrency/
[10]
visualizing-concurrency-go: https://www.cloudbees.com/blog/visualizing-concurrency-go
[11]
代碼的未來: https://book.douban.com/subject/24536403/
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/431YMKwRjaStanqA6juePA