Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
*
* @author Dave Syer
* @author Mahmoud Ben Hassine
* @author Yejeong Ham
* @since 2.0
*/
@SuppressWarnings("removal")
Expand Down Expand Up @@ -248,6 +249,18 @@ JobExecution startNextInstance(Job job) throws JobRestartException, JobExecution
*/
JobExecution abandon(JobExecution jobExecution) throws JobExecutionAlreadyRunningException;

/**
* Marks the given {@link JobExecution} as {@code FAILED} when it is stuck in a
* {@code STARTED} state due to an abrupt shutdown or failure, in order to make it
* restartable. This operation makes a previously non-restartable execution eligible
* for restart by updating its execution context with the flag {@code recovered=true}.
* @param jobExecution the {@link JobExecution} to recover
* @return the {@link JobExecution} after it has been marked as recovered
* @throws UnexpectedJobExecutionException if the job execution is already complete or
* abandoned
*/
JobExecution recover(JobExecution jobExecution);

/**
* List the {@link JobExecution JobExecutions} associated with a particular
* {@link JobInstance}, in reverse order of creation (and therefore usually of
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
* {@link #main(String[])} method explains the various operations and exit codes.
*
* @author Mahmoud Ben Hassine
* @author Yejeong Ham
* @since 6.0
*/
public class CommandLineJobOperator {
Expand Down Expand Up @@ -204,6 +205,29 @@ public int abandon(long jobExecutionId) {
}
}

/**
* Recover the job execution with the given ID that is stuck in a {@code STARTED}
* state due to an abrupt shutdown or failure, making it eligible for restart.
* @param jobExecutionId the ID of the job execution to recover
* @return the exit code of the recovered job execution, or JVM_EXITCODE_GENERIC_ERROR
* if an error occurs
*/
public int recover(long jobExecutionId) {
logger.info(() -> "Recovering job execution with ID: " + jobExecutionId);
try {
JobExecution jobExecution = this.jobRepository.getJobExecution(jobExecutionId);
if (jobExecution == null) {
logger.error(() -> "No job execution found with ID: " + jobExecutionId);
return JVM_EXITCODE_GENERIC_ERROR;
}
JobExecution recoveredExecution = this.jobOperator.recover(jobExecution);
return this.exitCodeMapper.intValue(recoveredExecution.getExitStatus().getExitCode());
}
catch (Exception e) {
return JVM_EXITCODE_GENERIC_ERROR;
}
}

/*
* Main method to operate jobs from the command line.
*
Expand Down Expand Up @@ -287,6 +311,10 @@ public static void main(String[] args) {
jobExecutionId = Long.parseLong(args[2]);
exitCode = operator.abandon(jobExecutionId);
break;
case "recover":
jobExecutionId = Long.parseLong(args[2]);
exitCode = operator.recover(jobExecutionId);
break;
default:
System.err.println("Unknown operation: " + operation);
exitCode = JVM_EXITCODE_GENERIC_ERROR;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
* @author Will Schipp
* @author Mahmoud Ben Hassine
* @author Andrey Litvitski
* @author Yejeong Ham
* @since 2.0
* @deprecated since 6.0 in favor of {@link TaskExecutorJobOperator}. Scheduled for
* removal in 6.2 or later.
Expand Down Expand Up @@ -391,6 +392,45 @@ public JobExecution abandon(JobExecution jobExecution) throws JobExecutionAlread
return jobExecution;
}

@Override
public JobExecution recover(JobExecution jobExecution) {
Assert.notNull(jobExecution, "JobExecution must not be null");

if (jobExecution.getExecutionContext().containsKey("recovered")) {
if (logger.isInfoEnabled()) {
logger.info("already recovered job execution: " + jobExecution);
}
throw new UnexpectedJobExecutionException("JobExecution is already recovered");
}

BatchStatus jobStatus = jobExecution.getStatus();
if (jobStatus == BatchStatus.COMPLETED || jobStatus == BatchStatus.ABANDONED) {
throw new UnexpectedJobExecutionException(
"JobExecution is already complete or abandoned and therefore cannot be recovered: " + jobExecution);
}

if (logger.isInfoEnabled()) {
logger.info("Recovering job execution: " + jobExecution);
}

for (StepExecution stepExecution : jobExecution.getStepExecutions()) {
BatchStatus stepStatus = stepExecution.getStatus();
if (stepStatus.isRunning() || stepStatus == BatchStatus.STOPPING) {
stepExecution.setStatus(BatchStatus.FAILED);
stepExecution.setEndTime(LocalDateTime.now());
stepExecution.getExecutionContext().put("recovered", true);
jobRepository.update(stepExecution);
}
}

jobExecution.setStatus(BatchStatus.FAILED);
jobExecution.setEndTime(LocalDateTime.now());
jobExecution.getExecutionContext().put("recovered", true);
jobRepository.update(jobExecution);

return jobExecution;
}

@Override
@Deprecated(since = "6.0", forRemoval = true)
public Set<String> getJobNames() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
* @author Lucas Ward
* @author Will Schipp
* @author Mahmoud Ben Hassine
* @author Yejeong Ham
* @since 6.0
*/
@SuppressWarnings("removal")
Expand Down Expand Up @@ -119,4 +120,10 @@ public JobExecution abandon(JobExecution jobExecution) throws JobExecutionAlread
return super.abandon(jobExecution);
}

