17 張圖帶你徹底理解 Hudi Upsert 原理

  1. 前言

========

如果要深入瞭解 Apache Hudi 技術的應用或是性能調優,那麼明白源碼中的原理對我們會有很大的幫助。Upsert是 Apache Hudi 的核心功能之一,主要完成增量數據在HDFS/對象存儲上的修改,並可以支持事務。而在 Hive 中修改數據需要重新分區或重新整個表,但是對於 Hudi 而言,更新可以是文件級別的重寫或是數據先進行追加後續再重寫,對比 Hive 大大提高了更新性能。upsert支持兩種模式的寫入Copy On WriteMerge On Read ,下面本文將介紹 Apache Hudi 在 Spark 中 Upsert 的內核原理。

  1. Upsert 場景執行流程介紹 ==================

對於 Hudi Upsert 操作整理了比較核心的幾個操作如下圖所示

  1. 開始提交:判斷上次任務是否失敗,如果失敗會觸發回滾操作。然後會根據當前時間生成一個事務開始的請求標識元數據。2. 構造 HoodieRecord Rdd 對象:Hudi 會根據元數據信息構造 HoodieRecord Rdd 對象,方便後續數據去重和數據合併。3. 數據去重: 一批增量數據中可能會有重複的數據,Hudi 會根據主鍵對數據進行去重避免重複數據寫入 Hudi 表。4. 數據 fileId 位置信息獲取: 在修改記錄中可以根據索引獲取當前記錄所屬文件的 fileid,在數據合併時需要知道數據 update 操作向那個 fileId 文件寫入新的快照文件。5. 數據合併:Hudi 有兩種模式 cow 和 mor。在 cow 模式中會重寫索引命中的 fileId 快照文件;在 mor 模式中根據 fileId 追加到分區中的 log 文件。6. 完成提交:在元數據中生成 xxxx.commit 文件,只有生成 commit 元數據文件,查詢引擎才能根據元數據查詢到剛剛 upsert 後的數據。7.compaction 壓縮:主要是 mor 模式中才會有,他會將 mor 模式中的 xxx.log 數據合併到 xxx.parquet 快照文件中去。8.hive 元數據同步:hive 的元素數據同步這個步驟需要配置非必需操作,主要是對於 hive 和 presto 等查詢引擎,需要依賴 hive 元數據才能進行查詢,所以 hive 元數據同步就是構造外表提供查詢。

介紹完 Hudi 的 upsert 運行流程,再來看下 Hudi 如何進行存儲並且保證事務,在每次 upsert 完成後都會產生 commit 文件記錄每次重新的快照文件。

例如上圖

時間 1 初始化寫入三個分區文件分別是 xxx-1.parquet;

時間 3 場景如果會修改分區 1 和分區 2xxx-1.parquet 的數據,那麼寫入完成後會生成新的快照文件分別是分區 1 和分區 2xxx-3.parquet 文件。(上述是 COW 模式的過程,而對於 MOR 模式的更新會生成 log 文件,如果 log 文件存在並且可追加則繼續追加數據)。

時間 5 修改分區 1 的數據那麼同理會生成分區 1 下的新快照文件。

可以看出對於 Hudi 每次修改都是會在文件級別重新寫入數據快照。查詢的時候就會根據最後一次快照元數據加載每個分區小於等於當前的元數據的 parquet 文件。Hudi 事務的原理就是通過元數據 mvcc 多版本控制寫入新的快照文件,在每個時間階段根據最近的元數據查找快照文件。因爲是重寫數據所以同一時間只能保證一個事務去重寫 parquet 文件。不過當前 Hudi 版本加入了併發寫機制,原理是 Zookeeper 分佈鎖控制或者 HMS 提供鎖的方式, 會保證同一個文件的修改只有一個事務會寫入成功。

下面將根據 Spark 調用 write 方法深入剖析 upsert 操作每個步驟的執行流程。

2.1 開始提交 & 數據回滾

在構造好 spark 的 rdd 後會調用 df.write.format("hudi") 方法執行數據的寫入,實際會調用 Hudi 源碼中的HoodieSparkSqlWriter#write方法實現。在執行任務前 Hudi 會創建 HoodieWriteClient 對象,並構造 HoodieTableMetaClient 調用 startCommitWithTime 方法開始一次事務。在開始提交前會獲取 hoodie 目錄下的元數據信息,判斷上一次寫入操作是否成功,判斷的標準是上次任務的快照元數據有 xxx.commit 後綴的元數據文件。如果不存在那麼 Hudi 會觸發回滾機制,回滾是將不完整的事務元數據文件刪除,並新建 xxx.rollback 元數據文件。如果有數據寫入到快照 parquet 文件中也會一起刪除。

