# Patch summary (descriptive text only; the patch proper starts at the first "diff --git" line).
#
# 1. optimizer/subquery.scala, object RewritePredicateSubquery:
#    inside withSubquery.foldLeft(newFilter), the Exists, Not(Exists) and InSubquery cases
#    used to return the LeftSemi / LeftAnti join directly. Each case now binds that join to
#    `join` and returns Project(p.output, join), so the rewritten plan's output is p.output.
#    NOTE(review): presumably the outerPlan returned by rewriteExistentialExpr can expose
#    more columns than p (the new test name mentions a nested subquery becoming an
#    existence join) - confirm against rewriteExistentialExpr.
#    NOTE(review): the Not(InSubquery) case is visible only as trailing context in this
#    hunk; whether it receives the same Project cannot be told from here.
#
# 2. SubquerySuite.scala: adds the test
#    "SPARK-45580: Handle case where a nested subquery becomes an existence join"
#    over temp views t1(a) = 1, 2, 3, 7; t2(c1) = 1, 2, 3; t3(col1) = 3, 9. It checks:
#      - EXISTS     with `a = c1 OR a IN (SELECT col1 FROM t3)`        -> rows 1, 2, 3
#      - IN         whose subquery filters on `a IN (SELECT col1 FROM t3)` -> row 3
#      - NOT EXISTS with `a = c1 OR a IN (SELECT col1 FROM t3)`        -> row 7
#
# NOTE(review): leading indentation of the hunk lines below was reconstructed using the
# usual 2-space Scala layout; verify context lines against the target files before applying.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala
index 7ef5ef55fabda..ff198c798b9c8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala
@@ -113,16 +113,19 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper {
       withSubquery.foldLeft(newFilter) {
         case (p, Exists(sub, _, _, conditions)) =>
           val (joinCond, outerPlan) = rewriteExistentialExpr(conditions, p)
-          buildJoin(outerPlan, sub, LeftSemi, joinCond)
+          val join = buildJoin(outerPlan, sub, LeftSemi, joinCond)
+          Project(p.output, join)
         case (p, Not(Exists(sub, _, _, conditions))) =>
           val (joinCond, outerPlan) = rewriteExistentialExpr(conditions, p)
-          buildJoin(outerPlan, sub, LeftAnti, joinCond)
+          val join = buildJoin(outerPlan, sub, LeftAnti, joinCond)
+          Project(p.output, join)
         case (p, InSubquery(values, ListQuery(sub, _, _, _, conditions))) =>
           // Deduplicate conflicting attributes if any.
           val newSub = dedupSubqueryOnSelfJoin(p, sub, Some(values))
           val inConditions = values.zip(newSub.output).map(EqualTo.tupled)
           val (joinCond, outerPlan) = rewriteExistentialExpr(inConditions ++ conditions, p)
-          Join(outerPlan, newSub, LeftSemi, joinCond, JoinHint.NONE)
+          val join = Join(outerPlan, newSub, LeftSemi, joinCond, JoinHint.NONE)
+          Project(p.output, join)
         case (p, Not(InSubquery(values, ListQuery(sub, _, _, _, conditions)))) =>
           // This is a NULL-aware (left) anti join (NAAJ) e.g. col NOT IN expr
           // Construct the condition. A NULL in one of the conditions is regarded as a positive
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
index b3aefac05b129..219d7504246e9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
@@ -2316,4 +2316,50 @@ class SubquerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
       }
     }
   }
+
+  test("SPARK-45580: Handle case where a nested subquery becomes an existence join") {
+    withTempView("t1", "t2", "t3") {
+      Seq((1), (2), (3), (7)).toDF("a").persist().createOrReplaceTempView("t1")
+      Seq((1), (2), (3)).toDF("c1").persist().createOrReplaceTempView("t2")
+      Seq((3), (9)).toDF("col1").persist().createOrReplaceTempView("t3")
+
+      val query1 =
+        """
+          |SELECT *
+          |FROM t1
+          |WHERE EXISTS (
+          | SELECT c1
+          | FROM t2
+          | WHERE a = c1
+          | OR a IN (SELECT col1 FROM t3)
+          |)""".stripMargin
+      val df1 = sql(query1)
+      checkAnswer(df1, Row(1) :: Row(2) :: Row(3) :: Nil)
+
+      val query2 =
+        """
+          |SELECT *
+          |FROM t1
+          |WHERE a IN (
+          | SELECT c1
+          | FROM t2
+          | where a IN (SELECT col1 FROM t3)
+          |)""".stripMargin
+      val df2 = sql(query2)
+      checkAnswer(df2, Row(3))
+
+      val query3 =
+        """
+          |SELECT *
+          |FROM t1
+          |WHERE NOT EXISTS (
+          | SELECT c1
+          | FROM t2
+          | WHERE a = c1
+          | OR a IN (SELECT col1 FROM t3)
+          |)""".stripMargin
+      val df3 = sql(query3)
+      checkAnswer(df3, Row(7))
+    }
+  }
 }