Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ public class Constant {

public static final String IMAP_RUNNING_JOB_METRICS = "engine_runningJobMetrics";

public static final String IMAP_PENDING_PIPELINE_CLEANUP = "engine_pendingPipelineCleanup";

public static final String IMAP_CHECKPOINT_MONITOR = "engine_checkpoint_monitor";

public static final String IMAP_CONNECTOR_JAR_REF_COUNTERS = "engine_connectorJarRefCounters";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,15 @@
import org.apache.seatunnel.engine.server.execution.TaskLocation;
import org.apache.seatunnel.engine.server.master.JobHistoryService;
import org.apache.seatunnel.engine.server.master.JobMaster;
import org.apache.seatunnel.engine.server.master.cleanup.PipelineCleanupRecord;
import org.apache.seatunnel.engine.server.metrics.JobMetricsUtil;
import org.apache.seatunnel.engine.server.metrics.SeaTunnelMetricsContext;
import org.apache.seatunnel.engine.server.resourcemanager.NoEnoughResourceException;
import org.apache.seatunnel.engine.server.resourcemanager.ResourceManager;
import org.apache.seatunnel.engine.server.resourcemanager.ResourceManagerFactory;
import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
import org.apache.seatunnel.engine.server.service.jar.ConnectorPackageService;
import org.apache.seatunnel.engine.server.task.operation.CleanTaskGroupContextOperation;
import org.apache.seatunnel.engine.server.task.operation.GetMetricsOperation;
import org.apache.seatunnel.engine.server.telemetry.metrics.entity.JobCounter;
import org.apache.seatunnel.engine.server.telemetry.metrics.entity.ThreadPoolStatus;
Expand Down Expand Up @@ -108,6 +110,7 @@
import static org.apache.seatunnel.engine.server.metrics.JobMetricsUtil.toJobMetricsMap;

public class CoordinatorService {
private static final int PIPELINE_CLEANUP_INTERVAL_SECONDS = 60;
private final NodeEngineImpl nodeEngine;
private final ILogger logger;

Expand Down Expand Up @@ -173,6 +176,8 @@ public class CoordinatorService {

private IMap<Long, HashMap<TaskLocation, SeaTunnelMetricsContext>> metricsImap;

private IMap<PipelineLocation, PipelineCleanupRecord> pendingPipelineCleanupIMap;

/** If this node is a master node */
private volatile boolean isActive = false;

Expand All @@ -182,6 +187,8 @@ public class CoordinatorService {

private final ScheduledExecutorService masterActiveListener;

private final ScheduledExecutorService pipelineCleanupScheduler;

private final EngineConfig engineConfig;

private ConnectorPackageService connectorPackageService;
Expand Down Expand Up @@ -217,6 +224,16 @@ public CoordinatorService(
masterActiveListener = Executors.newSingleThreadScheduledExecutor();
masterActiveListener.scheduleAtFixedRate(
this::checkNewActiveMaster, 0, 100, TimeUnit.MILLISECONDS);
pipelineCleanupScheduler =
Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder()
.setNameFormat("seatunnel-pipeline-cleanup-%d")
.build());
pipelineCleanupScheduler.scheduleAtFixedRate(
this::cleanupPendingPipelines,
PIPELINE_CLEANUP_INTERVAL_SECONDS,
PIPELINE_CLEANUP_INTERVAL_SECONDS,
TimeUnit.SECONDS);
scheduleStrategy = engineConfig.getScheduleStrategy();
isWaitStrategy = scheduleStrategy.equals(ScheduleStrategy.WAIT);
logger.info("Start pending job schedule thread");
Expand Down Expand Up @@ -411,7 +428,8 @@ private void initCoordinatorService() {
ownedSlotProfilesIMap =
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_OWNED_SLOT_PROFILES);
metricsImap = nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_METRICS);

pendingPipelineCleanupIMap =
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_PENDING_PIPELINE_CLEANUP);
jobHistoryService =
new JobHistoryService(
nodeEngine,
Expand Down Expand Up @@ -446,6 +464,168 @@ private void initCoordinatorService() {
this::restoreAllRunningJobFromMasterNodeSwitch, executorService));
}

/**
 * Periodic task (master only): walks the pending-cleanup IMap and attempts to
 * release the resources of every pipeline recorded there.
 *
 * <p>Fault isolation: a failure while processing one pipeline must not prevent
 * the remaining pipelines from being processed in the same round, so per-entry
 * errors are caught inside the loop. Only a cluster shutdown
 * ({@link HazelcastInstanceNotActiveException}) aborts the whole pass.
 */
private void cleanupPendingPipelines() {
    // Only the active master performs cleanup; standby nodes stay idle.
    if (!isActive) {
        return;
    }
    IMap<PipelineLocation, PipelineCleanupRecord> pendingCleanupIMap =
            this.pendingPipelineCleanupIMap;
    if (pendingCleanupIMap == null || pendingCleanupIMap.isEmpty()) {
        return;
    }

    try {
        for (Map.Entry<PipelineLocation, PipelineCleanupRecord> entry :
                pendingCleanupIMap.entrySet()) {
            try {
                processPendingPipelineCleanup(entry.getKey(), entry.getValue());
            } catch (HazelcastInstanceNotActiveException e) {
                // Cluster is going down: stop the whole pass, handled below.
                throw e;
            } catch (Throwable t) {
                // Isolate the failure to this pipeline; keep processing the rest.
                logger.warning(
                        String.format(
                                "Pending cleanup of pipeline %s failed: %s",
                                entry.getKey(), ExceptionUtils.getMessage(t)),
                        t);
            }
        }
    } catch (HazelcastInstanceNotActiveException e) {
        logger.warning(
                String.format(
                        "Skip pending pipeline cleanup: hazelcast not active: %s",
                        ExceptionUtils.getMessage(e)));
    } catch (Throwable t) {
        // Last-resort guard: a scheduled task that throws is silently
        // descheduled by ScheduledExecutorService, so never let one escape.
        logger.warning(
                String.format(
                        "Unexpected exception in pending pipeline cleanup: %s",
                        ExceptionUtils.getMessage(t)),
                t);
    }
}

/**
 * Attempts one cleanup pass for a single pipeline: removes its metrics context
 * and sends CleanTaskGroupContextOperation to each worker that still hosts one
 * of its task groups. Progress is persisted back to the pending-cleanup IMap
 * via a compare-and-swap replace, so concurrent masters cannot clobber each
 * other's updates.
 */
private void processPendingPipelineCleanup(
PipelineLocation pipelineLocation, PipelineCleanupRecord record) {
if (pipelineLocation == null || record == null) {
return;
}
// Records that no longer qualify (e.g. savepoint-ended) are dropped outright.
if (!shouldCleanup(record)) {
removePendingCleanupRecord(pipelineLocation, record);
return;
}

// If the running-state IMap still shows a non-terminal status, the pipeline
// may have been restarted — defer cleanup to a later round.
PipelineStatus currentStatus = getPipelineStatusFromIMap(pipelineLocation);
if (currentStatus != null && !currentStatus.isEndState()) {
return;
}

// Work on a defensive copy so `record` stays untouched; it is needed as the
// expected value for the CAS replace() below.
long now = System.currentTimeMillis();
PipelineCleanupRecord updated = copy(record);
updated.setLastAttemptTimeMillis(now);
updated.setAttemptCount(record.getAttemptCount() + 1);

// Metrics removal is attempted at most once per record; on failure the flag
// stays false and a later round retries.
if (!updated.isMetricsImapCleaned() && cleanupPipelineMetrics(pipelineLocation)) {
updated.setMetricsImapCleaned(true);
}

Map<TaskGroupLocation, Address> taskGroups = updated.getTaskGroups();
if (taskGroups != null && !taskGroups.isEmpty()) {
for (Map.Entry<TaskGroupLocation, Address> taskGroup : taskGroups.entrySet()) {
TaskGroupLocation taskGroupLocation = taskGroup.getKey();
// Skip task groups already cleaned in an earlier round.
if (updated.getCleanedTaskGroups() != null
&& updated.getCleanedTaskGroups().contains(taskGroupLocation)) {
continue;
}
// Skip workers that have left the cluster — presumably their task-group
// contexts died with them (TODO confirm this assumption; the task group
// is then never marked cleaned).
Address workerAddress = taskGroup.getValue();
if (workerAddress == null
|| nodeEngine.getClusterService().getMember(workerAddress) == null) {
continue;
}
try {
// Synchronous remote call; only on success is the task group marked
// cleaned, so failures are retried in a later round.
NodeEngineUtil.sendOperationToMemberNode(
nodeEngine,
new CleanTaskGroupContextOperation(taskGroupLocation),
workerAddress)
.get();
updated.getCleanedTaskGroups().add(taskGroupLocation);
} catch (HazelcastInstanceNotActiveException e) {
// Expected during shutdown — log without stack trace.
logger.warning(
String.format(
"%s clean TaskGroupContext failed: %s",
taskGroupLocation, ExceptionUtils.getMessage(e)));
} catch (Exception e) {
logger.warning(
String.format(
"%s clean TaskGroupContext failed: %s",
taskGroupLocation, ExceptionUtils.getMessage(e)),
e);
}
}
}

// CAS: only persist our progress if nobody else touched the record meanwhile.
// On a lost race the whole pass is simply retried in the next round.
boolean replaced = pendingPipelineCleanupIMap.replace(pipelineLocation, record, updated);
if (!replaced) {
return;
}
// Fully cleaned records are removed; remove(key, value) keeps this safe
// against a concurrent re-enqueue of the same pipeline.
if (updated.isCleaned()) {
pendingPipelineCleanupIMap.remove(pipelineLocation, updated);
}
}

/**
 * Best-effort conditional removal of a pending-cleanup record. Failures are
 * logged and swallowed so the periodic cleanup loop keeps running.
 */
private void removePendingCleanupRecord(
        PipelineLocation pipelineLocation, PipelineCleanupRecord record) {
    try {
        // remove(key, value) only deletes when the stored record still matches.
        pendingPipelineCleanupIMap.remove(pipelineLocation, record);
    } catch (Exception e) {
        String message =
                String.format(
                        "Remove pending pipeline cleanup record failed: %s",
                        ExceptionUtils.getMessage(e));
        logger.warning(message, e);
    }
}

/**
 * Decides whether a recorded pipeline still qualifies for cleanup: only
 * CANCELED or FINISHED pipelines that did not end via savepoint do.
 */
private boolean shouldCleanup(PipelineCleanupRecord record) {
    // No record or no terminal status recorded — nothing to clean.
    if (record == null || record.getFinalStatus() == null) {
        return false;
    }
    // Savepoint-ended pipelines keep their state for a later restore.
    if (record.isSavepointEnd()) {
        return false;
    }
    PipelineStatus finalStatus = record.getFinalStatus();
    return finalStatus == PipelineStatus.CANCELED || finalStatus == PipelineStatus.FINISHED;
}

/**
 * Looks up the current status of a pipeline in the running-state IMap.
 *
 * @return the {@link PipelineStatus}, or {@code null} when the IMap is absent,
 *     has no entry for this pipeline, or holds a value of another type
 */
private PipelineStatus getPipelineStatusFromIMap(PipelineLocation pipelineLocation) {
    // The state IMap may not be initialized yet (or was cleared on failover).
    if (runningJobStateIMap == null) {
        return null;
    }
    Object state = runningJobStateIMap.get(pipelineLocation);
    if (state instanceof PipelineStatus) {
        return (PipelineStatus) state;
    }
    return null;
}

/**
 * Deep-copies a cleanup record. The collections are copied defensively so the
 * caller can mutate the copy while keeping the original intact as the expected
 * value for a compare-and-swap replace.
 */
private PipelineCleanupRecord copy(PipelineCleanupRecord record) {
    Map<TaskGroupLocation, Address> taskGroupsCopy;
    if (record.getTaskGroups() == null) {
        taskGroupsCopy = Collections.emptyMap();
    } else {
        taskGroupsCopy = new HashMap<>(record.getTaskGroups());
    }

    // Always a mutable set: the cleanup pass adds finished task groups to it.
    Set<TaskGroupLocation> cleanedCopy = new HashSet<>();
    if (record.getCleanedTaskGroups() != null) {
        cleanedCopy.addAll(record.getCleanedTaskGroups());
    }

    return new PipelineCleanupRecord(
            record.getPipelineLocation(),
            record.getFinalStatus(),
            record.isSavepointEnd(),
            taskGroupsCopy,
            cleanedCopy,
            record.isMetricsImapCleaned(),
            record.getCreateTimeMillis(),
            record.getLastAttemptTimeMillis(),
            record.getAttemptCount());
}

/**
 * Removes the metrics context of a pipeline via the server.
 *
 * @return {@code true} on success; {@code false} when removal failed, so the
 *     caller can leave the cleanup record unmarked and retry later
 */
private boolean cleanupPipelineMetrics(PipelineLocation pipelineLocation) {
    try {
        seaTunnelServer.removeMetrics(pipelineLocation);
    } catch (Exception e) {
        String message =
                String.format(
                        "Failed to remove metrics context for pipeline %s: %s",
                        pipelineLocation, ExceptionUtils.getMessage(e));
        logger.warning(message, e);
        return false;
    }
    return true;
}

private void restoreAllRunningJobFromMasterNodeSwitch() {
List<Map.Entry<Long, JobInfo>> needRestoreFromMasterNodeSwitchJobs =
runningJobInfoIMap.entrySet().stream()
Expand Down Expand Up @@ -895,6 +1075,9 @@ public void shutdown() {
if (masterActiveListener != null) {
masterActiveListener.shutdownNow();
}
if (pipelineCleanupScheduler != null) {
pipelineCleanupScheduler.shutdownNow();
}
clearCoordinatorService();
}

Expand Down Expand Up @@ -1159,6 +1342,11 @@ protected IMap<Long, HashMap<TaskLocation, SeaTunnelMetricsContext>> getMetricsI
return metricsImap;
}

// Test hook: runs one synchronous cleanup pass without waiting for the
// scheduled interval of the pipelineCleanupScheduler.
@VisibleForTesting
void runPendingPipelineCleanupOnce() {
cleanupPendingPipelines();
}

@VisibleForTesting
public PeekBlockingQueue<PendingJobInfo> getPendingJobQueue() {
return pendingJobQueue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,8 @@ private void subPlanDone(PipelineStatus pipelineStatus) {
try {
RetryUtils.retryWithException(
() -> {
jobMaster.enqueuePipelineCleanupIfNeeded(
getPipelineLocation(), pipelineStatus);
jobMaster.savePipelineMetricsToHistory(getPipelineLocation());
try {
jobMaster.removeMetricsContext(getPipelineLocation(), pipelineStatus);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import org.apache.seatunnel.engine.server.dag.physical.SubPlan;
import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
import org.apache.seatunnel.engine.server.master.cleanup.PipelineCleanupRecord;
import org.apache.seatunnel.engine.server.metrics.JobMetricsUtil;
import org.apache.seatunnel.engine.server.resourcemanager.AbstractResourceManager;
import org.apache.seatunnel.engine.server.resourcemanager.ResourceManager;
Expand Down Expand Up @@ -890,6 +891,66 @@ public void savePipelineMetricsToHistory(PipelineLocation pipelineLocation) {
this.cleanTaskGroupContext(pipelineLocation);
}

public void enqueuePipelineCleanupIfNeeded(
PipelineLocation pipelineLocation, PipelineStatus pipelineStatus) {
if (pipelineLocation == null || pipelineStatus == null) {
return;
}
boolean savepointEnd =
PipelineStatus.FINISHED.equals(pipelineStatus)
&& checkpointManager != null
&& checkpointManager.isPipelineSavePointEnd(pipelineLocation);
boolean shouldCleanup =
PipelineStatus.CANCELED.equals(pipelineStatus)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add cleanup for failed pipelines here?

|| (PipelineStatus.FINISHED.equals(pipelineStatus) && !savepointEnd);
if (!shouldCleanup) {
return;
}

Map<TaskGroupLocation, SlotProfile> slotProfileMap =
ownedSlotProfilesIMap.get(pipelineLocation);
Map<TaskGroupLocation, Address> taskGroups = new HashMap<>();
if (slotProfileMap != null) {
slotProfileMap.forEach(
(taskGroupLocation, slotProfile) ->
taskGroups.put(taskGroupLocation, slotProfile.getWorker()));
}

IMap<PipelineLocation, PipelineCleanupRecord> pendingCleanupIMap =
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_PENDING_PIPELINE_CLEANUP);
long now = System.currentTimeMillis();
PipelineCleanupRecord newRecord =
new PipelineCleanupRecord(
pipelineLocation,
pipelineStatus,
savepointEnd,
taskGroups,
Collections.emptySet(),
false,
now,
0,
0);

while (true) {
PipelineCleanupRecord existing = pendingCleanupIMap.get(pipelineLocation);
if (existing == null) {
PipelineCleanupRecord prev =
pendingCleanupIMap.putIfAbsent(pipelineLocation, newRecord);
if (prev == null) {
return;
}
existing = prev;
}
PipelineCleanupRecord merged = existing.mergeFrom(newRecord);
if (merged.equals(existing)) {
return;
}
if (pendingCleanupIMap.replace(pipelineLocation, existing, merged)) {
return;
}
}
}

public void removeMetricsContext(
PipelineLocation pipelineLocation, PipelineStatus pipelineStatus) {
if ((pipelineStatus.equals(PipelineStatus.FINISHED)
Expand Down
Loading