擴展 Spark SQL 解析

大家好久不見了,最近生活發生了很多變故,同時我也大病了一場,希望一切都儘快好起來吧。今天跟大家分享下 Spark 吧,談談如何修改 Spark SQL 解析,讓其更符合你的業務邏輯。好,我們開始吧...

理論基礎

ANTLR

Antlr4 是一款開源的語法分析器生成工具,能夠根據語法規則文件生成對應的語法分析器。現在很多流行的應用和開源項目裏都有使用,比如 Hadoop、Hive 以及 Spark 等都在使用 ANTLR 來做語法分析。

ANTLR 語法識別一般分爲二個階段:

1. 詞法分析階段 (lexical analysis)

對應的分析程序叫做 lexer ,負責將符號(token)分組成符號類(token class or token type)

2. 解析階段

根據詞法,構建出一棵分析樹(parse tree)或叫語法樹(syntax tree)

ANTLR 的語法文件,非常像電路圖,從入口到出口,每個 Token 就像電阻,連接線就是短路點。

語法文件(*.g4)

上面截圖對應的語法文件片段,定義了兩部分語法,一部分是顯示錶達式和賦值,另外一部分是運算和表達式定義。

stat:   expr NEWLINE               # printExpr
  |   ID '=' expr NEWLINE         # assign
  |   NEWLINE                     # blank
  ;

expr:   expr op=('*'|'/') expr     # MulDiv
  |   expr op=('+'|'-') expr     # AddSub
  |   INT                         # int
  |   ID                         # id
  |   '(' expr ')'               # parens
  ;

接下來,加上定義詞法部分,就能形成完整的語法文件。

完整語法文件:

grammar LabeledExpr; // rename to distinguish from Expr.g4

prog:   stat+ ;

stat:   expr NEWLINE               # printExpr
  |   ID '=' expr NEWLINE         # assign
  |   NEWLINE                     # blank
  ;

expr:   expr op=('*'|'/') expr     # MulDiv
  |   expr op=('+'|'-') expr     # AddSub
  |   INT                         # int
  |   ID                         # id
  |   '(' expr ')'               # parens
  ;

MUL :   '*' ; // assigns token name to '*' used above in grammar
DIV :   '/' ;
ADD :   '+' ;
SUB :   '-' ;
ID :   [a-zA-Z]+ ;     // match identifiers
INT :   [0-9]+ ;         // match integers
NEWLINE:'\r'? '\n' ;     // return newlines to parser (is end-statement signal)
WS :   [ \t]+ -> skip ; // toss out whitespace

SqlBase.g4

Spark 的語法文件,在 sql 下的 catalyst 模塊裏,如下圖:

擴展語法定義

一條正常 SQL,例如 Select t.id,t.name from t  , 現在我們爲其添加一個 JACKY 表達式,令其出現在 Select 後面 ,形成一條語句

Select t.id,t.name JACKY(2) from t

我們先看一下正常的語法規則:

現在我們添加一個 jackyExpression

jackExpression 本身的規則就是 JACKY 加上括號包裹的一個數字

將 JACKY 添加爲 token

修改語法文件 如下:

jackyExpression
  : JACKY'(' number ')'
  //expression
  ;

namedExpression
  : expression (AS? (identifier | identifierList))?
  ;

namedExpressionSeq
  : namedExpression (',' namedExpression | jackyExpression )*
  ;

擴展邏輯計劃

經過上面的修改,就可以測試語法規則,是不是符合預期了,下面是一顆解析樹, 我們可以看到 jackyExpression 已經可以正常解析了。

Spark 執行流程

這裏引用一張經典的 Spark SQL 架構圖

我們輸入的 SQL 語句 首先被解析成 Unresolved Logical Pan ,對應的是

給邏輯計劃添加遍歷方法:

 override def visitJackyExpression(ctx: JackyExpressionContext): String = withOrigin(ctx) {
   println("this is astbuilder jacky = "+ctx.number().getText)

   this.jacky = ctx.number().getText.toInt

   ctx.number().getText
}

再處理 namedExpression 的時候,添加 jackyExpression 處理

   // Expressions.
   val expressions = Option(namedExpressionSeq).toSeq
    .flatMap(_.namedExpression.asScala)
    .map(typedVisit[Expression])


//jackyExpression 處理
   if(namedExpressionSeq().jackyExpression()!=null && namedExpressionSeq().jackyExpression().size() > 0){
     visitJackyExpression(namedExpressionSeq().jackyExpression().get(0))
  }

好了,到這裏從邏輯計劃處理就完成了,有了邏輯計劃,就可以在後續物理計劃中添加相應的處理邏輯就可以了(還沒研究明白... Orz)。

測試

測試用例

public class Case4 {
   public static void main(String[] args) {
       CharStream ca = CharStreams.fromString("SELECT `b`.`id`,`b`.`class` JACKY(2) FROM `b` LIMIT 10");
       SqlBaseLexer lexer = new SqlBaseLexer(ca);
       SqlBaseParser sqlBaseParser = new SqlBaseParser(new CommonTokenStream(lexer));
       ParseTree parseTree = sqlBaseParser.singleStatement();

       AstBuilder astBuilder = new AstBuilder();
       astBuilder.visit(parseTree);
       System.out.println(parseTree.toStringTree(sqlBaseParser));
       System.out.println(astBuilder.jacky());
  }
}

執行結果

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/QjDHbcoIn87DwNIUrmXvug