Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -113,16 +113,19 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper {
withSubquery.foldLeft(newFilter) {
case (p, Exists(sub, _, _, conditions)) =>
val (joinCond, outerPlan) = rewriteExistentialExpr(conditions, p)
buildJoin(outerPlan, sub, LeftSemi, joinCond)
val join = buildJoin(outerPlan, sub, LeftSemi, joinCond)
Project(p.output, join)
case (p, Not(Exists(sub, _, _, conditions))) =>
val (joinCond, outerPlan) = rewriteExistentialExpr(conditions, p)
buildJoin(outerPlan, sub, LeftAnti, joinCond)
val join = buildJoin(outerPlan, sub, LeftAnti, joinCond)
Project(p.output, join)
case (p, InSubquery(values, ListQuery(sub, _, _, _, conditions))) =>
// Deduplicate conflicting attributes if any.
val newSub = dedupSubqueryOnSelfJoin(p, sub, Some(values))
val inConditions = values.zip(newSub.output).map(EqualTo.tupled)
val (joinCond, outerPlan) = rewriteExistentialExpr(inConditions ++ conditions, p)
Join(outerPlan, newSub, LeftSemi, joinCond, JoinHint.NONE)
val join = Join(outerPlan, newSub, LeftSemi, joinCond, JoinHint.NONE)
Project(p.output, join)
case (p, Not(InSubquery(values, ListQuery(sub, _, _, _, conditions)))) =>
// This is a NULL-aware (left) anti join (NAAJ) e.g. col NOT IN expr
// Construct the condition. A NULL in one of the conditions is regarded as a positive
Expand Down
46 changes: 46 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2316,4 +2316,50 @@ class SubquerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
}
}
}

test("SPARK-45580: Handle case where a nested subquery becomes an existence join") {
withTempView("t1", "t2", "t3") {
Seq((1), (2), (3), (7)).toDF("a").persist().createOrReplaceTempView("t1")
Seq((1), (2), (3)).toDF("c1").persist().createOrReplaceTempView("t2")
Seq((3), (9)).toDF("col1").persist().createOrReplaceTempView("t3")

val query1 =
"""
|SELECT *
|FROM t1
|WHERE EXISTS (
| SELECT c1
| FROM t2
| WHERE a = c1
| OR a IN (SELECT col1 FROM t3)
|)""".stripMargin
val df1 = sql(query1)
checkAnswer(df1, Row(1) :: Row(2) :: Row(3) :: Nil)

val query2 =
"""
|SELECT *
|FROM t1
|WHERE a IN (
| SELECT c1
| FROM t2
| where a IN (SELECT col1 FROM t3)
|)""".stripMargin
val df2 = sql(query2)
checkAnswer(df2, Row(3))

val query3 =
"""
|SELECT *
|FROM t1
|WHERE NOT EXISTS (
| SELECT c1
| FROM t2
| WHERE a = c1
| OR a IN (SELECT col1 FROM t3)
|)""".stripMargin
val df3 = sql(query3)
checkAnswer(df3, Row(7))
}
}
}