2.2 構造 HoodieRecord Rdd 對象


HoodieRecord Rdd 對象的構造先是通過 map 算子提取 spark dataframe 中的 schema 和數據,構造 avro 的 GenericRecords Rdd,然後 Hudi 會在使用 map 算子封裝爲 HoodierRecord Rdd。對於 HoodileRecord Rdd 主要由currentLocation,newLocation,hoodieKey,data 組成。HoodileRecord 數據結構是爲後續數據去重和數據合併時提供基礎。

currentLocation 當前數據位置信息:只有數據在當前 Hudi 表中存在纔會有,主要存放 parquet 文件的 fileId,構造時默認爲空,在查找索引位置信息時被賦予數據。•newLocation 數據新位置信息:與 currentLocation 不同不管是否存在都會被賦值,newLocation 是存放當前數據需要被寫入到那個 fileID 文件中的位置信息,構造時默認爲空,在 merge 階段會被賦予位置信息。•HoodieKey 主鍵信息:主要包含 recordKey 和 patitionPath 。recordkey 是由 hoodie.datasource.write.recordkey.field 配置項根據列名從記錄中獲取的主鍵值。patitionPath 是分區路徑。Hudi 會根據 hoodie.datasource.write.partitionpath.field 配置項的列名從記錄中獲取的值作爲分區路徑。•data 數據:data 是一個泛型對象,泛型對象需要實現 HoodieRecordPayload 類,主要是實現合併方法和比較方法。默認實現 OverwriteWithLatestAvroPayload 類,需要配置 hoodie.datasource.write.precombine.field 配置項獲取記錄中列的值用於比較數據大小,去重和合並都是需要保留值最大的數據。

2.3 數據去重

在 upsert 場景中數據去重是默認要做的操作,如果不進行去重會導致數據重複寫入 parquet 文件中。當然 upsert 數據中如果沒有重複數據是可以關閉去重操作。配置是否去重參數爲hoodie.combine.before.upsert,默認爲 true 開啓。

在 Spark client 調用 upsert 操作是 Hudi 會創建 HoodieTable 對象,並且調用 upsert 方法。對於 HooideTable 的實現分別有 COW 和 MOR 兩種模式的實現。但是在數據去重階段和索引查找階段的操作都是一樣的。調用HoodieTable#upsert方法底層的實現都是SparkWriteHelper

在去重操作中,會先使用 map 算子提取 HoodieRecord 中的 HoodieatestAvroPayload 的實現是保留時間戳最大的記錄。**這裏要注意如果我們配置的是全局類型的索引,map 中的 key 值是 HoodieKey 對象中的 recordKey。**因爲全局索引是需要保證所有分區中的主鍵都是唯一的,避免不同分區數據重複。當然如果是非分區表,沒有必要使用全局索引。

2.4 數據位置信息索引查找

對於 Hudi 索引主要分爲兩大類:

• **非全局索引:**索引在查找數據位置信息時,只會檢索當前分區的索引,索引只保證當前分區內數據做 upsert。如果記錄的分區值發生變更就會導致數據重複。• **全局索引:**顧名思義在查找索引時會加載所有分區的索引,用於定位數據位置信息,即使發生分區值變更也能定位數據位置信息。這種方式因爲要加載所有分區文件的索引,對查找性能會有影響(HBase 索引除外)。

Spark 索引實現主要有如下幾種

• 布隆索引(BloomIndex)• 全局布隆索引(GlobalBloomIndex)• 簡易索引(SimpleIndex)• 簡易全局索引(GlobalSimpleIndex)• 全局 HBase 索引 (HbaseIndex)• 內存索引 (InMemoryHashIndex)。

下面會對索引的實現方式做一一介紹。

2.4.1 布隆索引(BloomIndex)

