From 3de1ac55b824cae11b035da2e47699dd34639aed Mon Sep 17 00:00:00 2001 From: "sunyifan.syf" Date: Thu, 11 Sep 2025 20:08:02 +0800 Subject: [PATCH] [Improvement-16994][TaskPlugin] ensure spark application cancelled --- .../AliyunServerlessSparkTask.java | 43 +++++++++++++------ .../AliyunServerlessSparkTaskTest.java | 13 ++++++ 2 files changed, 42 insertions(+), 14 deletions(-) diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/task/aliyunserverlessspark/AliyunServerlessSparkTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/task/aliyunserverlessspark/AliyunServerlessSparkTask.java index 15f5ea25a23c..2623d67aa4b8 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/task/aliyunserverlessspark/AliyunServerlessSparkTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/task/aliyunserverlessspark/AliyunServerlessSparkTask.java @@ -163,20 +163,7 @@ public void handle(TaskCallBack taskCallBack) throws TaskException { log.info("Successfully submitted serverless spark job, jobRunId - {}", jobRunId); while (!RunState.isFinal(currentState)) { - GetJobRunRequest getJobRunRequest = buildGetJobRunRequest(); - - GetJobRunResponse getJobRunResponse = RetryUtils.retryFunction(() -> { - try { - return aliyunServerlessSparkClient - .getJobRun(aliyunServerlessSparkParameters.getWorkspaceId(), jobRunId, - getJobRunRequest); - } catch (Exception e) { - throw new AliyunServerlessSparkTaskException("Failed to get job run!", e); - } - }, retryPolicy); - - currentState = RunState.valueOf(getJobRunResponse.getBody().getJobRun().getState()); - log.info("job - {} state - {}", jobRunId, currentState); + currentState = RunState.valueOf(pollJobRunStatus()); try { Thread.sleep(10 * 1000L); @@ -227,6 +214,34 @@ public void cancelApplication() throws TaskException { throw new AliyunServerlessSparkTaskException("Failed to cancel job run!"); } }, retryPolicy); + + RunState currentState = RunState.Cancelling; + + while (!RunState.isCancelled(currentState)) { + currentState = RunState.valueOf(pollJobRunStatus()); + + try { + Thread.sleep(10 * 1000L); + } catch (InterruptedException e) { + break; + } + } + } + + protected String pollJobRunStatus() { + GetJobRunRequest getJobRunRequest = buildGetJobRunRequest(); + + GetJobRunResponse getJobRunResponse = RetryUtils.retryFunction(() -> { + try { + return aliyunServerlessSparkClient + .getJobRun(aliyunServerlessSparkParameters.getWorkspaceId(), jobRunId, + getJobRunRequest); + } catch (Exception e) { + throw new AliyunServerlessSparkTaskException("Failed to get job run!", e); + } + }, retryPolicy); + + return getJobRunResponse.getBody().getJobRun().getState(); } @Override diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-aliyunserverlessspark/src/test/java/org/apache/dolphinscheduler/plugin/task/aliyunserverlessspark/AliyunServerlessSparkTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-aliyunserverlessspark/src/test/java/org/apache/dolphinscheduler/plugin/task/aliyunserverlessspark/AliyunServerlessSparkTaskTest.java index 705f451980ab..53eb2bd919d5 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-aliyunserverlessspark/src/test/java/org/apache/dolphinscheduler/plugin/task/aliyunserverlessspark/AliyunServerlessSparkTaskTest.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-aliyunserverlessspark/src/test/java/org/apache/dolphinscheduler/plugin/task/aliyunserverlessspark/AliyunServerlessSparkTaskTest.java @@ -192,10 +192,23 @@ public void testHandle() { @Test public void testCancelApplication() throws Exception { doReturn(mockCancelJobRunRequest).when(aliyunServerlessSparkTask).buildCancelJobRunRequest(); + doReturn(mockGetJobRunRequest).when(aliyunServerlessSparkTask).buildGetJobRunRequest(); + + GetJobRunResponseBody getJobRunResponseBody = new GetJobRunResponseBody(); + GetJobRunResponseBody.GetJobRunResponseBodyJobRun jobRun = + new GetJobRunResponseBody.GetJobRunResponseBodyJobRun(); + jobRun.setState(RunState.Cancelled.name()); + getJobRunResponseBody.setJobRun(jobRun); + doReturn(getJobRunResponseBody).when(mockGetJobRunResponse).getBody(); + Assertions.assertDoesNotThrow( () -> doReturn(mockCancelJobRunResponse).when(mockAliyunServerlessSparkClient).cancelJobRun(any(), any(), any())); + Assertions.assertDoesNotThrow( + () -> doReturn(mockGetJobRunResponse).when(mockAliyunServerlessSparkClient).getJobRun(any(), + any(), any())); + aliyunServerlessSparkTask.init(); aliyunServerlessSparkTask.cancelApplication(); verify(aliyunServerlessSparkTask).buildCancelJobRunRequest();