Spark SQL 邏輯計劃原理

Catalyst 優化器是 Spark 引擎中非常重要的組成部分,也是近年來 Spark 社區項目重點投入、並且發展十分迅速的核心模塊,對於 Spark 任務的性能提升起到了關鍵的基礎作用。

我們知道,在 Spark1.6 之前開發人員是通過 Spark 的 RDD 編程接口來實現對大規模數據的分析和處理的,到了 Spark1.6 版本後推出了 DataSet 和 DataFrame 的編程接口,這種數據結構與 RDD 的主要區別在於其攜帶了結構化數據的 Schema 信息,從而可以被 Spark Catalyst 用來做進一步的解析和優化;而 Spark SQL 則是比 DataSet 和 DataFrame 編程接口更爲簡單易用的大數據領域語言,其用戶可以是開發工程師、數據科學家、數據分析師等,並且與其他 SQL 語言類似,可以通過 SQL 引擎將 SQL 預先解析成一棵 AST 抽象語法樹;同時,AST 抽象語法樹、DataSet 及 DataFrame 接下來均會被 Spark Catalyst 優化器轉換成爲 Unresolved LogicalPlan、Resolved LogicalPlan,Physical Plan、以及 Optimized PhysicalPlan,也就是說帶有 schema 信息的 Spark 分佈式數據集都可以從 Spark Catalyst 中受益,這也是 Spark 任務性能得以提升的核心所在。

值得一提的是,在物理計劃樹的生成過程中,首先會將數據源解析成爲 RDD,也即在 Spark SQL 的物理計劃執行過程中所操作的對象實際是 RDD,一條 Spark SQL 在生成最終的物理計劃後仍然會經過前面文章中所提到的生成 DAG、劃分 Stage、並將 taskset 分發到特定的 executor 上運行等一系列的任務調度和執行過程來實現該 Spark SQL 的處理邏輯。

接下來,本文將着重講解 Spark SQL 邏輯計劃的相關實現原理,在後續的文章中會繼續解析 Spark SQL 的物理計劃。

生成 Unresolved LogicalPlan

用戶可以通過 spark-sql 等客戶端來提交 sql 語句,在 sparksession 初始化時通過 BaseSessionStateBuilder 的 build() 方法始化 SparkSqlParser、Analyser 以及 SparkOptimizer 對象等:

def build()SessionState = {
  new SessionState(
    session.sharedState,
    conf,
    experimentalMethods,
    functionRegistry,
    udfRegistration,
    () => catalog,
    sqlParser,
    () => analyzer,
    () => optimizer,
    planner,
    () => streamingQueryManager,
    listenerManager,
    () => resourceLoader,
    createQueryExecution,
    createClone,
    columnarRules,
    queryStagePrepRules)
}

當用戶程序調用 SparkSession 的 sql 接口時即開始瞭解析 sql 語句並執行對數據處理的過程:

def sql(sqlText: String)DataFrame = withActive {
  val tracker = new QueryPlanningTracker
  val plan = tracker.measurePhase(QueryPlanningTracker.PARSING) {
    sessionState.sqlParser.parsePlan(sqlText)
  }
  Dataset.ofRows(self, plan, tracker)
}

其中通過 AbstractSqlParser 的 parsePlan 方法將 sql 語句轉換成抽象語法樹:

override def parsePlan(sqlText: String)LogicalPlan = parse(sqlText) { parser =>
  astBuilder.visitSingleStatement(parser.singleStatement()) match {
    case plan: LogicalPlan => plan
    case _ =>
      val position = Origin(None, None)
      throw new ParseException(Option(sqlText)"Unsupported SQL statement", position, position)
  }
}

1、從 SqlBaseParser 的 singleStatement() 方法開始基於 ANTLR4 lib 庫來解析 sql 語句中所有的詞法片段,生成一棵 AST 抽象語法樹;

2、訪問 AST 抽象語法樹並生成 Unresolved 邏輯計劃樹:

1)訪問 SingleStatementContext 節點:

SingleStatementContext 是整個抽象語法樹的根節點,因此以 AstBuilder 的 visitSingleStatement 方法爲入口來遍歷抽象語法樹:

