Skip to content

Commit 865a3de

Browse files
committed
[SPARK-38959][SQL][FOLLOWUP] Optimizer batch PartitionPruning should optimize subqueries
### What changes were proposed in this pull request?

This is a followup to #36304 to simplify `RowLevelOperationRuntimeGroupFiltering`. It does 3 things:
1. run `OptimizeSubqueries` in the batch `PartitionPruning`, so that `RowLevelOperationRuntimeGroupFiltering` does not need to invoke it manually.
2. skip dpp subquery in `OptimizeSubqueries`, to avoid the issue fixed by #33664
3. `RowLevelOperationRuntimeGroupFiltering` creates `InSubquery` instead of `DynamicPruningSubquery`, so that it can be optimized by `OptimizeSubqueries` later. This also avoids unnecessary planning overhead of `DynamicPruningSubquery`, as there is no join and we can only run it as a subquery.

### Why are the changes needed?

code simplification

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

existing tests

Closes #38557 from cloud-fan/help.

Lead-authored-by: Wenchen Fan <wenchen@databricks.com>
Co-authored-by: Wenchen Fan <cloud0fan@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
1 parent db84869 commit 865a3de

3 files changed

Lines changed: 12 additions & 12 deletions

File tree

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -320,6 +320,9 @@ abstract class Optimizer(catalogManager: CatalogManager)
320320
}
321321
def apply(plan: LogicalPlan): LogicalPlan = plan.transformAllExpressionsWithPruning(
322322
_.containsPattern(PLAN_EXPRESSION), ruleId) {
323+
// Do not optimize DPP subquery, as it was created from optimized plan and we should not
324+
// optimize it again, to save optimization time and avoid breaking broadcast/subquery reuse.
325+
case d: DynamicPruningSubquery => d
323326
case s: SubqueryExpression =>
324327
val Subquery(newPlan, _) = Optimizer.this.execute(Subquery.fromExpression(s))
325328
// At this point we have an optimized subquery plan that we are going to attach

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,8 @@ class SparkOptimizer(
5151
Batch("Optimize Metadata Only Query", Once, OptimizeMetadataOnlyQuery(catalog)) :+
5252
Batch("PartitionPruning", Once,
5353
PartitionPruning,
54-
RowLevelOperationRuntimeGroupFiltering(OptimizeSubqueries)) :+
54+
RowLevelOperationRuntimeGroupFiltering,
55+
OptimizeSubqueries) :+
5556
Batch("InjectRuntimeFilter", FixedPoint(1),
5657
InjectRuntimeFilter) :+
5758
Batch("MergeScalarSubqueries", Once,

sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/RowLevelOperationRuntimeGroupFiltering.scala

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,10 @@
1717

1818
package org.apache.spark.sql.execution.dynamicpruning
1919

20-
import org.apache.spark.sql.catalyst.expressions.{And, Attribute, DynamicPruningSubquery, Expression, PredicateHelper, V2ExpressionUtils}
20+
import org.apache.spark.sql.catalyst.expressions.{Attribute, DynamicPruningExpression, Expression, InSubquery, ListQuery, PredicateHelper, V2ExpressionUtils}
2121
import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
2222
import org.apache.spark.sql.catalyst.planning.GroupBasedRowLevelOperation
23-
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
23+
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, LogicalPlan}
2424
import org.apache.spark.sql.catalyst.rules.Rule
2525
import org.apache.spark.sql.connector.read.SupportsRuntimeV2Filtering
2626
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Implicits, DataSourceV2Relation, DataSourceV2ScanRelation}
@@ -37,8 +37,7 @@ import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Implicits, Dat
3737
*
3838
* Note this rule only applies to group-based row-level operations.
3939
*/
40-
case class RowLevelOperationRuntimeGroupFiltering(optimizeSubqueries: Rule[LogicalPlan])
41-
extends Rule[LogicalPlan] with PredicateHelper {
40+
object RowLevelOperationRuntimeGroupFiltering extends Rule[LogicalPlan] with PredicateHelper {
4241

4342
import DataSourceV2Implicits._
4443

@@ -65,8 +64,7 @@ case class RowLevelOperationRuntimeGroupFiltering(optimizeSubqueries: Rule[Logic
6564
Filter(dynamicPruningCond, r)
6665
}
6766

68-
// optimize subqueries to rewrite them as joins and trigger job planning
69-
replaceData.copy(query = optimizeSubqueries(newQuery))
67+
replaceData.copy(query = newQuery)
7068
}
7169

7270
private def buildMatchingRowsPlan(
@@ -88,10 +86,8 @@ case class RowLevelOperationRuntimeGroupFiltering(optimizeSubqueries: Rule[Logic
8886
buildKeys: Seq[Attribute],
8987
pruningKeys: Seq[Attribute]): Expression = {
9088

91-
val buildQuery = Project(buildKeys, matchingRowsPlan)
92-
val dynamicPruningSubqueries = pruningKeys.zipWithIndex.map { case (key, index) =>
93-
DynamicPruningSubquery(key, buildQuery, buildKeys, index, onlyInBroadcast = false)
94-
}
95-
dynamicPruningSubqueries.reduce(And)
89+
val buildQuery = Aggregate(buildKeys, buildKeys, matchingRowsPlan)
90+
DynamicPruningExpression(
91+
InSubquery(pruningKeys, ListQuery(buildQuery, childOutputs = buildQuery.output)))
9692
}
9793
}

0 commit comments

Comments (0)