基於事件溯源的任務編排

思考

既然不能像業務一樣通過傳統的事務模型進行業務完整性保障,那我們何不換一種思路呢?於是基於穩定性的思考,筆者將設計思路轉換成提高系統的容錯能力,並儘可能的減小爆炸半徑,同時儘可能的提升系統的可擴展性,保障高可用。

checkpoint

checkpoint 通常用於保存某些記錄的位置信息用於方便系統故障後快速恢復,在 flink 中也利用了 checkpoint 機制來實現 exactly once 語義,其會按照配置週期性的計算狀態生成檢查點快照,然後將 checkpoint 持久化存儲下來,這樣後續如果崩潰則就可以通過 checkpoint 來進行恢復

barrier

checkpoint 只作用於 flink 內部,那如果要實現從 source 到 sink 整個鏈路的 exactly once,則就會涉及到多個組件同時做 checkpoint 的同步, 這時候就要讓多個組件的 checkpoint 達到一致性, 爲了實現這個功能 flink 裏面引入了 Barrier 用於切分數據流;就類似編程語言中的內存屏障,通過 Barrier 讓多個組件同時進行對於 checkpoint 的持久化。每個 Barrier 都會攜帶一個 checkpoint ID,這樣整個數據流的多個組件就會同時進行同一個 checkpoint 的持久化了

checkpointCoordinator

有了 Barrier 機制之後則就需要一個觸發和管理組件,利用 barrier 和 checkpoit 讓 source、process、sink 三者同時進行 checkpoint 保存,在 flink 中就引入 checkpointCoordinator 來協調多個組件, 有了這三個核心的概念,就可以讓在 flink 中的多個分佈式組件中實現 checkpoint 機制了

兩階段提交

前面的設計都是位於 flink 內部,但是在數據處理中 source、sink 組件則通常是第三方平臺,這個時候如果還要保障 exactly once 則除了冪等性就需要用到我們這裏說的兩階段提交了;要實現兩階段提交,則就需要對應的平臺提供事務機制,在 preCommit 階段做數據的消費和寫入,同時在 commit 階段實現事務的提交,由於事務未提交則對應的平臺讀取不到對應的數據,只有最終都提交成功後,纔可以讀取到寫入的數據

總結

通過上面的我們瞭解瞭如何基於利用兩階段提交、checkpoint、barrier 結合事務機制實現分佈式環境中的 exactly once 實現機制,後續在數據處理的場景中,我們就可以利用這套機制結合實際業務場景進行落地了

在下一節我們將開始介紹分佈式任務編排中的另外一種實現機制,用於實現分佈式系統的容錯解決上述場景中遇到的問題

基於 event sourcing 的分佈式任務編排

事件溯源

事件溯源保證應用狀態的所有改變都保存在事件流中. 這樣我們不僅能查詢這些事件, 我們也可以通過這個事件的日誌來重新構建以前的狀態, 以些爲基礎實現自動改變狀態來應對追溯過的變化. 其核心關鍵點:事件、順序、持久化,通過對持久化存儲中的事件按照順序進行回放,我們就可以得到當前的狀態,同理在任務編排的場景下,也可以借鑑類似的思想。

任務編排容錯

任務編排的核心是通過編排對應的任務序列實現某個業務功能,在分佈式環境中,通常會涉及到 workflow 任務的編排、task 任務分配、運行時數據的存儲等。在大多數的任務編排框架中,關注點都是任務調度。而我們今天接下來要介紹的 temporal 其關鍵點則是容錯,即當對應的 workflow、task 如果執行失敗,系統該如何進行恢復。也是事件溯源利用的主要場景。

任務執行容錯語義

在前面的介紹 exactly once 場景中我們介紹過兩階段對事物機制的依賴,同理在任務編排中的狀態,我們這裏容錯機制實現的語義是 at-lease-once,即任務至少被執行一次,並儘可能保障業務不會重複被執行

溯源任務狀態

結合事件溯源介紹下 temporal 裏是如何基於事件溯源來實現容錯語義的。在 temporal 一個 workflow 的當前狀態,是由對應的 workflow 的事件 reply 來決定的,即通過回放 workflow 的所有事件來決定接下來該執行那個任務,在 temporal 裏面的任務事件數據都由 history 服務統一存儲,即事件數據的存儲都是 transaction 的,這樣就可以保障即使發生網絡分區的情況,一個任務的執行結果也會只有一份, 那當我們要恢復任務狀態的時候,就只需要通過事件回放,就知道接下來要執行那個任務,以及當前的狀態數據

不變性

前面提到通過事件序列來進行事件回放可以得到當前狀態,其實在任務編排場景中還有第二個序列 - 執行序列,即我們要執行的任務列表一定要是順序的。只有這樣才能順着正確的道路繼續恢復。

例如在 go 裏面對 slice 的 for range 遍歷是固定的,這裏包含兩部分:恢復 slice 和遍歷 slice, 即我再不同的機器上通過歷史數據我可以構建出 slice, 然後遍歷這個 slice 這兩個操作的結果都是一樣的。

但是對 map 則不一定,我們並不能保證在不同機器上恢復和遍歷這兩個操作的結果都是一樣的。所以 workflow 裏面的邏輯和狀態數據一定要是不變的

爲什麼是 temporal

除了上面提到的容錯,其實選擇 temporal 更多的是就是易於學習和理解,大家可以看下我們創建虛機的 workflow。

 1// 創建虛機工作流
 2func CreateVMWorkflow(ctx workflow.Context, clientToken string, vmRequest cloud.CreateVMRequest, vmGroup ServerGroupt) (*CreateVMWorkflowResponse, error) {
 3var (
 4        tvmTask     TVM
 5        response    CreateVMWorkflowResponse
 6        workflowCtx = workflow.WithActivityOptions(ctx, defaultTaskOptions)
 7    )
 8// 創建虛機
 9var createResponse *cloud.CreateVMResponse
10if err := workflow.ExecuteActivity(
11        workflowCtx, tvmTask.CreateVMActivity, clientToken, vmRequest).Get(workflowCtx, &createResponse); err != nil {
12return nil, err
13    }
14if !createResponse.Success() {
15return nil, errorx.StringError("create vm response error: %v", createResponse)
16    }
17// 虛機初始化流程
18var futures []workflow.ChildWorkflowFuture
19for _, host := range createResponse.Data.Instance {
20        future := workflow.ExecuteChildWorkflow(workflowCtx, WaitAndBindWorkflow, host, vmRequest.IDC, vmGroup)
21        futures = append(futures, future)
22    }
23// 等待虛機結果
24for _, future := range futures {
25var resp *AddServerLoadResponse
26if err := future.Get(workflowCtx, &resp); err != nil {
27            response.Messages = append(response.Messages, err.Error())
28continue
29        }
30if resp.Success {
31            response.Success = append(response.Success, resp.IP)
32continue
33        }
34        response.Failure = append(response.Failure, resp.IP)
35        response.Messages = append(response.Messages, resp.Message)
36    }
37return response, nil
38}
39

總結

temporal 當然也有不足的地方,例如

不過想想基於 temporal 可以快速實現一個分佈式、可擴展、高容錯、無狀態的任務編排系統,其他都是小事情哈哈。後面有時間在給大家從源碼上梳理下 temporal 的是如何實現上述功能的。包括任務分片、ringpop、信號、狀態保存等

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/DtU0m4xhwF3oBtEQlTG98w