Skip to content

Commit 58de88c

Browse files
committed
[SPARK-22548][SQL] Incorrect nested AND expression pushed down to JDBC data source
1 parent bf0c0ae commit 58de88c

2 files changed

Lines changed: 29 additions & 1 deletion

File tree

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -497,7 +497,10 @@ object DataSourceStrategy {
497497
Some(sources.IsNotNull(a.name))
498498

499499
case expressions.And(left, right) =>
500-
(translateFilter(left) ++ translateFilter(right)).reduceOption(sources.And)
500+
for {
501+
leftFilter <- translateFilter(left)
502+
rightFilter <- translateFilter(right)
503+
} yield sources.And(leftFilter, rightFilter)
501504

502505
case expressions.Or(left, right) =>
503506
for {

sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -296,8 +296,33 @@ class JDBCSuite extends SparkFunSuite
296296
// Older versions of Spark have this kind of bug in the Parquet data source.
297297
val df1 = sql("SELECT * FROM foobar WHERE NOT (THEID != 2 AND NAME != 'mary')")
298298
val df2 = sql("SELECT * FROM foobar WHERE NOT (THEID != 2) OR NOT (NAME != 'mary')")
299+
val df3 = sql("SELECT * FROM foobar WHERE (THEID > 0 AND NAME = 'mary') OR (NAME = 'fred')")
300+
val df4 = sql("SELECT * FROM foobar " +
301+
"WHERE (THEID > 0 AND TRIM(NAME) = 'mary') OR (NAME = 'fred')")
302+
val df5 = sql("SELECT * FROM foobar " +
303+
"WHERE THEID > 0 AND TRIM(NAME) = 'mary' AND LENGTH(NAME) > 3")
304+
val df6 = sql("SELECT * FROM foobar " +
305+
"WHERE THEID < 0 OR NAME = 'mary' OR NAME = 'fred'")
306+
val df7 = sql("SELECT * FROM foobar " +
307+
"WHERE THEID < 0 OR TRIM(NAME) = 'mary' OR NAME = 'fred'")
308+
val df8 = sql("SELECT * FROM foobar " +
309+
"WHERE NOT((THEID < 0 OR NAME != 'mary') AND (THEID != 1 OR NAME != 'fred'))")
310+
val df9 = sql("SELECT * FROM foobar " +
311+
"WHERE NOT((THEID < 0 OR NAME != 'mary') AND (THEID != 1 OR TRIM(NAME) != 'fred'))")
312+
val df10 = sql("SELECT * FROM foobar " +
313+
"WHERE (NOT(THEID < 0 OR TRIM(NAME) != 'mary')) OR (THEID = 1 AND NAME = 'fred')")
314+
299315
assert(df1.collect.toSet === Set(Row("mary", 2)))
300316
assert(df2.collect.toSet === Set(Row("mary", 2)))
317+
assert(df3.collect.toSet === Set(Row("fred", 1), Row("mary", 2)))
318+
assert(df4.collect.toSet === Set(Row("fred", 1), Row("mary", 2)))
319+
assert(df5.collect.toSet === Set(Row("mary", 2)))
320+
assert(df6.collect.toSet === Set(Row("fred", 1), Row("mary", 2)))
321+
assert(df7.collect.toSet === Set(Row("fred", 1), Row("mary", 2)))
322+
assert(df8.collect.toSet === Set(Row("fred", 1), Row("mary", 2)))
323+
assert(df9.collect.toSet === Set(Row("fred", 1), Row("mary", 2)))
324+
assert(df10.collect.toSet === Set(Row("fred", 1), Row("mary", 2)))
325+
301326

302327
def checkNotPushdown(df: DataFrame): DataFrame = {
303328
val parentPlan = df.queryExecution.executedPlan

0 commit comments

Comments
 (0)