10 changes: 7 additions & 3 deletions core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -165,9 +165,11 @@ private class ShuffleStatus(

/**
* Register a map output. If there is already a registered location for the map output then it
* will be replaced by the new location.
* will be replaced by the new location. Returns true if the checksum in the new MapStatus is
* different from a previously registered MapStatus. Otherwise, returns false.
*/
def addMapOutput(mapIndex: Int, status: MapStatus): Unit = withWriteLock {
def addMapOutput(mapIndex: Int, status: MapStatus): Boolean = withWriteLock {
var isChecksumMismatch: Boolean = false
val currentMapStatus = mapStatuses(mapIndex)
if (currentMapStatus == null) {
_numAvailableMapOutputs += 1
@@ -183,9 +185,11 @@ private class ShuffleStatus(
logInfo(s"Checksum of map output changes from ${preStatus.checksumValue} to " +
s"${status.checksumValue} for task ${status.mapId}.")
checksumMismatchIndices.add(mapIndex)
isChecksumMismatch = true
}
mapStatuses(mapIndex) = status
mapIdToMapIndex(status.mapId) = mapIndex
isChecksumMismatch
}

/**
@@ -853,7 +857,7 @@ private[spark] class MapOutputTrackerMaster(
}
}

def registerMapOutput(shuffleId: Int, mapIndex: Int, status: MapStatus): Unit = {
def registerMapOutput(shuffleId: Int, mapIndex: Int, status: MapStatus): Boolean = {
shuffleStatuses(shuffleId).addMapOutput(mapIndex, status)
}

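The Boolean returned by addMapOutput is the signal the scheduler acts on later in this diff. A self-contained sketch of the detection pattern, using a simplified status type in place of MapStatus (all names here are illustrative, not Spark's internals):

final case class StatusModel(mapId: Long, checksumValue: Long)

final class ShuffleStatusModel(numPartitions: Int) {
  private val statuses = new Array[StatusModel](numPartitions)

  // Returns true when a previously registered status carried a different checksum,
  // i.e. a retried task produced different output data for the same partition.
  def addMapOutput(mapIndex: Int, status: StatusModel): Boolean = {
    val previous = statuses(mapIndex)
    val mismatch = previous != null && previous.checksumValue != status.checksumValue
    statuses(mapIndex) = status
    mismatch
  }
}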
core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -1679,6 +1679,15 @@ package object config {
.checkValues(Set("ADLER32", "CRC32", "CRC32C"))
.createWithDefault("ADLER32")

private[spark] val SCHEDULER_CHECKSUM_MISMATCH_FULL_RETRY_ENABLED =
ConfigBuilder("spark.scheduler.checksumMismatchFullRetry.enabled")
.doc("Whether to retry all tasks of a consumer stage when we detect checksum mismatches " +
"with its producer stages. The checksum computation is controlled by another config " +
"called SHUFFLE_ORDER_INDEPENDENT_CHECKSUM_ENABLED.")
Contributor:

Does it make sense to use SHUFFLE_ORDER_INDEPENDENT_CHECKSUM_ENABLED without SCHEDULER_CHECKSUM_MISMATCH_FULL_RETRY_ENABLED? Or vice versa?

What about removing SHUFFLE_ORDER_INDEPENDENT_CHECKSUM_ENABLED (since the version where it is introduced is also 4.1.0, we can do that) and computing the checksum when SCHEDULER_CHECKSUM_MISMATCH_FULL_RETRY_ENABLED is true? That way there is only one config for the feature.

Contributor:

This is a good point. Why do we separate the checksum computation and stage retry with two flags? Do we have logging for checksum mismatch without retry?

Contributor Author:

Yes, a log is written when a checksum mismatch happens. If SCHEDULER_CHECKSUM_MISMATCH_FULL_RETRY_ENABLED is false, there will be no full retry of succeeding stages, only logs for the checksum mismatch.

And when we want to enable SCHEDULER_CHECKSUM_MISMATCH_FULL_RETRY_ENABLED, we need to make sure SHUFFLE_ORDER_INDEPENDENT_CHECKSUM_ENABLED is also true if we keep these two configs.

One config would be easier to use. If that makes sense to you all, I'll remove SHUFFLE_ORDER_INDEPENDENT_CHECKSUM_ENABLED.

Contributor:

I think it makes sense to have the log-only mode, so that Spark users can do impact analysis before turning on the retry.

Contributor:

We can improve it a bit more: we will compute the checksum when either SHUFFLE_ORDER_INDEPENDENT_CHECKSUM_ENABLED or SCHEDULER_CHECKSUM_MISMATCH_FULL_RETRY_ENABLED is enabled.

Contributor Author:

Thanks, updated.

.version("4.1.0")
.booleanConf
.createWithDefault(false)

private[spark] val SHUFFLE_COMPRESS =
ConfigBuilder("spark.shuffle.compress")
.doc("Whether to compress shuffle output. Compression will use " +
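Following the thread above, the intent is to compute the order-independent checksum when either flag is set. A hedged sketch of that gating, assuming a SparkConf-backed `conf` in scope (the val name is hypothetical):

// Hypothetical gating: compute the checksum if either the standalone checksum
// flag or the full-retry flag is enabled.
val computeOrderIndependentChecksum: Boolean =
  conf.get(config.SHUFFLE_ORDER_INDEPENDENT_CHECKSUM_ENABLED) ||
    conf.get(config.SCHEDULER_CHECKSUM_MISMATCH_FULL_RETRY_ENABLED)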
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1773,7 +1773,7 @@ abstract class RDD[T: ClassTag](
/**
* Return whether this RDD is reliably checkpointed and materialized.
*/
private[rdd] def isReliablyCheckpointed: Boolean = {
private[spark] def isReliablyCheckpointed: Boolean = {
checkpointData match {
case Some(reliable: ReliableRDDCheckpointData[_]) if reliable.isCheckpointed => true
case _ => false
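The widened visibility lets the scheduler consult reliable checkpointing when classifying a stage (see Stage.isStageIndeterminate later in this diff). A small usage sketch, assuming a local SparkContext and a writable checkpoint directory:

import org.apache.spark.{SparkConf, SparkContext}

object ReliableCheckpointSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("checkpoint-sketch"))
    sc.setCheckpointDir("/tmp/spark-checkpoints") // reliable, filesystem-backed location
    val rdd = sc.parallelize(1 to 100).map(_ * 2)
    rdd.checkpoint() // mark for reliable checkpointing
    rdd.count()      // an action materializes the checkpoint
    // isReliablyCheckpointed is private[spark] and not callable from user code;
    // conceptually it is now true, so the scheduler can treat this RDD's output
    // as stable across stage retries.
    sc.stop()
  }
}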
98 changes: 70 additions & 28 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -307,6 +307,9 @@ private[spark] class DAGScheduler(

private val shuffleFinalizeRpcThreads = sc.conf.get(config.PUSH_SHUFFLE_FINALIZE_RPC_THREADS)

private val checksumMismatchFullRetryEnabled =
sc.getConf.get(config.SCHEDULER_CHECKSUM_MISMATCH_FULL_RETRY_ENABLED)

// Since SparkEnv gets initialized after DAGScheduler, externalShuffleClient needs to be
// initialized lazily
private lazy val externalShuffleClient: Option[BlockStoreClient] =
@@ -1551,29 +1554,41 @@ private[spark] class DAGScheduler(
// The operation here can make sure for the partially completed intermediate stage,
// `findMissingPartitions()` returns all partitions every time.
stage match {
case sms: ShuffleMapStage if stage.isIndeterminate && !sms.isAvailable =>
// already executed at least once
if (sms.getNextAttemptId > 0) {
// While we previously validated possible rollbacks during the handling of a FetchFailure,
// where we were fetching from an indeterminate source map stages, this later check
// covers additional cases like recalculating an indeterminate stage after an executor
// loss. Moreover, because this check occurs later in the process, if a result stage task
// has successfully completed, we can detect this and abort the job, as rolling back a
// result stage is not possible.
val stagesToRollback = collectSucceedingStages(sms)
abortStageWithInvalidRollBack(stagesToRollback)
// stages which cannot be rolled back were aborted, which leads to removing the
// dependent job(s) from the active jobs set
val numActiveJobsWithStageAfterRollback =
activeJobs.count(job => stagesToRollback.contains(job.finalStage))
if (numActiveJobsWithStageAfterRollback == 0) {
logInfo(log"All jobs depending on the indeterminate stage " +
log"(${MDC(STAGE_ID, stage.id)}) were aborted so this stage is not needed anymore.")
return
case sms: ShuffleMapStage if !sms.isAvailable =>
if (checksumMismatchFullRetryEnabled) {
// When the parents of this stage are indeterminate (e.g., some parents are not
// checkpointed and checksum mismatches are detected), the output data of the parents
// may have changed due to task retries. For correctness reasons, we need to
// retry all tasks of the current stage. The legacy way of using the current stage's
// deterministic level to trigger a full stage retry is not accurate.
if (stage.isParentIndeterminate) {
mapOutputTracker.unregisterAllMapAndMergeOutput(sms.shuffleDep.shuffleId)
sms.shuffleDep.newShuffleMergeState()
}
} else if (stage.isIndeterminate) {
// already executed at least once
if (sms.getNextAttemptId > 0) {
// While we previously validated possible rollbacks during the handling of a FetchFailure,
// where we were fetching from an indeterminate source map stages, this later check
// covers additional cases like recalculating an indeterminate stage after an executor
// loss. Moreover, because this check occurs later in the process, if a result stage task
// has successfully completed, we can detect this and abort the job, as rolling back a
// result stage is not possible.
val stagesToRollback = collectSucceedingStages(sms)
abortStageWithInvalidRollBack(stagesToRollback)
// stages which cannot be rolled back were aborted, which leads to removing the
// dependent job(s) from the active jobs set
val numActiveJobsWithStageAfterRollback =
activeJobs.count(job => stagesToRollback.contains(job.finalStage))
if (numActiveJobsWithStageAfterRollback == 0) {
logInfo(log"All jobs depending on the indeterminate stage " +
log"(${MDC(STAGE_ID, stage.id)}) were aborted so this stage is not needed anymore.")
return
}
}
mapOutputTracker.unregisterAllMapAndMergeOutput(sms.shuffleDep.shuffleId)
sms.shuffleDep.newShuffleMergeState()
}
mapOutputTracker.unregisterAllMapAndMergeOutput(sms.shuffleDep.shuffleId)
sms.shuffleDep.newShuffleMergeState()
case _ =>
}
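A condensed model of the branch above, in plain Scala rather than the DAGScheduler's own types: with the flag on, the previous shuffle output is discarded when the stage's parents are indeterminate; with it off, the legacy rule keyed on the stage's own determinism applies.

// Simplified stand-in for a ShuffleMapStage's relevant state (illustrative only).
final case class StageModel(
    isAvailable: Boolean,         // all map outputs already registered
    isIndeterminate: Boolean,     // legacy check: the stage's own output is indeterminate
    parentIndeterminate: Boolean) // new check: some parent's output may have changed

def discardAllShuffleOutput(stage: StageModel, fullRetryEnabled: Boolean): Boolean =
  !stage.isAvailable &&
    (if (fullRetryEnabled) stage.parentIndeterminate else stage.isIndeterminate)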

@@ -1886,6 +1901,20 @@ private[spark] class DAGScheduler(
}
}

/**
* If a map stage is non-deterministic, its map tasks may return different results when
* retried. To ensure data correctness, we need to retry all the tasks of its succeeding
* stages, as their input data may have changed after the map tasks are retried. For stages
* where rolling back and retrying all tasks is not possible, we need to abort those stages.
*/
private[scheduler] def abortUnrollbackableStages(mapStage: ShuffleMapStage): Unit = {
val stagesToRollback = collectSucceedingStages(mapStage)
val rollingBackStages = abortStageWithInvalidRollBack(stagesToRollback)
logInfo(log"The shuffle map stage ${MDC(SHUFFLE_ID, mapStage)} with indeterminate output " +
log"was failed, we will roll back and rerun below stages which include itself and all its " +
log"indeterminate child stages: ${MDC(STAGES, rollingBackStages)}")
}

/**
* Responds to a task finishing. This is called inside the event loop so it assumes that it can
* modify the scheduler's internal state. Use taskEnded() to post a task end event from outside.
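The new helper leans on collectSucceedingStages and abortStageWithInvalidRollBack, which are outside this hunk. As a rough, self-contained illustration of the "collect succeeding stages" idea only, a traversal over a hypothetical child-stage map:

// Illustrative only: gather the starting stage and everything that transitively
// consumes its shuffle output, given a childStages adjacency map keyed by stage id.
def collectSucceeding(start: Int, childStages: Map[Int, Seq[Int]]): Set[Int] = {
  @scala.annotation.tailrec
  def loop(frontier: List[Int], seen: Set[Int]): Set[Int] = frontier match {
    case Nil => seen
    case s :: rest =>
      val discovered = childStages.getOrElse(s, Nil).filterNot(seen)
      loop(rest ++ discovered.toList, seen ++ discovered)
  }
  loop(List(start), Set(start))
}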
@@ -2022,8 +2051,25 @@ private[spark] class DAGScheduler(
// The epoch of the task is acceptable (i.e., the task was launched after the most
// recent failure we're aware of for the executor), so mark the task's output as
// available.
mapOutputTracker.registerMapOutput(
val isChecksumMismatched = mapOutputTracker.registerMapOutput(
shuffleStage.shuffleDep.shuffleId, smt.partitionId, status)
if (isChecksumMismatched) {
shuffleStage.isChecksumMismatched = isChecksumMismatched
Contributor (@mridulm, Dec 6, 2025):

This is never reset back to false when the stage attempt is retried and succeeds - what am I missing?
This would mean the app will always fail, right?

Not sure what I am missing here.
+CC @ivoson, @cloud-fan, @attilapiros

Contributor Author:

Hi @mridulm, this is not set back to false. We would expect all the succeeding stages to do a full retry once a checksum mismatch happens for the stage, as we don't know which version of the shuffle output the successful tasks consumed.

This won't fail the app; the impact is that the succeeding stages would do a full retry.

The code logic has changed a little bit in PR: #53274

Please take a look once you get a chance. Thanks.

Contributor:

On retry, when we throw away the entire mapper output and recompute it, at which point can we go back to setting it to false?

Contributor Author:

Currently, it's not set back to false. We only recompute when a new shuffle checksum mismatch is detected. Maybe we can remove the flag to avoid the confusion here.

// There could be multiple checksum mismatches detected for a single stage attempt.
// We check for stage abortion once and only once, when we first detect a checksum
// mismatch for each stage attempt. For example, assume that we have
// stage1 -> stage2, and we encounter a checksum mismatch during the retry of stage1.
// In this case, we need to call abortUnrollbackableStages() for the succeeding
// stages. Assume that when stage2 is retried, some tasks finish and some tasks
// fail again with FetchFailed. If we encounter a checksum mismatch again
// during the retry of stage1, we need to call abortUnrollbackableStages() again.
if (shuffleStage.maxChecksumMismatchedId < smt.stageAttemptId) {
shuffleStage.maxChecksumMismatchedId = smt.stageAttemptId
if (checksumMismatchFullRetryEnabled && shuffleStage.isStageIndeterminate) {
abortUnrollbackableStages(shuffleStage)
}
}
}
}
} else {
logInfo(log"Ignoring ${MDC(TASK_NAME, smt)} completion from an older attempt of indeterminate stage")
@@ -2148,12 +2194,8 @@ private[spark] class DAGScheduler(
// Note that, if map stage is UNORDERED, we are fine. The shuffle partitioner is
// guaranteed to be determinate, so the input data of the reducers will not change
// even if the map tasks are re-tried.
if (mapStage.isIndeterminate) {
val stagesToRollback = collectSucceedingStages(mapStage)
val rollingBackStages = abortStageWithInvalidRollBack(stagesToRollback)
logInfo(log"The shuffle map stage ${MDC(SHUFFLE_ID, mapStage)} with indeterminate output was failed, " +
log"we will roll back and rerun below stages which include itself and all its " +
log"indeterminate child stages: ${MDC(STAGES, rollingBackStages)}")
if (mapStage.isIndeterminate && !checksumMismatchFullRetryEnabled) {
abortUnrollbackableStages(mapStage)
}

// We expect one executor failure to trigger many FetchFailures in rapid succession,
22 changes: 22 additions & 0 deletions core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -72,6 +72,18 @@ private[scheduler] abstract class Stage(
private var nextAttemptId: Int = 0
private[scheduler] def getNextAttemptId: Int = nextAttemptId

/**
* Whether checksum mismatches have been detected across different attempts of the stage. A
* checksum mismatch typically indicates that different stage attempts have produced different
* data.
*/
private[scheduler] var isChecksumMismatched: Boolean = false

/**
* The maximum stage attempt id for which a checksum mismatch has been detected.
*/
private[scheduler] var maxChecksumMismatchedId: Int = nextAttemptId

val name: String = callSite.shortForm
val details: String = callSite.longForm

@@ -131,4 +143,14 @@ private[scheduler] abstract class Stage(
def isIndeterminate: Boolean = {
rdd.outputDeterministicLevel == DeterministicLevel.INDETERMINATE
}

// Returns true if any parents of this stage are indeterminate.
def isParentIndeterminate: Boolean = {
parents.exists(_.isStageIndeterminate)
}

// Returns true if the stage itself is indeterminate.
def isStageIndeterminate: Boolean = {
!rdd.isReliablyCheckpointed && isChecksumMismatched
}
}
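A small self-contained model of how the two new predicates compose; the real checks read the RDD's checkpoint state and the mismatch flag set by the DAGScheduler, so this is an assumed simplification:

final case class StageNode(
    reliablyCheckpointed: Boolean, // output pinned by a reliable checkpoint
    checksumMismatched: Boolean,   // differing output observed across attempts
    parents: Seq[StageNode]) {

  // Indeterminate only when not reliably checkpointed AND a mismatch was actually seen.
  def isStageIndeterminate: Boolean = !reliablyCheckpointed && checksumMismatched

  // The stage must be fully retried when any parent is indeterminate in this sense.
  def isParentIndeterminate: Boolean = parents.exists(_.isStageIndeterminate)
}

Under this model, a reliably checkpointed producer never counts as indeterminate, so its consumers are shielded from full retries.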