Skip to content

Commit cae8cc3

Browse files
liancheng authored and Robert Kruszewski committed
[SPARK-17213][SQL] Disable Parquet filter push-down for string and binary columns due to PARQUET-686
This PR targets both master and branch-2.1. ## What changes were proposed in this pull request? Due to PARQUET-686, Parquet doesn't do string comparison correctly while doing filter push-down for string columns. This PR disables filter push-down for both string and binary columns to work around this issue. Binary columns are also affected because some Parquet data models (like Hive) may store string columns as a plain Parquet `binary` instead of a `binary (UTF8)`. ## How was this patch tested? New test case added in `ParquetFilterSuite`. Author: Cheng Lian <[email protected]> Closes apache#16106 from liancheng/spark-17213-bad-string-ppd.
1 parent 1746299 commit cae8cc3

2 files changed

Lines changed: 26 additions & 3 deletions

File tree

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@ private[parquet] object ParquetFilters {
4141
(n: String, v: Any) => FilterApi.eq(floatColumn(n), v.asInstanceOf[java.lang.Float])
4242
case DoubleType =>
4343
(n: String, v: Any) => FilterApi.eq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
44-
// Binary.fromString and Binary.fromByteArray don't accept null values
4544
case StringType =>
4645
(n: String, v: Any) => FilterApi.eq(
4746
binaryColumn(n),
@@ -69,6 +68,7 @@ private[parquet] object ParquetFilters {
6968
(n: String, v: Any) => FilterApi.notEq(floatColumn(n), v.asInstanceOf[java.lang.Float])
7069
case DoubleType =>
7170
(n: String, v: Any) => FilterApi.notEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
71+
7272
case StringType =>
7373
(n: String, v: Any) => FilterApi.notEq(
7474
binaryColumn(n),
@@ -94,6 +94,7 @@ private[parquet] object ParquetFilters {
9494
(n: String, v: Any) => FilterApi.lt(floatColumn(n), v.asInstanceOf[java.lang.Float])
9595
case DoubleType =>
9696
(n: String, v: Any) => FilterApi.lt(doubleColumn(n), v.asInstanceOf[java.lang.Double])
97+
9798
case StringType =>
9899
(n: String, v: Any) =>
99100
FilterApi.lt(binaryColumn(n),

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -261,7 +261,8 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
261261
}
262262
}
263263

264-
test("filter pushdown - string") {
264+
// See SPARK-17213: https://issues.apache.org/jira/browse/SPARK-17213
265+
ignore("filter pushdown - string") {
265266
withParquetDataFrame((1 to 4).map(i => Tuple1(i.toString))) { implicit df =>
266267
checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
267268
checkFilterPredicate(
@@ -289,7 +290,8 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
289290
}
290291
}
291292

292-
test("filter pushdown - binary") {
293+
// See SPARK-17213: https://issues.apache.org/jira/browse/SPARK-17213
294+
ignore("filter pushdown - binary") {
293295
implicit class IntToBinary(int: Int) {
294296
def b: Array[Byte] = int.toString.getBytes(StandardCharsets.UTF_8)
295297
}
@@ -692,6 +694,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
692694

693695
test("Do not create Timestamp filters when interpreting from INT96") {
694696
val baseMillis = System.currentTimeMillis()
697+
695698
def base(): Timestamp = new Timestamp(baseMillis)
696699

697700
val timestamps = (0 to 3).map { i =>
@@ -724,4 +727,23 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
724727
checkNoFilterPredicate('_1 < timestamps(1) || '_1 > timestamps(2))
725728
}
726729
}
730+
731+
test("SPARK-17213: Broken Parquet filter push-down for string columns") {
732+
withTempPath { dir =>
733+
import testImplicits._
734+
735+
val path = dir.getCanonicalPath
736+
// scalastyle:off nonascii
737+
Seq("a", "é").toDF("name").write.parquet(path)
738+
// scalastyle:on nonascii
739+
740+
assert(spark.read.parquet(path).where("name > 'a'").count() == 1)
741+
assert(spark.read.parquet(path).where("name >= 'a'").count() == 2)
742+
743+
// scalastyle:off nonascii
744+
assert(spark.read.parquet(path).where("name < 'é'").count() == 1)
745+
assert(spark.read.parquet(path).where("name <= 'é'").count() == 2)
746+
// scalastyle:on nonascii
747+
}
748+
}
727749
}

0 commit comments

Comments (0)