Spark 布隆索引的實現類是 SparkHoodieBloomIndex,要想知道布隆索引需要了解下布隆算法一般用於判斷數據是否存在的場景,在 Hudi 中用來判斷數據在 parquet 文件中是否存在。其原理是計算 RecordKey 的 hash 值然後將其存儲到 bitmap 中去,key 值做 hash 可能出現 hash 碰撞的問題,爲了較少 hash 值的碰撞使用多個 hash 算法進行計算後將 hash 值存入 BitMap,一般三次 hash 最佳,算法詳細參考《漫畫:什麼是布隆算法?》。但是索引任然有 hash 碰撞的問題,但是誤判率極低可以保證 99% 以上的數據判斷是正確的。即使有個別數據發生了誤判也沒有關係在合併操作中如果匹配不到也會被丟棄。

索引實現類調用 tagLocation 開始查找索引記錄存在哪個 parquet 文件中,步驟如下

  1. 提取所有的分區路徑和主鍵值,然後計算每個分區路徑中需要根據主鍵查找的索引的數量。2. 有了需要加載的分區後,調用 LoadInvolvedFiles 方法加載分區下所有的 parquet 文件。在加載 paquet 文件只是加載文件中的頁腳信息,頁腳存放的有布隆過濾器、記錄最小值、記錄最大值。對於布隆過濾器其實是存放的是 bitmap 序列化的對象。3. 加載好 parquet 的頁腳信息後會根據最大值和最小值構造線段樹。4. 根據 Rdd 中 RecordKey 進行數據匹配查找數據屬於那個 parqeut 文件中,對於 RecordKey 查找只有符合最大值和最小值範圍纔會去查找布隆過濾器中的 bitmap ,RecordKey 小於最小值找左子樹,RecordKey 大於最大值的 key 找右子樹。遞歸查詢後如果查找到節點爲空說明 RecordKey 在當前分區中不存在,當前 Recordkey 是新增數據。查找索引時 spark 會自定義分區避免大量數據在一個分區查找導致分區數據傾斜。查找到 RecordKey 位置信息後會構造 <HoodieKey,HoodieRecordLocation> Rdd 對象。

線段樹的數據結構如下:

  1. 以 Rdd 爲左表和 <HoodieKey,HoodieRecordLocation>Rdd 做左關聯,提取 HoodieRecordLocation 位置信息賦值到 HoodieRecord 的 currentLocation 變量上, 最後得到新的 HoodieRecord Rdd。在 HoodieRecordLocation 對象中包含文件 fileID 和快照時間。

說明:Parquet 文件名稱組成主要是 36 位 fileId、文件編號、spark 任務自定義分區編號、spark 任務 stage 編號、spark 任務 attempt id、commit 時間。

2.4.2 全局布隆索引(GlobalBloomIndex)

全局布隆索引底層算法和布隆索引一樣,只是全局布隆索引在查找每個 RecordKey 屬於那個 parquet 文件中,會加載所有 parquet 文件的頁腳信息構造線段樹,然後在去查詢索引。

因爲 Hudi 需要加載所有的 parquet 文件和線段樹節點變多對於查找性能會比普通的布隆索引要差。但是對於分區字段的值發生了修改,如果還是使用普通的布隆索引會導致在當前分區查詢不到當成新增數據寫入 Hudi 表。這樣我們的數據就重複了,在很多業務場景是不被允許的。所以在選擇那個字段做分區列時,儘量選擇列值永遠不會發生變更的,這樣我們使用普通布隆索引就可以了。

全局布隆的實現是繼承布隆索引的實現,重寫了索引數據的加載和 HoodieRecord Rdd 左關聯部分。加載所有文件的頁腳信息剛剛已經提到過了,數據不在知道在哪個分區所以要加載全部文件進行判斷。在左關聯操作中與普通布隆索引不同的是,如果分區發生了變更,默認情況下會修改 HoodieKey 中的 partitionPath, 數據是不會寫到變更後的分區路徑下,而是會重寫到之前的分區路徑下,但是數據的內容還是會更新。如果希望刪除舊分區數據,新數據寫入當前記錄的分區。需要設置hoodie.bloom.index.update.partition.path配置項爲 true,允許分區數據變更到其他分區。此時,關聯查詢會在原有的基礎上在生成一條刪除記錄,刪除記錄 HoodieRecordPayload 的實現類是 EmptyHoodieRecordPayload。EmptyHoodieRecordPayload 只會存放 hoodieKey 的主鍵信息,在數據合併時會被忽略,達到數據硬刪除的目的。這裏可以根據業務場景選擇是否開啓分區變更。

2.4.3 簡易索引(SimpleIndex)

