-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-23128][SQL] A new approach to do adaptive execution in Spark SQL #20303
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 15 commits
7f0c2c9
4a9d054
3487eb8
7df45f8
a83967c
63fece9
52c7616
1081a3f
4a2311c
2c55985
5819826
ea93dbf
068ef94
4e69702
41f3a90
666bf76
e4bfc22
bef8ab8
2d6f110
fd413d4
028b0ac
2e08778
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -282,14 +282,19 @@ object SQLConf { | |
|
|
||
| val SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS = | ||
| buildConf("spark.sql.adaptive.minNumPostShufflePartitions") | ||
| .internal() | ||
| .doc("The advisory minimal number of post-shuffle partitions provided to " + | ||
| "ExchangeCoordinator. This setting is used in our test to make sure we " + | ||
| "have enough parallelism to expose issues that will not be exposed with a " + | ||
| "single partition. When the value is a non-positive value, this setting will " + | ||
| "not be provided to ExchangeCoordinator.") | ||
| .doc("The advisory minimum number of post-shuffle partitions used in adaptive execution.") | ||
| .intConf | ||
| .checkValue(numPartitions => numPartitions > 0, "The minimum shuffle partition number " + | ||
| "must be a positive integer.") | ||
| .createWithDefault(1) | ||
|
|
||
| val SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS = | ||
| buildConf("spark.sql.adaptive.maxNumPostShufflePartitions") | ||
| .doc("The advisory maximum number of post-shuffle partitions used in adaptive execution.") | ||
| .intConf | ||
| .createWithDefault(-1) | ||
| .checkValue(numPartitions => numPartitions > 0, "The maximum shuffle partition number " + | ||
|
||
| "must be a positive integer.") | ||
| .createWithDefault(500) | ||
|
||
|
|
||
| val SUBEXPRESSION_ELIMINATION_ENABLED = | ||
| buildConf("spark.sql.subexpressionElimination.enabled") | ||
|
|
@@ -1728,8 +1733,9 @@ class SQLConf extends Serializable with Logging { | |
|
|
||
| def adaptiveExecutionEnabled: Boolean = getConf(ADAPTIVE_EXECUTION_ENABLED) | ||
|
|
||
| def minNumPostShufflePartitions: Int = | ||
| getConf(SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS) | ||
| def minNumPostShufflePartitions: Int = getConf(SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS) | ||
|
|
||
| def maxNumPostShufflePartitions: Int = getConf(SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS) | ||
|
|
||
| def minBatchesToRetain: Int = getConf(MIN_BATCHES_TO_RETAIN) | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,111 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.spark.sql.execution.adaptive | ||
|
|
||
| import java.util.concurrent.CountDownLatch | ||
|
|
||
| import org.apache.spark.SparkException | ||
| import org.apache.spark.rdd.RDD | ||
| import org.apache.spark.sql.SparkSession | ||
| import org.apache.spark.sql.catalyst.InternalRow | ||
| import org.apache.spark.sql.catalyst.expressions.Attribute | ||
| import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan, SparkPlanInfo, SQLExecution} | ||
| import org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate | ||
|
|
||
| /** | ||
| * A root node to execute the query plan adaptively. It creates query stages, and incrementally | ||
| * updates the query plan when a query stage is materialized and provides accurate runtime | ||
| * data statistics. | ||
| */ | ||
| case class AdaptiveSparkPlan(initialPlan: SparkPlan, session: SparkSession) | ||
| extends LeafExecNode{ | ||
|
|
||
| override def output: Seq[Attribute] = initialPlan.output | ||
|
|
||
| @volatile private var currentPlan: SparkPlan = initialPlan | ||
| @volatile private var error: Throwable = null | ||
|
|
||
| // We will release the lock when we finish planning query stages, or we fail to do the planning. | ||
| // Getting `resultStage` will be blocked until the lock is release. | ||
| // This is better than wait()/notify(), as we can easily check if the computation has completed, | ||
| // by calling `readyLock.getCount()`. | ||
| private val readyLock = new CountDownLatch(1) | ||
|
|
||
| private def createCallback(executionId: Option[Long]): QueryStageCreatorCallback = { | ||
| new QueryStageCreatorCallback { | ||
| override def onPlanUpdate(updatedPlan: SparkPlan): Unit = { | ||
| updateCurrentPlan(updatedPlan, executionId) | ||
| if (updatedPlan.isInstanceOf[ResultQueryStage]) readyLock.countDown() | ||
| } | ||
|
|
||
| override def onStageMaterializingFailed(stage: QueryStage, e: Throwable): Unit = { | ||
| error = new SparkException( | ||
| s""" | ||
| |Fail to materialize stage ${stage.id}: | ||
| |${stage.plan.treeString} | ||
| """.stripMargin, e) | ||
| readyLock.countDown() | ||
| } | ||
|
|
||
| override def onError(e: Throwable): Unit = { | ||
| error = e | ||
| readyLock.countDown() | ||
| } | ||
| } | ||
| } | ||
|
|
||
| private def updateCurrentPlan(newPlan: SparkPlan, executionId: Option[Long]): Unit = { | ||
| currentPlan = newPlan | ||
| executionId.foreach { id => | ||
| session.sparkContext.listenerBus.post(SparkListenerSQLAdaptiveExecutionUpdate( | ||
| id, | ||
| SQLExecution.getQueryExecution(id).toString, | ||
| SparkPlanInfo.fromSparkPlan(currentPlan))) | ||
| } | ||
| } | ||
|
|
||
| def finalPlan: ResultQueryStage = { | ||
| if (readyLock.getCount > 0) { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is finalPlan be called/used concurrently in multiple threads? If yes, the use of readyLock.getCount may cause problem. For example, two thread will both find getCount > 0 and each thread will create a QueryStageCreator before readyLock.countDown is called.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It should only be called in a single thread. @cloud-fan may confirm it. |
||
| val sc = session.sparkContext | ||
| val executionId = Option(sc.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)).map(_.toLong) | ||
| val creator = new QueryStageCreator(initialPlan, session, createCallback(executionId)) | ||
| creator.start() | ||
| readyLock.await() | ||
| creator.stop() | ||
| } | ||
|
|
||
| if (error != null) throw error | ||
| currentPlan.asInstanceOf[ResultQueryStage] | ||
| } | ||
|
|
||
| override def executeCollect(): Array[InternalRow] = finalPlan.executeCollect() | ||
| override def executeTake(n: Int): Array[InternalRow] = finalPlan.executeTake(n) | ||
| override def executeToIterator(): Iterator[InternalRow] = finalPlan.executeToIterator() | ||
| override def doExecute(): RDD[InternalRow] = finalPlan.execute() | ||
| override def generateTreeString( | ||
| depth: Int, | ||
| lastChildren: Seq[Boolean], | ||
| append: String => Unit, | ||
| verbose: Boolean, | ||
| prefix: String = "", | ||
| addSuffix: Boolean = false, | ||
| maxFields: Int): Unit = { | ||
| currentPlan.generateTreeString( | ||
| depth, lastChildren, append, verbose, "", false, maxFields) | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,39 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.spark.sql.execution.adaptive | ||
|
|
||
| import org.apache.spark.sql.SparkSession | ||
| import org.apache.spark.sql.catalyst.rules.Rule | ||
| import org.apache.spark.sql.execution.SparkPlan | ||
| import org.apache.spark.sql.execution.command.ExecutedCommandExec | ||
|
|
||
| /** | ||
| * This rule wraps the query plan with an [[AdaptiveSparkPlan]], which executes the query plan | ||
| * adaptively with runtime data statistics. Note that this rule must be run after | ||
| * [[org.apache.spark.sql.execution.exchange.EnsureRequirements]], so that the exchange nodes are | ||
| * already inserted. | ||
| */ | ||
| case class InsertAdaptiveSparkPlan(session: SparkSession) extends Rule[SparkPlan] { | ||
|
|
||
| override def apply(plan: SparkPlan): SparkPlan = plan match { | ||
| case _: ExecutedCommandExec => plan | ||
| case _ if session.sessionState.conf.adaptiveExecutionEnabled => | ||
| AdaptiveSparkPlan(plan, session.cloneSession()) | ||
|
||
| case _ => plan | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
super nit: we can simply write
_ > 0