override def visitSingleStatement(ctx: SingleStatementContext)LogicalPlan = withOrigin(ctx) {
  visit(ctx.statement).asInstanceOf[LogicalPlan]
}
...
public T visit(ParseTree tree) {
   return tree.accept(this);
}

2)根據訪問者模式執行 SingleStatementContext 節點的 accept 方法:

@Override
public <T> T accept(ParseTreeVisitor<? extends T> visitor) {
   if ( visitor instanceof SqlBaseVisitor ) return ((SqlBaseVisitor<? extends T>)visitor).visitSingleStatement(this);
   else return visitor.visitChildren(this);
}
...
@Override public T visitSingleStatement(SqlBaseParser.SingleStatementContext ctx) { return visitChildren(ctx); }

3)迭代遍歷整棵 AST Tree:

@Override
public T visitChildren(RuleNode node) {
   T result = defaultResult();
   int n = node.getChildCount();
   for (int i=0; i<n; i++) {
      if (!shouldVisitNextChild(node, result)) {
         break;
      }

      ParseTree c = node.getChild(i);
      T childResult = c.accept(this);
      result = aggregateResult(result, childResult);
   }

   return result;
}

根據以上代碼,在遍歷 AST 樹的過程中,會首先解析父節點的所有子節點,並執行子節點上的 accept 方法來進行解析,當所有子節點均解析爲 UnresolvedRelation 或者 Expression 後,將這些結果進行聚合並返回到父節點,由此可見,AST 樹的遍歷所採用的是後序遍歷模式。

接下來以查詢語句中的 QuerySpecificationContext 節點的解析爲例進一步闡述以上過程:

如下爲一條基本的 sql 語句:

select col1 from tabname where col2 > 10

QuerySpecificationContext 節點下會產生用於掃描數據源的 FromClauseContext、過濾條件對應的 BooleanDefaultContext、以及投影時所需的 NamedExpressionSeqContext 節點。

1)FromClauseContext 繼續訪問其子節點,當訪問到 TableINameContext 節點時,訪問到 tableName 的 tocken 時根據表名生成 UnresolvedRelation:

override def visitTableName(ctx: TableNameContext)LogicalPlan = withOrigin(ctx) {
  val tableId = visitMultipartIdentifier(ctx.multipartIdentifier)
  val table = mayApplyAliasPlan(ctx.tableAlias, UnresolvedRelation(tableId))
  table.optionalMap(ctx.sample)(withSample)
}

2)BooleanDefaultContext 的子節點中分爲三個分支:代表 Reference 的 ValueExpressionDefaultContext、代表數值的 ValueExpressionDefaultContext、以及代表運算符的 ComparisonContext;

例如遍歷代表數據值 ValueExpressionDefaultContext 及其子節點,直到訪問到 IntegerLiteralContext:

override def visitIntegerLiteral(ctx: IntegerLiteralContext)Literal = withOrigin(ctx) {
  BigDecimal(ctx.getText) match {
    case v if v.isValidInt =>
      Literal(v.intValue)
    case v if v.isValidLong =>
      Literal(v.longValue)
    case v => Literal(v.underlying())
  }
}

而 Literal 的定義如下,是一個葉子類型的 Expression 節點:

case class Literal (value: Any, dataType: DataType) extends LeafExpression

3)NamedExpressionSeqContext 是投影節點,迭代遍歷直到 RegularQuerySpecificationContext 節點,然後通過訪問 withSelectQuerySpecification 方法創建出投影所需的 Project Logical Plan:

override def visitRegularQuerySpecification(
    ctx: RegularQuerySpecificationContext)LogicalPlan = withOrigin(ctx) {
  val from = OneRowRelation().optional(ctx.fromClause) {
    visitFromClause(ctx.fromClause)
  }
  withSelectQuerySpecification(
    ctx,
    ctx.selectClause,
    ctx.lateralView,
    ctx.whereClause,
    ctx.aggregationClause,
    ctx.havingClause,
    ctx.windowClause,
    from
  )
}
...
def createProject() = if (namedExpressions.nonEmpty) {
  Project(namedExpressions, withFilter)
} else {
  withFilter
}

