golang 每日一庫之工作流引擎 Temporal
Temporal 是一個開源的分佈式工作流編排系統,旨在簡化構建和運行可靠、可擴展的長時間運行的後端應用程序。
它最初是由 Uber 的 Cadence 系統演變而來,現在由 Temporal Technologies 公司主導開發。
目前已被 Coinbase、Netflix、Box、Snap 等大規模應用。
核心功能
1. 分佈式工作流編排
Temporal 支持在多臺機器上協調執行復雜的業務邏輯。工作流可以在失敗、重啓甚至升級後繼續執行,不需要人工干預。
2. 持久性與容錯
所有工作流狀態會被持久化(通常使用 Cassandra、MySQL、PostgreSQL 等後端數據庫)。這使得工作流可以在服務崩潰或網絡中斷時恢復執行。
3. 異步任務和重試機制
Temporal 提供強大的異步任務支持,以及自動的、可配置的重試邏輯。即使下游服務暫時不可用,也能自動重試直到成功或達到重試上限。
4. 冪等與去重
通過事件驅動和有序日誌處理機制,Temporal 可以保證工作流執行的冪等性,防止重複執行任務。
5. 編程語言支持
支持多種語言 SDK:
-
Go (
temporalio/sdk-go
) -
Java (
temporalio/sdk-java
) -
TypeScript/Node.js (
temporalio/sdk-typescript
) -
Python(目前處於 beta 階段)
組件
Temporal 系統分爲以下關鍵組件:
1. Frontend Service
處理所有來自 SDK 的請求,是系統的入口點。
2. History Service
負責管理工作流執行的歷史記錄和狀態。
3. Matching Service
處理任務隊列,將任務分配給 worker。
4. Worker 服務
Worker 是由開發者實現的服務,用來實際執行工作流和活動(Activities)。Worker 通過 SDK 與 Temporal Server 通信。
5. Persistence Layer
數據持久化層,支持 Cassandra、MySQL、PostgreSQL,負責存儲執行歷史、任務隊列、工作流狀態等。
原理
-
開發者定義工作流和活動(Activity)函數
func MyWorkflow(ctx workflow.Context, input string) error { err := workflow.ExecuteActivity(ctx, MyActivity, input).Get(ctx, nil) return err } func MyActivity(ctx context.Context, input string) error { // 實際的業務邏輯 return nil }
-
Worker 註冊工作流和活動
worker.RegisterWorkflow(MyWorkflow) worker.RegisterActivity(MyActivity)
-
Client 啓動工作流
workflowOptions := client.StartWorkflowOptions{ TaskQueue: "my-task-queue", } we, err := c.ExecuteWorkflow(context.Background(), workflowOptions, MyWorkflow, "hello world")
-
Temporal 保證工作流一致性與容錯
-
每一個狀態變化都被寫入數據庫
-
崩潰後可以繼續執行
-
支持定時器、信號、查詢等複雜邏輯
架構組成
Temporal 是服務端 - 客戶端架構,核心組件包括:
- Temporal Server(Go 實現)
-
Frontend Service:處理 SDK 請求
-
History Service:存儲執行狀態、歷史記錄
-
Matching Service:分配任務給 Worker
-
Worker Service:處理後臺任務
- Client SDK
- 提供語言級 API 用於定義工作流和活動
- Worker
- 用戶實現的服務端代碼,用於運行工作流 / 活動
- 持久化層
- 支持 MySQL、PostgreSQL、Cassandra、Elasticsearch(可選用於可觀察性)
示例
示例 1:電商訂單處理
下單 → 扣款 → 庫存檢查 → 發貨 → 郵件通知
func OrderWorkflow(ctx workflow.Context, order Order) error {
err := workflow.ExecuteActivity(ctx, DeductInventory, order.ItemID).Get(ctx, nil)
if err != nil {
return err
}
err = workflow.ExecuteActivity(ctx, ChargeCustomer, order.UserID, order.Amount).Get(ctx, nil)
if err != nil {
return err
}
err = workflow.ExecuteActivity(ctx, ShipOrder, order).Get(ctx, nil)
return err
}
示例 2:數據處理流水線(ETL)
抓取數據 → 清洗 → 轉換 → 存儲
Temporal 可用於將這些階段連接成一個可靠的工作流,自動處理失敗和重試。
示例 3:視頻轉碼處理
上傳 → 分辨率轉換 → 水印添加 → CDN 分發
適合需要 GPU 資源、運行時間長但失敗成本高的流程。
示例 4:用戶註冊流程(Saga 模式)
創建賬戶 → 發放優惠券 → 推送歡迎郵件
若失敗:
撤銷賬戶 → 撤銷優惠券
Temporal 非常適合處理分佈式事務補償邏輯。
示例 5:金融服務自動化(貸款審批)
收集用戶信息 → 第三方信用審覈 → 風控策略 → 人工審覈 → 通知審批結果
這些任務可能分散在多個服務、角色之間,Temporal 保證任務一致性。
示例 6:定時和週期性任務
每日結算 → 定期備份 → 週期性健康檢查
工作流可利用定時器 workflow.Sleep()
或 cron 式觸發機制輕鬆實現。
部署
使用官方 Docker Compose 快速運行本地 Temporal:
git clone https://github.com/temporalio/docker-compose.git
cd docker-compose
docker-compose up
標題:golang 每日一庫之工作流引擎 Temporal
作者:mooncakeee
地址:http://blog.dd95828.com/articles/2025/06/03/1748918890556.html
聯繫:scotttu@163.com
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/YxCNLwOZveHtZOt-OIaENA