伴魚實時計算平臺 Palink 的設計與實現

在伴魚發展早期,出現了一系列實時性相關的需求,比如算法工程師期望可以拿到用戶的實時特徵數據做實時推薦,產品經理希望數據方可以提供實時指標看板做實時運營分析。這個階段中臺數據開發工程師主要是基於「Spark」實時計算引擎開發作業來滿足業務方提出的需求。然而這類作業並沒有統一的平臺進行管理,任務的開發形式、提交方式、可用性保障等也完全因人而異。

伴隨着業務的加速發展,越來越多的實時場景湧現出來,對實時作業的開發效率和質量保障提出了更高的要求。爲此,我們從去年開始着手打造伴魚公司級的實時計算平臺,平臺代號「Palink」,由「Palfish」 + 「Flink」組合而來。之所以選擇「Flink」作爲平臺唯一的實時計算引擎,是因爲近些年來其在實時領域的優秀表現和主導地位,同時活躍的社區氛圍也提供了非常多不錯的實踐經驗可供借鑑。目前「Palink」項目已經落地並投入使用,很好地滿足了伴魚業務在實時場景的需求。

核心原則

通過調研阿里雲、網易等各大廠商提供的實時計算服務,我們基本確定了「Palink」的整個產品形態。同時,在系統設計過程中緊緊圍繞以下幾個核心原則:

系統設計

平臺整體架構

以下是平臺整體的架構示意圖:

整個平臺由四部分組成:

這裏之所以將後臺服務拆分成兩塊,並且分別使用 GO 和 JAVA 語言實現,原因主要有三個方面:一是伴魚擁有一套非常完善的基於 GO 語言實現的微服務基礎框架,基於它可以快速構建服務並擁有包括服務監控在內的一系列周邊配套,公司目前 95% 以上的服務是基於此服務框架構建的;二是 SQL 化模塊是基於開源項目二次開發實現的(這個在後文會做詳細介紹),而該開源項目使用的是 JAVA 語言;三是內部服務增加一次遠程調用的成本是可以接受的。這裏也體現了我們極簡性原則中對快速落地的要求。事實上,以 GO 爲核心開發語言是非常具有「Palfish」特色的,在接下來伴魚大數據系列的相關文章中也會有所體現。

接下來本文將着重介紹「Palink」幾個核心模塊的設計。

作業調度 & 執行

後端服務接收到前端創建作業的請求後,將生成一條 PalinkJob 記錄和 一條 PalinkJobCommand 記錄並持久化到 DB,PalinkJobCommand 爲作業提交執行階段抽象出的一個實體,整個作業調度過程將圍繞該實體的狀態變更向前推進。其結構如下:

type PalinkJobCommand struct {
 ID            uint64 `json:"id"`                       
 PalinkJobID   uint64 `json:"palink_job_id"`  
 CommandParams string `json:"command_params"` 
 CommandState  int8   `json:"command_state"`  
 Log           string `json:"log"`                      
 CreatedAt     int64  `json:"created_at"`        
 UpdatedAt     int64  `json:"updated_at"`      
}

這裏並沒有直接基於 PalinkJob 實體來串聯整個調度過程,是因爲作業的狀態同步會直接作用於這個實體,如果調度過程也基於該實體,兩部分的邏輯就緊耦合了。

調度流程

下圖爲作業調度的流程圖:

palink pod 異步執行競爭分佈式鎖操作,保證同一時刻有且僅有一個實例獲取週期性監測權限,滿足條件的 Command 將直接被髮送到 Kafka 待執行隊列,同時變更其狀態,保證之後不再被調度。此外,所有的 palink pod 將充當待執行隊列消費者的角色,並歸屬於同一個消費者組,消費到消息的實例將獲取到最終的執行權。

執行流程

作業的執行實則是作業提交的過程,根據作業類型的不同提交工作流有所區別,可細分爲三類:

Command 狀態機

PalinkJobCommand 的狀態流轉如下圖所示:

作業狀態同步

作業成功提交至集羣后,由於集羣狀態的不確定性或者其他的一些因素最終導致任務異常終止了,平臺該如何及時感知到?這就涉及到我們即將要闡述的另一個話題「狀態同步」。

狀態同步流程

這裏首先要回答的一個問題是同步誰的狀態?有過離線或者 flink on yarn 開發經驗的同學一定知道,作業在部署到 yarn 上之後會有一個 application 與之對應,每一個 application 都有其對應的狀態和操作動作,比如我們可以執行 Yarn UI 上 Kill Application 操作來殺掉整個任務。同樣的,當我們翻閱 Flink 官方文檔或者進入 Flink UI 頁面也都可以看到每一個任務都有其對應的狀態和一系列操作行爲。最直接的想法肯定是以 flink 任務狀態爲準,畢竟這是我們最想拿到的,但仔細分析,其實二者的狀態對於平臺而言沒有太大區別,只是狀態的粒度有所不同而已,yarn application 的狀態已經是對 flink 狀態做了一次 state mapping。可是考慮到,Flink 在 HA 的時候,作業對外暴露的 URL 會發生變更,這種情況下只能通過獲取作業對應的 application 信息才能拿到最新的地址。與此同時,一次狀態同步的過程不僅僅只是希望拿到最新的狀態,對於任務的「checkpoint」等相關信息同樣是有同步的訴求。看來二者的信息在一次同步的過程中都需要獲取,最終的狀態同步設計如下:

