5 changes: 5 additions & 0 deletions docs/en/seatunnel-engine/hybrid-cluster-deployment.md
@@ -93,6 +93,10 @@ The timeout for checkpoints. If the checkpoint cannot be completed within the ti

The minimum pause (in milliseconds) between consecutive checkpoints. This ensures that checkpoints are not triggered too frequently.

**tolerable-failed**

The number of consecutive checkpoint failures that can be tolerated before the job fails.

Example

```yaml
@@ -106,6 +110,7 @@ seatunnel:
interval: 300000
timeout: 10000
min-pause: 5000
tolerable-failed: 3

```

5 changes: 5 additions & 0 deletions docs/en/seatunnel-engine/separated-cluster-deployment.md
@@ -134,6 +134,10 @@ The timeout time of the checkpoint. If the checkpoint cannot be completed within

The minimum pause (in milliseconds) between consecutive checkpoints. This ensures that checkpoints are not triggered too frequently.

**tolerable-failed**

The number of consecutive checkpoint failures that can be tolerated before the job fails.

Example

```yaml
@@ -147,6 +151,7 @@ seatunnel:
interval: 300000
timeout: 10000
min-pause: 5000
tolerable-failed: 3
```

**checkpoint storage**
5 changes: 5 additions & 0 deletions docs/zh/seatunnel-engine/hybrid-cluster-deployment.md
@@ -93,6 +93,10 @@ seatunnel:

The minimum pause (in milliseconds) between consecutive checkpoints; this ensures that checkpoints are not triggered too frequently.

**tolerable-failed**

The number of consecutive checkpoint failures that can be tolerated; once this number is exceeded, the job fails.

Example

```yaml
@@ -106,6 +110,7 @@ seatunnel:
interval: 300000
timeout: 10000
min-pause: 5000
tolerable-failed: 3
```

**checkpoint storage**
6 changes: 6 additions & 0 deletions docs/zh/seatunnel-engine/separated-cluster-deployment.md
@@ -137,6 +137,11 @@ seatunnel:

The minimum pause (in milliseconds) between consecutive checkpoints; this ensures that checkpoints are not triggered too frequently.


**tolerable-failed**

The number of consecutive checkpoint failures that can be tolerated; once this number is exceeded, the job fails.

Example

```yaml
@@ -150,6 +155,7 @@ seatunnel:
interval: 300000
timeout: 10000
min-pause: 5000
tolerable-failed: 3
```

**checkpoint storage**
@@ -92,6 +92,14 @@ public class EnvCommonOptions {
"The minimum pause (in milliseconds) between consecutive checkpoints. "
+ "This ensures that checkpoints are not triggered too frequently and provides.");

public static final Option<Integer> CHECKPOINT_TOLERABLE_FAILED =
Options.key("tolerable-failed")
.intType()
.defaultValue(0)
.withDescription(
"The number of consecutive checkpoint failures that can be tolerated before the job fails. "
+ "Default is 0, which means any checkpoint failure will cause the job to fail.");

public static Option<SaveModeExecuteLocation> SAVEMODE_EXECUTE_LOCATION =
Options.key("savemode.execute.location")
.enumType(SaveModeExecuteLocation.class)
@@ -43,6 +43,7 @@ public OptionRule optionRule() {
EnvCommonOptions.CHECKPOINT_INTERVAL,
EnvCommonOptions.CHECKPOINT_TIMEOUT,
EnvCommonOptions.CHECKPOINT_MIN_PAUSE,
EnvCommonOptions.CHECKPOINT_TOLERABLE_FAILED,
EnvCommonOptions.READ_LIMIT_ROW_PER_SECOND,
EnvCommonOptions.READ_LIMIT_BYTES_PER_SECOND,
EnvCommonOptions.SAVEMODE_EXECUTE_LOCATION,
@@ -114,6 +114,10 @@ protected void setCheckpoint() {
} else if (config.hasPath(EnvCommonOptions.CHECKPOINT_MIN_PAUSE.key())) {
long minPause = config.getLong(EnvCommonOptions.CHECKPOINT_MIN_PAUSE.key());
checkpointConfig.setMinPauseBetweenCheckpoints(minPause);
} else if (config.hasPath(EnvCommonOptions.CHECKPOINT_TOLERABLE_FAILED.key())) {
int tolerableFailedCheckpointsNums =
config.getInt(EnvCommonOptions.CHECKPOINT_TOLERABLE_FAILED.key());
checkpointConfig.setTolerableCheckpointFailureNumber(tolerableFailedCheckpointsNums);
}

if (EnvironmentUtil.hasPathAndWaring(config, ConfigKeyName.CHECKPOINT_MODE)) {
@@ -299,6 +299,15 @@ private CheckpointConfig parseCheckpointConfig(Node checkpointNode) {
ServerConfigOptions.MasterServerConfigOptions.CHECKPOINT_MIN_PAUSE
.key(),
getTextContent(node)));
} else if (ServerConfigOptions.MasterServerConfigOptions.CHECKPOINT_TOLERABLE_FAILED
.key()
.equals(name)) {
checkpointConfig.setCheckpointTolerableFailed(
getIntegerValue(
ServerConfigOptions.MasterServerConfigOptions
.CHECKPOINT_TOLERABLE_FAILED
.key(),
getTextContent(node)));
} else if (ServerConfigOptions.MasterServerConfigOptions
.SCHEMA_CHANGE_CHECKPOINT_TIMEOUT
.key()
@@ -34,6 +34,9 @@ public class CheckpointConfig implements Serializable {
ServerConfigOptions.MasterServerConfigOptions.CHECKPOINT_TIMEOUT.defaultValue();
private long checkpointMinPause =
ServerConfigOptions.MasterServerConfigOptions.CHECKPOINT_MIN_PAUSE.defaultValue();
private int checkpointTolerableFailed =
ServerConfigOptions.MasterServerConfigOptions.CHECKPOINT_TOLERABLE_FAILED
.defaultValue();
private long schemaChangeCheckpointTimeout =
ServerConfigOptions.MasterServerConfigOptions.SCHEMA_CHANGE_CHECKPOINT_TIMEOUT
.defaultValue();
@@ -67,4 +70,11 @@ public void setSchemaChangeCheckpointTimeout(long checkpointTimeout) {
"The minimum checkpoint timeout is 10 ms.");
this.schemaChangeCheckpointTimeout = checkpointTimeout;
}

public void setCheckpointTolerableFailed(int tolerableFailedCheckpoints) {
checkArgument(
tolerableFailedCheckpoints >= 0,
"The tolerable failed checkpoints must be non-negative.");
this.checkpointTolerableFailed = tolerableFailedCheckpoints;
}
}
@@ -135,6 +135,14 @@ public static class MasterServerConfigOptions {
"The minimum pause (in milliseconds) between consecutive checkpoints. "
+ "This ensures that checkpoints are not triggered too frequently and provides.");

public static final Option<Integer> CHECKPOINT_TOLERABLE_FAILED =
Options.key("tolerable-failed")
.intType()
.defaultValue(0)
.withDescription(
"The number of consecutive checkpoint failures that can be tolerated before the job fails. "
+ "Default is 0, which means any checkpoint failure will cause the job to fail.");

public static final Option<String> CHECKPOINT_STORAGE_TYPE =
Options.key("type")
.stringType()
@@ -98,7 +98,7 @@ public class CheckpointCoordinator {

private final CheckpointManager checkpointManager;

private final CheckpointStorage checkpointStorage;
@Getter private CheckpointStorage checkpointStorage;

@Getter private final CheckpointIDCounter checkpointIdCounter;

@@ -132,6 +132,8 @@ public class CheckpointCoordinator {

private final AtomicInteger pendingCounter = new AtomicInteger(0);

private final AtomicInteger consecutiveFailedCounter = new AtomicInteger(0);

private final AtomicBoolean schemaChanging = new AtomicBoolean(false);

private final Object lock = new Object();
@@ -153,7 +155,7 @@ public class CheckpointCoordinator {
// processed with one savepoint operation in the same time.
private PendingCheckpoint savepointPendingCheckpoint;

private final String checkpointStateImapKey;
@Getter private final String checkpointStateImapKey;

@SneakyThrows
public CheckpointCoordinator(
@@ -287,6 +289,25 @@ private void handleCoordinatorError(CheckpointCloseReason reason, Throwable e) {
if (checkpointCoordinatorFuture.isDone()) {
return;
}

int failedCount = consecutiveFailedCounter.incrementAndGet();
int tolerableFailures = coordinatorConfig.getCheckpointTolerableFailed();

if (tolerableFailures > 0 && failedCount <= tolerableFailures) {
LOG.warn(
"Checkpoint failed (consecutive failures: {}/{}): {}",
failedCount,
tolerableFailures,
ExceptionUtils.getMessage(checkpointException));
cleanFailedCheckpoint(reason);
return;
}
Comment on lines +296 to +304

@dybyte (Contributor) commented on Dec 26, 2025:

What happens if a checkpoint fails during a savepoint operation? Is there a possibility that the job becomes non-responsive?


LOG.error(
"Checkpoint failures exceeded tolerable limit ({}/{}), failing the job",
failedCount,
tolerableFailures);

updateStatus(CheckpointCoordinatorStatus.FAILED);
checkpointCoordinatorFuture.complete(
new CheckpointCoordinatorState(
@@ -630,7 +651,8 @@ private PassiveCompletableFuture<CompletedCheckpoint> completableFutureWithError
return new PassiveCompletableFuture<>(future);
}

private void startTriggerPendingCheckpoint(
@VisibleForTesting
protected void startTriggerPendingCheckpoint(
CompletableFuture<PendingCheckpoint> pendingCompletableFuture) {
pendingCompletableFuture.thenAccept(
pendingCheckpoint -> {
@@ -717,7 +739,8 @@ private void startTriggerPendingCheckpoint(
pendingCounter.incrementAndGet();
}

private CompletableFuture<PendingCheckpoint> createPendingCheckpoint(
@VisibleForTesting
protected CompletableFuture<PendingCheckpoint> createPendingCheckpoint(
long triggerTimestamp, CheckpointType checkpointType) {
synchronized (lock) {
CompletableFuture<Long> idFuture;
@@ -951,6 +974,13 @@ public synchronized void completePendingCheckpoint(CompletedCheckpoint completed
notifyCompleted(completedCheckpoint);
pendingCheckpoints.remove(checkpointId).abortCheckpointTimeoutFutureWhenIsCompleted();
pendingCounter.decrementAndGet();
int latestFailedCount = consecutiveFailedCounter.getAndSet(0);
if (latestFailedCount > 0) {
LOG.info(
"Reset consecutive failed counter from {} to 0 after checkpoint {} completed",
latestFailedCount,
completedCheckpoint.getCheckpointId());
}

if (isCompleted()) {
cleanPendingCheckpoint(CheckpointCloseReason.CHECKPOINT_COORDINATOR_COMPLETED);
@@ -1112,8 +1142,32 @@ protected void completeSchemaChangeAfterCheckpoint(CompletedCheckpoint checkpoin
}
}

public String getCheckpointStateImapKey() {
return checkpointStateImapKey;
/**
* Clean only the failed checkpoint(s) without shutting down the coordinator. This is used for
* tolerable checkpoint failures to allow subsequent checkpoints to continue.
*/
protected void cleanFailedCheckpoint(CheckpointCloseReason closedReason) {
synchronized (lock) {
LOG.info("start clean failed checkpoint cause {}", closedReason.message());
if (!pendingCheckpoints.isEmpty()) {
pendingCheckpoints
.values()
.forEach(
pendingCheckpoint ->
pendingCheckpoint.abortCheckpoint(closedReason, null));
pendingCheckpoints.clear();
}
pendingCounter.set(0);
}
}

public int getConsecutiveFailedCounter() {
return consecutiveFailedCounter.get();
}

@VisibleForTesting
public void setCheckpointStorage(CheckpointStorage checkpointStorage) {
this.checkpointStorage = checkpointStorage;
}

/** Only for test */
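Taken together, the additions to handleCoordinatorError, the counter reset in completePendingCheckpoint, and the new cleanFailedCheckpoint implement a straightforward pattern: count consecutive checkpoint failures, tolerate them while the count stays within the configured limit (aborting only the failed pending checkpoints), and clear the count as soon as a checkpoint completes. A standalone sketch of that pattern follows; the class and method names are illustrative only, not SeaTunnel APIs.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch of the consecutive-failure tolerance added in this PR;
// not the real CheckpointCoordinator.
class FailureToleranceSketch {

    private final AtomicInteger consecutiveFailures = new AtomicInteger(0);
    private final int tolerableFailures; // e.g. tolerable-failed: 3 from the docs example

    FailureToleranceSketch(int tolerableFailures) {
        this.tolerableFailures = tolerableFailures;
    }

    /** Returns true if this failure is tolerated, false if the job should be failed. */
    boolean onCheckpointFailed() {
        int failed = consecutiveFailures.incrementAndGet();
        // With tolerable-failed = 3, failures 1..3 are tolerated and the 4th fails the job.
        // The default of 0 keeps the previous behaviour: any failure fails the job.
        return tolerableFailures > 0 && failed <= tolerableFailures;
    }

    /** A completed checkpoint clears the streak, mirroring completePendingCheckpoint. */
    void onCheckpointCompleted() {
        consecutiveFailures.set(0);
    }
}
```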
@@ -17,6 +17,8 @@

package org.apache.seatunnel.engine.server.checkpoint;

import org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;

import org.apache.seatunnel.api.tracing.MDCTracer;
import org.apache.seatunnel.engine.checkpoint.storage.PipelineState;
import org.apache.seatunnel.engine.checkpoint.storage.api.CheckpointStorage;
@@ -183,6 +185,11 @@ public CheckpointCoordinator getCheckpointCoordinator(int pipelineId) {
return coordinator;
}

@VisibleForTesting
protected void setCheckpointCoordinator(int pipelineId, CheckpointCoordinator coordinator) {
coordinatorMap.put(pipelineId, coordinator);
}

/**
* Called by the {@link Task}. <br>
* used by Task to report the {@link SeaTunnelTaskState} of the state machine.
@@ -170,7 +170,8 @@ private CompletedCheckpoint toCompletedCheckpoint() {

public void abortCheckpoint(CheckpointCloseReason closedReason, @Nullable Throwable cause) {
if (closedReason.equals(CheckpointCloseReason.CHECKPOINT_COORDINATOR_RESET)
|| closedReason.equals(CheckpointCloseReason.PIPELINE_END)) {
|| closedReason.equals(CheckpointCloseReason.PIPELINE_END)
|| closedReason.equals(CheckpointCloseReason.CHECKPOINT_EXPIRED)) {
completableFuture.complete(null);
} else {
this.failureCause = new CheckpointException(closedReason, cause);
@@ -334,6 +334,8 @@ private CheckpointConfig createJobCheckpointConfig(
jobCheckpointConfig.setCheckpointTimeout(defaultCheckpointConfig.getCheckpointTimeout());
jobCheckpointConfig.setCheckpointInterval(defaultCheckpointConfig.getCheckpointInterval());
jobCheckpointConfig.setCheckpointMinPause(defaultCheckpointConfig.getCheckpointMinPause());
jobCheckpointConfig.setCheckpointTolerableFailed(
defaultCheckpointConfig.getCheckpointTolerableFailed());

CheckpointStorageConfig jobCheckpointStorageConfig = new CheckpointStorageConfig();
jobCheckpointStorageConfig.setStorage(defaultCheckpointConfig.getStorage().getStorage());
@@ -363,6 +365,12 @@ private CheckpointConfig createJobCheckpointConfig(
Long.parseLong(
jobEnv.get(EnvCommonOptions.CHECKPOINT_MIN_PAUSE.key()).toString()));
}
if (jobEnv.containsKey(EnvCommonOptions.CHECKPOINT_TOLERABLE_FAILED.key())) {
jobCheckpointConfig.setCheckpointTolerableFailed(
Integer.parseInt(
jobEnv.get(EnvCommonOptions.CHECKPOINT_TOLERABLE_FAILED.key())
.toString()));
}
return jobCheckpointConfig;
}
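As with the other checkpoint settings handled by this builder, the cluster-wide default from seatunnel.yaml applies unless the job's env block supplies its own tolerable-failed value. A minimal sketch of that precedence in plain Java, using hypothetical names rather than the real builder:

```java
import java.util.Map;

// Illustrative precedence sketch: a job-level env value overrides the cluster default,
// matching how the builder above resolves tolerable-failed. Names are hypothetical.
final class TolerableFailedPrecedenceSketch {

    static int resolve(int clusterDefault, Map<String, Object> jobEnv) {
        Object fromJob = jobEnv.get("tolerable-failed"); // key defined in EnvCommonOptions
        return fromJob == null ? clusterDefault : Integer.parseInt(fromJob.toString());
    }

    public static void main(String[] args) {
        // Cluster default 0 (fail on the first checkpoint failure); the job env overrides it to 3.
        System.out.println(resolve(0, Map.of("tolerable-failed", 3))); // prints 3
        System.out.println(resolve(0, Map.of()));                      // prints 0
    }
}
```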
