SparkSQL 源碼解析系列

上週,面試了一個候選人。因爲我們是 Spark SQL 的深度用戶,所以,我肯定要好好問問候選人在 Spark SQL 的掌握程度。候選人簡歷號稱是某互聯網大廠公司架構,精通各種源碼。搞得我面試也有點緊張,怕一不小心被人家虐死了。

我先讓他說了下 Spark SQL 的原理。他肯定做好了功課:Spark SQL 原理 = 邏輯執行計劃 + 解析後的邏輯執行計劃 + 優化後的邏輯執行計劃 + 物理執行計劃。臉上逐漸漏出了得意的笑容,於是,我連續發問:

中間他不斷地想把我扯到其他領域,說着模棱兩口的回答。不斷地解釋他們公司的業務是如何如何,平臺是如何如何。但作爲一名常年面試的面試官,怎麼可能讓他帶偏呢。整個過程就是,我挖一個坑,他馬上就跳進去,然後自己再也出不來了。我再拉他出來,然後他自己又跳進去了。其實,說到底,他並不熟悉源碼。然後就沒有然後了...

其實這種面試,每週都在重複。我也是有點疲憊了。我也才慢慢發現,這個行業牛人不少,但混子更多。隨隨便便期望薪資就是 30K、40K。現在有很多公司都要上大數據平臺,給了這些混子更多的空間。

不管如何,持續學習、不斷進步是我們得以生存的重要途徑。所以,剛好這兩天有點時間,跟大家來聊聊 Spark SQL 源碼。因爲篇幅有限,沒辦法展開所有。但大家看完,如果對源碼不熟悉,肯定會有所啓發。

執行計劃

Hive 準備表和數據

create database if not exists test;

create table if not exists test.t\_name(
    name string
);

insert into 
    test.t\_name 
values
    ('test1')
    , ('test2')
    , ('test3')
;

編寫測試代碼

爲了方便調試 Spark SQL 源碼,我把 SQL 語句寫在了 scala 代碼中。同時,在程序執行的末尾添加了一個阻塞標準輸入。這樣我們就可以去查看下 Spark 的 WebUI 了。

    def main(args: Array\[String\])Unit = {
        val conf = new SparkConf
        conf.set("spark.hive.enable""true")
        conf.set("spark.sql.hive.metastore.version","2.3")
        conf.set("spark.sql.hive.metastore.jars","path")
        // 顯示所有的執行計劃
        conf.set("spark.sql.ui.explainMode""extended")

        val spark = SparkSession
            .builder()
            .config(conf)
            .master("local\[1\]")
            .enableHiveSupport()
            .getOrCreate()

        spark.sparkContext.setLogLevel("INFO")

        // 支持方式
        val sql =
            """
              | select
              |     \*
              | from
              |     test.t\_name
              |""".stripMargin

        val df = spark.sql(sql)

        df.show()

        System.in.read()
    }

執行計劃

查看解析執行計劃

