Skip to content

Commit 8f4b930

Browse files
authored
fix(taskframework): running task be canceled incorrect due to heartbeat timeout (#2763)
* add stop and failed * format code * rename method * fix for comment
1 parent 7003a3e commit 8f4b930

4 files changed

Lines changed: 24 additions & 3 deletions

File tree

server/odc-service/src/main/java/com/oceanbase/odc/service/task/executor/server/TaskMonitor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ public void monitor() {
6969
this.reportScheduledExecutor = Executors.newSingleThreadScheduledExecutor(threadFactory);
7070
reportScheduledExecutor.scheduleAtFixedRate(() -> {
7171
if (isTimeout() && !getTask().getStatus().isTerminated()) {
72-
getTask().stop();
72+
getTask().abort();
7373
}
7474
try {
7575
if (JobUtils.getExecutorPort().isPresent()) {

server/odc-service/src/main/java/com/oceanbase/odc/service/task/executor/task/BaseTask.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,12 +71,21 @@ public void start(JobContext context) {
7171

7272
@Override
7373
public boolean stop() {
74+
return stop(JobStatus.CANCELED);
75+
}
76+
77+
@Override
78+
public boolean abort() {
79+
return stop(JobStatus.FAILED);
80+
}
81+
82+
private synchronized boolean stop(JobStatus status) {
7483
try {
7584
if (getStatus().isTerminated()) {
76-
log.warn("Task is already finished and cannot be canceled, id={}, status={}.", getJobId(), getStatus());
85+
log.warn("Task is already finished and cannot be stopped, id={}, status={}.", getJobId(), getStatus());
7786
} else {
7887
doStop();
79-
updateStatus(JobStatus.CANCELED);
88+
updateStatus(status);
8089
}
8190
return true;
8291
} catch (Throwable e) {

server/odc-service/src/main/java/com/oceanbase/odc/service/task/executor/task/Task.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.oceanbase.odc.service.task.caller.JobContext;
2222
import com.oceanbase.odc.service.task.enums.JobStatus;
2323
import com.oceanbase.odc.service.task.executor.server.TaskExecutor;
24+
import com.oceanbase.odc.service.task.executor.server.TaskMonitor;
2425

2526
/**
2627
* Task interface. Each task should implement this interface
@@ -40,6 +41,13 @@ public interface Task<RESULT> {
4041
*/
4142
boolean stop();
4243

44+
45+
/**
46+
* Stop current task and set status to failed. This method will be called by {@link TaskMonitor} for
47+
* stop a timeout task
48+
*/
49+
boolean abort();
50+
4351
/**
4452
* Modify current task parameters
4553
*/

server/odc-service/src/main/java/com/oceanbase/odc/service/task/service/StdTaskFrameworkService.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -305,6 +305,10 @@ public void handleResult(TaskResult taskResult) {
305305
log.warn("Job identity is null");
306306
return;
307307
}
308+
if (taskResult.getStatus() == JobStatus.CANCELED) {
309+
log.warn("Job is canceled by odc server, this result is ignored.");
310+
return;
311+
}
308312
JobEntity je = find(taskResult.getJobIdentity().getId());
309313
if (je == null) {
310314
log.warn("Job identity is not exists by id {}", taskResult.getJobIdentity().getId());

0 commit comments

Comments
 (0)