
Commit 7079481

Revert "[SPARK-28346][SQL] clone the query plan between analyzer, optimizer and planner"
This reverts commit e04f696.
1 parent c3b59c6

7 files changed: 18 additions & 93 deletions

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala

Lines changed: 3 additions & 10 deletions
@@ -292,7 +292,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
       mapChildren(_.transformDown(rule))
     } else {
       // If the transform function replaces this node with a new one, carry over the tags.
-      afterRule.copyTagsFrom(this)
+      afterRule.tags ++= this.tags
       afterRule.mapChildren(_.transformDown(rule))
     }
   }
@@ -316,7 +316,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
       }
     }
     // If the transform function replaces this node with a new one, carry over the tags.
-    newNode.copyTagsFrom(this)
+    newNode.tags ++= this.tags
     newNode
   }

@@ -434,15 +434,8 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
   private def makeCopy(
       newArgs: Array[AnyRef],
       allowEmptyArgs: Boolean): BaseType = attachTree(this, "makeCopy") {
-    val allCtors = getClass.getConstructors
-    if (newArgs.isEmpty && allCtors.isEmpty) {
-      // This is a singleton object which doesn't have any constructor. Just return `this` as we
-      // can't copy it.
-      return this
-    }
-
     // Skip no-arg constructors that are just there for kryo.
-    val ctors = allCtors.filter(allowEmptyArgs || _.getParameterTypes.size != 0)
+    val ctors = getClass.getConstructors.filter(allowEmptyArgs || _.getParameterTypes.size != 0)
     if (ctors.isEmpty) {
       sys.error(s"No valid constructor for $nodeName")
     }
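
Both the removed `copyTagsFrom(this)` and the restored `afterRule.tags ++= this.tags` do the same job here: when a rule replaces a node, the replacement inherits the original node's tags. A minimal sketch of that mechanism, assuming only that `tags` is a mutable map keyed by `TreeNodeTag` (the `Node` class below is an illustrative stand-in, not the Spark class):

import scala.collection.mutable

// Illustrative stand-in for TreeNode's tag machinery.
case class TreeNodeTag[T](name: String)

class Node {
  val tags: mutable.Map[TreeNodeTag[_], Any] = mutable.Map.empty
  def setTagValue[T](tag: TreeNodeTag[T], value: T): Unit = tags(tag) = value
  def getTagValue[T](tag: TreeNodeTag[T]): Option[T] =
    tags.get(tag).map(_.asInstanceOf[T])
}

val original = new Node
original.setTagValue(TreeNodeTag[String]("a"), "v")

// When a rule replaces `original`, the transform carries the tags over:
val afterRule = new Node
afterRule.tags ++= original.tags
afterRule.getTagValue(TreeNodeTag[String]("a"))  // Some("v")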

sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala

Lines changed: 11 additions & 13 deletions
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.ByteCodeStats
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.catalyst.util.StringUtils.PlanStringConcat
+import org.apache.spark.sql.catalyst.util.StringUtils.{PlanStringConcat, StringConcat}
 import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.dynamicpruning.PlanDynamicPruningFilters
 import org.apache.spark.sql.execution.adaptive.InsertAdaptiveSparkPlan
@@ -62,38 +62,36 @@ class QueryExecution(

   lazy val analyzed: LogicalPlan = tracker.measurePhase(QueryPlanningTracker.ANALYSIS) {
     SparkSession.setActiveSession(sparkSession)
-    // We can't clone `logical` here, which will reset the `_analyzed` flag.
     sparkSession.sessionState.analyzer.executeAndCheck(logical, tracker)
   }

   lazy val withCachedData: LogicalPlan = {
     assertAnalyzed()
     assertSupported()
-    // clone the plan to avoid sharing the plan instance between different stages like analyzing,
-    // optimizing and planning.
-    sparkSession.sharedState.cacheManager.useCachedData(analyzed.clone())
+    sparkSession.sharedState.cacheManager.useCachedData(analyzed)
   }

   lazy val optimizedPlan: LogicalPlan = tracker.measurePhase(QueryPlanningTracker.OPTIMIZATION) {
-    // clone the plan to avoid sharing the plan instance between different stages like analyzing,
-    // optimizing and planning.
-    sparkSession.sessionState.optimizer.executeAndTrack(withCachedData.clone(), tracker)
+    sparkSession.sessionState.optimizer.executeAndTrack(withCachedData, tracker)
   }

   lazy val sparkPlan: SparkPlan = tracker.measurePhase(QueryPlanningTracker.PLANNING) {
     SparkSession.setActiveSession(sparkSession)
+    // Runtime re-optimization requires a unique instance of every node in the logical plan.
+    val logicalPlan = if (sparkSession.sessionState.conf.adaptiveExecutionEnabled) {
+      optimizedPlan.clone()
+    } else {
+      optimizedPlan
+    }
     // TODO: We use next(), i.e. take the first plan returned by the planner, here for now,
     //       but we will implement to choose the best plan.
-    // Clone the logical plan here, in case the planner rules change the states of the logical plan.
-    planner.plan(ReturnAnswer(optimizedPlan.clone())).next()
+    planner.plan(ReturnAnswer(logicalPlan)).next()
   }

   // executedPlan should not be used to initialize any SparkPlan. It should be
   // only used for execution.
   lazy val executedPlan: SparkPlan = tracker.measurePhase(QueryPlanningTracker.PLANNING) {
-    // clone the plan to avoid sharing the plan instance between different stages like analyzing,
-    // optimizing and planning.
-    prepareForExecution(sparkPlan.clone())
+    prepareForExecution(sparkPlan)
   }

   /**
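
After this revert, `QueryExecution` clones a plan in only one place: `sparkPlan` clones `optimizedPlan` when adaptive execution is enabled, because runtime re-optimization mutates the logical plan it is handed; every other phase now passes the shared instance along. A minimal sketch of that guard (a hypothetical free-standing helper, not a method of `QueryExecution`):

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Clone only when adaptive execution may re-optimize (and thus mutate) the
// plan at runtime; otherwise reuse the shared optimized-plan instance.
def plannerInput(optimized: LogicalPlan, aqeEnabled: Boolean): LogicalPlan =
  if (aqeEnabled) optimized.clone() else optimized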

sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala

Lines changed: 0 additions & 7 deletions
@@ -223,13 +223,6 @@ case class InMemoryRelation(
       statsOfPlanToCache).asInstanceOf[this.type]
   }

-  // override `clone` since the default implementation won't carry over mutable states.
-  override def clone(): LogicalPlan = {
-    val cloned = this.copy()
-    cloned.statsOfPlanToCache = this.statsOfPlanToCache
-    cloned
-  }
-
   override def simpleString(maxFields: Int): String =
     s"InMemoryRelation [${truncatedString(output, ", ", maxFields)}], ${cacheBuilder.storageLevel}"
 }
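
The override deleted here existed because a Scala case class's generated `copy()` only re-binds constructor parameters; a `var` declared in the class body is re-initialized in the copy, so `statsOfPlanToCache` had to be reassigned by hand. A self-contained sketch of that language behavior (the `Relation` class is hypothetical, not the Spark one):

// copy() re-runs the class body, resetting body vars to their initializers.
case class Relation(name: String) {
  var cachedStats: Option[Long] = None
}

val r = Relation("t")
r.cachedStats = Some(42L)

val copied = r.copy()
copied.cachedStats  // None: the var was not carried over, which is why the
                    // removed override reassigned statsOfPlanToCache after copy()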

sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala

Lines changed: 1 addition & 1 deletion
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.command
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.catalyst.plans.logical.{IgnoreCachedData, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.logical.IgnoreCachedData
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
 import org.apache.spark.sql.types.{StringType, StructField, StructType}

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SaveIntoDataSourceCommand.scala

Lines changed: 0 additions & 6 deletions
@@ -52,10 +52,4 @@ case class SaveIntoDataSourceCommand(
     val redacted = SQLConf.get.redactOptions(options)
     s"SaveIntoDataSourceCommand ${dataSource}, ${redacted}, ${mode}"
   }
-
-  // Override `clone` since the default implementation will turn `CaseInsensitiveMap` to a normal
-  // map.
-  override def clone(): LogicalPlan = {
-    SaveIntoDataSourceCommand(query.clone(), dataSource, options, mode)
-  }
 }
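
Per the comment in the removed code, the default `clone()` implementation turns the `options` field from a `CaseInsensitiveMap` into a plain `Map`, which silently changes lookup behavior. A small sketch of the difference (the values are illustrative):

import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap

val opts = CaseInsensitiveMap(Map("Path" -> "/tmp/out"))
opts.get("path")                       // Some("/tmp/out"): lookups ignore key case
Map("Path" -> "/tmp/out").get("path")  // None: a plain Map is case-sensitive,
                                       // the behavior the removed override avoided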

sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala

Lines changed: 1 addition & 54 deletions
@@ -18,10 +18,8 @@ package org.apache.spark.sql.execution

 import scala.io.Source

-import org.apache.spark.sql.{AnalysisException, FastOperator}
-import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation}
-import org.apache.spark.sql.catalyst.trees.TreeNodeTag
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession

@@ -139,56 +137,5 @@ class QueryExecutionSuite extends SharedSparkSession {
       (_: LogicalPlan) => throw new Error("error"))
     val error = intercept[Error](qe.toString)
     assert(error.getMessage.contains("error"))
-
-    spark.experimental.extraStrategies = Nil
-  }
-
-  test("SPARK-28346: clone the query plan between different stages") {
-    val tag1 = new TreeNodeTag[String]("a")
-    val tag2 = new TreeNodeTag[String]("b")
-    val tag3 = new TreeNodeTag[String]("c")
-
-    def assertNoTag(tag: TreeNodeTag[String], plans: QueryPlan[_]*): Unit = {
-      plans.foreach { plan =>
-        assert(plan.getTagValue(tag).isEmpty)
-      }
-    }
-
-    val df = spark.range(10)
-    val analyzedPlan = df.queryExecution.analyzed
-    val cachedPlan = df.queryExecution.withCachedData
-    val optimizedPlan = df.queryExecution.optimizedPlan
-
-    analyzedPlan.setTagValue(tag1, "v")
-    assertNoTag(tag1, cachedPlan, optimizedPlan)
-
-    cachedPlan.setTagValue(tag2, "v")
-    assertNoTag(tag2, analyzedPlan, optimizedPlan)
-
-    optimizedPlan.setTagValue(tag3, "v")
-    assertNoTag(tag3, analyzedPlan, cachedPlan)
-
-    val tag4 = new TreeNodeTag[String]("d")
-    try {
-      spark.experimental.extraStrategies = Seq(new SparkStrategy() {
-        override def apply(plan: LogicalPlan): Seq[SparkPlan] = {
-          plan.foreach {
-            case r: org.apache.spark.sql.catalyst.plans.logical.Range =>
-              r.setTagValue(tag4, "v")
-            case _ =>
-          }
-          Seq(FastOperator(plan.output))
-        }
-      })
-      // trigger planning
-      df.queryExecution.sparkPlan
-      assert(optimizedPlan.getTagValue(tag4).isEmpty)
-    } finally {
-      spark.experimental.extraStrategies = Nil
-    }
-
-    val tag5 = new TreeNodeTag[String]("e")
-    df.queryExecution.executedPlan.setTagValue(tag5, "v")
-    assertNoTag(tag5, df.queryExecution.sparkPlan)
   }
 }
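
The deleted test asserted that each planning phase received its own cloned plan, so a tag set in one phase never leaked into another. With the clones reverted, phases can share node instances again; a hedged sketch of the post-revert behavior (the exact outcome depends on the plan and on caching):

import org.apache.spark.sql.catalyst.trees.TreeNodeTag

val df = spark.range(10)
val tag = new TreeNodeTag[String]("a")
df.queryExecution.analyzed.setTagValue(tag, "v")
// Without the clone, withCachedData may be the very same tree as the
// analyzed plan, so the tag can now be visible downstream:
df.queryExecution.withCachedData.getTagValue(tag)  // likely Some("v")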

sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/PartitionBatchPruningSuite.scala

Lines changed: 2 additions & 2 deletions
@@ -180,7 +180,7 @@ class PartitionBatchPruningSuite extends SharedSparkSession {
     val result = df.collect().map(_(0)).toArray
     assert(result.length === 1)

-    val (readPartitions, readBatches) = df.queryExecution.executedPlan.collect {
+    val (readPartitions, readBatches) = df.queryExecution.sparkPlan.collect {
       case in: InMemoryTableScanExec => (in.readPartitions.value, in.readBatches.value)
     }.head
     assert(readPartitions === 5)
@@ -201,7 +201,7 @@ class PartitionBatchPruningSuite extends SharedSparkSession {
       df.collect().map(_(0)).toArray
     }

-    val (readPartitions, readBatches) = df.queryExecution.executedPlan.collect {
+    val (readPartitions, readBatches) = df.queryExecution.sparkPlan.collect {
       case in: InMemoryTableScanExec => (in.readPartitions.value, in.readBatches.value)
     }.head
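
These two test tweaks follow from the `executedPlan` change above: with `prepareForExecution(sparkPlan)` no longer operating on a clone, `sparkPlan` and `executedPlan` can share the same `InMemoryTableScanExec` instances, so accumulators updated while the query runs are readable from `sparkPlan` again. Assuming a cached DataFrame `df` and the suite's imports, the pattern is:

// The scan node collected from sparkPlan can be the same instance that
// actually ran, so its accumulators reflect the finished query.
df.collect()
val (parts, batches) = df.queryExecution.sparkPlan.collect {
  case scan: InMemoryTableScanExec => (scan.readPartitions.value, scan.readBatches.value)
}.head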
