-
Notifications
You must be signed in to change notification settings - Fork 13.8k
[FLINK-22266] Fix stop-with-savepoint operation in AdaptiveScheduler #15884
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community Automated ChecksLast check on commit ddabee5 (Sat Aug 28 12:10:46 UTC 2021) Warnings:
Mention the bot in a comment to re-run the automated checks. Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process. DetailsThe Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commandsThe @flinkbot bot supports the following commands:
|
tillrohrmann
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for creating this PR @rmetzger. I think your solution does not work because it breaks with the contract that only StateWithExecutionGraph states can process updateTaskExecutionState messages. Concretely, with this change I think that we will ignore deployment failures.
Independent of this, we seem to be lacking test coverage for deployment failures on the unit test level as far as I can tell.
| private void handleDeploymentFailure(ExecutionVertex executionVertex, JobException e) { | ||
| executionVertex.markFailed(e); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ExecutionVertex.markFailed will trigger an updateTaskExecutionState which will only be processed by an StateWithExecutionGraph. Hence, I think this will now simply be ignored.
| // creating the savepoint has failed but job is still running | ||
| Preconditions.checkState(getExecutionGraph().getState() == JobStatus.RUNNING); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why did you introduce this checkState? Wouldn't this be caught by the Executing state?
| CreatingExecutionGraph.ExecutionGraphWithVertexParallelism.create( | ||
| executionGraph, new TestingVertexParallelism())); | ||
|
|
||
| assertThat(mockExecutionJobVertex.isExecutionDeployed(), is(true)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is no test which ensures the proper behavior if deploying fails in state CreatingExecutionGraph.
7b64aa1 to
4c6fcf4
Compare
|
Thanks a lot for your feedback. I agree with your findings, and I've reworked the change accordingly.
I added a test for this |
| } else { | ||
| throw new IllegalStateException( | ||
| "Unexpected executing state behavior " + executingStateBehavior); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Having slept over this one night, I'm not so sure anymore if this is the right approach. We can probably always assume the execution graph to be in state RUNNING, and on Behavior.EXPECT_RUNNING we can go through all ExecutionVertex and check if their state is running. I'll try to look into this Monday morning latest.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I changed it.
4c6fcf4 to
94a973c
Compare
tillrohrmann
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for updating this PR @rmetzger. I think the fix looks good. I had one comment concerning a possible simplification and another comment concerning the test coverage of the failed deploy call. Please take a look.
| if (executingStateBehavior == Behavior.DEPLOY_ON_ENTER) { | ||
| onAllExecutionVertexes(this::deploySafely); | ||
| } else if (executingStateBehavior == Behavior.EXPECT_RUNNING) { | ||
| onAllExecutionVertexes(this::expectRunning); | ||
| } else { | ||
| throw new IllegalStateException( | ||
| "Unexpected executing state behavior " + executingStateBehavior); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Couldn't we say that we deploy all not currently running Executions when entering this state?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Indeed, that's a good simplification. I pushed a commit addressing this item.
| ((FailOnDeployMockExecutionVertex) mejv.getMockExecutionVertex()) | ||
| .getMarkedFailure(), | ||
| is(instanceOf(JobException.class))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we have a test which ensures that Execution.markFailed will result in the proper exception handling in the Executing state?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The error handling of markFailed is difficult to test, because so many components are involved. But in my opinion, we have good test coverage:
markFailed will (through the DefaultExecutionGraph) notify the InternalFailuresListener about the task failure. The UpdateSchedulerNgOnInternalFailuresListener implementation used by adaptive scheduler will call updateTaskExecutionState on the scheduler. This chain of calls will be used for example for the failure in the AdaptiveSchedulerITCase.testGlobalFailoverCanRecoverState() test.
For the Executing state, we have tests that exceptions during deployment lead to a markFailed call (testExecutionVertexMarkedAsFailedOnDeploymentFailure), and failures reported via updateTaskExecutionState to appropriate error handling (testFailureReportedViaUpdateTaskExecutionStateCausesFailingOnNoRestart, testFailureReportedViaUpdateTaskExecutionStateCausesRestart, testFalseReportsViaUpdateTaskExecutionStateAreIgnored).
Adding a test that a markFailed call will notify the InternalFailuresListener is out of the scope of the ExecutingTest (because we are testing the ExecutionVertex and Execution classes).
Adding a test that a markFailed call will call updateTaskExecutionState will need to go through a test specific InternalFailuresListener: Since all the relevant calls on ExecutingState are already covered, this would only test the test specific InternalFailuresListener.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, sounds good. Thanks for all the details.
94a973c to
8f0b36f
Compare
tillrohrmann
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for updating this PR @rmetzger. LGTM. +1 for merging after resolving my last comment.
| if (executionVertex.getExecutionState() != ExecutionState.RUNNING) { | ||
| deploySafely(executionVertex); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we say that we call deploy if the ExecutionState is CREATED or SCHEDULED?
|
Thanks a lot for your review. I'll address your last comment & merge the change. |
8f0b36f to
ddabee5
Compare
Brief change log
When the creation of the savepoint fails, we go back into the Executing state (because the job is still running). However, this doesn't work, because Executing state is deploying the job when entering the state.
In this change, we are moving the job deployment out of the Executing state, and assert in the state that it is running.
I didn't chose the option of restarting the job, as this is potentially an expensive operation for the user.
Once #15882 has been merged, I'll adjust this PR.