Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,10 @@ case class ParquetScanBuilder(
// are combined with filter or group by
// e.g. SELECT COUNT(col1) FROM t WHERE col2 = 8
// SELECT COUNT(col1) FROM t GROUP BY col2
// Todo: 1. add support if groupby column is partition col
// (https://issues.apache.org/jira/browse/SPARK-36646)
// 2. add support if filter col is partition col
// (https://issues.apache.org/jira/browse/SPARK-36647)
// However, if the filter or group by is on partition column,
// max/min/count can still be pushed down
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So group by on partition column is not supported yet. Then this comment is not correct.

// Todo: add support if groupby column is partition col
// (https://issues.apache.org/jira/browse/SPARK-36646)
return false
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ abstract class ParquetAggregatePushDownSuite
}
}

test("aggregate push down - query with filter not push down") {
test("aggregate push down - aggregate with data filter cannot be pushed down") {
val data = Seq((-2, "abc", 2), (3, "def", 4), (6, "ghi", 2), (0, null, 19),
(9, "mno", 7), (2, null, 7))
withParquetTable(data, "t") {
Expand All @@ -240,6 +240,30 @@ abstract class ParquetAggregatePushDownSuite
}
}

test("aggregate push down - aggregate with partition filter can be pushed down") {
withTempPath { dir =>
spark.range(10).selectExpr("id", "id % 3 as p")
.write.partitionBy("p").parquet(dir.getCanonicalPath)
withTempView("tmp") {
spark.read.parquet(dir.getCanonicalPath).createOrReplaceTempView("tmp");
val enableVectorizedReader = Seq("false", "true")
for (testVectorizedReader <- enableVectorizedReader) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: we can be more scala here, but not a big deal:

Seq("false", "true").foreach { enableVectorizedReader =>
  withSQLConf(...) {
    ...
  }
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@c21 Thanks for reviewing! I fixed this.

withSQLConf(SQLConf.PARQUET_AGGREGATE_PUSHDOWN_ENABLED.key -> "true",
vectorizedReaderEnabledKey -> testVectorizedReader) {
val max = sql("SELECT max(id) FROM tmp WHERE p = 0")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add other two supported aggregate functions? And how about group by on partition column case?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added.
Group by on partition column is a little more complicated and needs some code changes: currently, we only have the aggregate values in the returned row. For group by on partition column, we will need to pass down the partition col value and prepend that value to the aggregation row. I will have a separate PR for that work.

max.queryExecution.optimizedPlan.collect {
case _: DataSourceV2ScanRelation =>
val expected_plan_fragment =
"PushedAggregation: [MAX(id)]"
checkKeywordsExistsInExplain(max, expected_plan_fragment)
}
checkAnswer(max, Seq(Row(9)))
}
}
}
}
}

test("aggregate push down - push down only if all the aggregates can be pushed down") {
val data = Seq((-2, "abc", 2), (3, "def", 4), (6, "ghi", 2), (0, null, 19),
(9, "mno", 7), (2, null, 7))
Expand Down