Conversation
Issue 1: Missing Maximum Retry Limit

Location:
Related Context:
Problem Description:
Potential Risks:
Impact Scope:
Severity: MAJOR
Improvement Suggestions:

```java
// CoordinatorService.java
private static final int MAX_CLEANUP_ATTEMPTS = 10; // Maximum 10 attempts (10 minutes)
private void processPendingPipelineCleanup(
PipelineLocation pipelineLocation, PipelineCleanupRecord record) {
// ... existing logic ...
// Add retry count check at the end of the method
if (updated.getAttemptCount() >= MAX_CLEANUP_ATTEMPTS && !updated.isCleaned()) {
logger.severe(
String.format(
"Pipeline %s cleanup failed after %d attempts. Giving up. " +
"Metrics cleaned: %s, TaskGroups cleaned: %s/%s",
pipelineLocation,
updated.getAttemptCount(),
updated.isMetricsImapCleaned(),
updated.getCleanedTaskGroups().size(),
updated.getTaskGroups() != null ? updated.getTaskGroups().size() : 0));
// Delete record even if giving up, to avoid infinite accumulation
pendingPipelineCleanupIMap.remove(pipelineLocation, record);
return;
}
// ... existing logic ...
}
```

Rationale:
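The give-up decision suggested above is a pure condition on the record's state, so it can be isolated and unit-tested without a Hazelcast cluster. Below is a minimal sketch: `MAX_CLEANUP_ATTEMPTS` matches the suggested constant, while the class and method names are hypothetical stand-ins, not part of the PR.

```java
// Sketch: pure helper for the retry-limit cutoff, testable in isolation.
public class CleanupRetryPolicy {
    static final int MAX_CLEANUP_ATTEMPTS = 10; // 10 attempts at 60s intervals ~= 10 minutes

    /** True when the record should be dropped despite not being fully cleaned. */
    static boolean shouldGiveUp(int attemptCount, boolean cleaned) {
        return attemptCount >= MAX_CLEANUP_ATTEMPTS && !cleaned;
    }

    public static void main(String[] args) {
        System.out.println(shouldGiveUp(9, false));  // false: still within budget
        System.out.println(shouldGiveUp(10, false)); // true: budget exhausted, not cleaned
        System.out.println(shouldGiveUp(10, true));  // false: already cleaned, nothing to give up
    }
}
```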
Issue 2: Missing Record Expiration Time (TTL)

Location:
Related Context:
Problem Description:
Potential Risks:
Impact Scope:
Severity: MAJOR
Improvement Suggestions:

```java
// Option 1: Set TTL in IMap configuration (recommended)
// Constant.java
public static final String IMAP_PENDING_PIPELINE_CLEANUP = "engine_pendingPipelineCleanup";
// In Hazelcast configuration (requires documentation or setting in initialization code):
config.getMapConfig("engine_pendingPipelineCleanup")
.setMaxIdleSeconds(86400); // Expires after 24 hours of inactivity
// Option 2: Check expiration time in code
private static final long CLEANUP_RECORD_TTL_MILLIS = TimeUnit.HOURS.toMillis(24); // 24 hours
private void processPendingPipelineCleanup(
PipelineLocation pipelineLocation, PipelineCleanupRecord record) {
long now = System.currentTimeMillis();
// Check if the record has expired
if (now - record.getCreateTimeMillis() > CLEANUP_RECORD_TTL_MILLIS) {
logger.warning(
String.format(
"Pipeline cleanup record %s expired after %d ms. Removing.",
pipelineLocation,
now - record.getCreateTimeMillis()));
pendingPipelineCleanupIMap.remove(pipelineLocation, record);
return;
}
// ... existing logic ...
}
```

Rationale:
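Option 2's expiry test is also a plain time comparison, so it can be extracted with an injected "now" timestamp for deterministic testing. A minimal sketch, reusing the `CLEANUP_RECORD_TTL_MILLIS` constant suggested above; the helper name is illustrative.

```java
import java.util.concurrent.TimeUnit;

// Sketch: TTL check with injected timestamps, so tests need no real clock.
public class CleanupRecordExpiry {
    static final long CLEANUP_RECORD_TTL_MILLIS = TimeUnit.HOURS.toMillis(24);

    /** True when the record's age exceeds the TTL. */
    static boolean isExpired(long createTimeMillis, long nowMillis) {
        return nowMillis - createTimeMillis > CLEANUP_RECORD_TTL_MILLIS;
    }

    public static void main(String[] args) {
        long created = 0L;
        System.out.println(isExpired(created, TimeUnit.HOURS.toMillis(23))); // false
        System.out.println(isExpired(created, TimeUnit.HOURS.toMillis(25))); // true
    }
}
```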
Issue 3: PipelineCleanupRecord's isCleaned() Method Has NPE Risk

Location:

```java
public boolean isCleaned() {
return metricsImapCleaned
&& taskGroups != null
&& cleanedTaskGroups != null
&& cleanedTaskGroups.containsAll(taskGroups.keySet());
}
```

Related Context:
Problem Description:

```java
// PipelineCleanupRecord.java:104-114
int taskGroupsSize = in.readInt();
if (taskGroupsSize >= 0) {
taskGroups = new HashMap<>(taskGroupsSize);
// ...
} else {
taskGroups = null; // May be null
}
```

Although the current code only writes -1 when the collection is null during serialization, `taskGroups` can still be null after deserialization.

Potential Risks:
Impact Scope:
Severity: MINOR (the current code will not trigger it, but defensive programming suggests fixing it)
Improvement Suggestions:

```java
public boolean isCleaned() {
if (!metricsImapCleaned) {
return false;
}
if (taskGroups == null || taskGroups.isEmpty()) {
// No taskGroups to check; metrics cleanup (already verified above) is sufficient
return true;
}
if (cleanedTaskGroups == null) {
return false;
}
return cleanedTaskGroups.containsAll(taskGroups.keySet());
}
```

Or a more concise version:

```java
public boolean isCleaned() {
return metricsImapCleaned
&& (taskGroups == null || taskGroups.isEmpty()
|| (cleanedTaskGroups != null
&& cleanedTaskGroups.containsAll(taskGroups.keySet())));
}
```

Rationale:
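The null-safe logic can be exercised against a small standalone model of the record. A sketch under stated assumptions: the class below is a stand-in with simplified field types, not the real `PipelineCleanupRecord`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Stand-in model demonstrating the null-safe isCleaned() from the suggestion above.
public class IsCleanedDemo {
    boolean metricsImapCleaned;
    Map<Long, String> taskGroups;   // taskGroupId -> worker address (simplified types)
    Set<Long> cleanedTaskGroups;

    boolean isCleaned() {
        return metricsImapCleaned
                && (taskGroups == null || taskGroups.isEmpty()
                        || (cleanedTaskGroups != null
                                && cleanedTaskGroups.containsAll(taskGroups.keySet())));
    }

    public static void main(String[] args) {
        IsCleanedDemo r = new IsCleanedDemo();
        r.metricsImapCleaned = true;
        r.taskGroups = null;               // null taskGroups no longer NPEs
        System.out.println(r.isCleaned()); // true

        r.taskGroups = new HashMap<>();
        r.taskGroups.put(1L, "worker-1");
        r.cleanedTaskGroups = null;        // null cleanedTaskGroups no longer NPEs
        System.out.println(r.isCleaned()); // false

        r.cleanedTaskGroups = new HashSet<>(Collections.singletonList(1L));
        System.out.println(r.isCleaned()); // true
    }
}
```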
Issue 4: shouldCleanup Logic Duplication

Location:
Problem Description:

```java
// JobMaster.java:902-904
boolean shouldCleanup =
PipelineStatus.CANCELED.equals(pipelineStatus)
|| (PipelineStatus.FINISHED.equals(pipelineStatus) && !savepointEnd);
// CoordinatorService.java:589-595
private boolean shouldCleanup(PipelineCleanupRecord record) {
if (record == null || record.getFinalStatus() == null) {
return false;
}
if (record.isSavepointEnd()) {
return false;
}
return PipelineStatus.CANCELED.equals(record.getFinalStatus())
|| PipelineStatus.FINISHED.equals(record.getFinalStatus());
}
```

This violates the DRY (Don't Repeat Yourself) principle. If the cleanup conditions need to be adjusted in the future, both places must be modified simultaneously.

Potential Risks:
Impact Scope:
Severity: MINOR
Improvement Suggestions:

```java
// Add static utility method in PipelineCleanupRecord
public static boolean shouldCleanup(
PipelineStatus finalStatus,
boolean isSavepointEnd) {
if (finalStatus == null) {
return false;
}
if (isSavepointEnd) {
return false;
}
return PipelineStatus.CANCELED.equals(finalStatus)
|| PipelineStatus.FINISHED.equals(finalStatus);
}
// Add instance method in PipelineCleanupRecord
public boolean shouldCleanup() {
return shouldCleanup(this.finalStatus, this.savepointEnd);
}
// JobMaster.java usage
boolean shouldCleanup = PipelineCleanupRecord.shouldCleanup(
pipelineStatus,
savepointEnd);
// CoordinatorService.java usage
private boolean shouldCleanup(PipelineCleanupRecord record) {
return record != null && record.shouldCleanup();
}
```

Rationale:
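The consolidated predicate can be checked against both call sites' expectations in one place. A sketch with a local `PipelineStatus` stand-in enum (the real one lives in the engine core); this mirrors the suggested utility, which applies the savepoint exclusion uniformly.

```java
// Sketch: single source of truth for the cleanup condition (DRY).
public class ShouldCleanupDemo {
    enum PipelineStatus { CREATED, RUNNING, FINISHED, CANCELED, FAILED }

    static boolean shouldCleanup(PipelineStatus finalStatus, boolean savepointEnd) {
        if (finalStatus == null || savepointEnd) {
            return false;
        }
        return finalStatus == PipelineStatus.CANCELED
                || finalStatus == PipelineStatus.FINISHED;
    }

    public static void main(String[] args) {
        System.out.println(shouldCleanup(PipelineStatus.CANCELED, false)); // true
        System.out.println(shouldCleanup(PipelineStatus.FINISHED, true));  // false: savepoint end
        System.out.println(shouldCleanup(null, false));                    // false: no final status
    }
}
```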
Issue 5: Cleanup Interval Hardcoded

Location: `private static final int PIPELINE_CLEANUP_INTERVAL_SECONDS = 60;`

Problem Description:
Potential Risks:
Impact Scope:
Severity: MINOR
Improvement Suggestions:

```java
// Option 1: Read from configuration file
private final int pipelineCleanupIntervalSeconds;
public CoordinatorService(
@NonNull NodeEngineImpl nodeEngine,
@NonNull SeaTunnelServer seaTunnelServer,
EngineConfig engineConfig) {
// ...
this.pipelineCleanupIntervalSeconds =
engineConfig.getCoordinatorServiceConfig()
.getPipelineCleanupIntervalSeconds() != null
? engineConfig.getCoordinatorServiceConfig().getPipelineCleanupIntervalSeconds()
: 60; // Default 60 seconds
pipelineCleanupScheduler.scheduleAtFixedRate(
this::cleanupPendingPipelines,
this.pipelineCleanupIntervalSeconds,
this.pipelineCleanupIntervalSeconds,
TimeUnit.SECONDS);
}
// Option 2: Use dynamic interval (exponential backoff)
// Calculate next cleanup time in PipelineCleanupRecord based on attemptCount
```

Rationale:
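Option 2 can be sketched as a capped exponential backoff derived from `attemptCount`. The constants and helper name below are illustrative assumptions, not part of the PR: the delay doubles per attempt from the current 60-second base and is capped to avoid overly long gaps.

```java
// Sketch: exponential backoff for the cleanup interval, capped at one hour.
public class CleanupBackoff {
    static final long BASE_INTERVAL_SECONDS = 60;
    static final long MAX_INTERVAL_SECONDS = 3600;

    /** Delay doubles with each attempt; the shift is bounded to avoid overflow. */
    static long nextIntervalSeconds(int attemptCount) {
        long interval = BASE_INTERVAL_SECONDS << Math.min(attemptCount, 30);
        return Math.min(interval, MAX_INTERVAL_SECONDS);
    }

    public static void main(String[] args) {
        System.out.println(nextIntervalSeconds(0));  // 60
        System.out.println(nextIntervalSeconds(2));  // 240
        System.out.println(nextIntervalSeconds(10)); // 3600 (capped)
    }
}
```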
Issue 6: FAILED Status Pipelines Not Cleaned Up

Location:
Problem Description:

```java
boolean shouldCleanup =
PipelineStatus.CANCELED.equals(pipelineStatus)
|| (PipelineStatus.FINISHED.equals(pipelineStatus) && !savepointEnd);
```

But from the definition of isEndState():

```java
// PipelineStatus.java:76-78
public boolean isEndState() {
return this == FINISHED || this == CANCELED || this == FAILED;
}
```

This means FAILED pipelines will not be added to the cleanup queue, and their resources (metrics and TaskGroupContext) may never be released.

Related Context:
Potential Risks:
Impact Scope:
Severity: CRITICAL (this is a serious omission)
Improvement Suggestions:

```java
// JobMaster.java:902-904
boolean shouldCleanup =
PipelineStatus.CANCELED.equals(pipelineStatus)
|| PipelineStatus.FAILED.equals(pipelineStatus)
|| (PipelineStatus.FINISHED.equals(pipelineStatus) && !savepointEnd);
// Or use isEndState() but exclude savepoint
boolean shouldCleanup = pipelineStatus.isEndState() && !savepointEnd;
// CoordinatorService.java:589-595
private boolean shouldCleanup(PipelineCleanupRecord record) {
if (record == null || record.getFinalStatus() == null) {
return false;
}
if (record.isSavepointEnd()) {
return false;
}
// Modify to support all end states
return record.getFinalStatus().isEndState();
}
```

Rationale:
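The `isEndState()`-based variant covers FAILED automatically, which is the key behavioral difference. A sketch with a local enum stand-in mirroring the definition quoted above:

```java
// Sketch: end-state based cleanup condition, covering FAILED pipelines.
public class EndStateCleanupDemo {
    enum PipelineStatus {
        CREATED, RUNNING, FINISHED, CANCELED, FAILED;

        boolean isEndState() {
            return this == FINISHED || this == CANCELED || this == FAILED;
        }
    }

    /** All end states qualify; savepoint-ended pipelines are skipped. */
    static boolean shouldCleanup(PipelineStatus status, boolean savepointEnd) {
        return status != null && status.isEndState() && !savepointEnd;
    }

    public static void main(String[] args) {
        System.out.println(shouldCleanup(PipelineStatus.FAILED, false));  // true: previously missed
        System.out.println(shouldCleanup(PipelineStatus.RUNNING, false)); // false: not an end state
        System.out.println(shouldCleanup(PipelineStatus.FINISHED, true)); // false: savepoint end
    }
}
```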
Issue 7: Missing JavaDoc Documentation

Location:

```java
@Data
@NoArgsConstructor
@AllArgsConstructor
public class PipelineCleanupRecord implements IdentifiedDataSerializable {
// No class-level JavaDoc
}
```

Problem Description:
Impact Scope:
Severity: MINOR
Improvement Suggestions:

```java
/**
* Record tracking the cleanup state of a finished pipeline.
*
* <p>This record is persisted in Hazelcast IMap (IMAP_PENDING_PIPELINE_CLEANUP)
* and used by the background cleanup task in {@link CoordinatorService}
* to asynchronously release resources when the synchronous cleanup fails.
*
* <p><b>Resources tracked:</b>
* <ul>
* <li>{@link #metricsImapCleaned} - Metrics in {@code IMAP_RUNNING_JOB_METRICS}</li>
* <li>{@link #taskGroups} - TaskGroups with their worker addresses</li>
* <li>{@link #cleanedTaskGroups} - TaskGroups whose contexts have been cleaned</li>
* </ul>
*
* <p><b>Cleanup conditions:</b>
* <ul>
* <li>CANCELED pipelines are always cleaned</li>
* <li>FINISHED pipelines are cleaned unless they ended with savepoint</li>
* <li>FAILED pipelines are cleaned (note: original code may have missed this)</li>
* </ul>
*
* <p><b>Lifecycle:</b>
* <ol>
* <li>Created by {@link JobMaster#enqueuePipelineCleanupIfNeeded}</li>
* <li>Updated by {@link CoordinatorService#processPendingPipelineCleanup}</li>
* <li>Removed when {@link #isCleaned()} returns true</li>
* </ol>
*
* @see PipelineLocation
* @see PipelineStatus
* @see org.apache.seatunnel.engine.server.task.operation.CleanTaskGroupContextOperation
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class PipelineCleanupRecord implements IdentifiedDataSerializable {
// ...
}
```

Rationale:
```java
&& checkpointManager != null
        && checkpointManager.isPipelineSavePointEnd(pipelineLocation);
boolean shouldCleanup =
        PipelineStatus.CANCELED.equals(pipelineStatus)
```

Should we add cleanup for failed pipelines here?
Good job

Issue: Missing Retry Limit - Could Cause IMAP Memory Leak

Severity: High
Problem Description:
Impact:
Risk Scenario:
Recommended Fix:

```java
// Add this constant to CoordinatorService
private static final int MAX_CLEANUP_ATTEMPTS = 100; // ~100 minutes of retries at 60s intervals
private void processPendingPipelineCleanup(
PipelineLocation pipelineLocation, PipelineCleanupRecord record) {
// Add this check at the beginning
if (record.getAttemptCount() > MAX_CLEANUP_ATTEMPTS) {
logger.warning(String.format(
"Pipeline %s cleanup exceeded max attempts (%d), giving up and removing record. " +
"Metrics cleaned: %s, TaskGroups cleaned: %d/%d",
pipelineLocation, MAX_CLEANUP_ATTEMPTS,
record.isMetricsImapCleaned(),
record.getCleanedTaskGroups() != null ? record.getCleanedTaskGroups().size() : 0,
record.getTaskGroups() != null ? record.getTaskGroups().size() : 0));
removePendingCleanupRecord(pipelineLocation, record);
return;
}
// Continue with existing cleanup logic...
}
```

Issue: Concurrent IMAP Traversal - Risk of ConcurrentModificationException

Severity: High
Problem Description:
Current Code:

```java
for (Map.Entry<PipelineLocation, PipelineCleanupRecord> entry :
pendingCleanupIMap.entrySet()) { // ← Unsafe: direct iteration
processPendingPipelineCleanup(entry.getKey(), entry.getValue());
}
```

Impact:
Risk Scenario:
Recommended Fix:

```java
private void cleanupPendingPipelines() {
if (!isActive) {
return;
}
IMap<PipelineLocation, PipelineCleanupRecord> pendingCleanupIMap =
this.pendingPipelineCleanupIMap;
if (pendingCleanupIMap == null || pendingCleanupIMap.isEmpty()) {
return;
}
// Copy the key set first to avoid concurrent modification
Set<PipelineLocation> keys;
try {
keys = new HashSet<>(pendingCleanupIMap.keySet());
} catch (Exception e) {
logger.warning("Failed to get pending cleanup keys: " + e.getMessage());
return;
}
// Now iterate over the copied key set
for (PipelineLocation key : keys) {
try {
PipelineCleanupRecord record = pendingCleanupIMap.get(key);
if (record != null) {
processPendingPipelineCleanup(key, record);
}
} catch (HazelcastInstanceNotActiveException e) {
logger.warning("Skip cleanup: hazelcast not active");
break;
} catch (Throwable t) {
logger.warning(String.format(
"Failed to cleanup pipeline %s: %s", key, t.getMessage()), t);
// Continue with next pipeline instead of crashing
}
}
}
```

Issue: Infinite Loop - Risk of Thread Starvation and CPU Spike

Severity: High
Problem Description:
Current Code:

```java
while (true) { // ← No exit condition!
PipelineCleanupRecord existing = pendingCleanupIMap.get(pipelineLocation);
if (existing == null) {
PipelineCleanupRecord prev = pendingCleanupIMap.putIfAbsent(pipelineLocation, newRecord);
if (prev == null) return;
existing = prev;
}
PipelineCleanupRecord merged = existing.mergeFrom(newRecord);
if (merged.equals(existing)) return;
if (pendingCleanupIMap.replace(pipelineLocation, existing, merged)) {
return;
}
// ← If replace fails, loop continues indefinitely
}
```

Impact:
Risk Scenario:
Recommended Fix:

```java
// Add this constant to JobMaster
private static final int MAX_ENQUEUE_RETRIES = 100;
public void enqueuePipelineCleanupIfNeeded(
PipelineLocation pipelineLocation, PipelineStatus pipelineStatus) {
// ... existing validation logic ...
int retryCount = 0;
while (retryCount < MAX_ENQUEUE_RETRIES) {
PipelineCleanupRecord existing = pendingCleanupIMap.get(pipelineLocation);
if (existing == null) {
PipelineCleanupRecord prev = pendingCleanupIMap.putIfAbsent(pipelineLocation, newRecord);
if (prev == null) {
return; // Success
}
existing = prev;
}
PipelineCleanupRecord merged = existing.mergeFrom(newRecord);
if (merged.equals(existing)) {
return; // No changes needed
}
if (pendingCleanupIMap.replace(pipelineLocation, existing, merged)) {
return; // Success
}
retryCount++;
// Optional: Add backoff to reduce contention
if (retryCount % 10 == 0) {
try {
Thread.sleep(1); // 1ms backoff every 10 retries
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.warning("Enqueue cleanup interrupted for pipeline: " + pipelineLocation);
return;
}
}
}
// Failed after max retries - log error but don't throw exception
logger.severe(String.format(
"Failed to enqueue pipeline cleanup for %s after %d retries due to high contention. " +
"Cleanup may be delayed until next scheduled run.",
pipelineLocation, MAX_ENQUEUE_RETRIES));
}
```
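The bounded compare-and-swap loop above can be demonstrated against a local `ConcurrentHashMap`, which exposes the same `putIfAbsent`/`replace` contract as Hazelcast's `IMap`. A minimal sketch under simplifying assumptions: the merge step is reduced to string concatenation purely for illustration, and the class name is hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Sketch: CAS insert-or-merge loop with a retry budget instead of while(true).
public class BoundedCasEnqueueDemo {
    static final int MAX_ENQUEUE_RETRIES = 100;

    /** Returns true if the value was stored or merged within the retry budget. */
    static boolean enqueue(ConcurrentMap<String, String> map, String key, String incoming) {
        for (int retry = 0; retry < MAX_ENQUEUE_RETRIES; retry++) {
            String existing = map.get(key);
            if (existing == null) {
                String prev = map.putIfAbsent(key, incoming);
                if (prev == null) {
                    return true; // inserted fresh record
                }
                existing = prev;
            }
            // Simplified merge: concatenate unless already present
            String merged = existing.contains(incoming) ? existing : existing + "+" + incoming;
            if (merged.equals(existing)) {
                return true; // nothing to change
            }
            if (map.replace(key, existing, merged)) {
                return true; // CAS succeeded
            }
            // CAS lost to a concurrent writer; re-read and retry
        }
        return false; // budget exhausted; caller should log and defer to the next run
    }

    public static void main(String[] args) {
        ConcurrentMap<String, String> map = new ConcurrentHashMap<>();
        System.out.println(enqueue(map, "p1", "cancel"));  // true: fresh insert
        System.out.println(enqueue(map, "p1", "metrics")); // true: merged
        System.out.println(map.get("p1"));                 // cancel+metrics
    }
}
```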
Issue 1: A maximum number of retries must not be set, as giving up can easily lead to data loss and program exceptions. Deletion must be performed through the normal logic.
Issue 2: It will not throw a ConcurrentModificationException. The result returned by IMap.entrySet is a clone, not the distributed data itself.
Issue 3: What is actually triggered is at the end of SubPlan; it is not a frequent call. For more stability, I will add a sleep instead of setting a maximum number of retries.
Purpose of this pull request
In extreme cases, resources in the IMap may not be released, for example when network communication is unstable.
Does this PR introduce any user-facing change?
How was this patch tested?
Check list
New License Guide
incompatible-changes.md to describe the incompatibility caused by this PR.