From b98865127a39bde885f9b1680cfe608629d59d51 Mon Sep 17 00:00:00 2001 From: Nattavut Sutyanyong Date: Fri, 29 Jul 2016 17:43:56 -0400 Subject: [PATCH 01/13] [SPARK-16804][SQL] Correlated subqueries containing LIMIT return incorrect results ## What changes were proposed in this pull request? This patch fixes the incorrect results in the rule ResolveSubquery in Catalyst's Analysis phase. ## How was this patch tested? ./dev/run-tests a new unit test on the problematic pattern. --- .../apache/spark/sql/catalyst/analysis/Analyzer.scala | 10 ++++++++++ .../sql/catalyst/analysis/AnalysisErrorSuite.scala | 8 ++++++++ 2 files changed, 18 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 2efa997ff22d..c3ee6517875c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -1021,6 +1021,16 @@ class Analyzer( case e: Expand => failOnOuterReferenceInSubTree(e, "an EXPAND") e + case l @ LocalLimit(_, child) => + failOnOuterReferenceInSubTree(l, "LIMIT") + l + // Since LIMIT is represented as GlobalLimit(, (LocalLimit (, child)) + // and we are walking bottom up, we will fail on LocalLimit before + // reaching GlobalLimit. + // The code below is just a safety net. 
+ case g @ GlobalLimit(_, child) => + failOnOuterReferenceInSubTree(g, "LIMIT") + g case p => failOnOuterReference(p) p diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala index ff112c51697a..b78a988eddbb 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala @@ -533,5 +533,13 @@ class AnalysisErrorSuite extends AnalysisTest { Exists(Union(LocalRelation(b), Filter(EqualTo(OuterReference(a), c), LocalRelation(c)))), LocalRelation(a)) assertAnalysisError(plan3, "Accessing outer query column is not allowed in" :: Nil) + + val plan4 = Filter( + Exists( + Limit(1, + Filter(EqualTo(OuterReference(a), b), LocalRelation(b))) + ), + LocalRelation(a)) + assertAnalysisError(plan4, "Accessing outer query column is not allowed in LIMIT" :: Nil) } } From 069ed8f8e5f14dca7a15701945d42fc27fe82f3c Mon Sep 17 00:00:00 2001 From: Nattavut Sutyanyong Date: Fri, 29 Jul 2016 17:50:02 -0400 Subject: [PATCH 02/13] [SPARK-16804][SQL] Correlated subqueries containing LIMIT return incorrect results ## What changes were proposed in this pull request? This patch fixes the incorrect results in the rule ResolveSubquery in Catalyst's Analysis phase. ## How was this patch tested? ./dev/run-tests a new unit test on the problematic pattern. 
--- .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index c3ee6517875c..357c763f5946 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -1022,14 +1022,14 @@ class Analyzer( failOnOuterReferenceInSubTree(e, "an EXPAND") e case l @ LocalLimit(_, child) => - failOnOuterReferenceInSubTree(l, "LIMIT") + failOnOuterReferenceInSubTree(l, "a LIMIT") l // Since LIMIT is represented as GlobalLimit(, (LocalLimit (, child)) // and we are walking bottom up, we will fail on LocalLimit before // reaching GlobalLimit. // The code below is just a safety net. case g @ GlobalLimit(_, child) => - failOnOuterReferenceInSubTree(g, "LIMIT") + failOnOuterReferenceInSubTree(g, "a LIMIT") g case p => failOnOuterReference(p) From edca333c081e6d4e53a91b496fba4a3ef4ee89ac Mon Sep 17 00:00:00 2001 From: Nattavut Sutyanyong Date: Fri, 29 Jul 2016 20:28:15 -0400 Subject: [PATCH 03/13] New positive test cases --- .../org/apache/spark/sql/SubquerySuite.scala | 29 +++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala index afed342ff8e2..52387b4b72a1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala @@ -571,4 +571,33 @@ class SubquerySuite extends QueryTest with SharedSQLContext { Row(1.0, false) :: Row(1.0, false) :: Row(2.0, true) :: Row(2.0, true) :: Row(3.0, false) :: Row(5.0, true) :: Row(null, false) :: Row(null, true) :: Nil) } + + test("SPARK-16804: Correlated subqueries 
containing LIMIT - 1") { + withTempView("onerow") { + Seq(1).toDF("c1").createOrReplaceTempView("onerow") + + checkAnswer( + sql( + """ + | select c1 from onerow t1 + | where exists (select 1 from onerow t2 where t1.c1=t2.c1) + | and exists (select 1 from onerow LIMIT 1)""".stripMargin), + Row(1) :: Nil) + } + } + + test("SPARK-16804: Correlated subqueries containing LIMIT - 2") { + withTempView("onerow") { + Seq(1).toDF("c1").createOrReplaceTempView("onerow") + + checkAnswer( + sql( + """ + | select c1 from onerow t1 + | where exists (select 1 + | from (select 1 from onerow t2 LIMIT 1) + | where t1.c1=t2.c1)""".stripMargin), + Row(1) :: Nil) + } + } } From 64184fdb77c1a305bb2932e82582da28bb4c0e53 Mon Sep 17 00:00:00 2001 From: Nattavut Sutyanyong Date: Mon, 1 Aug 2016 09:20:09 -0400 Subject: [PATCH 04/13] Fix unit test case failure --- .../apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala index b78a988eddbb..c08de826bd94 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala @@ -540,6 +540,6 @@ class AnalysisErrorSuite extends AnalysisTest { Filter(EqualTo(OuterReference(a), b), LocalRelation(b))) ), LocalRelation(a)) - assertAnalysisError(plan4, "Accessing outer query column is not allowed in LIMIT" :: Nil) + assertAnalysisError(plan4, "Accessing outer query column is not allowed in a LIMIT" :: Nil) } } From 29f82b05c9e40e7934397257c674b260a8e8a996 Mon Sep 17 00:00:00 2001 From: Nattavut Sutyanyong Date: Fri, 5 Aug 2016 13:42:01 -0400 Subject: [PATCH 05/13] blocking TABLESAMPLE --- .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 7 +++++-- 
.../spark/sql/catalyst/analysis/AnalysisErrorSuite.scala | 8 ++++++++ 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 357c763f5946..9d99c4173d4a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -1021,16 +1021,19 @@ class Analyzer( case e: Expand => failOnOuterReferenceInSubTree(e, "an EXPAND") e - case l @ LocalLimit(_, child) => + case l @ LocalLimit(_, _) => failOnOuterReferenceInSubTree(l, "a LIMIT") l // Since LIMIT is represented as GlobalLimit(, (LocalLimit (, child)) // and we are walking bottom up, we will fail on LocalLimit before // reaching GlobalLimit. // The code below is just a safety net. - case g @ GlobalLimit(_, child) => + case g @ GlobalLimit(_, _) => failOnOuterReferenceInSubTree(g, "a LIMIT") g + case s @ Sample(_, _, _, _, _) => + failOnOuterReferenceInSubTree(s, "a TABLESAMPLE") + s case p => failOnOuterReference(p) p diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala index c08de826bd94..0b7d681be511 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala @@ -541,5 +541,13 @@ class AnalysisErrorSuite extends AnalysisTest { ), LocalRelation(a)) assertAnalysisError(plan4, "Accessing outer query column is not allowed in a LIMIT" :: Nil) + + val plan5 = Filter( + Exists( + Sample(0.0, 0.5, false, 1L, + Filter(EqualTo(OuterReference(a), b), LocalRelation(b)))().select('b) + ), + LocalRelation(a)) + assertAnalysisError(plan5, "Accessing 
outer query column is not allowed in a TABLESAMPLE" :: Nil) } } From ac43ab47907a1ccd6d22f920415fbb4de93d4720 Mon Sep 17 00:00:00 2001 From: Nattavut Sutyanyong Date: Fri, 5 Aug 2016 17:10:19 -0400 Subject: [PATCH 06/13] Fixing code styling --- .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 9d99c4173d4a..29ede7048a2d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -1021,17 +1021,17 @@ class Analyzer( case e: Expand => failOnOuterReferenceInSubTree(e, "an EXPAND") e - case l @ LocalLimit(_, _) => + case l : LocalLimit => failOnOuterReferenceInSubTree(l, "a LIMIT") l // Since LIMIT is represented as GlobalLimit(, (LocalLimit (, child)) // and we are walking bottom up, we will fail on LocalLimit before // reaching GlobalLimit. // The code below is just a safety net. 
- case g @ GlobalLimit(_, _) => + case g : GlobalLimit => failOnOuterReferenceInSubTree(g, "a LIMIT") g - case s @ Sample(_, _, _, _, _) => + case s : Sample => failOnOuterReferenceInSubTree(s, "a TABLESAMPLE") s case p => From 631d396031e8bf627eb1f4872a4d3a17c144536c Mon Sep 17 00:00:00 2001 From: Nattavut Sutyanyong Date: Sun, 7 Aug 2016 14:39:44 -0400 Subject: [PATCH 07/13] Correcting Scala test style --- .../spark/sql/catalyst/analysis/AnalysisErrorSuite.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala index 0b7d681be511..8935d979414a 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala @@ -548,6 +548,7 @@ class AnalysisErrorSuite extends AnalysisTest { Filter(EqualTo(OuterReference(a), b), LocalRelation(b)))().select('b) ), LocalRelation(a)) - assertAnalysisError(plan5, "Accessing outer query column is not allowed in a TABLESAMPLE" :: Nil) + assertAnalysisError(plan5, + "Accessing outer query column is not allowed in a TABLESAMPLE" :: Nil) } } From 7eb9b2dbba3633a1958e38e0019e3ce816300514 Mon Sep 17 00:00:00 2001 From: Nattavut Sutyanyong Date: Sun, 7 Aug 2016 22:31:09 -0400 Subject: [PATCH 08/13] One (last) attempt to correct the Scala style tests --- .../apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala index 8935d979414a..6438065fb292 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala +++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala @@ -548,7 +548,7 @@ class AnalysisErrorSuite extends AnalysisTest { Filter(EqualTo(OuterReference(a), b), LocalRelation(b)))().select('b) ), LocalRelation(a)) - assertAnalysisError(plan5, + assertAnalysisError(plan5, "Accessing outer query column is not allowed in a TABLESAMPLE" :: Nil) } } From 473c81bacda2b12e6b85fe3f609ba334460bf0fe Mon Sep 17 00:00:00 2001 From: Nattavut Sutyanyong Date: Sun, 1 Jan 2017 11:15:07 -0500 Subject: [PATCH 09/13] first try on the fix --- .../apache/spark/sql/catalyst/optimizer/subquery.scala | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala index f14aaab72a98..ce4816198839 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala @@ -68,8 +68,12 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper { // Note that will almost certainly be planned as a Broadcast Nested Loop join. // Use EXISTS if performance matters to you. 
val (joinCond, outerPlan) = rewriteExistentialExpr(conditions, p) - val anyNull = splitConjunctivePredicates(joinCond.get).map(IsNull).reduceLeft(Or) - Join(outerPlan, sub, LeftAnti, Option(Or(anyNull, joinCond.get))) + val joinConds = splitConjunctivePredicates(joinCond.get) + val isNulls = joinConds.map(IsNull) + val pairs = joinConds.zip(isNulls).map(Or.tupled).reduceLeft(And) + Join(outerPlan, sub, LeftAnti, Option(pairs)) + // val x = isNulls.reduceLeft(Or) + // Join(outerPlan, sub, LeftAnti, Option(Or(x, joinCond.get))) case (p, predicate) => val (newCond, inputPlan) = rewriteExistentialExpr(Seq(predicate), p) Project(p.output, Filter(newCond.get, inputPlan)) From 278ebaea9ab52bc141e85e578416203107d38eda Mon Sep 17 00:00:00 2001 From: Nattavut Sutyanyong Date: Tue, 3 Jan 2017 17:07:35 -0500 Subject: [PATCH 10/13] add/update test cases --- .../in-subquery/not-in-multiple-columns.sql | 55 +++++++++++++++++ .../not-in-multiple-columns.sql.out | 59 +++++++++++++++++++ .../apache/spark/sql/SQLQueryTestSuite.scala | 7 ++- .../org/apache/spark/sql/SubquerySuite.scala | 6 +- 4 files changed, 123 insertions(+), 4 deletions(-) create mode 100644 sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/not-in-multiple-columns.sql create mode 100644 sql/core/src/test/resources/sql-tests/results/subquery/in-subquery/not-in-multiple-columns.sql.out diff --git a/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/not-in-multiple-columns.sql b/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/not-in-multiple-columns.sql new file mode 100644 index 000000000000..2a64e31a6145 --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/not-in-multiple-columns.sql @@ -0,0 +1,55 @@ +-- This file contains test cases for NOT IN subquery with multiple columns. 
+ +-- The data sets are populated as follows: +-- 1) When T1.A1 = T2.A2 +-- 1.1) T1.B1 = T2.B2 +-- 1.2) T1.B1 = T2.B2 returns false +-- 1.3) T1.B1 is null +-- 1.4) T2.B2 is null +-- 2) When T1.A1 = T2.A2 returns false +-- 3) When T1.A1 is null +-- 4) When T1.A2 is null + +-- T1.A1 T1.B1 T2.A2 T2.B2 +-- ----- ----- ----- ----- +-- 1 1 1 1 (1.1) +-- 1 3 (1.2) +-- 1 null 1 null (1.3 & 1.4) +-- +-- 2 1 1 1 (2) +-- null 1 (3) +-- null 3 (4) + +create temporary view t1 as select * from values + (1, 1), (2, 1), (null, 1), + (1, 3), (null, 3), + (1, null), (null, 2) +as t1(a1, b1); + +create temporary view t2 as select * from values + (1, 1), + (null, 3), + (1, null) +as t2(a2, b2); + +-- multiple columns in NOT IN +-- TC 01.01 +select a1,b1 +from t1 +where (a1,b1) not in (select a2,b2 + from t2); + +-- multiple columns with expressions in NOT IN +-- TC 01.02 +select a1,b1 +from t1 +where (a1-1,b1) not in (select a2,b2 + from t2); + +-- multiple columns with expressions in NOT IN +-- TC 01.03 +select a1,b1 +from t1 +where (a1,b1) not in (select a2+1,b2 + from t2); + diff --git a/sql/core/src/test/resources/sql-tests/results/subquery/in-subquery/not-in-multiple-columns.sql.out b/sql/core/src/test/resources/sql-tests/results/subquery/in-subquery/not-in-multiple-columns.sql.out new file mode 100644 index 000000000000..756c3782a0e7 --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/results/subquery/in-subquery/not-in-multiple-columns.sql.out @@ -0,0 +1,59 @@ +-- Automatically generated by SQLQueryTestSuite +-- Number of queries: 5 + + +-- !query 0 +create temporary view t1 as select * from values + (1, 1), (2, 1), (null, 1), + (1, 3), (null, 3), + (1, null), (null, 2) +as t1(a1, b1) +-- !query 0 schema +struct<> +-- !query 0 output + + + +-- !query 1 +create temporary view t2 as select * from values + (1, 1), + (null, 3), + (1, null) +as t2(a2, b2) +-- !query 1 schema +struct<> +-- !query 1 output + + + +-- !query 2 +select a1,b1 +from t1 +where (a1,b1) not in (select 
a2,b2 + from t2) +-- !query 2 schema +struct<a1:int,b1:int> +-- !query 2 output +2 1 + + +-- !query 3 +select a1,b1 +from t1 +where (a1-1,b1) not in (select a2,b2 + from t2) +-- !query 3 schema +struct<a1:int,b1:int> +-- !query 3 output +1 1 + + +-- !query 4 +select a1,b1 +from t1 +where (a1,b1) not in (select a2+1,b2 + from t2) +-- !query 4 schema +struct<a1:int,b1:int> +-- !query 4 output +1 1 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala index 1a4049fb339c..fdf940a7f950 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala @@ -163,7 +163,12 @@ class SQLQueryTestSuite extends QueryTest with SharedSQLContext { s"-- Number of queries: ${outputs.size}\n\n\n" + outputs.zipWithIndex.map{case (qr, i) => qr.toString(i)}.mkString("\n\n\n") + "\n" } - stringToFile(new File(testCase.resultFile), goldenOutput) + val resultFile = new File(testCase.resultFile); + val parent = resultFile.getParentFile(); + if (!parent.exists()) { + assert(parent.mkdirs(), "Could not create directory: " + parent) + } + stringToFile(resultFile, goldenOutput) } // Read back the golden file. 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala index 2ef8b18c0461..25dbecb5894e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala @@ -263,12 +263,12 @@ class SubquerySuite extends QueryTest with SharedSQLContext { Row(1, 2.0) :: Row(1, 2.0) :: Nil) checkAnswer( - sql("select * from l where a not in (select c from t where b < d)"), - Row(1, 2.0) :: Row(1, 2.0) :: Row(3, 3.0) :: Nil) + sql("select * from l where (a, b) not in (select c, d from t) and a < 4"), + Row(1, 2.0) :: Row(1, 2.0) :: Row(2, 1.0) :: Row(2, 1.0) :: Row(3, 3.0) :: Nil) // Empty sub-query checkAnswer( - sql("select * from l where a not in (select c from r where c > 10 and b < d)"), + sql("select * from l where (a, b) not in (select c, d from r where c > 10)"), Row(1, 2.0) :: Row(1, 2.0) :: Row(2, 1.0) :: Row(2, 1.0) :: Row(3, 3.0) :: Row(null, null) :: Row(null, 5.0) :: Row(6, null) :: Nil) From de655d0d00693a2bc98fddad7be6f55fb2690555 Mon Sep 17 00:00:00 2001 From: Nattavut Sutyanyong Date: Tue, 3 Jan 2017 20:26:45 -0500 Subject: [PATCH 11/13] Add descriptive comment --- .../org/apache/spark/sql/catalyst/optimizer/subquery.scala | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala index ce4816198839..68ebeaaaf52b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala @@ -68,12 +68,15 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper { // Note that will almost certainly be planned as a Broadcast Nested Loop join. // Use EXISTS if performance matters to you. 
val (joinCond, outerPlan) = rewriteExistentialExpr(conditions, p) + // Expand the NOT IN expression with the NULL-aware semantic + // to its full form. That is from: + // (a1,b1,...) = (a2,b2,...) + // to + // (a1=a2 OR isnull(a1=a2)) AND (b1=b2 OR isnull(b1=b2)) AND ... val joinConds = splitConjunctivePredicates(joinCond.get) val isNulls = joinConds.map(IsNull) val pairs = joinConds.zip(isNulls).map(Or.tupled).reduceLeft(And) Join(outerPlan, sub, LeftAnti, Option(pairs)) - // val x = isNulls.reduceLeft(Or) - // Join(outerPlan, sub, LeftAnti, Option(Or(x, joinCond.get))) case (p, predicate) => val (newCond, inputPlan) = rewriteExistentialExpr(Seq(predicate), p) Project(p.output, Filter(newCond.get, inputPlan)) From f3f7773dbb169cbe63e05112c7a37756d7aa7789 Mon Sep 17 00:00:00 2001 From: Nattavut Sutyanyong Date: Thu, 5 Jan 2017 20:32:22 -0500 Subject: [PATCH 12/13] code optimization --- .../org/apache/spark/sql/catalyst/optimizer/subquery.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala index 68ebeaaaf52b..4d62cce9da0a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala @@ -74,8 +74,7 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper { // to // (a1=a2 OR isnull(a1=a2)) AND (b1=b2 OR isnull(b1=b2)) AND ... 
val joinConds = splitConjunctivePredicates(joinCond.get) - val isNulls = joinConds.map(IsNull) - val pairs = joinConds.zip(isNulls).map(Or.tupled).reduceLeft(And) + val pairs = joinConds.map(c => Or(c, IsNull(c))).reduceLeft(And) Join(outerPlan, sub, LeftAnti, Option(pairs)) case (p, predicate) => val (newCond, inputPlan) = rewriteExistentialExpr(Seq(predicate), p) From 6a1a4159f54397ef81baaf618e3b816866f589e9 Mon Sep 17 00:00:00 2001 From: Nattavut Sutyanyong Date: Thu, 12 Jan 2017 20:18:23 -0500 Subject: [PATCH 13/13] Remove trailing whitespace --- .../inputs/subquery/in-subquery/not-in-multiple-columns.sql | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/not-in-multiple-columns.sql b/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/not-in-multiple-columns.sql index 2a64e31a6145..db668505adf2 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/not-in-multiple-columns.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/not-in-multiple-columns.sql @@ -2,7 +2,7 @@ -- The data sets are populated as follows: -- 1) When T1.A1 = T2.A2 --- 1.1) T1.B1 = T2.B2 +-- 1.1) T1.B1 = T2.B2 -- 1.2) T1.B1 = T2.B2 returns false -- 1.3) T1.B1 is null -- 1.4) T2.B2 is null @@ -10,7 +10,7 @@ -- 3) When T1.A1 is null -- 4) When T1.A2 is null --- T1.A1 T1.B1 T2.A2 T2.B2 +-- T1.A1 T1.B1 T2.A2 T2.B2 -- ----- ----- ----- ----- -- 1 1 1 1 (1.1) -- 1 3 (1.2)