Skip to content

Commit 488f392

Browse files
peter-toth authored and cloud-fan committed
[SPARK-38404][SQL] Improve CTE resolution when a nested CTE references an outer CTE
### What changes were proposed in this pull request?

Please note that the bug in [SPARK-38404](https://issues.apache.org/jira/browse/SPARK-38404) was already fixed by #34929. This PR is a minor improvement to the current implementation: it collects already resolved outer CTEs to avoid re-substituting already collected CTE definitions.

### Why are the changes needed?

Small improvement + additional tests.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added new test case.

Closes #36146 from peter-toth/SPARK-38404-nested-cte-references-outer-cte.

Authored-by: Peter Toth <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
1 parent 8acce88 commit 488f392

5 files changed

Lines changed: 96 additions & 43 deletions

File tree

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala

Lines changed: 32 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
package org.apache.spark.sql.catalyst.analysis
1919

20-
import scala.collection.mutable
20+
import scala.collection.mutable.ArrayBuffer
2121

2222
import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
2323
import org.apache.spark.sql.catalyst.plans.logical.{Command, CTERelationDef, CTERelationRef, InsertIntoDir, LogicalPlan, ParsedStatement, SubqueryAlias, UnresolvedWith, WithCTE}
@@ -55,27 +55,27 @@ object CTESubstitution extends Rule[LogicalPlan] {
5555
case _: Command | _: ParsedStatement | _: InsertIntoDir => true
5656
case _ => false
5757
}
58-
val cteDefs = mutable.ArrayBuffer.empty[CTERelationDef]
58+
val cteDefs = ArrayBuffer.empty[CTERelationDef]
5959
val (substituted, lastSubstituted) =
6060
LegacyBehaviorPolicy.withName(conf.getConf(LEGACY_CTE_PRECEDENCE_POLICY)) match {
6161
case LegacyBehaviorPolicy.EXCEPTION =>
6262
assertNoNameConflictsInCTE(plan)
63-
traverseAndSubstituteCTE(plan, isCommand, cteDefs)
63+
traverseAndSubstituteCTE(plan, isCommand, Seq.empty, cteDefs)
6464
case LegacyBehaviorPolicy.LEGACY =>
6565
(legacyTraverseAndSubstituteCTE(plan, cteDefs), None)
6666
case LegacyBehaviorPolicy.CORRECTED =>
67-
traverseAndSubstituteCTE(plan, isCommand, cteDefs)
67+
traverseAndSubstituteCTE(plan, isCommand, Seq.empty, cteDefs)
6868
}
6969
if (cteDefs.isEmpty) {
7070
substituted
7171
} else if (substituted eq lastSubstituted.get) {
72-
WithCTE(substituted, cteDefs.sortBy(_.id).toSeq)
72+
WithCTE(substituted, cteDefs.toSeq)
7373
} else {
7474
var done = false
7575
substituted.resolveOperatorsWithPruning(_ => !done) {
7676
case p if p eq lastSubstituted.get =>
7777
done = true
78-
WithCTE(p, cteDefs.sortBy(_.id).toSeq)
78+
WithCTE(p, cteDefs.toSeq)
7979
}
8080
}
8181
}
@@ -98,7 +98,7 @@ object CTESubstitution extends Rule[LogicalPlan] {
9898
val resolver = conf.resolver
9999
plan match {
100100
case UnresolvedWith(child, relations) =>
101-
val newNames = mutable.ArrayBuffer.empty[String]
101+
val newNames = ArrayBuffer.empty[String]
102102
newNames ++= outerCTERelationNames
103103
relations.foreach {
104104
case (name, relation) =>
@@ -121,11 +121,11 @@ object CTESubstitution extends Rule[LogicalPlan] {
121121

122122
private def legacyTraverseAndSubstituteCTE(
123123
plan: LogicalPlan,
124-
cteDefs: mutable.ArrayBuffer[CTERelationDef]): LogicalPlan = {
124+
cteDefs: ArrayBuffer[CTERelationDef]): LogicalPlan = {
125125
plan.resolveOperatorsUp {
126126
case UnresolvedWith(child, relations) =>
127127
val resolvedCTERelations =
128-
resolveCTERelations(relations, isLegacy = true, isCommand = false, cteDefs)
128+
resolveCTERelations(relations, isLegacy = true, isCommand = false, Seq.empty, cteDefs)
129129
substituteCTE(child, alwaysInline = true, resolvedCTERelations)
130130
}
131131
}
@@ -170,21 +170,23 @@ object CTESubstitution extends Rule[LogicalPlan] {
170170
* SELECT * FROM t
171171
* )
172172
* @param plan the plan to be traversed
173-
* @return the plan where CTE substitution is applied
173+
* @param isCommand if this is a command
174+
* @param outerCTEDefs already resolved outer CTE definitions with names
175+
* @param cteDefs all accumulated CTE definitions
176+
* @return the plan where CTE substitution is applied and optionally the last substituted `With`
177+
* where CTE definitions will be gathered to
174178
*/
175179
private def traverseAndSubstituteCTE(
176180
plan: LogicalPlan,
177181
isCommand: Boolean,
178-
cteDefs: mutable.ArrayBuffer[CTERelationDef]): (LogicalPlan, Option[LogicalPlan]) = {
182+
outerCTEDefs: Seq[(String, CTERelationDef)],
183+
cteDefs: ArrayBuffer[CTERelationDef]): (LogicalPlan, Option[LogicalPlan]) = {
179184
var lastSubstituted: Option[LogicalPlan] = None
180185
val newPlan = plan.resolveOperatorsUpWithPruning(
181186
_.containsAnyPattern(UNRESOLVED_WITH, PLAN_EXPRESSION)) {
182187
case UnresolvedWith(child: LogicalPlan, relations) =>
183188
val resolvedCTERelations =
184-
resolveCTERelations(relations, isLegacy = false, isCommand, cteDefs)
185-
if (!isCommand) {
186-
cteDefs ++= resolvedCTERelations.map(_._2)
187-
}
189+
resolveCTERelations(relations, isLegacy = false, isCommand, outerCTEDefs, cteDefs)
188190
lastSubstituted = Some(substituteCTE(child, isCommand, resolvedCTERelations))
189191
lastSubstituted.get
190192

@@ -200,10 +202,14 @@ object CTESubstitution extends Rule[LogicalPlan] {
200202
relations: Seq[(String, SubqueryAlias)],
201203
isLegacy: Boolean,
202204
isCommand: Boolean,
203-
cteDefs: mutable.ArrayBuffer[CTERelationDef]): Seq[(String, CTERelationDef)] = {
204-
val resolvedCTERelations = new mutable.ArrayBuffer[(String, CTERelationDef)](relations.size)
205+
outerCTEDefs: Seq[(String, CTERelationDef)],
206+
cteDefs: ArrayBuffer[CTERelationDef]): Seq[(String, CTERelationDef)] = {
207+
var resolvedCTERelations = if (isLegacy || isCommand) {
208+
Seq.empty
209+
} else {
210+
outerCTEDefs
211+
}
205212
for ((name, relation) <- relations) {
206-
val lastCTEDefCount = cteDefs.length
207213
val innerCTEResolved = if (isLegacy) {
208214
// In legacy mode, outer CTE relations take precedence. Here we don't resolve the inner
209215
// `With` nodes, later we will substitute `UnresolvedRelation`s with outer CTE relations.
@@ -221,31 +227,18 @@ object CTESubstitution extends Rule[LogicalPlan] {
221227
// WITH t3 AS (SELECT * FROM t1)
222228
// )
223229
// t3 should resolve the t1 to `SELECT 2` instead of `SELECT 1`.
224-
traverseAndSubstituteCTE(relation, isCommand, cteDefs)._1
225-
}
226-
227-
if (cteDefs.length > lastCTEDefCount) {
228-
// We have added more CTE relations to the `cteDefs` from the inner CTE, and these relations
229-
// should also be substituted with `resolvedCTERelations` as inner CTE relation can refer to
230-
// outer CTE relation. For example:
231-
// WITH t1 AS (SELECT 1)
232-
// t2 AS (
233-
// WITH t3 AS (SELECT * FROM t1)
234-
// )
235-
for (i <- lastCTEDefCount until cteDefs.length) {
236-
val substituted =
237-
substituteCTE(cteDefs(i).child, isLegacy || isCommand, resolvedCTERelations.toSeq)
238-
cteDefs(i) = cteDefs(i).copy(child = substituted)
239-
}
230+
traverseAndSubstituteCTE(relation, isCommand, resolvedCTERelations, cteDefs)._1
240231
}
241-
242232
// CTE definition can reference a previous one
243-
val substituted =
244-
substituteCTE(innerCTEResolved, isLegacy || isCommand, resolvedCTERelations.toSeq)
233+
val substituted = substituteCTE(innerCTEResolved, isLegacy || isCommand, resolvedCTERelations)
245234
val cteRelation = CTERelationDef(substituted)
246-
resolvedCTERelations += (name -> cteRelation)
235+
if (!(isLegacy || isCommand)) {
236+
cteDefs += cteRelation
237+
}
238+
// Prepending new CTEs makes sure that those have higher priority over outer ones.
239+
resolvedCTERelations +:= (name -> cteRelation)
247240
}
248-
resolvedCTERelations.toSeq
241+
resolvedCTERelations
249242
}
250243

251244
private def substituteCTE(

sql/core/src/test/resources/sql-tests/inputs/cte-nested.sql

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,4 +135,15 @@ WITH abc AS (SELECT 1)
135135
SELECT (
136136
WITH aBc AS (SELECT 2)
137137
SELECT * FROM aBC
138-
);
138+
);
139+
140+
-- SPARK-38404: CTE in CTE definition references outer
141+
WITH
142+
t1 AS (SELECT 1),
143+
t2 AS (
144+
WITH t3 AS (
145+
SELECT * FROM t1
146+
)
147+
SELECT * FROM t3
148+
)
149+
SELECT * FROM t2;

sql/core/src/test/resources/sql-tests/results/cte-legacy.sql.out

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
-- Automatically generated by SQLQueryTestSuite
2-
-- Number of queries: 16
2+
-- Number of queries: 17
33

44

55
-- !query
@@ -219,3 +219,20 @@ SELECT (
219219
struct<scalarsubquery():int>
220220
-- !query output
221221
1
222+
223+
224+
-- !query
225+
WITH
226+
t1 AS (SELECT 1),
227+
t2 AS (
228+
WITH t3 AS (
229+
SELECT * FROM t1
230+
)
231+
SELECT * FROM t3
232+
)
233+
SELECT * FROM t2
234+
-- !query schema
235+
struct<>
236+
-- !query output
237+
org.apache.spark.sql.AnalysisException
238+
Table or view not found: t1; line 5 pos 20

sql/core/src/test/resources/sql-tests/results/cte-nested.sql.out

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
-- Automatically generated by SQLQueryTestSuite
2-
-- Number of queries: 16
2+
-- Number of queries: 17
33

44

55
-- !query
@@ -227,3 +227,19 @@ struct<>
227227
-- !query output
228228
org.apache.spark.sql.AnalysisException
229229
Name aBc is ambiguous in nested CTE. Please set spark.sql.legacy.ctePrecedencePolicy to CORRECTED so that name defined in inner CTE takes precedence. If set it to LEGACY, outer CTE definitions will take precedence. See more details in SPARK-28228.
230+
231+
232+
-- !query
233+
WITH
234+
t1 AS (SELECT 1),
235+
t2 AS (
236+
WITH t3 AS (
237+
SELECT * FROM t1
238+
)
239+
SELECT * FROM t3
240+
)
241+
SELECT * FROM t2
242+
-- !query schema
243+
struct<1:int>
244+
-- !query output
245+
1

sql/core/src/test/resources/sql-tests/results/cte-nonlegacy.sql.out

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
-- Automatically generated by SQLQueryTestSuite
2-
-- Number of queries: 16
2+
-- Number of queries: 17
33

44

55
-- !query
@@ -219,3 +219,19 @@ SELECT (
219219
struct<scalarsubquery():int>
220220
-- !query output
221221
2
222+
223+
224+
-- !query
225+
WITH
226+
t1 AS (SELECT 1),
227+
t2 AS (
228+
WITH t3 AS (
229+
SELECT * FROM t1
230+
)
231+
SELECT * FROM t3
232+
)
233+
SELECT * FROM t2
234+
-- !query schema
235+
struct<1:int>
236+
-- !query output
237+
1

0 commit comments

Comments
 (0)