Skip to content

Commit accde83

Browse files
milanisvetcloud-fan
authored andcommitted
[SPARK-50598][SQL] An initial, no-op PR which adds new parameters to already existing classes UnresolvedWith, CTERelationRef and CTERelationDef to enable later implementation of recursive CTEs
### What changes were proposed in this pull request? PR adds new parameters to already existing classes UnresolvedWith, CTERelationRef and CTERelationDef to enable later implementation of recursive CTEs. - Additional parameters are added in pattern matching cases in other files of mentioned classes as well. - A large number of trivial changes in tests was required, due to new parameters introduced to the classes, which are also addressed in this PR. - More information for reviewers can be found here: https://docs.google.com/document/d/1qcEJxqoXcr5cSt6HgIQjWQSqhfkSaVYkoDHsg5oxXp4/edit ### Why are the changes needed? Support for the recursive CTE. ### Does this PR introduce _any_ user-facing change? No. RECURSIVE keyword is not introduced in this PR. ### How was this patch tested? The tests failing after the initial change were all in `SQLQueryTestSuite`, so this patch was tested by running this Test Suite. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #49180 from milanisvet/milanrcte2. Authored-by: Milan Cupac <milan.cupac@outlook.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
1 parent 63c7ca4 commit accde83

20 files changed

