Spark SQL 字段血緣在 vivo 互聯網的實踐

vivo 互聯網服務器團隊 - Hao Guangshi

一、背景

字段血緣是在表處理的過程中將字段的處理過程保留下來。爲什麼會需要字段血緣呢?

有了字段間的血緣關係,便可以知道數據的來源去處,以及字段之間的轉換關係,這樣對數據的質量,治理有很大的幫助。 

Spark SQL 相對於 Hive 來說通常情況下效率會比較高,對於運行時間、資源的使用上面等都會有較大的收益。

平臺計劃將 Hive 任務遷移到 Spark SQL 上,同時也需要實現字段血緣的功能。

二、前期調研

開發前我們做了很多相關調研,從中得知 Spark 是支持擴展的:允許用戶對 Spark SQL 的 SQL 解析、邏輯計劃的分析和檢查、邏輯計劃的優化、物理計劃的形成等進行擴展。

該方案可行,且對 Spark 的源碼沒有改動,代價也比較小,確定使用該方案。

三、Spark SQL 擴展

3.1 Spark 可擴展的內容

 SparkSessionExtensions 是比較重要的一個類,其中定義了注入規則的方法,現在支持以下內容:

在以上六種可以用戶自定義的地方,我們選擇了【Check Analysis Rules】。因爲該檢查規則在方法調用的時候是不需要有返回值的,也就意味着不需要對當前遍歷的邏輯計劃樹進行修改,這正是我們需要的。

而【Analyzer Rules】、【Optimizer Rules】則需要對當前的邏輯計劃進行修改,使得我們難以迭代整個樹,難以得到我們想要的結果。

3.2 實現自己的擴展

class ExtralSparkExtension extends (SparkSessionExtensions => Unit) {
  override def apply(spark: SparkSessionExtensions): Unit = {
    //字段血緣
    spark.injectCheckRule(FieldLineageCheckRuleV3)
    //sql解析器
    spark.injectParser { case (_, parser) => new ExtraSparkParser(parser) }
  }
}

上面按照這種方式實現擴展,並在 apply 方法中把自己需要的規則注入到 SparkSessionExtensions 即可,除了以上四種可以注入的以外還有其他的規則。要讓 ExtralSparkExtension 起到作用的話我們需要在 spark-default.conf 下配置

spark.sql.extensions=org.apache.spark.sql.hive.ExtralSparkExtension

在啓動 Spark 任務的時候即可生效。

注意到我們也實現了一個自定義的 SQL 解析器,其實該解析器並沒有做太多的事情。只是在判斷如果該語句包含 insert 的時候就將 SQLText(SQL 語句)設置到一個爲 FIELD_LINE_AGE_SQL,之所以將 SQLText 放到 FIELD_LINE_AGE_SQL 裏面。因爲在 DheckRule 裏面是拿不到 SparkPlan 的我們需要對 SQL 再次解析拿到 SprkPlan,而 FieldLineageCheckRuleV3 的實現也特別簡單,重要的在另一個線程實現裏面。

這裏我們只關注了 insert 語句,因爲插入語句裏面有從某些個表裏面輸入然後寫入到某個表。

class ExtraSparkParser(delegate: ParserInterface) extends ParserInterface with Logging{
  override def parsePlan(sqlText: String): LogicalPlan = {
    val lineAgeEnabled = SparkSession.getActiveSession
      .get.conf.getOption("spark.sql.xxx-xxx-xxx.enable").getOrElse("false").toBoolean
    logDebug(s"SqlText: $sqlText")
    if(sqlText.toLowerCase().contains("insert")){
      if(lineAgeEnabled){
        if(FIELD_LINE_AGE_SQL_COULD_SET.get()){
          //線程本地變量在這裏
          FIELD_LINE_AGE_SQL.set(sqlText)
        }
        FIELD_LINE_AGE_SQL_COULD_SET.remove()
      }
    }
    delegate.parsePlan(sqlText)
  }
  //調用原始的sqlparser
  override def parseExpression(sqlText: String): Expression = {
    delegate.parseExpression(sqlText)
  }
  //調用原始的sqlparser
  override def parseTableIdentifier(sqlText: String): TableIdentifier = {
    delegate.parseTableIdentifier(sqlText)
  }
  //調用原始的sqlparser
  override def parseFunctionIdentifier(sqlText: String): FunctionIdentifier = {
    delegate.parseFunctionIdentifier(sqlText)
  }
  //調用原始的sqlparser
  override def parseTableSchema(sqlText: String): StructType = {
    delegate.parseTableSchema(sqlText)
  }
  //調用原始的sqlparser
  override def parseDataType(sqlText: String): DataType = {
    delegate.parseDataType(sqlText)
  }
}

