Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ class Executing extends StateWithExecutionGraph implements ResourceConsumer {
super(context, executionGraph, executionGraphHandler, operatorCoordinatorHandler, logger);
this.context = context;
this.userCodeClassLoader = userCodeClassLoader;
Preconditions.checkState(
executionGraph.getState() == JobStatus.RUNNING, "Assuming running execution graph");

deploy();

Expand Down Expand Up @@ -129,7 +131,10 @@ private void deploy() {
for (ExecutionJobVertex executionJobVertex :
getExecutionGraph().getVerticesTopologically()) {
for (ExecutionVertex executionVertex : executionJobVertex.getTaskVertices()) {
deploySafely(executionVertex);
if (executionVertex.getExecutionState() == ExecutionState.CREATED
|| executionVertex.getExecutionState() == ExecutionState.SCHEDULED) {
deploySafely(executionVertex);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,9 @@ public void testTaskFailuresAreIgnored() throws Exception {
StateTrackingMockExecutionGraph meg = new StateTrackingMockExecutionGraph();
Canceling canceling = createCancelingState(ctx, meg);
// register execution at EG
ExecutingTest.MockExecutionJobVertex ejv = new ExecutingTest.MockExecutionJobVertex();
ExecutingTest.MockExecutionJobVertex ejv =
new ExecutingTest.MockExecutionJobVertex(
ExecutingTest.MockExecutionVertex::new);
TaskExecutionStateTransition update =
new TaskExecutionStateTransition(
new TaskExecutionState(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
import static org.apache.flink.runtime.scheduler.adaptive.WaitingForResourcesTest.assertNonNull;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.junit.Assert.assertThat;

Expand All @@ -90,17 +91,60 @@ public class ExecutingTest extends TestLogger {
@Test
public void testExecutionGraphDeploymentOnEnter() throws Exception {
try (MockExecutingContext ctx = new MockExecutingContext()) {
MockExecutionJobVertex mockExecutionJobVertex = new MockExecutionJobVertex();
MockExecutionJobVertex mockExecutionJobVertex =
new MockExecutionJobVertex(MockExecutionVertex::new);
MockExecutionVertex mockExecutionVertex =
(MockExecutionVertex) mockExecutionJobVertex.getMockExecutionVertex();
mockExecutionVertex.setMockedExecutionState(ExecutionState.CREATED);
ExecutionGraph executionGraph =
new MockExecutionGraph(() -> Collections.singletonList(mockExecutionJobVertex));
Executing exec =
new ExecutingStateBuilder().setExecutionGraph(executionGraph).build(ctx);

assertThat(mockExecutionJobVertex.isExecutionDeployed(), is(true));
assertThat(mockExecutionVertex.isDeployCalled(), is(true));
assertThat(executionGraph.getState(), is(JobStatus.RUNNING));
}
}

@Test
public void testNoDeploymentCallOnEnterWhenVertexRunning() throws Exception {
try (MockExecutingContext ctx = new MockExecutingContext()) {
MockExecutionJobVertex mockExecutionJobVertex =
new MockExecutionJobVertex(MockExecutionVertex::new);
ExecutionGraph executionGraph =
new MockExecutionGraph(() -> Collections.singletonList(mockExecutionJobVertex));
executionGraph.transitionToRunning();
final MockExecutionVertex mockExecutionVertex =
((MockExecutionVertex) mockExecutionJobVertex.getMockExecutionVertex());
mockExecutionVertex.setMockedExecutionState(ExecutionState.RUNNING);

new Executing(
executionGraph,
getExecutionGraphHandler(executionGraph, ctx.getMainThreadExecutor()),
new TestingOperatorCoordinatorHandler(),
log,
ctx,
ClassLoader.getSystemClassLoader());
assertThat(mockExecutionVertex.isDeployCalled(), is(false));
}
}

@Test(expected = IllegalStateException.class)
public void testIllegalStateExceptionOnNotRunningExecutionGraph() throws Exception {
try (MockExecutingContext ctx = new MockExecutingContext()) {
ExecutionGraph notRunningExecutionGraph = new StateTrackingMockExecutionGraph();
assertThat(notRunningExecutionGraph.getState(), is(not(JobStatus.RUNNING)));

new Executing(
notRunningExecutionGraph,
getExecutionGraphHandler(notRunningExecutionGraph, ctx.getMainThreadExecutor()),
new TestingOperatorCoordinatorHandler(),
log,
ctx,
ClassLoader.getSystemClassLoader());
}
}

@Test
public void testDisposalOfOperatorCoordinatorsOnLeaveOfStateWithExecutionGraph()
throws Exception {
Expand Down Expand Up @@ -258,6 +302,23 @@ public void testFalseReportsViaUpdateTaskExecutionStateAreIgnored() throws Excep
}
}

@Test
public void testExecutionVertexMarkedAsFailedOnDeploymentFailure() throws Exception {
try (MockExecutingContext ctx = new MockExecutingContext()) {
MockExecutionJobVertex mejv =
new MockExecutionJobVertex(FailOnDeployMockExecutionVertex::new);
ExecutionGraph executionGraph =
new MockExecutionGraph(() -> Collections.singletonList(mejv));
Executing exec =
new ExecutingStateBuilder().setExecutionGraph(executionGraph).build(ctx);

assertThat(
((FailOnDeployMockExecutionVertex) mejv.getMockExecutionVertex())
.getMarkedFailure(),
is(instanceOf(JobException.class)));
Comment on lines +316 to +318
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have a test which ensures that Execution.markFailed will result in the proper exception handling in the Executing state?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error handling of markFailed is difficult to test, because so many components are involved. But in my opinion, we have good test coverage:

markFailed will (through the DefaultExecutionGraph) notify the InternalFailuresListener about the task failure. The UpdateSchedulerNgOnInternalFailuresListener implementation used by adaptive scheduler will call updateTaskExecutionState on the scheduler. This chain of calls will be used for example for the failure in the AdaptiveSchedulerITCase.testGlobalFailoverCanRecoverState() test.

For the Executing state, we have tests that exceptions during deployment lead to a markFailed call (testExecutionVertexMarkedAsFailedOnDeploymentFailure), and failures reported via updateTaskExecutionState to appropriate error handling (testFailureReportedViaUpdateTaskExecutionStateCausesFailingOnNoRestart, testFailureReportedViaUpdateTaskExecutionStateCausesRestart, testFalseReportsViaUpdateTaskExecutionStateAreIgnored).

Adding a test that a markFailed call will notify the InternalFailuresListener is out of the scope of the ExecutingTest (because we are testing the ExecutionVertex and Execution classes).
Adding a test that a markFailed call will call updateTaskExecutionState will need to go through a test specific InternalFailuresListener: Since all the relevant calls on ExecutingState are already covered, this would only test the test specific InternalFailuresListener.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, sounds good. Thanks for all the details.

}
}

@Test
public void testTransitionToStopWithSavepointState() throws Exception {
try (MockExecutingContext ctx = new MockExecutingContext()) {
Expand Down Expand Up @@ -389,31 +450,31 @@ public ExecutingStateBuilder setOperatorCoordinatorHandler(

private Executing build(MockExecutingContext ctx) {
executionGraph.transitionToRunning();
final ExecutionGraphHandler executionGraphHandler =
new ExecutionGraphHandler(
executionGraph,
log,
ctx.getMainThreadExecutor(),
ctx.getMainThreadExecutor());

return new Executing(
executionGraph,
executionGraphHandler,
getExecutionGraphHandler(executionGraph, ctx.getMainThreadExecutor()),
operatorCoordinatorHandler,
log,
ctx,
ClassLoader.getSystemClassLoader());
}
}

private ExecutionGraphHandler getExecutionGraphHandler(
ExecutionGraph executionGraph, ComponentMainThreadExecutor mainThreadExecutor) {
return new ExecutionGraphHandler(
executionGraph, log, mainThreadExecutor, mainThreadExecutor);
}

private static class MockExecutingContext extends MockStateWithExecutionGraphContext
implements Executing.Context {

private final StateValidator<FailingArguments> failingStateValidator =
new StateValidator<>("failing");
private final StateValidator<RestartingArguments> restartingStateValidator =
new StateValidator<>("restarting");
private final StateValidator<ExecutingAndCancellingArguments> cancellingStateValidator =
private final StateValidator<CancellingArguments> cancellingStateValidator =
new StateValidator<>("cancelling");

private Function<Throwable, Executing.FailureResult> howToHandleFailure;
Expand All @@ -431,7 +492,7 @@ public void setExpectRestarting(Consumer<RestartingArguments> asserter) {
restartingStateValidator.expectInput(asserter);
}

public void setExpectCancelling(Consumer<ExecutingAndCancellingArguments> asserter) {
public void setExpectCancelling(Consumer<CancellingArguments> asserter) {
cancellingStateValidator.expectInput(asserter);
}

Expand All @@ -455,7 +516,7 @@ public void goToCanceling(
ExecutionGraphHandler executionGraphHandler,
OperatorCoordinatorHandler operatorCoordinatorHandler) {
cancellingStateValidator.validateInput(
new ExecutingAndCancellingArguments(
new CancellingArguments(
executionGraph, executionGraphHandler, operatorCoordinatorHandler));
hadStateTransition = true;
}
Expand Down Expand Up @@ -537,12 +598,12 @@ public void close() throws Exception {
}
}

static class ExecutingAndCancellingArguments {
static class CancellingArguments {
private final ExecutionGraph executionGraph;
private final ExecutionGraphHandler executionGraphHandler;
private final OperatorCoordinatorHandler operatorCoordinatorHandle;

public ExecutingAndCancellingArguments(
public CancellingArguments(
ExecutionGraph executionGraph,
ExecutionGraphHandler executionGraphHandler,
OperatorCoordinatorHandler operatorCoordinatorHandle) {
Expand All @@ -564,7 +625,7 @@ public OperatorCoordinatorHandler getOperatorCoordinatorHandle() {
}
}

static class StopWithSavepointArguments extends ExecutingAndCancellingArguments {
static class StopWithSavepointArguments extends CancellingArguments {
private final CheckpointScheduling checkpointScheduling;
private final CompletableFuture<String> savepointFuture;

Expand All @@ -580,7 +641,7 @@ public StopWithSavepointArguments(
}
}

static class RestartingArguments extends ExecutingAndCancellingArguments {
static class RestartingArguments extends CancellingArguments {
private final Duration backoffTime;

public RestartingArguments(
Expand All @@ -597,7 +658,7 @@ public Duration getBackoffTime() {
}
}

static class FailingArguments extends ExecutingAndCancellingArguments {
static class FailingArguments extends CancellingArguments {
private final Throwable failureCause;

public FailingArguments(
Expand All @@ -614,7 +675,7 @@ public Throwable getFailureCause() {
}
}

private static class MockExecutionGraph extends StateTrackingMockExecutionGraph {
static class MockExecutionGraph extends StateTrackingMockExecutionGraph {
private final boolean updateStateReturnValue;
private final Supplier<Iterable<ExecutionJobVertex>> getVerticesTopologicallySupplier;

Expand All @@ -623,10 +684,6 @@ private static class MockExecutionGraph extends StateTrackingMockExecutionGraph
this(false, getVerticesTopologicallySupplier);
}

MockExecutionGraph(boolean updateStateReturnValue) {
this(updateStateReturnValue, null);
}

private MockExecutionGraph(
boolean updateStateReturnValue,
Supplier<Iterable<ExecutionJobVertex>> getVerticesTopologicallySupplier) {
Expand Down Expand Up @@ -689,9 +746,11 @@ public Logger getLogger() {
}

static class MockExecutionJobVertex extends ExecutionJobVertex {
private final MockExecutionVertex mockExecutionVertex;
private final ExecutionVertex mockExecutionVertex;

MockExecutionJobVertex() throws JobException {
MockExecutionJobVertex(
Function<ExecutionJobVertex, ExecutionVertex> executionVertexSupplier)
throws JobException {
super(
new MockInternalExecutionGraphAccessor(),
new JobVertex("test"),
Expand All @@ -700,37 +759,67 @@ static class MockExecutionJobVertex extends ExecutionJobVertex {
1L,
new DefaultVertexParallelismInfo(1, 1, max -> Optional.empty()),
new DefaultSubtaskAttemptNumberStore(Collections.emptyList()));
mockExecutionVertex = new MockExecutionVertex(this);
mockExecutionVertex = executionVertexSupplier.apply(this);
}

@Override
public ExecutionVertex[] getTaskVertices() {
return new ExecutionVertex[] {mockExecutionVertex};
}

public MockExecutionVertex getMockExecutionVertex() {
public ExecutionVertex getMockExecutionVertex() {
return mockExecutionVertex;
}
}

static class FailOnDeployMockExecutionVertex extends ExecutionVertex {

@Nullable private Throwable markFailed = null;

public FailOnDeployMockExecutionVertex(ExecutionJobVertex jobVertex) {
super(jobVertex, 1, new IntermediateResult[] {}, Time.milliseconds(1L), 1L, 1, 0);
}

public boolean isExecutionDeployed() {
return mockExecutionVertex.isDeployed();
@Override
public void deploy() throws JobException {
throw new JobException("Intentional Test exception");
}

@Override
public void markFailed(Throwable t) {
markFailed = t;
}

@Nullable
public Throwable getMarkedFailure() {
return markFailed;
}
}

static class MockExecutionVertex extends ExecutionVertex {
private boolean deployed = false;
private boolean deployCalled = false;
private ExecutionState mockedExecutionState = ExecutionState.RUNNING;

MockExecutionVertex(ExecutionJobVertex jobVertex) {
super(jobVertex, 1, new IntermediateResult[] {}, Time.milliseconds(1L), 1L, 1, 0);
}

@Override
public void deploy() throws JobException {
deployed = true;
deployCalled = true;
}

public boolean isDeployCalled() {
return deployCalled;
}

@Override
public ExecutionState getExecutionState() {
return mockedExecutionState;
}

public boolean isDeployed() {
return deployed;
public void setMockedExecutionState(ExecutionState mockedExecutionState) {
this.mockedExecutionState = mockedExecutionState;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,9 @@ public void testTaskFailuresAreIgnored() throws Exception {
StateTrackingMockExecutionGraph meg = new StateTrackingMockExecutionGraph();
Failing failing = createFailingState(ctx, meg);
// register execution at EG
ExecutingTest.MockExecutionJobVertex ejv = new ExecutingTest.MockExecutionJobVertex();
ExecutingTest.MockExecutionJobVertex ejv =
new ExecutingTest.MockExecutionJobVertex(
ExecutingTest.MockExecutionVertex::new);
TaskExecutionStateTransition update =
new TaskExecutionStateTransition(
new TaskExecutionState(
Expand Down Expand Up @@ -159,11 +161,10 @@ private Failing createFailingState(MockFailingContext ctx, ExecutionGraph execut
private static class MockFailingContext extends MockStateWithExecutionGraphContext
implements Failing.Context {

private final StateValidator<ExecutingTest.ExecutingAndCancellingArguments>
cancellingStateValidator = new StateValidator<>("cancelling");
private final StateValidator<ExecutingTest.CancellingArguments> cancellingStateValidator =
new StateValidator<>("cancelling");

public void setExpectCanceling(
Consumer<ExecutingTest.ExecutingAndCancellingArguments> asserter) {
public void setExpectCanceling(Consumer<ExecutingTest.CancellingArguments> asserter) {
cancellingStateValidator.expectInput(asserter);
}

Expand All @@ -173,7 +174,7 @@ public void goToCanceling(
ExecutionGraphHandler executionGraphHandler,
OperatorCoordinatorHandler operatorCoordinatorHandler) {
cancellingStateValidator.validateInput(
new ExecutingTest.ExecutingAndCancellingArguments(
new ExecutingTest.CancellingArguments(
executionGraph, executionGraphHandler, operatorCoordinatorHandler));
hadStateTransition = true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,14 +138,13 @@ public Restarting createRestartingState(MockRestartingContext ctx)
private static class MockRestartingContext extends MockStateWithExecutionGraphContext
implements Restarting.Context {

private final StateValidator<ExecutingTest.ExecutingAndCancellingArguments>
cancellingStateValidator = new StateValidator<>("Cancelling");
private final StateValidator<ExecutingTest.CancellingArguments> cancellingStateValidator =
new StateValidator<>("Cancelling");

private final StateValidator<Void> waitingForResourcesStateValidator =
new StateValidator<>("WaitingForResources");

public void setExpectCancelling(
Consumer<ExecutingTest.ExecutingAndCancellingArguments> asserter) {
public void setExpectCancelling(Consumer<ExecutingTest.CancellingArguments> asserter) {
cancellingStateValidator.expectInput(asserter);
}

Expand All @@ -159,7 +158,7 @@ public void goToCanceling(
ExecutionGraphHandler executionGraphHandler,
OperatorCoordinatorHandler operatorCoordinatorHandler) {
cancellingStateValidator.validateInput(
new ExecutingTest.ExecutingAndCancellingArguments(
new ExecutingTest.CancellingArguments(
executionGraph, executionGraphHandler, operatorCoordinatorHandler));
hadStateTransition = true;
}
Expand Down
Loading