總結一下以上處理過程中所涉及的類之間的關係,如下圖所示:

類圖

生成 Resolved LogicalPlan

Spark Analyser

在 SparkSession 的 sql 方法中,對 sql 語句進行過 Parser 解析並生成 Unresolved LogicalPlan 之後則通過執行 Dataset.ofRows(self, plan, tracker) 繼續進行 catalog 綁定,數據源綁定的過程如下:

def ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan, tracker: QueryPlanningTracker)
  : DataFrame = sparkSession.withActive {
  val qe = new QueryExecution(sparkSession, logicalPlan, tracker)
  qe.assertAnalyzed()
  new Dataset[Row](qe, RowEncoder(qe.analyzed.schema))
}
...
def assertAnalyzed()Unit = analyzed

由如下實現邏輯可見, analyzed 變量是通過懶加載方式初始化的,通過該變量的初始方法可見 Spark 的 catalog 實現邏輯主要通過 Analyser 類來實現的:

lazy val analyzed: LogicalPlan = executePhase(QueryPlanningTracker.ANALYSIS) {
  // We can't clone `logical` here, which will reset the `_analyzed` flag.
  sparkSession.sessionState.analyzer.executeAndCheck(logical, tracker)
}

其中,executeAndCheck 方法的執行是通過 Analyzer 的父類 RuleExecutor 的 execute 方法來實現的:

def execute(plan: TreeType)TreeType = {
...
  batches.foreach { batch =>
    val batchStartPlan = curPlan
    var iteration = 1
    var lastPlan = curPlan
    var continue = true

    // Run until fix point (or the max number of iterations as specified in the strategy.
    while (continue) {
      curPlan = batch.rules.foldLeft(curPlan) {
        case (plan, rule) =>
          val startTime = System.nanoTime()
          val result = rule(plan)
          val runTime = System.nanoTime() - startTime
          val effective = !result.fastEquals(plan)

          if (effective) {
            queryExecutionMetrics.incNumEffectiveExecution(rule.ruleName)
            queryExecutionMetrics.incTimeEffectiveExecutionBy(rule.ruleName, runTime)
            planChangeLogger.logRule(rule.ruleName, plan, result)
          }
          ...
          result
      }
      iteration += 1
      if (iteration > batch.strategy.maxIterations) {
        // Only log if this is a rule that is supposed to run more than once.
        if (iteration != 2) {
          val endingMsg = if (batch.strategy.maxIterationsSetting == null) {
            "."
          } else {
            s", please set '${batch.strategy.maxIterationsSetting}' to a larger value."
          }
          val message = s"Max iterations (${iteration - 1}) reached for batch ${batch.name}" +
            s"$endingMsg"
          if (Utils.isTesting || batch.strategy.errorOnExceed) {
            throw new TreeNodeException(curPlan, message, null)
          } else {
            logWarning(message)
          }
        }
        // Check idempotence for Once batches.
        if (batch.strategy == Once &&
          Utils.isTesting && !blacklistedOnceBatches.contains(batch.name)) {
          checkBatchIdempotence(batch, curPlan)
        }
        continue = false
      }

      if (curPlan.fastEquals(lastPlan)) {
        logTrace(
          s"Fixed point reached for batch ${batch.name} after ${iteration - 1} iterations.")
        continue = false
      }
      lastPlan = curPlan
    }

    planChangeLogger.logBatch(batch.name, batchStartPlan, curPlan)
  }
  planChangeLogger.logMetrics(RuleExecutor.getCurrentMetrics() - beforeMetrics)

  curPlan
}

如上代碼的主要處理過程如下:

1、遍歷的 Analyzer 類中的 batches 列表:

通過 batches 方法獲取所有的 catalog 綁定相關的規則,在 Analyzer 中包括 Substitution、Hints、Resolution、UDF、Subquery 等幾個規則組;

以較爲常見的 "Resolution" 規則組爲例,其具有非常多的規則用於解析函數、Namespace、數據表、視圖、列等信息,當然用戶也可以子定義相關規則:

