伴魚實時計算平臺 Palink 的設計與實現
在伴魚發展早期,出現了一系列實時性相關的需求,比如算法工程師期望可以拿到用戶的實時特徵數據做實時推薦,產品經理希望數據方可以提供實時指標看板做實時運營分析。這個階段中臺數據開發工程師主要是基於「Spark」實時計算引擎開發作業來滿足業務方提出的需求。然而這類作業並沒有統一的平臺進行管理,任務的開發形式、提交方式、可用性保障等也完全因人而異。
伴隨着業務的加速發展,越來越多的實時場景湧現出來,對實時作業的開發效率和質量保障提出了更高的要求。爲此,我們從去年開始着手打造伴魚公司級的實時計算平臺,平臺代號「Palink」,由「Palfish」 + 「Flink」組合而來。之所以選擇「Flink」作爲平臺唯一的實時計算引擎,是因爲近些年來其在實時領域的優秀表現和主導地位,同時活躍的社區氛圍也提供了非常多不錯的實踐經驗可供借鑑。目前「Palink」項目已經落地並投入使用,很好地滿足了伴魚業務在實時場景的需求。
核心原則
通過調研阿里雲、網易等各大廠商提供的實時計算服務,我們基本確定了「Palink」的整個產品形態。同時,在系統設計過程中緊緊圍繞以下幾個核心原則:
-
極簡性:保持簡易設計,快速落地,不過度追求功能的完整性,滿足核心需求爲主。
-
高質量:保持項目質量嚴要求,核心模塊思慮周全。
-
可擴展:保持較高的可擴展性,便於後續方案的迭代升級。
系統設計
平臺整體架構
以下是平臺整體的架構示意圖:
整個平臺由四部分組成:
-
Web UI:前端操作頁面。
-
Palink(GO) 服務:實時作業管理服務,負責作業元信息及作業生命週期內全部狀態的管理,承接全部的前端流量。包括作業調度、作業提交、作業狀態同步及作業 HA 管理幾個核心模塊。
-
PalinkProxy(JAVA) 服務:SQL 化服務,Flink SQL 作業將由此模塊編譯、提交至遠端集羣。包括 SQL 語法校驗、SQL 作業調試及 SQL 作業編譯和提交幾個核心模塊。
-
Flink On Yarn:基於 Hadoop Yarn 做集羣的資源管理。
這裏之所以將後臺服務拆分成兩塊,並且分別使用 GO 和 JAVA 語言實現,原因主要有三個方面:一是伴魚擁有一套非常完善的基於 GO 語言實現的微服務基礎框架,基於它可以快速構建服務並擁有包括服務監控在內的一系列周邊配套,公司目前 95% 以上的服務是基於此服務框架構建的;二是 SQL 化模塊是基於開源項目二次開發實現的(這個在後文會做詳細介紹),而該開源項目使用的是 JAVA 語言;三是內部服務增加一次遠程調用的成本是可以接受的。這裏也體現了我們極簡性原則中對快速落地的要求。事實上,以 GO 爲核心開發語言是非常具有「Palfish」特色的,在接下來伴魚大數據系列的相關文章中也會有所體現。
接下來本文將着重介紹「Palink」幾個核心模塊的設計。
作業調度 & 執行
後端服務接收到前端創建作業的請求後,將生成一條 PalinkJob 記錄和 一條 PalinkJobCommand 記錄並持久化到 DB,PalinkJobCommand 爲作業提交執行階段抽象出的一個實體,整個作業調度過程將圍繞該實體的狀態變更向前推進。其結構如下:
type PalinkJobCommand struct {
ID uint64 `json:"id"`
PalinkJobID uint64 `json:"palink_job_id"`
CommandParams string `json:"command_params"`
CommandState int8 `json:"command_state"`
Log string `json:"log"`
CreatedAt int64 `json:"created_at"`
UpdatedAt int64 `json:"updated_at"`
}
這裏並沒有直接基於 PalinkJob 實體來串聯整個調度過程,是因爲作業的狀態同步會直接作用於這個實體,如果調度過程也基於該實體,兩部分的邏輯就緊耦合了。
調度流程
下圖爲作業調度的流程圖:
palink pod 異步執行競爭分佈式鎖操作,保證同一時刻有且僅有一個實例獲取週期性監測權限,滿足條件的 Command 將直接被髮送到 Kafka 待執行隊列,同時變更其狀態,保證之後不再被調度。此外,所有的 palink pod 將充當待執行隊列消費者的角色,並歸屬於同一個消費者組,消費到消息的實例將獲取到最終的執行權。
執行流程
作業的執行實則是作業提交的過程,根據作業類型的不同提交工作流有所區別,可細分爲三類:
-
Flink JAR 作業:我們摒棄了用戶直接上傳 JAR 文件的交互方式。用戶只需提供作業 gitlab 倉庫地址即可,打包構建全流程平臺直接完成。由於每一個服務實例都內嵌 Flink 客戶端,任務是直接通過 flink run 方式提交的。
-
PyFlink 作業:與 Flink JAR 方式類似,少了編譯的過程,提交命令也有所不同。
-
Flink SQL 作業:與上兩種方式區別較大。對於 Flink SQL 作業而言,用戶只需提交相對簡單的 SQL 文本信息,這個內容我們是直接維護在平臺的元信息中,故沒有和 gitlab 倉庫交互的地方。SQL 文本將進一步提交給 PalinkProxy 服務進行後續的編譯,然後使用 Yarn Client 方式提交。
Command 狀態機
PalinkJobCommand 的狀態流轉如下圖所示:
-
UNDO:初始狀態,將被調度實例監測。
-
DOING:執行中狀態,同樣會調度實例監測,防止長期處於進行中的髒狀態產生。
-
SUCCESSED:執行成功狀態。隨着用戶的後續行爲,如重新提交、重新啓動操作,狀態會再次回到 UNDO 態。
-
FAILED:執行失敗狀態。同上,狀態可能會再次回到 UNDO 態。
作業狀態同步
作業成功提交至集羣后,由於集羣狀態的不確定性或者其他的一些因素最終導致任務異常終止了,平臺該如何及時感知到?這就涉及到我們即將要闡述的另一個話題「狀態同步」。
狀態同步流程
這裏首先要回答的一個問題是同步誰的狀態?有過離線或者 flink on yarn 開發經驗的同學一定知道,作業在部署到 yarn 上之後會有一個 application 與之對應,每一個 application 都有其對應的狀態和操作動作,比如我們可以執行 Yarn UI 上 Kill Application 操作來殺掉整個任務。同樣的,當我們翻閱 Flink 官方文檔或者進入 Flink UI 頁面也都可以看到每一個任務都有其對應的狀態和一系列操作行爲。最直接的想法肯定是以 flink 任務狀態爲準,畢竟這是我們最想拿到的,但仔細分析,其實二者的狀態對於平臺而言沒有太大區別,只是狀態的粒度有所不同而已,yarn application 的狀態已經是對 flink 狀態做了一次 state mapping。可是考慮到,Flink 在 HA 的時候,作業對外暴露的 URL 會發生變更,這種情況下只能通過獲取作業對應的 application 信息才能拿到最新的地址。與此同時,一次狀態同步的過程不僅僅只是希望拿到最新的狀態,對於任務的「checkpoint」等相關信息同樣是有同步的訴求。看來二者的信息在一次同步的過程中都需要獲取,最終的狀態同步設計如下:
前置流程和作業調度流程類似,有且僅有一個實例負責週期性監測工作,符合條件的 Job ID (注,並非所有的作業都用同步的必要,比如一些處於終態的作業)將發送到內部延遲隊列。之所以採用延遲隊列而非 Kafka 隊列,主要是爲了將同一時間點批量同步的需求在一定時間間隔內隨機打散,降低同步的壓力。最後,在獲取到作業的完整信息後,再做一次 state mapping 將狀態映射爲平臺抽象的狀態類型。
由於狀態同步是週期性進行的,存在一定的延遲。因此在平臺獲取作業詳情時,也會同步觸發一次狀態同步,保證獲取最新數據。
Job 狀態機
PalinkJob 的狀態流轉如下圖所示:
-
DEPLOYING:作業初始狀態,將隨着 PalinkJobCommand 的狀態驅動向 DEPLOY_SUCCESSED 和 DEPLOY_FAILED 流轉。
-
DEPLOY_SUCCESSED:部署成功狀態,依賴作業「狀態同步」驅動向 RUNNING 狀態或者其他終態流轉。
-
DEPLOY_FAILED:部署失敗狀態,依賴用戶重新提交向 DEPLOYING 狀態流轉。
-
RUNNING:運行中狀態。可通過用戶執行暫停操作向 FINISHED 狀態流轉,或執行終止操作向 KILLED 狀態流轉,或因爲內部異常向 FAILED 狀態流轉。
-
FINISHED:完成狀態,作業終態之一。通過用戶執行暫停操作,作業將回到此狀態。
-
KILLED:終止狀態,作業終態之一。通過用戶執行終止操作,作業將回到此狀態。
-
FAILED:失敗狀態,作業終態之一。作業異常會轉爲此狀態。
作業 HA 管理
解決了上述問題之後,另一個待討論的話題便是「作業 HA 管理」。我們需要回答用戶以下的兩個問題:
-
作業是有狀態的,但是作業需要代碼升級,如何處理?
-
作業異常失敗了,怎麼做到從失敗的時間點恢復?
Flink 提供了兩種機制用於恢復作業:「Checkpoint」和「Savepoint」,本文統稱爲保存點。「Savepoint」可以看作是一種特殊的「Checkpoint」,只不過不像「Checkpoint」定期的從系統中生成,它是用戶通過命令觸發的,用戶可以控制保存點產生的時間點。任務啓動時,通過指定「Checkpoint」或「Savepoint」外部路徑,就可以達到從保存點恢復的效果。我們對於平臺作業 HA 的管理也是基於這兩者展開的。下圖爲管理的流程圖:
用戶有兩種方式來手動停止一個作業:暫停和終止。暫停操作通過調用 flink cancel api 實現,將觸發作業生成「Savepoint」。終止操作則是通過調用 yarn kill application api 實現,用於快速結束一個任務。被暫停的作業重啓時,系統將比較「Savepoint」和「Checkpoint」的生成時間點,按照最近的一個保存點啓動,而當作業被重新提交時,由於用戶可能變更了代碼邏輯,將直接由用戶決定是否按照保存點恢復。對於被終止的作業,無論是重啓或者是重新提交,都直接採取由用戶決定的方式,因爲終止操作本身就帶有丟棄作業狀態的色彩。
失敗狀態的作業是由於異常錯誤被迫停止的。對於這類作業,有三重保障。一是任務自身可以設置重啓策略自動恢復,外部平臺無感知。二是,對於內部重啓依舊失敗的任務在平臺側可再次設置上層重啓策略。三是,手動重啓或重新提交。僅在重新提交時,由用戶決定按照那種方式啓動,其餘場景皆按照最近的保存點啓動。
任務 SQL 化
Flink JAR 和 PyFlink 都是採用 Flink API 的形式開發作業,這樣的形式必然極大地增加用戶的學習成本,影響開發的效率。需要不斷輸入和培養具有該領域開發技能的工程師,才能滿足源源不斷的業務需求。而產品定位不僅僅是面向數據中臺的開發工程師們,我們期望可以和離線目標用戶保持一致,將目標羣體滲透至分析人員乃至業務研發和部分的產品經理,簡單的需求完全可以自己動手實現。要達到這個目的,必然開發的形式也要向離線看齊,作業 SQL 化是勢在必行的。
我們期望 Flink 可以提供一種類似於 Hive Cli 或者 Hive JDBC 的作業提交方式,用戶無需寫一行 Java 或 Scala 代碼。查閱官方文檔,Flink 確實提供了一個 SQL 客戶端以支持以一種簡單的方式來編寫、調試和提交表程序到 Flink 集羣,不過截止到目前最新的 release 1.13 版本,SQL 客戶端僅支持嵌入式模式,相關的功能還不夠健全,另外對於 connector 支持也是有限的。因此,需要尋求一種更穩定、更高可擴展性的實現方案。
經過一番調研後,我們發現袋鼠雲開源的「flinkStreamSQL」基本可以滿足我們目前的要求。此項目是基於開源的 Flink 打造的,並對其實時 SQL 進行了擴展,支持原生 Flink SQL 所有的語法。
實現機制
下圖爲 Flink 官方提供的作業角色流程圖,由圖可知,用戶提交的代碼將在 Client 端進行加工、轉換(最終生成 Jobgraph )然後提交至遠程集羣。
那麼要實現用戶層面的作業 SQL 化,底層的實現同樣是繞不開這個流程。實際上「flinkStreamSQL」項目就是通過定製化的手段實現了 Client 端的邏輯,可以將整個過程簡要地描述爲:
構建 PackagedProgram
利用 PackagedProgramUtils 生成 JobGraph。
通過 YarnClusterDescriptor 提交作業。
其中,第一步是最關鍵的,PackagedProgram 的構造方法如下:
PackagedProgram.newBuilder()
.setJarFile(coreJarFile)
.setArguments(execArgs)
.setSavepointRestoreSettings(savepointRestoreSettings)
.build();
execArgs 爲外部輸入參數,這裏就包含了用戶提交的 SQL。而 coreJarFile 對應的就是 API 開發方式時用戶提交的 JAR 文件,只不過這裏系統幫我們實現了。coreJarFile 的代碼對應項目中的 core module,該 module 本質上就是 API 開發方式的一個 template 模板。module 內實現了自定義 SQL 解析以及各類 connector plugin 注入。更多細節可通過開源項目進一步瞭解。
定製開發
我們基於「flinkStreamSQL」進行了二次開發,以滿足內部更多樣化的需求。主要分爲以下幾點:
-
服務化:整個 SQL 化模塊作爲 proxy 獨立部署和管理,以 HTTP 形式暴露服務。
-
支持語法校驗特性。
-
支持調試特性:通過解析 SQL 結構可直接獲取到 source 表和 sink 表的結構信息。平臺可通過人工構造或線上抓取源表數據的方式得到測試數據集,sink 算子被 localTest connector 算子直接替換,以截取結果數據輸出。
-
支持更多的 connector plugin,如 pulsar connector。
-
其他特性
除了上文提到的一些功能特性,平臺還支持了:
-
DDL 語句注入
-
UDF 管理
-
租戶管理
-
版本管理
-
作業監控
-
日誌收集
這些點就不在本文詳細闡述,但作爲一個實時計算平臺這些點又是必不可少的。
線上效果
作業總覽
作業詳情
作業監控
未來工作
隨着業務的繼續推進,平臺將在以下幾方面繼續迭代優化:
-
穩定性建設:實時任務的穩定性建設必然是未來工作中的首要事項。作業參數如何設置,作業如何自動調優,作業在流量高峯如何保持穩定的性能,這些問題需要不斷探索並沉澱更多的最佳實踐。
-
提升開發效率:SQL 化建設。儘管 SQL 化已初具雛形,但開發起來依舊具備一定的學習成本,其中最明顯的就是 DDL 的構建,用戶對於 source、sink 的 schema 並不清楚,最好的方式是平臺可以和我們的元數據中心打通將構建 DDL 的過程自動化,這一點也是我們目前正在做的。
-
優化使用體驗:體驗上的問題在一定程度上也直接影響到了開發的效率。通過不斷收集用戶反饋,持續改進。
-
探索更多業務場景:目前伴魚內部已開始基於 Flink 開展 AI 、實時數倉等場景的建設。未來我們將繼續推進 Flink 在更多場景上的實踐。
參考
本文轉自:https://tech.ipalfish.com/blog/2021/06/01/palink
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/saznc-v8myqWnubCu6aQzw