簡易索引與布隆索引的不同是直接加載分區中所有的 parquet 數據然後在與當前的數據比較是否存在,實現比較簡單,我想也是因爲這樣才這麼命名的。以下是簡易索引的執行步驟:

  1. 提取所有的分區路徑和主鍵值。2. 根據分區路徑加載所有涉及分區路徑的 parquet 文件的數據主要是 HooieKey 和 fileID 兩列的數據,構造 <HoodieKey,HoodieRecordLocation> Rdd 對象。3. 同布隆索引一樣以 Rdd 爲左表和 <HoodieKey,HoodieRecordLocation>Rdd 做左關聯,提取 HoodieRecordLocation 位置信息賦值到 HoodieRecord 的 currentLocation 變量上, 最後得到新的 HoodieRecord Rdd

2.4.4 簡易全局索引(GlobalSimpleIndex)

簡易全局索引同布隆全局索引一樣,需要加載所有分區的 parquet 文件數據,構造 <HoodieKey,HoodieRecordLocation>Rdd 然後後進行關聯。在簡易索引中hoodie.simple.index.update.partition.path配置項也是可以選擇是否允許分區數據變更。數據文件比較多數據量很大,這個過程會很耗時。

2.4.5 HBase 索引 (HBaseIndex)

HBase 索引與布隆索引、簡易索引不同,本身就是一個全局索引,那種場景都可以用。但是需要額外 HBase 服務來存儲 Hudi 的索引信息,一旦 HBase 出現故障會導致 Hudi upsert 無法工作。不管在布隆索引或簡易索引中索引是和 parquet 文件是一體的,要麼一起成功,要麼一起失敗,但是在 HBase 索引中文件和索引是分開在特定的情況下可能有一致性的問題。不過 HBase 索引最大程度的避免了這一問題。

HBase 索引實現步驟如下

  1. 連接 hbase 數據庫 2. 批量請求 hbase 數據庫 3. 檢查 get 獲取的數據是否爲有效索引,這時 Hudi 會連接元數據檢查 commit 時間是否有效,如果無效 currentLocation 將不會被賦值。檢查是否爲有效索引的目的是當索引更新一半 hbase 宕機導致任務失敗,保證不會加載過期索引。避免 hbase 索引和數據不一致導致數據進入錯誤的分區。4. 檢查是否開啓允許分區變更,這裏的做法和全局布隆索引、全局簡易索引的實現方式一樣。

在 Hudi 中使用 HBase 索引需要提前建表,HBase 表的列簇爲_s。示例如下

create 'table_name','_s'

參數配置:

hoodie.index.hbase.zkquorum   必填項:zk連接地址
hoodie.index.hbase.zkport    必填項:zk連接端口
hoodie.index.hbase.zknode.path  必填項:zookeeper znode路徑
hoodie.index.hbase.table  必填項:hbase中的表名
hoodie.index.hbase.get.batch.size   默認值100 :每批量請求大小
hoodie.hbase.index.update.partition.path  默認值false: 是否允許分區變更 
hoodie.index.hbase.put.batch.size.autocompute  默認值false :是否開啓自動計算每個批次大小
hoodie.index.hbase.rollback.sync   默認值false:rollback階段是否開啓同步索引。如果設置爲true在寫入hbase索引導致hbase 宕機或者jvm oom任務失敗,在觸發rollback 階段 會刪除失敗任務的索引保證索引和數據一致。在上次任務失敗且數據分區字段值反覆變更時可以避免數據重複。

2.4.6 內存索引 (InMemoryHashIndex)

內存索引目前 Spark 的實現只是構造的一個 ConcurrentMap 在內存中,不會加載 parquet 文件中的索引,當調用 tagLocation 方法會在 map 中判斷 key 值是否存在。Spark 內存索引當前是用來測試的索引。

2.4.7 索引的選擇

普通索引:主要用於非分區表和分區不會發生分區列值變更的表。當然如果你不關心多分區主鍵重複的情況也是可以使用。他的優勢是隻會加載 upsert 數據中的分區下的每個文件中的索引,相對於全局索引需要掃描的文件少。並且索引只會命中當前分區的 fileid 文件,需要重寫的快照也少相對全局索引高效。但是某些情況下我們的設置的分區列的值就是會變那麼必須要使用全局索引保證數據不重複,這樣 upsert 寫入速度就會慢一些。其實對於非分區表他就是個分區列值不會變且只有一個分區的表,很適合普通索引,如果非分區表硬要用全局索引其實和普通索引性能和效果是一樣的。