Batch("Resolution", fixedPoint,
  ResolveTableValuedFunctions ::
  ResolveNamespace(catalogManager) ::
  new ResolveCatalogs(catalogManager) ::
  ResolveInsertInto ::
  ResolveRelations ::
  ResolveTables ::
  ResolveReferences ::
  ResolveCreateNamedStruct ::
  ResolveDeserializer ::
  ResolveNewInstance ::
  ResolveUpCast ::
  ResolveGroupingAnalytics ::
  ResolvePivot ::
  ResolveOrdinalInOrderByAndGroupBy ::
  ResolveAggAliasInGroupBy ::
  ResolveMissingReferences ::
  ...

其中,Batch 類的定義如下,包括 Batch 名稱、循環執行策略、具體的規則組集合,循環執行策略 Strategy 又分爲 Once 和 FixedPoint 兩種,即僅執行一次和固定次數:

protected case class Batch(name: String, strategy: Strategy, rules: Rule[TreeType]*)

2、將每個 Batch 中所有的規則 Rule 對象實施於該 Unsolved LogicalPlan,並且該 Batch 中規則可能要執行多輪,直到執行的批數等於 batch.strategy.maxIterations 或者 logicalplan 與上個批次的結果比沒有變化,則退出執行;

其中在 Spark 中的定義如下,在 spark3.0 中默認可最大循環 100 次:

protected def fixedPoint =
  FixedPoint(
    conf.analyzerMaxIterations,
    errorOnExceed = true,
    maxIterationsSetting = SQLConf.ANALYZER_MAX_ITERATIONS.key)
...
val ANALYZER_MAX_ITERATIONS = buildConf("spark.sql.analyzer.maxIterations")
    .internal()
    .doc("The max number of iterations the analyzer runs.")
    .version("3.0.0")
    .intConf
    .createWithDefault(100)

接下來以將 ResolveRelations(解析數據表或者視圖)規則應用於 Unresolved LogicalPlan 的解析過程爲例, 支持解析 UnresolvedRelation、UnresolvedTable、UnresolvedTableOrView 等多種未解析的數據源:

def apply(plan: LogicalPlan)LogicalPlan = ResolveTempViews(plan).resolveOperatorsUp {
 ...
  case u: UnresolvedRelation =>
    lookupRelation(u.multipartIdentifier).map(resolveViews).getOrElse(u)

  case u @ UnresolvedTable(identifier) =>
    lookupTableOrView(identifier).map {
      case v: ResolvedView =>
        u.failAnalysis(s"${v.identifier.quoted} is a view not table.")
      case table => table
    }.getOrElse(u)

  case u @ UnresolvedTableOrView(identifier) =>
    lookupTableOrView(identifier).getOrElse(u)
}

當解析對象爲 UnresolvedRelation 實例時,調用 lookupRelation 方法來對其進行解析,通過 SessionCatalog 或者擴展的 CatalogPlugin 來獲取數據源的元數據,並生成 Resolved LogicalPlan:

private def lookupRelation(identifier: Seq[String]): Option[LogicalPlan] = {
  expandRelationName(identifier) match {
    case SessionCatalogAndIdentifier(catalog, ident) =>
      lazy val loaded = CatalogV2Util.loadTable(catalog, ident).map {
        case v1Table: V1Table =>
          v1SessionCatalog.getRelation(v1Table.v1Table)
        case table =>
          SubqueryAlias(
            catalog.name +: ident.asMultipartIdentifier,
            DataSourceV2Relation.create(table, Some(catalog), Some(ident)))
      }
    ...

最常見的是 SessionCatalog,作爲 SparkSession 級別 catalog 接口對象,其定義如下,包括 ExternalCatalog、GlobalTempViewManager、FunctionRegistry、SQLConf、Hadoop 的 Configuration、Parser、FunctionResourceLoader 對象;其中,ExternalCatalog 有兩個主要的實現類:HiveExternalCatalog 和 InMemoryCatalog,而 HiveExternalCatalog 則主要應用於企業級的業務場景中:

class SessionCatalog(
    externalCatalogBuilder: () => ExternalCatalog,
    globalTempViewManagerBuilder: () => GlobalTempViewManager,
    functionRegistry: FunctionRegistry,
    conf: SQLConf,
    hadoopConf: Configuration,
    parser: ParserInterface,
    functionResourceLoader: FunctionResourceLoader)

如果採用默認的 SessionCatalog,當需要獲取數據表時則通過 ExternalCatalog 實例調用其對應的接口來實現:

override def loadTable(ident: Identifier)Table = {
  val catalogTable = try {
    catalog.getTableMetadata(ident.asTableIdentifier)
  } catch {
    case _: NoSuchTableException =>
      throw new NoSuchTableException(ident)
  }

  V1Table(catalogTable)
}
...
def getTableMetadata(name: TableIdentifier)CatalogTable = {
    val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
    val table = formatTableName(name.table)
    requireDbExists(db)
    requireTableExists(TableIdentifier(table, Some(db)))
    externalCatalog.getTable(db, table)
 }

接下來如果採用 ExternalCatalog 接口的實現類 HiveExternalCatalog 的情況下,則通過 HiveClientImpl 類從 Hive 的 metadata 中類獲取用戶表的元數據相關信息:

private def getRawTableOption(dbName: String, tableName: String): Option[HiveTable] = {
  Option(client.getTable(dbName, tableName, false /* do not throw exception */))
}

另外,如需擴展的 catalog 範圍可通過實現 CatalogPlugin 接口、並且配置 “spark.sql.catalog.spark_catalog” 參數來實現,例如在 iceberg 數據湖的實現中通過自定義其 catalog 來實現其個性化的邏輯:

spark.sql.catalog.spark_catalog = org.apache.iceberg.spark.SparkSessionCatalog

3、返回解析後的 Resolved LogicalPlan。

以上處理邏輯中所涉及的主要的類之間的關係如下所示:

接下來仍然以前面的 SQL 語句(select col1 from tabname where col2 > 10)爲例,簡要闡述如何將一個 Unresolved LogicalPlan 解析成爲 Analyzed LogicalPlan:

1、根據 Analyzer 的解析規則,UnResolvedRelation 節點可以應用到 ResolveRelations 規則,通過 CatalogManger 獲取數據源中表的信息,得到 Relation 的相關列的信息並加上標號,同時創建一個針對數據表的 SubqueryAlias 節點;

2、針對過濾條件 col2>10 的過濾條件,針對列 UnresolvedAttribute 可以適用到 ResolveReference 規則,根據第 1 步中得到的列信息可以進行解析;數字 10 可以應用到 ImplicitTypeCasts 規則對該數字匹配最合適的數據類型;

3、針對 Project 節點,接下來在進行下一輪解析,再次匹配到 ResolveReference 規則對投影列進行解析,從而將整棵樹解析爲 Resolved LogicalPlan。

生成 Optimized LogicalPlan

得到 Resolved LogicalPlan 之後,爲了使 SQL 語句的執行性能更優,則需要根據一些規則進一步優化邏輯計劃樹,生成 Optimized LogicalPlan。

本文采用的是 Spark 3.0 的源碼,生成 Optimized LogicalPlan 是通過懶加載的方式被調用的,並且 Optimizer 類與 Analyzer 類一樣繼承了 RuleExecutor 類,所有基於規則 (RBO) 的優化實際都是通過 RuleExecutor 類來執行,同樣也是將所有規則構建爲多個批次,並且將所有批次中規則應用於 Analyzed LogicalPlan,直到樹不再改變或者執行優化的循環次數超過最大限制(spark.sql.optimizer.maxIterations,默認 100):

lazy val optimizedPlan: LogicalPlan = executePhase(QueryPlanningTracker.OPTIMIZATION) {
  // clone the plan to avoid sharing the plan instance between different stages like analyzing,
  // optimizing and planning.
  val plan = sparkSession.sessionState.optimizer.executeAndTrack(withCachedData.clone(), tracker)
  // We do not want optimized plans to be re-analyzed as literals that have been constant folded
  // and such can cause issues during analysis. While `clone` should maintain the `analyzed` state
  // of the LogicalPlan, we set the plan as analyzed here as well out of paranoia.
  plan.setAnalyzed()
  plan
}
...
def executeAndTrack(plan: TreeType, tracker: QueryPlanningTracker)TreeType = {
    QueryPlanningTracker.withTracker(tracker) {
      execute(plan)
    }
}

邏輯計劃優化規則仍然又多個 Batch 組成,每個 Batch 中包含多個具體的 Rule 並且可以執行一次或者固定次數。其中比較常用的優化規則有:謂詞下推、常量累加、列剪枝等幾種。

謂詞下推將盡可能使得謂詞計算靠近數據源,根據不同的場景有 LimitPushDown、PushProjectionThroughUnion、PushDownPredicates 等多種實現,  PushDownPredicates 又包含 PushPredicateThroughNonJoin 和 PushPredicateThroughJoin;

其中,PushPredicateThroughJoin 可實現將謂詞計算下推至 join 算子的下面,從而可以提升數據表之間的 join 計算過程中所帶來的網絡、內存以及 IO 等性能開銷:

val applyLocally: PartialFunction[LogicalPlan, LogicalPlan] = {
  // push the where condition down into join filter
  case f @ Filter(filterCondition, Join(left, right, joinType, joinCondition, hint)) =>
    val (leftFilterConditions, rightFilterConditions, commonFilterCondition) =
      split(splitConjunctivePredicates(filterCondition), left, right)
    joinType match {
      case _: InnerLike =>
        // push down the single side `where` condition into respective sides
        val newLeft = leftFilterConditions.
          reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
        val newRight = rightFilterConditions.
          reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
        val (newJoinConditions, others) =
          commonFilterCondition.partition(canEvaluateWithinJoin)
        val newJoinCond = (newJoinConditions ++ joinCondition).reduceLeftOption(And)

        val join = Join(newLeft, newRight, joinType, newJoinCond, hint)
        if (others.nonEmpty) {
          Filter(others.reduceLeft(And), join)
        } else {
          join
        }
      case RightOuter =>
        // push down the right side only `where` condition

常量摺疊是通過 ConstantFolding 規則來實現的,如果表達式中的算子是可以摺疊的則在該階段直接生成計算結果,以避免在實際的 sql 執行過程中產生逐行計算,從而可以降低 CPU 的計算開銷:

object ConstantFolding extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan)LogicalPlan = plan transform {
    case q: LogicalPlan => q transformExpressionsDown {
      // Skip redundant folding of literals. This rule is technically not necessary. Placing this
      // here avoids running the next rule for Literal values, which would create a new Literal
      // object and running eval unnecessarily.
      case l: Literal => l

      // Fold expressions that are foldable.
      case e if e.foldable => Literal.create(e.eval(EmptyRow), e.dataType)
    }
  }
}

列剪枝規則通過 ColumnPruning 規則來實現,去掉不需要處理的列,可避免從數據源讀取較多的數據列、將不需要的列加載至內存中計算計算計算中、以及返回不需要數據(想象一下大寬表的情況),從而獲得較大的性能收益:

object ColumnPruning extends Rule[LogicalPlan] 
def apply(plan: LogicalPlan): LogicalPlan
prunedChild(c: LogicalPlan, allReferences: AttributeSet):LogicalPlan=
    if (!c.outputSet.subsetOf(allReferences)) {

Optimizer 所涉及的主要類的關聯關係如下圖所示:

spark_optimizer

當所有優化規則完成對於 Aanalyzed LogicalPlan 的應用則可生成 Optimized LogicalPlan。

本文重點講解了 Spark SQL 解析爲 AST 抽象語法樹、生成 Unresolved LogicalPlan、生成 Resolved LogicalPlan 以及 Optimized LogicalPlan 的過程,爲接下來進一步生成物理計劃 Spark Plan 做好了準備。

作者簡介

焦媛,負責民生銀行 Hadoop 大數據平臺的生產運維工作,以及 HDFS 和 Spark 周邊開源產品的技術支持,並致力於 Spark 雲原生技術的支持與推廣。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/qQGPsnBG-wUsBjheduYdUA