diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java index 6b44c51dc9612..29e3cad4dc8ee 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java @@ -59,6 +59,8 @@ class Executing extends StateWithExecutionGraph implements ResourceConsumer { super(context, executionGraph, executionGraphHandler, operatorCoordinatorHandler, logger); this.context = context; this.userCodeClassLoader = userCodeClassLoader; + Preconditions.checkState( + executionGraph.getState() == JobStatus.RUNNING, "Assuming running execution graph"); deploy(); @@ -129,7 +131,10 @@ private void deploy() { for (ExecutionJobVertex executionJobVertex : getExecutionGraph().getVerticesTopologically()) { for (ExecutionVertex executionVertex : executionJobVertex.getTaskVertices()) { - deploySafely(executionVertex); + if (executionVertex.getExecutionState() == ExecutionState.CREATED + || executionVertex.getExecutionState() == ExecutionState.SCHEDULED) { + deploySafely(executionVertex); + } } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CancelingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CancelingTest.java index 0d74a1053cac3..1d7258ce8ba03 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CancelingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CancelingTest.java @@ -96,7 +96,9 @@ public void testTaskFailuresAreIgnored() throws Exception { StateTrackingMockExecutionGraph meg = new StateTrackingMockExecutionGraph(); Canceling canceling = createCancelingState(ctx, meg); // register execution at EG - ExecutingTest.MockExecutionJobVertex ejv = new ExecutingTest.MockExecutionJobVertex(); + ExecutingTest.MockExecutionJobVertex ejv = + new ExecutingTest.MockExecutionJobVertex( + ExecutingTest.MockExecutionVertex::new); TaskExecutionStateTransition update = new TaskExecutionStateTransition( new TaskExecutionState( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java index 2852bdab7281e..ca0b041e62dd5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java @@ -81,6 +81,7 @@ import static org.apache.flink.runtime.scheduler.adaptive.WaitingForResourcesTest.assertNonNull; import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.not; import static org.hamcrest.CoreMatchers.notNullValue; import static org.junit.Assert.assertThat; @@ -90,17 +91,60 @@ public class ExecutingTest extends TestLogger { @Test public void testExecutionGraphDeploymentOnEnter() throws Exception { try (MockExecutingContext ctx = new MockExecutingContext()) { - MockExecutionJobVertex mockExecutionJobVertex = new MockExecutionJobVertex(); + MockExecutionJobVertex mockExecutionJobVertex = + new MockExecutionJobVertex(MockExecutionVertex::new); + MockExecutionVertex mockExecutionVertex = + (MockExecutionVertex) mockExecutionJobVertex.getMockExecutionVertex(); + mockExecutionVertex.setMockedExecutionState(ExecutionState.CREATED); ExecutionGraph executionGraph = new MockExecutionGraph(() -> Collections.singletonList(mockExecutionJobVertex)); Executing exec = new ExecutingStateBuilder().setExecutionGraph(executionGraph).build(ctx); - assertThat(mockExecutionJobVertex.isExecutionDeployed(), is(true)); + assertThat(mockExecutionVertex.isDeployCalled(), is(true)); assertThat(executionGraph.getState(), is(JobStatus.RUNNING)); } } + @Test + public void testNoDeploymentCallOnEnterWhenVertexRunning() throws Exception { + try (MockExecutingContext ctx = new MockExecutingContext()) { + MockExecutionJobVertex mockExecutionJobVertex = + new MockExecutionJobVertex(MockExecutionVertex::new); + ExecutionGraph executionGraph = + new MockExecutionGraph(() -> Collections.singletonList(mockExecutionJobVertex)); + executionGraph.transitionToRunning(); + final MockExecutionVertex mockExecutionVertex = + ((MockExecutionVertex) mockExecutionJobVertex.getMockExecutionVertex()); + mockExecutionVertex.setMockedExecutionState(ExecutionState.RUNNING); + + new Executing( + executionGraph, + getExecutionGraphHandler(executionGraph, ctx.getMainThreadExecutor()), + new TestingOperatorCoordinatorHandler(), + log, + ctx, + ClassLoader.getSystemClassLoader()); + assertThat(mockExecutionVertex.isDeployCalled(), is(false)); + } + } + + @Test(expected = IllegalStateException.class) + public void testIllegalStateExceptionOnNotRunningExecutionGraph() throws Exception { + try (MockExecutingContext ctx = new MockExecutingContext()) { + ExecutionGraph notRunningExecutionGraph = new StateTrackingMockExecutionGraph(); + assertThat(notRunningExecutionGraph.getState(), is(not(JobStatus.RUNNING))); + + new Executing( + notRunningExecutionGraph, + getExecutionGraphHandler(notRunningExecutionGraph, ctx.getMainThreadExecutor()), + new TestingOperatorCoordinatorHandler(), + log, + ctx, + ClassLoader.getSystemClassLoader()); + } + } + @Test public void testDisposalOfOperatorCoordinatorsOnLeaveOfStateWithExecutionGraph() throws Exception { @@ -258,6 +302,23 @@ public void testFalseReportsViaUpdateTaskExecutionStateAreIgnored() throws Excep } } + @Test + public void testExecutionVertexMarkedAsFailedOnDeploymentFailure() throws Exception { + try (MockExecutingContext ctx = new MockExecutingContext()) { + MockExecutionJobVertex mejv = + new MockExecutionJobVertex(FailOnDeployMockExecutionVertex::new); + ExecutionGraph executionGraph = + new MockExecutionGraph(() -> Collections.singletonList(mejv)); + Executing exec = + new ExecutingStateBuilder().setExecutionGraph(executionGraph).build(ctx); + + assertThat( + ((FailOnDeployMockExecutionVertex) mejv.getMockExecutionVertex()) + .getMarkedFailure(), + is(instanceOf(JobException.class))); + } + } + @Test public void testTransitionToStopWithSavepointState() throws Exception { try (MockExecutingContext ctx = new MockExecutingContext()) { @@ -389,16 +450,10 @@ public ExecutingStateBuilder setOperatorCoordinatorHandler( private Executing build(MockExecutingContext ctx) { executionGraph.transitionToRunning(); - final ExecutionGraphHandler executionGraphHandler = - new ExecutionGraphHandler( - executionGraph, - log, - ctx.getMainThreadExecutor(), - ctx.getMainThreadExecutor()); return new Executing( executionGraph, - executionGraphHandler, + getExecutionGraphHandler(executionGraph, ctx.getMainThreadExecutor()), operatorCoordinatorHandler, log, ctx, @@ -406,6 +461,12 @@ private Executing build(MockExecutingContext ctx) { } } + private ExecutionGraphHandler getExecutionGraphHandler( + ExecutionGraph executionGraph, ComponentMainThreadExecutor mainThreadExecutor) { + return new ExecutionGraphHandler( + executionGraph, log, mainThreadExecutor, mainThreadExecutor); + } + private static class MockExecutingContext extends MockStateWithExecutionGraphContext implements Executing.Context { @@ -413,7 +474,7 @@ private static class MockExecutingContext extends MockStateWithExecutionGraphCon new StateValidator<>("failing"); private final StateValidator restartingStateValidator = new StateValidator<>("restarting"); - private final StateValidator cancellingStateValidator = + private final StateValidator cancellingStateValidator = new StateValidator<>("cancelling"); private Function howToHandleFailure; @@ -431,7 +492,7 @@ public void setExpectRestarting(Consumer asserter) { restartingStateValidator.expectInput(asserter); } - public void setExpectCancelling(Consumer asserter) { + public void setExpectCancelling(Consumer asserter) { cancellingStateValidator.expectInput(asserter); } @@ -455,7 +516,7 @@ public void goToCanceling( ExecutionGraphHandler executionGraphHandler, OperatorCoordinatorHandler operatorCoordinatorHandler) { cancellingStateValidator.validateInput( - new ExecutingAndCancellingArguments( + new CancellingArguments( executionGraph, executionGraphHandler, operatorCoordinatorHandler)); hadStateTransition = true; } @@ -537,12 +598,12 @@ public void close() throws Exception { } } - static class ExecutingAndCancellingArguments { + static class CancellingArguments { private final ExecutionGraph executionGraph; private final ExecutionGraphHandler executionGraphHandler; private final OperatorCoordinatorHandler operatorCoordinatorHandle; - public ExecutingAndCancellingArguments( + public CancellingArguments( ExecutionGraph executionGraph, ExecutionGraphHandler executionGraphHandler, OperatorCoordinatorHandler operatorCoordinatorHandle) { @@ -564,7 +625,7 @@ public OperatorCoordinatorHandler getOperatorCoordinatorHandle() { } } - static class StopWithSavepointArguments extends ExecutingAndCancellingArguments { + static class StopWithSavepointArguments extends CancellingArguments { private final CheckpointScheduling checkpointScheduling; private final CompletableFuture savepointFuture; @@ -580,7 +641,7 @@ public StopWithSavepointArguments( } } - static class RestartingArguments extends ExecutingAndCancellingArguments { + static class RestartingArguments extends CancellingArguments { private final Duration backoffTime; public RestartingArguments( @@ -597,7 +658,7 @@ public Duration getBackoffTime() { } } - static class FailingArguments extends ExecutingAndCancellingArguments { + static class FailingArguments extends CancellingArguments { private final Throwable failureCause; public FailingArguments( @@ -614,7 +675,7 @@ public Throwable getFailureCause() { } } - private static class MockExecutionGraph extends StateTrackingMockExecutionGraph { + static class MockExecutionGraph extends StateTrackingMockExecutionGraph { private final boolean updateStateReturnValue; private final Supplier> getVerticesTopologicallySupplier; @@ -623,10 +684,6 @@ private static class MockExecutionGraph extends StateTrackingMockExecutionGraph this(false, getVerticesTopologicallySupplier); } - MockExecutionGraph(boolean updateStateReturnValue) { - this(updateStateReturnValue, null); - } - private MockExecutionGraph( boolean updateStateReturnValue, Supplier> getVerticesTopologicallySupplier) { @@ -689,9 +746,11 @@ public Logger getLogger() { } static class MockExecutionJobVertex extends ExecutionJobVertex { - private final MockExecutionVertex mockExecutionVertex; + private final ExecutionVertex mockExecutionVertex; - MockExecutionJobVertex() throws JobException { + MockExecutionJobVertex( + Function executionVertexSupplier) + throws JobException { super( new MockInternalExecutionGraphAccessor(), new JobVertex("test"), @@ -700,7 +759,7 @@ static class MockExecutionJobVertex extends ExecutionJobVertex { 1L, new DefaultVertexParallelismInfo(1, 1, max -> Optional.empty()), new DefaultSubtaskAttemptNumberStore(Collections.emptyList())); - mockExecutionVertex = new MockExecutionVertex(this); + mockExecutionVertex = executionVertexSupplier.apply(this); } @Override @@ -708,17 +767,38 @@ public ExecutionVertex[] getTaskVertices() { return new ExecutionVertex[] {mockExecutionVertex}; } - public MockExecutionVertex getMockExecutionVertex() { + public ExecutionVertex getMockExecutionVertex() { return mockExecutionVertex; } + } + + static class FailOnDeployMockExecutionVertex extends ExecutionVertex { + + @Nullable private Throwable markFailed = null; + + public FailOnDeployMockExecutionVertex(ExecutionJobVertex jobVertex) { + super(jobVertex, 1, new IntermediateResult[] {}, Time.milliseconds(1L), 1L, 1, 0); + } - public boolean isExecutionDeployed() { - return mockExecutionVertex.isDeployed(); + @Override + public void deploy() throws JobException { + throw new JobException("Intentional Test exception"); + } + + @Override + public void markFailed(Throwable t) { + markFailed = t; + } + + @Nullable + public Throwable getMarkedFailure() { + return markFailed; } } static class MockExecutionVertex extends ExecutionVertex { - private boolean deployed = false; + private boolean deployCalled = false; + private ExecutionState mockedExecutionState = ExecutionState.RUNNING; MockExecutionVertex(ExecutionJobVertex jobVertex) { super(jobVertex, 1, new IntermediateResult[] {}, Time.milliseconds(1L), 1L, 1, 0); @@ -726,11 +806,20 @@ static class MockExecutionVertex extends ExecutionVertex { @Override public void deploy() throws JobException { - deployed = true; + deployCalled = true; + } + + public boolean isDeployCalled() { + return deployCalled; + } + + @Override + public ExecutionState getExecutionState() { + return mockedExecutionState; } - public boolean isDeployed() { - return deployed; + public void setMockedExecutionState(ExecutionState mockedExecutionState) { + this.mockedExecutionState = mockedExecutionState; } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/FailingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/FailingTest.java index 1b347e012a4ca..40056c623cb08 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/FailingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/FailingTest.java @@ -102,7 +102,9 @@ public void testTaskFailuresAreIgnored() throws Exception { StateTrackingMockExecutionGraph meg = new StateTrackingMockExecutionGraph(); Failing failing = createFailingState(ctx, meg); // register execution at EG - ExecutingTest.MockExecutionJobVertex ejv = new ExecutingTest.MockExecutionJobVertex(); + ExecutingTest.MockExecutionJobVertex ejv = + new ExecutingTest.MockExecutionJobVertex( + ExecutingTest.MockExecutionVertex::new); TaskExecutionStateTransition update = new TaskExecutionStateTransition( new TaskExecutionState( @@ -159,11 +161,10 @@ private Failing createFailingState(MockFailingContext ctx, ExecutionGraph execut private static class MockFailingContext extends MockStateWithExecutionGraphContext implements Failing.Context { - private final StateValidator - cancellingStateValidator = new StateValidator<>("cancelling"); + private final StateValidator cancellingStateValidator = + new StateValidator<>("cancelling"); - public void setExpectCanceling( - Consumer asserter) { + public void setExpectCanceling(Consumer asserter) { cancellingStateValidator.expectInput(asserter); } @@ -173,7 +174,7 @@ public void goToCanceling( ExecutionGraphHandler executionGraphHandler, OperatorCoordinatorHandler operatorCoordinatorHandler) { cancellingStateValidator.validateInput( - new ExecutingTest.ExecutingAndCancellingArguments( + new ExecutingTest.CancellingArguments( executionGraph, executionGraphHandler, operatorCoordinatorHandler)); hadStateTransition = true; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/RestartingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/RestartingTest.java index d1d63953b07c9..fb8d19472ac94 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/RestartingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/RestartingTest.java @@ -138,14 +138,13 @@ public Restarting createRestartingState(MockRestartingContext ctx) private static class MockRestartingContext extends MockStateWithExecutionGraphContext implements Restarting.Context { - private final StateValidator - cancellingStateValidator = new StateValidator<>("Cancelling"); + private final StateValidator cancellingStateValidator = + new StateValidator<>("Cancelling"); private final StateValidator waitingForResourcesStateValidator = new StateValidator<>("WaitingForResources"); - public void setExpectCancelling( - Consumer asserter) { + public void setExpectCancelling(Consumer asserter) { cancellingStateValidator.expectInput(asserter); } @@ -159,7 +158,7 @@ public void goToCanceling( ExecutionGraphHandler executionGraphHandler, OperatorCoordinatorHandler operatorCoordinatorHandler) { cancellingStateValidator.validateInput( - new ExecutingTest.ExecutingAndCancellingArguments( + new ExecutingTest.CancellingArguments( executionGraph, executionGraphHandler, operatorCoordinatorHandler)); hadStateTransition = true; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java index e9a1157b430dc..77ca94af4c9a7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java @@ -267,6 +267,26 @@ public void testExceptionalSavepointCompletionLeadsToExceptionalOperationFutureC assertThat(sws.getOperationFuture().isCompletedExceptionally(), is(true)); } + @Test + public void testErrorCreatingSavepointLeadsToTransitionToExecutingState() throws Exception { + MockStopWithSavepointContext ctx = new MockStopWithSavepointContext(); + CheckpointScheduling mockStopWithSavepointOperations = new MockCheckpointScheduling(); + CompletableFuture savepointFuture = new CompletableFuture<>(); + StopWithSavepoint sws = + createStopWithSavepoint(ctx, mockStopWithSavepointOperations, savepointFuture); + ctx.setStopWithSavepoint(sws); + ctx.setExpectExecuting( + executingArguments -> + assertThat( + executingArguments.getExecutionGraph().getState(), + is(JobStatus.RUNNING))); + + savepointFuture.completeExceptionally(new RuntimeException("Test error")); + + ctx.close(); + assertThat(sws.getOperationFuture().isCompletedExceptionally(), is(true)); + } + @Test public void testRestartOnTaskFailureAfterSavepointCompletion() throws Exception { try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) { @@ -376,11 +396,11 @@ private static class MockStopWithSavepointContext extends MockStateWithExecution new StateValidator<>("failing"); private final StateValidator restartingStateValidator = new StateValidator<>("restarting"); - private final StateValidator - cancellingStateValidator = new StateValidator<>("cancelling"); + private final StateValidator cancellingStateValidator = + new StateValidator<>("cancelling"); - private final StateValidator - executingStateTransition = new StateValidator<>("executing"); + private final StateValidator executingStateTransition = + new StateValidator<>("executing"); private StopWithSavepoint state; @@ -396,13 +416,11 @@ public void setExpectRestarting(Consumer asse restartingStateValidator.expectInput(asserter); } - public void setExpectCancelling( - Consumer asserter) { + public void setExpectCancelling(Consumer asserter) { cancellingStateValidator.expectInput(asserter); } - public void setExpectExecuting( - Consumer asserter) { + public void setExpectExecuting(Consumer asserter) { executingStateTransition.expectInput(asserter); } @@ -430,7 +448,7 @@ public void goToCanceling( simulateTransitionToState(Canceling.class); cancellingStateValidator.validateInput( - new ExecutingTest.ExecutingAndCancellingArguments( + new ExecutingTest.CancellingArguments( executionGraph, executionGraphHandler, operatorCoordinatorHandler)); hadStateTransition = true; } @@ -474,7 +492,7 @@ public void goToExecuting( OperatorCoordinatorHandler operatorCoordinatorHandler) { simulateTransitionToState(Executing.class); executingStateTransition.validateInput( - new ExecutingTest.ExecutingAndCancellingArguments( + new ExecutingTest.CancellingArguments( executionGraph, executionGraphHandler, operatorCoordinatorHandler)); hadStateTransition = true; }