Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,7 @@ class Analyzer(override val catalogManager: CatalogManager)
ResolveBinaryArithmetic ::
ResolveUnion ::
typeCoercionRules ++
Seq(ResolveWithCTE) ++
extendedResolutionRules : _*),
Batch("Remove TempResolvedColumn", Once, RemoveTempResolvedColumn),
Batch("Apply Char Padding", Once,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,25 +20,60 @@ package org.apache.spark.sql.catalyst.analysis
import scala.collection.mutable

import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias, With}
import org.apache.spark.sql.catalyst.plans.logical.{Command, CTERelationDef, CTERelationRef, InsertIntoDir, LogicalPlan, ParsedStatement, SubqueryAlias, UnresolvedWith, WithCTE}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.TreePattern._
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.internal.SQLConf.{LEGACY_CTE_PRECEDENCE_POLICY, LegacyBehaviorPolicy}

/**
* Analyze WITH nodes and substitute child plan with CTE definitions.
* Analyze WITH nodes and substitute child plan with CTE references or CTE definitions depending
* on the conditions below:
* 1. If in legacy mode, or if the query is a SQL command or DML statement, replace with CTE
* definitions, i.e., inline CTEs.
* 2. Otherwise, replace with CTE references `CTERelationRef`s. The decision to inline or not
* inline will be made later by the rule `InlineCTE` after query analysis.
*
* All the CTE definitions that are not inlined after this substitution will be grouped together
* under one `WithCTE` node for each of the main query and the subqueries. Any of the main query
* or the subqueries that do not contain CTEs or have had all CTEs inlined will obviously not have
* any `WithCTE` nodes. If any though, the `WithCTE` node will be in the same place as where the
* outermost `With` node once was.
*
* The CTE definitions in a `WithCTE` node are kept in the order of how they have been resolved.
* That means the CTE definitions are guaranteed to be in topological order base on their
* dependency for any valid CTE query (i.e., given CTE definitions A and B with B referencing A,
* A is guaranteed to appear before B). Otherwise, it must be an invalid user query, and an
* analysis exception will be thrown later by relation resolving rules.
*/
object CTESubstitution extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = {
LegacyBehaviorPolicy.withName(conf.getConf(LEGACY_CTE_PRECEDENCE_POLICY)) match {
case LegacyBehaviorPolicy.EXCEPTION =>
assertNoNameConflictsInCTE(plan)
traverseAndSubstituteCTE(plan)
case LegacyBehaviorPolicy.LEGACY =>
legacyTraverseAndSubstituteCTE(plan)
case LegacyBehaviorPolicy.CORRECTED =>
traverseAndSubstituteCTE(plan)
val isCommand = plan.find {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

plan.collectFirst would be quicker and less memory-hungry, wouldn't it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe I'm missing sth, but I don't see how so.

case _: Command | _: ParsedStatement | _: InsertIntoDir => true
case _ => false
}.isDefined
val cteDefs = mutable.ArrayBuffer.empty[CTERelationDef]
val (substituted, lastSubstituted) =
LegacyBehaviorPolicy.withName(conf.getConf(LEGACY_CTE_PRECEDENCE_POLICY)) match {
case LegacyBehaviorPolicy.EXCEPTION =>
assertNoNameConflictsInCTE(plan)
traverseAndSubstituteCTE(plan, isCommand, cteDefs)
case LegacyBehaviorPolicy.LEGACY =>
(legacyTraverseAndSubstituteCTE(plan, cteDefs), None)
case LegacyBehaviorPolicy.CORRECTED =>
traverseAndSubstituteCTE(plan, isCommand, cteDefs)
}
if (cteDefs.isEmpty) {
substituted
} else if (substituted eq lastSubstituted.get) {
WithCTE(substituted, cteDefs.toSeq)
} else {
var done = false
substituted.resolveOperatorsWithPruning(_ => !done) {
case p if p eq lastSubstituted.get =>
done = true
WithCTE(p, cteDefs.toSeq)
}
}
}

Expand All @@ -59,7 +94,7 @@ object CTESubstitution extends Rule[LogicalPlan] {
startOfQuery: Boolean = true): Unit = {
val resolver = conf.resolver
plan match {
case With(child, relations) =>
case UnresolvedWith(child, relations) =>
val newNames = mutable.ArrayBuffer.empty[String]
newNames ++= outerCTERelationNames
relations.foreach {
Expand All @@ -81,17 +116,21 @@ object CTESubstitution extends Rule[LogicalPlan] {
}
}

private def legacyTraverseAndSubstituteCTE(plan: LogicalPlan): LogicalPlan = {
private def legacyTraverseAndSubstituteCTE(
plan: LogicalPlan,
cteDefs: mutable.ArrayBuffer[CTERelationDef]): LogicalPlan = {
plan.resolveOperatorsUp {
case With(child, relations) =>
val resolvedCTERelations = resolveCTERelations(relations, isLegacy = true)
substituteCTE(child, resolvedCTERelations)
case UnresolvedWith(child, relations) =>
val resolvedCTERelations =
resolveCTERelations(relations, isLegacy = true, isCommand = false, cteDefs)
substituteCTE(child, alwaysInline = true, resolvedCTERelations)
}
}

/**
* Traverse the plan and expression nodes as a tree and replace matching references to CTE
* definitions.
* Traverse the plan and expression nodes as a tree and replace matching references with CTE
* references if `isCommand` is false, otherwise with the query plans of the corresponding
* CTE definitions.
* - If the rule encounters a WITH node then it substitutes the child of the node with CTE
* definitions of the node right-to-left order as a definition can reference to a previous
* one.
Expand Down Expand Up @@ -130,23 +169,36 @@ object CTESubstitution extends Rule[LogicalPlan] {
* @param plan the plan to be traversed
* @return the plan where CTE substitution is applied
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As the return value of traverseAndSubstituteCTE changed a bit, this docs could use some update.

*/
private def traverseAndSubstituteCTE(plan: LogicalPlan): LogicalPlan = {
plan.resolveOperatorsUpWithPruning(_.containsAnyPattern(UNRESOLVED_RELATION, PLAN_EXPRESSION)) {
case With(child: LogicalPlan, relations) =>
val resolvedCTERelations = resolveCTERelations(relations, isLegacy = false)
substituteCTE(child, resolvedCTERelations)
private def traverseAndSubstituteCTE(
plan: LogicalPlan,
isCommand: Boolean,
cteDefs: mutable.ArrayBuffer[CTERelationDef]): (LogicalPlan, Option[LogicalPlan]) = {
var lastSubstituted: Option[LogicalPlan] = None
val newPlan = plan.resolveOperatorsUpWithPruning(
_.containsAnyPattern(UNRESOLVED_RELATION, PLAN_EXPRESSION)) {
case UnresolvedWith(child: LogicalPlan, relations) =>
val resolvedCTERelations =
resolveCTERelations(relations, isLegacy = false, isCommand, cteDefs)
if (!isCommand) {
cteDefs ++= resolvedCTERelations.map(_._2)
}
lastSubstituted = Some(substituteCTE(child, isCommand, resolvedCTERelations))
lastSubstituted.get

case other =>
other.transformExpressionsWithPruning(_.containsPattern(PLAN_EXPRESSION)) {
case e: SubqueryExpression => e.withNewPlan(traverseAndSubstituteCTE(e.plan))
case e: SubqueryExpression => e.withNewPlan(apply(e.plan))
}
}
(newPlan, lastSubstituted)
}

private def resolveCTERelations(
relations: Seq[(String, SubqueryAlias)],
isLegacy: Boolean): Seq[(String, LogicalPlan)] = {
val resolvedCTERelations = new mutable.ArrayBuffer[(String, LogicalPlan)](relations.size)
isLegacy: Boolean,
isCommand: Boolean,
cteDefs: mutable.ArrayBuffer[CTERelationDef]): Seq[(String, CTERelationDef)] = {
val resolvedCTERelations = new mutable.ArrayBuffer[(String, CTERelationDef)](relations.size)
for ((name, relation) <- relations) {
val innerCTEResolved = if (isLegacy) {
// In legacy mode, outer CTE relations take precedence. Here we don't resolve the inner
Expand All @@ -156,25 +208,37 @@ object CTESubstitution extends Rule[LogicalPlan] {
} else {
// A CTE definition might contain an inner CTE that has a higher priority, so traverse and
// substitute CTE defined in `relation` first.
traverseAndSubstituteCTE(relation)
traverseAndSubstituteCTE(relation, isCommand, cteDefs)._1
}
// CTE definition can reference a previous one
resolvedCTERelations += (name -> substituteCTE(innerCTEResolved, resolvedCTERelations.toSeq))
val substituted =
substituteCTE(innerCTEResolved, isLegacy || isCommand, resolvedCTERelations.toSeq)
val cteRelation = CTERelationDef(substituted)
resolvedCTERelations += (name -> cteRelation)
}
resolvedCTERelations.toSeq
}

private def substituteCTE(
plan: LogicalPlan,
cteRelations: Seq[(String, LogicalPlan)]): LogicalPlan =
alwaysInline: Boolean,
cteRelations: Seq[(String, CTERelationDef)]): LogicalPlan =
plan.resolveOperatorsUpWithPruning(_.containsAnyPattern(UNRESOLVED_RELATION, PLAN_EXPRESSION)) {
case u @ UnresolvedRelation(Seq(table), _, _) =>
cteRelations.find(r => plan.conf.resolver(r._1, table)).map(_._2).getOrElse(u)
cteRelations.find(r => plan.conf.resolver(r._1, table)).map { case (_, d) =>
if (alwaysInline) {
d.child
} else {
// Add a `SubqueryAlias` for hint-resolving rules to match relation names.
SubqueryAlias(table, CTERelationRef(d.id, d.resolved, d.output))
}
}.getOrElse(u)

case other =>
// This cannot be done in ResolveSubquery because ResolveSubquery does not know the CTE.
other.transformExpressionsWithPruning(_.containsPattern(PLAN_EXPRESSION)) {
case e: SubqueryExpression => e.withNewPlan(substituteCTE(e.plan, cteRelations))
case e: SubqueryExpression =>
e.withNewPlan(apply(substituteCTE(e.plan, alwaysInline, cteRelations)))
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ object ResolveHints {
case r: SubqueryAlias if matchedIdentifierInHint(extractIdentifier(r)) =>
ResolvedHint(plan, createHintInfo(hintName))

case _: ResolvedHint | _: View | _: With | _: SubqueryAlias =>
case _: ResolvedHint | _: View | _: UnresolvedWith | _: SubqueryAlias =>
// Don't traverse down these nodes.
// For an existing strategy hint, there is no chance for a match from this point down.
// The rest (view, with, subquery) indicates different scopes that we shouldn't traverse
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.analysis

import scala.collection.mutable

import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
import org.apache.spark.sql.catalyst.plans.logical.{CTERelationDef, CTERelationRef, LogicalPlan, WithCTE}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.TreePattern.{CTE, PLAN_EXPRESSION}

/**
* Updates CTE references with the resolve output attributes of corresponding CTE definitions.
*/
object ResolveWithCTE extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = {
if (plan.containsAllPatterns(CTE)) {
val cteDefMap = mutable.HashMap.empty[Long, CTERelationDef]
resolveWithCTE(plan, cteDefMap)
} else {
plan
}
}

private def resolveWithCTE(
plan: LogicalPlan,
cteDefMap: mutable.HashMap[Long, CTERelationDef]): LogicalPlan = {
plan.resolveOperatorsDownWithPruning(_.containsAllPatterns(CTE)) {
case w @ WithCTE(_, cteDefs) =>
cteDefs.foreach { cteDef =>
if (cteDef.resolved) {
cteDefMap.put(cteDef.id, cteDef)
}
}
w

case ref: CTERelationRef if !ref.resolved =>
cteDefMap.get(ref.cteId).map { cteDef =>
CTERelationRef(cteDef.id, cteDef.resolved, cteDef.output)
}.getOrElse {
ref
}

case other =>
other.transformExpressionsWithPruning(_.containsAllPatterns(PLAN_EXPRESSION, CTE)) {
case e: SubqueryExpression => e.withNewPlan(resolveWithCTE(e.plan, cteDefMap))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the main query has more than one subqueries, when resolving the second subquery, the cteDefMap will contain CTE defs from the first subquery. I think we should clone the map here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not necessary, right? The real resolving has happened in CTESubstitution earlier. Now there's a strict 1 to 1 ID mapping, so the map can only contain some unrelated CTE defs at its worst.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh I see, makes sense

}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ abstract class SubqueryExpression(
AttributeSet.fromAttributeSets(outerAttrs.map(_.references))
override def children: Seq[Expression] = outerAttrs ++ joinCond
override def withNewPlan(plan: LogicalPlan): SubqueryExpression
def isCorrelated: Boolean = outerAttrs.nonEmpty
}

object SubqueryExpression {
Expand All @@ -86,7 +87,7 @@ object SubqueryExpression {
def hasInOrCorrelatedExistsSubquery(e: Expression): Boolean = {
e.find {
case _: ListQuery => true
case _: Exists if e.children.nonEmpty => true
case ex: Exists => ex.isCorrelated
case _ => false
}.isDefined
}
Expand All @@ -98,7 +99,7 @@ object SubqueryExpression {
*/
def hasCorrelatedSubquery(e: Expression): Boolean = {
e.find {
case s: SubqueryExpression => s.children.nonEmpty
case s: SubqueryExpression => s.isCorrelated
case _ => false
}.isDefined
}
Expand Down Expand Up @@ -279,7 +280,7 @@ case class ScalarSubquery(
object ScalarSubquery {
def hasCorrelatedScalarSubquery(e: Expression): Boolean = {
e.find {
case s: ScalarSubquery => s.children.nonEmpty
case s: ScalarSubquery => s.isCorrelated
case _ => false
}.isDefined
}
Expand Down
Loading