Skip to content

Commit 269c75b

Browse files
committed
[SPARK-40193][SQL] Merge subquery plans with different filters
1 parent f53cddd commit 269c75b

7 files changed

Lines changed: 671 additions & 81 deletions

File tree

core/src/main/scala/org/apache/spark/util/collection/BitSet.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -250,4 +250,11 @@ class BitSet(numBits: Int) extends Serializable {
250250

251251
/** Return the number of longs it would take to hold numBits. */
252252
private def bit2words(numBits: Int) = ((numBits - 1) >> 6) + 1
253+
254+
override def equals(other: Any): Boolean = other match {
255+
case otherSet: BitSet => numWords == otherSet.numWords && Arrays.equals(words, otherSet.words)
256+
case _ => false
257+
}
258+
259+
  // Hash only the words array (not numWords): consistent with equals, since equal word
  // contents with equal numWords always produce the same hash.
  override def hashCode(): Int = Arrays.hashCode(words)
253260
}

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3875,6 +3875,15 @@ object SQLConf {
38753875
.booleanConf
38763876
.createWithDefault(false)
38773877

3878+
val PLAN_MERGE_IGNORE_PUSHED_PUSHED_DATA_FILTERS =
3879+
buildConf("spark.sql.planMerge.ignorePushedDataFilters")
3880+
.internal()
3881+
.doc(s"When set to true plan merging is enabled even if physical scan operations have " +
3882+
"different data filters pushed down.")
3883+
.version("3.4.0")
3884+
.booleanConf
3885+
.createWithDefault(true)
3886+
38783887
val ERROR_MESSAGE_FORMAT = buildConf("spark.sql.error.messageFormat")
38793888
.doc("When PRETTY, the error message consists of textual representation of error class, " +
38803889
"message and query context. The MINIMAL and STANDARD formats are pretty JSON formats where " +

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,6 @@ class SparkOptimizer(
5454
PartitionPruning) :+
5555
Batch("InjectRuntimeFilter", FixedPoint(1),
5656
InjectRuntimeFilter) :+
57-
Batch("MergeScalarSubqueries", Once,
58-
MergeScalarSubqueries) :+
5957
Batch("Pushdown Filters from PartitionPruning", fixedPoint,
6058
PushDownPredicates) :+
6159
Batch("Cleanup filters that cannot be pushed down", Once,
@@ -79,6 +77,9 @@ class SparkOptimizer(
7977
PushPredicateThroughNonJoin,
8078
RemoveNoopOperators) :+
8179
Batch("User Provided Optimizers", fixedPoint, experimentalMethods.extraOptimizations: _*) :+
80+
Batch("Merge Scalar Subqueries", Once,
81+
MergeScalarSubqueries,
82+
RewriteDistinctAggregates) :+
8283
Batch("Replace CTE with Repartition", Once, ReplaceCTERefWithRepartition)
8384

8485
override def nonExcludableRules: Seq[String] = super.nonExcludableRules :+

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,14 @@ import org.apache.spark.util.collection.BitSet
5454
* is under the threshold with the addition of the next file, add it. If not, open a new bucket
5555
* and add it. Proceed to the next file.
5656
*/
57-
object FileSourceStrategy extends Strategy with PredicateHelper with Logging {
57+
/**
 * Planner strategy for file-based data sources. All the scan-building logic lives in
 * [[FileSourceScanPlan]]; this object only adapts its extractor to the Strategy interface.
 */
object FileSourceStrategy extends Strategy {
  def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    case FileSourceScanPlan(sparkPlan, _) => Seq(sparkPlan)
    case _ => Seq.empty
  }
}
5863

64+
object FileSourceScanPlan extends PredicateHelper with Logging {
5965
// should prune buckets iff num buckets is greater than 1 and there is only one bucket column
6066
private def shouldPruneBuckets(bucketSpec: Option[BucketSpec]): Boolean = {
6167
bucketSpec match {
@@ -145,7 +151,7 @@ object FileSourceStrategy extends Strategy with PredicateHelper with Logging {
145151
}
146152
}
147153

148-
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
154+
def unapply(plan: LogicalPlan): Option[(SparkPlan, FileSourceScanExec)] = plan match {
149155
case PhysicalOperation(projects, filters,
150156
l @ LogicalRelation(fsRelation: HadoopFsRelation, _, table, _)) =>
151157
// Filters on this relation fall into four categories based on where we can use them to avoid
@@ -291,8 +297,8 @@ object FileSourceStrategy extends Strategy with PredicateHelper with Logging {
291297
execution.ProjectExec(projects, withFilter)
292298
}
293299

294-
withProjections :: Nil
300+
Some(withProjections, scan)
295301

296-
case _ => Nil
302+
case _ => None
297303
}
298304
}

0 commit comments

Comments
 (0)