Spark 調優 - 一文搞定 Join 優化

SparkSQL 總體流程

在闡述 Join 實現之前,我們首先簡單介紹 SparkSQL 的總體流程,一般地,我們有兩種方式使用 SparkSQL,一種是直接寫 sql 語句,這個需要有元數據庫支持,例如 Hive 等,另一種是通過 Dataset/DataFrame 編寫 Spark 應用程序。如下圖所示,sql 語句被語法解析 (SQL AST) 成查詢計劃,或者我們通過 Dataset/DataFrame 提供的 APIs 組織成查詢計劃,查詢計劃分爲兩大類:邏輯計劃和物理計劃,這個階段通常叫做邏輯計劃,經過語法分析 (Analyzer)、一系列查詢優化(Optimizer) 後得到優化後的邏輯計劃,最後被映射成物理計劃,轉換成 RDD 執行。

對於語法解析、語法分析以及查詢優化,本文不做詳細闡述,本文重點介紹 Join 的物理執行過程。

Join 基本要素

如下圖所示,Join 大致包括三個要素:Join 方式、Join 條件以及過濾條件。其中過濾條件也可以通過 AND 語句放在 Join 條件中。

Spark 支持所有類型的 Join,包括:

下面分別闡述這幾種 Join 的實現。

Join 基本實現流程

總體上來說,Join 的基本實現流程如下圖所示,Spark 將參與 Join 的兩張表抽象爲流式遍歷表 (streamIter) 和查找表(buildIter),通常 streamIter 爲大表,buildIter 爲小表,我們不用擔心哪個表爲 streamIter,哪個表爲 buildIter,這個 spark 會根據 join 語句自動幫我們完成。

在實際計算時,spark 會基於 streamIter 來遍歷,每次取出 streamIter 中的一條記錄 rowA,根據 Join 條件計算 keyA,然後根據該 keyA 去 buildIter 中查找所有滿足 Join 條件 (keyB==keyA) 的記錄 rowBs,並將 rowBs 中每條記錄分別與 rowAjoin 得到 join 後的記錄,最後根據過濾條件得到最終 join 的記錄。

從上述計算過程中不難發現,對於每條來自 streamIter 的記錄,都要去 buildIter 中查找匹配的記錄,所以 buildIter 一定要是查找性能較優的數據結構。spark 提供了三種 join 實現:sort merge join、broadcast join 以及 hash join。

sort merge join 實現

要讓兩條記錄能 join 到一起,首先需要將具有相同 key 的記錄在同一個分區,所以通常來說,需要做一次 shuffle,map 階段根據 join 條件確定每條記錄的 key,基於該 key 做 shuffle write,將可能 join 到一起的記錄分到同一個分區中,這樣在 shuffle read 階段就可以將兩個表中具有相同 key 的記錄拉到同一個分區處理。前面我們也提到,對於 buildIter 一定要是查找性能較優的數據結構,通常我們能想到 hash 表,但是對於一張較大的表來說,不可能將所有記錄全部放到 hash 表中,另外也可以對 buildIter 先排序,查找時按順序查找,查找代價也是可以接受的,我們知道,spark shuffle 階段天然就支持排序,這個是非常好實現的,下面是 sort merge join 示意圖。

在 shuffle read 階段,分別對 streamIter 和 buildIter 進行 merge sort,在遍歷 streamIter 時,對於每條記錄,都採用順序查找的方式從 buildIter 查找對應的記錄,由於兩個表都是排序的,每次處理完 streamIter 的一條記錄後,對於 streamIter 的下一條記錄,只需從 buildIter 中上一次查找結束的位置開始查找,所以說每次在 buildIter 中查找不必重頭開始,整體上來說,查找性能還是較優的。

broadcast join 實現

爲了能具有相同 key 的記錄分到同一個分區,我們通常是做 shuffle,那麼如果 buildIter 是一個非常小的表,那麼其實就沒有必要大動干戈做 shuffle 了,直接將 buildIter 廣播到每個計算節點,然後將 buildIter 放到 hash 表中,如下圖所示。

