golang 每日一庫之 goflow
goflow
是一個基於 Go 語言的高性能、可擴展、分佈式的工作流框架。它允許開發者以編程方式將分佈式工作流定義爲任務的有向無環圖(DAG),並通過多個工作節點(Worker)均勻分配負載來執行任務。
核心特性
1. DAG 構建與任務編排
goflow
允許用戶以 DAG 的形式定義工作流,每個節點代表一個任務,邊表示任務之間的依賴關係。這種結構使得任務的執行順序清晰,便於管理複雜的工作流程。
2. 分佈式執行與負載均衡
任務可以分佈在多個 Worker 上執行,goflow
通過均勻分配負載的方式,確保各個 Worker 的工作量平衡,從而提高整體執行效率。
3. 可擴展的服務架構
goflow
提供了 FlowService
結構體,允許用戶配置服務的端口、Redis 地址、OpenTrace 地址、Worker 併發數等參數,便於根據實際需求進行擴展和調整。
4. 內置監控與可觀測性
框架支持集成 OpenTracing,提供對工作流執行過程的監控和追蹤功能,幫助開發者及時發現和解決問題。
5. Redis 支持
goflow
使用 Redis 作爲後端存儲,管理工作流的狀態和任務隊列,確保任務的可靠執行和狀態的持久化。
安裝入門
安裝
go mod init myflow
go get github.com/s8sg/goflow@master
編寫第一個工作流
創建一個名爲 flow.go
的文件,並添加以下代碼:
package main
import (
"fmt"
goflow "github.com/s8sg/goflow/v1"
flow "github.com/s8sg/goflow/flow/v1"
)
// 定義任務函數
func doSomething(data []byte, option map[string][]string) ([]byte, error) {
return []byte(fmt.Sprintf("你說了:\"%s\"", string(data))), nil
}
// 定義工作流
func DefineWorkflow(workflow *flow.Workflow, context *flow.Context) error {
dag := workflow.Dag()
dag.Node("test", doSomething)
return nil
}
func main() {
fs := &goflow.FlowService{
Port: 8080,
RedisURL: "localhost:6379",
OpenTraceUrl: "localhost:5775",
WorkerConcurrency: 5,
EnableMonitoring: true,
}
fs.Register("myflow", DefineWorkflow)
fs.Start()
}
上述代碼中,我們定義了一個名爲 doSomething
的任務函數,並在工作流中添加了一個名爲 test
的節點來執行該任務。然後,我們配置並啓動了 FlowService
,監聽 8080 端口,連接本地的 Redis 和 OpenTrace 服務,設置 Worker 的併發數爲 5,並啓用了監控功能。
高級功能
1. 條件執行與分支控制
goflow
支持在工作流中添加條件判斷和分支控制,允許根據不同的條件路徑執行不同的任務序列,增強了工作流的靈活性。
2. 循環與子流程
框架支持在工作流中定義循環結構和子流程,便於處理重複性任務和模塊化的流程設計。
3. 狀態管理與持久化
通過 Redis 的支持,goflow
能夠持久化工作流的狀態,確保在系統重啓或故障恢復後,能夠繼續執行未完成的任務。
示例
goflow
提供了多個示例,展示了不同的工作流模式:
-
single
:單個節點的簡單工作流。
-
serial
:串行執行的多個節點。
-
parallel
:並行執行的多個節點。
-
condition
:包含條件分支的工作流。
1. 並行執行示例(parallel)
該示例展示瞭如何並行執行多個任務節點。在工作流中,多個節點可以同時處理相同的輸入數據,從而提高處理效率。
要運行該示例:
-
構建項目:
make
-
啓動依賴服務:
docker-compose down && docker-compose up -d
-
運行示例:
./examples
-
使用
curl
執行工作流:curl -d hello localhost:8080/flow/parallel
此命令將向 parallel
工作流發送數據,觸發並行執行的任務節點。
2. 條件分支示例(condition)
該示例展示瞭如何根據條件動態選擇執行路徑。工作流中的某個節點根據輸入數據的特定條件,決定後續執行哪個分支。
例如,根據面部識別的結果,決定是執行 face-match
任務還是 create-user
任務。
要運行該示例,請按照與 parallel
示例相同的步驟,將 parallel
替換爲 condition
。
高級功能示例
1. 條件分支(Conditional Branching)
goflow
提供了 ConditionalBranch
方法,用於根據條件動態選擇執行路徑。例如:
branches := dag.ConditionalBranch("handle-response", []string{"pass", "fail"}, func(response []byte) []string {
if string(response) == "pass" {
return []string{"pass"}
}
return []string{"fail"}
})
branches["pass"].Node("process-pass", processPass)
branches["fail"].Node("process-fail", processFail)
在上述代碼中,根據 response
的內容,工作流將選擇執行 process-pass
或 process-fail
節點。
2. 子工作流(SubDAG)
goflow
支持將一個完整的 DAG 嵌套爲另一個工作流的子節點,便於模塊化和複用。例如:
func SubWorkflow() *flow.Dag {
dag := flow.NewDag()
dag.Node("step1", step1)
dag.Node("step2", step2)
dag.Edge("step1", "step2")
return dag
}
func DefineWorkflow(f *flow.Workflow, context *flow.Context) error {
dag := f.Dag()
dag.Node("start", start)
dag.SubDag("subflow", SubWorkflow())
dag.Node("end", end)
dag.Edge("start", "subflow")
dag.Edge("subflow", "end")
return nil
}
在上述代碼中,SubWorkflow
定義了一個子工作流,包含兩個步驟 step1
和 step2
。主工作流中,subflow
節點嵌套了該子工作流,實現了模塊化設計。
🌰就舉到這裏。
最後
- 官方 GitHub 倉庫:https://github.com/s8sg/goflow
goflow
是一個功能強大且靈活的工作流框架,適用於需要處理複雜任務編排和分佈式執行的場景。
作者的 readme 和 examples 寫的很詳細,如果對這個庫感興趣,建議通讀 readme 和 examples。
標題:golang 每日一庫之 goflow
作者:mooncakeee
地址:http://blog.dd95828.com/articles/2025/05/29/1748481669444.html
聯繫:scotttu@163.com
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/oIynWUzM_XJqhAbTRtjb-Q