[SPARK-33673][SQL] Avoid push down partition filters to ParquetScan for DataSourceV2 #30652
Conversation
cc @wangyum

Kubernetes integration test starting

Kubernetes integration test status failure

Test build #132381 has finished for PR 30652 at commit

Thanks for fixing this so quickly.
```scala
val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
val isCaseSensitive = sqlConf.caseSensitiveAnalysis
val parquetSchema =
  new SparkToParquetSchemaConverter(sparkSession.sessionState.conf).convert(schema)
```
What if a partition column is also present in the file? I guess this might be why we push down partition filters.
This is a good question, but it seems that the filters pushed down in DataSource V1 do not contain filters related to partition columns either.
The `dataFilters` used to construct `FileSourceScanExec` and passed to `ParquetFileFormat.buildReaderWithPartitionValues` to build the pushed filters also have partition filters filtered out, am I right?
spark/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
Lines 184 to 193 in e4d1c10
```scala
// Partition keys are not available in the statistics of the files.
// `dataColumns` might have partition columns, we need to filter them out.
val dataColumnsWithoutPartitionCols = dataColumns.filterNot(partitionColumns.contains)
val dataFilters = normalizedFiltersWithoutSubqueries.flatMap { f =>
  if (f.references.intersect(partitionSet).nonEmpty) {
    extractPredicatesWithinOutputSet(f, AttributeSet(dataColumnsWithoutPartitionCols))
  } else {
    Some(f)
  }
}
```
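The DSv1 logic quoted above can be sketched without any Spark dependencies. In this sketch, `SimpleFilter`, `extractDataOnlyPredicate`, and the column sets are illustrative stand-ins for Spark's real expression types, not its actual API:

```scala
object DataFilterSketch {
  // A predicate modeled only by its text and the column names it references.
  final case class SimpleFilter(expr: String, references: Set[String])

  // Stand-in for `extractPredicatesWithinOutputSet`: keep a filter only if
  // every column it references is a plain data column.
  def extractDataOnlyPredicate(f: SimpleFilter, dataCols: Set[String]): Option[SimpleFilter] =
    if (f.references.subsetOf(dataCols)) Some(f) else None

  // Mirror of the quoted flatMap: a filter touching a partition column is
  // reduced to its data-column-only part (dropped entirely in this sketch),
  // so partition predicates never reach the pushed data filters.
  def dataFilters(
      filters: Seq[SimpleFilter],
      partitionCols: Set[String],
      dataCols: Set[String]): Seq[SimpleFilter] =
    filters.flatMap { f =>
      if (f.references.intersect(partitionCols).nonEmpty) {
        extractDataOnlyPredicate(f, dataCols -- partitionCols)
      } else {
        Some(f)
      }
    }
}
```

For example, with partition column `p1`, a filter on `value` survives while `p1 = 1` is dropped from the data filters (it is still used for partition pruning elsewhere).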
Maybe another way is to refer to Parquet's `FileMetaData#getSchema` to determine which filters should be pushed down.
Maybe this is a correct fix. I have a PR to fix the case where a partition column is present in the file:
#30670
Maybe we should use `readDataSchema()` instead of `dataSchema`.
Addressed in b9f8eb2: use `readDataSchema()` instead of `dataSchema`.
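Why does building the pushed filters from the read data schema help? Since `readDataSchema()` excludes partition columns, any predicate referencing a partition column simply fails to resolve against it and is not a pushdown candidate. A hedged, dependency-free sketch of that idea (names here are illustrative, not Spark's API):

```scala
object PushdownSketch {
  // A predicate modeled by its text and the columns it references.
  final case class Pred(expr: String, references: Set[String])

  // Only predicates whose every referenced column exists in the given
  // schema are candidates for pushdown to the file reader.
  def pushableFilters(filters: Seq[Pred], schemaCols: Set[String]): Seq[Pred] =
    filters.filter(_.references.subsetOf(schemaCols))
}
```

With the full schema (data plus partition columns), a filter like `p1 = 1` would qualify for pushdown; with only the read data schema, it is excluded, which matches the fix in this PR.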
Kubernetes integration test starting

Kubernetes integration test status failure

Test build #132402 has finished for PR 30652 at commit

retest this please.

Test build #132424 has finished for PR 30652 at commit

Kubernetes integration test starting

Kubernetes integration test status success

Local test

Test build #132456 has finished for PR 30652 at commit

Waiting for SPARK-33705 to fix 3 failed UTs in the thriftserver module
gengliangwang left a comment:
LGTM
Thanks all. Merged to master.

Thanks for your review @wangyum @viirya @cloud-fan @gengliangwang ~

Test build #132749 has finished for PR 30652 at commit
### What changes were proposed in this pull request?
Do not push down partition filters to `ORCScan` for DSv2.

### Why are the changes needed?
Seems to me that the partition filter is only used for partition pruning and shouldn't be pushed down to `ORCScan`. We don't push down partition filters to ORCScan in DSv1:

```
== Physical Plan ==
*(1) Filter (isnotnull(value#19) AND NOT (value#19 = a))
+- *(1) ColumnarToRow
   +- FileScan orc [value#19,p1#20,p2#21] Batched: true, DataFilters: [isnotnull(value#19), NOT (value#19 = a)], Format: ORC, Location: InMemoryFileIndex(1 paths)[file:/private/var/folders/pt/_5f4sxy56x70dv9zpz032f0m0000gn/T/spark-c1..., PartitionFilters: [isnotnull(p1#20), isnotnull(p2#21), (p1#20 = 1), (p2#21 = 2)], PushedFilters: [IsNotNull(value), Not(EqualTo(value,a))], ReadSchema: struct<value:string>
```

Also, we don't push down partition filters for Parquet in DSv2. #30652

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Existing test suites

Closes #33680 from huaxingao/orc_filter.

Authored-by: Huaxin Gao <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
What changes were proposed in this pull request?
As described in SPARK-33673, some test suites in `ParquetV2SchemaPruningSuite` will fail when `parquet.version` is set to 1.11.1, because Parquet returns empty results for non-existent columns since PARQUET-1765. This PR changes `ParquetScanBuilder` to build `pushedParquetFilters` from `readDataSchema()` instead of `schema`, so that partition filters are not pushed down to `ParquetScan` for `DataSourceV2`.
Why are the changes needed?
Prepare for upgrading to Parquet 1.11.1.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Pass the Jenkins or GitHub Actions checks.
Manual test as follows:
Before
After
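The net effect of this change can be sketched as a split of the query's predicates into partition filters (used for pruning) and pushed filters (handed to the reader), matching the `PartitionFilters` / `PushedFilters` lists in the DSv1 plan shown earlier. `Pred` and `split` below are hypothetical names for illustration, not Spark's API:

```scala
object ScanFilterSplit {
  // A predicate modeled by its text and the columns it references.
  final case class Pred(expr: String, references: Set[String])

  // Predicates touching any partition column go to PartitionFilters;
  // the rest are candidates for PushedFilters.
  def split(preds: Seq[Pred], partitionCols: Set[String]): (Seq[Pred], Seq[Pred]) =
    preds.partition(_.references.intersect(partitionCols).nonEmpty)
}
```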