前置流程和作業調度流程類似,有且僅有一個實例負責週期性監測工作,符合條件的 Job ID (注,並非所有的作業都用同步的必要,比如一些處於終態的作業)將發送到內部延遲隊列。之所以採用延遲隊列而非 Kafka 隊列,主要是爲了將同一時間點批量同步的需求在一定時間間隔內隨機打散,降低同步的壓力。最後,在獲取到作業的完整信息後,再做一次 state mapping 將狀態映射爲平臺抽象的狀態類型。

由於狀態同步是週期性進行的,存在一定的延遲。因此在平臺獲取作業詳情時,也會同步觸發一次狀態同步,保證獲取最新數據。

Job 狀態機

PalinkJob 的狀態流轉如下圖所示:

作業 HA 管理

解決了上述問題之後,另一個待討論的話題便是「作業 HA 管理」。我們需要回答用戶以下的兩個問題:

Flink 提供了兩種機制用於恢復作業:「Checkpoint」和「Savepoint」,本文統稱爲保存點。「Savepoint」可以看作是一種特殊的「Checkpoint」,只不過不像「Checkpoint」定期的從系統中生成,它是用戶通過命令觸發的,用戶可以控制保存點產生的時間點。任務啓動時,通過指定「Checkpoint」或「Savepoint」外部路徑,就可以達到從保存點恢復的效果。我們對於平臺作業 HA 的管理也是基於這兩者展開的。下圖爲管理的流程圖:

用戶有兩種方式來手動停止一個作業:暫停和終止。暫停操作通過調用 flink cancel api 實現,將觸發作業生成「Savepoint」。終止操作則是通過調用 yarn kill application api 實現,用於快速結束一個任務。被暫停的作業重啓時,系統將比較「Savepoint」和「Checkpoint」的生成時間點,按照最近的一個保存點啓動,而當作業被重新提交時,由於用戶可能變更了代碼邏輯,將直接由用戶決定是否按照保存點恢復。對於被終止的作業,無論是重啓或者是重新提交,都直接採取由用戶決定的方式,因爲終止操作本身就帶有丟棄作業狀態的色彩。

失敗狀態的作業是由於異常錯誤被迫停止的。對於這類作業,有三重保障。一是任務自身可以設置重啓策略自動恢復,外部平臺無感知。二是,對於內部重啓依舊失敗的任務在平臺側可再次設置上層重啓策略。三是,手動重啓或重新提交。僅在重新提交時,由用戶決定按照那種方式啓動,其餘場景皆按照最近的保存點啓動。

任務 SQL 化

Flink JAR 和 PyFlink 都是採用 Flink API 的形式開發作業,這樣的形式必然極大地增加用戶的學習成本,影響開發的效率。需要不斷輸入和培養具有該領域開發技能的工程師,才能滿足源源不斷的業務需求。而產品定位不僅僅是面向數據中臺的開發工程師們,我們期望可以和離線目標用戶保持一致,將目標羣體滲透至分析人員乃至業務研發和部分的產品經理,簡單的需求完全可以自己動手實現。要達到這個目的,必然開發的形式也要向離線看齊,作業 SQL 化是勢在必行的。

我們期望 Flink 可以提供一種類似於 Hive Cli 或者 Hive JDBC 的作業提交方式,用戶無需寫一行 Java 或 Scala 代碼。查閱官方文檔,Flink 確實提供了一個 SQL 客戶端以支持以一種簡單的方式來編寫、調試和提交表程序到 Flink 集羣,不過截止到目前最新的 release 1.13 版本,SQL 客戶端僅支持嵌入式模式,相關的功能還不夠健全,另外對於 connector 支持也是有限的。因此,需要尋求一種更穩定、更高可擴展性的實現方案。

經過一番調研後,我們發現袋鼠雲開源的「flinkStreamSQL」基本可以滿足我們目前的要求。此項目是基於開源的 Flink 打造的,並對其實時 SQL 進行了擴展,支持原生 Flink SQL 所有的語法。

實現機制

下圖爲 Flink 官方提供的作業角色流程圖,由圖可知,用戶提交的代碼將在 Client 端進行加工、轉換(最終生成 Jobgraph )然後提交至遠程集羣。

那麼要實現用戶層面的作業 SQL 化,底層的實現同樣是繞不開這個流程。實際上「flinkStreamSQL」項目就是通過定製化的手段實現了 Client 端的邏輯,可以將整個過程簡要地描述爲:

構建 PackagedProgram

利用 PackagedProgramUtils 生成 JobGraph。

通過 YarnClusterDescriptor 提交作業。

其中,第一步是最關鍵的,PackagedProgram 的構造方法如下:

PackagedProgram.newBuilder()
                .setJarFile(coreJarFile)
                .setArguments(execArgs)
                .setSavepointRestoreSettings(savepointRestoreSettings)
                .build();

execArgs 爲外部輸入參數,這裏就包含了用戶提交的 SQL。而 coreJarFile 對應的就是 API 開發方式時用戶提交的 JAR 文件,只不過這裏系統幫我們實現了。coreJarFile 的代碼對應項目中的 core module,該 module 本質上就是 API 開發方式的一個 template 模板。module 內實現了自定義 SQL 解析以及各類 connector plugin 注入。更多細節可通過開源項目進一步瞭解。

定製開發

我們基於「flinkStreamSQL」進行了二次開發,以滿足內部更多樣化的需求。主要分爲以下幾點:

除了上文提到的一些功能特性,平臺還支持了:

這些點就不在本文詳細闡述,但作爲一個實時計算平臺這些點又是必不可少的。

線上效果

作業總覽

作業詳情

作業監控

未來工作

隨着業務的繼續推進,平臺將在以下幾方面繼續迭代優化:

參考

本文轉自:https://tech.ipalfish.com/blog/2021/06/01/palink

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/saznc-v8myqWnubCu6aQzw