golang 批量執行任務的通用模板

需求

一個接口調用時,接收到一個列表,十個元素,需要併發執行十個任務,每個任務都要返回執行的結果和異常,然後對返回的結果裝填到一個切片列表裏,統一返回結果。

需要協程處理的結構體

type Order struct {  
  Name string `json:"name"`  
  Id int `json:"id"`  
}

確定通道數量

一般按入參的需要處理的元素數量爲準

taskNum := 10

初始化通道

orderCh := make(chan Order, taskNum) //接收返回的結果
errCh := make(chan error, taskNum) //接收返回的異常

發起執行,我們使用 sync.WaitGroup 來監聽執行情況

wg := sync.WaitGroup{}
for i:=0; i < taskNum; i++ {
   wg.Add(1)
   go func() {
     defer wg.Done()
     if i == 3 {//模擬當i=3的時候,返回一個異常
         err := errors.New("there is an error")
         errCh <- err 
         return
     }
     //組裝返回結果
     res := Order{  
         Name: "num: " + strconv.Itoa(i),  
         Id: i,  
         }
     orderCh <- res    
  }()
}
wg.Wait() //等待所有任務執行完畢

使用 for-select 接收執行結果

orderList := make([]Order, taskNum)
for i:=0; i<taskNum; i++ {
    select {
        case order, ok := <-orderCh: //接收orderCh
        if ok {
            orderList = append(orderList, order)
        }
        case err := <-errCh: //接收errCh
        if err != nil {
            return err //看需求,這裏設計發現一個錯誤就直接停止執行,返回錯誤
        }
        default:
        fmt.Println("done")
    }
}
//處理完數據,關閉通道
close(orderCh)
close(errCh)

1. 超時問題

任務執行過程中,需要控制每個任務的執行時間,不能超過一定範圍,我們用定時器來解決這個問題

timeoutTime := time.Second * 3  //超時時間
taskTimer := time.NewTimer(timeoutTime) //初始化定時器
orderList := make([]Order, taskNum)
for i:=0; i<taskNum; i++ {
    select {
        ....
        case <-taskTimeout.C: //處理超時
            err := errors.New("task timeout") //此處我們認爲超時是錯誤的一種,賦值給了err
            return
        ...
    }
    //每次執行都需要重置定時器
    taskTimer.Reset(timeoutTime)
}

2. 協程 panic 問題

主程序是無法捕捉協程內的 panic,因此如果不手動處理,就會發生協程內 panic 導致整個程序中止的情況,我們在 defer 裏處理

for i:=0; i < taskNum; i++ {
   wg.Add(1)
   go func() {
     defer func () {
      wg.Done()
      //協程內單獨捕捉異常  
      if r := recover(); r != nil {  
        err := errors.New(fmt.Sprintf("System panic:%v", r))  
        errCh <- err //此處將panic信息轉爲err返回,也可以按需求和異常等級進行處理
        return
      }
     }()
   ........
  }()
}

3. 順序問題

返回的列表元素的順序,需要跟傳參的列表順序保持一致,這時我們需要定義個帶序號的結構體

// 需要記錄原始順序的時候,定義個帶編號的結構體  
type OrderWithSeq struct {  
    Seq int  
    OrderItem Order  
}  
//重寫相關排序類型
type BySeq []OrderWithSeq  
func (a BySeq) Len() int {  
    return len(a)  
}  
func (a BySeq) Swap(i, j int) {  
    a[i], a[j] = a[j], a[i]  
}  
func (a BySeq) Less(i, j int) bool {  
    return a[i].Seq < a[j].Seq  
}
// 調整返回結果
orderCh := make(chan OrderWithSeq, taskNum) //接收帶序號的結構體
//在執行任務時,加入序號
for i:=0; i < taskNum; i++ {
   i:= i
   wg.Add(1)
   go func() {
     ····
     //組裝返回結果
     res := Order{  
         Name: "num: " + strconv.Itoa(i),  
         Id: i,  
         }
     orderCh <-OrderWithSeq {
         Seq: i, //帶上i這個序號
         OrderItem: res,
     }
  }()
 //接收信息,也按帶序號的結構體進行組裝
 orderSeqList := make([]OrderWithSeq, taskNum)
 for i:=0; i<taskNum; i++ {
    select {
        case order, ok := <-orderCh: //接收orderCh
        if ok {
            orderList = append(orderSeqList, order)
        }
       .....
     }
   }
 //按原始順序進行排序
sort.Sort(BySeq(orderSeqList))
....重新組裝數據返回

總結

標準模板如下:

type Order struct {  
  Name string `json:"name"`  
  Id int `json:"id"`  
}
// 需要記錄原始順序的時候,定義個帶編號的結構體  
type OrderWithSeq struct {  
    Seq int  
    OrderItem Order  
}  
//重寫相關排序類型
type BySeq []OrderWithSeq  
func (a BySeq) Len() int {  
    return len(a)  
}  
func (a BySeq) Swap(i, j int) {  
    a[i], a[j] = a[j], a[i]  
}  
func (a BySeq) Less(i, j int) bool {  
    return a[i].Seq < a[j].Seq  
}
taskNum := 10 
orderCh := make(chan OrderWithSeq, taskNum) //接收帶序號的結構體
errCh := make(chan error, taskNum) //接收返回的異常
wg := sync.WaitGroup{}
//在執行任務時,加入序號
for i:=0; i < taskNum; i++ {
   i:= i
   wg.Add(1)
   go func() {
     defer func () {
      wg.Done()
      //協程內單獨捕捉異常  
      if r := recover(); r != nil {  
        err := errors.New(fmt.Sprintf("System panic:%v", r))  
        errCh <- err //此處將panic信息轉爲err返回,也可以按需求和異常等級進行處理
        return
      }
     }()
     //組裝返回結果
     res := Order{  
         Name: "num: " + strconv.Itoa(i),  
         Id: i,  
         }
     orderCh <-OrderWithSeq {
         Seq: i, //帶上i這個序號
         OrderItem: res,
     }
  }()
 wg.Wait()
  //接收信息,也按帶序號的結構體進行組裝
 orderSeqList := make([]OrderWithSeq, taskNum)
 timeoutTime := time.Second * 3 
 taskTimer := time.NewTimer(timeoutTime)
 for i:=0; i<taskNum; i++ {
    select {
        case order, ok := <-orderCh: //接收orderCh
        if ok {
            orderList = append(orderSeqList, order)
        }
        case err := <-errCh: //接收errCh
        if err != nil {
            return err
        }
        case <-taskTimer.C: //處理超時
        err := errors.New("task timeout")
        return
        default:
        fmt.Println("done")
     }
     taskTimer.Reset(timeoutTime)
   }
close(orderCh)
close(errCh)
 //按原始順序進行排序
sort.Sort(BySeq(orderSeqList))
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/42aZB4BFxhp5E1NG4ZFjWA