From c9062186e3d763222c2e387e6c82b9c3c55cb6f6 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Thu, 30 Apr 2020 00:03:02 +0800 Subject: [PATCH 1/2] fix perf regression in CTESubstitution --- .../catalyst/analysis/CTESubstitution.scala | 48 +++++++++++-------- 1 file changed, 27 insertions(+), 21 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala index 41e4c1fc3930..adc3c457d46a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala @@ -87,11 +87,8 @@ object CTESubstitution extends Rule[LogicalPlan] { private def legacyTraverseAndSubstituteCTE(plan: LogicalPlan): LogicalPlan = { plan.resolveOperatorsUp { case With(child, relations) => - // substitute CTE expressions right-to-left to resolve references to previous CTEs: - // with a as (select * from t), b as (select * from a) select * from b - relations.foldRight(child) { - case ((cteName, ctePlan), currentPlan) => substituteCTE(currentPlan, cteName, ctePlan) - } + val resolvedCTERelations = resolveCTERelations(relations, isLegacy = true) + substituteCTE(child, resolvedCTERelations) } } @@ -139,18 +136,8 @@ object CTESubstitution extends Rule[LogicalPlan] { private def traverseAndSubstituteCTE(plan: LogicalPlan): LogicalPlan = { plan.resolveOperatorsUp { case With(child: LogicalPlan, relations) => - // Substitute CTE definitions from last to first as a CTE definition can reference a - // previous one - relations.foldRight(child) { - case ((cteName, ctePlan), currentPlan) => - // A CTE definition might contain an inner CTE that has priority, so traverse and - // substitute CTE defined in ctePlan. - // A CTE definition might not be used at all or might be used multiple times. To avoid - // computation if it is not used and to avoid multiple recomputation if it is used - // multiple times we use a lazy construct with call-by-name parameter passing. - lazy val substitutedCTEPlan = traverseAndSubstituteCTE(ctePlan) - substituteCTE(currentPlan, cteName, substitutedCTEPlan) - } + val resolvedCTERelations = resolveCTERelations(relations, isLegacy = false) + substituteCTE(child, resolvedCTERelations) case other => other.transformExpressions { @@ -159,17 +146,36 @@ object CTESubstitution extends Rule[LogicalPlan] { } } + private def resolveCTERelations( + relations: Seq[(String, SubqueryAlias)], + isLegacy: Boolean): Seq[(String, LogicalPlan)] = { + val resolvedCTERelations = new mutable.ArrayBuffer[(String, LogicalPlan)](relations.size) + for ((name, relation) <- relations) { + val innerCTEResolved = if (isLegacy) { + // In legacy mode, outer CTE relations take precedence, so substitute relations later. + relation + } else { + // A CTE definition might contain an inner CTE that has priority, so traverse and + // substitute CTE defined in `relation` first. + traverseAndSubstituteCTE(relation) + } + // CTE definition can reference a previous one + resolvedCTERelations += (name -> substituteCTE(innerCTEResolved, resolvedCTERelations)) + } + resolvedCTERelations + } + private def substituteCTE( plan: LogicalPlan, - cteName: String, - ctePlan: => LogicalPlan): LogicalPlan = + cteRelations: Seq[(String, LogicalPlan)]): LogicalPlan = plan resolveOperatorsUp { - case UnresolvedRelation(Seq(table)) if plan.conf.resolver(cteName, table) => ctePlan + case u @ UnresolvedRelation(Seq(table)) => + cteRelations.find(r => plan.conf.resolver(r._1, table)).map(_._2).getOrElse(u) case other => // This cannot be done in ResolveSubquery because ResolveSubquery does not know the CTE. other transformExpressions { - case e: SubqueryExpression => e.withNewPlan(substituteCTE(e.plan, cteName, ctePlan)) + case e: SubqueryExpression => e.withNewPlan(substituteCTE(e.plan, cteRelations)) } } } From b022e3513a842329b24569840a07a501bed9a188 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Thu, 30 Apr 2020 17:19:46 +0800 Subject: [PATCH 2/2] update comments --- .../spark/sql/catalyst/analysis/CTESubstitution.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala index adc3c457d46a..9f0eff5017f3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala @@ -152,10 +152,12 @@ object CTESubstitution extends Rule[LogicalPlan] { val resolvedCTERelations = new mutable.ArrayBuffer[(String, LogicalPlan)](relations.size) for ((name, relation) <- relations) { val innerCTEResolved = if (isLegacy) { - // In legacy mode, outer CTE relations take precedence, so substitute relations later. + // In legacy mode, outer CTE relations take precedence. Here we don't resolve the inner + // `With` nodes, later we will substitute `UnresolvedRelation`s with outer CTE relations. + // Analyzer will run this rule multiple times until all `With` nodes are resolved. relation } else { - // A CTE definition might contain an inner CTE that has priority, so traverse and + // A CTE definition might contain an inner CTE that has a higher priority, so traverse and // substitute CTE defined in `relation` first. traverseAndSubstituteCTE(relation) }