Golang 如何優雅地實現併發編排任務
圖片拍攝於 2021 年 1 月 3 日。
業務場景
在做任務開發的時候,你們一定會碰到以下場景:
場景 1:調用第三方接口的時候, 一個需求你需要調用不同的接口,做數據組裝。
場景 2: 一個應用首頁可能依託於很多服務。那就涉及到在加載頁面時需要同時請求多個服務的接口。這一步往往是由後端統一調用組裝數據再返回給前端,也就是所謂的 BFF(Backend For Frontend) 層。
針對以上兩種場景,假設在沒有強依賴關係下,選擇串行調用,那麼總耗時即:
time=s1+s2+....sn
按照當代秒入百萬的有爲青年,這麼長時間早就把你祖宗十八代問候了一遍。
爲了偉大的 KPI,我們往往會選擇併發地調用這些依賴接口。那麼總耗時就是:
time=max(s1,s2,s3.....,sn)
當然開始堆業務的時候可以先串行化,等到上面的人着急的時候,亮出絕招。
這樣,年底 PPT
就可以加上濃重的一筆流水賬: 爲業務某個接口提高百分之 XXX 性能,間接產生 XXX 價值。
當然這一切的前提是,做老闆不懂技術,做技術” 懂” 你。
言歸正傳, 如果修改成併發調用,你可能會這麼寫,
package main
import (
"fmt"
"sync"
"time"
)
func main() {
var wg sync.WaitGroup
wg.Add(2)
var userInfo *User
var productList []Product
go func() {
defer wg.Done()
userInfo, _ = getUser()
}()
go func() {
defer wg.Done()
productList, _ = getProductList()
}()
wg.Wait()
fmt.Printf("用戶信息:%+v\n", userInfo)
fmt.Printf("商品信息:%+v\n", productList)
}
/********用戶服務**********/
type User struct {
Name string
Age uint8
}
func getUser() (*User, error) {
time.Sleep(500 * time.Millisecond)
var u User
u.Name = "wuqinqiang"
u.Age = 18
return &u, nil
}
/********商品服務**********/
type Product struct {
Title string
Price uint32
}
func getProductList() ([]Product, error) {
time.Sleep(400 * time.Millisecond)
var list []Product
list = append(list, Product{
Title: "SHib",
Price: 10,
})
return list, nil
}
從實現上來說,需要多少服務,會開多少個 G
,利用 sync.WaitGroup
的特性,
實現併發編排任務的效果。
好像,問題不大。
但是隨着代號 996
業務場景的增加,你會發現,好多模塊都有相似的功能,只是對應的業務場景不同而已。
那麼我們能不能抽像出一套針對此業務場景的工具,而把具體業務實現交給業務方。
使用
本着不重複造輪子的原則,去搜了下開源項目,最終看上了 go-zero
裏面的一個工具 mapreduce
。
可以自行 Google
這個名詞。
使用很簡單。我們通過它改造一下上面的代碼:
package main
import (
"fmt"
"github.com/tal-tech/go-zero/core/mr"
"time"
)
func main() {
var userInfo *User
var productList []Product
_ = mr.Finish(func() (err error) {
userInfo, err = getUser()
return err
}, func() (err error) {
productList, err = getProductList()
return err
})
fmt.Printf("用戶信息:%+v\n", userInfo)
fmt.Printf("商品信息:%+v\n", productList)
}
//打印
用戶信息:&{Name:wuqinqiang Age:18}
商品信息:[{Title:SHib Price:10}]
是不是舒服多了。
但是這裏還需要注意一點,假設你調用的其中一個服務錯誤,並且你 return err
對應的錯誤,那麼其他調用的服務會被取消。
比如我們修改 getProductList 直接響應錯誤。
func getProductList() ([]Product, error) {
return nil, errors.New("test error")
}
//打印
// 用戶信息:<nil>
// 商品信息:[]
那麼最終打印的時候連用戶信息都會爲空,因爲出現一個服務錯誤,用戶服務請求被取消了。
一般情況下,在請求服務錯誤的時候我們會有保底操作,一個服務錯誤不能影響其他請求的結果。
所以在使用的時候具體處理取決於業務場景。
源碼
既然用了,那麼就追下源碼吧。
func Finish(fns ...func() error) error {
if len(fns) == 0 {
return nil
}
return MapReduceVoid(func(source chan<- interface{}) {
for _, fn := range fns {
source <- fn
}
}, func(item interface{}, writer Writer, cancel func(error)) {
fn := item.(func() error)
if err := fn(); err != nil {
cancel(err)
}
}, func(pipe <-chan interface{}, cancel func(error)) {
drain(pipe)
}, WithWorkers(len(fns)))
}
func MapReduceVoid(generator GenerateFunc, mapper MapperFunc, reducer VoidReducerFunc, opts ...Option) error {
_, err := MapReduce(generator, mapper, func(input <-chan interface{}, writer Writer, cancel func(error)) {
reducer(input, cancel)
drain(input)
// We need to write a placeholder to let MapReduce to continue on reducer done,
// otherwise, all goroutines are waiting. The placeholder will be discarded by MapReduce.
writer.Write(lang.Placeholder)
}, opts...)
return err
}
對於 MapReduceVoid
函數,主要查看三個閉包參數。
-
第一個
GenerateFunc
用於生產數據。 -
MapperFunc
讀取生產出的數據,進行處理。 -
VoidReducerFunc
這裏表示不對mapper
後的數據做聚合返回。所以這個閉包在此操作幾乎 0 作用。
func MapReduce(generate GenerateFunc, mapper MapperFunc, reducer ReducerFunc, opts ...Option) (interface{}, error) {
source := buildSource(generate)
return MapReduceWithSource(source, mapper, reducer, opts...)
}
func buildSource(generate GenerateFunc) chan interface{} {
source := make(chan interface{})// 創建無緩衝通道
threading.GoSafe(func() {
defer close(source)
generate(source) //開始生產數據
})
return source //返回無緩衝通道
}
buildSource
函數中,返回一個無緩衝的通道。並開啓一個 G
運行 generate(source)
,往無緩衝通道塞數據。這個generate(source)
不就是一開始 Finish
傳遞的第一個閉包參數。
return MapReduceVoid(func(source chan<- interface{}) {
// 就這個
for _, fn := range fns {
source <- fn
}
})
然後查看 MapReduceWithSource
函數,
func MapReduceWithSource(source <-chan interface{}, mapper MapperFunc, reducer ReducerFunc,
opts ...Option) (interface{}, error) {
options := buildOptions(opts...)
//任務執行結束通知信號
output := make(chan interface{})
//將mapper處理完的數據寫入collector
collector := make(chan interface{}, options.workers)
// 取消操作信號
done := syncx.NewDoneChan()
writer := newGuardedWriter(output, done.Done())
var closeOnce sync.Once
var retErr errorx.AtomicError
finish := func() {
closeOnce.Do(func() {
done.Close()
close(output)
})
}
cancel := once(func(err error) {
if err != nil {
retErr.Set(err)
} else {
retErr.Set(ErrCancelWithNil)
}
drain(source)
finish()
})
go func() {
defer func() {
if r := recover(); r != nil {
cancel(fmt.Errorf("%v", r))
} else {
finish()
}
}()
reducer(collector, writer, cancel)
drain(collector)
}()
// 真正從生成器通道取數據執行Mapper
go executeMappers(func(item interface{}, w Writer) {
mapper(item, w, cancel)
}, source, collector, done.Done(), options.workers)
value, ok := <-output
if err := retErr.Load(); err != nil {
return nil, err
} else if ok {
return value, nil
} else {
return nil, ErrReduceNoOutput
}
}
這段代碼挺長的,我們說下核心的點。這裏使用一個G
調用 executeMappers
方法。
go executeMappers(func(item interface{}, w Writer) {
mapper(item, w, cancel)
}, source, collector, done.Done(), options.workers)
func executeMappers(mapper MapFunc, input <-chan interface{}, collector chan<- interface{},
done <-chan lang.PlaceholderType, workers int) {
var wg sync.WaitGroup
defer func() {
// 等待所有任務全部執行完畢
wg.Wait()
// 關閉通道
close(collector)
}()
//根據指定數量創建 worker池
pool := make(chan lang.PlaceholderType, workers)
writer := newGuardedWriter(collector, done)
for {
select {
case <-done:
return
case pool <- lang.Placeholder:
// 從buildSource() 返回的無緩衝通道取數據
item, ok := <-input
// 當通道關閉,結束
if !ok {
<-pool
return
}
wg.Add(1)
// better to safely run caller defined method
threading.GoSafe(func() {
defer func() {
wg.Done()
<-pool
}()
//真正運行閉包函數的地方
// func(item interface{}, w Writer) {
// mapper(item, w, cancel)
// }
mapper(item, writer)
})
}
}
}
具體的邏輯已備註,代碼很容易懂。
一旦 executeMappers
函數返回,關閉 collector
通道,那麼執行 reducer
不再阻塞。
go func() {
defer func() {
if r := recover(); r != nil {
cancel(fmt.Errorf("%v", r))
} else {
finish()
}
}()
reducer(collector, writer, cancel)
//這裏
drain(collector)
}()
這裏的 reducer(collector, writer, cancel)
其實就是從 MapReduceVoid
傳遞的第三個閉包函數。
func MapReduceVoid(generator GenerateFunc, mapper MapperFunc, reducer VoidReducerFunc, opts ...Option) error {
_, err := MapReduce(generator, mapper, func(input <-chan interface{}, writer Writer, cancel func(error)) {
reducer(input, cancel)
//這裏
drain(input)
// We need to write a placeholder to let MapReduce to continue on reducer done,
// otherwise, all goroutines are waiting. The placeholder will be discarded by MapReduce.
writer.Write(lang.Placeholder)
}, opts...)
return err
}
然後這個閉包函數又執行了 reducer(input, cancel)
,這裏的 reducer
就是我們一開始解釋過的 VoidReducerFunc
,從 Finish() 而來
。
等等, 看到上面三個地方的 drain(input)
了嗎?
// drain drains the channel.
func drain(channel <-chan interface{}) {
// drain the channel
for range channel {
}
}
其實就是一個排空 channel
的操作,但是三個地方都對同一個 channel
做同樣的操作,也是讓我費解。
還有更重要的一點。
go func() {
defer func() {
if r := recover(); r != nil {
cancel(fmt.Errorf("%v", r))
} else {
finish()
}
}()
reducer(collector, writer, cancel)
drain(collector)
}()
上面的代碼,假如執行 reducer
,writer
寫入引發 panic
, 那麼drain(collector)
將沒有機會執行。
不過作者已經修復了這個問題,直接把 drain(collector)
放入到 defer
。
具體 issues[1]。
到這裏,關於 Finish
的源碼也就結束了。感興趣的可以看看其他源碼。
很喜歡 go-zero 裏的一些工具,但是工具往往並不獨立,依賴於其他文件包,導致明明只想使用其中一個工具卻需要安裝整個包。
所以最終的結果就是扒源碼,創建無依賴庫工具集,遵循 MIT
即可。
附錄
[1]https://github.com/tal-tech/go-zero/issues/676
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/M88-VS_H7o754mw5Ra7BYg