asyncgo,純 Go 編寫的異步執行器

Asyncgo[1] 是純用 Go 編寫的零依賴異步任務執行器,優先考慮速度和易用性。

特徵

用例

使用

  1. 安裝
go get github.com/abhi16180/asyncgo
  1. 導入
import "github.com/abhi16180/asyncgo"

例子

  1. 異步執行多個函數
package main

import (
    "github.com/abhi16180/asyncgo"
    "log"
    "time"
)

func main() {
    executor := asyncgo.NewExecutor()
    future1 := executor.Submit(func(arg int) (int64, error) {
        time.Sleep(1 * time.Second)
        returnint64(arg * arg), nil
    }, 10)
    // 第一個參數是函數,其餘參數都是函數需要傳遞的參數
    // 如果函數簽名/參數不匹配,會導致執行錯誤
    future2 := executor.Submit(func(arg1 int, arg2 int) (int, error) {
        time.Sleep(1 * time.Second)
        return arg1 + arg2, nil
    }, 10, 20)
    // err 是執行錯誤,不代表函數返回的錯誤
    result1, err := future1.Get()
    if err != nil {
        log.Println(err)
        return
    }
    result2, err := future2.Get()
    if err != nil {
        log.Println(err)
        return
    }
    // result 是[]interface,包含所有返回值,包括函數返回的錯誤值
    log.Println(result1, result2)
}

executor.Submit(function,args..) 每次總是交換新的 goroutine。對於大量任務,建議使用工作池。

  1. 使用固定大小的工作池執行大量任務
package main

import (
"context"
"github.com/abhi16180/asyncgo"
"github.com/abhi16180/asyncgo/commons"
"log"
"time"
)

func main() {
 executor := asyncgo.NewExecutor()
 workerPool := executor.NewFixedWorkerPool(context.Background(), &commons.Options{
  WorkerCount: 100,
  BufferSize:  100,
 })
    // 優雅地終止所有工作者
// 保證每個任務都得到執行
defer workerPool.Shutdown()
 futures := []*asyncgo.Future{}
for i := 0; i < 1000; i++ {
  future, err := workerPool.Submit(timeConsumingTask)
  if err != nil {
   log.Println("error while submitting task to worker pool")
   continue
  }
  futures = append(futures, future)
 }

for _, future := range futures {
  result, err := future.Get()
  if err != nil {
   log.Println("error while getting result from future")
   continue
  }
  log.Println(result)
 }
}

func timeConsumingTask() string {
 time.Sleep(2 * time.Second)
return"success"
}
  1. 在執行過程中取消工作池
package main

import (
"context"
"github.com/abhi16180/asyncgo"
"github.com/abhi16180/asyncgo/commons"
"log"
"time"
)

func main() {
 executor := asyncgo.NewExecutor()
 workerPool := executor.NewFixedWorkerPool(context.Background(), &commons.Options{
  WorkerCount: 100,
  BufferSize:  100,
 })

 futures := []*asyncgo.Future{}
for i := 0; i < 1000; i++ {
  future, err := workerPool.Submit(timeConsumingTask)
  if err != nil {
   log.Println("error while submitting task to worker pool")
   continue
  }
  futures = append(futures, future)
 }
// 在任務執行過程中終止工作池
 workerPool.Terminate()
}

func timeConsumingTask() string {
 time.Sleep(2 * time.Second)
return"success"
}
  1. 有關更多用例和複雜示例,請查看示例 [2] 部分

參考資料

[1]

Asyncgo:https://github.com/abhi16180/asyncgo

[2]

示例:https://github.com/abhi16180/asyncgo/tree/main/examples

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/Cm4xD2TGVNopVQQvIKZBCw