docs/extensions/engines/spark/rules.md (2 additions, 0 deletions)
@@ -83,4 +83,6 @@ Kyuubi provides some configs to make these features easy to use.
| spark.sql.optimizer.inferRebalanceAndSortOrdersMaxColumns | 3 | The max columns of inferred columns. | 1.7.0 |
| spark.sql.optimizer.insertRepartitionBeforeWriteIfNoShuffle.enabled | false | When true, add repartition even if the original plan does not have shuffle. | 1.7.0 |
| spark.sql.optimizer.finalStageConfigIsolationWriteOnly.enabled | true | When true, only enable final stage isolation for writing. | 1.7.0 |
| spark.sql.finalWriteStageEagerlyKillExecutors.enabled | false | When true, eagerly kill redundant executors before running final write stage. | 1.8.0 |
| spark.sql.finalWriteStageNumPartitionFactor | 1.2 | If target executors * factor < active executors, and target executors * factor > min executors, then kill redundant executors or inject a custom resource profile. | 1.8.0 |
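As a rough, made-up illustration of how the factor plays out (mirroring the target-executor computation in FinalStageResourceManager further down; all numbers are invented):

```scala
// Illustrative only: the decision made before the final write stage, with factor = 1.2.
// Assume the final rebalance produces 37 partitions, spark.executor.cores = 4,
// spark.dynamicAllocation.minExecutors = 2, and 30 executors currently active.
val expectedCores = 37
val executorCores = 4
val factor = 1.2
val targetExecutors = (((expectedCores / executorCores) + 1) * factor).toInt // 12
val minExecutors = 2
val numActiveExecutors = 30
// 12 < 30 and 12 > 2, so redundant executors would be killed before the final write stage.
val hasBenefits = targetExecutors < numActiveExecutors && targetExecutors > minExecutors
println(s"targetExecutors=$targetExecutors hasBenefits=$hasBenefits")
```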

@@ -17,6 +17,7 @@

package org.apache.kyuubi.sql

import org.apache.spark.FinalStageResourceManager
import org.apache.spark.sql.SparkSessionExtensions

import org.apache.kyuubi.sql.watchdog.{ForcedMaxOutputRowsRule, MaxPartitionStrategy}
@@ -39,5 +40,7 @@ class KyuubiSparkSQLExtension extends (SparkSessionExtensions => Unit) {
// watchdog extension
extensions.injectOptimizerRule(ForcedMaxOutputRowsRule)
extensions.injectPlannerStrategy(MaxPartitionStrategy)

extensions.injectQueryStagePrepRule(FinalStageResourceManager)
}
}
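For reference, the extension (and therefore the injected FinalStageResourceManager) is activated through Spark's standard extension mechanism; a minimal sketch, assuming the config names from this PR and an illustrative app name:

```scala
// Minimal sketch: register the Kyuubi extension so the injected rules take effect.
// The new feature itself stays off unless the flag below is enabled.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("final-write-stage-demo") // illustrative
  .config("spark.sql.extensions", "org.apache.kyuubi.sql.KyuubiSparkSQLExtension")
  .config("spark.sql.finalWriteStageEagerlyKillExecutors.enabled", "true")
  .getOrCreate()
```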
@@ -0,0 +1,199 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark
Member: this is for unstable calls?

Contributor Author: yes, e.g., CoarseGrainedSchedulerBackend is under private[spark].
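For readers unfamiliar with Scala's qualified visibility, a small self-contained sketch of why the file must live under the org.apache.spark package (class names here are made up and not part of the PR):

```scala
// Hypothetical example of private[spark] visibility. Members qualified with
// private[spark] are only accessible from code whose package is inside org.apache.spark,
// which is why FinalStageResourceManager is declared in that package.
package org.apache.spark {
  private[spark] class InternalOnly {
    private[spark] def refresh(): Unit = ()
  }

  object InsideSpark {
    def use(): Unit = new InternalOnly().refresh() // compiles: same package subtree
  }
}

package com.example {
  object OutsideSpark {
    // new org.apache.spark.InternalOnly() // would not compile: outside the private[spark] scope
  }
}
```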


import scala.annotation.tailrec
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SortExec, SparkPlan}
import org.apache.spark.sql.execution.adaptive._
import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, ShuffleExchangeExec}

import org.apache.kyuubi.sql.{KyuubiSQLConf, MarkNumOutputColumnsRule}

/**
* This rule assumes the final write stage requires fewer cores than the previous stages;
* otherwise it takes no effect.
*
* It provides one feature:
* 1. Kill redundant executors before running the final write stage.
*/
case class FinalStageResourceManager(session: SparkSession) extends Rule[SparkPlan] {
override def apply(plan: SparkPlan): SparkPlan = {
if (!conf.getConf(KyuubiSQLConf.FINAL_WRITE_STAGE_EAGERLY_KILL_EXECUTORS_ENABLED)) {
Member: dynamicAllocation enabled?

return plan
}

if (!MarkNumOutputColumnsRule.isWrite(session, plan)) {
return plan
}

val sc = session.sparkContext
val executorCores = sc.getConf.getInt("spark.executor.cores", 1)
val minExecutors = sc.getConf.getInt("spark.dynamicAllocation.minExecutors", 0)
val maxExecutors = sc.getConf.getInt("spark.dynamicAllocation.maxExecutors", Int.MaxValue)
val factor = conf.getConf(KyuubiSQLConf.FINAL_WRITE_STAGE_PARTITION_FACTOR)
val hasImprovementRoom = maxExecutors - 1 > minExecutors * factor
// Fast fail if:
// 1. the scheduler backend is not CoarseGrainedSchedulerBackend (only YARN and K8s work)
// 2. there is no improvement room, i.e. maxExecutors is not bigger than minExecutors * factor
if (!sc.schedulerBackend.isInstanceOf[CoarseGrainedSchedulerBackend] || !hasImprovementRoom) {
return plan
}

val stage = findFinalRebalanceStage(plan)
if (stage.isEmpty) {
return plan
}

// Since we are in `prepareQueryStage`, the AQE shuffle read has not been applied.
// So we need to apply it ourselves.
val shuffleRead = queryStageOptimizerRules.foldLeft(stage.get.asInstanceOf[SparkPlan]) {
case (latest, rule) => rule.apply(latest)
}
shuffleRead match {
case AQEShuffleReadExec(stage: ShuffleQueryStageExec, partitionSpecs) =>
// The condition whether inject custom resource profile:
// - target executors < active executors
// - target executors > min executors
val numActiveExecutors = sc.getExecutorIds().length
val expectedCores = partitionSpecs.length
val targetExecutors = (((expectedCores / executorCores) + 1) * factor).toInt
Member: (expectedCores / executorCores) + 1 => math.ceil(expectedCores.toFloat / executorCores)

Member: Keep at least 1 exec?
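A standalone illustration of the first comment (values invented): integer division plus one over-provisions by a full executor when expectedCores divides evenly, while ceil does not.

```scala
// Compare the PR's formula with the reviewer's suggestion.
val executorCores = 4
for (expectedCores <- Seq(37, 40)) {
  val plusOne = (expectedCores / executorCores) + 1                   // 10 for 37, 11 for 40
  val ceiled = math.ceil(expectedCores.toFloat / executorCores).toInt // 10 for 37, 10 for 40
  println(s"expectedCores=$expectedCores plusOne=$plusOne ceil=$ceiled")
}
```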

val hasBenefits = targetExecutors < numActiveExecutors && targetExecutors > minExecutors
if (hasBenefits) {
val shuffleId = stage.plan.asInstanceOf[ShuffleExchangeExec].shuffleDependency.shuffleId
val numReduce = stage.plan.asInstanceOf[ShuffleExchangeExec].numPartitions
// At this point only the final rebalance stage is waiting to execute and all tasks of the
// previous stage have finished. Kill the redundant existing executors eagerly so that the
// tasks of the final stage can be scheduled in a more centralized way.
killExecutors(sc, targetExecutors, shuffleId, numReduce)
} else {
logInfo(s"Has no benefits to kill executors or inject custom resource profile, " +
s"active executors: $numActiveExecutors, min executor: $minExecutors, " +
s"target executors: $targetExecutors.")
}

case _ =>
}

plan
}

/**
* The priority of killing executors is as follows:
* 1. kill younger executors first (the longer an executor has been alive, the better the JIT works)
* 2. kill executors that produced less shuffle data first
*/
private def findExecutorToKill(
sc: SparkContext,
targetExecutors: Int,
shuffleId: Int,
numReduce: Int): Seq[String] = {
val tracker = SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
val shuffleStatus = tracker.shuffleStatuses(shuffleId)
val executorToBlockSize = new mutable.HashMap[String, Long]
shuffleStatus.withMapStatuses { mapStatus =>
mapStatus.foreach { status =>
var i = 0
var sum = 0L
while (i < numReduce) {
sum += status.getSizeForBlock(i)
i += 1
}
executorToBlockSize.getOrElseUpdate(status.location.executorId, sum)
}
}

val backend = sc.schedulerBackend.asInstanceOf[CoarseGrainedSchedulerBackend]
val executorsWithRegistrationTs = backend.getExecutorsWithRegistrationTs()
val existedExecutors = executorsWithRegistrationTs.keys.toSet
val expectedNumExecutorToKill = existedExecutors.size - targetExecutors
if (expectedNumExecutorToKill < 1) {
return Seq.empty
}

val executorIdsToKill = new ArrayBuffer[String]()
// We first kill executors that do not hold any shuffle block. This can happen when the last
// stage runs fast and finishes in a short time, while the remaining executors come from
// previous stages and have not been killed by DRA yet, so they can not be found by tracking
// the shuffle status.
// We evict executors by their alive time first and retain all of the executors that have
// better locality for shuffle blocks.
val numExecutorToKillWithNoShuffle = expectedNumExecutorToKill - executorToBlockSize.size
executorsWithRegistrationTs.toSeq.sortBy(_._2).foreach { case (id, _) =>
if (executorIdsToKill.length < numExecutorToKillWithNoShuffle &&
!executorToBlockSize.contains(id)) {
executorIdsToKill.append(id)
}
}

// Evict the rest executors according to the shuffle block size
executorToBlockSize.toSeq.sortBy(_._2).foreach { case (id, _) =>
if (executorIdsToKill.length < expectedNumExecutorToKill) {
executorIdsToKill.append(id)
}
}

executorIdsToKill.toSeq
}

private def killExecutors(
Member: org.apache.spark.SparkContext#killExecutors?

Contributor Author: There is a story about DRA. Since apache/spark#20604, org.apache.spark.SparkContext#killExecutors is not allowed with DRA on, so this PR hacks the internal interface to kill executors. I think that PR is not very reasonable; it should be OK to kill executors with DRA on as long as the min executors is less than the target executors.

sc: SparkContext,
targetExecutors: Int,
shuffleId: Int,
numReduce: Int): Unit = {
val executorAllocationClient = sc.schedulerBackend.asInstanceOf[ExecutorAllocationClient]

val executorsToKill = findExecutorToKill(sc, targetExecutors, shuffleId, numReduce)
logInfo(s"Request to kill executors, total count ${executorsToKill.size}, " +
s"[${executorsToKill.mkString(", ")}].")

// It is a little hack to kill executors with DRA enabled.
// It may cause the status in `ExecutorAllocationManager` inconsistent with
// `CoarseGrainedSchedulerBackend` for a while. But it should be synchronous finally.
executorAllocationClient.killExecutors(
executorIds = executorsToKill,
adjustTargetNumExecutors = false,
countFailures = false,
force = false)
}

@tailrec
private def findFinalRebalanceStage(plan: SparkPlan): Option[ShuffleQueryStageExec] = {
plan match {
case p: ProjectExec => findFinalRebalanceStage(p.child)
case f: FilterExec => findFinalRebalanceStage(f.child)
case s: SortExec if !s.global => findFinalRebalanceStage(s.child)
case stage: ShuffleQueryStageExec
if stage.isMaterialized &&
stage.plan.isInstanceOf[ShuffleExchangeExec] &&
stage.plan.asInstanceOf[ShuffleExchangeExec].shuffleOrigin != ENSURE_REQUIREMENTS =>
Some(stage)
case _ => None
}
}

@transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq(
OptimizeSkewInRebalancePartitions,
CoalesceShufflePartitions(session),
OptimizeShuffleWithLocalRead)
}
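For intuition, findFinalRebalanceStage only matches a materialized shuffle stage whose origin is not ENSURE_REQUIREMENTS, i.e. a rebalance placed right before the write; a hypothetical trigger (table names made up, assuming a SparkSession `spark` with the extension enabled):

```scala
// The REBALANCE hint (or Kyuubi's insertRepartitionBeforeWrite rules) puts a rebalance
// shuffle directly before the write, which is the plan shape this rule looks for.
spark.sql("INSERT INTO TABLE target_table SELECT /*+ REBALANCE */ * FROM source_table")
```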
@@ -190,4 +190,20 @@ object KyuubiSQLConf {
.version("1.7.0")
.booleanConf
.createWithDefault(true)

val FINAL_WRITE_STAGE_EAGERLY_KILL_EXECUTORS_ENABLED =
buildConf("spark.sql.finalWriteStageEagerlyKillExecutors.enabled")
Member: it's valuable to introduce a new namespace spark.sql.finalWriteStage.

Suggested change:
- buildConf("spark.sql.finalWriteStageEagerlyKillExecutors.enabled")
+ buildConf("spark.sql.finalWriteStage.eagerlyKillExecutors.enabled")
.doc("When true, eagerly kill redundant executors before running final write stage.")
.version("1.8.0")
.booleanConf
.createWithDefault(false)

val FINAL_WRITE_STAGE_PARTITION_FACTOR =
buildConf("spark.sql.finalWriteStageRetainExecutorsFactor")
.doc("If the target executors * factor < active executors, and " +
"target executors * factor > min executors, then kill redundant executors.")
.version("1.8.0")
.doubleConf
.checkValue(_ >= 1, "must be greater than or equal to 1")
.createWithDefault(1.2)
}