Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -163,20 +163,7 @@ public void handle(TaskCallBack taskCallBack) throws TaskException {
log.info("Successfully submitted serverless spark job, jobRunId - {}", jobRunId);

while (!RunState.isFinal(currentState)) {
GetJobRunRequest getJobRunRequest = buildGetJobRunRequest();

GetJobRunResponse getJobRunResponse = RetryUtils.retryFunction(() -> {
try {
return aliyunServerlessSparkClient
.getJobRun(aliyunServerlessSparkParameters.getWorkspaceId(), jobRunId,
getJobRunRequest);
} catch (Exception e) {
throw new AliyunServerlessSparkTaskException("Failed to get job run!", e);
}
}, retryPolicy);

currentState = RunState.valueOf(getJobRunResponse.getBody().getJobRun().getState());
log.info("job - {} state - {}", jobRunId, currentState);
currentState = RunState.valueOf(pollJobRunStatus());

try {
Thread.sleep(10 * 1000L);
Expand Down Expand Up @@ -227,6 +214,34 @@ public void cancelApplication() throws TaskException {
throw new AliyunServerlessSparkTaskException("Failed to cancel job run!");
}
}, retryPolicy);

RunState currentState = RunState.Cancelling;

while (!RunState.isCancelled(currentState)) {
currentState = RunState.valueOf(pollJobRunStatus());

try {
Thread.sleep(10 * 1000L);
} catch (InterruptedException e) {
break;
}
}
Comment on lines +217 to +228
Copy link
Member

@ruanwenjun ruanwenjun Sep 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need to do this change, since the kill operation is async, and only handle finish, then the worker will return the task state to master. The change might block a thread.

}

protected String pollJobRunStatus() {
GetJobRunRequest getJobRunRequest = buildGetJobRunRequest();

GetJobRunResponse getJobRunResponse = RetryUtils.retryFunction(() -> {
try {
return aliyunServerlessSparkClient
.getJobRun(aliyunServerlessSparkParameters.getWorkspaceId(), jobRunId,
getJobRunRequest);
} catch (Exception e) {
throw new AliyunServerlessSparkTaskException("Failed to get job run!", e);
}
}, retryPolicy);

return getJobRunResponse.getBody().getJobRun().getState();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,10 +192,23 @@ public void testHandle() {
@Test
public void testCancelApplication() throws Exception {
doReturn(mockCancelJobRunRequest).when(aliyunServerlessSparkTask).buildCancelJobRunRequest();
doReturn(mockGetJobRunRequest).when(aliyunServerlessSparkTask).buildGetJobRunRequest();

GetJobRunResponseBody getJobRunResponseBody = new GetJobRunResponseBody();
GetJobRunResponseBody.GetJobRunResponseBodyJobRun jobRun =
new GetJobRunResponseBody.GetJobRunResponseBodyJobRun();
jobRun.setState(RunState.Cancelled.name());
getJobRunResponseBody.setJobRun(jobRun);
doReturn(getJobRunResponseBody).when(mockGetJobRunResponse).getBody();

Assertions.assertDoesNotThrow(
() -> doReturn(mockCancelJobRunResponse).when(mockAliyunServerlessSparkClient).cancelJobRun(any(),
any(), any()));

Assertions.assertDoesNotThrow(
() -> doReturn(mockGetJobRunResponse).when(mockAliyunServerlessSparkClient).getJobRun(any(),
any(), any()));

aliyunServerlessSparkTask.init();
aliyunServerlessSparkTask.cancelApplication();
verify(aliyunServerlessSparkTask).buildCancelJobRunRequest();
Expand Down
Loading