diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala
index c579867623e1..d844240a1a3b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala
@@ -130,10 +130,9 @@ case class ParquetScanBuilder(
       // are combined with filter or group by
       // e.g. SELECT COUNT(col1) FROM t WHERE col2 = 8
       //      SELECT COUNT(col1) FROM t GROUP BY col2
-      // Todo: 1. add support if groupby column is partition col
-      //          (https://issues.apache.org/jira/browse/SPARK-36646)
-      //       2. add support if filter col is partition col
-      //          (https://issues.apache.org/jira/browse/SPARK-36647)
+      // However, if the filter is on a partition column, max/min/count can still be pushed down
+      // Todo: add support if groupby column is partition col
+      //       (https://issues.apache.org/jira/browse/SPARK-36646)
       return false
     }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAggregatePushDownSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAggregatePushDownSuite.scala
index c795bd9ff338..4422a6a6cb7e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAggregatePushDownSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAggregatePushDownSuite.scala
@@ -129,10 +129,9 @@ abstract class ParquetAggregatePushDownSuite
         .write.partitionBy("p").parquet(dir.getCanonicalPath)
       withTempView("tmp") {
         spark.read.parquet(dir.getCanonicalPath).createOrReplaceTempView("tmp");
-        val enableVectorizedReader = Seq("false", "true")
-        for (testVectorizedReader <- enableVectorizedReader) {
+        Seq("false", "true").foreach { enableVectorizedReader =>
           withSQLConf(SQLConf.PARQUET_AGGREGATE_PUSHDOWN_ENABLED.key -> "true",
-            vectorizedReaderEnabledKey -> testVectorizedReader) {
+            vectorizedReaderEnabledKey -> enableVectorizedReader) {
             val count = sql("SELECT COUNT(p) FROM tmp")
             count.queryExecution.optimizedPlan.collect {
               case _: DataSourceV2ScanRelation =>
@@ -221,7 +220,7 @@ abstract class ParquetAggregatePushDownSuite
     }
   }
 
-  test("aggregate push down - query with filter not push down") {
+  test("aggregate push down - aggregate with data filter cannot be pushed down") {
     val data = Seq((-2, "abc", 2), (3, "def", 4), (6, "ghi", 2), (0, null, 19),
       (9, "mno", 7), (2, null, 7))
     withParquetTable(data, "t") {
@@ -240,6 +239,29 @@ abstract class ParquetAggregatePushDownSuite
     }
   }
 
+  test("aggregate push down - aggregate with partition filter can be pushed down") {
+    withTempPath { dir =>
+      spark.range(10).selectExpr("id", "id % 3 as p")
+        .write.partitionBy("p").parquet(dir.getCanonicalPath)
+      withTempView("tmp") {
+        spark.read.parquet(dir.getCanonicalPath).createOrReplaceTempView("tmp");
+        Seq("false", "true").foreach { enableVectorizedReader =>
+          withSQLConf(SQLConf.PARQUET_AGGREGATE_PUSHDOWN_ENABLED.key -> "true",
+            vectorizedReaderEnabledKey -> enableVectorizedReader) {
+            val max = sql("SELECT max(id), min(id), count(id) FROM tmp WHERE p = 0")
+            max.queryExecution.optimizedPlan.collect {
+              case _: DataSourceV2ScanRelation =>
+                val expected_plan_fragment =
+                  "PushedAggregation: [MAX(id), MIN(id), COUNT(id)]"
+                checkKeywordsExistsInExplain(max, expected_plan_fragment)
+            }
+            checkAnswer(max, Seq(Row(9, 0, 4)))
+          }
+        }
+      }
+    }
+  }
+
   test("aggregate push down - push down only if all the aggregates can be pushed down") {
     val data = Seq((-2, "abc", 2), (3, "def", 4), (6, "ghi", 2), (0, null, 19),
       (9, "mno", 7), (2, null, 7))
@@ -356,10 +378,9 @@ abstract class ParquetAggregatePushDownSuite
       spark.createDataFrame(rdd, schema).write.parquet(file.getCanonicalPath)
       withTempView("test") {
         spark.read.parquet(file.getCanonicalPath).createOrReplaceTempView("test")
-        val enableVectorizedReader = Seq("false", "true")
-        for (testVectorizedReader <- enableVectorizedReader) {
+        Seq("false", "true").foreach { enableVectorizedReader =>
           withSQLConf(SQLConf.PARQUET_AGGREGATE_PUSHDOWN_ENABLED.key -> "true",
-            vectorizedReaderEnabledKey -> testVectorizedReader) {
+            vectorizedReaderEnabledKey -> enableVectorizedReader) {
             val testMinWithTS = sql("SELECT min(StringCol), min(BooleanCol), min(ByteCol), " +
               "min(BinaryCol), min(ShortCol), min(IntegerCol), min(LongCol), min(FloatCol), " +
@@ -477,10 +498,9 @@ abstract class ParquetAggregatePushDownSuite
     }
   }
 
   test("aggregate push down - column name case sensitivity") {
-    val enableVectorizedReader = Seq("false", "true")
-    for (testVectorizedReader <- enableVectorizedReader) {
+    Seq("false", "true").foreach { enableVectorizedReader =>
       withSQLConf(SQLConf.PARQUET_AGGREGATE_PUSHDOWN_ENABLED.key -> "true",
-        vectorizedReaderEnabledKey -> testVectorizedReader) {
+        vectorizedReaderEnabledKey -> enableVectorizedReader) {
         withTempPath { dir =>
           spark.range(10).selectExpr("id", "id % 3 as p")
             .write.partitionBy("p").parquet(dir.getCanonicalPath)
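Not part of the patch: a minimal spark-shell sketch of the behavior the new test exercises. It assumes the `spark` session provided by the shell and an example scratch path /tmp/agg_pushdown_demo. With aggregate push-down enabled, an aggregate guarded only by a partition filter should report "PushedAggregation: [MAX(id), MIN(id), COUNT(id)]" in the scan node of its plan, while the same aggregate behind a data filter still falls back to a regular scan, as covered by the renamed test above.

import org.apache.spark.sql.internal.SQLConf

// Enable Parquet aggregate push-down for this session.
spark.conf.set(SQLConf.PARQUET_AGGREGATE_PUSHDOWN_ENABLED.key, "true")

// Write a small dataset partitioned by p (path is only an example).
spark.range(10).selectExpr("id", "id % 3 as p")
  .write.mode("overwrite").partitionBy("p").parquet("/tmp/agg_pushdown_demo")
spark.read.parquet("/tmp/agg_pushdown_demo").createOrReplaceTempView("tmp")

// Partition filter only: the scan node should show the pushed MAX/MIN/COUNT.
spark.sql("SELECT max(id), min(id), count(id) FROM tmp WHERE p = 0").explain()

// Data filter: push-down is still skipped.
spark.sql("SELECT max(id), min(id), count(id) FROM tmp WHERE id > 0").explain()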