-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-31607][SQL] Improve the perf of CTESubstitution #28407
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -87,11 +87,8 @@ object CTESubstitution extends Rule[LogicalPlan] { | |
| private def legacyTraverseAndSubstituteCTE(plan: LogicalPlan): LogicalPlan = { | ||
| plan.resolveOperatorsUp { | ||
| case With(child, relations) => | ||
| // substitute CTE expressions right-to-left to resolve references to previous CTEs: | ||
| // with a as (select * from t), b as (select * from a) select * from b | ||
| relations.foldRight(child) { | ||
| case ((cteName, ctePlan), currentPlan) => substituteCTE(currentPlan, cteName, ctePlan) | ||
| } | ||
| val resolvedCTERelations = resolveCTERelations(relations, isLegacy = true) | ||
| substituteCTE(child, resolvedCTERelations) | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -139,18 +136,8 @@ object CTESubstitution extends Rule[LogicalPlan] { | |
| private def traverseAndSubstituteCTE(plan: LogicalPlan): LogicalPlan = { | ||
| plan.resolveOperatorsUp { | ||
| case With(child: LogicalPlan, relations) => | ||
| // Substitute CTE definitions from last to first as a CTE definition can reference a | ||
| // previous one | ||
| relations.foldRight(child) { | ||
| case ((cteName, ctePlan), currentPlan) => | ||
| // A CTE definition might contain an inner CTE that has priority, so traverse and | ||
| // substitute CTE defined in ctePlan. | ||
| // A CTE definition might not be used at all or might be used multiple times. To avoid | ||
| // computation if it is not used and to avoid multiple recomputation if it is used | ||
| // multiple times we use a lazy construct with call-by-name parameter passing. | ||
| lazy val substitutedCTEPlan = traverseAndSubstituteCTE(ctePlan) | ||
| substituteCTE(currentPlan, cteName, substitutedCTEPlan) | ||
| } | ||
| val resolvedCTERelations = resolveCTERelations(relations, isLegacy = false) | ||
| substituteCTE(child, resolvedCTERelations) | ||
|
|
||
| case other => | ||
| other.transformExpressions { | ||
|
|
@@ -159,17 +146,38 @@ object CTESubstitution extends Rule[LogicalPlan] { | |
| } | ||
| } | ||
|
|
||
| private def resolveCTERelations( | ||
| relations: Seq[(String, SubqueryAlias)], | ||
| isLegacy: Boolean): Seq[(String, LogicalPlan)] = { | ||
| val resolvedCTERelations = new mutable.ArrayBuffer[(String, LogicalPlan)](relations.size) | ||
| for ((name, relation) <- relations) { | ||
| val innerCTEResolved = if (isLegacy) { | ||
| // In legacy mode, outer CTE relations take precedence. Here we don't resolve the inner | ||
| // `With` nodes, later we will substitute `UnresolvedRelation`s with outer CTE relations. | ||
| // Analyzer will run this rule multiple times until all `With` nodes are resolved. | ||
| relation | ||
| } else { | ||
| // A CTE definition might contain an inner CTE that has a higher priority, so traverse and | ||
| // substitute CTE defined in `relation` first. | ||
| traverseAndSubstituteCTE(relation) | ||
| } | ||
| // CTE definition can reference a previous one | ||
| resolvedCTERelations += (name -> substituteCTE(innerCTEResolved, resolvedCTERelations)) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. For legacy case, Then in later
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The rule |
||
| } | ||
| resolvedCTERelations | ||
| } | ||
|
|
||
| private def substituteCTE( | ||
| plan: LogicalPlan, | ||
| cteName: String, | ||
| ctePlan: => LogicalPlan): LogicalPlan = | ||
| cteRelations: Seq[(String, LogicalPlan)]): LogicalPlan = | ||
| plan resolveOperatorsUp { | ||
| case UnresolvedRelation(Seq(table)) if plan.conf.resolver(cteName, table) => ctePlan | ||
| case u @ UnresolvedRelation(Seq(table)) => | ||
| cteRelations.find(r => plan.conf.resolver(r._1, table)).map(_._2).getOrElse(u) | ||
|
|
||
| case other => | ||
| // This cannot be done in ResolveSubquery because ResolveSubquery does not know the CTE. | ||
| other transformExpressions { | ||
| case e: SubqueryExpression => e.withNewPlan(substituteCTE(e.plan, cteName, ctePlan)) | ||
| case e: SubqueryExpression => e.withNewPlan(substituteCTE(e.plan, cteRelations)) | ||
| } | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@cloud-fan Just trying to understand: does `innerCTEResolved` indicate an already resolved CTE, or the one we are going to resolve in the subsequent call to `substituteCTE`?
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"resolved" here means the `With` is resolved inside this relation. The relation needs further processing to substitute `UnresolvedRelation` with the previous CTE relations.
The naming is not very accurate when `legacy = true`, but this probably doesn't matter.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@cloud-fan OK. sounds good.