[Fix][Zeta] make the job failed when triggering checkpoint fails (apache#10442) #10448
Sephiroth1024 wants to merge 10 commits into apache:dev
|
Please enable CI following the instructions. |
Issue 1: InterruptedException Handling Does Not Follow Best Practices
Location:
} catch (InterruptedException e) {
handleCoordinatorError(
"triggering checkpoint barrier has been interrupted",
e,
CheckpointCloseReason.CHECKPOINT_INSIDE_ERROR);
throw new RuntimeException(e); // ⚠️ Problem here
}
Problem Description:
Related Context:
Potential Risks:
Scope of Impact:
Severity: MAJOR
Improvement Suggestions:
} catch (InterruptedException e) {
handleCoordinatorError(
"triggering checkpoint barrier has been interrupted",
e,
CheckpointCloseReason.CHECKPOINT_INSIDE_ERROR);
Thread.currentThread().interrupt(); // Restore interrupted state
throw new RuntimeException(e);
}
Rationale:
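To illustrate why the suggested `Thread.currentThread().interrupt()` call matters, here is a minimal standalone sketch. It is not SeaTunnel code: the class and method names are hypothetical, and the `Thread.sleep` call merely stands in for a blocking coordinator call. It relies only on the documented JDK behavior that `Thread.sleep` clears the interrupt status when it throws `InterruptedException`.

```java
/** Standalone illustration; InterruptRestoreSketch and its method are hypothetical names. */
final class InterruptRestoreSketch {

    /**
     * Runs a blocking call on a worker thread, interrupts it, and reports
     * whether the interrupt flag is still visible after the catch block.
     */
    static boolean flagVisibleAfterCatch(boolean restoreFlag) {
        boolean[] flagSeen = new boolean[1];
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(10_000); // stand-in for a blocking coordinator call
            } catch (InterruptedException e) {
                // Thread.sleep cleared the interrupt status before throwing.
                if (restoreFlag) {
                    Thread.currentThread().interrupt();
                }
            }
            flagSeen[0] = Thread.currentThread().isInterrupted();
        });
        worker.start();
        worker.interrupt();
        try {
            worker.join(); // join() makes the worker's write to flagSeen visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("caller interrupted while waiting", e);
        }
        return flagSeen[0];
    }
}
```

Without the restore step, code further up the call stack that checks `isInterrupted()` has no way to learn that an interrupt was requested; with it, the request stays observable even after the exception is wrapped in a `RuntimeException`.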
Issue 2: InterruptedException Branch May Cause Duplicate Error Handling
Location:
} catch (InterruptedException e) {
handleCoordinatorError(...); // First processing
throw new RuntimeException(e); // Throw exception
}
Problem Description:
Related Context:
// Lines 645-651
completableFuture.whenCompleteAsync(
(completedCheckpoint, error) -> {
if (error != null) {
handleCoordinatorError( // Possibly second call
"trigger checkpoint failed",
error,
CheckpointCloseReason.CHECKPOINT_INSIDE_ERROR);
Potential Risks:
Scope of Impact:
Severity: MINOR
Improvement Suggestions:
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
// Do not call handleCoordinatorError in catch block
// Let exception propagate to whenCompleteAsync for unified handling
throw e;
} catch (Exception e) {
handleCoordinatorError(
"triggering checkpoint barrier failed",
e,
CheckpointCloseReason.CHECKPOINT_INSIDE_ERROR);
return;
}
Or:
} catch (InterruptedException e) {
handleCoordinatorError(
"triggering checkpoint barrier has been interrupted",
e,
CheckpointCloseReason.CHECKPOINT_INSIDE_ERROR);
Thread.currentThread().interrupt();
return; // Do not throw exception
} catch (Exception e) {
handleCoordinatorError(
"triggering checkpoint barrier failed",
e,
CheckpointCloseReason.CHECKPOINT_INSIDE_ERROR);
return;
}
Rationale:
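The duplicate-handling concern can be reproduced in isolation. The sketch below is a hedged illustration, not the coordinator's actual code: `DuplicateHandlingSketch` is a hypothetical name, a counter stands in for `handleCoordinatorError`, and `whenComplete` is used in place of `whenCompleteAsync` for brevity. It counts how often the handler runs when the catch block rethrows after handling versus when it returns.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

/** Standalone illustration; all names here are hypothetical. */
final class DuplicateHandlingSketch {

    /** Returns how many times the stand-in error handler was invoked. */
    static int countHandlerCalls(boolean rethrowAfterHandling) {
        AtomicInteger handlerCalls = new AtomicInteger();
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<Void> trigger = CompletableFuture.runAsync(() -> {
                try {
                    throw new InterruptedException("simulated interrupt");
                } catch (InterruptedException e) {
                    handlerCalls.incrementAndGet(); // first handling, inside the catch block
                    Thread.currentThread().interrupt();
                    if (rethrowAfterHandling) {
                        throw new RuntimeException(e); // completes the future exceptionally
                    }
                }
            }, executor);

            CompletableFuture<Void> done = trigger.whenComplete((ignored, error) -> {
                if (error != null) {
                    handlerCalls.incrementAndGet(); // second handling, in the completion callback
                }
            });

            try {
                done.join();
            } catch (CompletionException expected) {
                // Raised only in the rethrow variant; the count is what matters here.
            }
            return handlerCalls.get();
        } finally {
            executor.shutdown();
        }
    }
}
```

In this sketch the rethrow variant invokes the handler twice and the return variant invokes it once, which is the behavior the second suggestion above aims for.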
Issue 3: Direct Failure for Transient Network Faults May Be Too Aggressive
Location:
} catch (Exception e) {
handleCoordinatorError(
"triggering checkpoint barrier failed",
e,
CheckpointCloseReason.CHECKPOINT_INSIDE_ERROR);
return; // Fail immediately, no retry
}
Problem Description:
Direct failure leads to:
Related Context:
Potential Risks:
Scope of Impact:
Severity: MINOR (but may be MAJOR if the production network is unstable)
Improvement Suggestions:
Option A: Add retry mechanism (recommended for long-term improvement)
// Add in CheckpointConfig
private int checkpointBarrierTriggerRetryTimes = 3; // Retry 3 times by default
// Add retry logic in triggerCheckpoint
public InvocationFuture<?>[] triggerCheckpoint(CheckpointBarrier checkpointBarrier) {
int retry = 0;
while (retry < coordinatorConfig.getCheckpointBarrierTriggerRetryTimes()) {
try {
return plan.getStartingSubtasks().stream()
.map(taskLocation -> new CheckpointBarrierTriggerOperation(checkpointBarrier, taskLocation))
.map(checkpointManager::sendOperationToMemberNode)
.toArray(InvocationFuture[]::new);
} catch (Exception e) {
retry++;
if (retry >= coordinatorConfig.getCheckpointBarrierTriggerRetryTimes()) {
throw e;
}
LOG.warn("Retry triggering checkpoint barrier, attempt {}", retry);
Thread.sleep(1000L * retry); // Linear backoff (1s, 2s, ...), not exponential
}
}
}
Option B: Configure whether to fail immediately
// Add in CheckpointConfig
private boolean failImmediatelyOnBarrierTriggerError = false; // Do not fail immediately by default
// Modify catch block
} catch (Exception e) {
if (coordinatorConfig.isFailImmediatelyOnBarrierTriggerError()) {
handleCoordinatorError(...);
return;
} else {
LOG.error("triggering checkpoint barrier failed, but job will continue", e);
return; // Only return, do not call handleCoordinatorError
}
}
Option C: Keep current implementation (most conservative)
// No modifications, but document in notes:
// "If checkpoint barrier trigger fails, the job will fail immediately.
// This is to ensure data consistency. Consider increasing checkpoint timeout
// if you encounter transient network issues."
Rationale:
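As a point of comparison for Option A, the following is a minimal, self-contained retry helper with true exponential backoff. It is a sketch under stated assumptions, not a proposal for the coordinator's API: `RetrySketch`, `callWithRetry`, and its parameters are hypothetical names, and the helper retries on any `RuntimeException`, whereas a real implementation would probably restrict retries to failures known to be transient.

```java
import java.util.function.Supplier;

/** Standalone illustration; RetrySketch and callWithRetry are hypothetical names. */
final class RetrySketch {

    /**
     * Calls the action up to maxAttempts times, sleeping
     * baseDelayMillis * 2^(attempt - 1) between failed attempts.
     * Rethrows the last failure once the attempts are exhausted.
     */
    static <T> T callWithRetry(Supplier<T> action, int maxAttempts, long baseDelayMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RuntimeException lastFailure = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.get();
            } catch (RuntimeException e) {
                lastFailure = e;
                if (attempt == maxAttempts) {
                    break; // no sleep after the final attempt
                }
                try {
                    Thread.sleep(baseDelayMillis << (attempt - 1)); // doubles each round
                } catch (InterruptedException interrupted) {
                    Thread.currentThread().interrupt(); // keep the interrupt observable
                    throw new IllegalStateException("interrupted while backing off", interrupted);
                }
            }
        }
        throw lastFailure;
    }
}
```

Unlike the loop in Option A, this version always either returns a value or throws, and it handles the `InterruptedException` that `Thread.sleep` can raise instead of leaving it undeclared.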
Note: This is a larger improvement, recommended for a follow-up PR. The current PR can be merged first to fix the "silent failure" bug.
Issue 4: Error Messages Not Specific Enough, Hard to Diagnose Problems
Location:
handleCoordinatorError(
"triggering checkpoint barrier has been interrupted", // ⚠️ Missing key information
e,
CheckpointCloseReason.CHECKPOINT_INSIDE_ERROR);
Problem Description:
This forces users and operators to read full stack traces and logs when troubleshooting, which makes failures harder to diagnose.
Related Context:
Potential Risks:
Scope of Impact:
Severity: MINOR
Improvement Suggestions:
} catch (InterruptedException e) {
handleCoordinatorError(
String.format(
"Triggering checkpoint barrier %s has been interrupted. Pending tasks: %d",
pendingCheckpoint.getInfo(),
plan.getStartingSubtasks().size()),
e,
CheckpointCloseReason.CHECKPOINT_INSIDE_ERROR);
Thread.currentThread().interrupt();
throw new RuntimeException(e);
} catch (Exception e) {
handleCoordinatorError(
String.format(
"Failed to trigger checkpoint barrier %s. Checkpoint type: %s, Pending tasks: %d",
pendingCheckpoint.getInfo(),
pendingCheckpoint.getCheckpointType(),
plan.getStartingSubtasks().size()),
e,
CheckpointCloseReason.CHECKPOINT_INSIDE_ERROR);
return;
}
Rationale:
Issue 6: Test Code Uses Incorrect jobId, Test Cannot Properly Validate
Location:
long jobId = System.currentTimeMillis(); // Line 33: Define variable
startJob(System.currentTimeMillis(), CONF_PATH); // Line 34: Pass new timestamp!
// ...
Assertions.assertEquals(
server.getCoordinatorService().getJobStatus(jobId), // Line 40: Use jobId variable
JobStatus.RUNNING);
Problem Description:
Confidence: 100% (this is a clear bug)
Related Context:
Potential Risks:
Scope of Impact:
Severity: BLOCKER (test must be fixed before merging)
Improvement Suggestions:
@Test
public void testCheckpointBarrierTriggerError() throws NoSuchFieldException, IllegalAccessException {
long jobId = System.currentTimeMillis();
startJob(jobId, CONF_PATH); // Fix: Use jobId variable instead of getting timestamp again
await().atMost(120000, TimeUnit.MILLISECONDS)
.untilAsserted(
() ->
Assertions.assertEquals(
server.getCoordinatorService().getJobStatus(jobId),
JobStatus.RUNNING));
// ... rest of code unchanged
}
Rationale:
Issue 8: Checkpoint Configuration in Test Configuration File May Cause Test Instability
Location:
checkpoint.interval = 1000 # 1 second
checkpoint.timeout = 60000 # 60 seconds
Problem Description:
This may lead to:
Related Context:
Potential Risks:
Scope of Impact:
Severity: MINOR
Improvement Suggestions:
env {
parallelism = 1
job.mode = "STREAMING"
checkpoint.interval = 10000 # Changed: raised to 10 seconds to reduce trigger frequency
checkpoint.timeout = 60000
}
Rationale:
|
@dybyte thx for reminding me |
+1. There is already an open PR for the retry mechanism here: #10223 |
got it so do i need to close this PR or just keep it open and wait to be reviewed |
The suggestion was to keep this PR open and proceed with it as a bug fix. The retry/tolerance improvement can be handled separately in a follow-up PR. |
ok, got it, ty |
...erver/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
...st/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointBarrierTriggerErrorTest.java
...ngine-server/src/test/resources/stream_fake_to_console_checkpoint_barrier_trigger_error.conf
|
Could you please retry the CI? |
dybyte left a comment
LGTM. Thanks @Sephiroth1024
Purpose of this pull request
Fix #10442
Does this PR introduce any user-facing change?
How was this patch tested?
Add unit test
CheckpointBarrierTriggerErrorTest#testCheckpointBarrierTriggerError
Check list
New License Guide
incompatible-changes.md to describe the incompatibility caused by this PR.