全局索引:分區表場景要考慮分區值變更,需要加載所有分區文件的索引比普通索引慢。

布隆索引:加載 fileid 文件頁腳布隆過濾器,加載少量數據數據就能判斷數據是否在文件存在。缺點是有一定的誤判,但是 merge 機制可以避免重複數據寫入。parquet 文件多會影響索引加載速度。適合沒有分區變更和非分區表。主鍵如果是類似自增的主鍵布隆索引可以提供更高的性能,因爲布隆索引記錄的有最大 key 和最小 key 加速索引查找。

全局布隆索引:解決分區變更場景,原理和布隆索引一樣,在分區表中比普通布隆索引慢。

簡易索引:直接加載文件裏的數據不會像布隆索引一樣誤判,但是加載的數據要比布隆索引要多,left join 關聯的條數也要比布隆索引多。大多數場景沒布隆索引高效,但是極端情況命中所有的 parquet 文件,那麼此時還不如使用簡易索引加載所有文件裏的數據進行判斷。

全局簡易索引:解決分區變更場景,原理和簡易索引一樣,在分區表中比普通簡易索引慢。建議優先使用全局布隆索引。

HBase 索引:不受分區變跟場景的影響,操作算子要比布隆索引少,在大量的分區和文件的場景中比布隆全局索引高效。因爲每條數據都要查詢 hbase ,upsert 數據量很大會對 hbase 有負載的壓力需要考慮 hbase 集羣承受壓力,適合微批分區表的寫入場景 。在非分區表中數量不大文件也少,速度和布隆索引差不多,這種情況建議用布隆索引。

內存索引:用於測試不適合生產環境

2.5 數據合併

COW 模式和 MOR 模式在前面的操作都是一樣的,不過在合併的時候 Hudi 構造的執行器不同。對於 COW 會根據位置信息中 fileId 重寫 parquet 文件,在重寫中如果數據是更新會比較 parquet 文件的數據和當前的數據的大小進行更新,完成更新數據和插入數據。而 MOR 模式會根據 fileId 生成一個 log 文件,將數據直接寫入到 log 文件中,如果 fileID 的 log 文件已經存在,追加數據寫入到 log 文件中。與 COW 模式相比少了數據比較的工作所以性能要好,但是在 log 文件中可能保存多次寫有重複數據在讀 log 數據時候就不如 cow 模式了。還有在 mor 模式中 log 文件和 parquet 文件都是存在的,log 文件的數據會達到一定條件和 parqeut 文件合併。所以 mor 有兩個視圖,ro 後綴的視圖是讀優化視圖(read-optimized)只查詢 parquet 文件的數據。rt 後綴的視圖是實時視圖(real-time)查詢 parquet 和 log 日誌中的內容。

2.5.1 Copy on Write 模式

COW 模式數據合併實現邏輯調用BaseSparkCommitActionExecutor#excute方法,實現步驟如下:

  1. 通過 countByKey 算子提取分區路徑和文件位置信息並統計條數,用於後續根據分區文件寫入的數據量大小評估如何分桶。2. 統計完成後會將結果寫入到 workLoadProfile 對象的 map 中,這個時候已經完成合並數據的前置條件。Hudi 會調用 saveWorkloadProfileMetadataToInfilght 方法寫入 infight 標識文件到. hoodie 元數據目錄中。在 workLoadProfile 的統計信息中套用的是類似雙層 map 數據結構, 統計是到 fileid 文件級別。3. 根據 workLoadProfile 統計信息生成自定義分區 ,這個步驟就是分桶的過程。首先會對更新的數據做分桶,因爲是更新數據在合併時要麼覆蓋老數據要麼丟棄,所以不存在 parquet 文件過於膨脹,這個過程會給將要發生修改的 fileId 都會添加一個桶。然後會對新增數據分配桶,新增數據分桶先獲取分區路徑下所有的 fileid 文件, 判斷數據是否小於 100 兆。小於 100 兆會被認爲是小文件後續新增數據會被分配到這個文件桶中,大於 100 兆的文件將不會被分配桶。獲取到小文件後會計算每個 fileid 文件還能存少數據然後分配一個桶。如果小文件 fileId 的桶都分配完了還不夠會根據數據量大小分配 n 個新增桶。最後分好桶後會將桶信息存入一個 map 集合中,當調用自定義實現 getpartition 方法時直接向 map 中獲取。所以 spark 在運行的時候一個桶會對應一個分區的合併計算。分桶參數

  2. 分桶結束後調用 handleUpsertPartition 合併數據。首先會獲取 map 集合中的桶信息,桶類型有兩種新增和修改兩種。如果桶 fileid 文件只有新增數據操作,直接追加文件或新建 parquet 文件寫入就好,這裏會調用 handleInsert 方法。如果桶 fileid 文件既有新增又有修改或只有修改一定會走 handUpdate 方法。這裏設計的非常的巧妙對於新增多修改改少的場景大部分的數據直接可以走新增的邏輯可以很好的提升性能。對於 handUpdate 方法的處理會先構造 HoodieMergeHandle 對象初始化一個 map 集合,這個 map 集合很特殊擁有存在內存的 map 集合和存在磁盤的 map 集合,這個 map 集合是用來存放所有需要 update 數據的集合用來遍歷 fileid 舊文件時查詢文件是否存在要不要覆蓋舊數據。這裏使用內存加磁盤爲了避免 update 桶中數據特別大情況可以將一部分存磁盤避免 jvm oom。update 數據會優先存放到內存 map 如果內存 map 不夠纔會存在磁盤 map,而內存 Map 默認大小是 1g 。DiskBasedMap 存儲是 key 信息存放的還是 record key ,value 信息存放 value 的值存放到那個文件上,偏移量是多少、存放大小和時間。這樣如果命中了磁盤上 map 就可以根據 value 存放的信息去獲取 hoodieRecord 了。

