Skip to content

Commit b54a94b

Browse files
ulysses-you authored and cloud-fan committed
[SPARK-43376][SQL] Improve reuse subquery with table cache
### What changes were proposed in this pull request?

AQE cannot reuse a subquery if it is pushed into `InMemoryTableScan`. There are two issues:

- `ReuseAdaptiveSubquery` does not support reusing a subquery if two subqueries have the same exprId
- `ReuseAdaptiveSubquery` is not applied to `InMemoryTableScan` when it is wrapped in a `TableCacheQueryStageExec`

For example:

```
Seq(1).toDF("c1").cache().createOrReplaceTempView("t1")
Seq(2).toDF("c2").createOrReplaceTempView("t2")
spark.sql("SELECT * FROM t1 WHERE c1 < (SELECT c2 FROM t2)")
```

There are two `subquery#27` nodes in the plan, but no `ReusedSubquery`:

```
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   *(1) Filter (c1#14 < Subquery subquery#27, [id=#20])
   :  +- Subquery subquery#27, [id=#20]
   :     +- AdaptiveSparkPlan isFinalPlan=true
   :        +- LocalTableScan [c2#25]
   +- TableCacheQueryStage 0
      +- InMemoryTableScan [c1#14], [(c1#14 < Subquery subquery#27, [id=#20])]
         :- InMemoryRelation [c1#14], StorageLevel(disk, memory, deserialized, 1 replicas)
         :     +- LocalTableScan [c1#14]
         +- Subquery subquery#27, [id=#20]
            +- AdaptiveSparkPlan isFinalPlan=true
               +- LocalTableScan [c2#25]
```

### Why are the changes needed?

Improve the coverage of subquery reuse. Note that this is not a real performance issue, because the subquery has already been reused (it is the same Java object). This PR just makes the plan clearer about subquery reuse.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added a test.

Closes #41046 from ulysses-you/aqe-subquery.

Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
1 parent 1641118 commit b54a94b

6 files changed

Lines changed: 64 additions & 42 deletions

File tree

sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -587,8 +587,10 @@ case class AdaptiveSparkPlanExec(
587587
BroadcastQueryStageExec(currentStageId, newPlan, e.canonicalized)
588588
}
589589
case i: InMemoryTableScanExec =>
590-
// No need to optimize `InMemoryTableScanExec` as it's a leaf node.
591-
TableCacheQueryStageExec(currentStageId, i)
590+
// Apply `queryStageOptimizerRules` so that we can reuse subquery.
591+
// No need to apply `postStageCreationRules` for `InMemoryTableScanExec`
592+
// as it's a leaf node.
593+
TableCacheQueryStageExec(currentStageId, optimizeQueryStage(i, isFinalStage = false))
592594
}
593595
currentStageId += 1
594596
setLogicalLinkForNewQueryStage(queryStage, plan)

sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ case class InsertAdaptiveSparkPlan(
125125
/**
126126
* Returns an expression-id-to-execution-plan map for all the sub-queries.
127127
* For each sub-query, generate the adaptive execution plan for each sub-query by applying this
128-
* rule, or reuse the execution plan from another sub-query of the same semantics if possible.
128+
* rule.
129129
*/
130130
private def buildSubqueryMap(plan: SparkPlan): Map[Long, BaseSubqueryExec] = {
131131
val subqueryMap = mutable.HashMap.empty[Long, BaseSubqueryExec]

sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReuseAdaptiveSubquery.scala

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,16 @@ case class ReuseAdaptiveSubquery(
3333

3434
plan.transformAllExpressionsWithPruning(_.containsPattern(PLAN_EXPRESSION)) {
3535
case sub: ExecSubqueryExpression =>
36-
val newPlan = reuseMap.getOrElseUpdate(sub.plan.canonicalized, sub.plan)
37-
if (newPlan.ne(sub.plan)) {
38-
sub.withNewPlan(ReusedSubqueryExec(newPlan))
39-
} else {
40-
sub
36+
// The subquery can be already reused (the same Java object) due to filter pushdown
37+
// of table cache. If it happens, we just need to wrap the current subquery with
38+
// `ReusedSubqueryExec` and no need to update the `reuseMap`.
39+
reuseMap.get(sub.plan.canonicalized).map { subquery =>
40+
sub.withNewPlan(ReusedSubqueryExec(subquery))
41+
}.getOrElse {
42+
reuseMap.putIfAbsent(sub.plan.canonicalized, sub.plan) match {
43+
case Some(subquery) => sub.withNewPlan(ReusedSubqueryExec(subquery))
44+
case None => sub
45+
}
4146
}
4247
}
4348
}

sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala

Lines changed: 28 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
3636
import org.apache.spark.sql.catalyst.plans.logical.{BROADCAST, Join, JoinStrategyHint, SHUFFLE_HASH}
3737
import org.apache.spark.sql.catalyst.util.DateTimeConstants
3838
import org.apache.spark.sql.execution.{ColumnarToRowExec, ExecSubqueryExpression, RDDScanExec, SparkPlan}
39-
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
39+
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, AQEPropagateEmptyRelation}
4040
import org.apache.spark.sql.execution.columnar._
4141
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
4242
import org.apache.spark.sql.functions._
@@ -823,21 +823,33 @@ class CachedTableSuite extends QueryTest with SQLTestUtils
823823

824824
test("SPARK-19993 subquery with cached underlying relation") {
825825
withTempView("t1") {
826-
Seq(1).toDF("c1").createOrReplaceTempView("t1")
827-
spark.catalog.cacheTable("t1")
828-
829-
// underlying table t1 is cached as well as the query that refers to it.
830-
val sqlText =
831-
"""
832-
|SELECT * FROM t1
833-
|WHERE
834-
|NOT EXISTS (SELECT * FROM t1)
835-
""".stripMargin
836-
val ds = sql(sqlText)
837-
assert(getNumInMemoryRelations(ds) == 2)
838-
839-
val cachedDs = sql(sqlText).cache()
840-
assert(getNumInMemoryTablesRecursively(cachedDs.queryExecution.sparkPlan) == 3)
826+
Seq(false, true).foreach { enabled =>
827+
withSQLConf(
828+
SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key -> enabled.toString,
829+
SQLConf.ADAPTIVE_OPTIMIZER_EXCLUDED_RULES.key ->
830+
AQEPropagateEmptyRelation.ruleName) {
831+
832+
Seq(1).toDF("c1").createOrReplaceTempView("t1")
833+
spark.catalog.cacheTable("t1")
834+
835+
// underlying table t1 is cached as well as the query that refers to it.
836+
val sqlText =
837+
"""
838+
|SELECT * FROM t1
839+
|WHERE
840+
|NOT EXISTS (SELECT * FROM t1)
841+
""".stripMargin
842+
val ds = sql(sqlText)
843+
assert(getNumInMemoryRelations(ds) == 2)
844+
845+
val cachedDs = sql(sqlText).cache()
846+
cachedDs.collect()
847+
assert(getNumInMemoryTablesRecursively(cachedDs.queryExecution.executedPlan) == 3)
848+
849+
cachedDs.unpersist()
850+
spark.catalog.uncacheTable("t1")
851+
}
852+
}
841853
}
842854
}
843855

sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala

Lines changed: 6 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -2337,15 +2337,9 @@ class SubquerySuite extends QueryTest
23372337
case rs: ReusedSubqueryExec => rs.child.id
23382338
}
23392339

2340-
if (enableAQE) {
2341-
assert(subqueryIds.size == 3, "Missing or unexpected SubqueryExec in the plan")
2342-
assert(reusedSubqueryIds.size == 4,
2343-
"Missing or unexpected reused ReusedSubqueryExec in the plan")
2344-
} else {
2345-
assert(subqueryIds.size == 2, "Missing or unexpected SubqueryExec in the plan")
2346-
assert(reusedSubqueryIds.size == 5,
2347-
"Missing or unexpected reused ReusedSubqueryExec in the plan")
2348-
}
2340+
assert(subqueryIds.size == 2, "Missing or unexpected SubqueryExec in the plan")
2341+
assert(reusedSubqueryIds.size == 5,
2342+
"Missing or unexpected reused ReusedSubqueryExec in the plan")
23492343
}
23502344
}
23512345
}
@@ -2413,15 +2407,9 @@ class SubquerySuite extends QueryTest
24132407
case rs: ReusedSubqueryExec => rs.child.id
24142408
}
24152409

2416-
if (enableAQE) {
2417-
assert(subqueryIds.size == 3, "Missing or unexpected SubqueryExec in the plan")
2418-
assert(reusedSubqueryIds.size == 3,
2419-
"Missing or unexpected reused ReusedSubqueryExec in the plan")
2420-
} else {
2421-
assert(subqueryIds.size == 2, "Missing or unexpected SubqueryExec in the plan")
2422-
assert(reusedSubqueryIds.size == 4,
2423-
"Missing or unexpected reused ReusedSubqueryExec in the plan")
2424-
}
2410+
assert(subqueryIds.size == 2, "Missing or unexpected SubqueryExec in the plan")
2411+
assert(reusedSubqueryIds.size == 4,
2412+
"Missing or unexpected reused ReusedSubqueryExec in the plan")
24252413
}
24262414
}
24272415
}

sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2826,6 +2826,21 @@ class AdaptiveQueryExecSuite
28262826
.executedPlan.isInstanceOf[LocalTableScanExec])
28272827
}
28282828
}
2829+
2830+
test("SPARK-43376: Improve reuse subquery with table cache") {
2831+
withSQLConf(SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key -> "true") {
2832+
withTable("t1", "t2") {
2833+
withCache("t1") {
2834+
Seq(1).toDF("c1").cache().createOrReplaceTempView("t1")
2835+
Seq(2).toDF("c2").createOrReplaceTempView("t2")
2836+
2837+
val (_, adaptive) = runAdaptiveAndVerifyResult(
2838+
"SELECT * FROM t1 WHERE c1 < (SELECT c2 FROM t2)")
2839+
assert(findReusedSubquery(adaptive).size == 1)
2840+
}
2841+
}
2842+
}
2843+
}
28292844
}
28302845

28312846
/**

0 commit comments

Comments
 (0)