Skip to content

Commit 040bd81

Browse files
committed
[FLINK-22266] Fix stop-with-savepoint operation in AdaptiveScheduler
This closes #15884
1 parent d1dd346 commit 040bd81

File tree

6 files changed: +169 -55 lines changed

6 files changed: +169 -55 lines changed

flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java

Lines changed: 6 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -59,6 +59,8 @@ class Executing extends StateWithExecutionGraph implements ResourceConsumer {
5959
super(context, executionGraph, executionGraphHandler, operatorCoordinatorHandler, logger);
6060
this.context = context;
6161
this.userCodeClassLoader = userCodeClassLoader;
62+
Preconditions.checkState(
63+
executionGraph.getState() == JobStatus.RUNNING, "Assuming running execution graph");
6264

6365
deploy();
6466

@@ -129,7 +131,10 @@ private void deploy() {
129131
for (ExecutionJobVertex executionJobVertex :
130132
getExecutionGraph().getVerticesTopologically()) {
131133
for (ExecutionVertex executionVertex : executionJobVertex.getTaskVertices()) {
132-
deploySafely(executionVertex);
134+
if (executionVertex.getExecutionState() == ExecutionState.CREATED
135+
|| executionVertex.getExecutionState() == ExecutionState.SCHEDULED) {
136+
deploySafely(executionVertex);
137+
}
133138
}
134139
}
135140
}

flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CancelingTest.java

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -96,7 +96,9 @@ public void testTaskFailuresAreIgnored() throws Exception {
9696
StateTrackingMockExecutionGraph meg = new StateTrackingMockExecutionGraph();
9797
Canceling canceling = createCancelingState(ctx, meg);
9898
// register execution at EG
99-
ExecutingTest.MockExecutionJobVertex ejv = new ExecutingTest.MockExecutionJobVertex();
99+
ExecutingTest.MockExecutionJobVertex ejv =
100+
new ExecutingTest.MockExecutionJobVertex(
101+
ExecutingTest.MockExecutionVertex::new);
100102
TaskExecutionStateTransition update =
101103
new TaskExecutionStateTransition(
102104
new TaskExecutionState(

flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java

Lines changed: 121 additions & 32 deletions
Original file line number | Diff line number | Diff line change
@@ -81,6 +81,7 @@
8181
import static org.apache.flink.runtime.scheduler.adaptive.WaitingForResourcesTest.assertNonNull;
8282
import static org.hamcrest.CoreMatchers.instanceOf;
8383
import static org.hamcrest.CoreMatchers.is;
84+
import static org.hamcrest.CoreMatchers.not;
8485
import static org.hamcrest.CoreMatchers.notNullValue;
8586
import static org.junit.Assert.assertThat;
8687

@@ -90,17 +91,60 @@ public class ExecutingTest extends TestLogger {
9091
@Test
9192
public void testExecutionGraphDeploymentOnEnter() throws Exception {
9293
try (MockExecutingContext ctx = new MockExecutingContext()) {
93-
MockExecutionJobVertex mockExecutionJobVertex = new MockExecutionJobVertex();
94+
MockExecutionJobVertex mockExecutionJobVertex =
95+
new MockExecutionJobVertex(MockExecutionVertex::new);
96+
MockExecutionVertex mockExecutionVertex =
97+
(MockExecutionVertex) mockExecutionJobVertex.getMockExecutionVertex();
98+
mockExecutionVertex.setMockedExecutionState(ExecutionState.CREATED);
9499
ExecutionGraph executionGraph =
95100
new MockExecutionGraph(() -> Collections.singletonList(mockExecutionJobVertex));
96101
Executing exec =
97102
new ExecutingStateBuilder().setExecutionGraph(executionGraph).build(ctx);
98103

99-
assertThat(mockExecutionJobVertex.isExecutionDeployed(), is(true));
104+
assertThat(mockExecutionVertex.isDeployCalled(), is(true));
100105
assertThat(executionGraph.getState(), is(JobStatus.RUNNING));
101106
}
102107
}
103108

109+
@Test
110+
public void testNoDeploymentCallOnEnterWhenVertexRunning() throws Exception {
111+
try (MockExecutingContext ctx = new MockExecutingContext()) {
112+
MockExecutionJobVertex mockExecutionJobVertex =
113+
new MockExecutionJobVertex(MockExecutionVertex::new);
114+
ExecutionGraph executionGraph =
115+
new MockExecutionGraph(() -> Collections.singletonList(mockExecutionJobVertex));
116+
executionGraph.transitionToRunning();
117+
final MockExecutionVertex mockExecutionVertex =
118+
((MockExecutionVertex) mockExecutionJobVertex.getMockExecutionVertex());
119+
mockExecutionVertex.setMockedExecutionState(ExecutionState.RUNNING);
120+
121+
new Executing(
122+
executionGraph,
123+
getExecutionGraphHandler(executionGraph, ctx.getMainThreadExecutor()),
124+
new TestingOperatorCoordinatorHandler(),
125+
log,
126+
ctx,
127+
ClassLoader.getSystemClassLoader());
128+
assertThat(mockExecutionVertex.isDeployCalled(), is(false));
129+
}
130+
}
131+
132+
@Test(expected = IllegalStateException.class)
133+
public void testIllegalStateExceptionOnNotRunningExecutionGraph() throws Exception {
134+
try (MockExecutingContext ctx = new MockExecutingContext()) {
135+
ExecutionGraph notRunningExecutionGraph = new StateTrackingMockExecutionGraph();
136+
assertThat(notRunningExecutionGraph.getState(), is(not(JobStatus.RUNNING)));
137+
138+
new Executing(
139+
notRunningExecutionGraph,
140+
getExecutionGraphHandler(notRunningExecutionGraph, ctx.getMainThreadExecutor()),
141+
new TestingOperatorCoordinatorHandler(),
142+
log,
143+
ctx,
144+
ClassLoader.getSystemClassLoader());
145+
}
146+
}
147+
104148
@Test
105149
public void testDisposalOfOperatorCoordinatorsOnLeaveOfStateWithExecutionGraph()
106150
throws Exception {
@@ -258,6 +302,23 @@ public void testFalseReportsViaUpdateTaskExecutionStateAreIgnored() throws Excep
258302
}
259303
}
260304

305+
@Test
306+
public void testExecutionVertexMarkedAsFailedOnDeploymentFailure() throws Exception {
307+
try (MockExecutingContext ctx = new MockExecutingContext()) {
308+
MockExecutionJobVertex mejv =
309+
new MockExecutionJobVertex(FailOnDeployMockExecutionVertex::new);
310+
ExecutionGraph executionGraph =
311+
new MockExecutionGraph(() -> Collections.singletonList(mejv));
312+
Executing exec =
313+
new ExecutingStateBuilder().setExecutionGraph(executionGraph).build(ctx);
314+
315+
assertThat(
316+
((FailOnDeployMockExecutionVertex) mejv.getMockExecutionVertex())
317+
.getMarkedFailure(),
318+
is(instanceOf(JobException.class)));
319+
}
320+
}
321+
261322
@Test
262323
public void testTransitionToStopWithSavepointState() throws Exception {
263324
try (MockExecutingContext ctx = new MockExecutingContext()) {
@@ -389,31 +450,31 @@ public ExecutingStateBuilder setOperatorCoordinatorHandler(
389450

390451
private Executing build(MockExecutingContext ctx) {
391452
executionGraph.transitionToRunning();
392-
final ExecutionGraphHandler executionGraphHandler =
393-
new ExecutionGraphHandler(
394-
executionGraph,
395-
log,
396-
ctx.getMainThreadExecutor(),
397-
ctx.getMainThreadExecutor());
398453

399454
return new Executing(
400455
executionGraph,
401-
executionGraphHandler,
456+
getExecutionGraphHandler(executionGraph, ctx.getMainThreadExecutor()),
402457
operatorCoordinatorHandler,
403458
log,
404459
ctx,
405460
ClassLoader.getSystemClassLoader());
406461
}
407462
}
408463

464+
private ExecutionGraphHandler getExecutionGraphHandler(
465+
ExecutionGraph executionGraph, ComponentMainThreadExecutor mainThreadExecutor) {
466+
return new ExecutionGraphHandler(
467+
executionGraph, log, mainThreadExecutor, mainThreadExecutor);
468+
}
469+
409470
private static class MockExecutingContext extends MockStateWithExecutionGraphContext
410471
implements Executing.Context {
411472

412473
private final StateValidator<FailingArguments> failingStateValidator =
413474
new StateValidator<>("failing");
414475
private final StateValidator<RestartingArguments> restartingStateValidator =
415476
new StateValidator<>("restarting");
416-
private final StateValidator<ExecutingAndCancellingArguments> cancellingStateValidator =
477+
private final StateValidator<CancellingArguments> cancellingStateValidator =
417478
new StateValidator<>("cancelling");
418479

419480
private Function<Throwable, Executing.FailureResult> howToHandleFailure;
@@ -431,7 +492,7 @@ public void setExpectRestarting(Consumer<RestartingArguments> asserter) {
431492
restartingStateValidator.expectInput(asserter);
432493
}
433494

434-
public void setExpectCancelling(Consumer<ExecutingAndCancellingArguments> asserter) {
495+
public void setExpectCancelling(Consumer<CancellingArguments> asserter) {
435496
cancellingStateValidator.expectInput(asserter);
436497
}
437498

@@ -455,7 +516,7 @@ public void goToCanceling(
455516
ExecutionGraphHandler executionGraphHandler,
456517
OperatorCoordinatorHandler operatorCoordinatorHandler) {
457518
cancellingStateValidator.validateInput(
458-
new ExecutingAndCancellingArguments(
519+
new CancellingArguments(
459520
executionGraph, executionGraphHandler, operatorCoordinatorHandler));
460521
hadStateTransition = true;
461522
}
@@ -537,12 +598,12 @@ public void close() throws Exception {
537598
}
538599
}
539600

540-
static class ExecutingAndCancellingArguments {
601+
static class CancellingArguments {
541602
private final ExecutionGraph executionGraph;
542603
private final ExecutionGraphHandler executionGraphHandler;
543604
private final OperatorCoordinatorHandler operatorCoordinatorHandle;
544605

545-
public ExecutingAndCancellingArguments(
606+
public CancellingArguments(
546607
ExecutionGraph executionGraph,
547608
ExecutionGraphHandler executionGraphHandler,
548609
OperatorCoordinatorHandler operatorCoordinatorHandle) {
@@ -564,7 +625,7 @@ public OperatorCoordinatorHandler getOperatorCoordinatorHandle() {
564625
}
565626
}
566627

567-
static class StopWithSavepointArguments extends ExecutingAndCancellingArguments {
628+
static class StopWithSavepointArguments extends CancellingArguments {
568629
private final CheckpointScheduling checkpointScheduling;
569630
private final CompletableFuture<String> savepointFuture;
570631

@@ -580,7 +641,7 @@ public StopWithSavepointArguments(
580641
}
581642
}
582643

583-
static class RestartingArguments extends ExecutingAndCancellingArguments {
644+
static class RestartingArguments extends CancellingArguments {
584645
private final Duration backoffTime;
585646

586647
public RestartingArguments(
@@ -597,7 +658,7 @@ public Duration getBackoffTime() {
597658
}
598659
}
599660

600-
static class FailingArguments extends ExecutingAndCancellingArguments {
661+
static class FailingArguments extends CancellingArguments {
601662
private final Throwable failureCause;
602663

603664
public FailingArguments(
@@ -614,7 +675,7 @@ public Throwable getFailureCause() {
614675
}
615676
}
616677

617-
private static class MockExecutionGraph extends StateTrackingMockExecutionGraph {
678+
static class MockExecutionGraph extends StateTrackingMockExecutionGraph {
618679
private final boolean updateStateReturnValue;
619680
private final Supplier<Iterable<ExecutionJobVertex>> getVerticesTopologicallySupplier;
620681

@@ -623,10 +684,6 @@ private static class MockExecutionGraph extends StateTrackingMockExecutionGraph
623684
this(false, getVerticesTopologicallySupplier);
624685
}
625686

626-
MockExecutionGraph(boolean updateStateReturnValue) {
627-
this(updateStateReturnValue, null);
628-
}
629-
630687
private MockExecutionGraph(
631688
boolean updateStateReturnValue,
632689
Supplier<Iterable<ExecutionJobVertex>> getVerticesTopologicallySupplier) {
@@ -689,9 +746,11 @@ public Logger getLogger() {
689746
}
690747

691748
static class MockExecutionJobVertex extends ExecutionJobVertex {
692-
private final MockExecutionVertex mockExecutionVertex;
749+
private final ExecutionVertex mockExecutionVertex;
693750

694-
MockExecutionJobVertex() throws JobException {
751+
MockExecutionJobVertex(
752+
Function<ExecutionJobVertex, ExecutionVertex> executionVertexSupplier)
753+
throws JobException {
695754
super(
696755
new MockInternalExecutionGraphAccessor(),
697756
new JobVertex("test"),
@@ -700,37 +759,67 @@ static class MockExecutionJobVertex extends ExecutionJobVertex {
700759
1L,
701760
new DefaultVertexParallelismInfo(1, 1, max -> Optional.empty()),
702761
new DefaultSubtaskAttemptNumberStore(Collections.emptyList()));
703-
mockExecutionVertex = new MockExecutionVertex(this);
762+
mockExecutionVertex = executionVertexSupplier.apply(this);
704763
}
705764

706765
@Override
707766
public ExecutionVertex[] getTaskVertices() {
708767
return new ExecutionVertex[] {mockExecutionVertex};
709768
}
710769

711-
public MockExecutionVertex getMockExecutionVertex() {
770+
public ExecutionVertex getMockExecutionVertex() {
712771
return mockExecutionVertex;
713772
}
773+
}
774+
775+
static class FailOnDeployMockExecutionVertex extends ExecutionVertex {
776+
777+
@Nullable private Throwable markFailed = null;
778+
779+
public FailOnDeployMockExecutionVertex(ExecutionJobVertex jobVertex) {
780+
super(jobVertex, 1, new IntermediateResult[] {}, Time.milliseconds(1L), 1L, 1, 0);
781+
}
714782

715-
public boolean isExecutionDeployed() {
716-
return mockExecutionVertex.isDeployed();
783+
@Override
784+
public void deploy() throws JobException {
785+
throw new JobException("Intentional Test exception");
786+
}
787+
788+
@Override
789+
public void markFailed(Throwable t) {
790+
markFailed = t;
791+
}
792+
793+
@Nullable
794+
public Throwable getMarkedFailure() {
795+
return markFailed;
717796
}
718797
}
719798

720799
static class MockExecutionVertex extends ExecutionVertex {
721-
private boolean deployed = false;
800+
private boolean deployCalled = false;
801+
private ExecutionState mockedExecutionState = ExecutionState.RUNNING;
722802

723803
MockExecutionVertex(ExecutionJobVertex jobVertex) {
724804
super(jobVertex, 1, new IntermediateResult[] {}, Time.milliseconds(1L), 1L, 1, 0);
725805
}
726806

727807
@Override
728808
public void deploy() throws JobException {
729-
deployed = true;
809+
deployCalled = true;
810+
}
811+
812+
public boolean isDeployCalled() {
813+
return deployCalled;
814+
}
815+
816+
@Override
817+
public ExecutionState getExecutionState() {
818+
return mockedExecutionState;
730819
}
731820

732-
public boolean isDeployed() {
733-
return deployed;
821+
public void setMockedExecutionState(ExecutionState mockedExecutionState) {
822+
this.mockedExecutionState = mockedExecutionState;
734823
}
735824
}
736825

flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/FailingTest.java

Lines changed: 7 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -102,7 +102,9 @@ public void testTaskFailuresAreIgnored() throws Exception {
102102
StateTrackingMockExecutionGraph meg = new StateTrackingMockExecutionGraph();
103103
Failing failing = createFailingState(ctx, meg);
104104
// register execution at EG
105-
ExecutingTest.MockExecutionJobVertex ejv = new ExecutingTest.MockExecutionJobVertex();
105+
ExecutingTest.MockExecutionJobVertex ejv =
106+
new ExecutingTest.MockExecutionJobVertex(
107+
ExecutingTest.MockExecutionVertex::new);
106108
TaskExecutionStateTransition update =
107109
new TaskExecutionStateTransition(
108110
new TaskExecutionState(
@@ -159,11 +161,10 @@ private Failing createFailingState(MockFailingContext ctx, ExecutionGraph execut
159161
private static class MockFailingContext extends MockStateWithExecutionGraphContext
160162
implements Failing.Context {
161163

162-
private final StateValidator<ExecutingTest.ExecutingAndCancellingArguments>
163-
cancellingStateValidator = new StateValidator<>("cancelling");
164+
private final StateValidator<ExecutingTest.CancellingArguments> cancellingStateValidator =
165+
new StateValidator<>("cancelling");
164166

165-
public void setExpectCanceling(
166-
Consumer<ExecutingTest.ExecutingAndCancellingArguments> asserter) {
167+
public void setExpectCanceling(Consumer<ExecutingTest.CancellingArguments> asserter) {
167168
cancellingStateValidator.expectInput(asserter);
168169
}
169170

@@ -173,7 +174,7 @@ public void goToCanceling(
173174
ExecutionGraphHandler executionGraphHandler,
174175
OperatorCoordinatorHandler operatorCoordinatorHandler) {
175176
cancellingStateValidator.validateInput(
176-
new ExecutingTest.ExecutingAndCancellingArguments(
177+
new ExecutingTest.CancellingArguments(
177178
executionGraph, executionGraphHandler, operatorCoordinatorHandler));
178179
hadStateTransition = true;
179180
}

flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/RestartingTest.java

Lines changed: 4 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -138,14 +138,13 @@ public Restarting createRestartingState(MockRestartingContext ctx)
138138
private static class MockRestartingContext extends MockStateWithExecutionGraphContext
139139
implements Restarting.Context {
140140

141-
private final StateValidator<ExecutingTest.ExecutingAndCancellingArguments>
142-
cancellingStateValidator = new StateValidator<>("Cancelling");
141+
private final StateValidator<ExecutingTest.CancellingArguments> cancellingStateValidator =
142+
new StateValidator<>("Cancelling");
143143

144144
private final StateValidator<Void> waitingForResourcesStateValidator =
145145
new StateValidator<>("WaitingForResources");
146146

147-
public void setExpectCancelling(
148-
Consumer<ExecutingTest.ExecutingAndCancellingArguments> asserter) {
147+
public void setExpectCancelling(Consumer<ExecutingTest.CancellingArguments> asserter) {
149148
cancellingStateValidator.expectInput(asserter);
150149
}
151150

@@ -159,7 +158,7 @@ public void goToCanceling(
159158
ExecutionGraphHandler executionGraphHandler,
160159
OperatorCoordinatorHandler operatorCoordinatorHandler) {
161160
cancellingStateValidator.validateInput(
162-
new ExecutingTest.ExecutingAndCancellingArguments(
161+
new ExecutingTest.CancellingArguments(
163162
executionGraph, executionGraphHandler, operatorCoordinatorHandler));
164163
hadStateTransition = true;
165164
}

0 commit comments

Comments
 (0)