hoodie.memory.spillable.map.path   默認值 /tmp/ : 存放DiskBasedMap的路徑
hoodie.memory.merge.max.size       默認值 1024*1024*1024(1g):內存map的最大容量
  1. 構造 sparkMergHelper 開始合併數據寫入到新的快照文件。在 SparkMergHelper 內部會構造一個 BoundedInMemoryExecutor 的隊列,在這個隊列中會構造多個生產者和一個消費者(file 文件一般情況只有一個文件所以生產者也會是一個)。producers 會加載老數據的 fileId 文件裏的數據構造一個迭代器,執行的時候先調用 producers 完成初始化後調用 consumer。而 consumer 被調用後會比較數據是否存在 ExternalSpillableMap 中如果不存在重新寫入數據到新的快照文件,如果存在調用當前的 HoodileRecordPayload 實現類 combineAndGetUpdateValue 方法進行比較來確定是寫入老數據還是新數據,默認比較那個數據時間大。這裏有個特別的場景就是硬刪除,對於硬刪除裏面的數據是空的,比較後會直接忽略寫入達到數據刪除的目的。

2.5.2 Merge on Read 模式

在 MOR 模式中的實現和前面 COW 模式分桶階段邏輯相同,這裏主要說下最後的合併和 COW 模式不一樣的操作。在 MOR 合併是調用AbstarctSparkDeltaCommitActionExecutor#execute方法,會構造 HoodieAppaendHandle 對象。在寫入時調用 append 向 log 日誌文件追加數據,如果日誌文件不存在或不可追加將新建 log 文件。

寫入參數:

hoodie.logfile.max.size   默認值:1024 * 1024 * 1024(1g) 日誌文件最大大小
hoodie.logfile.data.block.max.size  默認值:256 * 1024 * 1024(256兆) 寫入多少數據後刷一次磁盤

分桶相關參數與 COW 模式通用

2.6 索引更新


數據寫入到 log 文件或者是 parquet 文件,這個時候需要更新索引。簡易索引和布隆索引對於他們來說索引在 parquet 文件中是不需要去更新索引的。這裏索引更新只有 HBase 索引和內存索引需要更新。內存索引是更新通過 map 算子寫入到內存 map 上,HBase 索引通過 map 算子 put 到 HBase 上。

2.7 完成提交

2.7.1 提交 & 元數據信息歸檔

上述操作如果都成功且寫入時 writeStatus 中沒有任何錯誤記錄,Hudi 會進行完成事務的提交和元數據歸檔操作,步驟如下

