Skip to content

Commit 9ac9cd5

Browse files
dtenedorgengliangwang
authored andcommitted
[SPARK-40618][SQL] Fix bug in MergeScalarSubqueries rule with nested subqueries
### What changes were proposed in this pull request? There is a bug in the `MergeScalarSubqueries` rule for queries with subquery expressions nested inside each other, wherein the rule attempts to merge the nested subquery with its enclosing parent subquery. The result is not a valid plan and raises an exception in the optimizer. Here is a minimal reproducing case: ``` sql("create table test(col int) using csv") checkAnswer(sql("select(select sum((select sum(col) from test)) from test)"), Row(null)) ``` To fix, we disable the optimization for subqueries with nested subqueries inside them for now. ### Why are the changes needed? This fixes a bug. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Updated existing unit tests and added the reproducing case as a new test case. Closes apache#38052 from dtenedor/fix-merge-subquery-bug. Authored-by: Daniel Tenedorio <daniel.tenedorio@databricks.com> Signed-off-by: Gengliang Wang <gengliang@apache.org>
1 parent 7d010b1 commit 9ac9cd5

2 files changed

Lines changed: 21 additions & 5 deletions

File tree

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueries.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,12 @@ object MergeScalarSubqueries extends Rule[LogicalPlan] {
210210
cachedPlan: LogicalPlan): Option[(LogicalPlan, AttributeMap[Attribute])] = {
211211
checkIdenticalPlans(newPlan, cachedPlan).map(cachedPlan -> _).orElse(
212212
(newPlan, cachedPlan) match {
213+
case (_, _) if newPlan.containsPattern(SCALAR_SUBQUERY_REFERENCE) ||
214+
cachedPlan.containsPattern(SCALAR_SUBQUERY_REFERENCE) =>
215+
// Subquery expressions with nested subquery expressions within are not supported for now.
216+
// TODO: support this optimization by collecting the transitive subquery references in the
217+
// new plan and recording them in order to suppress merging the new plan into those.
218+
None
213219
case (np: Project, cp: Project) =>
214220
tryMergePlans(np.child, cp.child).map { case (mergedChild, outputMap) =>
215221
val (mergedProjectList, newOutputMap) =

sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2157,7 +2157,7 @@ class SubquerySuite extends QueryTest
21572157
}
21582158
}
21592159

2160-
test("Merge non-correlated scalar subqueries from different parent plans") {
2160+
test("SPARK-40618: Do not merge scalar subqueries with nested subqueries inside") {
21612161
Seq(false, true).foreach { enableAQE =>
21622162
withSQLConf(
21632163
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> enableAQE.toString) {
@@ -2189,12 +2189,12 @@ class SubquerySuite extends QueryTest
21892189
}
21902190

21912191
if (enableAQE) {
2192-
assert(subqueryIds.size == 3, "Missing or unexpected SubqueryExec in the plan")
2193-
assert(reusedSubqueryIds.size == 3,
2192+
assert(subqueryIds.size == 4, "Missing or unexpected SubqueryExec in the plan")
2193+
assert(reusedSubqueryIds.size == 2,
21942194
"Missing or unexpected reused ReusedSubqueryExec in the plan")
21952195
} else {
2196-
assert(subqueryIds.size == 2, "Missing or unexpected SubqueryExec in the plan")
2197-
assert(reusedSubqueryIds.size == 4,
2196+
assert(subqueryIds.size == 3, "Missing or unexpected SubqueryExec in the plan")
2197+
assert(reusedSubqueryIds.size == 3,
21982198
"Missing or unexpected reused ReusedSubqueryExec in the plan")
21992199
}
22002200
}
@@ -2327,4 +2327,14 @@ class SubquerySuite extends QueryTest
23272327
assert(findProject(df2).size == 3)
23282328
}
23292329
}
2330+
2331+
test("SPARK-40618: Regression test for merging subquery bug with nested subqueries") {
2332+
// This test contains a subquery expression with another subquery expression nested inside.
2333+
// It acts as a regression test to ensure that the MergeScalarSubqueries rule does not attempt
2334+
// to merge them together.
2335+
withTable("t") {
2336+
sql("create table t(col int) using csv")
2337+
checkAnswer(sql("select(select sum((select sum(col) from t)) from t)"), Row(null))
2338+
}
2339+
}
23302340
}

0 commit comments

Comments
 (0)