
Conversation

@ivoson ivoson commented Sep 15, 2025

What changes were proposed in this pull request?

This PR proposes to retry all tasks of the consumer stages when checksum mismatches are detected on their producer stages. If we can't roll back and retry all tasks of a consumer stage, we have to abort the stage (and thus the job).

How we detected and handled nondeterminism before:

  • Stages are labeled as indeterminate at planning time, prior to query execution.
  • When a task completes and FetchFailed is detected, we abort all unrollbackable succeeding stages of the map stage and resubmit the failed stages.
  • In submitMissingTasks(), if a stage itself is isIndeterminate, we call unregisterAllMapAndMergeOutput() and retry all tasks of the stage.

How we detect and handle nondeterminism now:

  • During query execution, we keep track of the checksums produced by each map task.
  • When a task completes and a checksum mismatch is detected, we abort the unrollbackable succeeding stages of the stage with the mismatch. The resubmission of failed stages still happens in the same places as before.
  • In submitMissingTasks(), if the parent of a stage has checksum mismatches, we call unregisterAllMapAndMergeOutput() and retry all tasks of the stage.

Note that (1) if a stage isReliablyCheckpointed, its consumer stages don't need a whole-stage retry, and (2) when mismatches are detected for a stage in a chain (e.g., the first stage in stage_i -> stage_i+1 -> stage_i+2 -> ...), the direct consumer (e.g., stage_i+1) of that stage gets a whole-stage retry, and an indirect consumer (e.g., stage_i+2) gets a whole-stage retry when its own parent detects checksum mismatches.
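A minimal sketch of the retry decision described above, using simplified stand-in types (Producer, Consumer, hasChecksumMismatch are illustrative names, not the actual DAGScheduler API):

```scala
// Simplified stand-ins for Spark's internal scheduler types; illustrative only.
case class Producer(
    isReliablyCheckpointed: Boolean, // checkpointed output is stable across retries
    hasChecksumMismatch: Boolean)    // set when a retried map task produced different output

case class Consumer(
    isIndeterminate: Boolean, // legacy: labeled at planning time
    parent: Producer)

// Roughly the decision made in submitMissingTasks() for a consumer stage:
// should every task be rerun, rather than just the missing ones?
def needsFullStageRetry(stage: Consumer, checksumMismatchFullRetryEnabled: Boolean): Boolean =
  if (checksumMismatchFullRetryEnabled) {
    // New: runtime signal. A reliably checkpointed parent never forces a retry,
    // since its output does not change across attempts.
    !stage.parent.isReliablyCheckpointed && stage.parent.hasChecksumMismatch
  } else {
    // Legacy: planning-time labeling.
    stage.isIndeterminate
  }

// Before the full retry, all previously registered map (and merge) output of the
// mismatched shuffle is discarded via unregisterAllMapAndMergeOutput().
```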

Why are the changes needed?

Handle nondeterminism caused by the retry of shuffle map tasks.

Does this PR introduce any user-facing change?

No

How was this patch tested?

UTs added.

Was this patch authored or co-authored using generative AI tooling?

No

@github-actions github-actions bot added the CORE label Sep 15, 2025
@ivoson ivoson changed the title [WIP][SPARK-53575][CORE] Retry entire consumer stages when checksum mismatch detected for a retried shuffle map task [SPARK-53575][CORE] Retry entire consumer stages when checksum mismatch detected for a retried shuffle map task Sep 17, 2025
@ivoson ivoson marked this pull request as ready for review September 17, 2025 03:39
ivoson commented Sep 22, 2025

cc @cloud-fan @mridulm @attilapiros can you please review this PR? This deals with non-deterministic stage retries based on checksum mismatch detection (#50230).

ConfigBuilder("spark.scheduler.checksumMismatchFullRetry.enabled")
  .doc("Whether to retry all tasks of a consumer stage when we detect checksum mismatches " +
    "with its producer stages. The checksum computation is controlled by another config " +
    "called SHUFFLE_ORDER_INDEPENDENT_CHECKSUM_ENABLED.")
Contributor:

Does it make sense to use SHUFFLE_ORDER_INDEPENDENT_CHECKSUM_ENABLED without SCHEDULER_CHECKSUM_MISMATCH_FULL_RETRY_ENABLED? Or vice versa?

What about removing SHUFFLE_ORDER_INDEPENDENT_CHECKSUM_ENABLED (since the version where it is introduced is also 4.1.0, we can do that) and computing the checksum when SCHEDULER_CHECKSUM_MISMATCH_FULL_RETRY_ENABLED is true? So we'd have only one config for the feature.

Contributor:

This is a good point. Why do we separate the checksum computation and the stage retry into two flags? Do we have logging for checksum mismatches without retry?

ivoson (Contributor, Author):

Yes, a log is written when a checksum mismatch happens. If SCHEDULER_CHECKSUM_MISMATCH_FULL_RETRY_ENABLED is false, there will be no full retry of succeeding stages, only logs for the checksum mismatch.

And if we keep the two configs, when we want to enable SCHEDULER_CHECKSUM_MISMATCH_FULL_RETRY_ENABLED we need to make sure SHUFFLE_ORDER_INDEPENDENT_CHECKSUM_ENABLED is also true.

A single config would be easier to use. If that makes sense to you all, I'll remove SHUFFLE_ORDER_INDEPENDENT_CHECKSUM_ENABLED.

Contributor:

I think it makes sense to have the log-only mode, so that Spark users can do impact analysis before turning on the retry.

Contributor:

We can improve it a bit more: we will compute the checksum when either SHUFFLE_ORDER_INDEPENDENT_CHECKSUM_ENABLED or SCHEDULER_CHECKSUM_MISMATCH_FULL_RETRY_ENABLED is enabled.

ivoson (Contributor, Author):

Thanks, updated.

@github-actions github-actions bot added the SQL label Sep 24, 2025
@dongjoon-hyun (Member) left a comment:

The new configuration should be under the spark.scheduler.checksum namespace, because spark.scheduler.checksum.enabled=false will disable this, @ivoson .

Specifically, I'd like to propose the following new name. WDYT?

- spark.scheduler.checksumMismatchFullRetry.enabled
+ spark.scheduler.checksum.enableFullRetryOnMismatch

ivoson commented Sep 24, 2025

The new configuration should be under the spark.scheduler.checksum namespace, because spark.scheduler.checksum.enabled=false will disable this, @ivoson .

Specifically, I'd like to propose the following new name. WDYT?

- spark.scheduler.checksumMismatchFullRetry.enabled
+ spark.scheduler.checksum.enableFullRetryOnMismatch

Hey @dongjoon-hyun, there might be some misunderstanding here: we don't depend on spark.scheduler.checksum.enabled, and that config actually does not exist.

Currently there are two related configs for the feature:
spark.sql.shuffle.orderIndependentChecksum.enabled: whether to compute an order-independent checksum for shuffle output;
spark.scheduler.checksumMismatchFullRetry.enabled: whether to retry all tasks of a succeeding stage when a shuffle checksum mismatch is detected.

Please let me know if you have any suggestions regarding the above configs. Thanks.

dongjoon-hyun commented Sep 25, 2025

Currently there are two related configs for the feature:
spark.sql.shuffle.orderIndependentChecksum.enabled: whether to compute an order-independent checksum for shuffle output;
spark.scheduler.checksumMismatchFullRetry.enabled: whether to retry all tasks of a succeeding stage when a shuffle checksum mismatch is detected.

Thank you for correcting me. In that case, spark.sql.shuffle.orderIndependentChecksum.* seems to be the parent namespace for this feature. If spark.sql.shuffle.orderIndependentChecksum.enabled=false disables this PR's configuration, it should be under the same namespace. The revised config name might be the following. WDYT, @ivoson?

- spark.scheduler.checksumMismatchFullRetry.enabled
+ spark.sql.shuffle.orderIndependentChecksum.enableFullRetryOnMismatch

The basic idea is the dependency among the configurations. Please let me know your hierarchy for the new set of configurations for this feature.

ivoson commented Sep 26, 2025

Currently there are two related configs for the feature:
spark.sql.shuffle.orderIndependentChecksum.enabled: whether to compute an order-independent checksum for shuffle output;
spark.scheduler.checksumMismatchFullRetry.enabled: whether to retry all tasks of a succeeding stage when a shuffle checksum mismatch is detected.

Thank you for correcting me. In that case, spark.sql.shuffle.orderIndependentChecksum.* seems to be the parent namespace for this feature. If spark.sql.shuffle.orderIndependentChecksum.enabled=false disables this PR's configuration, it should be under the same namespace. The revised config name might be the following. WDYT, @ivoson?

- spark.scheduler.checksumMismatchFullRetry.enabled
+ spark.sql.shuffle.orderIndependentChecksum.enableFullRetryOnMismatch

The basic idea is the dependency among the configurations. Please let me know your hierarchy for the new set of configurations for this feature.

Thanks @dongjoon-hyun for the suggestion. Updated. For the new configs:

spark.sql.shuffle.orderIndependentChecksum.enabled -> when it's true, we compute the shuffle checksum and only log detected checksum mismatches if spark.sql.shuffle.orderIndependentChecksum.enableFullRetryOnMismatch is false;
spark.sql.shuffle.orderIndependentChecksum.enableFullRetryOnMismatch -> when it's true, we compute the shuffle checksum and fully retry consumer stages once a mismatch happens.
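As a usage illustration, a sketch of the two modes described above (the app name and master below are placeholder values, not from this PR):

```scala
import org.apache.spark.sql.SparkSession

// Log-only mode: compute order-independent checksums and log mismatches,
// while keeping the legacy retry behavior.
val spark = SparkSession.builder()
  .appName("checksum-config-demo")
  .master("local[*]")
  .config("spark.sql.shuffle.orderIndependentChecksum.enabled", "true")
  .config("spark.sql.shuffle.orderIndependentChecksum.enableFullRetryOnMismatch", "false")
  .getOrCreate()

// Full-retry mode would instead set:
//   .config("spark.sql.shuffle.orderIndependentChecksum.enableFullRetryOnMismatch", "true")
// which also triggers checksum computation on its own, per the semantics above.
```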

@dongjoon-hyun (Member) left a comment:

Thank you so much for updating, @ivoson. +1, LGTM (if CI passes).

Could you rebase onto the master branch? It was broken yesterday and is now fixed via 3d19a65.

log"(${MDC(STAGE_ID, stage.id)}) were aborted so this stage is not needed anymore.")
return
case sms: ShuffleMapStage if !sms.isAvailable =>
if (sms.shuffleDep.checksumMismatchFullRetryEnabled) {
Contributor:

nit: we can make the code a bit clearer:

val needsFullStageRetry = if (sms.shuffleDep.checksumMismatchFullRetryEnabled) {
  // the comment
  stage.isParentIndeterminate
} else {
  // the legacy code
}
if (needsFullStageRetry) {
  mapOutputTracker.unregisterAllMapAndMergeOutput(sms.shuffleDep.shuffleId)
  sms.shuffleDep.newShuffleMergeState()
}

ivoson (Contributor, Author):

Thanks, done.

Seq(("true", "false"), ("false", "true"), ("true", "true")).foreach {
case (orderIndependentChecksumEnabled: String, checksumMismatchFullRetryEnabled: String) =>
withSQLConf(
"spark.sql.shuffle.orderIndependentChecksum.enabled" -> orderIndependentChecksumEnabled,
Contributor:

nit: let's not hardcode it; we can reference them by SQLConf key name

ivoson (Contributor, Author):

Updated.
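For reference, a sketch of what the updated test presumably looks like; the SQLConf constant names below are assumptions based on identifiers quoted elsewhere in this thread, and withSQLConf is Spark's SQL test helper (provided by suites mixing in SQLTestUtils):

```scala
import org.apache.spark.sql.internal.SQLConf

// Reference the config keys via SQLConf constants instead of hardcoded strings.
Seq(("true", "false"), ("false", "true"), ("true", "true")).foreach {
  case (orderIndependentChecksumEnabled, checksumMismatchFullRetryEnabled) =>
    withSQLConf(
      SQLConf.SHUFFLE_ORDER_INDEPENDENT_CHECKSUM_ENABLED.key ->
        orderIndependentChecksumEnabled,
      SQLConf.SHUFFLE_CHECKSUM_MISMATCH_FULL_RETRY_ENABLED.key ->
        checksumMismatchFullRetryEnabled) {
      // ... test body ...
    }
}
```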

    checksumMismatchFullRetryEnabled) {
  assert(SQLConf.get.shuffleOrderIndependentChecksumEnabled ===
    orderIndependentChecksumEnabled.toBoolean)
  assert(SQLConf.get.shuffleChecksumMismatchFullRetryEnabled ===
@cloud-fan (Contributor) commented Sep 28, 2025:

We already have SQL conf test suites to verify the basic functionality; no need to test it here.

ivoson (Contributor, Author):

Updated.

@cloud-fan (Contributor):

thanks, merging to master!

@cloud-fan cloud-fan closed this in 922adad Sep 29, 2025
@ivoson ivoson deleted the SPARK-53575 branch September 29, 2025 14:35
huangxiaopingRD pushed a commit to huangxiaopingRD/spark that referenced this pull request Nov 25, 2025
[SPARK-53575][CORE] Retry entire consumer stages when checksum mismatch detected for a retried shuffle map task

Closes apache#52336 from ivoson/SPARK-53575.

Authored-by: Tengfei Huang <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
log"(${MDC(STAGE_ID, stage.id)}) were aborted so this stage is not needed anymore.")
return
case sms: ShuffleMapStage if !sms.isAvailable =>
val needFullStageRetry = if (sms.shuffleDep.checksumMismatchFullRetryEnabled) {
@mridulm (Contributor) commented Dec 6, 2025:

Catching up on PRs I missed out on reviewing.

This negatively interacts with push-based shuffle when it is enabled.
The condition should be sms.shuffleDep.checksumMismatchFullRetryEnabled && !pushBasedShuffleEnabled.

+CC @ivoson

ivoson (Contributor, Author):

Hi @mridulm, can you please explain more about the issue with push-based shuffle? Thanks.

@mridulm (Contributor) commented Dec 10, 2025:

With push-based shuffle enabled, a mapper's output is also pushed to mergers to create a reducer-oriented view (all mappers write to a single merger for a given reducer).
If a subset of mapper tasks is now re-executed, the merged output is affected, since it was already finalized when the previous attempt completed, causing a disconnect between the mapper output from the new attempt and the merged output from the previous attempt.

Essentially, for indeterminate stages, the entire reducer-oriented view is unusable and needs to be recomputed.
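A sketch of the proposed condition (not merged code; the stand-in types and the pushBasedShuffleEnabled parameter below are illustrative, the real check would live in DAGScheduler):

```scala
// Simplified stand-ins, not the real scheduler types.
case class ShuffleDep(checksumMismatchFullRetryEnabled: Boolean)
case class ShuffleMapStage(
    shuffleDep: ShuffleDep,
    isParentIndeterminate: Boolean, // checksum-mismatch signal from the parent
    isIndeterminate: Boolean)       // legacy planning-time label

def needFullStageRetry(sms: ShuffleMapStage, pushBasedShuffleEnabled: Boolean): Boolean =
  if (sms.shuffleDep.checksumMismatchFullRetryEnabled && !pushBasedShuffleEnabled) {
    sms.isParentIndeterminate // take the checksum-driven path only without push merge
  } else {
    sms.isIndeterminate // otherwise keep the legacy condition
  }
```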

ivoson (Contributor, Author):

Hi @mridulm, to recompute the indeterminate stages we clean up all the shuffle outputs and the shuffle merge state for push-based shuffle. Would that resolve your concern regarding push-based shuffle?

mapOutputTracker.unregisterAllMapAndMergeOutput(sms.shuffleDep.shuffleId)
sms.shuffleDep.newShuffleMergeState()

Comment on lines +1572 to +1573
val stagesToRollback = collectSucceedingStages(sms)
abortStageWithInvalidRollBack(stagesToRollback)
@mridulm (Contributor):

nit: We could have delegated this to abortUnrollbackableStages

ivoson (Contributor, Author):

Thanks @mridulm.

I am working on another PR to cover more scenarios; can you please take a look as well? Thanks.

I will do some code refactoring in that PR: #53274

val isChecksumMismatched = mapOutputTracker.registerMapOutput(
  shuffleStage.shuffleDep.shuffleId, smt.partitionId, status)
if (isChecksumMismatched) {
  shuffleStage.isChecksumMismatched = isChecksumMismatched
@mridulm (Contributor) commented Dec 6, 2025:

This is never reset back to false when the stage attempt is retried and succeeds; what am I missing?
This would mean the app will always fail, right?

Not sure what I am missing here.
+CC @ivoson , @cloud-fan , @attilapiros

ivoson (Contributor, Author):

Hi @mridulm, this is not set back to false. We expect all succeeding stages to do a full retry once a checksum mismatch happens for the stage, since we don't know which version of the shuffle output the successful tasks consumed.

This won't fail the app; the impact is that the succeeding stages will do a full retry.

The code logic has changed a little in PR #53274.

Please take a look once you get a chance. Thanks.

Contributor:

On retry, when we throw away the entire mapper output and recompute it, at that point can we set it back to false?

ivoson (Contributor, Author):

Currently it's not set back to false; we only recompute when a new shuffle checksum mismatch is detected. Maybe we can remove the flag to avoid the confusion here.
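To make the flag semantics discussed here concrete, a simplified model (a stand-in class, not the real ShuffleMapStage; the reset method is a hypothetical illustration of the reviewer's question, not current behavior):

```scala
// Simplified model of the isChecksumMismatched lifecycle.
class ShuffleMapStageModel(val shuffleId: Int) {
  // Set the first time registerMapOutput reports a mismatch for this stage.
  // Per the discussion above, it is currently never cleared, so every later
  // submission of a consumer stage does a full retry.
  var isChecksumMismatched: Boolean = false

  // Called from task-completion handling when a mismatch is detected.
  def onChecksumMismatch(): Unit = {
    isChecksumMismatched = true
  }

  // Hypothetical reset point raised in this thread: once all map output is
  // discarded and the whole stage recomputed, the mismatched output is gone.
  def onAllMapOutputRecomputed(): Unit = {
    isChecksumMismatched = false
  }
}
```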