1.sparkRddWriteClient 調用 commit 方法,首先會向 Hdfs 上提交一個. commit 後綴的文件,裏面記錄的是 writeStatus 的信息包括寫入多少條數據、fileID 快照的信息、Schema 結構等等。當 commit 文件寫入成功就意味着一次 upsert 已經成功,Hudi 內的數據就可以查詢到。2. 爲了不讓元數據一直增長下去需要對元數據做歸檔操作。元數據歸檔會先創建 HoodieTimelineArchiveLog 對象,通過 HoodieTableMetaClient 獲取. hoodie 目錄下所有的元數據信息,根據這些元數據信息來判斷是否達到歸檔條件。如果達到條件構造 HooieLogFormatWrite 對象對 archived 文件進行追加。每個元數據文件會封裝成 HoodieLogBlock 對象批量寫入。歸檔參數

hoodie.keep.max.commits   默認30:最多保留多少個commit元數據,默認會在第31個commit的時候會觸發一次元數據歸檔操作,由這個參數來控制元數據歸檔時機。
hoodie.keep.min.commits   默認20: 最少保留多少個commit元數據,默認會將當前所有的commimt提交的個數減去20,剩下的11個元數據被歸檔,這個參數間接控制每次回收元數據個數。
hoodie.commits.archival.batch  默認10 :每多少個元數據寫入一次到archived文件裏,這裏就是一個刷盤的間隔。

2.7.2 數據清理

元數據清理後 parquet 文件也是要去清理,在 Hudi 有專有 spark 任務去清理文件。因爲是通過 spark 任務去清理文件也有對應 XXX.clean.request、xxx.clean.infight、xxx.clean 元數據來標識任務的每個任務階段。數據清理步驟如下:

  1. 構造 baseCleanPanActionExecutor 執行器,並調用 requestClean 方法獲取元數據生成清理計劃對象 HoodieCleanPlan。判斷 HoodieCleanPlan 對象滿足觸發條件會向元數據寫入 xxx.request 標識,表示可以開始清理計劃。2. 生成執行計劃後調用 baseCleanPanActionExecutor 的繼承類 clean 方法完成執行 spark 任務前的準備操作,然後向 hdfs 寫入 xxx.clean.inflight 對象準備執行 spark 任務。3.spark 任務獲取 HoodieCleanPlan 中所有分區序列化成爲 Rdd 並調用 flatMap 迭代每個分區的文件。然後在 mapPartitions 算子中調用 deleteFilesFunc 方法刪除每一個分區的過期的文件。最後 reduceBykey 彙總刪除文件的結果構造成 HoodieCleanStat 對象,將結果元數據寫入 xxx.clean 中完成數據清理。
hoodie.clean.automatic  默認true :是否開啓自動數據清理,如果關閉upsert 不會執行清理任務。
hoodie.clean.async   默認false: 是否異步清理文件。開啓異步清理文件的原理是開啓一個後臺線程,在client執行upsert時就會被調用。
hoodie.cleaner.policy  默認 HoodieCleaningPolicy.KEEP_LATEST_COMMITS :數據清理策略參數,清理策略參數有兩個配置KEEP_LATEST_FILE_VERSIONS和KEEP_LATEST_COMMITS。
hoodie.cleaner.commits.retained 默認10 :在KEEP_LATEST_COMMITS策略中配置生效,根據commit提交次數計算保留多少個fileID版本文件。因爲是根據commit提交次數來計算,參數不能大於hoodie.keep.min.commits(最少保留多少次commmit元數據)。
hoodie.cleaner.fileversions.retained  默認3 :在KEEP_LATEST_FILE_VERSIONS策略中配置生效,根據文件版本數計算保留多少個fileId版本文件。

2.7.3 數據壓縮

數據壓縮是 mor 模式纔會有的操作,目的是讓 log 文件合併到新的 fileId 快照文件中。因爲數據壓縮也是 spark 任務完成的,所以在運行時也對應的 xxx.compaction.requet、xxx.compaction.clean、xxx.compaction 元數據生成記錄每個階段。數據壓縮實現步驟如下:

