Skip to content

Commit 0664ac5

Browse files
ulysses-youbeliefer
authored and committed
[SPARK-41048][SQL] Improve output partitioning and ordering with AQE cache
### What changes were proposed in this pull request? Try our best to give a stable output partitioning and ordering if current executed plan is final plan in `InMemoryTableScanExec`. Make AdaptiveSparkPlanExec expose the isFinal flag ### Why are the changes needed? The cached plan in InMemoryRelation can be AdaptiveSparkPlanExec, however AdaptiveSparkPlanExec deos not specify its output partitioning and ordering. It causes unnecessary shuffle and local sort for downstream action. ``` ... | AdaptiveSparkPlanExec | InMemoryTableScanExec | ... ``` after this pr, the `InMemoryTableScanExec` can preverse the output partitioning and ordering. ### Does this PR introduce _any_ user-facing change? no, only improve performance ### How was this patch tested? add test Closes apache#38558 from ulysses-you/aqe-cache. Authored-by: ulysses-you <ulyssesyou18@gmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
1 parent 500a6f1 commit 0664ac5

3 files changed

Lines changed: 34 additions & 7 deletions

File tree

sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,7 @@ case class AdaptiveSparkPlanExec(
188188

189189
@volatile private var currentPhysicalPlan = initialPlan
190190

191-
private var isFinalPlan = false
191+
@volatile private var _isFinalPlan = false
192192

193193
private var currentStageId = 0
194194

@@ -205,6 +205,8 @@ case class AdaptiveSparkPlanExec(
205205

206206
def executedPlan: SparkPlan = currentPhysicalPlan
207207

208+
def isFinalPlan: Boolean = _isFinalPlan
209+
208210
override def conf: SQLConf = context.session.sessionState.conf
209211

210212
override def output: Seq[Attribute] = inputPlan.output
@@ -329,7 +331,7 @@ case class AdaptiveSparkPlanExec(
329331
optimizeQueryStage(result.newPlan, isFinalStage = true),
330332
postStageCreationRules(supportsColumnar),
331333
Some((planChangeLogger, "AQE Post Stage Creation")))
332-
isFinalPlan = true
334+
_isFinalPlan = true
333335
executionId.foreach(onUpdatePlan(_, Seq(currentPhysicalPlan)))
334336
currentPhysicalPlan
335337
}

sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.plans.QueryPlan
2424
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
2525
import org.apache.spark.sql.columnar.CachedBatch
2626
import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan, WholeStageCodegenExec}
27+
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
2728
import org.apache.spark.sql.execution.metric.SQLMetrics
2829
import org.apache.spark.sql.vectorized.ColumnarBatch
2930

@@ -111,10 +112,15 @@ case class InMemoryTableScanExec(
111112

112113
override def output: Seq[Attribute] = attributes
113114

115+
private def cachedPlan = relation.cachedPlan match {
116+
case adaptive: AdaptiveSparkPlanExec if adaptive.isFinalPlan => adaptive.executedPlan
117+
case other => other
118+
}
119+
114120
private def updateAttribute(expr: Expression): Expression = {
115121
// attributes can be pruned so using relation's output.
116122
// E.g., relation.output is [id, item] but this scan's output can be [item] only.
117-
val attrMap = AttributeMap(relation.cachedPlan.output.zip(relation.output))
123+
val attrMap = AttributeMap(cachedPlan.output.zip(relation.output))
118124
expr.transform {
119125
case attr: Attribute => attrMap.getOrElse(attr, attr)
120126
}
@@ -123,7 +129,7 @@ case class InMemoryTableScanExec(
123129
// The cached version does not change the outputPartitioning of the original SparkPlan.
124130
// But the cached version could alias output, so we need to replace output.
125131
override def outputPartitioning: Partitioning = {
126-
relation.cachedPlan.outputPartitioning match {
132+
cachedPlan.outputPartitioning match {
127133
case e: Expression => updateAttribute(e).asInstanceOf[Partitioning]
128134
case other => other
129135
}
@@ -132,7 +138,7 @@ case class InMemoryTableScanExec(
132138
// The cached version does not change the outputOrdering of the original SparkPlan.
133139
// But the cached version could alias output, so we need to replace output.
134140
override def outputOrdering: Seq[SortOrder] =
135-
relation.cachedPlan.outputOrdering.map(updateAttribute(_).asInstanceOf[SortOrder])
141+
cachedPlan.outputOrdering.map(updateAttribute(_).asInstanceOf[SortOrder])
136142

137143
lazy val enableAccumulatorsForTest: Boolean = conf.inMemoryTableScanStatisticsEnabled
138144

sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,10 +40,10 @@ import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation
4040
import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LeafNode, LocalRelation, LogicalPlan, OneRowRelation, Statistics}
4141
import org.apache.spark.sql.catalyst.util.DateTimeUtils
4242
import org.apache.spark.sql.connector.FakeV2Provider
43-
import org.apache.spark.sql.execution.{FilterExec, LogicalRDD, QueryExecution, WholeStageCodegenExec}
43+
import org.apache.spark.sql.execution.{FilterExec, LogicalRDD, QueryExecution, SortExec, WholeStageCodegenExec}
4444
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
4545
import org.apache.spark.sql.execution.aggregate.HashAggregateExec
46-
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec, ShuffleExchangeExec}
46+
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec, ShuffleExchangeExec, ShuffleExchangeLike}
4747
import org.apache.spark.sql.expressions.{Aggregator, Window}
4848
import org.apache.spark.sql.functions._
4949
import org.apache.spark.sql.internal.SQLConf
@@ -3513,6 +3513,25 @@ class DataFrameSuite extends QueryTest
35133513
assert(df.queryExecution.executedPlan.execute().getNumPartitions == 2)
35143514
}
35153515
}
3516+
3517+
test("SPARK-41048: Improve output partitioning and ordering with AQE cache") {
3518+
withSQLConf(
3519+
SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key -> "true",
3520+
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
3521+
val df1 = spark.range(10).selectExpr("cast(id as string) c1")
3522+
val df2 = spark.range(10).selectExpr("cast(id as string) c2")
3523+
val cached = df1.join(df2, $"c1" === $"c2").cache()
3524+
cached.count()
3525+
val executedPlan = cached.groupBy("c1").agg(max($"c2")).queryExecution.executedPlan
3526+
// before is 2 sort and 1 shuffle
3527+
assert(collect(executedPlan) {
3528+
case s: ShuffleExchangeLike => s
3529+
}.isEmpty)
3530+
assert(collect(executedPlan) {
3531+
case s: SortExec => s
3532+
}.isEmpty)
3533+
}
3534+
}
35163535
}
35173536

35183537
case class GroupByKey(a: Int, b: Int)

0 commit comments

Comments
 (0)