SparkSQL 源碼解析系列
上週,面試了一個候選人。因爲我們是 Spark SQL 的深度用戶,所以,我肯定要好好問問候選人在 Spark SQL 的掌握程度。候選人簡歷號稱是某互聯網大廠公司架構,精通各種源碼。搞得我面試也有點緊張,怕一不小心被人家虐死了。
我先讓他說了下 Spark SQL 的原理。他肯定做好了功課:Spark SQL 原理 = 邏輯執行計劃 + 解析後的邏輯執行計劃 + 優化後的邏輯執行計劃 + 物理執行計劃。臉上逐漸漏出了得意的笑容,於是,我連續發問:
-
Spark SQL 解析執行計劃時是如何遍歷語法樹的?
-
大家都說不推薦用 RDD,應該用 DataFrame,你從底層實現上說說爲什麼?
-
Spark SQL 用 RDD 了嗎?它是怎麼用的?
-
你前面說的是一個非常基礎,寬泛的流程,我想聽的是 Spark 物理執行計劃的生成細節?
-
Spark SQL 中的 Code Generate 過程底層是如何實現的?
-
Optimizer 優化器和 Analyzer 處理模式一樣嗎?怎麼處理的?
中間他不斷地想把我扯到其他領域,說着模棱兩口的回答。不斷地解釋他們公司的業務是如何如何,平臺是如何如何。但作爲一名常年面試的面試官,怎麼可能讓他帶偏呢。整個過程就是,我挖一個坑,他馬上就跳進去,然後自己再也出不來了。我再拉他出來,然後他自己又跳進去了。其實,說到底,他並不熟悉源碼。然後就沒有然後了...
其實這種面試,每週都在重複。我也是有點疲憊了。我也才慢慢發現,這個行業牛人不少,但混子更多。隨隨便便期望薪資就是 30K、40K。現在有很多公司都要上大數據平臺,給了這些混子更多的空間。
不管如何,持續學習、不斷進步是我們得以生存的重要途徑。所以,剛好這兩天有點時間,跟大家來聊聊 Spark SQL 源碼。因爲篇幅有限,沒辦法展開所有。但大家看完,如果對源碼不熟悉,肯定會有所啓發。
-
執行計劃
-
Hive 準備表和數據
-
編寫測試代碼
-
執行計劃
-
查看 Spark 執行日誌
-
JAVA 執行命令
-
日誌配置加載
-
提交 Spark 應用程序
-
加載默認 executor 配置
-
權限驗證
-
啓動 Spark 重要服務
-
塊管理器
-
加載 Spark SQL 元數據
-
發送廣播提交 JOB
-
DAG 調度
-
Task 調度
-
生成代碼、完成任務
-
計劃執行流程源碼解析
-
設置斷點並啓動調試
-
SparkSession.sql API
-
豐富的 SessionState 組件
-
LogicPlan
-
ProjectLogicPlan
-
UnresolvedRelation
-
Dataset.ofRows API
-
邏輯執行計劃生成器源碼解析
-
邏輯執行計劃解析器源碼解析
-
Analyzer 組件
-
解析邏輯執行計劃流程
-
應用規則解析邏輯執行計劃
-
邏輯執行計劃優化器源碼解析
-
物理執行計劃生成源碼解析
-
源碼解析:Code Generate 代碼生成
執行計劃
Hive 準備表和數據
create database if not exists test;
create table if not exists test.t\_name(
name string
);
insert into
test.t\_name
values
('test1')
, ('test2')
, ('test3')
;
編寫測試代碼
爲了方便調試 Spark SQL 源碼,我把 SQL 語句寫在了 scala 代碼中。同時,在程序執行的末尾添加了一個阻塞標準輸入。這樣我們就可以去查看下 Spark 的 WebUI 了。
def main(args: Array\[String\]): Unit = {
val conf = new SparkConf
conf.set("spark.hive.enable", "true")
conf.set("spark.sql.hive.metastore.version","2.3")
conf.set("spark.sql.hive.metastore.jars","path")
// 顯示所有的執行計劃
conf.set("spark.sql.ui.explainMode", "extended")
val spark = SparkSession
.builder()
.config(conf)
.master("local\[1\]")
.enableHiveSupport()
.getOrCreate()
spark.sparkContext.setLogLevel("INFO")
// 支持方式
val sql =
"""
| select
| \*
| from
| test.t\_name
|""".stripMargin
val df = spark.sql(sql)
df.show()
System.in.read()
}
執行計劃
查看解析執行計劃
\== Parsed Logical Plan ==
GlobalLimit 21
+- LocalLimit 21
+- Project \[cast(name#0 as string) AS name#3\]
+- Project \[name#0\]
+- SubqueryAlias spark\_catalog.test.t\_name
+- HiveTableRelation \[\`test\`.\`t\_name\`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: \[name#0\], Partition Cols: \[\]\]
可以看到,執行計劃樹包含以下幾個部分內容:
-
讀取 Hive 的關係型表(test.t_name),使用的是 Hive SerDe2 讀取,包含了一個 name 列,沒有分區
-
執行計劃中使用的表的別名爲:spark_catalog.test.t_name
-
查詢 name#0 這一列的數據
-
將 name#0 這一列轉換爲 String 類型,並取別名爲 name#3
-
最後添加一個 limit 爲 21 的限制(這個是 Spark 默認給我們添加上的)
查看分析後的執行計劃
\== Analyzed Logical Plan ==
name: string
GlobalLimit 21
+- LocalLimit 21
+- Project \[cast(name#0 as string) AS name#3\]
+- Project \[name#0\]
+- SubqueryAlias spark\_catalog.test.t\_name
+- HiveTableRelation \[\`test\`.\`t\_name\`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: \[name#0\], Partition Cols: \[\]\]
我們看到,分析後的執行計劃只添加了 name 的類型。解析執行計劃僅僅是解析 SQL 爲語法樹,在解析執行計劃階段,Spark SQL 是不知道列的類型的。
優化後的執行計劃
\== Optimized Logical Plan ==
GlobalLimit 21
+- LocalLimit 21
+- HiveTableRelation \[\`test\`.\`t\_name\`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: \[name#0\], Partition Cols: \[\]\]
我們發現,優化後的執行計劃要比分析後的執行計劃簡單很多。不會進行類型轉換,也無需執行投影查詢,保留了 Spark SQL 默認的 LIMIT 限制。
物理執行計劃
\== Physical Plan ==
CollectLimit 21
+- Scan hive test.t\_name \[name#0\], HiveTableRelation \[\`test\`.\`t\_name\`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: \[name#0\], Partition Cols: \[\]\]
物理執行計劃就是真正能夠在 Spark 集羣上運行的。
image-20210423170530691
查看 Spark 執行日誌
日誌是非常重要了,這也是我們日常開發、排錯必不可少的。此處,我們通過 INFO 級別的日誌,來更細一點的粒度地來看看 Spark SQL 的執行過程。爲了方便分析,我省略掉一些不是特別重要的日誌信息。
不要指望,通過看日誌瞭解 Spark 的所有執行機制,我們這一步就是要看看 Spark 執行過程有哪些組件是比較重要的。後續有助於我們分析源代碼。
JAVA 執行命令
C:\\opt\\Java\\jdk1.8.0\_181\\bin\\java.exe "-javaagent:C:\\Program Files\\JetBrains\\IntelliJ IDEA Community Edition 2020.3.2\\lib\\idea\_rt.jar=57446:C:\\Program Files\\JetBrains\\IntelliJ IDEA Community Edition 2020.3.2\\bin" -Dfile.encoding=UTF-8 -classpath C:\\Users\\China\\AppData\\Local\\Temp\\classpath918267796.jar cn.com.hnzycfc.SparkSQL\_ResourceTest01
在 IDEA 中,調試命令非常簡潔。除了 IDEA 爲了方便調試添加了 JVM 代理之外,有一個比較重要的參數,-classpath C:\Users\China\AppData\Local\Temp\classpath918267796.jar
,要運行 Spark,我們可以確定的是,JVM 一定要加載 Spark 的核心 JAR 包。而這些 JAR 包如果我們沒有配置本地 JVM 的 CLASSPATH,一定會通過 JAVA 命令傳遞給 JVM,否則,一定會出現 CLASS NOT FOUND 錯誤。
我們打開這個 classpath918267796.jar 包,一看便知。這個 JAR 包中只有一個文件:
image-20210423171223760
這個文件是 JAR 包的元數據描述文件。很明顯可以看到有一個 Class-Path,這個是 JDK 的標準加載 CLASSPATH 方式。(如果大家不熟悉 CLASSPATH,可以看一下我之前寫的這篇文章:https://mp.weixin.qq.com/s/qcQKPCdRFXEqun5B3augyg)
Class-Path: file:/E:/05.git\_projectspark\_sql\_test/target/classes/ file:/C:/opt/scala/lib/scala-library.jar file:/C:/opt/scala/lib/scala-reflect.jar file:/C:/opt/apache-maven-3.6.1/repository/org/apache/spark/spark-core\_2.12/3.1.1/spark-core\_2.12-3.1.1.jar file:/C:/opt/apache-maven3.6.1/repository/com/thoughtworks/paranamer/paranamer/2.8/paranamer-2.8.jar file...
裏面其實包含了一眼望不到邊的 JAR 包。其中就包含了 Spark、Hive 相關的依賴包。
關鍵對象
StaticLoggerBinder,負責構建 LogFactory。
日誌配置加載
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in \[jar:file:/C:/opt/apache-maven-3.6.1/repository/org/slf4j/slf4j-log4j12/1.7.30/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class\]
SLF4J: Found binding in \[jar:file:/C:/opt/apache-maven-3.6.1/repository/org/slf4j/slf4j-simple/1.7.25/slf4j-simple-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class\]
SLF4J: See http://www.slf4j.org/codes.html#multiple\_bindings for an explanation.
SLF4J: Actual binding is of type \[org.slf4j.impl.Log4jLoggerFactory\]
log4j:WARN No appenders could be found for logger (org.apache.hadoop.hive.conf.HiveConf).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
這裏,我們並沒有配置 log4j.property,所以,Spark 使用了它自己 JAR 包中的 Log4j 配置文件。這個配置文件我們可以在spark-core_2.12-3.1.1.jar
中找到。
image-20210423172125770
提交 Spark 應用程序
21/04/23 16:54:51 INFO SparkContext: Running Spark version 3.1.1
21/04/23 16:54:51 INFO ResourceUtils: ==============================================================
# 沒有爲spark.driver自定義配置
21/04/23 16:54:51 INFO ResourceUtils: No custom resources configured for spark.driver.
21/04/23 16:54:51 INFO ResourceUtils: ==============================================================
21/04/23 16:54:51 INFO SparkContext: Submitted application: b72a357f-bba5-47c2-871a-1ae5db3e45a2
Spark 應用程序是需要提交到集羣中運行的。當前我們在 IDEA 中運行,那麼 Spark 會使用 JVM 線程模擬一個集羣來運行應用程序。
關鍵對象
ResourceUtils,這是負責加載配置 spark.driver 的類
加載默認 executor 配置
21/04/23 16:54:51 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 1024, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
21/04/23 16:54:51 INFO ResourceProfile: Limiting resource is cpu
21/04/23 16:54:51 INFO ResourceProfileManager: Added ResourceProfile id: 0
這段日誌描述的是 executor 的資源。因爲我們當前沒有指定 executor 配置,所以使用的是默認的 executor 配置,executor 使用 1 個 core、1gb 內存、沒有使用堆外內存,每個 task 使用 1 個 vcore,也就是一個 executor 執行一個任務。
注意:咱們留意一下,這個 ResourceProfile 的 id 爲 0。
關鍵對象
ResourceProfile,這是負責配置描述 executor 配置的。
權限驗證
21/04/23 16:54:52 INFO SecurityManager: Changing view acls to: China,hdfs
21/04/23 16:54:52 INFO SecurityManager: Changing modify acls to: China,hdfs
21/04/23 16:54:52 INFO SecurityManager: Changing view acls groups to:
21/04/23 16:54:52 INFO SecurityManager: Changing modify acls groups to:
21/04/23 16:54:52 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(China, hdfs); groups with view permissions: Set(); users with modify permissions: Set(China, hdfs); groups with modify permissions: Set()
當前我們使用的是所屬組是 hdfs。
image-20210423173000840
因爲我們配置了 HADOOP_USER_NAME 環境變量。當前,Spark Kubernetes Security 是沒有開啓的,認證是禁用的。
image-20210423173331281
關鍵對象
SecurityManager,這是負責 Kubernetes 身份認證的,注意哦,不是權限認證。
啓動 Spark 重要服務
21/04/23 16:54:55 INFO SparkEnv: Registering MapOutputTracker
21/04/23 16:54:55 INFO SparkEnv: Registering BlockManagerMaster
21/04/23 16:54:55 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
21/04/23 16:54:55 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
21/04/23 16:54:55 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
21/04/23 16:54:55 INFO DiskBlockManager: Created local directory at C:\\Users\\China\\AppData\\Local\\Temp\\blockmgr-89e66659-c81c-4922-85b0-dbcb38570c81
21/04/23 16:54:56 INFO MemoryStore: MemoryStore started with capacity 4.1 GiB
21/04/23 16:54:56 INFO SparkEnv: Registering OutputCommitCoordinator
21/04/23 16:54:56 INFO Utils: Successfully started service 'SparkUI' on port 4040.
21/04/23 16:54:56 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://admin:4040
21/04/23 16:54:56 INFO Executor: Starting executor ID driver on host admin
21/04/23 16:54:56 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 57484.
21/04/23 16:54:56 INFO NettyBlockTransferService: Server created on admin:57484
可以看到,註冊了 MapOutputTracker、BlockManagerMaster、BlockManagerMasterEndpoint、OutputCommitCoordinator 這些組件,然後啓動了 Spark UI、以及啓動了 Netty 數據傳輸服務。
關鍵對象
SparkEnv,這是負責註冊 Spark 運行環境所需要的組件。
SparkUI,這是負責運行 Spark WebUI 的組件。
NettyBlockTransferService,這是負責使用 Netty 進行數據塊傳輸的組件。
塊管理器
21/04/23 16:54:56 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
21/04/23 16:54:56 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, admin, 57484, None)
21/04/23 16:54:56 INFO BlockManagerMasterEndpoint: Registering block manager admin:57484 with 4.1 GiB RAM, BlockManagerId(driver, admin, 57484, None)
21/04/23 16:54:56 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, admin, 57484, None)
21/04/23 16:54:56 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, admin, 57484, None)
BlockManager 翻譯過來就是塊管理器,Spark 有自己的內存管理,這個就是 Spark 用於管理存儲塊的。當前 BlockManager 使用的是隨機塊副本策略。並且註冊了 Block Manager 在某個端口。我們看到 BlockManager 使用的就是前面的 NettyBlockTransfer 服務的 57484 端口。而且,BlockManager 能夠使用的內存是 4.1GB 的內存。
image-20210423175446285
在 Web UI 中,我們可以看到。這個 4.1GB 顯示的是 Storage Memory,也就是內存中能夠存儲的數據量。
加載 Spark SQL 元數據
21/04/23 16:54:56 INFO SharedState: spark.sql.warehouse.dir is not set, but hive.metastore.warehouse.dir is set. Setting spark.sql.warehouse.dir to the value of hive.metastore.warehouse.dir ('hdfs://node1:8020/user/hive/warehouse').
21/04/23 16:54:56 INFO SharedState: Warehouse path is 'hdfs://node1:8020/user/hive/warehouse'.
21/04/23 16:54:57 INFO HiveUtils: Initializing HiveMetastoreConnection version 2.3 using path:
21/04/23 16:54:59 INFO deprecation: No unit for dfs.client.datanode-restart.timeout(30) assuming SECONDS
21/04/23 16:55:00 INFO SessionState: Created HDFS directory: /tmp/hive/hdfs/b750e106-5b2f-4c67-8217-8cba7f95aaf4
21/04/23 16:55:00 INFO SessionState: Created local directory: C:/Users/China/AppData/Local/Temp/China/b750e106-5b2f-4c67-8217-8cba7f95aaf4
21/04/23 16:55:00 INFO SessionState: Created HDFS directory: /tmp/hive/hdfs/b750e106-5b2f-4c67-8217-8cba7f95aaf4/\_tmp\_space.db
21/04/23 16:55:00 INFO HiveClientImpl: Warehouse location for Hive client (version 2.3.7) is hdfs://node1:8020/user/hive/warehouse
21/04/23 16:55:00 INFO metastore: Trying to connect to metastore with URI thrift://node1:9083
21/04/23 16:55:00 INFO metastore: Opened a connection to metastore, current connections: 1
21/04/23 16:55:00 INFO JniBasedUnixGroupsMapping: Error getting groups for hdfs: Unknown error.
21/04/23 16:55:00 INFO metastore: Connected to metastore.
21/04/23 16:55:02 INFO SQLStdHiveAccessController: Created SQLStdHiveAccessController for session context : HiveAuthzSessionContext \[sessionString=b750e106-5b2f-4c67-8217-8cba7f95aaf4, clientType=HIVECLI\]
21/04/23 16:55:02 WARN SessionState: METASTORE\_FILTER\_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.
21/04/23 16:55:02 INFO metastore: Mestastore configuration hive.metastore.filter.hook changed from org.apache.hadoop.hive.metastore.DefaultMetaStoreFilterHookImpl to org.apache.hadoop.hive.ql.security.authorization.plugin.AuthorizationMetaStoreFilterHook
21/04/23 16:55:02 INFO metastore: Closed a connection to metastore, current connections: 0
21/04/23 16:55:02 INFO metastore: Trying to connect to metastore with URI thrift://node1:9083
21/04/23 16:55:02 INFO metastore: Opened a connection to metastore, current connections: 1
21/04/23 16:55:02 INFO metastore: Connected to metastore.
任何的 SQL 引擎都是需要加載元數據的,不然,連執行計劃都生成不了。上述例子總的來說,做了兩件事情。
-
加載元數據
-
創建會話連接 Hive MetaStore
首先,Spark 檢測到我們沒有設置 spark.sql.warehouse.dir,然後就開始找我們在 hite-site.xml 中配置的 hive.metastore.warehouse.dir。
image-20210423180055371
然後,SparkSession 在 HDFS 臨時位置創建了 / tmp/hive/hdfs/b750e106-5b2f-4c67-8217-8cba7f95aaf4/_tmp_space.db 目錄。
\[hadoop@node2 root\]$ hdfs dfs -ls -R /tmp/hive/hdfs/b750e106-5b2f-4c67-8217-8cba7f95aaf4/
drwx------ - hdfs hadoop 0 2021-04-23 16:55 /tmp/hive/hdfs/b750e106-5b2f-4c67-8217-8cba7f95aaf4/\_tmp\_space.db
最後,Spark 開始通過 thrift RPC 去連接 Hive 的 MetaStore Server。(thrift://node1:9083)
關鍵對象
SessionState,負責存儲所有 SparkSession 的狀態。
發送廣播提交 JOB
21/04/23 16:55:02 INFO MemoryStore: Block broadcast\_0 stored as values in memory (estimated size 375.5 KiB, free 4.1 GiB)
21/04/23 16:55:02 INFO MemoryStore: Block broadcast\_0\_piece0 stored as bytes in memory (estimated size 34.4 KiB, free 4.1 GiB)
21/04/23 16:55:02 INFO BlockManagerInfo: Added broadcast\_0\_piece0 in memory on admin:57484 (size: 34.4 KiB, free: 4.1 GiB)
21/04/23 16:55:02 INFO SparkContext: Created broadcast 0 from
21/04/23 16:55:02 INFO FileInputFormat: Total input files to process : 1
21/04/23 16:55:02 INFO SparkContext: Starting job: show at SparkSQL\_ResourceTest01.scala:35
首先,MemoryStore 是 BlockManager 的組件,負責操作在內存中的數據存儲。
image-20210423181914879
當前有一個 broadcast_0 塊添加到內存中了,評估的大小爲 375KB。
其次,可以看到 Sprk 還是使用 Hadoop 的 FileInputFormat 來讀取 HDFS 文件,當前讀取到的是一個文件。
最後,SparkContext 提交了一個作業。注意:一個 Spark 應用可以提交很多的 JOB。
核心對象
MemoryStore,負責 BlockManager 中的內存數據存儲。
SparkContext,負責創建廣播、並提交作業。
DAG 調度
21/04/23 16:55:02 INFO DAGScheduler: Got job 0 (show at SparkSQL\_ResourceTest01.scala:35) with 1 output partitions
21/04/23 16:55:02 INFO DAGScheduler: Final stage: ResultStage 0 (show at SparkSQL\_ResourceTest01.scala:35)
21/04/23 16:55:02 INFO DAGScheduler: Parents of final stage: List()
21/04/23 16:55:03 INFO DAGScheduler: Missing parents: List()
21/04/23 16:55:03 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD\[4\] at show at SparkSQL\_ResourceTest01.scala:35), which has no missing parents
21/04/23 16:55:03 INFO MemoryStore: Block broadcast\_1 stored as values in memory (estimated size 9.2 KiB, free 4.1 GiB)
21/04/23 16:55:03 INFO MemoryStore: Block broadcast\_1\_piece0 stored as bytes in memory (estimated size 4.8 KiB, free 4.1 GiB)
21/04/23 16:55:03 INFO BlockManagerInfo: Added broadcast\_1\_piece0 in memory on admin:57484 (size: 4.8 KiB, free: 4.1 GiB)
21/04/23 16:55:03 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1383
21/04/23 16:55:03 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (MapPartitionsRDD\[4\] at show at SparkSQL\_ResourceTest01.scala:35) (first 15 tasks are for partitions Vector(0))
首先,DAGScheduler 接收到 SparkContext 提交的作業,然後找到了最終的 Stage(名稱爲:show at SparkSQL_ResourceTest01.scala:35),再提交 Stage。因爲咱們的 SQL 非常簡單,所以只有一個 Stage,沒有父 Stage。
其次,BlockManager 的 MemoryStore 創建了第二個廣播,並放入到內存中。
最後,DAGScheduler 從 DAG 中拿到的 Stage 提交任務。
關鍵對象
DAGScheduler,負責調度 Stage,並提交 TaskSet。
Task 調度
21/04/23 16:55:03 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks resource profile 0
21/04/23 16:55:03 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0) (admin, executor driver, partition 0, ANY, 4530 bytes) taskResourceAssignments Map()
21/04/23 16:55:03 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
21/04/23 16:55:03 INFO HadoopRDD: Input split: hdfs://node1:8020/user/hive/warehouse/test.db/t\_name/000000\_0:0+18
首先,TaskScheduler 添加了一個任務集(0.0),並且指定使用的是 Resource Profile 0。這不是前面 Driver 中加載到的 ResourceProfile 麼~
其實,使用啓動 task 0.0 任務。當前我們運行的是本地模式,所以是在 driver 上直接運行的。當前任務讀取的是 HDFS 上的文件——hdfs://node1:8020/user/hive/warehouse/test.db/t_name/000000_0
\[hadoop@node2 root\]$ hdfs dfs -tail hdfs://node1:8020/user/hive/warehouse/test.db/t\_name/000000\_0
2021-04-23 18:35:52,783 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
test1
test2
test3
關鍵對象
TaskSchedulerImpl,負責調度任務。
TaskSetManager,負責啓動任務。
Executor,負責啓動任務。
HadoopRDD,負責讀取數據。
生成代碼、完成任務
21/04/23 16:55:03 INFO CodeGenerator: Code generated in 174.966701 ms
21/04/23 16:55:05 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 1402 bytes result sent to driver
21/04/23 16:55:05 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 2375 ms on admin (executor driver) (1/1)
21/04/23 16:55:05 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
21/04/23 16:55:05 INFO DAGScheduler: ResultStage 0 (show at SparkSQL\_ResourceTest01.scala:35) finished in 2.482 s
21/04/23 16:55:05 INFO DAGScheduler: Job 0 is finished. Cancelling potential speculative or zombie tasks for this job
21/04/23 16:55:05 INFO TaskSchedulerImpl: Killing all running tasks in stage 0: Stage finished
21/04/23 16:55:05 INFO DAGScheduler: Job 0 finished: show at SparkSQL\_ResourceTest01.scala:35, took 2.533098 s
21/04/23 16:55:05 INFO CodeGenerator: Code generated in 11.8474 ms
我們看到有一個 CodeGenerator 組件,看名字就知道是生成代碼的。也就是說,我們的 SQL 會由 CodeGenerator 生成代碼,然後由 Executor 執行。TaskSetManagerImpl 檢測到任務集執行完後,會從池中移除(注意,TaskScheduler 有自己的任務調度器,默認爲 FIFO,也可以配置爲公平調度)。當所有 Stage 中的任務執行完後,TaskScheduler 會關閉所有的任務線程。然後結束作業。
關鍵對象
- CodeGenerator:代碼生成器,負責生成在 Task 中運行的代碼。
可以看到,INFO 輸出把整個 Spark 應用的執行流程關鍵節點輸出到日誌了。一目瞭然,沒有特別細節的組件執行過程。大家將來輸出日誌也可以參考 Spark 這種輸出方式,有利於排錯。
計劃執行流程源碼解析
上面,通過查看日誌,我們大概看到了 Spark 程序的執行過程。但其實,關於 Spark SQL 執行計劃的部分,我們並沒有看到。接下來,我們要根據上面分析的日誌執行過程,來分析 Spark SQL 的執行過程。
設置斷點並啓動調試
設置 Spark 的日誌級別爲 DEBUG。
spark.sparkContext.setLogLevel("DEBUG")
在 spark.sql 執行 SQL 語句的位置,打上斷點。並開始啓動調試。單步進入到 spark.sql 方法類。
image-20210423190058769
SparkSession.sql API
def sql(sqlText: String): DataFrame = withActive {
// 用於跟蹤查詢計劃的執行,例如:查詢計劃要執行計劃要執行哪些Rule、跟蹤記錄各個階段執行的時間等。
val tracker = new QueryPlanningTracker
// 調用measurePhase統計解析執行計劃的時間。
// 這是一個高階函數:def measurePhase\[T\](phase: String)(f: => T):
// 執行一個操作,並計算其執行的時間。
val plan = tracker.measurePhase(QueryPlanningTracker.PARSING) {
// SessionState的SQL Parser負責解析SQL,並生成解析的執行計劃
// 接口定義爲:def parsePlan(sqlText: String): LogicalPlan
sessionState.sqlParser.parsePlan(sqlText)
}
// 生成物理執行計劃並生成DataSet(就是DataFrame)
Dataset.ofRows(self, plan, tracker)
}
sql 方法會調用 Spark Session 中的 Session State 的 SQL 解析器來解析 SQL 語句爲邏輯執行計劃(LogicalPlan)。
豐富的 SessionState 組件
SessionState 的組件非常豐富。大家前面看到了它有個 SqlParser 成員,負責解析 SQL 文本爲邏輯執行計劃。除此之外,它還包含了其他的重要組件。
image-20210423191720506
-
conf:Spark SQL 配置
-
functionRegistry:函數註冊
-
udfRegistration:UDF 註冊
-
catalogBuilder:構建 Catalog
-
sqlParser:SQL 解析器
-
analyzerBuilder:分析後的執行計劃構建器
-
optimizerBuilder:優化後的執行計劃構建器
-
streamingQueryManagerBuilder:Streaming 查詢管理構建器
-
listenerManager:監聽器管理器
-
resourceLoaderBuilder:資源加載構建器
-
createQueryExecution:create 語句執行器
-
columnarRules:自定義規則實現,在執行計劃中實現運算符實現
-
queryStagePrepRules:查詢階段預處理規則
-
analyzer:分析器
-
optimizer:優化器
-
resourceLoader:資源加載器
-
streamingQueryManager:流查詢管理器
-
catalogManager:Catalog 管理器
大概掃一眼,重要的組件都組合在這裏了。
LogicPlan
LogicPlan,邏輯執行計劃。它是一個抽象類。
image-20210423234433003
這是 LogicPlan 的繼承樹,其實,它就是一顆樹。我們看到父類 TreeNode 是一個非常牛逼的 TreeNode。裏面包含了 Children TreeNode,還實現了很多的函數式算子,也就是,我們可以在一棵樹節點上執行任意的函數式計算。
image-20210423234632343
LogicPlan 有一個非常重要的方法:resolve。
def resolve(
nameParts: Seq\[String\],
resolver: Resolver): Option\[NamedExpression\] =
outputAttributes.resolve(nameParts, resolver)
它表示將所有的帶有名稱的部分解析爲命名錶達式。我們來看下當前我們的邏輯執行計劃樹是什麼樣子的。當前的執行計劃的根節點是一個 Project 類型。來看下它的繼承樹。
image-20210423235809607
由此,我們可以得出結論,在 SQL 中的任意一個操作,都可以找到一種類型的 LogicPlan,其實就是 TreeNode。包括 DDL、包括 explain、包括 LoadData 等等所有。所以,想看下 Spark 支持什麼操作,上面 LogicPlan 的子類就知道了。
image-20210423235417786
這裏有三個重要的變量:
-
resolved=false,表示邏輯執行計劃未解析
-
projectList,當前節點是一個 Project,也就是投影查詢。裏面是一個 *,表示查詢所有字段
-
allChildren,表示是所有的子節點。
ProjectLogicPlan
我們來看下 ProjectLogicPlan 的源碼。
case class Project(projectList: Seq\[NamedExpression\], child: LogicalPlan)
extends OrderPreservingUnaryNode {
// 執行Project操作的輸出描述
override def output: Seq\[Attribute\] = projectList.map(\_.toAttribute)
// 子節點的最大行限制
override def maxRows: Option\[Long\] = child.maxRows
// 是否解析
override lazy val resolved: Boolean = {
// 檢測是否存在聚合表達式、生成操作、開窗操作
val hasSpecialExpressions = projectList.exists ( \_.collect {
case agg: AggregateExpression => agg
case generator: Generator => generator
case window: WindowExpression => window
}.nonEmpty
)
// Project中表達式存在未解析、所有子節點有未解析的表達式、且不包含特殊表達式,都認爲是未解析的
!expressions.exists(!\_.resolved) && childrenResolved && !hasSpecialExpressions
}
// 獲取所有有效的約束條件
override lazy val validConstraints: ExpressionSet =
getAllValidConstraints(projectList)
}
UnresolvedRelation
該節點表示還沒有從 Catalog 解析的關係。此時,還僅僅是將 SQL 解析成爲了一個語法樹。我加上了一些註釋。
case class UnresolvedRelation(
multipartIdentifier: Seq\[String\],
options: CaseInsensitiveStringMap = CaseInsensitiveStringMap.empty(),
override val isStreaming: Boolean = false)
extends LeafNode with NamedRelation {
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.\_
/\*\* Returns a \`.\` separated name for this relation. \*/
def tableName: String = multipartIdentifier.quoted
// 表名
override def name: String = tableName
// 要輸出的屬性
override def output: Seq\[Attribute\] = Nil
// 是否被解析
override lazy val resolved = false
}
Dataset.ofRows API
def ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan, tracker: QueryPlanningTracker)
: DataFrame = sparkSession.withActive {
// QueryExecution是爲Spark執行關係型查詢的主要工作流,它包含了所有的執行計劃。
val qe = new QueryExecution(sparkSession, logicalPlan, tracker)
// 解析邏輯執行計劃
qe.assertAnalyzed()
// 創建DataSet,獲取解析後的邏輯執行計劃對應的schema,這個schema其實就是DataSet要使用的schema。
new Dataset\[Row\](qe, RowEncoder(qe.analyzed.schema))
}
我們看到了,Dataset.ofRows 實際上會創建一個查詢執行器,然後根據物理執行計劃創建 DataSet。
我們來調試一下 QueryExecution 就知道了。
image-20210424003057138
看到了嗎?
-
logical 是未解析的邏輯執行計劃
-
analyzed 是解析後的邏輯執行計劃
-
optimizedPlan 是優化後的邏輯執行計劃
-
executePlan 是物理執行計劃
-
sparkPlan 是 Spark 執行計劃
到此所有的計劃都已經生成好了,DataSet 也構建好了,就等着調度執行了。到此,所有的操作都是在 Driver 端執行的。
image-20210424004430001
邏輯執行計劃生成器源碼解析
核心實現類:SqlBaseLexer、SqlBaseParser
def sql(sqlText: String): DataFrame = withActive {
val tracker = new QueryPlanningTracker
val plan = tracker.measurePhase(QueryPlanningTracker.PARSING) {
// 此處設置斷點
sessionState.sqlParser.parsePlan(sqlText)
}
Dataset.ofRows(self, plan, tracker)
}
我們接下來,需要單步進入到 parsePlan 中,看看 Spark 是如何將 SQL 語句轉換爲邏輯執行計劃的。來看下 ParserDriver.scala 中的 parse 方法,該方法實現 SQL 語句解析。ParserDriver 是 Spark-catalyst 的一個類。Spark 使用的是 ANTLR 庫進行 SQL 語法的解析。
image-20210424005415058
protected def parse\[T\](command: String)(toResult: SqlBaseParser => T): T = {
logDebug(s"Parsing command: $command")
// 創建詞法分析器
val lexer = new SqlBaseLexer(new UpperCaseCharStream(CharStreams.fromString(command)))
lexer.removeErrorListeners()
lexer.addErrorListener(ParseErrorListener)
val tokenStream = new CommonTokenStream(lexer)
// 創建此法解析器
val parser = new SqlBaseParser(tokenStream)
parser.addParseListener(PostProcessor)
parser.removeErrorListeners()
parser.addErrorListener(ParseErrorListener)
parser.legacy\_setops\_precedence\_enbled = conf.setOpsPrecedenceEnforced
parser.legacy\_exponent\_literal\_as\_decimal\_enabled = conf.exponentLiteralAsDecimalEnabled
// 是否開啓標準SQL
parser.SQL\_standard\_keyword\_behavior = conf.ansiEnabled
try {
try {
// first, try parsing with potentially faster SLL mode
parser.getInterpreter.setPredictionMode(PredictionMode.SLL)
// 執行語法解析
toResult(parser)
}
catch {
case e: ParseCancellationException =>
// if we fail, parse with LL mode
tokenStream.seek(0) // rewind input stream
parser.reset()
// Try Again.
parser.getInterpreter.setPredictionMode(PredictionMode.LL)
toResult(parser)
}
}
catch {
case e: ParseException if e.command.isDefined =>
throw e
case e: ParseException =>
throw e.withCommand(command)
case e: AnalysisException =>
val position = Origin(e.line, e.startPosition)
throw new ParseException(Option(command), e.message, position, position)
}
}
}
邏輯執行計劃解析器源碼解析
Analyzer 組件
核心實現類:Analyzer
Analyzer 是負責解析邏輯執行計劃的。來看一下它的類定義。
class Analyzer(override val catalogManager: CatalogManager)
extends RuleExecutor\[LogicalPlan\] with CheckAnalysis with LookupCatalog with SQLConfHelper
Analyzer 其實就是一個 RuleExecutor,專門針對每個 LogicPlan 執行的一個個的 Rule,進行解析。後續馬上我們就會看到一個個的 Rule 了。不同類型的語法樹節點,都會有對應的 Rule 來進行解析。
Analyzer 需要使用 Catalog 管理器進行初始化。我們看到,當前我們加載的就是 Hive 的 MetaStore Catalog。
image-20210424011933985
解析邏輯執行計劃流程
我們來看下它的執行流程。
def executeAndCheck(plan: LogicalPlan, tracker: QueryPlanningTracker): LogicalPlan = {
// 因爲要解析整棵語法樹,需要遞歸檢查執行計劃是否已經完成解析。其實就是遞歸的出口
if (plan.analyzed) return plan
AnalysisHelper.markInAnalyzer {
// 執行解析,此處是調用父類RuleExecutor對應的executeAndTrack方法
val analyzed = executeAndTrack(plan, tracker)
try {
checkAnalysis(analyzed)
analyzed
} catch {
case e: AnalysisException =>
val ae = new AnalysisException(e.message, e.line, e.startPosition, Option(analyzed))
ae.setStackTrace(e.getStackTrace)
throw ae
}
}
}
解析邏輯執行計劃的代碼來了。
// 執行解析
def executeAndTrack(plan: TreeType, tracker: QueryPlanningTracker): TreeType = {
QueryPlanningTracker.withTracker(tracker) {
execute(plan)
}
}
/\*\*
\* 串行執行子類定義的一些列規則。這些規則由指定的執行策略執行。
\*/
def execute(plan: TreeType): TreeType = {
var curPlan = plan
val queryExecutionMetrics = RuleExecutor.queryExecutionMeter
val planChangeLogger = new PlanChangeLogger\[TreeType\]()
val tracker: Option\[QueryPlanningTracker\] = QueryPlanningTracker.get
val beforeMetrics = RuleExecutor.getCurrentMetrics()
// 對初始輸入執行初始化檢查,確保邏輯執行計劃是完整的
// 每個邏輯執行計劃都有自己的唯一ID,例如:name#0、cast(name#0 as string) AS name#3。
// 此處檢查需要讓邏輯執行計劃具備有唯一的ID,且輸出表達式ID是否也是唯一的
// 一旦檢測出來有重複的,就會拋出解析失敗的異常
if (!isPlanIntegral(plan)) {
val message = "The structural integrity of the input plan is broken in " +
s"${this.getClass.getName.stripSuffix("$")}."
throw new TreeNodeException(plan, message, null)
}
// 挨個挨個執行Analyzer組件中定義的每一批解析策略。
batches.foreach { batch =>
val batchStartPlan = curPlan
var iteration = 1
var lastPlan = curPlan
var continue = true
// Run until fix point (or the max number of iterations as specified in the strategy.
while (continue) {
curPlan = batch.rules.foldLeft(curPlan) {
case (plan, rule) =>
val startTime = System.nanoTime()
// 對邏輯執行計劃應用規則,應用規則成功後的邏輯執行計劃就是解析後的邏輯執行計劃
val result = rule(plan)
val runTime = System.nanoTime() - startTime
val effective = !result.fastEquals(plan)
if (effective) {
queryExecutionMetrics.incNumEffectiveExecution(rule.ruleName)
queryExecutionMetrics.incTimeEffectiveExecutionBy(rule.ruleName, runTime)
planChangeLogger.logRule(rule.ruleName, plan, result)
}
queryExecutionMetrics.incExecutionTimeBy(rule.ruleName, runTime)
queryExecutionMetrics.incNumExecution(rule.ruleName)
// Record timing information using QueryPlanningTracker
tracker.foreach(\_.recordRuleInvocation(rule.ruleName, runTime, effective))
// Run the structural integrity checker against the plan after each rule.
if (effective && !isPlanIntegral(result)) {
val message = s"After applying rule ${rule.ruleName} in batch ${batch.name}, " +
"the structural integrity of the plan is broken."
throw new TreeNodeException(result, message, null)
}
result
}
iteration += 1
if (iteration > batch.strategy.maxIterations) {
// Only log if this is a rule that is supposed to run more than once.
if (iteration != 2) {
val endingMsg = if (batch.strategy.maxIterationsSetting == null) {
"."
} else {
s", please set '${batch.strategy.maxIterationsSetting}' to a larger value."
}
val message = s"Max iterations (${iteration - 1}) reached for batch ${batch.name}" +
s"$endingMsg"
if (Utils.isTesting || batch.strategy.errorOnExceed) {
throw new TreeNodeException(curPlan, message, null)
} else {
logWarning(message)
}
}
// Check idempotence for Once batches.
if (batch.strategy == Once &&
Utils.isTesting && !excludedOnceBatches.contains(batch.name)) {
checkBatchIdempotence(batch, curPlan)
}
continue = false
}
if (curPlan.fastEquals(lastPlan)) {
logTrace(
s"Fixed point reached for batch ${batch.name} after ${iteration - 1} iterations.")
continue = false
}
lastPlan = curPlan
}
planChangeLogger.logBatch(batch.name, batchStartPlan, curPlan)
}
planChangeLogger.logMetrics(RuleExecutor.getCurrentMetrics() - beforeMetrics)
curPlan
}
Batch 解析邏輯執行計劃組
scala 中定義了一堆的 Batch,這個 Batch 可以翻譯過來稱爲邏輯計劃執行策略組更容易理解一些。裏面包含了針對每一類別 LogicalPlan 的處理類。每個 Batch 需要指定其名稱、執行策略、以及規則。
protected case class Batch(name: String, strategy: Strategy, rules: Rule\[TreeType\]\*)
abstract class Strategy {
/\*\* 最大執行次數. \*/
def maxIterations: Int
/\*\* 超過最大執行次數是否報錯. \*/
def errorOnExceed: Boolean = false
/\*\* 配置最大執行次數的配置key值 \*/
def maxIterationsSetting: String = null
}
/\*\* 有且僅執行一次. \*/
case object Once extends Strategy { val maxIterations = 1 }
/\*\*
\* 指定固定的執行次數.
\*/
case class FixedPoint(
override val maxIterations: Int,
override val errorOnExceed: Boolean = false,
override val maxIterationsSetting: String = null) extends Strategy
所有的 Spark SQL 的解析執行計劃都在這裏了。
override def batches: Seq\[Batch\] = Seq(
// 替換策略組
Batch("Substitution", fixedPoint,
// This rule optimizes \`UpdateFields\` expression chains so looks more like optimization rule.
// However, when manipulating deeply nested schema, \`UpdateFields\` expression tree could be
// very complex and make analysis impossible. Thus we need to optimize \`UpdateFields\` early
// at the beginning of analysis.
OptimizeUpdateFields,
CTESubstitution,
WindowsSubstitution,
EliminateUnions,
SubstituteUnresolvedOrdinals),
// 禁用Hint策略組
Batch("Disable Hints", Once,
new ResolveHints.DisableHints),
// Hint策略組
Batch("Hints", fixedPoint,
ResolveHints.ResolveJoinStrategyHints,
ResolveHints.ResolveCoalesceHints),
// 簡單檢查策略組
Batch("Simple Sanity Check", Once,
LookupFunctions),
// 關係策略組
Batch("Resolution", fixedPoint,
ResolveTableValuedFunctions ::
ResolveNamespace(catalogManager) ::
new ResolveCatalogs(catalogManager) ::
ResolveUserSpecifiedColumns ::
ResolveInsertInto ::
ResolveRelations ::
ResolveTables ::
ResolvePartitionSpec ::
AddMetadataColumns ::
ResolveReferences ::
ResolveCreateNamedStruct ::
ResolveDeserializer ::
ResolveNewInstance ::
ResolveUpCast ::
ResolveGroupingAnalytics ::
ResolvePivot ::
ResolveOrdinalInOrderByAndGroupBy ::
ResolveAggAliasInGroupBy ::
ResolveMissingReferences ::
ExtractGenerator ::
ResolveGenerate ::
ResolveFunctions ::
ResolveAliases ::
ResolveSubquery ::
ResolveSubqueryColumnAliases ::
ResolveWindowOrder ::
ResolveWindowFrame ::
ResolveNaturalAndUsingJoin ::
ResolveOutputRelation ::
ExtractWindowExpressions ::
GlobalAggregates ::
ResolveAggregateFunctions ::
TimeWindowing ::
ResolveInlineTables ::
ResolveHigherOrderFunctions(v1SessionCatalog) ::
ResolveLambdaVariables ::
ResolveTimeZone ::
ResolveRandomSeed ::
ResolveBinaryArithmetic ::
ResolveUnion ::
TypeCoercion.typeCoercionRules ++
extendedResolutionRules : \_\*),
// 應用字符填充策略組
Batch("Apply Char Padding", Once,
ApplyCharTypePadding),
// 可以執行的鉤子策略組
Batch("Post-Hoc Resolution", Once,
Seq(ResolveNoopDropTable) ++
postHocResolutionRules: \_\*),
// Alter Table策略組
Batch("Normalize Alter Table", Once, ResolveAlterTableChanges),
// 移除未解析的Hint策略組
Batch("Remove Unresolved Hints", Once,
new ResolveHints.RemoveAllHints),
// 非確定策略組
Batch("Nondeterministic", Once,
PullOutNondeterministic),
// UDF策略組
Batch("UDF", Once,
HandleNullInputsForUDF,
ResolveEncodersInUDF),
// 更新爲null策略組
Batch("UpdateNullability", Once,
UpdateAttributeNullability),
// 子查詢策略組
Batch("Subquery", Once,
UpdateOuterReferences),
// 清理策略組
Batch("Cleanup", fixedPoint,
CleanupAliases)
)
解析邏輯執行計劃規則
每一個 Batch 中都對應了一組規則。Spark SQL 的 Analyzer 組件,就是使用這些規則來解析邏輯執行計劃的。
image-20210424013330014
而所有的規則都是從 Rule 繼承。
image-20210424013458780
Rule 是一個抽象類,它的定義非常簡單,只有一個規則名稱、以及 apply 方法,也就是對邏輯執行計劃應用規則。
abstract class Rule\[TreeType <: TreeNode\[\_\]\] extends SQLConfHelper with Logging {
/\*\* Name for this rule, automatically inferred based on class name. \*/
val ruleName: String = {
val className = getClass.getName
if (className endsWith "$") className.dropRight(1) else className
}
def apply(plan: TreeType): TreeType
}
應用規則解析邏輯執行計劃
既然規則要解析邏輯執行計劃,那麼規則勢必就要改變邏輯執行計劃。我們來看看規則到底是如何應用到邏輯執行計劃上的。此處,我找了一個,ResolveUserSpecifiedColumns 規則來分析其源碼。
object ResolveUserSpecifiedColumns extends Rule\[LogicalPlan\] {
// 調用邏輯執行計劃的resolveOperators方法,該方法會克隆一個邏輯執行計劃樹,並應用規則。
// 這個resolveOperators是一個遞歸方法,會將整棵邏輯執行計劃樹都執行規則。遞歸方式是層級遍歷。
// 該函數調用中就是規則的定義,描述了應該對邏輯執行計劃如何應用規則
// 我把代碼貼在這兒,大家做參考。
// val afterRule = CurrentOrigin.withOrigin(origin) {
// rule.applyOrElse(self, identity\[LogicalPlan\])
// }
// // Check if unchanged and then possibly return old copy to avoid gc churn.
// if (self fastEquals afterRule) {
// mapChildren(\_.resolveOperatorsDown(rule))
// } else {
// afterRule.mapChildren(\_.resolveOperatorsDown(rule))
// }
override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
// 只匹配InsertInto語句,而且insert語句中的表已經被解析、且insert對應的子查詢也已經被解析、且用戶指定列是非空才執行
case i: InsertIntoStatement if i.table.resolved && i.query.resolved &&
i.userSpecifiedCols.nonEmpty =>
// 解析用戶指定列
val resolved = resolveUserSpecifiedColumns(i)
// 獲取到INSERT INTO子查詢中的所有列
val projection = addColumnListOnQuery(i.table.output, resolved, i.query)
// 將子查詢解析的列拷貝到userSpecifiedCols
i.copy(userSpecifiedCols = Nil, query = projection)
}
// 解析用戶指定列
private def resolveUserSpecifiedColumns(i: InsertIntoStatement): Seq\[NamedExpression\] = {
// 檢查列名是否重複
SchemaUtils.checkColumnNameDuplication(
i.userSpecifiedCols, "in the column list", resolver)
// 按照INSERT INTO中用戶的指定列,解析每一個列
i.userSpecifiedCols.map { col =>
i.table.resolve(Seq(col), resolver)
.getOrElse(i.table.failAnalysis(s"Cannot resolve column name $col"))
}
}
每個邏輯執行計劃都有一個 copy 方法,用於將解析後的表達式拷貝到執行計劃中。
一旦所有的規則應用完畢,邏輯執行計劃樹就解析完了。
重要提示!規則是按照層級底層應用的。
邏輯執行計劃優化器源碼解析
找到 Optimizer 類,該類實現了對執行計劃的優化。和 Analyzer 類似,它也是 RuleExecutor 的子類,也就是說,它也是按照規則來進行優化的,所以肯定也是按照 Batch 和 Rule 的方式來對執行計劃進行優化。Spark 3.x 的優化策略組也是非常豐富的。
val batches = (Batch("Eliminate Distinct", Once, EliminateDistinct) ::
// Technically some of the rules in Finish Analysis are not optimizer rules and belong more
// in the analyzer, because they are needed for correctness (e.g. ComputeCurrentTime).
// However, because we also use the analyzer to canonicalized queries (for view definition),
// we do not eliminate subqueries or compute current time in the analyzer.
Batch("Finish Analysis", Once,
EliminateResolvedHint,
EliminateSubqueryAliases,
EliminateView,
ReplaceExpressions,
RewriteNonCorrelatedExists,
ComputeCurrentTime,
GetCurrentDatabaseAndCatalog(catalogManager),
ReplaceDeduplicateWithAggregate) ::
//////////////////////////////////////////////////////////////////////////////////////////
// Optimizer rules start here
//////////////////////////////////////////////////////////////////////////////////////////
// - Do the first call of CombineUnions before starting the major Optimizer rules,
// since it can reduce the number of iteration and the other rules could add/move
// extra operators between two adjacent Union operators.
// - Call CombineUnions again in Batch("Operator Optimizations"),
// since the other rules might make two separate Unions operators adjacent.
Batch("Union", Once,
CombineUnions) ::
Batch("OptimizeLimitZero", Once,
OptimizeLimitZero) ::
// Run this once earlier. This might simplify the plan and reduce cost of optimizer.
// For example, a query such as Filter(LocalRelation) would go through all the heavy
// optimizer rules that are triggered when there is a filter
// (e.g. InferFiltersFromConstraints). If we run this batch earlier, the query becomes just
// LocalRelation and does not trigger many rules.
Batch("LocalRelation early", fixedPoint,
ConvertToLocalRelation,
PropagateEmptyRelation,
// PropagateEmptyRelation can change the nullability of an attribute from nullable to
// non-nullable when an empty relation child of a Union is removed
UpdateAttributeNullability) ::
Batch("Pullup Correlated Expressions", Once,
PullupCorrelatedPredicates) ::
// Subquery batch applies the optimizer rules recursively. Therefore, it makes no sense
// to enforce idempotence on it and we change this batch from Once to FixedPoint(1).
Batch("Subquery", FixedPoint(1),
OptimizeSubqueries) ::
Batch("Replace Operators", fixedPoint,
RewriteExceptAll,
RewriteIntersectAll,
ReplaceIntersectWithSemiJoin,
ReplaceExceptWithFilter,
ReplaceExceptWithAntiJoin,
ReplaceDistinctWithAggregate) ::
Batch("Aggregate", fixedPoint,
RemoveLiteralFromGroupExpressions,
RemoveRepetitionFromGroupExpressions) :: Nil ++
operatorOptimizationBatch) :+
// This batch rewrites plans after the operator optimization and
// before any batches that depend on stats.
Batch("Pre CBO Rules", Once, preCBORules: \_\*) :+
// This batch pushes filters and projections into scan nodes. Before this batch, the logical
// plan may contain nodes that do not report stats. Anything that uses stats must run after
// this batch.
Batch("Early Filter and Projection Push-Down", Once, earlyScanPushDownRules: \_\*) :+
// Since join costs in AQP can change between multiple runs, there is no reason that we have an
// idempotence enforcement on this batch. We thus make it FixedPoint(1) instead of Once.
Batch("Join Reorder", FixedPoint(1),
CostBasedJoinReorder) :+
Batch("Eliminate Sorts", Once,
EliminateSorts) :+
Batch("Decimal Optimizations", fixedPoint,
DecimalAggregates) :+
// This batch must run after "Decimal Optimizations", as that one may change the
// aggregate distinct column
Batch("Distinct Aggregate Rewrite", Once,
RewriteDistinctAggregates) :+
Batch("Object Expressions Optimization", fixedPoint,
EliminateMapObjects,
CombineTypedFilters,
ObjectSerializerPruning,
ReassignLambdaVariableID) :+
Batch("LocalRelation", fixedPoint,
ConvertToLocalRelation,
PropagateEmptyRelation,
// PropagateEmptyRelation can change the nullability of an attribute from nullable to
// non-nullable when an empty relation child of a Union is removed
UpdateAttributeNullability) :+
// The following batch should be executed after batch "Join Reorder" and "LocalRelation".
Batch("Check Cartesian Products", Once,
CheckCartesianProducts) :+
Batch("RewriteSubquery", Once,
RewritePredicateSubquery,
ColumnPruning,
CollapseProject,
RemoveNoopOperators) :+
// This batch must be executed after the \`RewriteSubquery\` batch, which creates joins.
Batch("NormalizeFloatingNumbers", Once, NormalizeFloatingNumbers) :+
Batch("ReplaceUpdateFieldsExpression", Once, ReplaceUpdateFieldsExpression)
// remove any batches with no rules. this may happen when subclasses do not add optional rules.
batches.filter(\_.rules.nonEmpty)
}
此處,受限於篇幅,就不去進入到這些優化規則分析了。如果大家有興趣,可以執行去單步,它的執行方式和 Analyzer 類似。
物理執行計劃生成源碼解析
物理執行計劃是由 SparkPlanner 來實現的。來看一下它的類定義。
class SparkPlanner(val session: SparkSession, val experimentalMethods: ExperimentalMethods)
extends SparkStrategies with SQLConfHelper
SparkPlanner 會使用一系列的策略來生成一組候選的物理執行計劃。
override def strategies: Seq\[Strategy\] =
experimentalMethods.extraStrategies ++
extraPlanningStrategies ++ (
LogicalQueryStageStrategy ::
PythonEvals ::
new DataSourceV2Strategy(session) ::
FileSourceStrategy ::
DataSourceStrategy ::
SpecialLimits ::
Aggregation ::
Window ::
JoinSelection ::
InMemoryScans ::
BasicOperators :: Nil)
來看一下 SparkPlan 的定義:
abstract class SparkPlan extends QueryPlan\[SparkPlan\] with Logging with Serializable
它也是從 QueryPlan 中繼承的。它也是一顆樹。
image-20210424025222172
大多數的 SparkPlan 節點是以 Exec 結尾,這也表示了這些節點是 Spark 可執行的。SparkPlan 中定義了 doExecute 方法,這是每種 SparkPlan 都需要自己實現的。
protected def doExecute(): RDD\[InternalRow\]
我們來看下 HiveTableScanExec 實現。
protected override def doExecute(): RDD\[InternalRow\] = {
// Using dummyCallSite, as getCallSite can turn out to be expensive with
// multiple partitions.
val rdd = if (!relation.isPartitioned) {
Utils.withDummyCallSite(sqlContext.sparkContext) {
hadoopReader.makeRDDForTable(hiveQlTable)
}
} else {
Utils.withDummyCallSite(sqlContext.sparkContext) {
hadoopReader.makeRDDForPartitionedTable(prunedPartitions)
}
}
val numOutputRows = longMetric("numOutputRows")
// Avoid to serialize MetastoreRelation because schema is lazy. (see SPARK-15649)
val outputSchema = schema
rdd.mapPartitionsWithIndexInternal { (index, iter) =>
// 調用自動生成的代碼
val proj = UnsafeProjection.create(outputSchema)
proj.initialize(index)
iter.map { r =>
numOutputRows += 1
proj(r)
}
}
}
我們終於看到了 RDD。這就是 spark-core,底層最終還是由 RDD 來執行的。雖然 RDD 不推薦直接使用,但其實底層 Spark 還是基於 RDD 實現的,只不過 Spark SQL 做了大量的優化而已。我們看到 HiveScan 操作,先通過 HadoopReader 的 markRDDForTable 方法,創建 RDD,然後執行了 mapPartitionsWithIndexInternal 算子。
源碼解析:Code Generate 代碼生成
在生成了 RDD 後,RDD 算子中執行的代碼會通過 Code Generate 自動生成代碼來執行。以下是 CodeGenerator 的代碼。
abstract class CodeGenerator\[InType <: AnyRef, OutType <: AnyRef\] extends Logging {
protected val genericMutableRowType: String = classOf\[GenericInternalRow\].getName
/\*\*
\* Generates a class for a given input expression. Called when there is not cached code
\* already available.
\*/
protected def create(in: InType): OutType
/\*\*
\* Canonicalizes an input expression. Used to avoid double caching expressions that differ only
\* cosmetically.
\*/
protected def canonicalize(in: InType): InType
/\*\* Binds an input expression to a given input schema \*/
protected def bind(in: InType, inputSchema: Seq\[Attribute\]): InType
/\*\* Generates the requested evaluator binding the given expression(s) to the inputSchema. \*/
def generate(expressions: InType, inputSchema: Seq\[Attribute\]): OutType =
generate(bind(expressions, inputSchema))
/\*\* Generates the requested evaluator given already bound expression(s). \*/
def generate(expressions: InType): OutType = create(canonicalize(expressions))
/\*\*
\* Create a new codegen context for expression evaluator, used to store those
\* expressions that don't support codegen
\*/
def newCodeGenContext(): CodegenContext = {
new CodegenContext
}
}
其中,generate 方法就是直接根據表達式來生成可執行的 Java 代碼的。我們可以來看下 GenerateUnsafeProjection 的實現。
private def create(
expressions: Seq\[Expression\],
subexpressionEliminationEnabled: Boolean): UnsafeProjection = {
val ctx = newCodeGenContext()
val eval = createCode(ctx, expressions, subexpressionEliminationEnabled)
val codeBody =
s"""
|public java.lang.Object generate(Object\[\] references) {
| return new SpecificUnsafeProjection(references);
|}
|
|class SpecificUnsafeProjection extends ${classOf\[UnsafeProjection\].getName} {
|
| private Object\[\] references;
| ${ctx.declareMutableStates()}
|
| public SpecificUnsafeProjection(Object\[\] references) {
| this.references = references;
| ${ctx.initMutableStates()}
| }
|
| public void initialize(int partitionIndex) {
| ${ctx.initPartition()}
| }
|
| // Scala.Function1 need this
| public java.lang.Object apply(java.lang.Object row) {
| return apply((InternalRow) row);
| }
|
| public UnsafeRow apply(InternalRow ${ctx.INPUT\_ROW}) {
| ${eval.code}
| return ${eval.value};
| }
|
| ${ctx.declareAddedFunctions()}
|}
""".stripMargin
val code = CodeFormatter.stripOverlappingComments(
new CodeAndComment(codeBody, ctx.getPlaceHolderToComments()))
logDebug(s"code for ${expressions.mkString(",")}:\\n${CodeFormatter.format(code)}")
val (clazz, \_) = CodeGenerator.compile(code)
clazz.generate(ctx.references.toArray).asInstanceOf\[UnsafeProjection\]
}
大家看到了麼?此處有 Java 代碼的模板。Spark 使用的是 codehaus 的 janino 直接生成 Java 字節碼,並由 CLASSLOADER 動態加載。然後由 SparkPlan 中調用執行。所有的表達式直接將 Java 代碼直接生成好,並在一個並行度中執行。
到此,Spark SQL 的整個過程就給大家分析了下。
這其中每個過程都可以用數萬字來描述,所以,後續再挨個組件給大家剖析剖析。我抬頭看了看牆上的掛鐘,已經凌晨 4 點了。我們下期再見。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。