Commit ff6ea77

[SPARK-46092][SQL] Don't push down Parquet row group filters that overflow
### What changes were proposed in this pull request?

This change adds a check for overflow when creating Parquet row group filters on an INT32 (byte/short/int) Parquet type, to avoid incorrectly skipping row groups if the predicate value doesn't fit in an INT. This can happen if the read schema is specified as LONG, e.g. via `.schema("col LONG")`. While the Parquet readers don't support reading INT32 into a LONG, the overflow can lead to row groups being incorrectly skipped, bypassing the reader altogether and producing incorrect results instead of failing.

### Why are the changes needed?

Reading a Parquet file containing INT32 values with a read schema specified as LONG can produce incorrect results today:
```
Seq(0).toDF("a").write.parquet(path)
spark.read.schema("a LONG").parquet(path).where(s"a < ${Long.MaxValue}").collect()
```
will return an empty result. The correct result is either:
- failing the query if the Parquet reader doesn't support upcasting integers to longs (all Parquet readers in Spark today), or
- returning the result `[0]` if the Parquet reader supports that upcast (no readers in Spark as of now, but I'm looking into adding this capability).

### Does this PR introduce any user-facing change?

The query above produces an (incorrect) empty result before this change. After this change, the read will fail, raising an error about the unsupported conversion from INT to LONG in the Parquet reader.

### How was this patch tested?

- Added tests to `ParquetFilterSuite` to ensure that no row group filter is created when the predicate value overflows or when the value type isn't compatible with the Parquet type.
- Added a test to `ParquetQuerySuite` covering the correctness issue described above.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#44006 from johanl-db/SPARK-46092-row-group-skipping-overflow.

Authored-by: Johan Lasperas <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent e428fe9 commit ff6ea77

3 files changed: 120 additions & 2 deletions
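
To make the failure mode concrete, here is a minimal Scala sketch, independent of Spark and with illustrative names only, of the truncation the commit message describes: narrowing a LONG predicate value to INT32 wraps around, and row group statistics are then compared against the wrong bound.

```scala
// Minimal sketch (not Spark code) of the overflow behind SPARK-46092.
object TruncationDemo {
  def main(args: Array[String]): Unit = {
    // Predicate from the repro: a < Long.MaxValue.
    val predicateValue = Long.MaxValue

    // Naively narrowing the value to build an INT32 row group filter wraps:
    val truncated = predicateValue.toInt
    println(truncated) // -1

    // A row group whose stats are [min = 0, max = 0] satisfies the real
    // predicate (0 < Long.MaxValue), but its min doesn't satisfy the
    // truncated filter (0 < -1 is false), so the whole row group is
    // skipped and the query silently returns an empty result.
    val rowGroupMin = 0
    println(rowGroupMin < truncated) // false => row group wrongly skipped
  }
}
```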

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala

Lines changed: 8 additions & 2 deletions
```diff
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution.datasources.parquet
 
-import java.lang.{Boolean => JBoolean, Double => JDouble, Float => JFloat, Long => JLong}
+import java.lang.{Boolean => JBoolean, Byte => JByte, Double => JDouble, Float => JFloat, Long => JLong, Short => JShort}
 import java.math.{BigDecimal => JBigDecimal}
 import java.sql.{Date, Timestamp}
 import java.time.{Instant, LocalDate}
@@ -594,7 +594,13 @@ class ParquetFilters(
   private def valueCanMakeFilterOn(name: String, value: Any): Boolean = {
     value == null || (nameToParquetField(name).fieldType match {
       case ParquetBooleanType => value.isInstanceOf[JBoolean]
-      case ParquetByteType | ParquetShortType | ParquetIntegerType => value.isInstanceOf[Number]
+      case ParquetByteType | ParquetShortType | ParquetIntegerType => value match {
+        // Byte/Short/Int are all stored as INT32 in Parquet so filters are built using type Int.
+        // We don't create a filter if the value would overflow.
+        case _: JByte | _: JShort | _: Integer => true
+        case v: JLong => v.longValue() >= Int.MinValue && v.longValue() <= Int.MaxValue
+        case _ => false
+      }
       case ParquetLongType => value.isInstanceOf[JLong]
       case ParquetFloatType => value.isInstanceOf[JFloat]
       case ParquetDoubleType => value.isInstanceOf[JDouble]
```
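
Outside of Spark, the new branch boils down to a small total check. A standalone sketch (the helper name `canBuildInt32Filter` is illustrative, not Spark's API) behaves like the patched `valueCanMakeFilterOn`:

```scala
// Standalone sketch of the range check added above. `canBuildInt32Filter`
// is a hypothetical name; Spark performs this check inside
// ParquetFilters.valueCanMakeFilterOn.
object Int32FilterCheck {
  def canBuildInt32Filter(value: Any): Boolean = value match {
    // Byte/Short/Int are all stored as INT32 in Parquet, so they always fit.
    case _: java.lang.Byte | _: java.lang.Short | _: Integer => true
    // A Long qualifies only when it fits in the Int range.
    case v: java.lang.Long =>
      v.longValue() >= Int.MinValue && v.longValue() <= Int.MaxValue
    case _ => false
  }

  def main(args: Array[String]): Unit = {
    assert(canBuildInt32Filter(Integer.valueOf(42)))
    assert(canBuildInt32Filter(java.lang.Long.valueOf(Int.MaxValue.toLong)))
    assert(!canBuildInt32Filter(java.lang.Long.valueOf(Long.MaxValue))) // overflow
    assert(!canBuildInt32Filter("1")) // incompatible type
  }
}
```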

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala

Lines changed: 71 additions & 0 deletions
```diff
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution.datasources.parquet
 
 import java.io.File
+import java.lang.{Double => JDouble, Float => JFloat, Long => JLong}
 import java.math.{BigDecimal => JBigDecimal}
 import java.nio.charset.StandardCharsets
 import java.sql.{Date, Timestamp}
@@ -896,6 +897,76 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
     }
   }
 
+  test("don't push down filters that would result in overflows") {
+    val schema = StructType(Seq(
+      StructField("cbyte", ByteType),
+      StructField("cshort", ShortType),
+      StructField("cint", IntegerType)
+    ))
+
+    val parquetSchema = new SparkToParquetSchemaConverter(conf).convert(schema)
+    val parquetFilters = createParquetFilters(parquetSchema)
+
+    for {
+      column <- Seq("cbyte", "cshort", "cint")
+      value <- Seq(JLong.MAX_VALUE, JLong.MIN_VALUE).map(JLong.valueOf)
+    } {
+      val filters = Seq(
+        sources.LessThan(column, value),
+        sources.LessThanOrEqual(column, value),
+        sources.GreaterThan(column, value),
+        sources.GreaterThanOrEqual(column, value),
+        sources.EqualTo(column, value),
+        sources.EqualNullSafe(column, value),
+        sources.Not(sources.EqualTo(column, value)),
+        sources.In(column, Array(value))
+      )
+      for (filter <- filters) {
+        assert(parquetFilters.createFilter(filter).isEmpty,
+          s"Row group filter $filter shouldn't be pushed down.")
+      }
+    }
+  }
+
+  test("don't push down filters when value type doesn't match column type") {
+    val schema = StructType(Seq(
+      StructField("cbyte", ByteType),
+      StructField("cshort", ShortType),
+      StructField("cint", IntegerType),
+      StructField("clong", LongType),
+      StructField("cfloat", FloatType),
+      StructField("cdouble", DoubleType),
+      StructField("cboolean", BooleanType),
+      StructField("cstring", StringType),
+      StructField("cdate", DateType),
+      StructField("ctimestamp", TimestampType),
+      StructField("cbinary", BinaryType),
+      StructField("cdecimal", DecimalType(10, 0))
+    ))
+
+    val parquetSchema = new SparkToParquetSchemaConverter(conf).convert(schema)
+    val parquetFilters = createParquetFilters(parquetSchema)
+
+    val filters = Seq(
+      sources.LessThan("cbyte", String.valueOf("1")),
+      sources.LessThan("cshort", JBigDecimal.valueOf(1)),
+      sources.LessThan("cint", JFloat.valueOf(JFloat.NaN)),
+      sources.LessThan("clong", String.valueOf("1")),
+      sources.LessThan("cfloat", JDouble.valueOf(1.0D)),
+      sources.LessThan("cdouble", JFloat.valueOf(1.0F)),
+      sources.LessThan("cboolean", String.valueOf("true")),
+      sources.LessThan("cstring", Integer.valueOf(1)),
+      sources.LessThan("cdate", Timestamp.valueOf("2018-01-01 00:00:00")),
+      sources.LessThan("ctimestamp", Date.valueOf("2018-01-01")),
+      sources.LessThan("cbinary", Integer.valueOf(1)),
+      sources.LessThan("cdecimal", Integer.valueOf(1234))
+    )
+    for (filter <- filters) {
+      assert(parquetFilters.createFilter(filter).isEmpty,
+        s"Row group filter $filter shouldn't be pushed down.")
+    }
+  }
+
   test("SPARK-6554: don't push down predicates which reference partition columns") {
     import testImplicits._
 
```
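
These tests pin down the contract that an overflowing or mistyped value yields no pushed-down filter at all. A hedged sketch of that contract against parquet-mr's `FilterApi` (the builder name `buildLessThan` and its `Option`-based signature are illustrative, not Spark's internals):

```scala
import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate}

// Illustrative builder: return None instead of a silently truncated INT32
// predicate when the value overflows or has an incompatible type.
object SafeInt32FilterSketch {
  def buildLessThan(column: String, value: Any): Option[FilterPredicate] =
    value match {
      case v: Integer =>
        Some(FilterApi.lt(FilterApi.intColumn(column), v))
      case v: java.lang.Long
          if v.longValue() >= Int.MinValue && v.longValue() <= Int.MaxValue =>
        Some(FilterApi.lt(FilterApi.intColumn(column), Integer.valueOf(v.intValue())))
      case _ =>
        None // no row group filter: scan all row groups, filter rows later
    }

  def main(args: Array[String]): Unit = {
    assert(buildLessThan("cint", Integer.valueOf(1)).isDefined)
    assert(buildLessThan("cint", java.lang.Long.valueOf(Long.MaxValue)).isEmpty)
    assert(buildLessThan("cint", "1").isEmpty)
  }
}
```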

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala

Lines changed: 41 additions & 0 deletions
```diff
@@ -901,6 +901,47 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
       }
     }
   }
+
+  test("SPARK-37191: Merge schema for DecimalType with different precision") {
+    withTempPath { path =>
+      val data1 = Seq(Row(new BigDecimal("123456789.11")))
+      val schema1 = StructType(StructField("col", DecimalType(12, 2)) :: Nil)
+
+      val data2 = Seq(Row(new BigDecimal("1234567890000.11")))
+      val schema2 = StructType(StructField("col", DecimalType(17, 2)) :: Nil)
+
+      spark.createDataFrame(sparkContext.parallelize(data1, 1), schema1)
+        .write.parquet(path.toString)
+      spark.createDataFrame(sparkContext.parallelize(data2, 1), schema2)
+        .write.mode("append").parquet(path.toString)
+
+      withAllParquetReaders {
+        val res = spark.read.option("mergeSchema", "true").parquet(path.toString)
+        assert(res.schema("col").dataType == DecimalType(17, 2))
+        checkAnswer(res, data1 ++ data2)
+      }
+    }
+  }
+
+  test("row group skipping doesn't overflow when reading into larger type") {
+    withTempPath { path =>
+      Seq(0).toDF("a").write.parquet(path.toString)
+      // The vectorized and non-vectorized readers will produce different exceptions, we don't need
+      // to test both as this covers row group skipping.
+      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true") {
+        // Reading integer 'a' as a long isn't supported. Check that an exception is raised instead
+        // of incorrectly skipping the single row group and producing incorrect results.
+        val exception = intercept[SparkException] {
+          spark.read
+            .schema("a LONG")
+            .parquet(path.toString)
+            .where(s"a < ${Long.MaxValue}")
+            .collect()
+        }
+        assert(exception.getCause.getCause.isInstanceOf[SchemaColumnConvertNotSupportedException])
+      }
+    }
+  }
 }
 
 class ParquetV1QuerySuite extends ParquetQuerySuite {
```