@Override
public JobExecution recover(JobExecution jobExecution) {
Assert.notNull(jobExecution, "JobExecution must not be null");
return super.recover(jobExecution);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
* Tests for {@link CommandLineJobOperator}.
*
* @author Mahmoud Ben Hassine
* @author Yejeong Ham
*/
class CommandLineJobOperatorTests {

Expand Down Expand Up @@ -133,4 +134,18 @@ void abandon() throws Exception {
Mockito.verify(jobOperator).abandon(jobExecution);
}

@Test
void recover() {
// given
long jobExecutionId = 1;
JobExecution jobExecution = mock();

// when
Mockito.when(jobRepository.getJobExecution(jobExecutionId)).thenReturn(jobExecution);
this.commandLineJobOperator.recover(jobExecutionId);

// then
Mockito.verify(jobOperator).recover(jobExecution);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.springframework.batch.core.job.JobExecution;
import org.springframework.batch.core.job.JobExecutionException;
import org.springframework.batch.core.job.JobInstance;
import org.springframework.batch.core.job.UnexpectedJobExecutionException;
import org.springframework.batch.core.job.parameters.JobParameters;
import org.springframework.batch.core.job.parameters.JobParametersIncrementer;
import org.springframework.batch.core.step.Step;
Expand Down Expand Up @@ -69,7 +70,7 @@
* @author Will Schipp
* @author Mahmoud Ben Hassine
* @author Jinwoo Bae
*
* @author Yejeong Ham
*/
@SuppressWarnings("removal")
class TaskExecutorJobOperatorTests {
Expand Down Expand Up @@ -427,6 +428,52 @@ void testAbortNonStopping() {
assertThrows(JobExecutionAlreadyRunningException.class, () -> jobOperator.abandon(123L));
}

@Test
void testRecover() {
JobInstance jobInstance = new JobInstance(123L, job.getName());
JobExecution jobExecution = new JobExecution(jobInstance, 111L, jobParameters);
jobExecution.setStatus(BatchStatus.STARTED);
jobExecution.createStepExecution("step1").setStatus(BatchStatus.STARTED);
when(jobRepository.getJobExecution(111L)).thenReturn(jobExecution);
when(jobRepository.getLastJobExecution(jobInstance)).thenReturn(jobExecution);
JobExecution recover = jobOperator.recover(jobExecution);
assertEquals(BatchStatus.FAILED, recover.getStatus());
assertTrue(recover.getExecutionContext().containsKey("recovered"));
}

@Test
void testRecoverStepStopping() {
JobInstance jobInstance = new JobInstance(123L, job.getName());
JobExecution jobExecution = new JobExecution(jobInstance, 111L, jobParameters);
jobExecution.setStatus(BatchStatus.STARTED);
jobExecution.createStepExecution("step1").setStatus(BatchStatus.STOPPING);
when(jobRepository.getJobExecution(111L)).thenReturn(jobExecution);
when(jobRepository.getLastJobExecution(jobInstance)).thenReturn(jobExecution);
JobExecution recover = jobOperator.recover(jobExecution);
assertEquals(BatchStatus.FAILED, recover.getStatus());
assertTrue(recover.getExecutionContext().containsKey("recovered"));
}

@Test
void testRecoverJobAbandon() {
JobInstance jobInstance = new JobInstance(123L, job.getName());
JobExecution jobExecution = new JobExecution(jobInstance, 111L, jobParameters);
jobExecution.setStatus(BatchStatus.ABANDONED);
when(jobRepository.getJobExecution(111L)).thenReturn(jobExecution);
when(jobRepository.getLastJobExecution(jobInstance)).thenReturn(jobExecution);
assertThrows(UnexpectedJobExecutionException.class, () -> jobOperator.recover(jobExecution));
}

@Test
void testRecoverJobCompleted() {
JobInstance jobInstance = new JobInstance(123L, job.getName());
JobExecution jobExecution = new JobExecution(jobInstance, 111L, jobParameters);
jobExecution.setStatus(BatchStatus.COMPLETED);
when(jobRepository.getJobExecution(111L)).thenReturn(jobExecution);
when(jobRepository.getLastJobExecution(jobInstance)).thenReturn(jobExecution);
assertThrows(UnexpectedJobExecutionException.class, () -> jobOperator.recover(jobExecution));
}

static class MockJob extends AbstractJob {

private TaskletStep taskletStep;
Expand Down