\== Parsed Logical Plan ==
GlobalLimit 21
+- LocalLimit 21
   +- Project \[cast(name#0 as string) AS name#3\]
      +- Project \[name#0\]
         +- SubqueryAlias spark\_catalog.test.t\_name
            +- HiveTableRelation \[\`test\`.\`t\_name\`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: \[name#0\], Partition Cols: \[\]\]

可以看到,執行計劃樹包含以下幾個部分內容:

  1. 讀取 Hive 的關係型表(test.t_name),使用的是 Hive SerDe2 讀取,包含了一個 name 列,沒有分區

  2. 執行計劃中使用的表的別名爲:spark_catalog.test.t_name

  3. 查詢 name#0 這一列的數據

  4. 將 name#0 這一列轉換爲 String 類型,並取別名爲 name#3

  5. 最後添加一個 limit 爲 21 的限制(這個是 Spark 默認給我們添加上的)

查看分析後的執行計劃

\== Analyzed Logical Plan ==
name: string
GlobalLimit 21
+- LocalLimit 21
   +- Project \[cast(name#0 as string) AS name#3\]
      +- Project \[name#0\]
         +- SubqueryAlias spark\_catalog.test.t\_name
            +- HiveTableRelation \[\`test\`.\`t\_name\`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: \[name#0\], Partition Cols: \[\]\]

我們看到,分析後的執行計劃只添加了 name 的類型。解析執行計劃僅僅是解析 SQL 爲語法樹,在解析執行計劃階段,Spark SQL 是不知道列的類型的。

優化後的執行計劃

\== Optimized Logical Plan ==
GlobalLimit 21
+- LocalLimit 21
   +- HiveTableRelation \[\`test\`.\`t\_name\`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: \[name#0\], Partition Cols: \[\]\]

我們發現,優化後的執行計劃要比分析後的執行計劃簡單很多。不會進行類型轉換,也無需執行投影查詢,保留了 Spark SQL 默認的 LIMIT 限制。

物理執行計劃

\== Physical Plan ==
CollectLimit 21
+- Scan hive test.t\_name \[name#0\], HiveTableRelation \[\`test\`.\`t\_name\`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: \[name#0\], Partition Cols: \[\]\]

物理執行計劃就是真正能夠在 Spark 集羣上運行的。

image-20210423170530691

查看 Spark 執行日誌

日誌是非常重要了,這也是我們日常開發、排錯必不可少的。此處,我們通過 INFO 級別的日誌,來更細一點的粒度地來看看 Spark SQL 的執行過程。爲了方便分析,我省略掉一些不是特別重要的日誌信息。

不要指望,通過看日誌瞭解 Spark 的所有執行機制,我們這一步就是要看看 Spark 執行過程有哪些組件是比較重要的。後續有助於我們分析源代碼。

JAVA 執行命令

C:\\opt\\Java\\jdk1.8.0\_181\\bin\\java.exe "-javaagent:C:\\Program Files\\JetBrains\\IntelliJ IDEA Community Edition 2020.3.2\\lib\\idea\_rt.jar=57446:C:\\Program Files\\JetBrains\\IntelliJ IDEA Community Edition 2020.3.2\\bin" -Dfile.encoding=UTF-8 -classpath C:\\Users\\China\\AppData\\Local\\Temp\\classpath918267796.jar cn.com.hnzycfc.SparkSQL\_ResourceTest01

在 IDEA 中,調試命令非常簡潔。除了 IDEA 爲了方便調試添加了 JVM 代理之外,有一個比較重要的參數,-classpath C:\Users\China\AppData\Local\Temp\classpath918267796.jar,要運行 Spark,我們可以確定的是,JVM 一定要加載 Spark 的核心 JAR 包。而這些 JAR 包如果我們沒有配置本地 JVM 的 CLASSPATH,一定會通過 JAVA 命令傳遞給 JVM,否則,一定會出現 CLASS NOT FOUND 錯誤。

我們打開這個 classpath918267796.jar 包,一看便知。這個 JAR 包中只有一個文件:

image-20210423171223760

這個文件是 JAR 包的元數據描述文件。很明顯可以看到有一個 Class-Path,這個是 JDK 的標準加載 CLASSPATH 方式。(如果大家不熟悉 CLASSPATH,可以看一下我之前寫的這篇文章:https://mp.weixin.qq.com/s/qcQKPCdRFXEqun5B3augyg)

Class-Path: file:/E:/05.git\_projectspark\_sql\_test/target/classes/ file:/C:/opt/scala/lib/scala-library.jar file:/C:/opt/scala/lib/scala-reflect.jar file:/C:/opt/apache-maven-3.6.1/repository/org/apache/spark/spark-core\_2.12/3.1.1/spark-core\_2.12-3.1.1.jar file:/C:/opt/apache-maven3.6.1/repository/com/thoughtworks/paranamer/paranamer/2.8/paranamer-2.8.jar file...

裏面其實包含了一眼望不到邊的 JAR 包。其中就包含了 Spark、Hive 相關的依賴包。

關鍵對象

StaticLoggerBinder,負責構建 LogFactory。

日誌配置加載

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in \[jar:file:/C:/opt/apache-maven-3.6.1/repository/org/slf4j/slf4j-log4j12/1.7.30/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class\]
SLF4J: Found binding in \[jar:file:/C:/opt/apache-maven-3.6.1/repository/org/slf4j/slf4j-simple/1.7.25/slf4j-simple-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class\]
SLF4J: See http://www.slf4j.org/codes.html#multiple\_bindings for an explanation.
SLF4J: Actual binding is of type \[org.slf4j.impl.Log4jLoggerFactory\]
log4j:WARN No appenders could be found for logger (org.apache.hadoop.hive.conf.HiveConf).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties

這裏,我們並沒有配置 log4j.property,所以,Spark 使用了它自己 JAR 包中的 Log4j 配置文件。這個配置文件我們可以在spark-core_2.12-3.1.1.jar中找到。

image-20210423172125770

提交 Spark 應用程序

21/04/23 16:54:51 INFO SparkContext: Running Spark version 3.1.1
21/04/23 16:54:51 INFO ResourceUtils: ==============================================================
# 沒有爲spark.driver自定義配置
21/04/23 16:54:51 INFO ResourceUtils: No custom resources configured for spark.driver.
21/04/23 16:54:51 INFO ResourceUtils: ==============================================================
21/04/23 16:54:51 INFO SparkContext: Submitted application: b72a357f-bba5-47c2-871a-1ae5db3e45a2

Spark 應用程序是需要提交到集羣中運行的。當前我們在 IDEA 中運行,那麼 Spark 會使用 JVM 線程模擬一個集羣來運行應用程序。

關鍵對象

ResourceUtils,這是負責加載配置 spark.driver 的類

加載默認 executor 配置

21/04/23 16:54:51 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 1024, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
21/04/23 16:54:51 INFO ResourceProfile: Limiting resource is cpu
21/04/23 16:54:51 INFO ResourceProfileManager: Added ResourceProfile id: 0

這段日誌描述的是 executor 的資源。因爲我們當前沒有指定 executor 配置,所以使用的是默認的 executor 配置,executor 使用 1 個 core、1gb 內存、沒有使用堆外內存,每個 task 使用 1 個 vcore,也就是一個 executor 執行一個任務。

注意:咱們留意一下,這個 ResourceProfile 的 id 爲 0。

關鍵對象

ResourceProfile,這是負責配置描述 executor 配置的。

權限驗證

21/04/23 16:54:52 INFO SecurityManager: Changing view acls to: China,hdfs
21/04/23 16:54:52 INFO SecurityManager: Changing modify acls to: China,hdfs
21/04/23 16:54:52 INFO SecurityManager: Changing view acls groups to: 
21/04/23 16:54:52 INFO SecurityManager: Changing modify acls groups to: 
21/04/23 16:54:52 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(China, hdfs); groups with view permissions: Set(); users  with modify permissions: Set(China, hdfs); groups with modify permissions: Set()

當前我們使用的是所屬組是 hdfs。

image-20210423173000840

因爲我們配置了 HADOOP_USER_NAME 環境變量。當前,Spark Kubernetes Security 是沒有開啓的,認證是禁用的。

image-20210423173331281

關鍵對象

SecurityManager,這是負責 Kubernetes 身份認證的,注意哦,不是權限認證。

啓動 Spark 重要服務

21/04/23 16:54:55 INFO SparkEnv: Registering MapOutputTracker
21/04/23 16:54:55 INFO SparkEnv: Registering BlockManagerMaster
21/04/23 16:54:55 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
21/04/23 16:54:55 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
21/04/23 16:54:55 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
21/04/23 16:54:55 INFO DiskBlockManager: Created local directory at C:\\Users\\China\\AppData\\Local\\Temp\\blockmgr-89e66659-c81c-4922-85b0-dbcb38570c81
21/04/23 16:54:56 INFO MemoryStore: MemoryStore started with capacity 4.1 GiB
21/04/23 16:54:56 INFO SparkEnv: Registering OutputCommitCoordinator
21/04/23 16:54:56 INFO Utils: Successfully started service 'SparkUI' on port 4040.
21/04/23 16:54:56 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://admin:4040
21/04/23 16:54:56 INFO Executor: Starting executor ID driver on host admin
21/04/23 16:54:56 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 57484.
21/04/23 16:54:56 INFO NettyBlockTransferService: Server created on admin:57484

可以看到,註冊了 MapOutputTracker、BlockManagerMaster、BlockManagerMasterEndpoint、OutputCommitCoordinator 這些組件,然後啓動了 Spark UI、以及啓動了 Netty 數據傳輸服務。

關鍵對象

  • SparkEnv,這是負責註冊 Spark 運行環境所需要的組件。

  • SparkUI,這是負責運行 Spark WebUI 的組件。

  • NettyBlockTransferService,這是負責使用 Netty 進行數據塊傳輸的組件。

塊管理器

21/04/23 16:54:56 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
21/04/23 16:54:56 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, admin, 57484, None)
21/04/23 16:54:56 INFO BlockManagerMasterEndpoint: Registering block manager admin:57484 with 4.1 GiB RAM, BlockManagerId(driver, admin, 57484, None)
21/04/23 16:54:56 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, admin, 57484, None)
21/04/23 16:54:56 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, admin, 57484, None)

BlockManager 翻譯過來就是塊管理器,Spark 有自己的內存管理,這個就是 Spark 用於管理存儲塊的。當前 BlockManager 使用的是隨機塊副本策略。並且註冊了 Block Manager 在某個端口。我們看到 BlockManager 使用的就是前面的 NettyBlockTransfer 服務的 57484 端口。而且,BlockManager 能夠使用的內存是 4.1GB 的內存。

image-20210423175446285

在 Web UI 中,我們可以看到。這個 4.1GB 顯示的是 Storage Memory,也就是內存中能夠存儲的數據量。

加載 Spark SQL 元數據

21/04/23 16:54:56 INFO SharedState: spark.sql.warehouse.dir is not set, but hive.metastore.warehouse.dir is set. Setting spark.sql.warehouse.dir to the value of hive.metastore.warehouse.dir ('hdfs://node1:8020/user/hive/warehouse').
21/04/23 16:54:56 INFO SharedState: Warehouse path is 'hdfs://node1:8020/user/hive/warehouse'.
21/04/23 16:54:57 INFO HiveUtils: Initializing HiveMetastoreConnection version 2.3 using path: 
21/04/23 16:54:59 INFO deprecation: No unit for dfs.client.datanode-restart.timeout(30) assuming SECONDS
21/04/23 16:55:00 INFO SessionState: Created HDFS directory: /tmp/hive/hdfs/b750e106-5b2f-4c67-8217-8cba7f95aaf4
21/04/23 16:55:00 INFO SessionState: Created local directory: C:/Users/China/AppData/Local/Temp/China/b750e106-5b2f-4c67-8217-8cba7f95aaf4
21/04/23 16:55:00 INFO SessionState: Created HDFS directory: /tmp/hive/hdfs/b750e106-5b2f-4c67-8217-8cba7f95aaf4/\_tmp\_space.db
21/04/23 16:55:00 INFO HiveClientImpl: Warehouse location for Hive client (version 2.3.7) is hdfs://node1:8020/user/hive/warehouse
21/04/23 16:55:00 INFO metastore: Trying to connect to metastore with URI thrift://node1:9083
21/04/23 16:55:00 INFO metastore: Opened a connection to metastore, current connections: 1
21/04/23 16:55:00 INFO JniBasedUnixGroupsMapping: Error getting groups for hdfs: Unknown error.
21/04/23 16:55:00 INFO metastore: Connected to metastore.
21/04/23 16:55:02 INFO SQLStdHiveAccessController: Created SQLStdHiveAccessController for session context : HiveAuthzSessionContext \[sessionString=b750e106-5b2f-4c67-8217-8cba7f95aaf4, clientType=HIVECLI\]
21/04/23 16:55:02 WARN SessionState: METASTORE\_FILTER\_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.
21/04/23 16:55:02 INFO metastore: Mestastore configuration hive.metastore.filter.hook changed from org.apache.hadoop.hive.metastore.DefaultMetaStoreFilterHookImpl to org.apache.hadoop.hive.ql.security.authorization.plugin.AuthorizationMetaStoreFilterHook
21/04/23 16:55:02 INFO metastore: Closed a connection to metastore, current connections: 0
21/04/23 16:55:02 INFO metastore: Trying to connect to metastore with URI thrift://node1:9083
21/04/23 16:55:02 INFO metastore: Opened a connection to metastore, current connections: 1
21/04/23 16:55:02 INFO metastore: Connected to metastore.

任何的 SQL 引擎都是需要加載元數據的,不然,連執行計劃都生成不了。上述例子總的來說,做了兩件事情。

  1. 加載元數據

  2. 創建會話連接 Hive MetaStore

首先,Spark 檢測到我們沒有設置 spark.sql.warehouse.dir,然後就開始找我們在 hite-site.xml 中配置的 hive.metastore.warehouse.dir。

image-20210423180055371

然後,SparkSession 在 HDFS 臨時位置創建了 / tmp/hive/hdfs/b750e106-5b2f-4c67-8217-8cba7f95aaf4/_tmp_space.db 目錄。

\[hadoop@node2 root\]$ hdfs dfs -ls -R /tmp/hive/hdfs/b750e106-5b2f-4c67-8217-8cba7f95aaf4/
drwx------   - hdfs hadoop          0 2021-04-23 16:55 /tmp/hive/hdfs/b750e106-5b2f-4c67-8217-8cba7f95aaf4/\_tmp\_space.db

最後,Spark 開始通過 thrift RPC 去連接 Hive 的 MetaStore Server。(thrift://node1:9083)

關鍵對象

SessionState,負責存儲所有 SparkSession 的狀態。

發送廣播提交 JOB

21/04/23 16:55:02 INFO MemoryStore: Block broadcast\_0 stored as values in memory (estimated size 375.5 KiB, free 4.1 GiB)
21/04/23 16:55:02 INFO MemoryStore: Block broadcast\_0\_piece0 stored as bytes in memory (estimated size 34.4 KiB, free 4.1 GiB)
21/04/23 16:55:02 INFO BlockManagerInfo: Added broadcast\_0\_piece0 in memory on admin:57484 (size: 34.4 KiB, free: 4.1 GiB)
21/04/23 16:55:02 INFO SparkContext: Created broadcast 0 from 
21/04/23 16:55:02 INFO FileInputFormat: Total input files to process : 1
21/04/23 16:55:02 INFO SparkContext: Starting job: show at SparkSQL\_ResourceTest01.scala:35

首先,MemoryStore 是 BlockManager 的組件,負責操作在內存中的數據存儲。

image-20210423181914879

當前有一個 broadcast_0 塊添加到內存中了,評估的大小爲 375KB。

其次,可以看到 Sprk 還是使用 Hadoop 的 FileInputFormat 來讀取 HDFS 文件,當前讀取到的是一個文件。

最後,SparkContext 提交了一個作業。注意:一個 Spark 應用可以提交很多的 JOB。

核心對象

  • MemoryStore,負責 BlockManager 中的內存數據存儲。

  • SparkContext,負責創建廣播、並提交作業。

DAG 調度

21/04/23 16:55:02 INFO DAGScheduler: Got job 0 (show at SparkSQL\_ResourceTest01.scala:35) with 1 output partitions
21/04/23 16:55:02 INFO DAGScheduler: Final stage: ResultStage 0 (show at SparkSQL\_ResourceTest01.scala:35)
21/04/23 16:55:02 INFO DAGScheduler: Parents of final stage: List()
21/04/23 16:55:03 INFO DAGScheduler: Missing parents: List()
21/04/23 16:55:03 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD\[4\] at show at SparkSQL\_ResourceTest01.scala:35), which has no missing parents
21/04/23 16:55:03 INFO MemoryStore: Block broadcast\_1 stored as values in memory (estimated size 9.2 KiB, free 4.1 GiB)
21/04/23 16:55:03 INFO MemoryStore: Block broadcast\_1\_piece0 stored as bytes in memory (estimated size 4.8 KiB, free 4.1 GiB)
21/04/23 16:55:03 INFO BlockManagerInfo: Added broadcast\_1\_piece0 in memory on admin:57484 (size: 4.8 KiB, free: 4.1 GiB)
21/04/23 16:55:03 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1383
21/04/23 16:55:03 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (MapPartitionsRDD\[4\] at show at SparkSQL\_ResourceTest01.scala:35) (first 15 tasks are for partitions Vector(0))

首先,DAGScheduler 接收到 SparkContext 提交的作業,然後找到了最終的 Stage(名稱爲:show at SparkSQL_ResourceTest01.scala:35),再提交 Stage。因爲咱們的 SQL 非常簡單,所以只有一個 Stage,沒有父 Stage。

其次,BlockManager 的 MemoryStore 創建了第二個廣播,並放入到內存中。

最後,DAGScheduler 從 DAG 中拿到的 Stage 提交任務。

關鍵對象

DAGScheduler,負責調度 Stage,並提交 TaskSet。

Task 調度

21/04/23 16:55:03 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks resource profile 0
21/04/23 16:55:03 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0) (admin, executor driver, partition 0, ANY, 4530 bytes) taskResourceAssignments Map()
21/04/23 16:55:03 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
21/04/23 16:55:03 INFO HadoopRDD: Input split: hdfs://node1:8020/user/hive/warehouse/test.db/t\_name/000000\_0:0+18

首先,TaskScheduler 添加了一個任務集(0.0),並且指定使用的是 Resource Profile 0。這不是前面 Driver 中加載到的 ResourceProfile 麼~

其實,使用啓動 task 0.0 任務。當前我們運行的是本地模式,所以是在 driver 上直接運行的。當前任務讀取的是 HDFS 上的文件——hdfs://node1:8020/user/hive/warehouse/test.db/t_name/000000_0

\[hadoop@node2 root\]$ hdfs dfs -tail hdfs://node1:8020/user/hive/warehouse/test.db/t\_name/000000\_0
2021-04-23 18:35:52,783 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
test1
test2
test3

關鍵對象

  • TaskSchedulerImpl,負責調度任務。

  • TaskSetManager,負責啓動任務。

  • Executor,負責啓動任務。

  • HadoopRDD,負責讀取數據。

生成代碼、完成任務

21/04/23 16:55:03 INFO CodeGenerator: Code generated in 174.966701 ms
21/04/23 16:55:05 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 1402 bytes result sent to driver
21/04/23 16:55:05 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 2375 ms on admin (executor driver) (1/1)
21/04/23 16:55:05 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
21/04/23 16:55:05 INFO DAGScheduler: ResultStage 0 (show at SparkSQL\_ResourceTest01.scala:35) finished in 2.482 s
21/04/23 16:55:05 INFO DAGScheduler: Job 0 is finished. Cancelling potential speculative or zombie tasks for this job
21/04/23 16:55:05 INFO TaskSchedulerImpl: Killing all running tasks in stage 0: Stage finished
21/04/23 16:55:05 INFO DAGScheduler: Job 0 finished: show at SparkSQL\_ResourceTest01.scala:35, took 2.533098 s
21/04/23 16:55:05 INFO CodeGenerator: Code generated in 11.8474 ms

我們看到有一個 CodeGenerator 組件,看名字就知道是生成代碼的。也就是說,我們的 SQL 會由 CodeGenerator 生成代碼,然後由 Executor 執行。TaskSetManagerImpl 檢測到任務集執行完後,會從池中移除(注意,TaskScheduler 有自己的任務調度器,默認爲 FIFO,也可以配置爲公平調度)。當所有 Stage 中的任務執行完後,TaskScheduler 會關閉所有的任務線程。然後結束作業。

關鍵對象

  • CodeGenerator:代碼生成器,負責生成在 Task 中運行的代碼。

可以看到,INFO 輸出把整個 Spark 應用的執行流程關鍵節點輸出到日誌了。一目瞭然,沒有特別細節的組件執行過程。大家將來輸出日誌也可以參考 Spark 這種輸出方式,有利於排錯。

計劃執行流程源碼解析

上面,通過查看日誌,我們大概看到了 Spark 程序的執行過程。但其實,關於 Spark SQL 執行計劃的部分,我們並沒有看到。接下來,我們要根據上面分析的日誌執行過程,來分析 Spark SQL 的執行過程。

設置斷點並啓動調試

設置 Spark 的日誌級別爲 DEBUG。

spark.sparkContext.setLogLevel("DEBUG")

在 spark.sql 執行 SQL 語句的位置,打上斷點。並開始啓動調試。單步進入到 spark.sql 方法類。

image-20210423190058769

SparkSession.sql API

def sql(sqlText: String)DataFrame = withActive {
  // 用於跟蹤查詢計劃的執行,例如:查詢計劃要執行計劃要執行哪些Rule、跟蹤記錄各個階段執行的時間等。
  val tracker = new QueryPlanningTracker
  // 調用measurePhase統計解析執行計劃的時間。
  // 這是一個高階函數:def measurePhase\[T\](phase: String)(f: => T):
  // 執行一個操作,並計算其執行的時間。
  val plan = tracker.measurePhase(QueryPlanningTracker.PARSING) {
    // SessionState的SQL Parser負責解析SQL,並生成解析的執行計劃
    // 接口定義爲:def parsePlan(sqlText: String): LogicalPlan
    sessionState.sqlParser.parsePlan(sqlText)
  }
  // 生成物理執行計劃並生成DataSet(就是DataFrame)
  Dataset.ofRows(self, plan, tracker)
}

sql 方法會調用 Spark Session 中的 Session State 的 SQL 解析器來解析 SQL 語句爲邏輯執行計劃(LogicalPlan)。

豐富的 SessionState 組件

SessionState 的組件非常豐富。大家前面看到了它有個 SqlParser 成員,負責解析 SQL 文本爲邏輯執行計劃。除此之外,它還包含了其他的重要組件。

image-20210423191720506

大概掃一眼,重要的組件都組合在這裏了。

LogicPlan

LogicPlan,邏輯執行計劃。它是一個抽象類。

image-20210423234433003

這是 LogicPlan 的繼承樹,其實,它就是一顆樹。我們看到父類 TreeNode 是一個非常牛逼的 TreeNode。裏面包含了 Children TreeNode,還實現了很多的函數式算子,也就是,我們可以在一棵樹節點上執行任意的函數式計算。

image-20210423234632343

LogicPlan 有一個非常重要的方法:resolve。

  def resolve(
      nameParts: Seq\[String\],
      resolver: Resolver): Option\[NamedExpression\] =
    outputAttributes.resolve(nameParts, resolver)

它表示將所有的帶有名稱的部分解析爲命名錶達式。我們來看下當前我們的邏輯執行計劃樹是什麼樣子的。當前的執行計劃的根節點是一個 Project 類型。來看下它的繼承樹。

image-20210423235809607

由此,我們可以得出結論,在 SQL 中的任意一個操作,都可以找到一種類型的 LogicPlan,其實就是 TreeNode。包括 DDL、包括 explain、包括 LoadData 等等所有。所以,想看下 Spark 支持什麼操作,上面 LogicPlan 的子類就知道了。

image-20210423235417786

這裏有三個重要的變量:

ProjectLogicPlan

我們來看下 ProjectLogicPlan 的源碼。

case class Project(projectList: Seq\[NamedExpression\], child: LogicalPlan)
    extends OrderPreservingUnaryNode {
  // 執行Project操作的輸出描述
  override def output: Seq\[Attribute\] = projectList.map(\_.toAttribute)
  // 子節點的最大行限制
  override def maxRows: Option\[Long\] = child.maxRows

  // 是否解析
  override lazy val resolved: Boolean = {
    // 檢測是否存在聚合表達式、生成操作、開窗操作
    val hasSpecialExpressions = projectList.exists ( \_.collect {
        case agg: AggregateExpression => agg
        case generator: Generator => generator
        case window: WindowExpression => window
      }.nonEmpty
    )

    // Project中表達式存在未解析、所有子節點有未解析的表達式、且不包含特殊表達式,都認爲是未解析的
    !expressions.exists(!\_.resolved) && childrenResolved && !hasSpecialExpressions
  }

  // 獲取所有有效的約束條件
  override lazy val validConstraints: ExpressionSet =
    getAllValidConstraints(projectList)
}

UnresolvedRelation

該節點表示還沒有從 Catalog 解析的關係。此時,還僅僅是將 SQL 解析成爲了一個語法樹。我加上了一些註釋。

case class UnresolvedRelation(
    multipartIdentifier: Seq\[String\],
    options: CaseInsensitiveStringMap = CaseInsensitiveStringMap.empty(),
    override val isStreaming: Boolean = false)
  extends LeafNode with NamedRelation {
  import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.\_

  /\*\* Returns a \`.\` separated name for this relation. \*/
  def tableName: String = multipartIdentifier.quoted
  // 表名
  override def name: String = tableName
  // 要輸出的屬性
  override def output: Seq\[Attribute\] = Nil
  // 是否被解析
  override lazy val resolved = false
}

Dataset.ofRows API

def ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan, tracker: QueryPlanningTracker)DataFrame = sparkSession.withActive {
  // QueryExecution是爲Spark執行關係型查詢的主要工作流,它包含了所有的執行計劃。
  val qe = new QueryExecution(sparkSession, logicalPlan, tracker)
  // 解析邏輯執行計劃
  qe.assertAnalyzed()
  // 創建DataSet,獲取解析後的邏輯執行計劃對應的schema,這個schema其實就是DataSet要使用的schema。
  new Dataset\[Row\](qe, RowEncoder(qe.analyzed.schema))
}

我們看到了,Dataset.ofRows 實際上會創建一個查詢執行器,然後根據物理執行計劃創建 DataSet。

我們來調試一下 QueryExecution 就知道了。

image-20210424003057138

看到了嗎?

到此所有的計劃都已經生成好了,DataSet 也構建好了,就等着調度執行了。到此,所有的操作都是在 Driver 端執行的。

image-20210424004430001

邏輯執行計劃生成器源碼解析

核心實現類:SqlBaseLexer、SqlBaseParser

  def sql(sqlText: String)DataFrame = withActive {
    val tracker = new QueryPlanningTracker
    val plan = tracker.measurePhase(QueryPlanningTracker.PARSING) {
      // 此處設置斷點
      sessionState.sqlParser.parsePlan(sqlText)
    }
    Dataset.ofRows(self, plan, tracker)
  }

我們接下來,需要單步進入到 parsePlan 中,看看 Spark 是如何將 SQL 語句轉換爲邏輯執行計劃的。來看下 ParserDriver.scala 中的 parse 方法,該方法實現 SQL 語句解析。ParserDriver 是 Spark-catalyst 的一個類。Spark 使用的是 ANTLR 庫進行 SQL 語法的解析。

image-20210424005415058

  protected def parse\[T\](command: String)(toResult: SqlBaseParser => T)T = {
    logDebug(s"Parsing command: $command")

    // 創建詞法分析器
    val lexer = new SqlBaseLexer(new UpperCaseCharStream(CharStreams.fromString(command)))
    lexer.removeErrorListeners()
    lexer.addErrorListener(ParseErrorListener)

    val tokenStream = new CommonTokenStream(lexer)
    // 創建此法解析器
    val parser = new SqlBaseParser(tokenStream)
    parser.addParseListener(PostProcessor)
    parser.removeErrorListeners()
    parser.addErrorListener(ParseErrorListener)
    parser.legacy\_setops\_precedence\_enbled = conf.setOpsPrecedenceEnforced
    parser.legacy\_exponent\_literal\_as\_decimal\_enabled = conf.exponentLiteralAsDecimalEnabled
    // 是否開啓標準SQL
    parser.SQL\_standard\_keyword\_behavior = conf.ansiEnabled

    try {
      try {
        // first, try parsing with potentially faster SLL mode
        parser.getInterpreter.setPredictionMode(PredictionMode.SLL)
        // 執行語法解析
        toResult(parser)
      }
      catch {
        case e: ParseCancellationException =>
          // if we fail, parse with LL mode
          tokenStream.seek(0) // rewind input stream
          parser.reset()

          // Try Again.
          parser.getInterpreter.setPredictionMode(PredictionMode.LL)
          toResult(parser)
      }
    }
    catch {
      case e: ParseException if e.command.isDefined =>
        throw e
      case e: ParseException =>
        throw e.withCommand(command)
      case e: AnalysisException =>
        val position = Origin(e.line, e.startPosition)
        throw new ParseException(Option(command), e.message, position, position)
    }
  }
}

邏輯執行計劃解析器源碼解析

Analyzer 組件

核心實現類:Analyzer

Analyzer 是負責解析邏輯執行計劃的。來看一下它的類定義。

class Analyzer(override val catalogManager: CatalogManager)
  extends RuleExecutor\[LogicalPlan\] with CheckAnalysis with LookupCatalog with SQLConfHelper

Analyzer 其實就是一個 RuleExecutor,專門針對每個 LogicPlan 執行的一個個的 Rule,進行解析。後續馬上我們就會看到一個個的 Rule 了。不同類型的語法樹節點,都會有對應的 Rule 來進行解析。

Analyzer 需要使用 Catalog 管理器進行初始化。我們看到,當前我們加載的就是 Hive 的 MetaStore Catalog。

image-20210424011933985

解析邏輯執行計劃流程

我們來看下它的執行流程。

  def executeAndCheck(plan: LogicalPlan, tracker: QueryPlanningTracker)LogicalPlan = {
    // 因爲要解析整棵語法樹,需要遞歸檢查執行計劃是否已經完成解析。其實就是遞歸的出口
    if (plan.analyzed) return plan
    AnalysisHelper.markInAnalyzer {
      // 執行解析,此處是調用父類RuleExecutor對應的executeAndTrack方法
      val analyzed = executeAndTrack(plan, tracker)
      try {
        checkAnalysis(analyzed)
        analyzed
      } catch {
        case e: AnalysisException =>
          val ae = new AnalysisException(e.message, e.line, e.startPosition, Option(analyzed))
          ae.setStackTrace(e.getStackTrace)
          throw ae
      }
    }
  }

解析邏輯執行計劃的代碼來了。

  // 執行解析
  def executeAndTrack(plan: TreeType, tracker: QueryPlanningTracker)TreeType = {
    QueryPlanningTracker.withTracker(tracker) {
      execute(plan)
    }
  }

  /\*\*
   \* 串行執行子類定義的一些列規則。這些規則由指定的執行策略執行。
   \*/
  def execute(plan: TreeType)TreeType = {
    var curPlan = plan
    val queryExecutionMetrics = RuleExecutor.queryExecutionMeter
    val planChangeLogger = new PlanChangeLogger\[TreeType\]()
    val tracker: Option\[QueryPlanningTracker\] = QueryPlanningTracker.get
    val beforeMetrics = RuleExecutor.getCurrentMetrics()

    // 對初始輸入執行初始化檢查,確保邏輯執行計劃是完整的
    // 每個邏輯執行計劃都有自己的唯一ID,例如:name#0、cast(name#0 as string) AS name#3。
    // 此處檢查需要讓邏輯執行計劃具備有唯一的ID,且輸出表達式ID是否也是唯一的
    // 一旦檢測出來有重複的,就會拋出解析失敗的異常
    if (!isPlanIntegral(plan)) {
      val message = "The structural integrity of the input plan is broken in " +
        s"${this.getClass.getName.stripSuffix("$")}."
      throw new TreeNodeException(plan, message, null)
    }

    // 挨個挨個執行Analyzer組件中定義的每一批解析策略。
    batches.foreach { batch =>
      val batchStartPlan = curPlan
      var iteration = 1
      var lastPlan = curPlan
      var continue = true

      // Run until fix point (or the max number of iterations as specified in the strategy.
      while (continue) {
        curPlan = batch.rules.foldLeft(curPlan) {
          case (plan, rule) =>
            val startTime = System.nanoTime()
            // 對邏輯執行計劃應用規則,應用規則成功後的邏輯執行計劃就是解析後的邏輯執行計劃
            val result = rule(plan)
            val runTime = System.nanoTime() - startTime
            val effective = !result.fastEquals(plan)

            if (effective) {
              queryExecutionMetrics.incNumEffectiveExecution(rule.ruleName)
              queryExecutionMetrics.incTimeEffectiveExecutionBy(rule.ruleName, runTime)
              planChangeLogger.logRule(rule.ruleName, plan, result)
            }
            queryExecutionMetrics.incExecutionTimeBy(rule.ruleName, runTime)
            queryExecutionMetrics.incNumExecution(rule.ruleName)

            // Record timing information using QueryPlanningTracker
            tracker.foreach(\_.recordRuleInvocation(rule.ruleName, runTime, effective))

            // Run the structural integrity checker against the plan after each rule.
            if (effective && !isPlanIntegral(result)) {
              val message = s"After applying rule ${rule.ruleName} in batch ${batch.name}, " +
                "the structural integrity of the plan is broken."
              throw new TreeNodeException(result, message, null)
            }

            result
        }
        iteration += 1
        if (iteration > batch.strategy.maxIterations) {
          // Only log if this is a rule that is supposed to run more than once.
          if (iteration != 2) {
            val endingMsg = if (batch.strategy.maxIterationsSetting == null) {
              "."
            } else {
              s", please set '${batch.strategy.maxIterationsSetting}' to a larger value."
            }
            val message = s"Max iterations (${iteration - 1}) reached for batch ${batch.name}" +
              s"$endingMsg"
            if (Utils.isTesting || batch.strategy.errorOnExceed) {
              throw new TreeNodeException(curPlan, message, null)
            } else {
              logWarning(message)
            }
          }
          // Check idempotence for Once batches.
          if (batch.strategy == Once &&
            Utils.isTesting && !excludedOnceBatches.contains(batch.name)) {
            checkBatchIdempotence(batch, curPlan)
          }
          continue = false
        }

        if (curPlan.fastEquals(lastPlan)) {
          logTrace(
            s"Fixed point reached for batch ${batch.name} after ${iteration - 1} iterations.")
          continue = false
        }
        lastPlan = curPlan
      }

      planChangeLogger.logBatch(batch.name, batchStartPlan, curPlan)
    }
    planChangeLogger.logMetrics(RuleExecutor.getCurrentMetrics() - beforeMetrics)

    curPlan
  }

Batch 解析邏輯執行計劃組

scala 中定義了一堆的 Batch,這個 Batch 可以翻譯過來稱爲邏輯計劃執行策略組更容易理解一些。裏面包含了針對每一類別 LogicalPlan 的處理類。每個 Batch 需要指定其名稱、執行策略、以及規則。

  protected case class Batch(name: String, strategy: Strategy, rules: Rule\[TreeType\]\*)
  abstract class Strategy {

    /\*\* 最大執行次數. \*/
    def maxIterations: Int

    /\*\* 超過最大執行次數是否報錯. \*/
    def errorOnExceed: Boolean = false

    /\*\* 配置最大執行次數的配置key值 \*/
    def maxIterationsSetting: String = null
  }

  /\*\* 有且僅執行一次. \*/
  case object Once extends Strategy { val maxIterations = 1 }

  /\*\*
   \* 指定固定的執行次數.
   \*/
  case class FixedPoint(
    override val maxIterations: Int,
    override val errorOnExceed: Boolean = false,
    override val maxIterationsSetting: String = null) extends Strategy

所有的 Spark SQL 的解析執行計劃都在這裏了。

  override def batches: Seq\[Batch\] = Seq(
    // 替換策略組
    Batch("Substitution", fixedPoint,
      // This rule optimizes \`UpdateFields\` expression chains so looks more like optimization rule.
      // However, when manipulating deeply nested schema, \`UpdateFields\` expression tree could be
      // very complex and make analysis impossible. Thus we need to optimize \`UpdateFields\` early
      // at the beginning of analysis.
      OptimizeUpdateFields,
      CTESubstitution,
      WindowsSubstitution,
      EliminateUnions,
      SubstituteUnresolvedOrdinals),
    // 禁用Hint策略組
    Batch("Disable Hints", Once,
      new ResolveHints.DisableHints),
    // Hint策略組
    Batch("Hints", fixedPoint,
      ResolveHints.ResolveJoinStrategyHints,
      ResolveHints.ResolveCoalesceHints),
    // 簡單檢查策略組
    Batch("Simple Sanity Check", Once,
      LookupFunctions),
    // 關係策略組
    Batch("Resolution", fixedPoint,
      ResolveTableValuedFunctions ::
      ResolveNamespace(catalogManager) ::
      new ResolveCatalogs(catalogManager) ::
      ResolveUserSpecifiedColumns ::
      ResolveInsertInto ::
      ResolveRelations ::
      ResolveTables ::
      ResolvePartitionSpec ::
      AddMetadataColumns ::
      ResolveReferences ::
      ResolveCreateNamedStruct ::
      ResolveDeserializer ::
      ResolveNewInstance ::
      ResolveUpCast ::
      ResolveGroupingAnalytics ::
      ResolvePivot ::
      ResolveOrdinalInOrderByAndGroupBy ::
      ResolveAggAliasInGroupBy ::
      ResolveMissingReferences ::
      ExtractGenerator ::
      ResolveGenerate ::
      ResolveFunctions ::
      ResolveAliases ::
      ResolveSubquery ::
      ResolveSubqueryColumnAliases ::
      ResolveWindowOrder ::
      ResolveWindowFrame ::
      ResolveNaturalAndUsingJoin ::
      ResolveOutputRelation ::
      ExtractWindowExpressions ::
      GlobalAggregates ::
      ResolveAggregateFunctions ::
      TimeWindowing ::
      ResolveInlineTables ::
      ResolveHigherOrderFunctions(v1SessionCatalog) ::
      ResolveLambdaVariables ::
      ResolveTimeZone ::
      ResolveRandomSeed ::
      ResolveBinaryArithmetic ::
      ResolveUnion ::
      TypeCoercion.typeCoercionRules ++
      extendedResolutionRules : \_\*),
    // 應用字符填充策略組
    Batch("Apply Char Padding", Once,
      ApplyCharTypePadding),
    // 可以執行的鉤子策略組
    Batch("Post-Hoc Resolution", Once,
      Seq(ResolveNoopDropTable) ++
      postHocResolutionRules: \_\*),
    // Alter Table策略組
    Batch("Normalize Alter Table", Once, ResolveAlterTableChanges),
    // 移除未解析的Hint策略組
    Batch("Remove Unresolved Hints", Once,
      new ResolveHints.RemoveAllHints),
    // 非確定策略組
    Batch("Nondeterministic", Once,
      PullOutNondeterministic),
    // UDF策略組
    Batch("UDF", Once,
      HandleNullInputsForUDF,
      ResolveEncodersInUDF),
    // 更新爲null策略組
    Batch("UpdateNullability", Once,
      UpdateAttributeNullability),
    // 子查詢策略組
    Batch("Subquery", Once,
      UpdateOuterReferences),
    // 清理策略組
    Batch("Cleanup", fixedPoint,
      CleanupAliases)
  )

解析邏輯執行計劃規則

每一個 Batch 中都對應了一組規則。Spark SQL 的 Analyzer 組件,就是使用這些規則來解析邏輯執行計劃的。

image-20210424013330014

而所有的規則都是從 Rule 繼承。

image-20210424013458780

Rule 是一個抽象類,它的定義非常簡單,只有一個規則名稱、以及 apply 方法,也就是對邏輯執行計劃應用規則。

abstract class Rule\[TreeType <: TreeNode\[\_\]\] extends SQLConfHelper with Logging {

  /\*\* Name for this rule, automatically inferred based on class name. \*/
  val ruleName: String = {
    val className = getClass.getName
    if (className endsWith "$") className.dropRight(1) else className
  }

  def apply(plan: TreeType): TreeType
}

應用規則解析邏輯執行計劃

既然規則要解析邏輯執行計劃,那麼規則勢必就要改變邏輯執行計劃。我們來看看規則到底是如何應用到邏輯執行計劃上的。此處,我找了一個,ResolveUserSpecifiedColumns 規則來分析其源碼。

  object ResolveUserSpecifiedColumns extends Rule\[LogicalPlan\] {
    // 調用邏輯執行計劃的resolveOperators方法,該方法會克隆一個邏輯執行計劃樹,並應用規則。
    // 這個resolveOperators是一個遞歸方法,會將整棵邏輯執行計劃樹都執行規則。遞歸方式是層級遍歷。
    // 該函數調用中就是規則的定義,描述了應該對邏輯執行計劃如何應用規則
    // 我把代碼貼在這兒,大家做參考。
    // val afterRule = CurrentOrigin.withOrigin(origin) {
    //   rule.applyOrElse(self, identity\[LogicalPlan\])
    // }
    // // Check if unchanged and then possibly return old copy to avoid gc churn.
    // if (self fastEquals afterRule) {
    //   mapChildren(\_.resolveOperatorsDown(rule))
    // } else {
    //   afterRule.mapChildren(\_.resolveOperatorsDown(rule))
    // }
    override def apply(plan: LogicalPlan)LogicalPlan = plan.resolveOperators {
      // 只匹配InsertInto語句,而且insert語句中的表已經被解析、且insert對應的子查詢也已經被解析、且用戶指定列是非空才執行
      case i: InsertIntoStatement if i.table.resolved && i.query.resolved &&
          i.userSpecifiedCols.nonEmpty =>
       // 解析用戶指定列
        val resolved = resolveUserSpecifiedColumns(i)
       // 獲取到INSERT INTO子查詢中的所有列
        val projection = addColumnListOnQuery(i.table.output, resolved, i.query)
       // 將子查詢解析的列拷貝到userSpecifiedCols
        i.copy(userSpecifiedCols = Nil, query = projection)
    }
    
    // 解析用戶指定列
    private def resolveUserSpecifiedColumns(i: InsertIntoStatement): Seq\[NamedExpression\] = {
      // 檢查列名是否重複
      SchemaUtils.checkColumnNameDuplication(
        i.userSpecifiedCols, "in the column list", resolver)
   // 按照INSERT INTO中用戶的指定列,解析每一個列
      i.userSpecifiedCols.map { col =>
          i.table.resolve(Seq(col), resolver)
            .getOrElse(i.table.failAnalysis(s"Cannot resolve column name $col"))
      }
    }

每個邏輯執行計劃都有一個 copy 方法,用於將解析後的表達式拷貝到執行計劃中。

一旦所有的規則應用完畢,邏輯執行計劃樹就解析完了。

重要提示!規則是按照層級底層應用的。

邏輯執行計劃優化器源碼解析

找到 Optimizer 類,該類實現了對執行計劃的優化。和 Analyzer 類似,它也是 RuleExecutor 的子類,也就是說,它也是按照規則來進行優化的,所以肯定也是按照 Batch 和 Rule 的方式來對執行計劃進行優化。Spark 3.x 的優化策略組也是非常豐富的。

    val batches = (Batch("Eliminate Distinct", Once, EliminateDistinct) ::
    // Technically some of the rules in Finish Analysis are not optimizer rules and belong more
    // in the analyzer, because they are needed for correctness (e.g. ComputeCurrentTime).
    // However, because we also use the analyzer to canonicalized queries (for view definition),
    // we do not eliminate subqueries or compute current time in the analyzer.
    Batch("Finish Analysis", Once,
      EliminateResolvedHint,
      EliminateSubqueryAliases,
      EliminateView,
      ReplaceExpressions,
      RewriteNonCorrelatedExists,
      ComputeCurrentTime,
      GetCurrentDatabaseAndCatalog(catalogManager),
      ReplaceDeduplicateWithAggregate) ::
    //////////////////////////////////////////////////////////////////////////////////////////
    // Optimizer rules start here
    //////////////////////////////////////////////////////////////////////////////////////////
    // - Do the first call of CombineUnions before starting the major Optimizer rules,
    //   since it can reduce the number of iteration and the other rules could add/move
    //   extra operators between two adjacent Union operators.
    // - Call CombineUnions again in Batch("Operator Optimizations"),
    //   since the other rules might make two separate Unions operators adjacent.
    Batch("Union", Once,
      CombineUnions) ::
    Batch("OptimizeLimitZero", Once,
      OptimizeLimitZero) ::
    // Run this once earlier. This might simplify the plan and reduce cost of optimizer.
    // For example, a query such as Filter(LocalRelation) would go through all the heavy
    // optimizer rules that are triggered when there is a filter
    // (e.g. InferFiltersFromConstraints). If we run this batch earlier, the query becomes just
    // LocalRelation and does not trigger many rules.
    Batch("LocalRelation early", fixedPoint,
      ConvertToLocalRelation,
      PropagateEmptyRelation,
      // PropagateEmptyRelation can change the nullability of an attribute from nullable to
      // non-nullable when an empty relation child of a Union is removed
      UpdateAttributeNullability) ::
    Batch("Pullup Correlated Expressions", Once,
      PullupCorrelatedPredicates) ::
    // Subquery batch applies the optimizer rules recursively. Therefore, it makes no sense
    // to enforce idempotence on it and we change this batch from Once to FixedPoint(1).
    Batch("Subquery", FixedPoint(1),
      OptimizeSubqueries) ::
    Batch("Replace Operators", fixedPoint,
      RewriteExceptAll,
      RewriteIntersectAll,
      ReplaceIntersectWithSemiJoin,
      ReplaceExceptWithFilter,
      ReplaceExceptWithAntiJoin,
      ReplaceDistinctWithAggregate) ::
    Batch("Aggregate", fixedPoint,
      RemoveLiteralFromGroupExpressions,
      RemoveRepetitionFromGroupExpressions) :: Nil ++
    operatorOptimizationBatch) :+
    // This batch rewrites plans after the operator optimization and
    // before any batches that depend on stats.
    Batch("Pre CBO Rules", Once, preCBORules: \_\*) :+
    // This batch pushes filters and projections into scan nodes. Before this batch, the logical
    // plan may contain nodes that do not report stats. Anything that uses stats must run after
    // this batch.
    Batch("Early Filter and Projection Push-Down", Once, earlyScanPushDownRules: \_\*) :+
    // Since join costs in AQP can change between multiple runs, there is no reason that we have an
    // idempotence enforcement on this batch. We thus make it FixedPoint(1) instead of Once.
    Batch("Join Reorder", FixedPoint(1),
      CostBasedJoinReorder) :+
    Batch("Eliminate Sorts", Once,
      EliminateSorts) :+
    Batch("Decimal Optimizations", fixedPoint,
      DecimalAggregates) :+
    // This batch must run after "Decimal Optimizations", as that one may change the
    // aggregate distinct column
    Batch("Distinct Aggregate Rewrite", Once,
      RewriteDistinctAggregates) :+
    Batch("Object Expressions Optimization", fixedPoint,
      EliminateMapObjects,
      CombineTypedFilters,
      ObjectSerializerPruning,
      ReassignLambdaVariableID) :+
    Batch("LocalRelation", fixedPoint,
      ConvertToLocalRelation,
      PropagateEmptyRelation,
      // PropagateEmptyRelation can change the nullability of an attribute from nullable to
      // non-nullable when an empty relation child of a Union is removed
      UpdateAttributeNullability) :+
    // The following batch should be executed after batch "Join Reorder" and "LocalRelation".
    Batch("Check Cartesian Products", Once,
      CheckCartesianProducts) :+
    Batch("RewriteSubquery", Once,
      RewritePredicateSubquery,
      ColumnPruning,
      CollapseProject,
      RemoveNoopOperators) :+
    // This batch must be executed after the \`RewriteSubquery\` batch, which creates joins.
    Batch("NormalizeFloatingNumbers", Once, NormalizeFloatingNumbers) :+
    Batch("ReplaceUpdateFieldsExpression", Once, ReplaceUpdateFieldsExpression)

    // remove any batches with no rules. this may happen when subclasses do not add optional rules.
    batches.filter(\_.rules.nonEmpty)
  }

此處,受限於篇幅,就不去進入到這些優化規則分析了。如果大家有興趣,可以執行去單步,它的執行方式和 Analyzer 類似。

物理執行計劃生成源碼解析

物理執行計劃是由 SparkPlanner 來實現的。來看一下它的類定義。

class SparkPlanner(val session: SparkSession, val experimentalMethods: ExperimentalMethods)
  extends SparkStrategies with SQLConfHelper

SparkPlanner 會使用一系列的策略來生成一組候選的物理執行計劃。

  override def strategies: Seq\[Strategy\] =
    experimentalMethods.extraStrategies ++
      extraPlanningStrategies ++ (
      LogicalQueryStageStrategy ::
      PythonEvals ::
      new DataSourceV2Strategy(session) ::
      FileSourceStrategy ::
      DataSourceStrategy ::
      SpecialLimits ::
      Aggregation ::
      Window ::
      JoinSelection ::
      InMemoryScans ::
      BasicOperators :: Nil)

來看一下 SparkPlan 的定義:

abstract class SparkPlan extends QueryPlan\[SparkPlan\] with Logging with Serializable

它也是從 QueryPlan 中繼承的。它也是一顆樹。

image-20210424025222172

大多數的 SparkPlan 節點是以 Exec 結尾,這也表示了這些節點是 Spark 可執行的。SparkPlan 中定義了 doExecute 方法,這是每種 SparkPlan 都需要自己實現的。

protected def doExecute(): RDD\[InternalRow\]

我們來看下 HiveTableScanExec 實現。

  protected override def doExecute(): RDD\[InternalRow\] = {
    // Using dummyCallSite, as getCallSite can turn out to be expensive with
    // multiple partitions.
    val rdd = if (!relation.isPartitioned) {
      Utils.withDummyCallSite(sqlContext.sparkContext) {
        hadoopReader.makeRDDForTable(hiveQlTable)
      }
    } else {
      Utils.withDummyCallSite(sqlContext.sparkContext) {
        hadoopReader.makeRDDForPartitionedTable(prunedPartitions)
      }
    }
    val numOutputRows = longMetric("numOutputRows")
    // Avoid to serialize MetastoreRelation because schema is lazy. (see SPARK-15649)
    val outputSchema = schema
    rdd.mapPartitionsWithIndexInternal { (index, iter) =>
      // 調用自動生成的代碼
      val proj = UnsafeProjection.create(outputSchema)
      proj.initialize(index)
      iter.map { r =>
        numOutputRows += 1
        proj(r)
      }
    }
  }

我們終於看到了 RDD。這就是 spark-core,底層最終還是由 RDD 來執行的。雖然 RDD 不推薦直接使用,但其實底層 Spark 還是基於 RDD 實現的,只不過 Spark SQL 做了大量的優化而已。我們看到 HiveScan 操作,先通過 HadoopReader 的 markRDDForTable 方法,創建 RDD,然後執行了 mapPartitionsWithIndexInternal 算子。

源碼解析:Code Generate 代碼生成

在生成了 RDD 後,RDD 算子中執行的代碼會通過 Code Generate 自動生成代碼來執行。以下是 CodeGenerator 的代碼。

abstract class CodeGenerator\[InType <: AnyRef, OutType <: AnyRef\] extends Logging {

  protected val genericMutableRowType: String = classOf\[GenericInternalRow\].getName

  /\*\*
   \* Generates a class for a given input expression.  Called when there is not cached code
   \* already available.
   \*/
  protected def create(in: InType): OutType

  /\*\*
   \* Canonicalizes an input expression. Used to avoid double caching expressions that differ only
   \* cosmetically.
   \*/
  protected def canonicalize(in: InType): InType

  /\*\* Binds an input expression to a given input schema \*/
  protected def bind(in: InType, inputSchema: Seq\[Attribute\]): InType

  /\*\* Generates the requested evaluator binding the given expression(s) to the inputSchema. \*/
  def generate(expressions: InType, inputSchema: Seq\[Attribute\])OutType =
    generate(bind(expressions, inputSchema))

  /\*\* Generates the requested evaluator given already bound expression(s)\*/
  def generate(expressions: InType)OutType = create(canonicalize(expressions))

  /\*\*
   \* Create a new codegen context for expression evaluator, used to store those
   \* expressions that don't support codegen
   \*/
  def newCodeGenContext()CodegenContext = {
    new CodegenContext
  }
}

其中,generate 方法就是直接根據表達式來生成可執行的 Java 代碼的。我們可以來看下 GenerateUnsafeProjection 的實現。

  private def create(
      expressions: Seq\[Expression\],
      subexpressionEliminationEnabled: Boolean)UnsafeProjection = {
    val ctx = newCodeGenContext()
    val eval = createCode(ctx, expressions, subexpressionEliminationEnabled)

    val codeBody =
      s"""
         |public java.lang.Object generate(Object\[\] references) {
         |  return new SpecificUnsafeProjection(references);
         |}
         |
         |class SpecificUnsafeProjection extends ${classOf\[UnsafeProjection\].getName} {
         |
         |  private Object\[\] references;
         |  ${ctx.declareMutableStates()}
         |
         |  public SpecificUnsafeProjection(Object\[\] references) {
         |    this.references = references;
         |    ${ctx.initMutableStates()}
         |  }
         |
         |  public void initialize(int partitionIndex) {
         |    ${ctx.initPartition()}
         |  }
         |
         |  // Scala.Function1 need this
         |  public java.lang.Object apply(java.lang.Object row) {
         |    return apply((InternalRow) row);
         |  }
         |
         |  public UnsafeRow apply(InternalRow ${ctx.INPUT\_ROW}) {
         |    ${eval.code}
         |    return ${eval.value};
         |  }
         |
         |  ${ctx.declareAddedFunctions()}
         |}
       """.stripMargin

    val code = CodeFormatter.stripOverlappingComments(
      new CodeAndComment(codeBody, ctx.getPlaceHolderToComments()))
    logDebug(s"code for ${expressions.mkString(",")}:\\n${CodeFormatter.format(code)}")

    val (clazz, \_) = CodeGenerator.compile(code)
    clazz.generate(ctx.references.toArray).asInstanceOf\[UnsafeProjection\]
  }

大家看到了麼?此處有 Java 代碼的模板。Spark 使用的是 codehaus 的 janino 直接生成 Java 字節碼,並由 CLASSLOADER 動態加載。然後由 SparkPlan 中調用執行。所有的表達式直接將 Java 代碼直接生成好,並在一個並行度中執行。

到此,Spark SQL 的整個過程就給大家分析了下。

這其中每個過程都可以用數萬字來描述,所以,後續再挨個組件給大家剖析剖析。我抬頭看了看牆上的掛鐘,已經凌晨 4 點了。我們下期再見。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。