從上圖可以看到,不用做 shuffle,可以直接在一個 map 中完成,通常這種 join 也稱之爲 map join。那麼問題來了,什麼時候會用 broadcast join 實現呢?這個不用我們擔心,spark sql 自動幫我們完成,當 buildIter 的估計大小不超過參數 spark.sql.autoBroadcastJoinThreshold 設定的值 (默認 10M),那麼就會自動採用 broadcast join,否則採用 sort merge join。

hash join 實現

除了上面兩種 join 實現方式外,spark 還提供了 hash join 實現方式,在 shuffle read 階段不對記錄排序,反正來自兩格表的具有相同 key 的記錄會在同一個分區,只是在分區內不排序,將來自 buildIter 的記錄放到 hash 表中,以便查找,如下圖所示。

不難發現,要將來自 buildIter 的記錄放到 hash 表中,那麼每個分區來自 buildIter 的記錄不能太大,否則就存不下,默認情況下 hash join 的實現是關閉狀態,如果要使用 hash join,必須滿足以下四個條件:

所以說,使用 hash join 的條件其實是很苛刻的,在大多數實際場景中,即使能使用 hash join,但是使用 sort merge join 也不會比 hash join 差很多,所以儘量使用 hash

下面我們分別闡述不同 Join 方式的實現流程。

inner join

inner join 是一定要找到左右表中滿足 join 條件的記錄,我們在寫 sql 語句或者使用 DataFrmae 時,可以不用關心哪個是左表,哪個是右表,在 spark sql 查詢優化階段,spark 會自動將大表設爲左表,即 streamIter,將小表設爲右表,即 buildIter。這樣對小表的查找相對更優。其基本實現流程如下圖所示,在查找階段,如果右表不存在滿足 join 條件的記錄,則跳過。

left outer join

left outer join 是以左表爲準,在右表中查找匹配的記錄,如果查找失敗,則返回一個所有字段都爲 null 的記錄。我們在寫 sql 語句或者使用 DataFrmae 時,一般讓大表在左邊,小表在右邊。其基本實現流程如下圖所示。

right outer join

right outer join 是以右表爲準,在左表中查找匹配的記錄,如果查找失敗,則返回一個所有字段都爲 null 的記錄。所以說,右表是 streamIter,左表是 buildIter,我們在寫 sql 語句或者使用 DataFrmae 時,一般讓大表在右邊,小表在左邊。其基本實現流程如下圖所示。

full outer join

full outer join 相對來說要複雜一點,總體上來看既要做 left outer join,又要做 right outer join,但是又不能簡單地先 left outer join,再 right outer join,最後 union 得到最終結果,因爲這樣最終結果中就存在兩份 inner join 的結果了。因爲既然完成 left outer join 又要完成 right outer join,所以 full outer join 僅採用 sort merge join 實現,左邊和右表既要作爲 streamIter,又要作爲 buildIter,其基本實現流程如下圖所示。

由於左表和右表已經排好序,首先分別順序取出左表和右表中的一條記錄,比較 key,如果 key 相等,則 joinrowA 和 rowB,並將 rowA 和 rowB 分別更新到左表和右表的下一條記錄;如果keyA<keyB,則說明右表中沒有與左表 rowA 對應的記錄,那麼 joinrowA 與 nullRow,緊接着,rowA 更新到左表的下一條記錄;如果 keyA>keyB,則說明左表中沒有與右表 rowB 對應的記錄,那麼 joinnullRow 與 rowB,緊接着,rowB 更新到右表的下一條記錄。如此循環遍歷直到左表和右表的記錄全部處理完。

left semi join

left semi join 是以左表爲準,在右表中查找匹配的記錄,如果查找成功,則僅返回左邊的記錄,否則返回 null,其基本實現流程如下圖所示。

left anti join

left anti join 與 left semi join 相反,是以左表爲準,在右表中查找匹配的記錄,如果查找成功,則返回 null,否則僅返回左邊的記錄,其基本實現流程如下圖所示。

總結

Join 是數據庫查詢中一個非常重要的語法特性,在數據庫領域可以說是 “得 join 者的天下”,SparkSQL 作爲一種分佈式數據倉庫系統,給我們提供了全面的 join 支持,並在內部實現上無聲無息地做了很多優化,瞭解 join 的實現將有助於我們更深刻的瞭解我們的應用程序的運行軌跡。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/11KIfzzqrJRmHptsJvMhFA