From b1d8b633c702d851fae3dd9b0ea5fcbaa1b40b39 Mon Sep 17 00:00:00 2001
From: Takeshi YAMAMURO
Date: Tue, 22 Dec 2015 09:31:30 +0900
Subject: [PATCH 01/12] Implement JDBCRelation#unhandledFilters

---
 .../datasources/jdbc/JDBCRelation.scala       | 13 +++++
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala | 57 ++++++++++++-------
 2 files changed, 51 insertions(+), 19 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
index 572be823ca87..e85f2d60cdb9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
@@ -90,6 +90,19 @@ private[sql] case class JDBCRelation(
 
   override val schema: StructType = JDBCRDD.resolveTable(url, table, properties)
 
+  // Check if JDBCRDD#compileFilter can accept input filters
+  override def unhandledFilters(filters: Array[Filter]): Array[Filter] = {
+    filters.filterNot(canCompileFilter)
+  }
+
+  private def canCompileFilter(filter: Filter): Boolean = filter match {
+    case EqualTo(_, _) | Not(EqualTo(_, _)) => true
+    case LessThan(_, _) | LessThanOrEqual(_, _) => true
+    case GreaterThan(_, _) | GreaterThanOrEqual(_, _) => true
+    case IsNull(_) | IsNotNull(_) => true
+    case _ => false
+  }
+
   override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
     // Rely on a type erasure hack to pass RDD[InternalRow] back as RDD[Row]
     JDBCRDD.scanTable(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 518607543b48..dd6d9e74fc0e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -18,16 +18,21 @@
 package org.apache.spark.sql.jdbc
 
 import java.math.BigDecimal
-import java.sql.{Date, DriverManager, Timestamp}
-import java.util.{Calendar, GregorianCalendar, Properties}
+import java.sql.Date
+import java.sql.DriverManager
+import java.sql.Timestamp
+import java.util.Calendar
+import java.util.GregorianCalendar
+import java.util.Properties
 
 import org.h2.jdbc.JdbcSQLException
 import org.scalatest.BeforeAndAfter
 import org.scalatest.PrivateMethodTester
-
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.execution.ExplainCommand
+import org.apache.spark.sql.execution.PhysicalRDD
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD
 import org.apache.spark.sql.sources._
@@ -183,26 +188,40 @@ class JDBCSuite extends SparkFunSuite
   }
 
   test("SELECT * WHERE (simple predicates)") {
-    assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE THEID < 1")).collect().size == 0)
-    assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE THEID != 2")).collect().size == 2)
-    assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE THEID = 1")).collect().size == 1)
-    assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME = 'fred'")).collect().size == 1)
-    assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME <=> 'fred'")).collect().size == 1)
-    assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME > 'fred'")).collect().size == 2)
-    assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME != 'fred'")).collect().size == 2)
-    assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME IN ('mary', 'fred')"))
+    val checkFilterPushdown = (df: DataFrame) => {
+      val schema = df.schema
+      val parentPlan = df.queryExecution.executedPlan
+      assert(parentPlan.isInstanceOf[PhysicalRDD])
+      assert(parentPlan.asInstanceOf[PhysicalRDD].nodeName.contains("JDBCRelation"))
+      val rdd = parentPlan.execute().map(row => Row.fromSeq(row.toSeq(schema)))
+      sqlContext.createDataFrame(rdd, schema)
+    }
+    assert(checkFilterPushdown(sql("SELECT * FROM foobar WHERE THEID < 1")).collect().size == 0)
+    assert(checkFilterPushdown(sql("SELECT * FROM foobar WHERE THEID != 2")).collect().size == 2)
+    assert(checkFilterPushdown(sql("SELECT * FROM foobar WHERE THEID = 1")).collect().size == 1)
+    assert(checkFilterPushdown(sql("SELECT * FROM foobar WHERE NAME = 'fred'")).collect().size == 1)
+    assert(checkFilterPushdown(sql("SELECT * FROM foobar WHERE NAME <=> 'fred'"))
+      .collect().size == 1)
+    assert(checkFilterPushdown(sql("SELECT * FROM foobar WHERE NAME > 'fred'")).collect().size == 2)
+    assert(checkFilterPushdown(sql("SELECT * FROM foobar WHERE NAME != 'fred'"))
+      .collect().size == 2)
+    assert(checkFilterPushdown(sql("SELECT * FROM foobar WHERE NAME IN ('mary', 'fred')"))
       .collect().size == 2)
-    assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME NOT IN ('fred')"))
+    assert(checkFilterPushdown(sql("SELECT * FROM foobar WHERE NAME NOT IN ('fred')"))
       .collect().size == 2)
-    assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE THEID = 1 OR NAME = 'mary'"))
+    assert(checkFilterPushdown(sql("SELECT * FROM foobar WHERE THEID = 1 OR NAME = 'mary'"))
       .collect().size == 2)
-    assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE THEID = 1 OR NAME = 'mary' "
+    assert(checkFilterPushdown(sql("SELECT * FROM foobar WHERE THEID = 1 OR NAME = 'mary' "
       + "AND THEID = 2")).collect().size == 2)
-    assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME LIKE 'fr%'")).collect().size == 1)
-    assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME LIKE '%ed'")).collect().size == 1)
-    assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME LIKE '%re%'")).collect().size == 1)
-    assert(stripSparkFilter(sql("SELECT * FROM nulltypes WHERE A IS NULL")).collect().size == 1)
-    assert(stripSparkFilter(sql("SELECT * FROM nulltypes WHERE A IS NOT NULL")).collect().size == 0)
+    assert(checkFilterPushdown(sql("SELECT * FROM foobar WHERE NAME LIKE 'fr%'"))
+      .collect().size == 1)
+    assert(checkFilterPushdown(sql("SELECT * FROM foobar WHERE NAME LIKE '%ed'"))
+      .collect().size == 1)
+    assert(checkFilterPushdown(sql("SELECT * FROM foobar WHERE NAME LIKE '%re%'"))
+      .collect().size == 1)
+    assert(checkFilterPushdown(sql("SELECT * FROM nulltypes WHERE A IS NULL")).collect().size == 1)
+    assert(checkFilterPushdown(sql("SELECT * FROM nulltypes WHERE A IS NOT NULL"))
+      .collect().size == 0)
     // This is a test to reflect discussion in SPARK-12218.
    // The older versions of spark have this kind of bugs in parquet data source.
From 5460c5679f4d3cd8a03fb29f1e2ebdefde0029d1 Mon Sep 17 00:00:00 2001
From: Takeshi YAMAMURO
Date: Tue, 22 Dec 2015 13:46:25 +0900
Subject: [PATCH 02/12] Fix style errors

---
 .../datasources/jdbc/JDBCRelation.scala       |  2 +-
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala | 40 ++++++++-----------
 2 files changed, 18 insertions(+), 24 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
index e85f2d60cdb9..25b639766fa6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
@@ -97,7 +97,7 @@ private[sql] case class JDBCRelation(
 
   private def canCompileFilter(filter: Filter): Boolean = filter match {
     case EqualTo(_, _) | Not(EqualTo(_, _)) => true
-    case LessThan(_, _) | LessThanOrEqual(_, _)  => true
+    case LessThan(_, _) | LessThanOrEqual(_, _) => true
     case GreaterThan(_, _) | GreaterThanOrEqual(_, _) => true
     case IsNull(_) | IsNotNull(_) => true
     case _ => false
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index dd6d9e74fc0e..c57698aaaa66 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -188,7 +188,7 @@ class JDBCSuite extends SparkFunSuite
   }
 
   test("SELECT * WHERE (simple predicates)") {
-    val checkFilterPushdown = (df: DataFrame) => {
+    val doFilterPushdown = (df: DataFrame) => {
       val schema = df.schema
       val parentPlan = df.queryExecution.executedPlan
       assert(parentPlan.isInstanceOf[PhysicalRDD])
@@ -196,32 +196,26 @@ class JDBCSuite extends SparkFunSuite
       val rdd = parentPlan.execute().map(row => Row.fromSeq(row.toSeq(schema)))
       sqlContext.createDataFrame(rdd, schema)
     }
-    assert(checkFilterPushdown(sql("SELECT * FROM foobar WHERE THEID < 1")).collect().size == 0)
-    assert(checkFilterPushdown(sql("SELECT * FROM foobar WHERE THEID != 2")).collect().size == 2)
-    assert(checkFilterPushdown(sql("SELECT * FROM foobar WHERE THEID = 1")).collect().size == 1)
-    assert(checkFilterPushdown(sql("SELECT * FROM foobar WHERE NAME = 'fred'")).collect().size == 1)
-    assert(checkFilterPushdown(sql("SELECT * FROM foobar WHERE NAME <=> 'fred'"))
-      .collect().size == 1)
-    assert(checkFilterPushdown(sql("SELECT * FROM foobar WHERE NAME > 'fred'")).collect().size == 2)
-    assert(checkFilterPushdown(sql("SELECT * FROM foobar WHERE NAME != 'fred'"))
+    assert(doFilterPushdown(sql("SELECT * FROM foobar WHERE THEID < 1")).collect().size == 0)
+    assert(doFilterPushdown(sql("SELECT * FROM foobar WHERE THEID != 2")).collect().size == 2)
+    assert(doFilterPushdown(sql("SELECT * FROM foobar WHERE THEID = 1")).collect().size == 1)
+    assert(doFilterPushdown(sql("SELECT * FROM foobar WHERE NAME = 'fred'")).collect().size == 1)
+    assert(doFilterPushdown(sql("SELECT * FROM foobar WHERE NAME <=> 'fred'")).collect().size == 1)
+    assert(doFilterPushdown(sql("SELECT * FROM foobar WHERE NAME > 'fred'")).collect().size == 2)
+    assert(doFilterPushdown(sql("SELECT * FROM foobar WHERE NAME != 'fred'")).collect().size == 2)
+    assert(doFilterPushdown(sql("SELECT * FROM foobar WHERE NAME IN ('mary', 'fred')"))
       .collect().size == 2)
-    assert(checkFilterPushdown(sql("SELECT * FROM foobar WHERE NAME IN ('mary', 'fred')"))
+    assert(doFilterPushdown(sql("SELECT * FROM foobar WHERE NAME NOT IN ('fred')"))
       .collect().size == 2)
-    assert(checkFilterPushdown(sql("SELECT * FROM foobar WHERE NAME NOT IN ('fred')"))
+    assert(doFilterPushdown(sql("SELECT * FROM foobar WHERE THEID = 1 OR NAME = 'mary'"))
       .collect().size == 2)
-    assert(checkFilterPushdown(sql("SELECT * FROM foobar WHERE THEID = 1 OR NAME = 'mary'"))
-      .collect().size == 2)
-    assert(checkFilterPushdown(sql("SELECT * FROM foobar WHERE THEID = 1 OR NAME = 'mary' "
+    assert(doFilterPushdown(sql("SELECT * FROM foobar WHERE THEID = 1 OR NAME = 'mary' "
       + "AND THEID = 2")).collect().size == 2)
-    assert(checkFilterPushdown(sql("SELECT * FROM foobar WHERE NAME LIKE 'fr%'"))
-      .collect().size == 1)
-    assert(checkFilterPushdown(sql("SELECT * FROM foobar WHERE NAME LIKE '%ed'"))
-      .collect().size == 1)
-    assert(checkFilterPushdown(sql("SELECT * FROM foobar WHERE NAME LIKE '%re%'"))
-      .collect().size == 1)
-    assert(checkFilterPushdown(sql("SELECT * FROM nulltypes WHERE A IS NULL")).collect().size == 1)
-    assert(checkFilterPushdown(sql("SELECT * FROM nulltypes WHERE A IS NOT NULL"))
-      .collect().size == 0)
+    assert(doFilterPushdown(sql("SELECT * FROM foobar WHERE NAME LIKE 'fr%'")).collect().size == 1)
+    assert(doFilterPushdown(sql("SELECT * FROM foobar WHERE NAME LIKE '%ed'")).collect().size == 1)
+    assert(doFilterPushdown(sql("SELECT * FROM foobar WHERE NAME LIKE '%re%'")).collect().size == 1)
+    assert(doFilterPushdown(sql("SELECT * FROM nulltypes WHERE A IS NULL")).collect().size == 1)
+    assert(doFilterPushdown(sql("SELECT * FROM nulltypes WHERE A IS NOT NULL")).collect().size == 0)
     // This is a test to reflect discussion in SPARK-12218.
     // The older versions of spark have this kind of bugs in parquet data source.

From afefdaac73486156128d099f0323d4ac300370ad Mon Sep 17 00:00:00 2001
From: Takeshi YAMAMURO
Date: Tue, 22 Dec 2015 19:41:44 +0900
Subject: [PATCH 03/12] Apply comments

---
 .../datasources/jdbc/JDBCRelation.scala       |  4 +-
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala | 50 +++++++++----------
 2 files changed, 27 insertions(+), 27 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
index 25b639766fa6..23ff2a774a84 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
@@ -90,7 +90,9 @@ private[sql] case class JDBCRelation(
 
   override val schema: StructType = JDBCRDD.resolveTable(url, table, properties)
 
-  // Check if JDBCRDD#compileFilter can accept input filters
+  /**
+   * Check if [[JDBCRDD.compileFilter]] can accept input filters.
+   */
   override def unhandledFilters(filters: Array[Filter]): Array[Filter] = {
     filters.filterNot(canCompileFilter)
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index c57698aaaa66..f465f2076cc0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -26,12 +26,11 @@ import java.util.GregorianCalendar
 import java.util.Properties
 
 import org.h2.jdbc.JdbcSQLException
-import org.scalatest.BeforeAndAfter
-import org.scalatest.PrivateMethodTester
-import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.DataFrame
-import org.apache.spark.sql.Row
+import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
+
 import org.apache.spark.sql.execution.ExplainCommand
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.{DataFrame, Row}
 import org.apache.spark.sql.execution.PhysicalRDD
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD
 import org.apache.spark.sql.sources._
@@ -188,34 +187,33 @@ class JDBCSuite extends SparkFunSuite
   }
 
   test("SELECT * WHERE (simple predicates)") {
-    val doFilterPushdown = (df: DataFrame) => {
-      val schema = df.schema
+    def checkPlan(df: DataFrame): DataFrame = {
       val parentPlan = df.queryExecution.executedPlan
+      // Check if SparkPlan Filter is removed in a physical plan and
+      // the plan only has PhysicalRDD to scan JDBCRelation.
       assert(parentPlan.isInstanceOf[PhysicalRDD])
       assert(parentPlan.asInstanceOf[PhysicalRDD].nodeName.contains("JDBCRelation"))
-      val rdd = parentPlan.execute().map(row => Row.fromSeq(row.toSeq(schema)))
-      sqlContext.createDataFrame(rdd, schema)
+      df
     }
-    assert(doFilterPushdown(sql("SELECT * FROM foobar WHERE THEID < 1")).collect().size == 0)
-    assert(doFilterPushdown(sql("SELECT * FROM foobar WHERE THEID != 2")).collect().size == 2)
-    assert(doFilterPushdown(sql("SELECT * FROM foobar WHERE THEID = 1")).collect().size == 1)
-    assert(doFilterPushdown(sql("SELECT * FROM foobar WHERE NAME = 'fred'")).collect().size == 1)
-    assert(doFilterPushdown(sql("SELECT * FROM foobar WHERE NAME <=> 'fred'")).collect().size == 1)
-    assert(doFilterPushdown(sql("SELECT * FROM foobar WHERE NAME > 'fred'")).collect().size == 2)
-    assert(doFilterPushdown(sql("SELECT * FROM foobar WHERE NAME != 'fred'")).collect().size == 2)
-    assert(doFilterPushdown(sql("SELECT * FROM foobar WHERE NAME IN ('mary', 'fred')"))
-      .collect().size == 2)
-    assert(doFilterPushdown(sql("SELECT * FROM foobar WHERE NAME NOT IN ('fred')"))
+    assert(checkPlan(sql("SELECT * FROM foobar WHERE THEID < 1")).collect().size == 0)
+    assert(checkPlan(sql("SELECT * FROM foobar WHERE THEID != 2")).collect().size == 2)
+    assert(checkPlan(sql("SELECT * FROM foobar WHERE THEID = 1")).collect().size == 1)
+    assert(checkPlan(sql("SELECT * FROM foobar WHERE NAME = 'fred'")).collect().size == 1)
+    assert(checkPlan(sql("SELECT * FROM foobar WHERE NAME <=> 'fred'")).collect().size == 1)
+    assert(checkPlan(sql("SELECT * FROM foobar WHERE NAME > 'fred'")).collect().size == 2)
+    assert(checkPlan(sql("SELECT * FROM foobar WHERE NAME != 'fred'")).collect().size == 2)
+    assert(checkPlan(sql("SELECT * FROM foobar WHERE NAME IN ('mary', 'fred')"))
       .collect().size == 2)
-    assert(doFilterPushdown(sql("SELECT * FROM foobar WHERE THEID = 1 OR NAME = 'mary'"))
+    assert(checkPlan(sql("SELECT * FROM foobar WHERE NAME NOT IN ('fred')")).collect().size == 2)
+    assert(checkPlan(sql("SELECT * FROM foobar WHERE THEID = 1 OR NAME = 'mary'"))
       .collect().size == 2)
-    assert(doFilterPushdown(sql("SELECT * FROM foobar WHERE THEID = 1 OR NAME = 'mary' "
+    assert(checkPlan(sql("SELECT * FROM foobar WHERE THEID = 1 OR NAME = 'mary' "
       + "AND THEID = 2")).collect().size == 2)
-    assert(doFilterPushdown(sql("SELECT * FROM foobar WHERE NAME LIKE 'fr%'")).collect().size == 1)
-    assert(doFilterPushdown(sql("SELECT * FROM foobar WHERE NAME LIKE '%ed'")).collect().size == 1)
-    assert(doFilterPushdown(sql("SELECT * FROM foobar WHERE NAME LIKE '%re%'")).collect().size == 1)
-    assert(doFilterPushdown(sql("SELECT * FROM nulltypes WHERE A IS NULL")).collect().size == 1)
-    assert(doFilterPushdown(sql("SELECT * FROM nulltypes WHERE A IS NOT NULL")).collect().size == 0)
+    assert(checkPlan(sql("SELECT * FROM foobar WHERE NAME LIKE 'fr%'")).collect().size == 1)
+    assert(checkPlan(sql("SELECT * FROM foobar WHERE NAME LIKE '%ed'")).collect().size == 1)
+    assert(checkPlan(sql("SELECT * FROM foobar WHERE NAME LIKE '%re%'")).collect().size == 1)
+    assert(checkPlan(sql("SELECT * FROM nulltypes WHERE A IS NULL")).collect().size == 1)
+    assert(checkPlan(sql("SELECT * FROM nulltypes WHERE A IS NOT NULL")).collect().size == 0)
     // This is a test to reflect discussion in SPARK-12218.
     // The older versions of spark have this kind of bugs in parquet data source.

From 3dc75109fbf64e2ec7dc0ab8c601024aeea2d489 Mon Sep 17 00:00:00 2001
From: Takeshi YAMAMURO
Date: Wed, 23 Dec 2015 00:09:11 +0900
Subject: [PATCH 04/12] Simplify unhandledFilters

---
 .../execution/datasources/jdbc/JDBCRelation.scala  | 14 ++------------
 1 file changed, 2 insertions(+), 12 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
index 23ff2a774a84..16a9acae9a77 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
@@ -90,19 +90,9 @@ private[sql] case class JDBCRelation(
 
   override val schema: StructType = JDBCRDD.resolveTable(url, table, properties)
 
-  /**
-   * Check if [[JDBCRDD.compileFilter]] can accept input filters.
-   */
+  // Check if JDBCRDD.compileFilter can accept input filters
   override def unhandledFilters(filters: Array[Filter]): Array[Filter] = {
-    filters.filterNot(canCompileFilter)
-  }
-
-  private def canCompileFilter(filter: Filter): Boolean = filter match {
-    case EqualTo(_, _) | Not(EqualTo(_, _)) => true
-    case LessThan(_, _) | LessThanOrEqual(_, _) => true
-    case GreaterThan(_, _) | GreaterThanOrEqual(_, _) => true
-    case IsNull(_) | IsNotNull(_) => true
-    case _ => false
+    filters.filterNot(JDBCRDD.compileFilter(_) != null)
   }
 
   override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {

From e91f3040b181d14f6560ccba80a0bdce6bd2a443 Mon Sep 17 00:00:00 2001
From: Takeshi YAMAMURO
Date: Thu, 24 Dec 2015 11:18:25 +0900
Subject: [PATCH 05/12] Remove double negatives

---
 .../spark/sql/execution/datasources/jdbc/JDBCRelation.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
index 16a9acae9a77..ca5d03b103d0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
@@ -92,7 +92,7 @@ private[sql] case class JDBCRelation(
 
   // Check if JDBCRDD.compileFilter can accept input filters
   override def unhandledFilters(filters: Array[Filter]): Array[Filter] = {
-    filters.filterNot(JDBCRDD.compileFilter(_) != null)
+    filters.filter(JDBCRDD.compileFilter(_) == null)
   }
 
   override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {

From 6c36232016ceaa9245fd0f74c43d0cb6959389fa Mon Sep 17 00:00:00 2001
From: Takeshi YAMAMURO
Date: Thu, 24 Dec 2015 14:43:22 +0900
Subject: [PATCH 06/12] Add a test for pruning columns in DataSourceStrategy

---
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala | 14 ++++++++------
 1 file changed, 8 insertions(+), 6 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index f465f2076cc0..5be227df1f60 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -18,12 +18,8 @@
 package org.apache.spark.sql.jdbc
 
 import java.math.BigDecimal
-import java.sql.Date
-import java.sql.DriverManager
-import java.sql.Timestamp
-import java.util.Calendar
-import java.util.GregorianCalendar
-import java.util.Properties
+import java.sql.{Date, DriverManager, Timestamp}
+import java.util.{Calendar, GregorianCalendar, Properties}
 
 import org.h2.jdbc.JdbcSQLException
 import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
@@ -223,6 +219,12 @@ class JDBCSuite extends SparkFunSuite
     assert(df2.collect.toSet === Set(Row("mary", 2)))
   }
 
+  test("SELECT COUNT(1) WHERE (predicates)") {
+    // Check if an answer is correct when filters are pushed down into JDBC data sources
+    // and some columns are pruned in DataSourceStrategy.
+    assert(sql("SELECT COUNT(1) FROM foobar WHERE NAME = 'mary'").collect.toSet === Set(Row(1)))
+  }
+
   test("SELECT * WHERE (quoted strings)") {
     assert(sql("select * from foobar").where('NAME === "joe 'foo' \"bar\"").collect().size === 1)
   }

From ca63c72544c88ca20318cc769b52aa9cb97b5044 Mon Sep 17 00:00:00 2001
From: Takeshi YAMAMURO
Date: Tue, 5 Jan 2016 10:24:59 +0900
Subject: [PATCH 07/12] Add private[jdbc] in JDBCRDD#compileFilter

---
 .../apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
index d867e144e517..befba867bc46 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
@@ -189,7 +189,7 @@ private[sql] object JDBCRDD extends Logging {
    * Turns a single Filter into a String representing a SQL expression.
    * Returns None for an unhandled filter.
    */
-  private def compileFilter(f: Filter): Option[String] = {
+  private[jdbc] def compileFilter(f: Filter): Option[String] = {
     Option(f match {
       case EqualTo(attr, value) => s"$attr = ${compileValue(value)}"
       case EqualNullSafe(attr, value) =>

From e3a6755a82f8b2d574e37302beaa4bbb9b48936f Mon Sep 17 00:00:00 2001
From: Takeshi YAMAMURO
Date: Fri, 5 Feb 2016 22:19:59 +0900
Subject: [PATCH 08/12] Fix a bug in JDBCRelation

---
 .../spark/sql/execution/datasources/jdbc/JDBCRelation.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
index ca5d03b103d0..ee6373d03e1f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
@@ -92,7 +92,7 @@ private[sql] case class JDBCRelation(
 
   // Check if JDBCRDD.compileFilter can accept input filters
   override def unhandledFilters(filters: Array[Filter]): Array[Filter] = {
-    filters.filter(JDBCRDD.compileFilter(_) == null)
+    filters.filter(JDBCRDD.compileFilter(_).isEmpty)
   }
 
   override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {

From fd69b88a7f9d94a4db06e98f08146d9dfe912415 Mon Sep 17 00:00:00 2001
From: Takeshi YAMAMURO
Date: Mon, 8 Feb 2016 01:44:24 +0900
Subject: [PATCH 09/12] Add tests for the case where push-down filters are not applied

---
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala | 47 ++++++++++++-------
 1 file changed, 30 insertions(+), 17 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 5be227df1f60..da72564a906f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -183,7 +183,7 @@ class JDBCSuite extends SparkFunSuite
   }
 
   test("SELECT * WHERE (simple predicates)") {
-    def checkPlan(df: DataFrame): DataFrame = {
+    def checkPushdown(df: DataFrame): DataFrame = {
       val parentPlan = df.queryExecution.executedPlan
       // Check if SparkPlan Filter is removed in a physical plan and
       // the plan only has PhysicalRDD to scan JDBCRelation.
@@ -191,25 +191,26 @@ class JDBCSuite extends SparkFunSuite
       assert(parentPlan.asInstanceOf[PhysicalRDD].nodeName.contains("JDBCRelation"))
       df
     }
-    assert(checkPlan(sql("SELECT * FROM foobar WHERE THEID < 1")).collect().size == 0)
-    assert(checkPlan(sql("SELECT * FROM foobar WHERE THEID != 2")).collect().size == 2)
-    assert(checkPlan(sql("SELECT * FROM foobar WHERE THEID = 1")).collect().size == 1)
-    assert(checkPlan(sql("SELECT * FROM foobar WHERE NAME = 'fred'")).collect().size == 1)
-    assert(checkPlan(sql("SELECT * FROM foobar WHERE NAME <=> 'fred'")).collect().size == 1)
-    assert(checkPlan(sql("SELECT * FROM foobar WHERE NAME > 'fred'")).collect().size == 2)
-    assert(checkPlan(sql("SELECT * FROM foobar WHERE NAME != 'fred'")).collect().size == 2)
-    assert(checkPlan(sql("SELECT * FROM foobar WHERE NAME IN ('mary', 'fred')"))
+    assert(checkPushdown(sql("SELECT * FROM foobar WHERE THEID < 1")).collect().size == 0)
+    assert(checkPushdown(sql("SELECT * FROM foobar WHERE THEID != 2")).collect().size == 2)
+    assert(checkPushdown(sql("SELECT * FROM foobar WHERE THEID = 1")).collect().size == 1)
+    assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME = 'fred'")).collect().size == 1)
+    assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME <=> 'fred'")).collect().size == 1)
+    assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME > 'fred'")).collect().size == 2)
+    assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME != 'fred'")).collect().size == 2)
+    assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME IN ('mary', 'fred')"))
       .collect().size == 2)
-    assert(checkPlan(sql("SELECT * FROM foobar WHERE NAME NOT IN ('fred')")).collect().size == 2)
-    assert(checkPlan(sql("SELECT * FROM foobar WHERE THEID = 1 OR NAME = 'mary'"))
+    assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME NOT IN ('fred')"))
       .collect().size == 2)
-    assert(checkPlan(sql("SELECT * FROM foobar WHERE THEID = 1 OR NAME = 'mary' "
+    assert(checkPushdown(sql("SELECT * FROM foobar WHERE THEID = 1 OR NAME = 'mary'"))
+      .collect().size == 2)
+    assert(checkPushdown(sql("SELECT * FROM foobar WHERE THEID = 1 OR NAME = 'mary' "
       + "AND THEID = 2")).collect().size == 2)
-    assert(checkPlan(sql("SELECT * FROM foobar WHERE NAME LIKE 'fr%'")).collect().size == 1)
-    assert(checkPlan(sql("SELECT * FROM foobar WHERE NAME LIKE '%ed'")).collect().size == 1)
-    assert(checkPlan(sql("SELECT * FROM foobar WHERE NAME LIKE '%re%'")).collect().size == 1)
-    assert(checkPlan(sql("SELECT * FROM nulltypes WHERE A IS NULL")).collect().size == 1)
-    assert(checkPlan(sql("SELECT * FROM nulltypes WHERE A IS NOT NULL")).collect().size == 0)
+    assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME LIKE 'fr%'")).collect().size == 1)
+    assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME LIKE '%ed'")).collect().size == 1)
+    assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME LIKE '%re%'")).collect().size == 1)
+    assert(checkPushdown(sql("SELECT * FROM nulltypes WHERE A IS NULL")).collect().size == 1)
+    assert(checkPushdown(sql("SELECT * FROM nulltypes WHERE A IS NOT NULL")).collect().size == 0)
     // This is a test to reflect discussion in SPARK-12218.
     // The older versions of spark have this kind of bugs in parquet data source.
@@ -217,6 +218,18 @@ class JDBCSuite extends SparkFunSuite
     val df2 = sql("SELECT * FROM foobar WHERE NOT (THEID != 2) OR NOT (NAME != 'mary')")
     assert(df1.collect.toSet === Set(Row("mary", 2)))
     assert(df2.collect.toSet === Set(Row("mary", 2)))
+
+    def checkNotPushdown(df: DataFrame): DataFrame = {
+      val parentPlan = df.queryExecution.executedPlan
+      // Check if SparkPlan Filter is not removed in a physical plan because JDBCRDD
+      // cannot compile given predicates.
+      assert(parentPlan.isInstanceOf[org.apache.spark.sql.execution.Filter])
+      df
+    }
+    sql("SELECT * FROM foobar WHERE THEID < 1").explain(true)
+    sql("SELECT * FROM foobar WHERE (THEID + 1) < 2").explain(true)
+    assert(checkNotPushdown(sql("SELECT * FROM foobar WHERE (THEID + 1) < 2")).collect().size == 0)
+    assert(checkNotPushdown(sql("SELECT * FROM foobar WHERE (THEID + 2) != 4")).collect().size == 2)
   }

From 425118781e062c834f48694e8941f0e7b7ff64c5 Mon Sep 17 00:00:00 2001
From: Takeshi YAMAMURO
Date: Tue, 9 Feb 2016 18:34:03 +0900
Subject: [PATCH 10/12] Remove unnecessary lines

---
 .../src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala | 2 --
 1 file changed, 2 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index da72564a906f..a005202db201 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -226,8 +226,6 @@ class JDBCSuite extends SparkFunSuite
       assert(parentPlan.isInstanceOf[org.apache.spark.sql.execution.Filter])
       df
     }
-    sql("SELECT * FROM foobar WHERE THEID < 1").explain(true)
-    sql("SELECT * FROM foobar WHERE (THEID + 1) < 2").explain(true)
     assert(checkNotPushdown(sql("SELECT * FROM foobar WHERE (THEID + 1) < 2")).collect().size == 0)
     assert(checkNotPushdown(sql("SELECT * FROM foobar WHERE (THEID + 2) != 4")).collect().size == 2)
   }

From 425060292469526576eab38c71c565bd31c0c1c3 Mon Sep 17 00:00:00 2001
From: Takeshi YAMAMURO
Date: Tue, 9 Feb 2016 18:34:47 +0900
Subject: [PATCH 11/12] Add comments for appended tests

---
 .../test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index a005202db201..f139c363d6de 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -231,8 +231,12 @@ class JDBCSuite extends SparkFunSuite
   }
 
   test("SELECT COUNT(1) WHERE (predicates)") {
-    // Check if an answer is correct when filters are pushed down into JDBC data sources
-    // and some columns are pruned in DataSourceStrategy.
+    // Check if the answer is correct when Filter is removed from operations such as count(),
+    // which do not require any columns. In some data sources, e.g., Parquet, `requiredColumns`
+    // in org.apache.spark.sql.sources.interfaces is not given in logical plans, but filters
+    // are still applied to the pruned columns, and Filter then produces wrong results. On the
+    // other hand, JDBCRDD correctly handles this case by assigning `requiredColumns` properly.
+    // See PR 10427 for more discussion.
assert(sql("SELECT COUNT(1) FROM foobar WHERE NAME = 'mary'").collect.toSet === Set(Row(1))) } From 7a7b9fa352bc8e28491d3ba22d2a68b943575044 Mon Sep 17 00:00:00 2001 From: Takeshi YAMAMURO Date: Tue, 9 Feb 2016 23:40:47 +0900 Subject: [PATCH 12/12] Fix tests according to WholeStageCodegen implemented --- .../src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala index f139c363d6de..7a0f7abaa1ba 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala @@ -223,7 +223,9 @@ class JDBCSuite extends SparkFunSuite val parentPlan = df.queryExecution.executedPlan // Check if SparkPlan Filter is not removed in a physical plan because JDBCRDD // cannot compile given predicates. - assert(parentPlan.isInstanceOf[org.apache.spark.sql.execution.Filter]) + assert(parentPlan.isInstanceOf[org.apache.spark.sql.execution.WholeStageCodegen]) + val node = parentPlan.asInstanceOf[org.apache.spark.sql.execution.WholeStageCodegen] + assert(node.plan.isInstanceOf[org.apache.spark.sql.execution.Filter]) df } assert(checkNotPushdown(sql("SELECT * FROM foobar WHERE (THEID + 1) < 2")).collect().size == 0)