diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala
index 41e4c1fc3930..9f0eff5017f3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala
@@ -87,11 +87,8 @@ object CTESubstitution extends Rule[LogicalPlan] {
   private def legacyTraverseAndSubstituteCTE(plan: LogicalPlan): LogicalPlan = {
     plan.resolveOperatorsUp {
       case With(child, relations) =>
-        // substitute CTE expressions right-to-left to resolve references to previous CTEs:
-        // with a as (select * from t), b as (select * from a) select * from b
-        relations.foldRight(child) {
-          case ((cteName, ctePlan), currentPlan) => substituteCTE(currentPlan, cteName, ctePlan)
-        }
+        val resolvedCTERelations = resolveCTERelations(relations, isLegacy = true)
+        substituteCTE(child, resolvedCTERelations)
     }
   }
 
@@ -139,18 +136,8 @@ object CTESubstitution extends Rule[LogicalPlan] {
   private def traverseAndSubstituteCTE(plan: LogicalPlan): LogicalPlan = {
     plan.resolveOperatorsUp {
       case With(child: LogicalPlan, relations) =>
-        // Substitute CTE definitions from last to first as a CTE definition can reference a
-        // previous one
-        relations.foldRight(child) {
-          case ((cteName, ctePlan), currentPlan) =>
-            // A CTE definition might contain an inner CTE that has priority, so traverse and
-            // substitute CTE defined in ctePlan.
-            // A CTE definition might not be used at all or might be used multiple times. To avoid
-            // computation if it is not used and to avoid multiple recomputation if it is used
-            // multiple times we use a lazy construct with call-by-name parameter passing.
-            lazy val substitutedCTEPlan = traverseAndSubstituteCTE(ctePlan)
-            substituteCTE(currentPlan, cteName, substitutedCTEPlan)
-        }
+        val resolvedCTERelations = resolveCTERelations(relations, isLegacy = false)
+        substituteCTE(child, resolvedCTERelations)
 
       case other =>
         other.transformExpressions {
@@ -159,17 +146,50 @@ object CTESubstitution extends Rule[LogicalPlan] {
     }
   }
 
+  /**
+   * Resolves the CTE definitions of one `With` node, processing them in the given order so that
+   * each definition can reference the ones before it.
+   *
+   * The accumulating buffer is converted with `toSeq` wherever a `Seq` is expected, which keeps
+   * the method compiling when `Seq` is `immutable.Seq` (Scala 2.13).
+   *
+   * @param relations (name, definition) pairs of the `With` node
+   * @param isLegacy if true, inner `With` nodes of a definition are not substituted here; if
+   *                 false, they are substituted before references to earlier definitions
+   * @return the (name, plan) pairs in the same order as `relations`
+   */
+  private def resolveCTERelations(
+      relations: Seq[(String, SubqueryAlias)],
+      isLegacy: Boolean): Seq[(String, LogicalPlan)] = {
+    val resolvedCTERelations = new mutable.ArrayBuffer[(String, LogicalPlan)](relations.size)
+    for ((name, relation) <- relations) {
+      val innerCTEResolved = if (isLegacy) {
+        // In legacy mode, outer CTE relations take precedence. Here we don't resolve the inner
+        // `With` nodes, later we will substitute `UnresolvedRelation`s with outer CTE relations.
+        // Analyzer will run this rule multiple times until all `With` nodes are resolved.
+        relation
+      } else {
+        // A CTE definition might contain an inner CTE that has a higher priority, so traverse and
+        // substitute CTE defined in `relation` first.
+        traverseAndSubstituteCTE(relation)
+      }
+      // CTE definition can reference a previous one
+      resolvedCTERelations += (name -> substituteCTE(innerCTEResolved, resolvedCTERelations.toSeq))
+    }
+    resolvedCTERelations.toSeq
+  }
+
   private def substituteCTE(
       plan: LogicalPlan,
-      cteName: String,
-      ctePlan: => LogicalPlan): LogicalPlan =
+      cteRelations: Seq[(String, LogicalPlan)]): LogicalPlan =
     plan resolveOperatorsUp {
-      case UnresolvedRelation(Seq(table)) if plan.conf.resolver(cteName, table) => ctePlan
+      case u @ UnresolvedRelation(Seq(table)) =>
+        cteRelations.find(r => plan.conf.resolver(r._1, table)).map(_._2).getOrElse(u)
 
       case other =>
         // This cannot be done in ResolveSubquery because ResolveSubquery does not know the CTE.
         other transformExpressions {
-          case e: SubqueryExpression => e.withNewPlan(substituteCTE(e.plan, cteName, ctePlan))
+          case e: SubqueryExpression => e.withNewPlan(substituteCTE(e.plan, cteRelations))
         }
     }
 }