Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -132,19 +132,22 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper {
withSubquery.foldLeft(newFilter) {
case (p, Exists(sub, _, _, conditions, subHint)) =>
val (joinCond, outerPlan) = rewriteExistentialExpr(conditions, p)
buildJoin(outerPlan, rewriteDomainJoinsIfPresent(outerPlan, sub, joinCond),
val join = buildJoin(outerPlan, rewriteDomainJoinsIfPresent(outerPlan, sub, joinCond),
LeftSemi, joinCond, subHint)
Project(p.output, join)
case (p, Not(Exists(sub, _, _, conditions, subHint))) =>
val (joinCond, outerPlan) = rewriteExistentialExpr(conditions, p)
buildJoin(outerPlan, rewriteDomainJoinsIfPresent(outerPlan, sub, joinCond),
val join = buildJoin(outerPlan, rewriteDomainJoinsIfPresent(outerPlan, sub, joinCond),
LeftAnti, joinCond, subHint)
Project(p.output, join)
case (p, InSubquery(values, ListQuery(sub, _, _, _, conditions, subHint))) =>
// Deduplicate conflicting attributes if any.
val newSub = dedupSubqueryOnSelfJoin(p, sub, Some(values))
val inConditions = values.zip(newSub.output).map(EqualTo.tupled)
val (joinCond, outerPlan) = rewriteExistentialExpr(inConditions ++ conditions, p)
Join(outerPlan, rewriteDomainJoinsIfPresent(outerPlan, newSub, joinCond),
val join = Join(outerPlan, rewriteDomainJoinsIfPresent(outerPlan, newSub, joinCond),
LeftSemi, joinCond, JoinHint(None, subHint))
Project(p.output, join)
case (p, Not(InSubquery(values, ListQuery(sub, _, _, _, conditions, subHint)))) =>
// This is a NULL-aware (left) anti join (NAAJ) e.g. col NOT IN expr
// Construct the condition. A NULL in one of the conditions is regarded as a positive
Expand Down
46 changes: 46 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2735,4 +2735,50 @@ class SubquerySuite extends QueryTest
Row(1, "a", 3) :: Row(2, "a", 3) :: Row(3, "a", 3) :: Nil)
}
}

test("SPARK-45580: Handle case where a nested subquery becomes an existence join") {
withTempView("t1", "t2", "t3") {
Seq((1), (2), (3), (7)).toDF("a").persist().createOrReplaceTempView("t1")
Seq((1), (2), (3)).toDF("c1").persist().createOrReplaceTempView("t2")
Seq((3), (9)).toDF("col1").persist().createOrReplaceTempView("t3")

val query1 =
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mentioned in the description that this particular query will not show the extraneous boolean column when executed via the Dataset API. However, when Utils.isTesting is true, the RuleExecutor will notice that the rule has changed the query's schema and throw an exception.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you, @bersprockets !

"""
|SELECT *
|FROM t1
|WHERE EXISTS (
| SELECT c1
| FROM t2
| WHERE a = c1
| OR a IN (SELECT col1 FROM t3)
|)""".stripMargin
val df1 = sql(query1)
checkAnswer(df1, Row(1) :: Row(2) :: Row(3) :: Nil)

val query2 =
"""
|SELECT *
|FROM t1
|WHERE a IN (
| SELECT c1
| FROM t2
| where a IN (SELECT col1 FROM t3)
|)""".stripMargin
val df2 = sql(query2)
checkAnswer(df2, Row(3))

val query3 =
"""
|SELECT *
|FROM t1
|WHERE NOT EXISTS (
| SELECT c1
| FROM t2
| WHERE a = c1
| OR a IN (SELECT col1 FROM t3)
|)""".stripMargin
val df3 = sql(query3)
checkAnswer(df3, Row(7))
}
}
}