asyncgo,純 Go 編寫的異步執行器
Asyncgo[1] 是純用 Go 編寫的零依賴異步任務執行器,優先考慮速度和易用性。
特徵
-
異步任務執行:提交任務以異步執行並檢索結果。
-
無需手動管理 Goroutine :抽象化管理 Goroutine 的複雜性,並簡化代碼。
-
工作池管理:Asyncgo 仔細處理工作池創建和任務執行。
-
正常關閉:確保在關閉工作線程之前完成所有現有任務。
-
任務取消:支持終端終止任務。
用例
-
可用於微服務的異步 HTTP 請求。
-
後臺作業執行。
-
使用工作池進行無限併發輪詢(從 AWS SQS 或類似服務接收消息)。
使用
- 安裝
go get github.com/abhi16180/asyncgo
- 導入
import "github.com/abhi16180/asyncgo"
例子
- 異步執行多個函數
package main
import (
"github.com/abhi16180/asyncgo"
"log"
"time"
)
func main() {
executor := asyncgo.NewExecutor()
future1 := executor.Submit(func(arg int) (int64, error) {
time.Sleep(1 * time.Second)
returnint64(arg * arg), nil
}, 10)
// 第一個參數是函數,其餘參數都是函數需要傳遞的參數
// 如果函數簽名/參數不匹配,會導致執行錯誤
future2 := executor.Submit(func(arg1 int, arg2 int) (int, error) {
time.Sleep(1 * time.Second)
return arg1 + arg2, nil
}, 10, 20)
// err 是執行錯誤,不代表函數返回的錯誤
result1, err := future1.Get()
if err != nil {
log.Println(err)
return
}
result2, err := future2.Get()
if err != nil {
log.Println(err)
return
}
// result 是[]interface,包含所有返回值,包括函數返回的錯誤值
log.Println(result1, result2)
}
executor.Submit(function,args..)
每次總是交換新的 goroutine。對於大量任務,建議使用工作池。
- 使用固定大小的工作池執行大量任務
package main
import (
"context"
"github.com/abhi16180/asyncgo"
"github.com/abhi16180/asyncgo/commons"
"log"
"time"
)
func main() {
executor := asyncgo.NewExecutor()
workerPool := executor.NewFixedWorkerPool(context.Background(), &commons.Options{
WorkerCount: 100,
BufferSize: 100,
})
// 優雅地終止所有工作者
// 保證每個任務都得到執行
defer workerPool.Shutdown()
futures := []*asyncgo.Future{}
for i := 0; i < 1000; i++ {
future, err := workerPool.Submit(timeConsumingTask)
if err != nil {
log.Println("error while submitting task to worker pool")
continue
}
futures = append(futures, future)
}
for _, future := range futures {
result, err := future.Get()
if err != nil {
log.Println("error while getting result from future")
continue
}
log.Println(result)
}
}
func timeConsumingTask() string {
time.Sleep(2 * time.Second)
return"success"
}
- 在執行過程中取消工作池
package main
import (
"context"
"github.com/abhi16180/asyncgo"
"github.com/abhi16180/asyncgo/commons"
"log"
"time"
)
func main() {
executor := asyncgo.NewExecutor()
workerPool := executor.NewFixedWorkerPool(context.Background(), &commons.Options{
WorkerCount: 100,
BufferSize: 100,
})
futures := []*asyncgo.Future{}
for i := 0; i < 1000; i++ {
future, err := workerPool.Submit(timeConsumingTask)
if err != nil {
log.Println("error while submitting task to worker pool")
continue
}
futures = append(futures, future)
}
// 在任務執行過程中終止工作池
workerPool.Terminate()
}
func timeConsumingTask() string {
time.Sleep(2 * time.Second)
return"success"
}
- 有關更多用例和複雜示例,請查看示例 [2] 部分
參考資料
[1]
Asyncgo:https://github.com/abhi16180/asyncgo
[2]
示例:https://github.com/abhi16180/asyncgo/tree/main/examples
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/Cm4xD2TGVNopVQQvIKZBCw