@@ -287,7 +287,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
mapChildren(_.transformDown(rule))
} else {
// If the transform function replaces this node with a new one, carry over the tags.
afterRule.tags ++= this.tags
afterRule.copyTagsFrom(this)
afterRule.mapChildren(_.transformDown(rule))
}
}
@@ -311,7 +311,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
}
}
// If the transform function replaces this node with a new one, carry over the tags.
newNode.tags ++= this.tags
newNode.copyTagsFrom(this)
newNode
}

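For readers unfamiliar with the tag mechanism the two hunks above rely on, here is a minimal, self-contained sketch of what a tag-preserving copy such as copyTagsFrom could look like, shown REPL-style. It is an assumption for illustration only: NodeTag, Node and Leaf are stand-ins, not the Spark classes, and the real TreeNode method may differ.

import scala.collection.mutable

final case class NodeTag[T](name: String)  // stand-in for TreeNodeTag

abstract class Node {
  // Mutable, per-instance tag storage, as on Spark's TreeNode.
  private val tags = mutable.Map.empty[NodeTag[_], Any]

  def setTagValue[T](tag: NodeTag[T], value: T): Unit = tags(tag) = value
  def getTagValue[T](tag: NodeTag[T]): Option[T] = tags.get(tag).map(_.asInstanceOf[T])

  // The carry-over: a node that replaces another inherits all of its tags.
  def copyTagsFrom(other: Node): Unit = tags ++= other.tags
}

final class Leaf extends Node

val original = new Leaf
val replacement = new Leaf
val origin = NodeTag[String]("origin")
original.setTagValue(origin, "planner")
replacement.copyTagsFrom(original)
assert(replacement.getTagValue(origin).contains("planner"))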
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.util.StringUtils.{PlanStringConcat, StringConcat}
import org.apache.spark.sql.catalyst.util.StringUtils.PlanStringConcat
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.execution.adaptive.InsertAdaptiveSparkPlan
import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReuseExchange}
@@ -60,36 +60,31 @@ class QueryExecution(

lazy val analyzed: LogicalPlan = tracker.measurePhase(QueryPlanningTracker.ANALYSIS) {
SparkSession.setActiveSession(sparkSession)
// We can't clone `logical` here, which will reset the `_analyzed` flag.
sparkSession.sessionState.analyzer.executeAndCheck(logical, tracker)
}

lazy val withCachedData: LogicalPlan = {
Member: Maybe not necessary, but should we clone logical too before sending to the analyzer?

Contributor Author: Yeah, I think we should.
assertAnalyzed()
assertSupported()
sparkSession.sharedState.cacheManager.useCachedData(analyzed)
sparkSession.sharedState.cacheManager.useCachedData(analyzed.clone())
}

lazy val optimizedPlan: LogicalPlan = tracker.measurePhase(QueryPlanningTracker.OPTIMIZATION) {
sparkSession.sessionState.optimizer.executeAndTrack(withCachedData, tracker)
sparkSession.sessionState.optimizer.executeAndTrack(withCachedData.clone(), tracker)
Member: Since the query plan is now mutable, I think it's better to limit the life cycle of a query plan instance. We can clone the query plan between the analyzer, optimizer and planner, so that its life cycle is confined to a single stage.

If we decide to clone the plan after each stage, will any test fail if we do not clone it?

Contributor Author: Test added.
}

lazy val sparkPlan: SparkPlan = tracker.measurePhase(QueryPlanningTracker.PLANNING) {
SparkSession.setActiveSession(sparkSession)
// Runtime re-optimization requires a unique instance of every node in the logical plan.
val logicalPlan = if (sparkSession.sessionState.conf.adaptiveExecutionEnabled) {
optimizedPlan.clone()
} else {
optimizedPlan
}
// TODO: We use next(), i.e. take the first plan returned by the planner, here for now,
// but we will implement to choose the best plan.
planner.plan(ReturnAnswer(logicalPlan)).next()
planner.plan(ReturnAnswer(optimizedPlan.clone())).next()
}

// executedPlan should not be used to initialize any SparkPlan. It should be
// only used for execution.
lazy val executedPlan: SparkPlan = tracker.measurePhase(QueryPlanningTracker.PLANNING) {
prepareForExecution(sparkPlan)
prepareForExecution(sparkPlan.clone())
}

/**
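The reviewer's suggestion above — limit each plan instance's life cycle to one stage by cloning between analyzer, optimizer and planner — can be summarized with a small self-contained sketch. Plan, MiniQueryExecution and the stage functions below are placeholders rather than the Spark APIs; the point is only that every stage after analysis works on its own copy, so in-place mutation in one stage cannot leak into another.

// A toy mutable plan with a deep clone().
final class Plan(var label: String, val children: Seq[Plan] = Nil) {
  override def clone(): Plan = new Plan(label, children.map(_.clone()))
}

class MiniQueryExecution(logical: Plan) {
  private def analyze(p: Plan): Plan    = { p.label += " [analyzed]"; p }
  private def optimize(p: Plan): Plan   = { p.label += " [optimized]"; p }
  private def toPhysical(p: Plan): Plan = { p.label += " [physical]"; p }

  // `logical` itself is not cloned at this revision (see the thread above and the
  // _analyzed-flag comment in the diff); every later stage receives a clone.
  lazy val analyzed: Plan  = analyze(logical)
  lazy val optimized: Plan = optimize(analyzed.clone())
  lazy val executed: Plan  = toPhysical(optimized.clone())
}

val qe = new MiniQueryExecution(new Plan("Filter(id > 0)"))
qe.executed.label  // "Filter(id > 0) [analyzed] [optimized] [physical]"
qe.analyzed.label  // still "Filter(id > 0) [analyzed]": later stages never touched it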
@@ -223,6 +223,12 @@ case class InMemoryRelation(
statsOfPlanToCache).asInstanceOf[this.type]
}

override def clone(): LogicalPlan = {
val cloned = this.copy()
cloned.statsOfPlanToCache = this.statsOfPlanToCache
cloned
}

override def simpleString(maxFields: Int): String =
s"InMemoryRelation [${truncatedString(output, ", ", maxFields)}], ${cacheBuilder.storageLevel}"
}
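The extra assignment in the override above is presumably needed because statsOfPlanToCache is not re-populated by copy(). A hedged, self-contained illustration of the pattern, where Relation and cachedStats are stand-ins rather than the Spark members:

case class Relation(output: Seq[String]) {
  var cachedStats: Option[Long] = None     // populated after construction

  def cloneRelation(): Relation = {
    val cloned = this.copy()               // copy() only fills constructor parameters,
    cloned.cachedStats = this.cachedStats  // so the var must be carried over by hand
    cloned
  }
}

val r = Relation(Seq("id"))
r.cachedStats = Some(42L)
assert(r.cloneRelation().cachedStats.contains(42L))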
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.command
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.IgnoreCachedData
import org.apache.spark.sql.catalyst.plans.logical.{IgnoreCachedData, LogicalPlan}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
import org.apache.spark.sql.types.{StringType, StructField, StructType}
@@ -168,4 +168,6 @@ case object ResetCommand extends RunnableCommand with IgnoreCachedData {
sparkSession.sessionState.conf.clear()
Seq.empty[Row]
}

override def clone(): LogicalPlan = this
Contributor Author: The clone defined in TreeNode doesn't work for a case object.
}
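To spell out the author's note: TreeNode's default clone presumably rebuilds a node from its constructor arguments, and a case object has no such constructor to call, so the override simply returns the singleton itself. A short REPL-style illustration with hypothetical command nodes, not the Spark classes:

case class SetConfigCommand(key: String, value: String)
case object ResetAllCommand

// A case class can be rebuilt from its fields...
val set = SetConfigCommand("spark.sql.shuffle.partitions", "10")
val rebuilt = set.copy()

// ...but a case object is a parameterless singleton: there is no copy() and no
// public constructor to invoke, so the only meaningful "clone" is the object itself.
val cloned = ResetAllCommand
assert(cloned eq ResetAllCommand)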
@@ -89,4 +89,6 @@ case object ClearCacheCommand extends RunnableCommand with IgnoreCachedData {
sparkSession.catalog.clearCache()
Seq.empty[Row]
}

override def clone(): LogicalPlan = this
}
@@ -52,4 +52,8 @@ case class SaveIntoDataSourceCommand(
val redacted = SQLConf.get.redactOptions(options)
s"SaveIntoDataSourceCommand ${dataSource}, ${redacted}, ${mode}"
}

override def clone(): LogicalPlan = {
SaveIntoDataSourceCommand(query.clone(), dataSource, options, mode)
Contributor Author: The mapChildren in TreeNode will change the map type (from CaseInsensitiveMap to a normal Map).
}
}
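To make the concern above concrete, here is a self-contained sketch of why losing the case-insensitive wrapper matters for data source options. InsensitiveOptions is a stand-in written for this illustration, not Spark's CaseInsensitiveMap, and the "rebuild" step only mimics the effect the author describes:

final case class InsensitiveOptions(underlying: Map[String, String]) {
  private val lowerCased = underlying.map { case (k, v) => k.toLowerCase -> v }
  def get(key: String): Option[String] = lowerCased.get(key.toLowerCase)
}

val options = InsensitiveOptions(Map("Path" -> "/tmp/out"))
options.get("PATH")                         // Some("/tmp/out"): lookup ignores case

// If a generic rewrite rebuilds the node with the raw Map instead of the wrapper,
// later option lookups silently become case-sensitive and can miss keys.
val raw: Map[String, String] = options.underlying
raw.get("PATH")                             // None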
@@ -19,7 +19,8 @@ package org.apache.spark.sql.execution
import scala.io.Source

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation}
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, OneRowRelation, SubqueryAlias}
import org.apache.spark.sql.catalyst.trees.TreeNodeTag
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext

@@ -138,4 +139,27 @@ class QueryExecutionSuite extends SharedSQLContext {
val error = intercept[Error](qe.toString)
assert(error.getMessage.contains("error"))
}

test("analyzed plan should not change after it's generated") {
Member: The tests can still pass without calling clone() in QueryExecution.
val df = spark.range(10).filter('id > 0).as("a")
val analyzedPlan = df.queryExecution.analyzed
val tag = new TreeNodeTag[String]("test")
analyzedPlan.setTagValue(tag, "tag")

def checkPlan(l: LogicalPlan): Unit = {
assert(l.isInstanceOf[SubqueryAlias])
val sub = l.asInstanceOf[SubqueryAlias]
assert(sub.child.isInstanceOf[Filter])
assert(sub.getTagValue(tag).isDefined)
assert(sub.child.getTagValue(tag).isEmpty)
}

checkPlan(analyzedPlan)
val df2 = df.filter('id > 0)
// trigger optimization
df2.queryExecution.optimizedPlan

// The previous analyzed plan should not get changed.
checkPlan(analyzedPlan)
}
}
@@ -187,7 +187,7 @@ class PartitionBatchPruningSuite
val result = df.collect().map(_(0)).toArray
assert(result.length === 1)

val (readPartitions, readBatches) = df.queryExecution.sparkPlan.collect {
val (readPartitions, readBatches) = df.queryExecution.executedPlan.collect {
case in: InMemoryTableScanExec => (in.readPartitions.value, in.readBatches.value)
}.head
assert(readPartitions === 5)
@@ -208,7 +208,7 @@ class PartitionBatchPruningSuite
df.collect().map(_(0)).toArray
}

val (readPartitions, readBatches) = df.queryExecution.sparkPlan.collect {
val (readPartitions, readBatches) = df.queryExecution.executedPlan.collect {
case in: InMemoryTableScanExec => (in.readPartitions.value, in.readBatches.value)
}.head
