Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -116,15 +116,16 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper {
// (a1,a2,...) = (b1,b2,...)
// to
// (a1=b1 OR isnull(a1=b1)) AND (a2=b2 OR isnull(a2=b2)) AND ...
val joinConds = splitConjunctivePredicates(joinCond.get)
val baseJoinConds = splitConjunctivePredicates(joinCond.get)
val nullAwareJoinConds = baseJoinConds.map(c => Or(c, IsNull(c)))
// After that, add back the correlated join predicate(s) in the subquery
// Example:
// SELECT ... FROM A WHERE A.A1 NOT IN (SELECT B.B1 FROM B WHERE B.B2 = A.A2 AND B.B3 > 1)
// will have the final conditions in the LEFT ANTI as
// (A.A1 = B.B1 OR ISNULL(A.A1 = B.B1)) AND (B.B2 = A.A2)
val pairs = (joinConds.map(c => Or(c, IsNull(c))) ++ conditions).reduceLeft(And)
// (A.A1 = B.B1 OR ISNULL(A.A1 = B.B1)) AND (B.B2 = A.A2) AND B.B3 > 1
val finalJoinCond = (nullAwareJoinConds ++ conditions).reduceLeft(And)
// Deduplicate conflicting attributes if any.
dedupJoin(Join(outerPlan, sub, LeftAnti, Option(pairs)))
dedupJoin(Join(outerPlan, sub, LeftAnti, Option(finalJoinCond)))
case (p, predicate) =>
val (newCond, inputPlan) = rewriteExistentialExpr(Seq(predicate), p)
Project(p.output, Filter(newCond.get, inputPlan))
Expand Down
133 changes: 132 additions & 1 deletion sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@

package org.apache.spark.sql

import org.scalatest.GivenWhenThen

import org.apache.spark.sql.catalyst.plans.logical.Join
import org.apache.spark.sql.test.SharedSQLContext

class SubquerySuite extends QueryTest with SharedSQLContext {
class SubquerySuite extends QueryTest with SharedSQLContext with GivenWhenThen {
import testImplicits._

setupTestData()
Expand Down Expand Up @@ -275,6 +277,135 @@ class SubquerySuite extends QueryTest with SharedSQLContext {

}

// ``col NOT IN expr'' is quite difficult to reason about. There are many edge cases, and the
// precedence and treatment of null values in particular are not intuitive. To make this simpler
// to understand, the expected behavior of this query is described below in plain English.
//
// - If the subquery is empty (i.e. returns no rows), the row should be returned, regardless of
// whether the filtered columns include nulls.
// - If the subquery contains a result with all nulls, then the row should not be returned.
// - If for all non-null filter columns there exists a row in the subquery in which each column
// either
// 1. is equal to the corresponding filter column or
// 2. is null
// then the row should not be returned. (This includes the case where all filter columns are
// null.)
// - Otherwise, the row should be returned.
//
// Using these rules, we can come up with a set of test cases for single-column and multi-column
// NOT IN test cases.
test("NOT IN single column with nulls predicate subquery") {
// Test cases for single-column ``WHERE a NOT IN (SELECT c FROM s ...)'':
// | # | does subquery include null? | is a null? | a = c? | row with a included in result? |
// | 1 | empty | | | yes |
// | 2 | yes | | | no |
// | 3 | no | yes | | no |
// | 4 | no | no | yes | no |
// | 5 | no | no | no | yes |
Seq(row((null, 5.0)), row((3, 3.0))).toDF("a", "b").createOrReplaceTempView("m")
Copy link
Member

@gatorsmile gatorsmile May 24, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use withTempView to ensure these views are dropped after the tests

Seq(row((2, 3.0)), row((2, 3.0)), row((null, 5.0))).toDF("c", "d").createOrReplaceTempView("s")

// Single-column test cases
val subqueryIsEmpty = "d > 6.0"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add a comment like

local predicates in subquery

// Local predicates applied to s inside the subquery; with the rows created above, "d = 5.0"
// selects only (null, 5.0) and "d = 3.0" selects the two (2, 3.0) rows.
val cIncludesNull = "d = 5.0"
val cDoesNotMatchA = "d = 3.0"
// NOTE(review): same predicate as cIncludesNull. s has no row with c = 3, so this never selects a
// c equal to the non-null a; "Case 4" below passes because of the null c -- TODO confirm intent.
val cMatchesA = "d = 5.0"
val aIsNull = "b = 5.0"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also add a comment here for aIsNull and aIsNotNull?

local predicate of table m

val aIsNotNull = "b = 3.0"

val includesNullRow = Row(null, 5.0) :: Nil
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe add a comment above this line,

// expected results

val includesNotNullRow = Row(3, 3.0) :: Nil
val doesNotIncludeRow = Nil
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

-> returnNullRow, returnNotNullRow, returnEmpty


val singleColumnTestCases = Seq(
("Case 1a (subquery is empty)", subqueryIsEmpty, aIsNull, includesNullRow),
("Case 1b (subquery is empty)", subqueryIsEmpty, aIsNotNull, includesNotNullRow),
("Case 2a (subquery includes null)", cIncludesNull, aIsNull, doesNotIncludeRow),
("Case 2b (subquery includes null)", cIncludesNull, aIsNotNull, doesNotIncludeRow),
("Case 3 (probe column is null)", cDoesNotMatchA, aIsNull, doesNotIncludeRow),
("Case 4 (there is a match)", cMatchesA, aIsNotNull, doesNotIncludeRow),
("Case 5 (there is no match)", cDoesNotMatchA, aIsNotNull, includesNotNullRow))

for ((given, sClause, mClause, expectedOutput) <- singleColumnTestCases) {
Given(given)
val query = s"SELECT * FROM m WHERE $mClause AND a NOT IN (SELECT c FROM s WHERE $sClause)"
checkAnswer(sql(query), expectedOutput)
}

// Correlated subqueries should also be handled properly. Adding the correlated predicate
// ``c < b - 10'' makes the subquery return no rows, which changes the query from case 2/3/4 to
// case 1. Because of this, the row from m should be included in the output.
val correlatedSubqueryTestCases = Seq(
("Case 2a->1 (subquery had nulls)", cIncludesNull, aIsNull, includesNullRow),
("Case 2b->1 (subquery had nulls)", cIncludesNull, aIsNotNull, includesNotNullRow),
("Case 3->1 (probe column was null)", cMatchesA, aIsNull, includesNullRow),
("Case 4->1 (there was a match)", cMatchesA, aIsNotNull, includesNotNullRow))
for ((given, sClause, mClause, expectedOutput) <- correlatedSubqueryTestCases) {
Given(given)
// scalastyle:off
val query =
s"SELECT * FROM m WHERE $mClause AND a NOT IN (SELECT c FROM s WHERE $sClause AND c < b - 10)"
// scalastyle:on
checkAnswer(sql(query), expectedOutput)
}
}

test("NOT IN multi column with nulls predicate subquery") {
// scalastyle:off
// Test cases for multi-column ``WHERE (a, b) NOT IN (SELECT c, d FROM s ...)'':
// | # | does subquery include null? | do filter columns contain null? | a = c? | b = d? | row included in result? |
// | 1 | empty | * | * | * | yes |
// | 2 | 1+ row has null for all columns | * | * | * | no |
// | 3 | no row has null for all columns | (yes, yes) | * | * | no |
// | 4 | no | (no, yes) | yes | * | no |
// | 5 | no row has null for all columns | (no, yes) | no | * | yes |
// | 6 | no | (no, no) | yes | yes | no |
// | 7 | no | (no, no) | _ | _ | yes |
//
// This can clearly be generalized, but only these cases are tested here.
// scalastyle:on

Seq(row((null, null)), row((3, 5.0)), row((2, null)), row((2, 3.0))).toDF("a", "b")
.createOrReplaceTempView("m")
Seq(row((null, null)), row((2, 3.0)), row((3, null))).toDF("c", "d")
.createOrReplaceTempView("s")

val subqueryIsEmpty = "c > 200" // Returns ()
val dIsNull = "c = 3" // Returns (3, null)
val cAndDAreNull = "c IS NULL AND d IS NULL" // Returns (null, null)
val cAndDAreNotNull = "c = 2" // Returns (2, 3.0)

val aAndBAreNull = "a IS NULL AND b IS NULL" // Returns (null, null)
val aAndBAreNotNull = "a = 3" // Returns (3, 5.0)
val aAndBMatch = "a = 2 AND b = 3.0" // Returns (2, 3.0)
val aIsNotNull = "a = 2" // Returns (2, null), (2, 3.0)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like these comments. It can help reviewers understand the naming.


val includesNullRow = Row(null, null) :: Nil
val includesSemiNullAndNotNullRow = Row(2, null) :: Row(2, 3.0) :: Nil
val includesPartiallyNullRow = Row(2, null) :: Nil
val includesNotNullRow = Row(3, 5.0) :: Nil
val doesNotIncludeRow = Nil
val multiColumnTestCases = Seq(
("Case 1a (subquery is empty)", subqueryIsEmpty, aAndBAreNull, includesNullRow),
("Case 1b (subquery is empty)", subqueryIsEmpty, aIsNotNull, includesSemiNullAndNotNullRow),
("Case 2a (subquery contains null)", cAndDAreNull, aAndBAreNull, doesNotIncludeRow),
("Case 2b (subquery contains null)", cAndDAreNull, aAndBAreNotNull, doesNotIncludeRow),
("Case 3 (probe columns are all null)", dIsNull, aAndBAreNull, doesNotIncludeRow),
("Case 4 (null column, match)", cAndDAreNotNull, aIsNotNull, doesNotIncludeRow),
("Case 5 (null column, no match)", dIsNull, aIsNotNull, includesSemiNullAndNotNullRow),
("Case 6 (no null column, match)", cAndDAreNotNull, aAndBMatch, doesNotIncludeRow),
("Case 7 (no null column, no match)", cAndDAreNotNull, aAndBAreNotNull, includesNotNullRow))

for ((given, sClause, mClause, expectedOutput) <- multiColumnTestCases) {
Given(given)
val query =
s"SELECT * FROM m WHERE $mClause AND (a, b) NOT IN (SELECT c, d FROM s WHERE $sClause)"
checkAnswer(sql(query), expectedOutput)
}
}


test("IN predicate subquery within OR") {
checkAnswer(
sql("select * from l where l.a in (select c from r)" +
Expand Down