@@ -282,14 +282,19 @@ object SQLConf {

val SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS =
buildConf("spark.sql.adaptive.minNumPostShufflePartitions")
.internal()
.doc("The advisory minimal number of post-shuffle partitions provided to " +
"ExchangeCoordinator. This setting is used in our test to make sure we " +
"have enough parallelism to expose issues that will not be exposed with a " +
"single partition. When the value is a non-positive value, this setting will " +
"not be provided to ExchangeCoordinator.")
.doc("The advisory minimum number of post-shuffle partitions used in adaptive execution.")
.intConf
.checkValue(numPartitions => numPartitions > 0, "The minimum shuffle partition number " +
Contributor: super nit: we can simply write _ > 0

"must be a positive integer.")
.createWithDefault(1)

val SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS =
buildConf("spark.sql.adaptive.maxNumPostShufflePartitions")
.doc("The advisory maximum number of post-shuffle partitions used in adaptive execution.")
.intConf
.createWithDefault(-1)
.checkValue(numPartitions => numPartitions > 0, "The maximum shuffle partition number " +
Contributor: ditto

"must be a positive integer.")
.createWithDefault(500)
Member: We don't support the negative case in the new execution model ("When the value is a non-positive value, this setting will not be provided to ExchangeCoordinator")?

Contributor (Author): The maxNumPostShufflePartitions will be used as the initial shuffle partition number, just like spark.sql.shuffle.partitions in non-AE mode. It will throw an exception if this value is non-positive.

Contributor: Maybe we should add a new config spark.sql.adaptive.reducePostShufflePartitions to disable this feature. I think using a negative value to disable it is tricky.

Contributor (Author): Currently we don't allow users to set a negative value, so reducePostShufflePartitions is always enabled if AE is enabled. Yes, as we will have other features in AE, it makes sense to add a new config.

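Pulling together the two suggestions in this thread (the _ > 0 shorthand and a dedicated enable flag), a rough sketch of how the configs could look; the reducePostShufflePartitions entry is only the reviewer's proposal, and its name and default here are illustrative, not part of this PR:

val SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS =
  buildConf("spark.sql.adaptive.minNumPostShufflePartitions")
    .doc("The advisory minimum number of post-shuffle partitions used in adaptive execution.")
    .intConf
    .checkValue(_ > 0, "The minimum shuffle partition number must be a positive integer.")
    .createWithDefault(1)

// Proposed in the review above, not part of this diff: an explicit switch rather than a
// negative value to disable post-shuffle partition coalescing.
val REDUCE_POST_SHUFFLE_PARTITIONS_ENABLED =
  buildConf("spark.sql.adaptive.reducePostShufflePartitions")
    .doc("When true and adaptive execution is enabled, reduce the number of post-shuffle " +
      "partitions at runtime.")
    .booleanConf
    .createWithDefault(true)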

val SUBEXPRESSION_ELIMINATION_ENABLED =
buildConf("spark.sql.subexpressionElimination.enabled")
@@ -1728,8 +1733,9 @@ class SQLConf extends Serializable with Logging {

def adaptiveExecutionEnabled: Boolean = getConf(ADAPTIVE_EXECUTION_ENABLED)

def minNumPostShufflePartitions: Int =
getConf(SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS)
def minNumPostShufflePartitions: Int = getConf(SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS)

def maxNumPostShufflePartitions: Int = getConf(SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS)

def minBatchesToRetain: Int = getConf(MIN_BATCHES_TO_RETAIN)

@@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.util.StringUtils.StringConcat
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.execution.adaptive.PlanQueryStage
import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReuseExchange}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.util.Utils
@@ -93,7 +94,12 @@ class QueryExecution(
* row format conversions as needed.
*/
protected def prepareForExecution(plan: SparkPlan): SparkPlan = {
preparations.foldLeft(plan) { case (sp, rule) => rule.apply(sp) }
val rules = if (sparkSession.sessionState.conf.adaptiveExecutionEnabled) {
adaptivePreparations
} else {
preparations
}
rules.foldLeft(plan) { case (sp, rule) => rule.apply(sp) }
}

/** A sequence of rules that will be applied in order to the physical plan before execution. */
@@ -104,6 +110,17 @@
ReuseExchange(sparkSession.sessionState.conf),
ReuseSubquery(sparkSession.sessionState.conf))

// With adaptive execution, whole stage codegen will be done inside `QueryStageExecutor`.
protected def adaptivePreparations: Seq[Rule[SparkPlan]] = Seq(
Contributor: More of a high level note, we really should consider replacing this with a RuleExecutor.

PlanSubqueries(sparkSession),
EnsureRequirements(sparkSession.sessionState.conf),
Contributor: So we are still fixing the exchanges? This is going away when you are changing join types, right?

Member: The work for changing join types will be done at runtime; we still need EnsureRequirements here.

ReuseExchange(sparkSession.sessionState.conf),
ReuseSubquery(sparkSession.sessionState.conf),
// PlanQueryStage needs to be the last rule because it divides the plan into multiple sub-trees
// by inserting leaf-node QueryStages. Transforming the plan after applying this rule will
// only transform nodes within a single sub-tree.
PlanQueryStage(sparkSession))

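Picking up the RuleExecutor note above, a minimal sketch of how these preparation rules could be expressed through catalyst's RuleExecutor instead of a foldLeft; the adaptiveRuleExecutor name is illustrative, and this assumes an extra import of org.apache.spark.sql.catalyst.rules.RuleExecutor:

protected def adaptiveRuleExecutor: RuleExecutor[SparkPlan] = new RuleExecutor[SparkPlan] {
  // A single Once batch keeps the current semantics: each rule is applied exactly once, in order.
  override protected def batches: Seq[Batch] = Seq(
    Batch("AdaptivePreparations", Once,
      PlanSubqueries(sparkSession),
      EnsureRequirements(sparkSession.sessionState.conf),
      ReuseExchange(sparkSession.sessionState.conf),
      ReuseSubquery(sparkSession.sessionState.conf),
      // Must stay last, for the reason noted in the comment above.
      PlanQueryStage(sparkSession)))
}

// prepareForExecution could then simply call adaptiveRuleExecutor.execute(plan).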
def simpleString: String = withRedaction {
val concat = new StringConcat()
concat.append("== Physical Plan ==\n")
@@ -18,6 +18,7 @@
package org.apache.spark.sql.execution

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlan, QueryStage}
import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
import org.apache.spark.sql.execution.metric.SQLMetricInfo
import org.apache.spark.sql.internal.SQLConf
@@ -52,6 +53,8 @@ private[execution] object SparkPlanInfo {
def fromSparkPlan(plan: SparkPlan): SparkPlanInfo = {
val children = plan match {
case ReusedExchangeExec(_, child) => child :: Nil
case a: AdaptiveSparkPlan => a.resultStage.plan :: Nil
case stage: QueryStage => stage.plan :: Nil
case _ => plan.children ++ plan.subqueries
}
val metrics = plan.metrics.toSeq.map { case (key, metric) =>
@@ -0,0 +1,126 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.adaptive

import java.util.concurrent.CountDownLatch

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan, SparkPlanInfo, SQLExecution}
import org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate

/**
* A root node to trigger query stages and execute the query plan adaptively. It incrementally
* updates the query plan when a query stage is materialized and provides accurate runtime
* statistics.
*/
case class AdaptiveSparkPlan(initialPlan: ResultQueryStage, session: SparkSession)
extends LeafExecNode {

override def output: Seq[Attribute] = initialPlan.output

@volatile private var currentQueryStage: QueryStage = initialPlan
@volatile private var error: Throwable = null
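// Doc requested in the review below: the latch is released exactly once, when the result
// stage is ready or an error has been recorded; unlike wait()/notify(), getCount also lets
// resultStage check whether the result has already been computed.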
private val readyLock = new CountDownLatch(1)
Contributor: Isn't this the same as using wait()...notify()?

Contributor: I can answer that myself, the countdown stuff is useful to figure out if the computation has completed. Please add some doc here.

private def replaceStage(oldStage: QueryStage, newStage: QueryStage): QueryStage = {
if (oldStage.id == newStage.id) {
newStage
} else {
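// Presumably needed because QueryStage is a leaf node (see PlanQueryStage), so transform
// does not descend into a nested stage's plan; recurse manually to reach stages at any depth.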
val newPlanForOldStage = oldStage.plan.transform {
Contributor: Is this supposed to recurse twice, once in the transform and once in replaceStage? Would this suffice:

oldStage.plan.transform {
  case q: QueryStage if q.id == newStage.id => newStage
}

case q: QueryStage => replaceStage(q, newStage)
}
oldStage.withNewPlan(newPlanForOldStage)
}
}

private def createCallback(executionId: Option[Long]): QueryStageTriggerCallback = {
new QueryStageTriggerCallback {
override def onStageUpdated(stage: QueryStage): Unit = {
updateCurrentQueryStage(stage, executionId)
if (stage.isInstanceOf[ResultQueryStage]) readyLock.countDown()
}

override def onStagePlanningFailed(stage: QueryStage, e: Throwable): Unit = {
error = new RuntimeException(
Contributor: Use SparkException?

Member: Makes sense; a RuntimeException will kill the whole executor.

s"""
|Fail to plan stage ${stage.id}:
|${stage.plan.treeString}
""".stripMargin, e)
readyLock.countDown()
}

override def onStageMaterializingFailed(stage: QueryStage, e: Throwable): Unit = {
error = new RuntimeException(
s"""
|Fail to materialize stage ${stage.id}:
|${stage.plan.treeString}
""".stripMargin, e)
readyLock.countDown()
}

override def onError(e: Throwable): Unit = {
error = e
readyLock.countDown()
}
}
}
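Following the SparkException suggestion in the thread above, a rough sketch (not what this diff does) of the planning-failure callback, assuming org.apache.spark.SparkException's (message, cause) constructor is available via an extra import:

override def onStagePlanningFailed(stage: QueryStage, e: Throwable): Unit = {
  // Wrap in SparkException instead of RuntimeException, as suggested in the review.
  error = new SparkException(
    s"Failed to plan stage ${stage.id}:\n${stage.plan.treeString}", e)
  readyLock.countDown()
}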

private def updateCurrentQueryStage(newStage: QueryStage, executionId: Option[Long]): Unit = {
currentQueryStage = replaceStage(currentQueryStage, newStage)
executionId.foreach { id =>
session.sparkContext.listenerBus.post(SparkListenerSQLAdaptiveExecutionUpdate(
id,
SQLExecution.getQueryExecution(id).toString,
SparkPlanInfo.fromSparkPlan(currentQueryStage)))
}
}

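// Per the discussion below, resultStage is assumed to be called from a single thread;
// the getCount check followed by await is not designed for concurrent callers.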
def resultStage: ResultQueryStage = {
if (readyLock.getCount > 0) {

Reviewer: Is finalPlan called/used concurrently from multiple threads? If yes, the use of readyLock.getCount may cause problems. For example, two threads could both find getCount > 0, and each would create a QueryStageCreator before readyLock.countDown is called.

Contributor (Author): It should only be called in a single thread. @cloud-fan may confirm it.

val sc = session.sparkContext
val executionId = Option(sc.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)).map(_.toLong)
val trigger = new QueryStageTrigger(session, createCallback(executionId))
trigger.start()
trigger.trigger(initialPlan)
readyLock.await()
trigger.stop()
}

if (error != null) throw error
currentQueryStage.asInstanceOf[ResultQueryStage]
}

override def executeCollect(): Array[InternalRow] = resultStage.executeCollect()
override def executeTake(n: Int): Array[InternalRow] = resultStage.executeTake(n)
override def executeToIterator(): Iterator[InternalRow] = resultStage.executeToIterator()
override def doExecute(): RDD[InternalRow] = resultStage.execute()
override def generateTreeString(
depth: Int,
lastChildren: Seq[Boolean],
append: String => Unit,
verbose: Boolean,
prefix: String = "",
addSuffix: Boolean = false,
maxFields: Int): Unit = {
currentQueryStage.generateTreeString(
depth, lastChildren, append, verbose, "", false, maxFields)
}
}
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.adaptive

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, Exchange, ReusedExchangeExec, ShuffleExchangeExec}

/**
* Divide the spark plan into multiple QueryStages. For each Exchange in the plan, it wraps it with
* a [[QueryStage]]. At the end it adds an [[AdaptiveSparkPlan]] at the top, which will drive the
* execution of query stages.
*/
case class PlanQueryStage(session: SparkSession) extends Rule[SparkPlan] {

def apply(plan: SparkPlan): SparkPlan = {
var id = 0
val exchangeToQueryStage = new java.util.IdentityHashMap[Exchange, QueryStage]
val planWithStages = plan.transformUp {
case e: ShuffleExchangeExec =>
val queryStage = ShuffleQueryStage(id, e)
Member: Just a question; I'm not 100% sure about the design though... during runtime, could a query stage change depending on the map output statistics of the child stages, e.g., from ShuffleQueryStage to BroadcastExchangeExec?

Member (@maropu, Feb 5, 2019): This might be a comment about the design though: couldn't we do the planning from a logical plan to a physical plan for a query stage at runtime (e.g., by reusing the planLater logic), instead of the current design of replacing an already-planned physical plan with another physical plan at runtime? Or do we need to fix the physical plans in all the query stages before execution?

id += 1
exchangeToQueryStage.put(e, queryStage)
queryStage
case e: BroadcastExchangeExec =>
val queryStage = BroadcastQueryStage(id, e)
id += 1
exchangeToQueryStage.put(e, queryStage)
queryStage
// The `ReusedExchangeExec` was added in the rule `ReuseExchange`, via transforming up the
// query plan. This rule also transforms up the query plan, so when we hit `ReusedExchangeExec`
// here, the exchange being reused must have already been hit and there should be an entry
// for it in `exchangeToQueryStage`.
case e: ReusedExchangeExec =>
val existingQueryStage = exchangeToQueryStage.get(e.child)
assert(existingQueryStage != null, "The exchange being reused should be hit before.")
Contributor: I am not sure how hard this guarantee is. You may want to change this to something stronger.

ReusedQueryStage(existingQueryStage, e.output)
Member: Do we need this wrapper? Can we just return existingQueryStage?

Member: I think we need the wrapper for the same reason as ReusedExchangeExec: it's necessary to keep the output of this exchange.

}
AdaptiveSparkPlan(ResultQueryStage(id, planWithStages), session)
}
}
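To make the wrapping concrete, a rough before-and-after sketch for a plan with a single shuffle; the plan shape is illustrative only and not taken from this PR:

// Physical plan before PlanQueryStage:
//   HashAggregate
//   +- ShuffleExchange
//      +- HashAggregate
//         +- Scan
//
// After PlanQueryStage (the exchange is wrapped in ShuffleQueryStage(0) and the whole tree
// becomes the plan of ResultQueryStage(1) under AdaptiveSparkPlan):
//   AdaptiveSparkPlan
//   +- ResultQueryStage(1)
//      +- HashAggregate
//         +- ShuffleQueryStage(0)
//            +- ShuffleExchange
//               +- HashAggregate
//                  +- Scan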