1.sparkRDDwirteClient 調用 compaction 方法構造 BaseScheduleCompationActionExecutor 對象並調用 scheduleCompaction 方法,計算是否滿足數據壓縮條件生成 HoodieCompactionPlan 執行計劃元數據。如果滿足條件會向 hdfs 寫入 xxx.compation.request 元數據標識請求提交 spark 任務。2.BaseScheduleCompationActionExecutor 會調用繼承類 SparkRunCompactionExecutor 類並調用 compact 方法構造 HoodieSparkMergeOnReadTableCompactor 對象來實現壓縮邏輯,完成一切準備操作後向 hdfs 寫入 xxx.compation.inflight 標識。3.spark 任務執行 parallelize 加載 HooideCompactionPlan 的執行計劃, 然後調用 compact 迭代執行每個分區中 log 的合併邏輯。在 compact 會構造 HoodieMergelogRecordScanner 掃描文件對象,加載分區中的 log 構造迭代器遍歷 log 中的數據寫入 ExtemalSpillableMap。這個 ExtemalSpillableMap 和 cow 模式中內存加載磁盤的 map 是一樣的。至於合併邏輯是和 cow 模式的合併邏輯是一樣的,這裏不重複闡述都是調用 cow 模式的 handleUpdate 方法。4. 完成合並操作會構造 writeStatus 結果信息,並寫入 xxx.compaction 標識到 hdfs 中完成合並操作。

壓縮參數:

hoodie.compact.inline  默認false:是否在一個事務完成後內聯執行壓縮操作,這裏開啓並不一定每次都會觸發索引操作後面還有策略判斷。
hoodie.compact.inline.trigger.strategy  默認CompactionTriggerStrategy.NUM_COMMITS: 壓縮策略參數。該參數有NUM_COMMITS、TIME_ELAPSED、NUM_AND_TIME、NUM_OR_TIME。NUM_COMMITS根據提交次數來判斷是否進行壓縮;TIME_ELAPSED根據實際來判斷是否進行壓縮;NUM_AND_TIME 根據提交次數和時間來判斷是否進行壓縮;NUM_OR_TIME根據提交次數或時間來判斷是否進行壓縮。
hoodie.compact.inline.max.delta.commits  默認5 :設置提交多少次後觸發壓縮策略。在NUM_COMMITS、NUM_AND_TIME和NUM_OR_TIME策略中生效。
hoodie.compact.inline.max.delta.seconds  默認60 * 60(1小時):設置在經過多長時間後觸發壓縮策略。在TIME_ELAPSED、NUM_AND_TIME和NUM_OR_TIME策略中生效。

2.8 Hive 元數據同步


實現原理比較簡單就是根據 Hive 外表和 Hudi 表當前表結構和分區做比較,是否有新增字段和新增分區如果有會添加字段和分區到 Hive 外表。如果不同步 Hudi 新寫入的分區無法查詢。在 COW 模式中只會有 ro 表(讀優化視圖,而在 mor 模式中有 ro 表(讀優化視圖)和 rt 表(實時視圖)。

2.9 提交成功通知回調

當事務提交成功後向外部系統發送通知消息,通知的方式有兩種,一種是發送 http 服務消息,一種是發送 kafka 消息。這個通知過程我們可以把清理操作、壓縮操作、hive 元數據操作,都交給外部系統異步處理或者做其他擴展。也可以自己實現HoodieWriteCommitCallback的接口自定義實現。

回調參數

hoodie.write.commit.callback.on   默認false:是否開啓提交成功後向外部系統發送回調指令。
hoodie.write.commit.callback.class   默認org.apache.Hudi.callback.impl.HoodieWriteCommitHttpCallback: 配置回調實現類,默認通過Http的方式發送消息到外部系統
http實現類配置參數
hoodie.write.commit.callback.http.url   無默認配置項:外部服務http url地址。
hoodie.write.commit.callback.http.api.key  默認Hudi_write_commit_http_callback:外部服務http請求頭Hudi-CALLBACK-KEY的值,可用於服務請求驗籤使用。
hoodie.write.commit.callback.http.timeout.seconds  默認3秒:請求超時時間。
kafka實現類配置參數
hoodie.write.commit.callback.kafka.bootstrap.servers      無默認值:配置kafka broker 服務地址。
hoodie.write.commit.callback.kafka.topic   無默認值:配置kafka topic名稱。
hoodie.write.commit.callback.kafka.partition  無默認值:配置發送到那個kafka broker分區。
hoodie.write.commit.callback.kafka.acks    默認值all:配置kafka ack。
hoodie.write.commit.callback.kafka.retries  默認值值3:配置kafka 失敗重試次數。
  1. 最後 =====

在分析 Spark upsert 源碼中還有很多細節是略過,如時間線服務、併發寫機制、Cluster 模式、HFile 格式的寫入和 Merge 等等。篇幅有限先解析這麼多,希望本文能幫你瞭解 Spark upsert 的內核原理。謝謝大家閱讀本文。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/BYHwv16A7LAKZMnOZ8y6mA