Lines changed: 351 additions & 340 deletions

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ object CTESubstitution extends Rule[LogicalPlan] {
123123
startOfQuery: Boolean = true): Unit = {
124124
val resolver = conf.resolver
125125
plan match {
126-
case UnresolvedWith(child, relations) =>
126+
case UnresolvedWith(child, relations, _) =>
127127
val newNames = ArrayBuffer.empty[String]
128128
newNames ++= outerCTERelationNames
129129
relations.foreach {
@@ -149,7 +149,7 @@ object CTESubstitution extends Rule[LogicalPlan] {
149149
plan: LogicalPlan,
150150
cteDefs: ArrayBuffer[CTERelationDef]): LogicalPlan = {
151151
plan.resolveOperatorsUp {
152-
case UnresolvedWith(child, relations) =>
152+
case UnresolvedWith(child, relations, _) =>
153153
val resolvedCTERelations =
154154
resolveCTERelations(relations, isLegacy = true, forceInline = false, Seq.empty, cteDefs)
155155
substituteCTE(child, alwaysInline = true, resolvedCTERelations)
@@ -202,7 +202,7 @@ object CTESubstitution extends Rule[LogicalPlan] {
202202
var firstSubstituted: Option[LogicalPlan] = None
203203
val newPlan = plan.resolveOperatorsDownWithPruning(
204204
_.containsAnyPattern(UNRESOLVED_WITH, PLAN_EXPRESSION)) {
205-
case UnresolvedWith(child: LogicalPlan, relations) =>
205+
case UnresolvedWith(child: LogicalPlan, relations, _) =>
206206
val resolvedCTERelations =
207207
resolveCTERelations(relations, isLegacy = false, forceInline, outerCTEDefs, cteDefs) ++
208208
outerCTEDefs

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushdownPredicatesAndPruneColumnsForCTEDef.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ object PushdownPredicatesAndPruneColumnsForCTEDef extends Rule[LogicalPlan] {
122122
private def pushdownPredicatesAndAttributes(
123123
plan: LogicalPlan,
124124
cteMap: CTEMap): LogicalPlan = plan.transformWithSubqueries {
125-
case cteDef @ CTERelationDef(child, id, originalPlanWithPredicates, _) =>
125+
case cteDef @ CTERelationDef(child, id, originalPlanWithPredicates, _, _, _) =>
126126
val (_, _, newPreds, newAttrSet) = cteMap(id)
127127
val originalPlan = originalPlanWithPredicates.map(_._1).getOrElse(child)
128128
val preds = originalPlanWithPredicates.map(_._2).getOrElse(Seq.empty)
@@ -141,7 +141,7 @@ object PushdownPredicatesAndPruneColumnsForCTEDef extends Rule[LogicalPlan] {
141141
cteDef
142142
}
143143

144-
case cteRef @ CTERelationRef(cteId, _, output, _, _) =>
144+
case cteRef @ CTERelationRef(cteId, _, output, _, _, _) =>
145145
val (cteDef, _, _, newAttrSet) = cteMap(cteId)
146146
if (needsPruning(cteDef.child, newAttrSet)) {
147147
val indices = newAttrSet.toSeq.map(cteDef.output.indexOf)
@@ -170,7 +170,7 @@ object PushdownPredicatesAndPruneColumnsForCTEDef extends Rule[LogicalPlan] {
170170
object CleanUpTempCTEInfo extends Rule[LogicalPlan] {
171171
override def apply(plan: LogicalPlan): LogicalPlan =
172172
plan.transformWithPruning(_.containsPattern(CTE)) {
173-
case cteDef @ CTERelationDef(_, _, Some(_), _) =>
173+
case cteDef @ CTERelationDef(_, _, Some(_), _, _, _) =>
174174
cteDef.copy(originalPlanWithPredicates = None)
175175
}
176176
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -833,10 +833,12 @@ object View {
833833
* @param child The final query of this CTE.
834834
* @param cteRelations A sequence of pair (alias, the CTE definition) that this CTE defined
835835
* Each CTE can see the base tables and the previously defined CTEs only.
836+
* @param allowRecursion A boolean flag if recursion is allowed.
836837
*/
837838
case class UnresolvedWith(
838839
child: LogicalPlan,
839-
cteRelations: Seq[(String, SubqueryAlias)]) extends UnaryNode {
840+
cteRelations: Seq[(String, SubqueryAlias)],
841+
allowRecursion: Boolean = false) extends UnaryNode {
840842
final override val nodePatterns: Seq[TreePattern] = Seq(UNRESOLVED_WITH)
841843

842844
override def output: Seq[Attribute] = child.output
@@ -862,12 +864,19 @@ case class UnresolvedWith(
862864
* pushdown to help ensure rule idempotency.
863865
* @param underSubquery If true, it means we don't need to add a shuffle for this CTE relation as
864866
* subquery reuse will be applied to reuse CTE relation output.
867+
* @param recursive If true, then this CTE Definition is recursive - it contains a self-reference.
868+
* @param recursionAnchor A helper plan node that temporary stores the anchor term of recursive
869+
* definitions. In the beginning of recursive resolution the `ResolveWithCTE`
870+
* rule updates this parameter and once it is resolved the same rule resolves
871+
* the recursive [[CTERelationRef]] references and removes this parameter.
865872
*/
866873
case class CTERelationDef(
867874
child: LogicalPlan,
868875
id: Long = CTERelationDef.newId,
869876
originalPlanWithPredicates: Option[(LogicalPlan, Seq[Expression])] = None,
870-
underSubquery: Boolean = false) extends UnaryNode {
877+
underSubquery: Boolean = false,
878+
recursive: Boolean = false,
879+
recursionAnchor: Option[LogicalPlan] = None) extends UnaryNode {
871880

872881
final override val nodePatterns: Seq[TreePattern] = Seq(CTE)
873882

@@ -891,13 +900,15 @@ object CTERelationDef {
891900
* de-duplication.
892901
* @param statsOpt The optional statistics inferred from the corresponding CTE
893902
* definition.
903+
* @param recursive If this is a recursive reference.
894904
*/
895905
case class CTERelationRef(
896906
cteId: Long,
897907
_resolved: Boolean,
898908
override val output: Seq[Attribute],
899909
override val isStreaming: Boolean,
900-
statsOpt: Option[Statistics] = None) extends LeafNode with MultiInstanceRelation {
910+
statsOpt: Option[Statistics] = None,
911+
recursive: Boolean = false) extends LeafNode with MultiInstanceRelation {
901912

902913
final override val nodePatterns: Seq[TreePattern] = Seq(CTE)
903914

sql/core/src/test/resources/sql-tests/analyzer-results/cte-command.sql.out

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,13 @@ CREATE TABLE cte_tbl USING csv AS WITH s AS (SELECT 42 AS col) SELECT * FROM s
44
-- !query analysis
55
CreateDataSourceTableAsSelectCommand `spark_catalog`.`default`.`cte_tbl`, ErrorIfExists, [col]
66
+- WithCTE
7-
:- CTERelationDef xxxx, false
7+
:- CTERelationDef xxxx, false, false
88
: +- SubqueryAlias s
99
: +- Project [42 AS col#x]
1010
: +- OneRowRelation
1111
+- Project [col#x]
1212
+- SubqueryAlias s
13-
+- CTERelationRef xxxx, true, [col#x], false
13+
+- CTERelationRef xxxx, true, [col#x], false, false
1414

1515

1616
-- !query
@@ -26,13 +26,13 @@ CREATE TEMPORARY VIEW cte_view AS WITH s AS (SELECT 42 AS col) SELECT * FROM s
2626
-- !query analysis
2727
CreateViewCommand `cte_view`, WITH s AS (SELECT 42 AS col) SELECT * FROM s, false, false, LocalTempView, UNSUPPORTED, true
2828
+- WithCTE
29-
:- CTERelationDef xxxx, false
29+
:- CTERelationDef xxxx, false, false
3030
: +- SubqueryAlias s
3131
: +- Project [42 AS col#x]
3232
: +- OneRowRelation
3333
+- Project [col#x]
3434
+- SubqueryAlias s
35-
+- CTERelationRef xxxx, true, [col#x], false
35+
+- CTERelationRef xxxx, true, [col#x], false, false
3636

3737

3838
-- !query
@@ -43,13 +43,13 @@ Project [col#x]
4343
+- View (`cte_view`, [col#x])
4444
+- Project [cast(col#x as int) AS col#x]
4545
+- WithCTE
46-
:- CTERelationDef xxxx, false
46+
:- CTERelationDef xxxx, false, false
4747
: +- SubqueryAlias s
4848
: +- Project [42 AS col#x]
4949
: +- OneRowRelation
5050
+- Project [col#x]
5151
+- SubqueryAlias s
52-
+- CTERelationRef xxxx, true, [col#x], false
52+
+- CTERelationRef xxxx, true, [col#x], false, false
5353

5454

5555
-- !query
@@ -58,13 +58,13 @@ INSERT INTO cte_tbl SELECT * FROM S
5858
-- !query analysis
5959
InsertIntoHadoopFsRelationCommand file:[not included in comparison]/{warehouse_dir}/cte_tbl, false, CSV, [path=file:[not included in comparison]/{warehouse_dir}/cte_tbl], Append, `spark_catalog`.`default`.`cte_tbl`, org.apache.spark.sql.execution.datasources.InMemoryFileIndex(file:[not included in comparison]/{warehouse_dir}/cte_tbl), [col]
6060
+- WithCTE
61-
:- CTERelationDef xxxx, false
61+
:- CTERelationDef xxxx, false, false
6262
: +- SubqueryAlias s
6363
: +- Project [43 AS col#x]
6464
: +- OneRowRelation
6565
+- Project [col#x]
6666
+- SubqueryAlias S
67-
+- CTERelationRef xxxx, true, [col#x], false
67+
+- CTERelationRef xxxx, true, [col#x], false, false
6868

6969

7070
-- !query
@@ -80,13 +80,13 @@ INSERT INTO cte_tbl WITH s AS (SELECT 44 AS col) SELECT * FROM s
8080
-- !query analysis
8181
InsertIntoHadoopFsRelationCommand file:[not included in comparison]/{warehouse_dir}/cte_tbl, false, CSV, [path=file:[not included in comparison]/{warehouse_dir}/cte_tbl], Append, `spark_catalog`.`default`.`cte_tbl`, org.apache.spark.sql.execution.datasources.InMemoryFileIndex(file:[not included in comparison]/{warehouse_dir}/cte_tbl), [col]
8282
+- WithCTE
83-
:- CTERelationDef xxxx, false
83+
:- CTERelationDef xxxx, false, false
8484
: +- SubqueryAlias s
8585
: +- Project [44 AS col#x]
8686
: +- OneRowRelation
8787
+- Project [col#x]
8888
+- SubqueryAlias s
89-
+- CTERelationRef xxxx, true, [col#x], false
89+
+- CTERelationRef xxxx, true, [col#x], false, false
9090

9191

9292
-- !query

0 commit comments

Comments
 (0)