13 changes: 12 additions & 1 deletion core/src/main/scala/org/apache/spark/Dependency.scala
@@ -134,7 +134,18 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
     _shuffleMergedFinalized = true
   }
 
-  def shuffleMergeFinalized : Boolean = _shuffleMergedFinalized
+  /**
+   * Returns true if push-based shuffle is disabled for this stage, or if the shuffle merge for
+   * this stage is finalized, i.e. the shuffle merge results for all partitions are available.
+   */
+  def shuffleMergeFinalized: Boolean = {
+    // An empty RDD won't be computed, therefore shuffle merge should count as finalized
+    // by default.
+    if (shuffleMergeEnabled && rdd.getNumPartitions > 0) {
+      _shuffleMergedFinalized
+    } else {
+      true
+    }
+  }
 
   _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
   _rdd.sparkContext.shuffleDriverComponents.registerShuffle(shuffleId)
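Note on the new guard: `shuffleMergeFinalized` now reports true whenever there is nothing to finalize, i.e. push-based shuffle is disabled or the RDD has zero partitions. A minimal, self-contained sketch of that logic, where `mergeEnabled`, `numPartitions`, and `finalizedFlag` stand in for `shuffleMergeEnabled`, `rdd.getNumPartitions`, and `_shuffleMergedFinalized`:

```scala
// A sketch only, not Spark code: models the guard inside
// ShuffleDependency.shuffleMergeFinalized.
object MergeFinalizedModel extends App {
  def mergeFinalized(mergeEnabled: Boolean, numPartitions: Int, finalizedFlag: Boolean): Boolean =
    if (mergeEnabled && numPartitions > 0) finalizedFlag // real merge state decides
    else true // push-based shuffle disabled, or an empty RDD that is never computed

  assert(mergeFinalized(mergeEnabled = false, numPartitions = 4, finalizedFlag = false)) // disabled
  assert(mergeFinalized(mergeEnabled = true, numPartitions = 0, finalizedFlag = false)) // empty RDD
  assert(!mergeFinalized(mergeEnabled = true, numPartitions = 4, finalizedFlag = false)) // must wait
}
```

This mirrors the check this commit removes from `ShuffleMapStage.isMergeFinalized`, now folded into the dependency itself.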
33 changes: 14 additions & 19 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -19,7 +19,7 @@ package org.apache.spark.scheduler
 
 import java.io.NotSerializableException
 import java.util.Properties
-import java.util.concurrent.{ConcurrentHashMap, TimeoutException, TimeUnit }
+import java.util.concurrent.{ConcurrentHashMap, TimeoutException, TimeUnit}
 import java.util.concurrent.atomic.AtomicInteger
 
 import scala.annotation.tailrec
@@ -37,8 +37,7 @@ import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config
 import org.apache.spark.internal.config.Tests.TEST_NO_STAGE_RETRY
-import org.apache.spark.network.netty.SparkTransportConf
-import org.apache.spark.network.shuffle.{ExternalBlockStoreClient, MergeFinalizerListener}
+import org.apache.spark.network.shuffle.{BlockStoreClient, MergeFinalizerListener}
 import org.apache.spark.network.shuffle.protocol.MergeStatuses
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult}
@@ -267,14 +266,9 @@ private[spark] class DAGScheduler(
 
   // Since SparkEnv gets initialized after DAGScheduler, externalShuffleClient needs to be
   // initialized lazily
-  private lazy val externalShuffleClient: Option[ExternalBlockStoreClient] =
+  private lazy val externalShuffleClient: Option[BlockStoreClient] =
     if (pushBasedShuffleEnabled) {
-      val transConf = SparkTransportConf.fromSparkConf(sc.conf, "shuffle", 1)
-      val shuffleClient = new ExternalBlockStoreClient(transConf, env.securityManager,
-        env.securityManager.isAuthenticationEnabled(),
-        sc.conf.get(config.SHUFFLE_REGISTRATION_TIMEOUT))
-      shuffleClient.init(sc.conf.getAppId)
-      Some(shuffleClient)
+      Some(env.blockManager.blockStoreClient)
     } else {
       None
     }
@@ -720,7 +714,7 @@ private[spark] class DAGScheduler(
             // Mark mapStage as available with shuffle outputs only after shuffle merge is
             // finalized with push based shuffle. If not, subsequent ShuffleMapStage won't
             // read from merged output as the MergeStatuses are not available.
-            if (!mapStage.isAvailable || !mapStage.isMergeFinalized) {
+            if (!mapStage.isAvailable || !mapStage.shuffleDep.shuffleMergeFinalized) {
               missing += mapStage
             }
           case narrowDep: NarrowDependency[_] =>
@@ -1351,7 +1345,7 @@
       outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1)
       // Only generate merger location for a given shuffle dependency once.
       if (s.shuffleDep.shuffleMergeEnabled) {
-        if (!s.isMergeFinalized) {
+        if (!s.shuffleDep.shuffleMergeFinalized) {
           prepareShuffleServicesForShuffleMapStage(s)
         } else {
           // Disable Shuffle merge for the retry/reuse of the same shuffle dependency if it has
@@ -1717,7 +1711,7 @@
           }
 
           if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) {
-            if (!shuffleStage.isMergeFinalized &&
+            if (!shuffleStage.shuffleDep.shuffleMergeFinalized && shuffleStage.isAvailable &&
               shuffleStage.shuffleDep.getMergerLocs.nonEmpty) {
               scheduleShuffleMergeFinalize(shuffleStage)
             } else {
@@ -1756,11 +1750,13 @@
           if (mapStage.rdd.isBarrier()) {
             // Mark all the map as broken in the map stage, to ensure retry all the tasks on
             // resubmitted stage attempt.
+            // TODO: SPARK-35547: Clean all push-based shuffle metadata like merge enabled and
+            // TODO: finalized as we are clearing all the merge results.
             mapOutputTracker.unregisterAllMapAndMergeOutput(shuffleId)
           } else if (mapIndex != -1) {
             // Mark the map whose fetch failed as broken in the map stage
             mapOutputTracker.unregisterMapOutput(shuffleId, mapIndex, bmAddress)
-            if (mapStage.shuffleDep.shuffleMergeEnabled) {
+            if (pushBasedShuffleEnabled) {
               // Possibly unregister the merge result <shuffleId, reduceId>, if the FetchFailed
               // mapIndex is part of the merge result of <shuffleId, reduceId>
               mapOutputTracker.
@@ -1982,6 +1978,8 @@
               case failedMapStage: ShuffleMapStage =>
                 // Mark all the map as broken in the map stage, to ensure retry all the tasks on
                 // resubmitted stage attempt.
+                // TODO: SPARK-32923: Clean all push-based shuffle metadata like merge enabled and
+                // TODO: finalized as we are clearing all the merge results.
                 mapOutputTracker.unregisterAllMapAndMergeOutput(failedMapStage.shuffleDep.shuffleId)
 
               case failedResultStage: ResultStage =>
@@ -2027,6 +2025,8 @@
    * Schedules shuffle merge finalize.
    */
   private[scheduler] def scheduleShuffleMergeFinalize(stage: ShuffleMapStage): Unit = {
+    // TODO: SPARK-33701: Instead of waiting for a constant amount of time for finalization
+    // TODO: for all the stages, adaptively tune timeout for merge finalization
     logInfo(("%s (%s) scheduled for finalizing" +
       " shuffle merge in %s s").format(stage, stage.name, shuffleMergeFinalizeWaitSec))
     shuffleMergeFinalizeScheduler.schedule(
@@ -2049,7 +2049,6 @@
     externalShuffleClient.foreach { shuffleClient =>
       val shuffleId = stage.shuffleDep.shuffleId
       val numMergers = stage.shuffleDep.getMergerLocs.length
-      val numResponses = new AtomicInteger()
       val results = (0 until numMergers).map(_ => SettableFuture.create[Boolean]())
 
       stage.shuffleDep.getMergerLocs.zipWithIndex.foreach {
@@ -2064,14 +2063,12 @@
               assert(shuffleId == statuses.shuffleId)
               eventProcessLoop.post(RegisterMergeStatuses(stage, MergeStatus.
                 convertMergeStatusesToMergeStatusArr(statuses, shuffleServiceLoc)))
-              numResponses.incrementAndGet()
               results(index).set(true)
             }
 
             override def onShuffleMergeFailure(e: Throwable): Unit = {
               logWarning(s"Exception encountered when trying to finalize shuffle " +
                 s"merge on ${shuffleServiceLoc.host} for shuffle $shuffleId", e)
-              numResponses.incrementAndGet()
               // Do not fail the future as this would cause dag scheduler to prematurely
               // give up on waiting for merge results from the remaining shuffle services
               // if one fails
@@ -2082,8 +2079,6 @@
       // DAGScheduler only waits for a limited amount of time for the merge results.
       // It will attempt to submit the next stage(s) irrespective of whether merge results
       // from all shuffle services are received or not.
-      // TODO: SPARK-33701: Instead of waiting for a constant amount of time for finalization
-      // TODO: for all the stages, adaptively tune timeout for merge finalization
       try {
         Futures.allAsList(results: _*).get(shuffleMergeResultsTimeoutSec, TimeUnit.SECONDS)
       } catch {
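Note on the removed counter: with one `SettableFuture` per merger, `numResponses` was redundant bookkeeping, since Guava's `Futures.allAsList` already tracks completion and a bounded `get` enforces the finalization timeout. A runnable sketch of that pattern, assuming Guava on the classpath; the merger count, the 2-second timeout, and the silent third merger are illustrative, with `timeoutSec` standing in for `shuffleMergeResultsTimeoutSec`:

```scala
import java.util.concurrent.{TimeUnit, TimeoutException}

import com.google.common.util.concurrent.{Futures, SettableFuture}

object MergeWaitSketch extends App {
  val numMergers = 3
  val timeoutSec = 2L // stands in for shuffleMergeResultsTimeoutSec

  // One future per merger; the finalizer listener completes its slot on
  // success and on failure alike, so a failed shuffle service can delay but
  // never fail the combined wait.
  val results = (0 until numMergers).map(_ => SettableFuture.create[Boolean]())
  results(0).set(true) // merger 0 finalized successfully
  results(1).set(true) // merger 1 failed, but its slot is still marked done
  // merger 2 never responds

  try {
    // Bounded wait: after timeoutSec the scheduler moves on with whatever
    // merge results were registered in time.
    Futures.allAsList(results: _*).get(timeoutSec, TimeUnit.SECONDS)
  } catch {
    case _: TimeoutException => println("proceeding without all merge results")
  }
}
```

Completing a slot on failure as well as success is what lets one unreachable shuffle service delay, but never fail, submission of the downstream stages.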
13 changes: 0 additions & 13 deletions core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala
@@ -94,17 +94,4 @@ private[spark] class ShuffleMapStage(
       .findMissingPartitions(shuffleDep.shuffleId)
       .getOrElse(0 until numPartitions)
   }
-
-  /**
-   * Returns true if push based shuffle is disabled for this stage, or if the shuffle merge for
-   * this stage is finalized, i.e. the shuffle merge results for all partitions are available.
-   */
-  def isMergeFinalized: Boolean = {
-    // EmptyRDD should not be computed
-    if (numPartitions > 0 && shuffleDep.shuffleMergeEnabled) {
-      shuffleDep.shuffleMergeFinalized
-    } else {
-      true
-    }
-  }
 }
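Note on the removal: this is not just code motion. A retried stage reuses the same shuffle dependency (see the "retry/reuse of the same shuffle dependency" comment in DAGScheduler above), so keeping the finalized flag on `ShuffleDependency` gives every stage attempt one shared source of truth. A toy illustration, where `Dep` and `Attempt` are hypothetical stand-ins for `ShuffleDependency` and `ShuffleMapStage`:

```scala
// Dep and Attempt are hypothetical stand-ins for ShuffleDependency and
// ShuffleMapStage: two attempts share one dependency object, so merge state
// set through either attempt is visible to both.
object SharedDepSketch extends App {
  class Dep { var mergeFinalized: Boolean = false }
  class Attempt(val dep: Dep)

  val dep = new Dep
  val attempt1 = new Attempt(dep)
  val attempt2 = new Attempt(dep) // a stage retry reuses the same dependency

  attempt1.dep.mergeFinalized = true
  assert(attempt2.dep.mergeFinalized) // the retry observes the finalized state
}
```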
4 changes: 2 additions & 2 deletions core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -3562,8 +3562,8 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     val shuffleStage2 = scheduler.stageIdToStage(1).asInstanceOf[ShuffleMapStage]
     assert(shuffleStage2.shuffleDep.getMergerLocs.nonEmpty)
 
-    assert(shuffleStage2.isMergeFinalized)
-    assert(shuffleStage1.isMergeFinalized)
+    assert(shuffleStage2.shuffleDep.shuffleMergeFinalized)
+    assert(shuffleStage1.shuffleDep.shuffleMergeFinalized)
     assert(mapOutputTracker.getNumAvailableMergeResults(shuffleDep1.shuffleId) == parts)
     assert(mapOutputTracker.getNumAvailableMergeResults(shuffleDep2.shuffleId) == parts)
 