[SPARK-17091][SQL] ParquetFilters rewrite IN to OR of Eq #14671
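This PR teaches `ParquetFilters` to push down `In` predicates by rewriting them into a chain of `Or`-ed equality filters, which the existing pushdown paths already know how to translate into Parquet predicates. Below is a minimal standalone sketch of the idea using Spark's public `org.apache.spark.sql.sources` filter classes; the helper name `rewriteInToOrOfEq` is hypothetical and for illustration only (the real logic presumably sits alongside `ParquetFilters.createFilter`):

```scala
import org.apache.spark.sql.sources.{EqualTo, Filter, In, Or}

// Hedged sketch of the rewrite this PR performs: an In(attr, values) source
// filter becomes EqualTo(attr, v1) OR EqualTo(attr, v2) OR ..., so the
// existing Eq/Or translation to Parquet's FilterPredicate can take over.
def rewriteInToOrOfEq(filter: Filter): Option[Filter] = filter match {
  case In(attribute, values) if values.nonEmpty =>
    // Fold the equality checks into a left-nested chain of Or nodes.
    Some(values.map(v => EqualTo(attribute, v): Filter).reduce(Or(_, _)))
  case _ => None
}

// Example: the test predicate "b in (0, 2)" corresponds to
// Some(Or(EqualTo("b", 0), EqualTo("b", 2))).
```

The diff below touches `ParquetFilterSuite` only: mostly indentation cleanups, plus a new test exercising the pushdown.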
```diff
@@ -40,20 +40,20 @@ import org.apache.spark.util.{AccumulatorContext, LongAccumulator}
  * NOTE:
  *
  * 1. `!(a cmp b)` is always transformed to its negated form `a cmp' b` by the
- *    `BooleanSimplification` optimization rule whenever possible. As a result, predicate `!(a < 1)`
- *    results in a `GtEq` filter predicate rather than a `Not`.
+ *    `BooleanSimplification` optimization rule whenever possible. As a result, predicate
+ *    `!(a < 1)` results in a `GtEq` filter predicate rather than a `Not`.
  *
  * 2. `Tuple1(Option(x))` is used together with `AnyVal` types like `Int` to ensure the inferred
  *    data type is nullable.
  */
 class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext {
 
   private def checkFilterPredicate(
-      df: DataFrame,
-      predicate: Predicate,
-      filterClass: Class[_ <: FilterPredicate],
-      checker: (DataFrame, Seq[Row]) => Unit,
-      expected: Seq[Row]): Unit = {
+    df: DataFrame,
+    predicate: Predicate,
+    filterClass: Class[_ <: FilterPredicate],
+    checker: (DataFrame, Seq[Row]) => Unit,
+    expected: Seq[Row]): Unit = {
     val output = predicate.collect { case a: Attribute => a }.distinct
 
     withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
```
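To make the rewrapped doc comment above concrete, here is a small illustration of the two shapes it contrasts, expressed with Spark's public `org.apache.spark.sql.sources` filter classes for readability:

```scala
import org.apache.spark.sql.sources.{GreaterThanOrEqual, LessThan, Not}

// BooleanSimplification rewrites NOT (a < 1) into a >= 1 before pushdown,
// so ParquetFilters sees the second form and emits a Parquet GtEq predicate
// rather than a Not.
val unsimplified = Not(LessThan("a", 1))       // never reaches pushdown
val simplified   = GreaterThanOrEqual("a", 1)  // what pushdown actually sees
```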
```diff
@@ -86,20 +86,20 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
   }
 
   private def checkFilterPredicate
-      (predicate: Predicate, filterClass: Class[_ <: FilterPredicate], expected: Seq[Row])
-      (implicit df: DataFrame): Unit = {
+    (predicate: Predicate, filterClass: Class[_ <: FilterPredicate], expected: Seq[Row])
+    (implicit df: DataFrame): Unit = {
     checkFilterPredicate(df, predicate, filterClass, checkAnswer(_, _: Seq[Row]), expected)
   }
 
   private def checkFilterPredicate[T]
-      (predicate: Predicate, filterClass: Class[_ <: FilterPredicate], expected: T)
-      (implicit df: DataFrame): Unit = {
+    (predicate: Predicate, filterClass: Class[_ <: FilterPredicate], expected: T)
+    (implicit df: DataFrame): Unit = {
     checkFilterPredicate(predicate, filterClass, Seq(Row(expected)))(df)
   }
 
   private def checkBinaryFilterPredicate
-      (predicate: Predicate, filterClass: Class[_ <: FilterPredicate], expected: Seq[Row])
-      (implicit df: DataFrame): Unit = {
+    (predicate: Predicate, filterClass: Class[_ <: FilterPredicate], expected: Seq[Row])
+    (implicit df: DataFrame): Unit = {
     def checkBinaryAnswer(df: DataFrame, expected: Seq[Row]) = {
       assertResult(expected.map(_.getAs[Array[Byte]](0).mkString(",")).sorted) {
         df.rdd.map(_.getAs[Array[Byte]](0).mkString(",")).collect().toSeq.sorted
```
```diff
@@ -110,8 +110,8 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
   }
 
   private def checkBinaryFilterPredicate
-      (predicate: Predicate, filterClass: Class[_ <: FilterPredicate], expected: Array[Byte])
-      (implicit df: DataFrame): Unit = {
+    (predicate: Predicate, filterClass: Class[_ <: FilterPredicate], expected: Array[Byte])
+    (implicit df: DataFrame): Unit = {
     checkBinaryFilterPredicate(predicate, filterClass, Seq(Row(expected)))(df)
   }
```
```diff
@@ -369,7 +369,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
   test("SPARK-11103: Filter applied on merged Parquet schema with new column fails") {
     import testImplicits._
-    Seq("true", "false").map { vectorized =>
+    Seq("true", "false").foreach { vectorized =>
       withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true",
         SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true",
         SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized) {
```

Member: Yes, I remember I was told that this case is even essential in some cases, #14416 (comment)

Author: Yep, maybe this belongs in a separate PR as a followup to the previous one, I just changed it because Intellij was yelling at me :P
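For context on the exchange above: on a strict collection like `Seq`, `map` with a side-effecting body allocates a result collection that is immediately discarded, whereas `foreach` returns `Unit`; the test's behaviour is unchanged either way. A tiny illustration:

```scala
// Both lines run the body for every element; only the allocation differs.
Seq("true", "false").map { v => println(v) }      // builds and discards a Seq[Unit]
Seq("true", "false").foreach { v => println(v) }  // returns Unit, no result built
```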
```diff
@@ -535,25 +535,48 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
     import testImplicits._
 
     withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true",
-        SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") {
+      SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") {
       withTempPath { dir =>
         val path = s"${dir.getCanonicalPath}/table"
         (1 to 1024).map(i => (101, i)).toDF("a", "b").write.parquet(path)
 
-        Seq(("true", (x: Long) => x == 0), ("false", (x: Long) => x > 0)).map { case (push, func) =>
-          withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> push) {
-            val accu = new LongAccumulator
-            accu.register(sparkContext, Some("numRowGroups"))
-
-            val df = spark.read.parquet(path).filter("a < 100")
-            df.foreachPartition(_.foreach(v => accu.add(0)))
-            df.collect
-
-            val numRowGroups = AccumulatorContext.lookForAccumulatorByName("numRowGroups")
-            assert(numRowGroups.isDefined)
-            assert(func(numRowGroups.get.asInstanceOf[LongAccumulator].value))
-            AccumulatorContext.remove(accu.id)
+        Seq(("true", (x: Long) => x == 0), ("false", (x: Long) => x > 0))
+          .foreach { case (push, func) =>
+            withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> push) {
+              val accu = new LongAccumulator
+              accu.register(sparkContext, Some("numRowGroups"))
+
+              val df = spark.read.parquet(path).filter("a < 100")
+              df.foreachPartition(_.foreach(v => accu.add(0)))
+              df.collect
+
+              val numRowGroups = AccumulatorContext.lookForAccumulatorByName("numRowGroups")
+              assert(numRowGroups.isDefined)
+              assert(func(numRowGroups.get.asInstanceOf[LongAccumulator].value))
+              AccumulatorContext.remove(accu.id)
+            }
           }
-        }
       }
     }
   }
 
+  test("In filters are pushed down") {
+    import testImplicits._
+    withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
+      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
+        withTempPath { dir =>
+          val path = s"${dir.getCanonicalPath}/table1"
+          (1 to 5).map(i => (i.toFloat, i%3)).toDF("a", "b").write.parquet(path)
+          val df = spark.read.parquet(path).where("b in (0,2)")
+          assert(stripSparkFilter(df).count == 3)
+          val df1 = spark.read.parquet(path).where("not (b in (1))")
+          assert(stripSparkFilter(df1).count == 3)
+          val df2 = spark.read.parquet(path).where("not (b in (1,3) or a <= 2)")
+          assert(stripSparkFilter(df2).count == 2)
+          val df3 = spark.read.parquet(path).where("not (b in (1,3) and a <= 2)")
+          assert(stripSparkFilter(df3).count == 4)
+          val df4 = spark.read.parquet(path).where("not (a <= 2)")
+          assert(stripSparkFilter(df4).count == 3)
+        }
+      }
+    }
+  }
```
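The expected counts in the new test can be sanity-checked without Spark. The test writes `b = i % 3` for `i` in 1 to 5, so the rows are (1.0, 1), (2.0, 2), (3.0, 0), (4.0, 1), (5.0, 2), and none of them are NULL, so three-valued logic does not come into play. A plain-Scala re-derivation of each assertion:

```scala
// The same five (a, b) rows the test writes to Parquet.
val rows = (1 to 5).map(i => (i.toFloat, i % 3))

assert(rows.count { case (_, b) => b == 0 || b == 2 } == 3)          // b in (0,2)
assert(rows.count { case (_, b) => b != 1 } == 3)                    // not (b in (1))
assert(rows.count { case (a, b) => !(Set(1, 3)(b) || a <= 2) } == 2) // not (b in (1,3) or a <= 2)
assert(rows.count { case (a, b) => !(Set(1, 3)(b) && a <= 2) } == 4) // not (b in (1,3) and a <= 2)
assert(rows.count { case (a, _) => a > 2 } == 3)                     // not (a <= 2)
```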
Author: Sorry about the indentation changes, I can revert all of these.