3.3 擴展的規則類

case class FieldLineageCheckRuleV3(sparkSession:SparkSession) extends (LogicalPlan=>Unit ) {
  val executor: ThreadPoolExecutor =
    ThreadUtils.newDaemonCachedThreadPool("spark-field-line-age-collector",3,6)
  override def apply(plan: LogicalPlan): Unit = {
    val sql = FIELD_LINE_AGE_SQL.get
    FIELD_LINE_AGE_SQL.remove()
    if(sql != null){
      //這裏我們拿到sql然後啓動一個線程做剩餘的解析任務
      val task = new FieldLineageRunnableV3(sparkSession,sql)
      executor.execute(task)
    }
  }
}

很簡單,我們只是拿到了 SQL 然後便啓動了一個線程去得到 SparkPlan,實際邏輯在

FieldLineageRunnableV3。

3.4 具體的實現方法

3.4.1 得到 SparkPlan

我們在 run 方法中得到 SparkPlan:

override def run(): Unit = {
  val parser = sparkSession.sessionState.sqlParser
  val analyzer = sparkSession.sessionState.analyzer
  val optimizer = sparkSession.sessionState.optimizer
  val planner = sparkSession.sessionState.planner
      ............
  val newPlan = parser.parsePlan(sql)
  PASS_TABLE_AUTH.set(true)
  val analyzedPlan = analyzer.executeAndCheck(newPlan)
  val optimizerPlan = optimizer.execute(analyzedPlan)
  //得到sparkPlan
  val sparkPlan = planner.plan(optimizerPlan).next()
  ...............
if(targetTable != null){
  val levelProject = new ArrayBuffer[ArrayBuffer[NameExpressionHolder]]()
  val predicates = new ArrayBuffer[(String,ArrayBuffer[NameExpressionHolder])]()
  //projection
  projectionLineAge(levelProject, sparkPlan.child)
  //predication
  predicationLineAge(predicates, sparkPlan.child)
  ...............

爲什麼要使用 SparkPlan 呢?當初我們考慮的時候,物理計劃拿取字段關係的時候是比較準的,且鏈路比較短也更直接。

在這裏補充一下 Spark SQL 解析的過程如下:

圖片

經過 SqlParser 後會得到邏輯計劃,此時表名、函數等都沒有解析,還不能執行;經過 Analyzer 會分析一些綁定信息,例如表驗證、字段信息、函數信息;經過 Optimizer 後邏輯計劃會根據既定規則被優化,這裏的規則是 RBO,當然 Spark 還支持 CBO 的優化;經過 SparkPlanner 後就成了可執行的物理計劃。

我們看一個邏輯計劃與物理計劃對比的例子:

一個 SQL 語句:

select item_id,TYPE,v_value,imei from t1
union all
select item_id,TYPE,v_value,imei from t2
union all
select item_id,TYPE,v_value,imei from t3

邏輯計劃是這樣的:

圖片

物理計劃是這樣的:

圖片

顯然簡化了很多。

得到 SparkPlan 後,我們就可以根據不同的 SparkPlan 節點做迭代處理。

我們將字段血緣分爲兩種類型:projection(select 查詢字段)、predication(wehre 查詢條件)。

這兩種是一種點對點的關係, 即從原始表的字段生成目標表的字段的對應關係。

想象一個查詢是一棵樹,那麼迭代關係會如下從樹的頂端開始迭代,直到樹的葉子節點,葉子節點即爲原始表:

圖片

那麼我們迭代查詢的結果應該爲

id ->tab1.id , 

name->tab1.name,tabb2.name,

age→tabb2.age。

注意到有該變量

 val levelProject = new ArrayBuffer

ArrayBuffer[NameExpressionHolder],通過 projecti-onLineAge 迭代後 levelProject 存儲了頂層 id,name,age 對應的(tab1.id),(tab1.name,tabb2.name),(tabb2.age)。

當然也不是簡單的遞歸迭代,還需要考慮特殊情況例如:Join、ExplandExec、Aggregate、Explode、GenerateExec 等都需要特殊考慮。

例子及效果:

SQL:

with A as (select id,name,age from tab1 where id > 100 ) ,
C as (select id,name,max(age) from A group by A.id,A.name) ,
B as (select id,name,age from tabb2 where age > 28)
insert into tab3
   select C.id,concat(C.name,B.name) as name, B.age from
     B,C where C.id = B.id

效果:

{
  "edges": [
    {
      "sources": [
        3
      ],
      "targets": [
        0
      ],
      "expression": "id",
      "edgeType": "PROJECTION"
    },
    {
      "sources": [
        4,
        7
      ],
      "targets": [
        1
      ],
      "expression": "name",
      "edgeType": "PROJECTION"
    },
    {
      "sources": [
        5
      ],
      "targets": [
        2
      ],
      "expression": "age",
      "edgeType": "PROJECTION"
    },
    {
      "sources": [
        6,
        3
      ],
      "targets": [
        0,
        1,
        2
      ],
      "expression": "INNER",
      "edgeType": "PREDICATE"
    },
    {
      "sources": [
        6,
        5
      ],
      "targets": [
        0,
        1,
        2
      ],
      "expression": "((((default.tabb2.`age` IS NOT NULL) AND (CAST(default.tabb2.`age` AS INT) > 28)) AND (B.`id` > 100)) AND (B.`id` IS NOT NULL))",
      "edgeType": "PREDICATE"
    },
    {
      "sources": [
        3
      ],
      "targets": [
        0,
        1,
        2
      ],
      "expression": "((default.tab1.`id` IS NOT NULL) AND (default.tab1.`id` > 100))",
      "edgeType": "PREDICATE"
    }
  ],
  "vertices": [
    {
      "id": 0,
      "vertexType": "COLUMN",
      "vertexId": "default.tab3.id"
    },
    {
      "id": 1,
      "vertexType": "COLUMN",
      "vertexId": "default.tab3.name"
    },
    {
      "id": 2,
      "vertexType": "COLUMN",
      "vertexId": "default.tab3.age"
    },
    {
      "id": 3,
      "vertexType": "COLUMN",
      "vertexId": "default.tab1.id"
    },
    {
      "id": 4,
      "vertexType": "COLUMN",
      "vertexId": "default.tab1.name"
    },
    {
      "id": 5,
      "vertexType": "COLUMN",
      "vertexId": "default.tabb2.age"
    },
    {
      "id": 6,
      "vertexType": "COLUMN",
      "vertexId": "default.tabb2.id"
    },
    {
      "id": 7,
      "vertexType": "COLUMN",
      "vertexId": "default.tabb2.name"
    }
  ]
}

四、總結

在 Spark SQL 的字段血緣實現中,我們通過其自擴展,首先拿到了 insert 語句,在我們自己的檢查規則中拿到

 SQL 語句,通過 SparkSqlParser、Analyzer、Optimizer、SparkPlanner,最終得到了物理計劃。

我們通過迭代物理計劃,根據不同執行計劃做對應的轉換,然後就得到了字段之間的對應關係。當前的實現是比較簡單的,字段之間是直線的對應關係,中間過程被忽略,如果想實現字段的轉換的整個過程也是沒有問題的。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/ZTb_6Gqj2GEuYjHu3ZbytQ