Skip to content

Commit 5d4691c

Browse files
committed
Merge branch 'dev' of https://github.com/apache/dolphinscheduler into dev-datavines
2 parents 603b667 + 4416548 commit 5d4691c

File tree

8 files changed

+271
-12
lines changed

8 files changed

+271
-12
lines changed

dolphinscheduler-eventbus/src/main/java/org/apache/dolphinscheduler/eventbus/AbstractDelayEvent.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,10 @@ public abstract class AbstractDelayEvent implements IEvent, Delayed {
3939
@Builder.Default
4040
protected long createTimeInNano = System.nanoTime();
4141

42+
// set create time as default if the inheritor didn't call super()
43+
@Builder.Default
44+
protected long expiredTimeInNano = System.nanoTime();
45+
4246
public AbstractDelayEvent() {
4347
this(DEFAULT_DELAY_TIME);
4448
}
@@ -50,6 +54,7 @@ public AbstractDelayEvent(final long delayTime) {
5054
public AbstractDelayEvent(final long delayTime, final long createTimeInNano) {
5155
this.delayTime = delayTime;
5256
this.createTimeInNano = createTimeInNano;
57+
this.expiredTimeInNano = this.delayTime * 1_000_000 + this.createTimeInNano;
5358
}
5459

5560
@Override
@@ -60,7 +65,7 @@ public long getDelay(TimeUnit unit) {
6065

6166
@Override
6267
public int compareTo(Delayed other) {
63-
return Long.compare(this.createTimeInNano, ((AbstractDelayEvent) other).createTimeInNano);
68+
return Long.compare(this.expiredTimeInNano, ((AbstractDelayEvent) other).expiredTimeInNano);
6469
}
6570

6671
}

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/graph/IWorkflowExecutionGraph.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,11 @@ public interface IWorkflowExecutionGraph {
191191
*/
192192
boolean isTaskExecutionRunnableForbidden(final ITaskExecutionRunnable taskExecutionRunnable);
193193

194+
/**
195+
* Whether the given task's execution is failure and waiting for retry.
196+
*/
197+
boolean isTaskExecutionRunnableRetrying(final ITaskExecutionRunnable taskExecutionRunnable);
198+
194199
/**
195200
* Whether all predecessors task is skipped.
196201
* <p> Once all predecessors are marked as skipped, then the task will be marked as skipped, and will trigger its successors.

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/graph/WorkflowExecutionGraph.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ public ITaskExecutionRunnable getTaskExecutionRunnableByTaskCode(final Long task
140140

141141
@Override
142142
public boolean isTaskExecutionRunnableActive(final ITaskExecutionRunnable taskExecutionRunnable) {
143-
return activeTaskExecutionRunnable.add(taskExecutionRunnable.getName());
143+
return activeTaskExecutionRunnable.contains(taskExecutionRunnable.getName());
144144
}
145145

146146
@Override
@@ -256,6 +256,16 @@ public boolean isTaskExecutionRunnableForbidden(final ITaskExecutionRunnable tas
256256
return (taskExecutionRunnable.getTaskDefinition().getFlag() == Flag.NO);
257257
}
258258

259+
@Override
260+
public boolean isTaskExecutionRunnableRetrying(final ITaskExecutionRunnable taskExecutionRunnable) {
261+
if (!taskExecutionRunnable.isTaskInstanceInitialized()) {
262+
return false;
263+
}
264+
final TaskInstance taskInstance = taskExecutionRunnable.getTaskInstance();
265+
return taskInstance.getState() == TaskExecutionStatus.FAILURE && taskExecutionRunnable.isTaskInstanceCanRetry()
266+
&& isTaskExecutionRunnableActive(taskExecutionRunnable);
267+
}
268+
259269
/**
260270
* Whether all predecessors are skipped.
261271
* <p> Only when all predecessors are skipped, will return true. If the given task doesn't have any predecessors, will return false.

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/TaskFailureStateAction.java

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,12 @@ public void pauseEventAction(final IWorkflowExecutionRunnable workflowExecutionR
104104
final ITaskExecutionRunnable taskExecutionRunnable,
105105
final TaskPauseLifecycleEvent taskPauseEvent) {
106106
throwExceptionIfStateIsNotMatch(taskExecutionRunnable);
107+
// When the failed task is awaiting retry, we can mark it as 'paused' to ignore the retry event.
108+
if (isTaskRetrying(taskExecutionRunnable)) {
109+
super.pausedEventAction(workflowExecutionRunnable, taskExecutionRunnable,
110+
TaskPausedLifecycleEvent.of(taskExecutionRunnable));
111+
return;
112+
}
107113
logWarningIfCannotDoAction(taskExecutionRunnable, taskPauseEvent);
108114
}
109115

@@ -112,14 +118,11 @@ public void pausedEventAction(final IWorkflowExecutionRunnable workflowExecution
112118
final ITaskExecutionRunnable taskExecutionRunnable,
113119
final TaskPausedLifecycleEvent taskPausedEvent) {
114120
throwExceptionIfStateIsNotMatch(taskExecutionRunnable);
115-
final IWorkflowExecutionGraph workflowExecutionGraph = taskExecutionRunnable.getWorkflowExecutionGraph();
116121
// This case happen when the task is failure but the task is in delay retry queue.
117122
// We don't remove the event in GlobalWorkflowDelayEventCoordinator the event should be dropped when the task is
118123
// killed.
119-
if (taskExecutionRunnable.isTaskInstanceCanRetry()
120-
&& workflowExecutionGraph.isTaskExecutionRunnableActive(taskExecutionRunnable)) {
121-
workflowExecutionGraph.markTaskExecutionRunnableChainPause(taskExecutionRunnable);
122-
publishWorkflowInstanceTopologyLogicalTransitionEvent(taskExecutionRunnable);
124+
if (isTaskRetrying(taskExecutionRunnable)) {
125+
super.pausedEventAction(workflowExecutionRunnable, taskExecutionRunnable, taskPausedEvent);
123126
return;
124127
}
125128
logWarningIfCannotDoAction(taskExecutionRunnable, taskPausedEvent);
@@ -130,6 +133,12 @@ public void killEventAction(final IWorkflowExecutionRunnable workflowExecutionRu
130133
final ITaskExecutionRunnable taskExecutionRunnable,
131134
final TaskKillLifecycleEvent taskKillEvent) {
132135
throwExceptionIfStateIsNotMatch(taskExecutionRunnable);
136+
// When the failed task is awaiting retry, we can mark it as 'killed' to ignore the retry event.
137+
if (isTaskRetrying(taskExecutionRunnable)) {
138+
super.killedEventAction(workflowExecutionRunnable, taskExecutionRunnable,
139+
TaskKilledLifecycleEvent.of(taskExecutionRunnable));
140+
return;
141+
}
133142
logWarningIfCannotDoAction(taskExecutionRunnable, taskKillEvent);
134143
}
135144

@@ -138,14 +147,11 @@ public void killedEventAction(final IWorkflowExecutionRunnable workflowExecution
138147
final ITaskExecutionRunnable taskExecutionRunnable,
139148
final TaskKilledLifecycleEvent taskKilledEvent) {
140149
throwExceptionIfStateIsNotMatch(taskExecutionRunnable);
141-
final IWorkflowExecutionGraph workflowExecutionGraph = taskExecutionRunnable.getWorkflowExecutionGraph();
142150
// This case happen when the task is failure but the task is in delay retry queue.
143151
// We don't remove the event in GlobalWorkflowDelayEventCoordinator the event should be dropped when the task is
144152
// killed.
145-
if (taskExecutionRunnable.isTaskInstanceCanRetry()
146-
&& workflowExecutionGraph.isTaskExecutionRunnableActive(taskExecutionRunnable)) {
147-
workflowExecutionGraph.markTaskExecutionRunnableChainKill(taskExecutionRunnable);
148-
publishWorkflowInstanceTopologyLogicalTransitionEvent(taskExecutionRunnable);
153+
if (isTaskRetrying(taskExecutionRunnable)) {
154+
super.killedEventAction(workflowExecutionRunnable, taskExecutionRunnable, taskKilledEvent);
149155
return;
150156
}
151157
logWarningIfCannotDoAction(taskExecutionRunnable, taskKilledEvent);
@@ -179,4 +185,9 @@ public void failoverEventAction(final IWorkflowExecutionRunnable workflowExecuti
179185
public TaskExecutionStatus matchState() {
180186
return TaskExecutionStatus.FAILURE;
181187
}
188+
189+
private boolean isTaskRetrying(final ITaskExecutionRunnable taskExecutionRunnable) {
190+
final IWorkflowExecutionGraph workflowExecutionGraph = taskExecutionRunnable.getWorkflowExecutionGraph();
191+
return workflowExecutionGraph.isTaskExecutionRunnableRetrying(taskExecutionRunnable);
192+
}
182193
}

dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowInstancePauseTestCase.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -294,4 +294,54 @@ public void testPauseWorkflow_with_subWorkflowTask_success() {
294294
masterContainer.assertAllResourceReleased();
295295
}
296296

297+
@Test
298+
@DisplayName("Test pause a workflow with failed retrying task")
299+
public void testPauseWorkflow_with_failedRetryingTask() {
300+
final String yaml = "/it/pause/workflow_with_fake_task_failed_retrying.yaml";
301+
final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
302+
final WorkflowDefinition workflow = context.getOneWorkflow();
303+
304+
final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
305+
.workflowDefinition(workflow)
306+
.runWorkflowCommandParam(new RunWorkflowCommandParam())
307+
.build();
308+
final Integer workflowInstanceId = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
309+
310+
await()
311+
.pollInterval(Duration.ofMillis(100))
312+
.atMost(Duration.ofMinutes(1))
313+
.untilAsserted(() -> {
314+
assertThat(repository.queryWorkflowInstance(workflowInstanceId).getState())
315+
.isEqualTo(WorkflowExecutionStatus.RUNNING_EXECUTION);
316+
317+
assertThat(repository.queryTaskInstance(workflowInstanceId))
318+
.satisfiesExactly(
319+
taskInstance -> {
320+
assertThat(taskInstance.getName()).isEqualTo("FAILED-RETRY");
321+
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.FAILURE);
322+
});
323+
});
324+
325+
assertThat(workflowOperator.pauseWorkflowInstance(workflowInstanceId).isSuccess());
326+
327+
await()
328+
.pollInterval(Duration.ofMillis(100))
329+
.atMost(Duration.ofMinutes(1))
330+
.untilAsserted(() -> {
331+
assertThat(repository.queryWorkflowInstance(workflowInstanceId))
332+
.satisfies(
333+
workflowInstance -> {
334+
assertThat(workflowInstance.getState())
335+
.isEqualTo(WorkflowExecutionStatus.PAUSE);
336+
});
337+
338+
assertThat(repository.queryTaskInstance(workflowInstanceId))
339+
.satisfiesExactly(
340+
taskInstance -> {
341+
assertThat(taskInstance.getName()).isEqualTo("FAILED-RETRY");
342+
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.PAUSE);
343+
});
344+
});
345+
masterContainer.assertAllResourceReleased();
346+
}
297347
}

dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowInstanceStopTestCase.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -246,4 +246,54 @@ public void testStopWorkflow_with_subWorkflowTask_success() {
246246

247247
masterContainer.assertAllResourceReleased();
248248
}
249+
250+
@Test
251+
@DisplayName("Test stop a workflow with failed retrying task")
252+
public void testStopWorkflow_with_failedRetryingTask() {
253+
final String yaml = "/it/stop/workflow_with_fake_task_failed_retrying.yaml";
254+
final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
255+
final WorkflowDefinition workflow = context.getOneWorkflow();
256+
257+
final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
258+
.workflowDefinition(workflow)
259+
.runWorkflowCommandParam(new RunWorkflowCommandParam())
260+
.build();
261+
final Integer workflowInstanceId = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
262+
263+
await()
264+
.pollInterval(Duration.ofMillis(100))
265+
.atMost(Duration.ofMinutes(1))
266+
.untilAsserted(() -> {
267+
assertThat(repository.queryWorkflowInstance(workflowInstanceId).getState())
268+
.isEqualTo(WorkflowExecutionStatus.RUNNING_EXECUTION);
269+
270+
assertThat(repository.queryTaskInstance(workflowInstanceId))
271+
.satisfiesExactly(
272+
taskInstance -> {
273+
assertThat(taskInstance.getName()).isEqualTo("FAILED-RETRY");
274+
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.FAILURE);
275+
});
276+
});
277+
278+
assertThat(workflowOperator.stopWorkflowInstance(workflowInstanceId).isSuccess());
279+
280+
await()
281+
.pollInterval(Duration.ofMillis(100))
282+
.atMost(Duration.ofMinutes(1))
283+
.untilAsserted(() -> {
284+
assertThat(repository.queryWorkflowInstance(workflowInstanceId))
285+
.satisfies(
286+
workflowInstance -> {
287+
assertThat(workflowInstance.getState()).isEqualTo(WorkflowExecutionStatus.STOP);
288+
});
289+
290+
assertThat(repository.queryTaskInstance(workflowInstanceId))
291+
.satisfiesExactly(
292+
taskInstance -> {
293+
assertThat(taskInstance.getName()).isEqualTo("FAILED-RETRY");
294+
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.KILL);
295+
});
296+
});
297+
masterContainer.assertAllResourceReleased();
298+
}
249299
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
project:
19+
name: MasterIntegrationTest
20+
code: 1
21+
description: This is a fake project
22+
userId: 1
23+
userName: admin
24+
createTime: 2024-08-12 00:00:00
25+
updateTime: 2021-08-12 00:00:00
26+
27+
workflows:
28+
- name: workflow_with_one_fake_task_failed
29+
code: 1
30+
version: 1
31+
projectCode: 1
32+
description: This is a fake workflow with single task
33+
releaseState: ONLINE
34+
createTime: 2024-08-12 00:00:00
35+
updateTime: 2021-08-12 00:00:00
36+
userId: 1
37+
executionType: PARALLEL
38+
39+
tasks:
40+
- name: FAILED-RETRY
41+
code: 1
42+
version: 1
43+
projectCode: 1
44+
userId: 1
45+
taskType: LogicFakeTask
46+
taskParams: '{"localParams":null,"varPool":[],"shellScript":"ls /-"}'
47+
workerGroup: default
48+
createTime: 2024-08-12 00:00:00
49+
updateTime: 2021-08-12 00:00:00
50+
taskExecuteType: BATCH
51+
failRetryTimes: 10
52+
failRetryInterval: 10
53+
54+
taskRelations:
55+
- projectCode: 1
56+
workflowDefinitionCode: 1
57+
workflowDefinitionVersion: 1
58+
preTaskCode: 0
59+
preTaskVersion: 0
60+
postTaskCode: 1
61+
postTaskVersion: 1
62+
createTime: 2024-08-12 00:00:00
63+
updateTime: 2024-08-12 00:00:00
64+
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
project:
19+
name: MasterIntegrationTest
20+
code: 1
21+
description: This is a fake project
22+
userId: 1
23+
userName: admin
24+
createTime: 2024-08-12 00:00:00
25+
updateTime: 2021-08-12 00:00:00
26+
27+
workflows:
28+
- name: workflow_with_one_fake_task_failed
29+
code: 1
30+
version: 1
31+
projectCode: 1
32+
description: This is a fake workflow with single task
33+
releaseState: ONLINE
34+
createTime: 2024-08-12 00:00:00
35+
updateTime: 2021-08-12 00:00:00
36+
userId: 1
37+
executionType: PARALLEL
38+
39+
tasks:
40+
- name: FAILED-RETRY
41+
code: 1
42+
version: 1
43+
projectCode: 1
44+
userId: 1
45+
taskType: LogicFakeTask
46+
taskParams: '{"localParams":null,"varPool":[],"shellScript":"ls /-"}'
47+
workerGroup: default
48+
createTime: 2024-08-12 00:00:00
49+
updateTime: 2021-08-12 00:00:00
50+
taskExecuteType: BATCH
51+
failRetryTimes: 10
52+
failRetryInterval: 10
53+
54+
taskRelations:
55+
- projectCode: 1
56+
workflowDefinitionCode: 1
57+
workflowDefinitionVersion: 1
58+
preTaskCode: 0
59+
preTaskVersion: 0
60+
postTaskCode: 1
61+
postTaskVersion: 1
62+
createTime: 2024-08-12 00:00:00
63+
updateTime: 2024-08-12 00:00:00
64+

0 commit comments

Comments
 (0)