
Commit 8653763

huaxingao authored and chenzhx committed
[SPARK-36351][SQL] Refactor filter push down in file source v2
### What changes were proposed in this pull request?

Currently in `V2ScanRelationPushDown`, we push the filters (partition filters + data filters) to the file source, then pass all the filters (partition filters + data filters) as post-scan filters to the v2 `Scan`, and later, in `PruneFileSourcePartitions`, we separate the partition filters from the data filters and set them, as `Expression`s, on the file source.

Changes in this PR: when we push filters to file sources in `V2ScanRelationPushDown`, since we already have the information about the partition columns, we separate the partition filters from the data filters right there. The benefits of doing this:

- We can handle all the filter-related work for v2 file sources in one place instead of two (`V2ScanRelationPushDown` and `PruneFileSourcePartitions`), so the code is cleaner and easier to maintain.
- We actually have to separate partition filters and data filters in `V2ScanRelationPushDown`; otherwise there is no way to find out which filters are partition filters, and we can't push down aggregates for Parquet even when we only have partition filters.
- By separating the filters early in `V2ScanRelationPushDown`, we only need to check the data filters to find out which ones need to be converted to data source filters (e.g. Parquet predicates, ORC predicates) and pushed down to the file source; right now we check all the filters (both partition filters and data filters).
- Similarly, we can pass only the data filters as post-scan filters to the v2 `Scan`, because partition filters are used for partition pruning only and need not be passed as post-scan filters.

In order to do this, we make the following changes:

- Add `pushFilters` to file source v2. In this method:
  - Push both the `Expression` partition filters and the `Expression` data filters to the file source. We have to use `Expression` filters because we need them for partition pruning.
  - The data filters are used for filter push down. If a file source needs to push down data filters, it translates the data filters from `Expression` to `sources.Filter` and then decides which filters to push down.
  - The partition filters are used for partition pruning.
- File source v2 no longer needs to implement `SupportsPushDownFilters`: once the two types of filters are separated, they have already been set on the file data sources, and it would be redundant to use `SupportsPushDownFilters` to set the filters on the file data sources again.

### Why are the changes needed?

See the first section.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests.

Closes apache#33650 from huaxingao/partition_filter.

Authored-by: Huaxin Gao <[email protected]>
Signed-off-by: Liang-Chi Hsieh <[email protected]>
1 parent e6949ae commit 8653763

20 files changed: 176 additions & 144 deletions
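The `Expression`-to-`sources.Filter` translation the commit message refers to is performed by `DataSourceStrategy.translateFilter`, which the `FileScanBuilder` changes below rely on. A minimal sketch of that conversion, assuming a Spark 3.x classpath; the object name `TranslateSketch` and the column name `c` are illustrative:

```scala
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GreaterThan, Literal}
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
import org.apache.spark.sql.types.IntegerType

object TranslateSketch {
  def main(args: Array[String]): Unit = {
    // A catalyst data filter `c > 0` over an int column `c`.
    val c = AttributeReference("c", IntegerType)()
    val catalystFilter = GreaterThan(c, Literal(0))
    // Translate to the public sources.Filter API. Expressions that cannot
    // be translated come back as None and remain post-scan filters.
    val translated = DataSourceStrategy.translateFilter(
      catalystFilter, supportNestedPredicatePushdown = true)
    println(translated) // expected: Some(GreaterThan(c,0))
  }
}
```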


external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroScan.scala

Lines changed: 0 additions & 4 deletions
```diff
@@ -62,10 +62,6 @@ case class AvroScan(
       pushedFilters)
   }
 
-  override def withFilters(
-      partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): FileScan =
-    this.copy(partitionFilters = partitionFilters, dataFilters = dataFilters)
-
   override def equals(obj: Any): Boolean = obj match {
     case a: AvroScan => super.equals(a) && dataSchema == a.dataSchema && options == a.options &&
       equivalentFilters(pushedFilters, a.pushedFilters)
```

external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroScanBuilder.scala

Lines changed: 9 additions & 10 deletions
```diff
@@ -18,7 +18,7 @@ package org.apache.spark.sql.v2.avro
 
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.StructFilters
-import org.apache.spark.sql.connector.read.{Scan, SupportsPushDownFilters}
+import org.apache.spark.sql.connector.read.Scan
 import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
 import org.apache.spark.sql.execution.datasources.v2.FileScanBuilder
 import org.apache.spark.sql.sources.Filter
@@ -31,7 +31,7 @@ class AvroScanBuilder (
     schema: StructType,
     dataSchema: StructType,
     options: CaseInsensitiveStringMap)
-  extends FileScanBuilder(sparkSession, fileIndex, dataSchema) with SupportsPushDownFilters {
+  extends FileScanBuilder(sparkSession, fileIndex, dataSchema) {
 
   override def build(): Scan = {
     AvroScan(
@@ -41,17 +41,16 @@ class AvroScanBuilder (
       readDataSchema(),
       readPartitionSchema(),
       options,
-      pushedFilters())
+      pushedDataFilters,
+      partitionFilters,
+      dataFilters)
   }
 
-  private var _pushedFilters: Array[Filter] = Array.empty
-
-  override def pushFilters(filters: Array[Filter]): Array[Filter] = {
+  override def pushDataFilters(dataFilters: Array[Filter]): Array[Filter] = {
     if (sparkSession.sessionState.conf.avroFilterPushDown) {
-      _pushedFilters = StructFilters.pushedFilters(filters, dataSchema)
+      StructFilters.pushedFilters(dataFilters, dataSchema)
+    } else {
+      Array.empty[Filter]
     }
-    filters
   }
-
-  override def pushedFilters(): Array[Filter] = _pushedFilters
 }
```
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/SupportsPushDownCatalystFilters.scala

Lines changed: 41 additions & 0 deletions

```diff
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.internal.connector
+
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.sources.Filter
+
+/**
+ * A mix-in interface for {@link FileScanBuilder}. File sources can implement this interface to
+ * push down filters to the file source. The pushed down filters will be separated into partition
+ * filters and data filters. Partition filters are used for partition pruning and data filters are
+ * used to reduce the size of the data to be read.
+ */
+trait SupportsPushDownCatalystFilters {
+
+  /**
+   * Pushes down catalyst Expression filters (which will be separated into partition filters and
+   * data filters), and returns data filters that need to be evaluated after scanning.
+   */
+  def pushFilters(filters: Seq[Expression]): Seq[Expression]
+
+  /**
+   * Returns the data filters that are pushed to the data source via
+   * {@link #pushFilters(Expression[])}.
+   */
+  def pushedFilters: Array[Filter]
+}
```
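A toy implementation of the trait, to make the contract concrete (illustrative only, not part of this commit): returning every filter from `pushFilters` marks it as a post-scan filter, and `pushedFilters` stays empty because nothing is pushed.

```scala
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.internal.connector.SupportsPushDownCatalystFilters
import org.apache.spark.sql.sources.Filter

// No-op implementor: nothing is pushed, so Spark must re-evaluate every
// filter after the scan. Real implementors (FileScanBuilder below) split
// the filters and translate the data filters instead.
class PassThroughFilters extends SupportsPushDownCatalystFilters {
  override def pushFilters(filters: Seq[Expression]): Seq[Expression] = filters
  override def pushedFilters: Array[Filter] = Array.empty
}
```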

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala

Lines changed: 20 additions & 1 deletion
```diff
@@ -28,6 +28,7 @@ import org.json4s.jackson.Serialization
 import org.apache.spark.SparkUpgradeException
 import org.apache.spark.sql.{SPARK_LEGACY_DATETIME, SPARK_LEGACY_INT96, SPARK_VERSION_METADATA_KEY}
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogUtils}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, Expression, ExpressionSet, PredicateHelper}
 import org.apache.spark.sql.catalyst.util.RebaseDateTime
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
 import org.apache.spark.sql.execution.datasources.parquet.ParquetOptions
@@ -39,7 +40,7 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.util.Utils
 
 
-object DataSourceUtils {
+object DataSourceUtils extends PredicateHelper {
   /**
    * The key to use for storing partitionBy columns as options.
    */
@@ -242,4 +243,22 @@ object DataSourceUtils {
       options
     }
   }
+
+  def getPartitionFiltersAndDataFilters(
+      partitionSchema: StructType,
+      normalizedFilters: Seq[Expression]): (Seq[Expression], Seq[Expression]) = {
+    val partitionColumns = normalizedFilters.flatMap { expr =>
+      expr.collect {
+        case attr: AttributeReference if partitionSchema.names.contains(attr.name) =>
+          attr
+      }
+    }
+    val partitionSet = AttributeSet(partitionColumns)
+    val (partitionFilters, dataFilters) = normalizedFilters.partition(f =>
+      f.references.subsetOf(partitionSet)
+    )
+    val extraPartitionFilter =
+      dataFilters.flatMap(extractPredicatesWithinOutputSet(_, partitionSet))
+    (ExpressionSet(partitionFilters ++ extraPartitionFilter).toSeq, dataFilters)
+  }
 }
```
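A sketch of what the new helper computes, assuming a classpath that contains this change (`FilterSplitSketch` and the column names are illustrative):

```scala
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, GreaterThan, Literal}
import org.apache.spark.sql.execution.datasources.DataSourceUtils
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

object FilterSplitSketch {
  def main(args: Array[String]): Unit = {
    // A table partitioned by `p`, with data column `c`.
    val partitionSchema = StructType(Seq(StructField("p", IntegerType)))
    val p = AttributeReference("p", IntegerType)()
    val c = AttributeReference("c", IntegerType)()
    // `p = 1` references only partition columns; `c > 0` does not.
    val filters = Seq(EqualTo(p, Literal(1)), GreaterThan(c, Literal(0)))
    val (partitionFilters, dataFilters) =
      DataSourceUtils.getPartitionFiltersAndDataFilters(partitionSchema, filters)
    println(partitionFilters) // expected: the `p = 1` predicate
    println(dataFilters)      // expected: the `c > 0` predicate
  }
}
```

Note the `extractPredicatesWithinOutputSet` step: a weaker predicate over partition columns is extracted from each data filter, so a filter such as `p = 1 OR (p = 2 AND c > 0)` still contributes `p = 1 OR p = 2` for pruning while remaining a data filter.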

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala

Lines changed: 6 additions & 50 deletions
```diff
@@ -17,52 +17,24 @@
 
 package org.apache.spark.sql.execution.datasources
 
-import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.catalog.CatalogStatistics
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, LeafNode, LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.FilterEstimation
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2ScanRelation, FileScan}
-import org.apache.spark.sql.types.StructType
 
 /**
  * Prune the partitions of file source based table using partition filters. Currently, this rule
- * is applied to [[HadoopFsRelation]] with [[CatalogFileIndex]] and [[DataSourceV2ScanRelation]]
- * with [[FileScan]].
+ * is applied to [[HadoopFsRelation]] with [[CatalogFileIndex]].
  *
 * For [[HadoopFsRelation]], the location will be replaced by pruned file index, and corresponding
 * statistics will be updated. And the partition filters will be kept in the filters of returned
 * logical plan.
- *
- * For [[DataSourceV2ScanRelation]], both partition filters and data filters will be added to
- * its underlying [[FileScan]]. And the partition filters will be removed in the filters of
- * returned logical plan.
 */
 private[sql] object PruneFileSourcePartitions
   extends Rule[LogicalPlan] with PredicateHelper {
 
-  private def getPartitionKeyFiltersAndDataFilters(
-      sparkSession: SparkSession,
-      relation: LeafNode,
-      partitionSchema: StructType,
-      filters: Seq[Expression],
-      output: Seq[AttributeReference]): (ExpressionSet, Seq[Expression]) = {
-    val normalizedFilters = DataSourceStrategy.normalizeExprs(
-      filters.filter(f => f.deterministic && !SubqueryExpression.hasSubquery(f)), output)
-    val partitionColumns =
-      relation.resolve(partitionSchema, sparkSession.sessionState.analyzer.resolver)
-    val partitionSet = AttributeSet(partitionColumns)
-    val (partitionFilters, dataFilters) = normalizedFilters.partition(f =>
-      f.references.subsetOf(partitionSet)
-    )
-    val extraPartitionFilter =
-      dataFilters.flatMap(extractPredicatesWithinOutputSet(_, partitionSet))
-
-    (ExpressionSet(partitionFilters ++ extraPartitionFilter), dataFilters)
-  }
-
   private def rebuildPhysicalOperation(
       projects: Seq[NamedExpression],
       filters: Seq[Expression],
@@ -91,12 +63,14 @@ private[sql] object PruneFileSourcePartitions
             _,
             _))
         if filters.nonEmpty && fsRelation.partitionSchemaOption.isDefined =>
-        val (partitionKeyFilters, _) = getPartitionKeyFiltersAndDataFilters(
-          fsRelation.sparkSession, logicalRelation, partitionSchema, filters,
+        val normalizedFilters = DataSourceStrategy.normalizeExprs(
+          filters.filter(f => f.deterministic && !SubqueryExpression.hasSubquery(f)),
           logicalRelation.output)
+        val (partitionKeyFilters, _) = DataSourceUtils
+          .getPartitionFiltersAndDataFilters(partitionSchema, normalizedFilters)
 
         if (partitionKeyFilters.nonEmpty) {
-          val prunedFileIndex = catalogFileIndex.filterPartitions(partitionKeyFilters.toSeq)
+          val prunedFileIndex = catalogFileIndex.filterPartitions(partitionKeyFilters)
           val prunedFsRelation =
             fsRelation.copy(location = prunedFileIndex)(fsRelation.sparkSession)
           // Change table stats based on the sizeInBytes of pruned files
@@ -117,23 +91,5 @@ private[sql] object PruneFileSourcePartitions
         } else {
           op
         }
-
-      case op @ PhysicalOperation(projects, filters,
-          v2Relation @ DataSourceV2ScanRelation(_, scan: FileScan, output))
-        if filters.nonEmpty =>
-        val (partitionKeyFilters, dataFilters) =
-          getPartitionKeyFiltersAndDataFilters(scan.sparkSession, v2Relation,
-            scan.readPartitionSchema, filters, output)
-        // The dataFilters are pushed down only once
-        if (partitionKeyFilters.nonEmpty || (dataFilters.nonEmpty && scan.dataFilters.isEmpty)) {
-          val prunedV2Relation =
-            v2Relation.copy(scan = scan.withFilters(partitionKeyFilters.toSeq, dataFilters))
-          // The pushed down partition filters don't need to be reevaluated.
-          val afterScanFilters =
-            ExpressionSet(filters) -- partitionKeyFilters.filter(_.references.nonEmpty)
-          rebuildPhysicalOperation(projects, afterScanFilters.toSeq, prunedV2Relation)
-        } else {
-          op
-        }
   }
 }
```

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala

Lines changed: 0 additions & 6 deletions
```diff
@@ -69,12 +69,6 @@ trait FileScan extends Scan
    */
   def dataFilters: Seq[Expression]
 
-  /**
-   * Create a new `FileScan` instance from the current one
-   * with different `partitionFilters` and `dataFilters`
-   */
-  def withFilters(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): FileScan
-
   /**
    * If a file with `path` is unsplittable, return the unsplittable reason,
    * otherwise return `None`.
```

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala

Lines changed: 40 additions & 4 deletions
```diff
@@ -16,19 +16,30 @@
  */
 package org.apache.spark.sql.execution.datasources.v2
 
-import org.apache.spark.sql.SparkSession
+import scala.collection.mutable
+
+import org.apache.spark.sql.{sources, SparkSession}
+import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.connector.read.{ScanBuilder, SupportsPushDownRequiredColumns}
-import org.apache.spark.sql.execution.datasources.{PartitioningAwareFileIndex, PartitioningUtils}
+import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, DataSourceUtils, PartitioningAwareFileIndex, PartitioningUtils}
+import org.apache.spark.sql.internal.connector.SupportsPushDownCatalystFilters
+import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types.StructType
 
 abstract class FileScanBuilder(
     sparkSession: SparkSession,
     fileIndex: PartitioningAwareFileIndex,
-    dataSchema: StructType) extends ScanBuilder with SupportsPushDownRequiredColumns {
+    dataSchema: StructType)
+  extends ScanBuilder
+  with SupportsPushDownRequiredColumns
+  with SupportsPushDownCatalystFilters {
   private val partitionSchema = fileIndex.partitionSchema
   private val isCaseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
   protected val supportsNestedSchemaPruning = false
   protected var requiredSchema = StructType(dataSchema.fields ++ partitionSchema.fields)
+  protected var partitionFilters = Seq.empty[Expression]
+  protected var dataFilters = Seq.empty[Expression]
+  protected var pushedDataFilters = Array.empty[Filter]
 
   override def pruneColumns(requiredSchema: StructType): Unit = {
     // [SPARK-30107] While `requiredSchema` might have pruned nested columns,
@@ -48,7 +59,7 @@ abstract class FileScanBuilder(
     StructType(fields)
   }
 
-  protected def readPartitionSchema(): StructType = {
+  def readPartitionSchema(): StructType = {
     val requiredNameSet = createRequiredNameSet()
     val fields = partitionSchema.fields.filter { field =>
       val colName = PartitioningUtils.getColName(field, isCaseSensitive)
@@ -57,6 +68,31 @@ abstract class FileScanBuilder(
     StructType(fields)
   }
 
+  override def pushFilters(filters: Seq[Expression]): Seq[Expression] = {
+    val (partitionFilters, dataFilters) =
+      DataSourceUtils.getPartitionFiltersAndDataFilters(partitionSchema, filters)
+    this.partitionFilters = partitionFilters
+    this.dataFilters = dataFilters
+    val translatedFilters = mutable.ArrayBuffer.empty[sources.Filter]
+    for (filterExpr <- dataFilters) {
+      val translated = DataSourceStrategy.translateFilter(filterExpr, true)
+      if (translated.nonEmpty) {
+        translatedFilters += translated.get
+      }
+    }
+    pushedDataFilters = pushDataFilters(translatedFilters.toArray)
+    dataFilters
+  }
+
+  override def pushedFilters: Array[Filter] = pushedDataFilters
+
+  /*
+   * Push down data filters to the file source, so the data filters can be evaluated there to
+   * reduce the size of the data to be read. By default, data filters are not pushed down.
+   * File source needs to implement this method to push down data filters.
+   */
+  protected def pushDataFilters(dataFilters: Array[Filter]): Array[Filter] = Array.empty[Filter]
+
   private def createRequiredNameSet(): Set[String] =
     requiredSchema.fields.map(PartitioningUtils.getColName(_, isCaseSensitive)).toSet
 
```
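With this base-class behavior, a custom file format only needs to override `pushDataFilters` to opt into push down; the filter splitting and `Expression`-to-`Filter` translation are inherited. A hedged sketch (the class name, the choice of pushable filters, and the unimplemented `build()` are illustrative, not part of this commit):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connector.read.Scan
import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
import org.apache.spark.sql.execution.datasources.v2.FileScanBuilder
import org.apache.spark.sql.sources.{EqualTo, Filter, IsNotNull}
import org.apache.spark.sql.types.StructType

class MyFormatScanBuilder(
    sparkSession: SparkSession,
    fileIndex: PartitioningAwareFileIndex,
    dataSchema: StructType)
  extends FileScanBuilder(sparkSession, fileIndex, dataSchema) {

  // A real builder would construct its format-specific FileScan here,
  // passing pushedDataFilters, partitionFilters and dataFilters along,
  // as AvroScanBuilder and CSVScanBuilder do in this commit.
  override def build(): Scan =
    throw new UnsupportedOperationException("sketch only")

  // Keep only the source filters this hypothetical format can evaluate
  // natively; everything else stays a post-scan filter.
  override def pushDataFilters(dataFilters: Array[Filter]): Array[Filter] =
    dataFilters.filter {
      case _: IsNotNull | _: EqualTo => true
      case _ => false
    }
}
```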
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala

Lines changed: 4 additions & 0 deletions
```diff
@@ -97,6 +97,10 @@ object PushDownUtils extends PredicateHelper {
       }
       (Right(r.pushedFilters), (untranslatableExprs ++ postScanFilters).toSeq)
 
+    case f: FileScanBuilder =>
+      val postScanFilters = f.pushFilters(filters)
+      (Left(f.pushedFilters), postScanFilters)
+
     case _ => (Left(Nil), filters)
   }
 }
```

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVScan.scala

Lines changed: 1 addition & 5 deletions
```diff
@@ -27,7 +27,7 @@ import org.apache.spark.sql.connector.read.PartitionReaderFactory
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
 import org.apache.spark.sql.execution.datasources.csv.CSVDataSource
-import org.apache.spark.sql.execution.datasources.v2.{FileScan, TextBasedFileScan}
+import org.apache.spark.sql.execution.datasources.v2.TextBasedFileScan
 import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -84,10 +84,6 @@ case class CSVScan(
       dataSchema, readDataSchema, readPartitionSchema, parsedOptions, pushedFilters)
   }
 
-  override def withFilters(
-      partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): FileScan =
-    this.copy(partitionFilters = partitionFilters, dataFilters = dataFilters)
-
   override def equals(obj: Any): Boolean = obj match {
     case c: CSVScan => super.equals(c) && dataSchema == c.dataSchema && options == c.options &&
       equivalentFilters(pushedFilters, c.pushedFilters)
```

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVScanBuilder.scala

Lines changed: 9 additions & 10 deletions
```diff
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources.v2.csv
 
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.StructFilters
-import org.apache.spark.sql.connector.read.{Scan, SupportsPushDownFilters}
+import org.apache.spark.sql.connector.read.Scan
 import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
 import org.apache.spark.sql.execution.datasources.v2.FileScanBuilder
 import org.apache.spark.sql.sources.Filter
@@ -32,7 +32,7 @@ case class CSVScanBuilder(
     schema: StructType,
     dataSchema: StructType,
     options: CaseInsensitiveStringMap)
-  extends FileScanBuilder(sparkSession, fileIndex, dataSchema) with SupportsPushDownFilters {
+  extends FileScanBuilder(sparkSession, fileIndex, dataSchema) {
 
   override def build(): Scan = {
     CSVScan(
@@ -42,17 +42,16 @@ case class CSVScanBuilder(
       readDataSchema(),
       readPartitionSchema(),
       options,
-      pushedFilters())
+      pushedDataFilters,
+      partitionFilters,
+      dataFilters)
   }
 
-  private var _pushedFilters: Array[Filter] = Array.empty
-
-  override def pushFilters(filters: Array[Filter]): Array[Filter] = {
+  override def pushDataFilters(dataFilters: Array[Filter]): Array[Filter] = {
     if (sparkSession.sessionState.conf.csvFilterPushDown) {
-      _pushedFilters = StructFilters.pushedFilters(filters, dataSchema)
+      StructFilters.pushedFilters(dataFilters, dataSchema)
+    } else {
+      Array.empty[Filter]
     }
-    filters
   }
-
-  override def pushedFilters(): Array[Filter] = _pushedFilters
 }
```
