分佈式計算框架狀態與容錯的設計
對於一個分佈式計算引擎(尤其是 7*24 小時不斷運行的流處理系統)來說,由於機器故障、數據異常等原因導致作業失敗的情況是時常發生的,因此一般的分佈式計算引擎如 Hadoop、Spark 都會設計狀態容錯機制確保作業失敗後能夠恢復起來繼續運行,而新一代的流處理系統 Flink 在這一點上更有着優秀而簡約的設計。
每個框架都有與之相關的諸多概念,常常令開發者感到困惑。本文會盡量避免從官方文檔的角度進行論述,而是嘗試先跳出具體的框架,從原理上分析分佈式計算引擎狀態容錯機制的設計思想。通過對比 Hadoop、Spark、Flink 關於這一點的不同思考,更能瞭解到批處理系統和流處理系統如何看待狀態與容錯這件事。
何謂狀態?
並不是分佈式計算引擎纔有狀態的概念。從廣義上來講,任何一個程序,在運行時的某一時刻其進程中各個字段、變量在內存中的值,都是狀態。
例如,一個程序從一個文件中讀取數據,程序在內存中記錄下來文件讀取到了什麼位置,將其保存在某個對象的 offset 字段中,以便接下來從該位置繼續讀取。這個 offset 字段的值其實就是一個有業務含義的 “狀態” 值。
既然任何程序都有狀態,那麼對於任何一個分佈式計算框架來說,無需任何特殊的設計,“狀態” 便天然地存在於其運行時的內存中。框架可以對這些狀態進行維護(例如將其持久化),實現任何框架想要實現的目的(例如將其用於接下來將要講到的容錯機制中)。
那麼這裏的問題則是,各個框架是選取哪些字段、變量的值進行管理的呢?這便是理解各個框架狀態與容錯機制的關鍵。
何謂容錯?
顯然,並不是任何程序、框架都必須實現容錯機制。在大數據計算領域常常把一個作業分類成流計算或批計算。對於批計算而言,容錯並不是一個必不可少的機制,因爲大部分批處理任務在時間和計算資源上來說都是可控的。如果作業在中途異常停止,大不了可以重新再運行一次。
然而,對於流處理作業並不是這樣。因爲從業務上來說,流處理作業會 7*24 地不間斷運行。設想如果一個流處理作業運行了一年,突然因爲一些異常原因掛掉,或者因爲發現了髒數據或邏輯問題而手動停止,如果這時沒有容錯機制,則需要從一年前的數據開始從頭運行。這在時間和計算成本上來說都無法接受。
如果一個作業需要容錯,往往指的就是這樣一個過程:
程序在運行的過程當中,在某一時刻對其狀態進行落盤存儲。在未來的某一時刻,程序因爲某種原因停止後,可以從之前落盤的數據重啓並繼續正常穩定地運行。
用通俗的話說就是一個存檔、讀檔的過程。整個過程如下圖所示:
注意:由於這裏討論的是廣義上的容錯,因此要特意指出,之前存儲狀態的程序,與後來恢復狀態的程序,未必是同一個程序,即程序內部的邏輯是可以完全不同的,只要該程序可以讀取磁盤中的狀態即可。至於讀取以後要怎麼利用這些狀態,那是業務需要考慮的事情。如果對這一點沒有清晰的認識,就會困惑於 Flink、Spark 這些計算引擎是否可以在做了 checkpoint 後修改程序的邏輯,修改過後是否還能正常重啓。這裏可以非常負責任地講,即便有些版本的 Flink、Spark 未必支持修改後的程序從之前的檢查點恢復,只要我們理解了其內在原理,都可以自己修改源碼或通過其他手段使其做到這一點。
狀態與容錯的關係
綜上,狀態指的是某一時刻程序中各個字段、變量等在內存中的值,容錯指的是對這些狀態進行存儲落盤、讀取恢復的過程。因此,關鍵之處在於選取哪些值進行存儲和恢復,以保證這樣的存儲和恢復具有業務價值。對這一點的理解與取捨,便是不同框架對狀態與容錯機制設計的出發點。
本節脫離具體的框架試舉幾例,大家可以自行對號入座,看這樣的設計思路接近於哪個框架。
-
存儲處理數據後的結果:在計算模型中,將數據按條處理。可以在處理數據的算子中定義一個字段,每處理一條數據,就按照業務邏輯對該字段進行更新。在進行狀態存儲時,僅存儲該字段的值。在作業重啓時,只需恢復該字段的值。
-
存儲數據本身:在計算模型中,以數據集的方式處理數據。數據集會被多個算子處理,因此可以在它被某個算子處理完後將這個中間結果保存下來。這樣在恢復時,就可以從這個完整的中間結果開始繼續運行。
-
存儲數據位置:由於計算引擎的數據一定有一個數據源,而某些數據源會爲每條數據記錄它在數據源中的位置。計算引擎可以將讀取到的最新一條數據在數據源的位置記錄下來,將其作爲狀態保存和恢復。
在不同的業務和技術場景下,狀態與容錯的解決方案理論上有無窮多,與每個計算框架的計算模型緊密相關。此外,一個框架的狀態與容錯機制能達到什麼樣的效果,還跟與其對接的組件有關(端到端的數據一致性問題)。比如上述第三例,倘若數據源並沒有記錄數據的位置信息,那麼該容錯機制也無法有效運行。
Hadoop 與 Spark 如何設計容錯
一般來說,最樸素的想法就是通過下面的步驟實現狀態與容錯:
-
暫停所有數據的接收。
-
每個任務處理當前已經接收的數據。
-
將此時所有任務的狀態進行持久化。
-
恢復數據的接收和處理。
當作業出現異常時,則可以從之前持久化的地方恢復。Hadoop 與 Spark 的容錯機制就是該思想的實現。
Hadoop 的任務可以分爲 Map 任務和 Reduce 任務。這是兩類分批次執行的任務,後者的輸入依賴前者的輸出。Hadoop 的設計思想十分簡單——當任務出現異常時,重新跑該任務即可。其實,跑成功的任務的輸出,就相當於整個作業的中間結果得到了持久化。比如 Reduce 任務異常重跑時,就不必重跑它依賴的 Map 任務。
Spark 的實現也是這一想法的延續。雖然 Spark 不是 Hadoop 那樣的批處理,但是它仍然把一個 “微批(micro batch)” 當作數據處理的最小單元,整個框架實際上延續了不少批處理的思想。Spark 的容錯機制相當經典,用到了其 RDD 的血統關係(lineage)。熟悉 Spark 的讀者應該瞭解 “寬依賴”、“窄依賴” 等概念。當 RDD 中的某個分區出現故障,那麼只需要按照這種依賴關係重新計算即可。以複雜一些的寬依賴爲例,Spark 會找到其父分區,經過計算重新獲取結果。
如上圖所示,如果 P10 發生故障,則 P00 與 P01 都會重新計算,而計算 P00 和 P01 又會繼續找其父分區重新計算。按照這個血緣關係來看,一直向上追溯會付出極大的代價。因此 Spark 提供了將分區計算結果持久化的方法。如果 P00 與 P0_1 的數據進行了持久化,那麼就可以利用該結果直接恢復狀態。
從以上設計可以感受到,這種實現更適合於批計算的框架中。它相當於將前一個階段的計算結果 “存檔” 下來,然後在任意時間後將該結果作爲輸入,運行下一個階段的任務。這種實現的狀態存儲過程顯然過於繁重,並不太適用於對 “低延時” 要求極高的流處理引擎。因此,Flink 設計了一套完全不同的分佈式輕量級實現方式,並精巧地實現了各種一致性語義。
Flink 的容錯機制——通過 Barrier 實現一致性語義
官方文檔是這樣描述 Flink 的:
Stateful Computations over Data Streams
即,在數據流上的狀態計算。可以說,狀態計算(包括狀態管理、檢查點機制等)是它最大的特點之一。
下面介紹 Flink 狀態容錯機制的設計原理。
從單機程序開始
現在跳出 Flink 框架,設想一個運行在單個節點的進程,該如何設計容錯機制。
比較容易想到的一個思路是,在主線程外另開啓一個線程執行定時任務,定期地將狀態數據刷寫到磁盤。當作業停止後重啓,則可以直接從之前刷寫到磁盤的數據恢復。如下圖所示:
分佈式容錯
延續這個思路,是否可以設計一個分佈式的容錯機制呢?下圖是一個多節點 的分佈式任務,數據流從左至右。
如果給這些 Task 分別開啓一個線程運行定時任務,這些分佈在不同物理機上的任務的確也可以做到狀態的存儲和恢復。然而,這種粗暴的處理方式極容易發生業務上的異常。比如,當最左邊的 Task 處理完了 a、b、c 這三條數據後,將數據發送至網絡,在這三條數據還未到達中間的 Task 時,三個線程同時(假設時間同步非常理想)觸發了狀態存儲的動作。這時左邊的 Task 保存的狀態是處理完 a、b、c 後的狀態,而後兩個 Task 保存的是未處理這三條數據時的狀態。此時整個集羣宕機,三個 Task 恢復後,左邊的 Task 將從 a、b、c 這三條數據後的數據開始讀取和處理,而後面的 Task 將永遠無法接收到這三條數據。這就造成了數據的丟失。如果三個機器線程的觸發時間不同步,也可能會造成數據重複處理。
這個問題在流處理中被稱爲 “一致性語義” 問題。當一條數據在計算引擎中被處理 “至少一次”、“恰好一次”、“最多一次” 時,一致性語義分別是“at least once”、“exactly once”、“at most once”。
不同的業務場景對於一致性語義有着不同的要求。舉例來說,一個廣告投放平臺按照用戶對廣告的點擊量進行收費,如果點擊量被少算,則對平臺方不利,如果點擊量被多算,則對廣告商不利,無論哪種情況都不利於長期合作。在這種情況下,“exactly once” 語義就顯得尤爲重要。
基於 Flink 的計算模型與數據傳輸方式的設計,容錯機制由 Barrier 來實現。Barrier 可以理解爲一條數據,被週期性地插入到數據流當中,跟隨數據一起被傳輸到下游。
此時,每個任務將不再需要另啓一個線程完成定時任務,只需要在接收到 Barrier 時觸發存儲狀態的動作即可。由於數據傳輸的有序性,這樣的機制可以保證 “exactly once” 語義。
爲什麼這裏說 “可以” 保證 “exactly once” 語義,而沒有說 “必然” 保證該語義呢?這是因爲作業的拓撲圖可能更加複雜,如下圖所示:
如果一個進程的上游有多條數據流,那麼它應該在接受到哪個 Barrier 時觸發狀態存儲操作呢?
以上圖爲例,當最右邊的進程接收到下面的數據流傳來的 Barrier 時,它可以先不觸發任何操作,該數據流後面的數據也暫時不做處理,而是將這些數據接收到緩存中。上面的數據流照常處理。當接收到了上面的數據流傳來的 Barrier 時,再觸發狀態存儲操作。這樣仍可以保證 “exactly once” 語義。
很顯然,在瞭解了這個原理後,就可以在這個過程中可以添加任何自己業務需要的策略。如可以不讓 Barrier 對齊就觸發操作,或是每條 Barrier 都觸發一次操作,甚至可以將部分數據丟棄,等待最後一個 Barrier 到來時觸發操作…… 這些不同的策略對應了不同的一致性語義。Flink 實現了 “exactly once” 語義和 “at least once” 語義。
Flink 中的狀態存儲與恢復
最後,從整體流程上來理解 Flink 的狀態存儲與恢復。
狀態的存儲流程大致可以拆分爲以下幾個部分進行理解:Checkpoint 的觸發、Barrier 的傳輸、狀態的更新、狀態數據及其元信息的存儲。從系統架構上來看,整個流程如下圖所示:
在 JobManager 端有一個組件叫做 CheckpointCoordinator,它是協調整個 Checkpoint 機制的管理器。從上圖可以觀察到,它會觸發 Checkpoint 的流程,並且會發送 Barrier 到 source task。隨後,Barrier 在任務間流轉,觸發每個任務的快照操作。分佈式框架中,每個任務獨立地完成狀態的存儲,在這裏可以簡單理解爲生成數據文件。每個 Task 實例將文件信息(如文件位置等信息)傳回到 JobManager 端,通知 CheckpointCoordinator 它完成了本次的狀態存儲。
與 JobManager 端的其他組件有着同樣的設計思路,CheckpointCoordinator 知道整個執行圖中的所有任務。這樣,當每個 Task 各自完成狀態存儲後通知 JobManager 端,CheckpointCoordinator 就可以知道本次 Checkpoint 是否所有 Task 都完成了狀態存儲。如果全部完成,則將所有回傳的信息彙總成一個元數據文件。
恢復的過程正是存儲的逆過程。JobManager 端讀取元數據文件,將這些信息封裝到執行圖各個節點中,部署到 TaskManager 端執行。這樣每個 Task 在初始化階段就知道去哪個文件讀取狀態數據,進而對其進行恢復。
總結
本文從通用視角介紹了狀態與容錯的基本概念,以 Hadoop、Spark、Flink 爲例分析了具體框架的實現原理。
通過對比可以瞭解到批處理系統與流處理系統對該機制有着不同的思考。批處理系統的基本思路是,當作業出現失敗時,把失敗的部分重啓即可,甚至可以把整個作業重新運行一遍;流處理系統則需要考慮數據的一致性問題,將其融入到整個狀態容錯機制當中。
由於框架本身定位的不同,這些狀態容錯機制並沒有明顯的優劣之分,但它們在各自的領域幾乎都是最優秀的實現,其設計思路都值得學習和反思。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/SpXg1eTUNLY1w3IzIJ8GSg