Spark SQL: the Full Path from Execution Plan to RDD


Table of Contents

0. Example Setup

1. Lexical and Syntactic Analysis: Building the Syntax Tree

1.1 Concepts

1.2 Tracing the Spark Source Code

2. Unresolved Logical Plan

3. Analyzed Logical Plan

Source Code

SessionCatalog

Analyzer Rule Batch

Comparison

4. Optimized Logical Plan

5. Physical Plan

SparkPlan


This post records the full flow of how Spark SQL turns a query into an execution plan, with source-code tracing along the way. The Spark version used is 2.3.2.

The overall flow, SQL text -> Unresolved Logical Plan -> Analyzed Logical Plan -> Optimized Logical Plan -> Physical Plan -> RDD, describes how Spark SQL is turned into a distributed model that the Spark engine can execute. Below we follow each step with a concrete example.

0. Example Setup

Sample SQL:

select 
    t1.*,
    t2.id
from 
    test.tb_hive_test t1
join 
    test.test t2
on 
    t1.a == t2.name and imp_date == '20221216'

test.tb_hive_test is a partitioned table and test.test is a non-partitioned table. The DDL for the two tables:

CREATE TABLE test.tb_hive_test(
    a string,
    b string
)
PARTITIONED BY (imp_date string COMMENT 'partition date')
STORED AS ORC


CREATE TABLE test.test(
    id int,
    name string,
    create_time timestamp
)
STORED AS ORC

1. Lexical and Syntactic Analysis: Building the Syntax Tree

1.1 Concepts

Spark parses SQL with ANTLR, an open-source parser generator that builds a syntax tree from the input and can visualize it. ANTLR consists mainly of a lexer, a parser, and a tree walker.

  • Lexer (also called a scanner or tokenizer): turns the otherwise meaningless character stream into tokens, such as keywords (e.g. SELECT), identifiers (e.g. STRING), symbols (e.g. =), and operators (e.g. UNION), for the parser to consume. A token-dump sketch follows this list.
  • Parser: whereas the lexer looks only at individual tokens and ignores context, the parser organizes the received tokens into sequences allowed by the grammar of the target language.
  • Tree walker: traverses the abstract syntax tree produced by the parser and performs operations on it.
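
To make the lexer step concrete, here is a rough sketch, not Spark's own code path, that feeds a simplified query to the ANTLR-generated SqlBaseLexer shipped in spark-catalyst and prints the token stream. It assumes the ANTLR 4 runtime and spark-catalyst are on the classpath; Spark itself wraps the character stream in a case-insensitive UpperCaseCharStream, so upper-casing the text by hand is a shortcut here.

import org.antlr.v4.runtime.{CharStreams, CommonTokenStream}
import org.apache.spark.sql.catalyst.parser.SqlBaseLexer
import scala.collection.JavaConverters._

val sql = "SELECT A, B FROM TEST.TB_HIVE_TEST WHERE IMP_DATE = '20221216'"
val lexer = new SqlBaseLexer(CharStreams.fromString(sql))
val tokens = new CommonTokenStream(lexer)
tokens.fill()
tokens.getTokens.asScala.foreach { t =>
  // Prints one line per token, e.g. the SELECT keyword, IDENTIFIER tokens, string literals, EOF.
  println(s"${lexer.getVocabulary.getDisplayName(t.getType)}  '${t.getText}'")
}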

1.2 Tracing the Spark Source Code

When the SQL above is submitted, the following method is invoked:

package org.apache.spark.sql

class SparkSession private(...){
    // sqlText is the input SQL
    def sql(sqlText: String): DataFrame = {
        Dataset.ofRows(self, sessionState.sqlParser.parsePlan(sqlText))
      }
}

sessionState.sqlParser.parsePlan in turn calls the parse method of AbstractSqlParser to parse the SQL:

package org.apache.spark.sql.catalyst.parser


abstract class AbstractSqlParser extends ParserInterface with Logging {
    // 1. Call the parse function
    override def parsePlan(sqlText: String): LogicalPlan = parse(sqlText) { parser =>
        // 2. astBuilder walks the syntax tree
        astBuilder.visitSingleStatement(parser.singleStatement()) match {
          case plan: LogicalPlan => plan
          case _ =>
            val position = Origin(None, None)
            throw new ParseException(Option(sqlText), "Unsupported SQL statement", position, position)
        }
    }


    protected def parse[T](command: String)(toResult: SqlBaseParser => T): T = {
        logDebug(s"Parsing command: $command")
        // Lexical analyzer (lexer)
        val lexer = new SqlBaseLexer(new UpperCaseCharStream(CharStreams.fromString(command)))
        lexer.removeErrorListeners()
        lexer.addErrorListener(ParseErrorListener)

      
        val tokenStream = new CommonTokenStream(lexer)
        val parser = new SqlBaseParser(tokenStream)  // Syntax analyzer (parser)
        parser.addParseListener(PostProcessor)
        parser.removeErrorListeners()
        parser.addErrorListener(ParseErrorListener)
        ...
    }
}

Debugging shows that by this point the SQL has been broken down into many kinds of *Context objects.

These *Context classes are generated by ANTLR from the SqlBase.g4 grammar file. For example, the querySpecification grammar rule produces a QuerySpecificationContext class, whose fields record the tokens and values of a SELECT-style statement. A sketch of printing this Context tree follows.
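
As an illustration, under the same classpath assumption as the lexer sketch above and again outside Spark's own code path, the generated SqlBaseParser can print the nested Context tree directly:

import org.antlr.v4.runtime.{CharStreams, CommonTokenStream}
import org.apache.spark.sql.catalyst.parser.{SqlBaseLexer, SqlBaseParser}

val sql = "SELECT T1.A FROM TEST.TB_HIVE_TEST T1"
val parser = new SqlBaseParser(new CommonTokenStream(new SqlBaseLexer(CharStreams.fromString(sql))))
// toStringTree prints the nested rule contexts; querySpecification shows up as one of the nodes
println(parser.singleStatement().toStringTree(parser))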

Once the SQL has been parsed into an AST, construction of the logical plan can begin.

2. Unresolved Logical Plan

Back in the parsePlan function above: after step 1 builds the syntax tree, step 2 uses AstBuilder (a tree visitor) to convert the tree into a LogicalPlan. This LogicalPlan is also called the Unresolved Logical Plan.

package org.apache.spark.sql.catalyst.parser


abstract class AbstractSqlParser extends ParserInterface with Logging {
    // 1. Call the parse function
    override def parsePlan(sqlText: String): LogicalPlan = parse(sqlText) { parser =>
        // 2. astBuilder walks the syntax tree
        astBuilder.visitSingleStatement(parser.singleStatement()) match {
          case plan: LogicalPlan => plan
          case _ =>
            val position = Origin(None, None)
            throw new ParseException(Option(sqlText), "Unsupported SQL statement", position, position)
        }
    }
}

Judging from the code, after step 2 completes we should get an execution plan like the one below.

However, the logical plan shown in the Spark History Server looks like the following; this already reflects Spark's own handling:
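
To see this stage in isolation, the parsing step can be run by itself in spark-shell. A minimal sketch using the sample query from section 0 (sessionState is a public but unstable API in this version):

val parsed = spark.sessionState.sqlParser.parsePlan(
  """select t1.*, t2.id
    |from test.tb_hive_test t1
    |join test.test t2
    |on t1.a == t2.name and imp_date == '20221216'""".stripMargin)
// Prints a tree of unresolved nodes: 'Project, 'Join, 'UnresolvedRelation, etc.
println(parsed.treeString)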

3. Analyzed Logical Plan

Source Code

The Unresolved Logical Plan becomes the Analyzed Logical Plan by binding catalog metadata; this step is carried out by the Analyzer. Let's trace the source, going back to step 1:

package org.apache.spark.sql

class SparkSession private(...){
    // sqlText is the input SQL
    def sql(sqlText: String): DataFrame = {
        Dataset.ofRows(
            self, 
            // 1. Obtain the Unresolved Logical Plan
            sessionState.sqlParser.parsePlan(sqlText) 
        )
      }
}

private[sql] object Dataset {

    def ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan): DataFrame = {
        // 2. Build a QueryExecution
        val qe = sparkSession.sessionState.executePlan(logicalPlan) 
        qe.assertAnalyzed()  // 3. Trigger the analyzer
        new Dataset[Row](sparkSession, qe, RowEncoder(qe.analyzed.schema))
    }
}


package org.apache.spark.sql.execution

class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
    def assertAnalyzed(): Unit = analyzed

    lazy val analyzed: LogicalPlan = {
        SparkSession.setActiveSession(sparkSession)
        // 3.1 Resolve the logical plan against the catalog metadata, producing the analyzed plan
        sparkSession.sessionState.analyzer.executeAndCheck(logical)
    }
}

private[sql] class SessionState(
    sharedState: SharedState,
    val conf: SQLConf,
    val experimentalMethods: ExperimentalMethods,
    val functionRegistry: FunctionRegistry,
    val udfRegistration: UDFRegistration,
    catalogBuilder: () => SessionCatalog,
    val sqlParser: ParserInterface,
    analyzerBuilder: () => Analyzer,
    optimizerBuilder: () => Optimizer,
    val planner: SparkPlanner,
    val streamingQueryManager: StreamingQueryManager,
    val listenerManager: ExecutionListenerManager,
    resourceLoaderBuilder: () => SessionResourceLoader,
    createQueryExecution: LogicalPlan => QueryExecution,
    createClone: (SparkSession, SessionState) => SessionState) {
    ...
}

Now look at the Analyzer source; the first constructor argument of Analyzer is a SessionCatalog.

package org.apache.spark.sql.catalyst.analysis

class Analyzer(
    catalog: SessionCatalog,
    conf: SQLConf,
    maxIterations: Int)
extends RuleExecutor[LogicalPlan] with CheckAnalysis {

    // 1. Entry point into the analyzer
    def executeAndCheck(plan: LogicalPlan): LogicalPlan = {
        val analyzed = execute(plan) // 2. Transform the logical plan (execute lives in RuleExecutor)
        try {
          checkAnalysis(analyzed)
          EliminateBarriers(analyzed)
        } catch {
            case e: AnalysisException =>
            val ae = new AnalysisException(e.message, e.line, e.startPosition, Option(analyzed))
            ae.setStackTrace(e.getStackTrace)
            throw ae
        }
    }
}



package org.apache.spark.sql.catalyst.rules
abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {

    def execute(plan: TreeType): TreeType = {
        var curPlan = plan
        val queryExecutionMetrics = RuleExecutor.queryExecutionMeter

        // 2. Walk the rule batches and apply them to the logical plan
        batches.foreach { batch =>
          val batchStartPlan = curPlan
          var iteration = 1
          var lastPlan = curPlan
          var continue = true

          // Run until fix point (or the max number of iterations as specified in the strategy).
          while (continue) {
              curPlan = batch.rules.foldLeft(curPlan) {
                case (plan, rule) =>
                    val startTime = System.nanoTime()
                    val result = rule(plan)                // apply the rule to the current plan
                    val runTime = System.nanoTime() - startTime
                    if (!result.fastEquals(plan)) {        // count only rules that changed the plan
                      queryExecutionMetrics.incNumEffectiveExecution(rule.ruleName)
                      queryExecutionMetrics.incTimeEffectiveExecutionBy(rule.ruleName, runTime)
                    }
                    result
              }
              // `continue` is set to false once the plan stops changing or maxIterations is reached
          }
          ...
        }
    }
}



package org.apache.spark.sql.catalyst.analysis
class Analyzer...{
    lazy val batches: Seq[Batch] = Seq(
      ...
      Batch("Resolution", fixedPoint,
      ResolveTableValuedFunctions ::
      ResolveRelations ::
      ResolveReferences ::
      ResolveCreateNamedStruct ::
      ...
     )
}

SessionCatalog

In Spark 2.x, the Catalog layer of Spark SQL is centered on SessionCatalog. SessionCatalog acts as a facade: it wraps the underlying metadata, temporary tables, views, and function information, and provides the interfaces for managing databases, tables, partitions, and functions.

The key constructor dependency of SessionCatalog is an ExternalCatalog, which has two implementations: HiveExternalCatalog, which persists metadata in the Hive metastore and is the one commonly used in production, and InMemoryCatalog, which keeps the information in memory and is mainly used for tests or simple SQL processing. A small lookup sketch follows this paragraph.
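
A small sketch, assuming the test database from section 0 exists and the session has Hive support enabled, of asking the SessionCatalog for the metadata that resolution will later bind into the plan:

import org.apache.spark.sql.catalyst.TableIdentifier

val catalog = spark.sessionState.catalog                   // the SessionCatalog facade
val table = TableIdentifier("tb_hive_test", Some("test"))
catalog.tableExists(table)                                 // true once the table is registered
val meta = catalog.getTableMetadata(table)                 // a CatalogTable fetched through the ExternalCatalog
println(meta.partitionColumnNames)                         // the partition column imp_date for the sample table
println(meta.schema)                                       // columns a, b, imp_date with their types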

Analyzer Rule Batch

The Analyzer transforms the Unresolved Logical Plan using a set of predefined Rules together with the SessionCatalog. Rules of a similar nature are grouped into a Batch, such as the Resolution batch below. When RuleExecutor runs, it goes batch by batch: it serially executes the rule list of the first Batch, then the rule list of the second Batch, and so on.

package org.apache.spark.sql.catalyst.analysis

class Analyzer(
    catalog: SessionCatalog,
    conf: SQLConf,
    maxIterations: Int)
// 0. Analyzer extends RuleExecutor; the actual transformation happens in RuleExecutor
extends RuleExecutor[LogicalPlan] with CheckAnalysis {

    // 1. Entry point into the analyzer
    def executeAndCheck(plan: LogicalPlan): LogicalPlan = {
        val analyzed = execute(plan) // 2. Ends up in RuleExecutor.execute
        try {
          checkAnalysis(analyzed)
          EliminateBarriers(analyzed)
        } catch {
            case e: AnalysisException =>
            val ae = new AnalysisException(e.message, e.line, e.startPosition, Option(analyzed))
            ae.setStackTrace(e.getStackTrace)
            throw ae
        }
    }
}



package org.apache.spark.sql.catalyst.rules
abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
    def execute(plan: TreeType): TreeType = {
        var curPlan = plan
        val queryExecutionMetrics = RuleExecutor.queryExecutionMeter

        // 3. Iterate over the predefined rules,
        // turning the Unresolved LogicalPlan into a Resolved LogicalPlan
        batches.foreach { batch =>
          val batchStartPlan = curPlan
          var iteration = 1
          var lastPlan = curPlan
          var continue = true

          // Run until fix point (or the max number of iterations as specified in the strategy).
          while (continue) {
              curPlan = batch.rules.foldLeft(curPlan) {
                ...
              }
          }    
       }
    }
}


// 4. The rules are predefined in the Analyzer
package org.apache.spark.sql.catalyst.analysis
class Analyzer...{
    lazy val batches: Seq[Batch] = Seq(
      ...
      Batch("Resolution", fixedPoint,
      ResolveTableValuedFunctions :: // resolves table-valued functions
      ResolveRelations :: // resolves tables and views
      ResolveReferences :: // resolves column references
      ...
     )
}
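
Every entry in these batches follows the same contract: a Rule[LogicalPlan] is essentially a LogicalPlan => LogicalPlan transform, and RuleExecutor keeps re-applying a batch until the plan stops changing or maxIterations is hit. As a toy sketch, purely illustrative and not part of Spark, a rule that drops a WHERE clause that is literally true could look like this:

import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule

// Removes Filter nodes whose condition is the literal `true`.
object RemoveAlwaysTrueFilter extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan.transform {
    case Filter(Literal(true, _), child) => child
  }
}

// User code can hook such a rule into the optimizer's extra batch:
spark.experimental.extraOptimizations ++= Seq(RemoveAlwaysTrueFilter)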

Comparison

Below is a comparison of the Unresolved Logical Plan and the Analyzed Logical Plan. Admittedly my example is not very illustrative: there are no complex subqueries, and the plan Spark produced in the previous step already had some metadata attached, so the two differ very little; essentially only the AnalysisBarrier is removed.

4. Optimized Logical Plan

Generating the Optimized Logical Plan works much like step 3, so the main thing to look at is which Rule Batches the Optimizer predefines.

package org.apache.spark.sql.catalyst.optimizer
abstract class Optimizer(sessionCatalog: SessionCatalog)
  extends RuleExecutor[LogicalPlan] {

    def batches: Seq[Batch] = {
        val operatorOptimizationRuleSet =
          Seq(
            // Operator push down
            PushProjectionThroughUnion,
            ReorderJoin,
            EliminateOuterJoin,
            PushPredicateThroughJoin,
            PushDownPredicate,
            LimitPushDown,
            ColumnPruning,
            InferFiltersFromConstraints,
            // Operator combine
            CollapseRepartition,
            CollapseProject,
            CollapseWindow,
            CombineFilters,
            CombineLimits,
            CombineUnions,
            // Constant folding and strength reduction
            NullPropagation,
            ...
          )
        ...
    }
}

In this example, two optimizations stand out (see the sketch after this list):

  • Predicate pushdown: the imp_date == '20221216' condition is pushed out of the join and down onto the tb_hive_test relation, so the data source scans only the matching partition instead of the whole table.
  • Inferred null filters: IsNotNull checks are added for the partition column and for the join key a from the ON condition (the InferFiltersFromConstraints rule listed above), so rows that could never match the join are filtered early.
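
Both effects are visible in the optimized logical plan that explain(true) prints. A quick sketch with the same sample query, assuming the sample tables exist:

spark.sql("""
  select t1.*, t2.id
  from test.tb_hive_test t1
  join test.test t2
  on t1.a == t2.name and imp_date == '20221216'
""").explain(true)
// In the "Optimized Logical Plan" section of the output, the partition filter sits directly
// above the tb_hive_test relation and isnotnull(...) filters appear on the join keys.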

5. Physical Plan

This step converts the logical plan tree into a physical plan tree, i.e. a plan that can actually be executed. The code is back in QueryExecution.

package org.apache.spark.sql.execution
class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {

    // Ask the SparkPlanner for candidate physical plans and take the first one
    lazy val sparkPlan: SparkPlan = {
        SparkSession.setActiveSession(sparkSession)
        planner.plan(ReturnAnswer(optimizedPlan)).next()
    }

    // Apply preparation rules (e.g. EnsureRequirements, CollapseCodegenStages) to make the plan executable
    lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)

    // Lazily turn the physical plan into an RDD[InternalRow]
    lazy val toRdd: RDD[InternalRow] = executedPlan.execute()

    protected def prepareForExecution(plan: SparkPlan): SparkPlan = {
        preparations.foldLeft(plan) { case (sp, rule) => rule.apply(sp) }
    }
}
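
All of the plan stages discussed so far hang off this QueryExecution object and can be inspected from user code. A small sketch, under the same assumptions as the earlier ones:

val qe = spark.sql("select t1.*, t2.id from test.tb_hive_test t1 join test.test t2 on t1.a == t2.name and imp_date == '20221216'").queryExecution

qe.logical        // plan as parsed
qe.analyzed       // after the Analyzer rule batches
qe.optimizedPlan  // after the Optimizer rule batches
qe.sparkPlan      // physical plan chosen by the planner
qe.executedPlan   // physical plan after the preparation rules
qe.toRdd          // RDD[InternalRow]; nothing runs until an action is called on it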

SparkPlan 

The executedPlan above is a SparkPlan, the abstract parent of all physical plans. Only once we have a SparkPlan tree can it be turned into a DAG of RDDs. SparkPlans fall into four categories:

  • LeafExecNode: a leaf node with no children. The most basic kind of SparkPlan, it is where RDD generation starts and corresponds to a data source, for example HiveTableScanExec.
  • UnaryExecNode: a node with exactly one child. These plans typically transform an RDD one-to-one, for example Exchange (repartitioning).
  • BinaryExecNode: a node with two children, a left and a right child, for example a Join.
  • Other nodes, for example the CodegenSupport trait and UnionExec (which has a variable number of children).

package org.apache.spark.sql.execution

abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializable {
  ...
}

trait LeafExecNode extends SparkPlan {
  override final def children: Seq[SparkPlan] = Nil
  override def producedAttributes: AttributeSet = outputSet
}

object UnaryExecNode {
  def unapply(a: Any): Option[(SparkPlan, SparkPlan)] = a match {
    case s: SparkPlan if s.children.size == 1 => Some((s, s.children.head))
    case _ => None
  }
}

trait UnaryExecNode extends SparkPlan {
  def child: SparkPlan

  override final def children: Seq[SparkPlan] = child :: Nil
}

trait BinaryExecNode extends SparkPlan {
  def left: SparkPlan
  def right: SparkPlan

  override final def children: Seq[SparkPlan] = Seq(left, right)
}
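
Since every physical operator extends one of these shapes, the tree can be pattern-matched by arity. A short sketch reusing the qe value from the sketch in the previous section:

import org.apache.spark.sql.execution.{BinaryExecNode, LeafExecNode}

// Leaf nodes are the scans where RDD generation starts; binary nodes are the joins.
qe.executedPlan.collect { case scan: LeafExecNode => scan.nodeName }
qe.executedPlan.collect { case join: BinaryExecNode => join.nodeName }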

In this example, the Relation operators become FileScans, and the Join operator becomes a BroadcastHashJoin (because my test data is tiny). The sketch below shows the setting that drives this choice.
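
Whether the planner picks BroadcastHashJoin depends, among other things, on spark.sql.autoBroadcastJoinThreshold (10 MB by default). A quick sketch of checking and disabling it:

spark.conf.get("spark.sql.autoBroadcastJoinThreshold")        // "10485760" (10 MB) by default
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")  // disable broadcast joins
// Re-running explain on the sample query now shows a SortMergeJoin instead.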