golang 批量執行任務的通用模板
需求
一個接口調用時,接收到一個列表,十個元素,需要併發執行十個任務,每個任務都要返回執行的結果和異常,然後對返回的結果裝填到一個切片列表裏,統一返回結果。
需要協程處理的結構體
type Order struct {
Name string `json:"name"`
Id int `json:"id"`
}
確定通道數量
一般按入參的需要處理的元素數量爲準
taskNum := 10
初始化通道
orderCh := make(chan Order, taskNum) //接收返回的結果
errCh := make(chan error, taskNum) //接收返回的異常
發起執行,我們使用 sync.WaitGroup 來監聽執行情況
wg := sync.WaitGroup{}
for i:=0; i < taskNum; i++ {
wg.Add(1)
go func() {
defer wg.Done()
if i == 3 {//模擬當i=3的時候,返回一個異常
err := errors.New("there is an error")
errCh <- err
return
}
//組裝返回結果
res := Order{
Name: "num: " + strconv.Itoa(i),
Id: i,
}
orderCh <- res
}()
}
wg.Wait() //等待所有任務執行完畢
使用 for-select 接收執行結果
orderList := make([]Order, taskNum)
for i:=0; i<taskNum; i++ {
select {
case order, ok := <-orderCh: //接收orderCh
if ok {
orderList = append(orderList, order)
}
case err := <-errCh: //接收errCh
if err != nil {
return err //看需求,這裏設計發現一個錯誤就直接停止執行,返回錯誤
}
default:
fmt.Println("done")
}
}
//處理完數據,關閉通道
close(orderCh)
close(errCh)
1. 超時問題
任務執行過程中,需要控制每個任務的執行時間,不能超過一定範圍,我們用定時器來解決這個問題
timeoutTime := time.Second * 3 //超時時間
taskTimer := time.NewTimer(timeoutTime) //初始化定時器
orderList := make([]Order, taskNum)
for i:=0; i<taskNum; i++ {
select {
....
case <-taskTimeout.C: //處理超時
err := errors.New("task timeout") //此處我們認爲超時是錯誤的一種,賦值給了err
return
...
}
//每次執行都需要重置定時器
taskTimer.Reset(timeoutTime)
}
2. 協程 panic 問題
主程序是無法捕捉協程內的 panic,因此如果不手動處理,就會發生協程內 panic 導致整個程序中止的情況,我們在 defer 裏處理
for i:=0; i < taskNum; i++ {
wg.Add(1)
go func() {
defer func () {
wg.Done()
//協程內單獨捕捉異常
if r := recover(); r != nil {
err := errors.New(fmt.Sprintf("System panic:%v", r))
errCh <- err //此處將panic信息轉爲err返回,也可以按需求和異常等級進行處理
return
}
}()
........
}()
}
3. 順序問題
返回的列表元素的順序,需要跟傳參的列表順序保持一致,這時我們需要定義個帶序號的結構體
// 需要記錄原始順序的時候,定義個帶編號的結構體
type OrderWithSeq struct {
Seq int
OrderItem Order
}
//重寫相關排序類型
type BySeq []OrderWithSeq
func (a BySeq) Len() int {
return len(a)
}
func (a BySeq) Swap(i, j int) {
a[i], a[j] = a[j], a[i]
}
func (a BySeq) Less(i, j int) bool {
return a[i].Seq < a[j].Seq
}
// 調整返回結果
orderCh := make(chan OrderWithSeq, taskNum) //接收帶序號的結構體
//在執行任務時,加入序號
for i:=0; i < taskNum; i++ {
i:= i
wg.Add(1)
go func() {
····
//組裝返回結果
res := Order{
Name: "num: " + strconv.Itoa(i),
Id: i,
}
orderCh <-OrderWithSeq {
Seq: i, //帶上i這個序號
OrderItem: res,
}
}()
//接收信息,也按帶序號的結構體進行組裝
orderSeqList := make([]OrderWithSeq, taskNum)
for i:=0; i<taskNum; i++ {
select {
case order, ok := <-orderCh: //接收orderCh
if ok {
orderList = append(orderSeqList, order)
}
.....
}
}
//按原始順序進行排序
sort.Sort(BySeq(orderSeqList))
....重新組裝數據返回
總結
標準模板如下:
type Order struct {
Name string `json:"name"`
Id int `json:"id"`
}
// 需要記錄原始順序的時候,定義個帶編號的結構體
type OrderWithSeq struct {
Seq int
OrderItem Order
}
//重寫相關排序類型
type BySeq []OrderWithSeq
func (a BySeq) Len() int {
return len(a)
}
func (a BySeq) Swap(i, j int) {
a[i], a[j] = a[j], a[i]
}
func (a BySeq) Less(i, j int) bool {
return a[i].Seq < a[j].Seq
}
taskNum := 10
orderCh := make(chan OrderWithSeq, taskNum) //接收帶序號的結構體
errCh := make(chan error, taskNum) //接收返回的異常
wg := sync.WaitGroup{}
//在執行任務時,加入序號
for i:=0; i < taskNum; i++ {
i:= i
wg.Add(1)
go func() {
defer func () {
wg.Done()
//協程內單獨捕捉異常
if r := recover(); r != nil {
err := errors.New(fmt.Sprintf("System panic:%v", r))
errCh <- err //此處將panic信息轉爲err返回,也可以按需求和異常等級進行處理
return
}
}()
//組裝返回結果
res := Order{
Name: "num: " + strconv.Itoa(i),
Id: i,
}
orderCh <-OrderWithSeq {
Seq: i, //帶上i這個序號
OrderItem: res,
}
}()
wg.Wait()
//接收信息,也按帶序號的結構體進行組裝
orderSeqList := make([]OrderWithSeq, taskNum)
timeoutTime := time.Second * 3
taskTimer := time.NewTimer(timeoutTime)
for i:=0; i<taskNum; i++ {
select {
case order, ok := <-orderCh: //接收orderCh
if ok {
orderList = append(orderSeqList, order)
}
case err := <-errCh: //接收errCh
if err != nil {
return err
}
case <-taskTimer.C: //處理超時
err := errors.New("task timeout")
return
default:
fmt.Println("done")
}
taskTimer.Reset(timeoutTime)
}
close(orderCh)
close(errCh)
//按原始順序進行排序
sort.Sort(BySeq(orderSeqList))
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/42aZB4BFxhp5E1NG4ZFjWA