
Commit 6f710f9

maropu authored and yhuai committed
[SPARK-12476][SQL] Implement JdbcRelation#unhandledFilters for removing unnecessary Spark Filter
Input: SELECT * FROM jdbcTable WHERE col0 = 'xxx'

Current plan:
```
== Optimized Logical Plan ==
Project [col0#0,col1#1]
+- Filter (col0#0 = xxx)
   +- Relation[col0#0,col1#1] JDBCRelation(jdbc:postgresql:postgres,testRel,[Lorg.apache.spark.Partition;2ac7c683,{user=maropu, password=, driver=org.postgresql.Driver})

== Physical Plan ==
+- Filter (col0#0 = xxx)
   +- Scan JDBCRelation(jdbc:postgresql:postgres,testRel,[Lorg.apache.spark.Partition;2ac7c683,{user=maropu, password=, driver=org.postgresql.Driver})[col0#0,col1#1] PushedFilters: [EqualTo(col0,xxx)]
```

This patch enables the plan below:
```
== Optimized Logical Plan ==
Project [col0#0,col1#1]
+- Filter (col0#0 = xxx)
   +- Relation[col0#0,col1#1] JDBCRelation(jdbc:postgresql:postgres,testRel,[Lorg.apache.spark.Partition;2ac7c683,{user=maropu, password=, driver=org.postgresql.Driver})

== Physical Plan ==
Scan JDBCRelation(jdbc:postgresql:postgres,testRel,[Lorg.apache.spark.Partition;2ac7c683,{user=maropu, password=, driver=org.postgresql.Driver})[col0#0,col1#1] PushedFilters: [EqualTo(col0,xxx)]
```

Author: Takeshi YAMAMURO <[email protected]>

Closes #10427 from maropu/RemoveFilterInJdbcScan.
1 parent 9267bc6
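For readers unfamiliar with the hook this patch uses: the `BaseRelation#unhandledFilters` API (added in SPARK-10978) lets a data source report which pushed filters it cannot fully evaluate, and Spark keeps a plan-level `Filter` node only for those. A minimal sketch of that contract follows; the `canCompile` predicate is a hypothetical stand-in for `JDBCRDD.compileFilter`, not Spark code:

```scala
import org.apache.spark.sql.sources.{EqualTo, Filter, StringContains}

object UnhandledFiltersSketch {
  // Hypothetical stand-in for JDBCRDD.compileFilter: pretend only equality
  // predicates can be turned into a WHERE-clause fragment.
  private def canCompile(f: Filter): Boolean = f match {
    case _: EqualTo => true
    case _ => false
  }

  // Same shape as BaseRelation#unhandledFilters: return the filters the
  // source cannot guarantee, so Spark re-applies only those.
  def unhandledFilters(filters: Array[Filter]): Array[Filter] =
    filters.filterNot(canCompile)

  def main(args: Array[String]): Unit = {
    val leftover = unhandledFilters(
      Array(EqualTo("col0", "xxx"), StringContains("col1", "y")))
    // EqualTo was handled, so only StringContains survives; an empty result
    // would let the planner drop the Spark-side Filter entirely.
    println(leftover.mkString(", "))  // StringContains(col1,y)
  }
}
```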

3 files changed: +56 -21 lines

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala

Lines changed: 1 addition & 1 deletion
```diff
@@ -189,7 +189,7 @@ private[sql] object JDBCRDD extends Logging {
    * Turns a single Filter into a String representing a SQL expression.
    * Returns None for an unhandled filter.
    */
-  private def compileFilter(f: Filter): Option[String] = {
+  private[jdbc] def compileFilter(f: Filter): Option[String] = {
     Option(f match {
       case EqualTo(attr, value) => s"$attr = ${compileValue(value)}"
       case EqualNullSafe(attr, value) =>
```
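The only change here is visibility: widening `compileFilter` from `private` to `private[jdbc]` lets `JDBCRelation`, in the same package, probe which predicates compile to SQL. A sketch of that probe (compilable only from within the `jdbc` package because of the `private[jdbc]` modifier; the printed fragments assume the usual string quoting done by `compileValue`):

```scala
package org.apache.spark.sql.execution.datasources.jdbc

import org.apache.spark.sql.sources.{EqualTo, IsNull}

object CompileFilterProbe {
  def main(args: Array[String]): Unit = {
    // A recognized predicate compiles to a WHERE-clause fragment...
    println(JDBCRDD.compileFilter(EqualTo("col0", "xxx"))) // Some(col0 = 'xxx')
    println(JDBCRDD.compileFilter(IsNull("col1")))         // Some(col1 IS NULL)
    // ...and unhandledFilters keeps exactly the filters for which this is None.
  }
}
```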

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala

Lines changed: 5 additions & 0 deletions
```diff
@@ -90,6 +90,11 @@ private[sql] case class JDBCRelation(

   override val schema: StructType = JDBCRDD.resolveTable(url, table, properties)

+  // Check if JDBCRDD.compileFilter can accept input filters
+  override def unhandledFilters(filters: Array[Filter]): Array[Filter] = {
+    filters.filter(JDBCRDD.compileFilter(_).isEmpty)
+  }
+
   override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
     // Rely on a type erasure hack to pass RDD[InternalRow] back as RDD[Row]
     JDBCRDD.scanTable(
```
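This override is the heart of the patch: any filter that `compileFilter` can turn into SQL is declared handled, so the planner no longer re-evaluates it in a Spark-side `Filter`. One way to see the effect interactively, as a sketch against the 1.6-era `DataFrameReader` API in spark-shell (where `sqlContext` is predefined); the URL and table name are placeholders for your own JDBC endpoint:

```scala
import java.util.Properties

// Placeholder connection details; substitute a real JDBC URL and table.
val url = "jdbc:postgresql:postgres"
val df = sqlContext.read.jdbc(url, "testRel", new Properties)

// Before this patch the physical plan showed a Filter node above the scan;
// after it, explain() should show only the scan with PushedFilters.
df.filter("col0 = 'xxx'").explain()
// Expected shape:
//   Scan JDBCRelation(...)[col0#0,col1#1] PushedFilters: [EqualTo(col0,xxx)]
```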

sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala

Lines changed: 50 additions & 20 deletions
```diff
@@ -22,12 +22,12 @@ import java.sql.{Date, DriverManager, Timestamp}
 import java.util.{Calendar, GregorianCalendar, Properties}

 import org.h2.jdbc.JdbcSQLException
-import org.scalatest.BeforeAndAfter
-import org.scalatest.PrivateMethodTester
+import org.scalatest.{BeforeAndAfter, PrivateMethodTester}

-import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.Row
 import org.apache.spark.sql.execution.ExplainCommand
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.{DataFrame, Row}
+import org.apache.spark.sql.execution.PhysicalRDD
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD
 import org.apache.spark.sql.sources._
@@ -183,33 +183,63 @@ class JDBCSuite extends SparkFunSuite
   }

   test("SELECT * WHERE (simple predicates)") {
-    assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE THEID < 1")).collect().size == 0)
-    assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE THEID != 2")).collect().size == 2)
-    assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE THEID = 1")).collect().size == 1)
-    assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME = 'fred'")).collect().size == 1)
-    assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME <=> 'fred'")).collect().size == 1)
-    assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME > 'fred'")).collect().size == 2)
-    assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME != 'fred'")).collect().size == 2)
-    assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME IN ('mary', 'fred')"))
+    def checkPushdown(df: DataFrame): DataFrame = {
+      val parentPlan = df.queryExecution.executedPlan
+      // Check if SparkPlan Filter is removed in a physical plan and
+      // the plan only has PhysicalRDD to scan JDBCRelation.
+      assert(parentPlan.isInstanceOf[PhysicalRDD])
+      assert(parentPlan.asInstanceOf[PhysicalRDD].nodeName.contains("JDBCRelation"))
+      df
+    }
+    assert(checkPushdown(sql("SELECT * FROM foobar WHERE THEID < 1")).collect().size == 0)
+    assert(checkPushdown(sql("SELECT * FROM foobar WHERE THEID != 2")).collect().size == 2)
+    assert(checkPushdown(sql("SELECT * FROM foobar WHERE THEID = 1")).collect().size == 1)
+    assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME = 'fred'")).collect().size == 1)
+    assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME <=> 'fred'")).collect().size == 1)
+    assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME > 'fred'")).collect().size == 2)
+    assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME != 'fred'")).collect().size == 2)
+    assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME IN ('mary', 'fred')"))
       .collect().size == 2)
-    assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME NOT IN ('fred')"))
+    assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME NOT IN ('fred')"))
       .collect().size == 2)
-    assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE THEID = 1 OR NAME = 'mary'"))
+    assert(checkPushdown(sql("SELECT * FROM foobar WHERE THEID = 1 OR NAME = 'mary'"))
       .collect().size == 2)
-    assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE THEID = 1 OR NAME = 'mary' "
+    assert(checkPushdown(sql("SELECT * FROM foobar WHERE THEID = 1 OR NAME = 'mary' "
       + "AND THEID = 2")).collect().size == 2)
-    assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME LIKE 'fr%'")).collect().size == 1)
-    assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME LIKE '%ed'")).collect().size == 1)
-    assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME LIKE '%re%'")).collect().size == 1)
-    assert(stripSparkFilter(sql("SELECT * FROM nulltypes WHERE A IS NULL")).collect().size == 1)
-    assert(stripSparkFilter(sql("SELECT * FROM nulltypes WHERE A IS NOT NULL")).collect().size == 0)
+    assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME LIKE 'fr%'")).collect().size == 1)
+    assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME LIKE '%ed'")).collect().size == 1)
+    assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME LIKE '%re%'")).collect().size == 1)
+    assert(checkPushdown(sql("SELECT * FROM nulltypes WHERE A IS NULL")).collect().size == 1)
+    assert(checkPushdown(sql("SELECT * FROM nulltypes WHERE A IS NOT NULL")).collect().size == 0)

     // This is a test to reflect discussion in SPARK-12218.
     // The older versions of spark have this kind of bugs in parquet data source.
     val df1 = sql("SELECT * FROM foobar WHERE NOT (THEID != 2 AND NAME != 'mary')")
     val df2 = sql("SELECT * FROM foobar WHERE NOT (THEID != 2) OR NOT (NAME != 'mary')")
     assert(df1.collect.toSet === Set(Row("mary", 2)))
     assert(df2.collect.toSet === Set(Row("mary", 2)))
+
+    def checkNotPushdown(df: DataFrame): DataFrame = {
+      val parentPlan = df.queryExecution.executedPlan
+      // Check if SparkPlan Filter is not removed in a physical plan because JDBCRDD
+      // cannot compile given predicates.
+      assert(parentPlan.isInstanceOf[org.apache.spark.sql.execution.WholeStageCodegen])
+      val node = parentPlan.asInstanceOf[org.apache.spark.sql.execution.WholeStageCodegen]
+      assert(node.plan.isInstanceOf[org.apache.spark.sql.execution.Filter])
+      df
+    }
+    assert(checkNotPushdown(sql("SELECT * FROM foobar WHERE (THEID + 1) < 2")).collect().size == 0)
+    assert(checkNotPushdown(sql("SELECT * FROM foobar WHERE (THEID + 2) != 4")).collect().size == 2)
+  }
+
+  test("SELECT COUNT(1) WHERE (predicates)") {
+    // Check if an answer is correct when Filter is removed from operations such as count() which
+    // does not require any columns. In some data sources, e.g., Parquet, `requiredColumns` in
+    // org.apache.spark.sql.sources.interfaces is not given in logical plans, but some filters
+    // are applied for columns with Filter producing wrong results. On the other hand, JDBCRDD
+    // correctly handles this case by assigning `requiredColumns` properly. See PR 10427 for more
+    // discussions.
+    assert(sql("SELECT COUNT(1) FROM foobar WHERE NAME = 'mary'").collect.toSet === Set(Row(1)))
   }

   test("SELECT * WHERE (quoted strings)") {
```
