Commits (30)
52f0222 [SPARK-23128][SQL] A new approach to do adaptive execution in Spark SQL (maryannxue, May 28, 2019)
1c665d2 Address review comments (maryannxue, May 28, 2019)
a9b4209 Address review comments (maryannxue, May 29, 2019)
cbfbc4e Remove reuse-stage lock (maryannxue, May 29, 2019)
4255421 Address review comments (maryannxue, May 30, 2019)
a656e42 Refactoring: remove applyPhysicalRules from InsertAdaptiveSparkPlan (maryannxue, May 30, 2019)
ac0794d Address review comments (maryannxue, May 30, 2019)
62af5be Address review comments (maryannxue, May 31, 2019)
77a668b add lock object (maryannxue, May 31, 2019)
3e85e74 fix (maryannxue, Jun 1, 2019)
bd6a364 fix (maryannxue, Jun 2, 2019)
9eaf307 Address review comments (maryannxue, Jun 3, 2019)
e2fa8e3 fix (maryannxue, Jun 3, 2019)
55450e9 fix (maryannxue, Jun 5, 2019)
4b0755d Fix the ambiguous logical node matching issue (maryannxue, Jun 12, 2019)
6e547d7 Revert "fix" (maryannxue, Jun 12, 2019)
baef964 Followup for 'Revert "fix"' (maryannxue, Jun 12, 2019)
ec59f88 Address review comments (maryannxue, Jun 12, 2019)
9af4eb1 Refine explain string for AdaptiveSparkPlanExec (maryannxue, Jun 12, 2019)
5b5ac2e minor refinement (maryannxue, Jun 12, 2019)
da33bd7 Refine explain string of QueryStageExec (maryannxue, Jun 12, 2019)
5688cb4 code refinement for TreeNode; add a test in TreeNodeSuite (maryannxue, Jun 13, 2019)
a40b771 Address review comments (maryannxue, Jun 13, 2019)
37905f5 Address review comments (maryannxue, Jun 13, 2019)
eb8fe75 fix (maryannxue, Jun 14, 2019)
4481085 code refinement (maryannxue, Jun 14, 2019)
8570ec0 Address review comments (maryannxue, Jun 14, 2019)
237c067 Address review comments (maryannxue, Jun 14, 2019)
d6e040b Merge remote-tracking branch 'origin/master' into aqe (maryannxue, Jun 14, 2019)
e265104 fix (maryannxue, Jun 15, 2019)
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -292,6 +292,12 @@ object SQLConf {
.bytesConf(ByteUnit.BYTE)
.createWithDefault(64 * 1024 * 1024)

val RUNTIME_REOPTIMIZATION_ENABLED =
buildConf("spark.sql.runtime.reoptimization.enabled")
.doc("When true, enable runtime query re-optimization.")
.booleanConf
.createWithDefault(false)

val ADAPTIVE_EXECUTION_ENABLED = buildConf("spark.sql.adaptive.enabled")
.doc("When true, enable adaptive query execution.")
.booleanConf
Member commented:
We probably need to update the doc for this config. It isn't enabled when runtime query re-optimization is true.

maryannxue (Contributor, Author) commented on May 28, 2019:
Let's leave it now and see if we should use the existing config spark.sql.adaptive.enabled instead.

@@ -1882,7 +1888,10 @@ class SQLConf extends Serializable with Logging {
def targetPostShuffleInputSize: Long =
getConf(SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE)

def adaptiveExecutionEnabled: Boolean = getConf(ADAPTIVE_EXECUTION_ENABLED)
def runtimeReoptimizationEnabled: Boolean = getConf(RUNTIME_REOPTIMIZATION_ENABLED)

def adaptiveExecutionEnabled: Boolean =
getConf(ADAPTIVE_EXECUTION_ENABLED) && !getConf(RUNTIME_REOPTIMIZATION_ENABLED)

def minNumPostShufflePartitions: Int =
getConf(SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS)
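For context, a minimal sketch of how the two flags interact from the caller's side; the session setup and query are illustrative, and only the two config keys come from this diff:

import org.apache.spark.sql.SparkSession

object RuntimeReoptDemo extends App {
  val spark = SparkSession.builder()
    .master("local[*]")
    .appName("runtime-reopt-demo")
    // New flag introduced in this diff; defaults to false.
    .config("spark.sql.runtime.reoptimization.enabled", "true")
    // The legacy flag may still be set...
    .config("spark.sql.adaptive.enabled", "true")
    .getOrCreate()

  // ...but per the updated adaptiveExecutionEnabled above, the legacy
  // adaptive-execution path is suppressed whenever runtime re-optimization
  // is on: the two code paths are mutually exclusive.
  spark.range(0, 1000).groupBy("id").count().collect()
  spark.stop()
}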
sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.util.StringUtils.{PlanStringConcat, StringConcat}
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.execution.adaptive.InsertAdaptiveSparkPlan
import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReuseExchange}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.util.Utils
@@ -107,6 +108,9 @@ class QueryExecution(

/** A sequence of rules that will be applied in order to the physical plan before execution. */
protected def preparations: Seq[Rule[SparkPlan]] = Seq(
// `AdaptiveSparkPlanExec` is a leaf node. If inserted, all the following rules will be no-op
// as the original plan is hidden behind `AdaptiveSparkPlanExec`.
InsertAdaptiveSparkPlan(sparkSession),
PlanSubqueries(sparkSession),
EnsureRequirements(sparkSession.sessionState.conf),
CollapseCodegenStages(sparkSession.sessionState.conf),
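The comment added above is the crux of the rule ordering: once InsertAdaptiveSparkPlan wraps the plan in a leaf node, the remaining preparation rules have nothing left to traverse. A toy model of that effect, using hypothetical stand-in types (none of these names are from the PR):

object LeafWrapperDemo extends App {
  sealed trait Plan { def children: Seq[Plan] }
  case class Scan(name: String) extends Plan { def children: Seq[Plan] = Nil }
  case class Project(child: Plan) extends Plan { def children: Seq[Plan] = Seq(child) }
  // Stand-in for AdaptiveSparkPlanExec: a leaf node that hides the real plan.
  case class AdaptiveWrapper(hidden: Plan) extends Plan { def children: Seq[Plan] = Nil }

  def transformUp(p: Plan)(rule: PartialFunction[Plan, Plan]): Plan = {
    val recursed = p match {
      case Project(c) => Project(transformUp(c)(rule))
      case other      => other // leaves stop the recursion, including the wrapper
    }
    rule.applyOrElse(recursed, identity[Plan])
  }

  val wrapped: Plan = AdaptiveWrapper(Project(Scan("t")))
  // A rule that runs after the wrapper is inserted never sees the hidden subtree:
  val result = transformUp(wrapped) { case Scan(n) => Scan(n + "_rewritten") }
  assert(result == wrapped) // no-op, exactly as the code comment predicts
}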
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -40,9 +40,11 @@ import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.types.DataType

object SparkPlan {
// a TreeNode tag in SparkPlan, to carry its original logical plan. The planner will add this tag
// when converting a logical plan to a physical plan.
/** The original [[LogicalPlan]] from which this [[SparkPlan]] is converted. */
val LOGICAL_PLAN_TAG = TreeNodeTag[LogicalPlan]("logical_plan")

/** The [[LogicalPlan]] inherited from its ancestor. */
val LOGICAL_PLAN_INHERITED_TAG = TreeNodeTag[LogicalPlan]("logical_plan_inherited")
}

/**
@@ -79,6 +81,34 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
super.makeCopy(newArgs)
}

/**
* @return The logical plan this plan is linked to.
*/
def logicalLink: Option[LogicalPlan] =
getTagValue(SparkPlan.LOGICAL_PLAN_TAG)
.orElse(getTagValue(SparkPlan.LOGICAL_PLAN_INHERITED_TAG))

/**
* Set logical plan link recursively if unset.
*/
def setLogicalLink(logicalPlan: LogicalPlan): Unit = {
setLogicalLink(logicalPlan, false)
}

private def setLogicalLink(logicalPlan: LogicalPlan, inherited: Boolean = false): Unit = {
if (logicalLink.isDefined) {
return
}

val tag = if (inherited) {
SparkPlan.LOGICAL_PLAN_INHERITED_TAG
} else {
SparkPlan.LOGICAL_PLAN_TAG
}
setTagValue(tag, logicalPlan)
children.foreach(_.setLogicalLink(logicalPlan, true))
Member commented:
Once we set inherited logical plan into children, we can't set logical plan into them? So only the top SparkPlan has its logical plan, all its children just have inherited logical plan?

maryannxue (Contributor, Author) commented:
Yes. That's true. And you could always "force set" this logical link if need be.
It's not necessary to draw a line between "logical plan" and "inherited logical plan" for the use of adaptive execution, so this is simply to make sure any future use of it can tell a top node from the rest.

}

/**
* @return All metrics containing metrics of this SparkPlan.
*/
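A self-contained sketch of the inherit-if-unset behavior discussed in the thread above, with a hypothetical Node class standing in for SparkPlan and plain fields standing in for the two tree-node tags:

object LogicalLinkDemo extends App {
  final class Node(val children: Seq[Node]) {
    // Stand-ins for LOGICAL_PLAN_TAG and LOGICAL_PLAN_INHERITED_TAG.
    var direct: Option[String] = None
    var inherited: Option[String] = None

    def logicalLink: Option[String] = direct.orElse(inherited)

    // Mirrors setLogicalLink: a no-op when a link is already present; the
    // node it is called on gets the direct tag, descendants the inherited one.
    def setLogicalLink(link: String, fromParent: Boolean = false): Unit = {
      if (logicalLink.isDefined) return
      if (fromParent) inherited = Some(link) else direct = Some(link)
      children.foreach(_.setLogicalLink(link, fromParent = true))
    }
  }

  val leaf = new Node(Nil)
  val root = new Node(Seq(leaf))
  root.setLogicalLink("Aggregate")
  assert(root.direct.contains("Aggregate"))    // only the top node links directly
  assert(leaf.inherited.contains("Aggregate")) // children merely inherit
  root.setLogicalLink("Join")                  // second call is a no-op
  assert(root.logicalLink.contains("Aggregate"))
}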
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.execution

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, QueryStageExec}
import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
import org.apache.spark.sql.execution.metric.SQLMetricInfo
import org.apache.spark.sql.internal.SQLConf
@@ -53,6 +54,8 @@ private[execution] object SparkPlanInfo {
val children = plan match {
case ReusedExchangeExec(_, child) => child :: Nil
case ReusedSubqueryExec(child) => child :: Nil
case a: AdaptiveSparkPlanExec => a.executedPlan :: Nil
case stage: QueryStageExec => stage.plan :: Nil
case _ => plan.children ++ plan.subqueries
}
val metrics = plan.metrics.toSeq.map { case (key, metric) =>
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala
@@ -21,6 +21,7 @@ import org.apache.spark.SparkContext
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.adaptive.LogicalQueryStageStrategy
import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, FileSourceStrategy}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy
import org.apache.spark.sql.internal.SQLConf
@@ -36,6 +37,7 @@ class SparkPlanner(
override def strategies: Seq[Strategy] =
experimentalMethods.extraStrategies ++
extraPlanningStrategies ++ (
LogicalQueryStageStrategy ::
PythonEvals ::
DataSourceV2Strategy ::
FileSourceStrategy ::
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
import org.apache.spark.sql.execution.adaptive.LogicalQueryStage
import org.apache.spark.sql.execution.columnar.{InMemoryRelation, InMemoryTableScanExec}
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
@@ -58,6 +59,8 @@ case class PlanLater(plan: LogicalPlan) extends LeafExecNode {
protected override def doExecute(): RDD[InternalRow] = {
throw new UnsupportedOperationException()
}

override def setLogicalLink(logicalPlan: LogicalPlan): Unit = {}
}

abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
@@ -69,7 +72,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
case ReturnAnswer(rootPlan) => rootPlan
case _ => plan
}
p.setTagValue(SparkPlan.LOGICAL_PLAN_TAG, logicalPlan)
p.setLogicalLink(logicalPlan)
p
}
}