From be318decc58bd6c87a9669a3315f9fa72b0d45ce Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Fri, 23 Apr 2021 16:00:56 +0200 Subject: [PATCH 1/3] [FLINK-22431] Add information when and why the AdaptiveScheduler restarts or fails jobs This commit adds info log statements to tell the user when and why it restarts or fails a job. --- .../scheduler/adaptive/AdaptiveScheduler.java | 2 +- .../runtime/scheduler/adaptive/Executing.java | 16 ++++++++-------- .../scheduler/adaptive/StopWithSavepoint.java | 2 ++ .../scheduler/adaptive/ExecutingTest.java | 5 +++-- .../adaptive/StopWithSavepointTest.java | 6 +++--- 5 files changed, 17 insertions(+), 14 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java index bad1cc9b0819e..c7da6302b3bfc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java @@ -1058,7 +1058,7 @@ public Executing.FailureResult howToHandleFailure(Throwable failure) { restartBackoffTimeStrategy.notifyFailure(failure); if (restartBackoffTimeStrategy.canRestart()) { return Executing.FailureResult.canRestart( - Duration.ofMillis(restartBackoffTimeStrategy.getBackoffTime())); + failure, Duration.ofMillis(restartBackoffTimeStrategy.getBackoffTime())); } else { return Executing.FailureResult.canNotRestart( new JobException( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java index dcb0366e01063..83beda3273d8a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java @@ -85,12 +85,14 @@ private void handleAnyFailure(Throwable cause) { final FailureResult failureResult = context.howToHandleFailure(cause); if (failureResult.canRestart()) { + getLogger().info("Restarting job.", failureResult.getFailureCause()); context.goToRestarting( getExecutionGraph(), getExecutionGraphHandler(), getOperatorCoordinatorHandler(), failureResult.getBackoffTime()); } else { + getLogger().info("Failing job.", failureResult.getFailureCause()); context.goToFailing( getExecutionGraph(), getExecutionGraphHandler(), @@ -281,9 +283,9 @@ CompletableFuture goToStopWithSavepoint( static final class FailureResult { @Nullable private final Duration backoffTime; - @Nullable private final Throwable failureCause; + private final Throwable failureCause; - private FailureResult(@Nullable Duration backoffTime, @Nullable Throwable failureCause) { + private FailureResult(Throwable failureCause, @Nullable Duration backoffTime) { this.backoffTime = backoffTime; this.failureCause = failureCause; } @@ -299,20 +301,18 @@ Duration getBackoffTime() { } Throwable getFailureCause() { - Preconditions.checkState( - failureCause != null, - "Failure result must not be restartable to return a failure cause."); return failureCause; } /** * Creates a FailureResult which allows to restart the job. * + * @param failureCause failureCause for restarting the job * @param backoffTime backoffTime to wait before restarting the job * @return FailureResult which allows to restart the job */ - static FailureResult canRestart(Duration backoffTime) { - return new FailureResult(backoffTime, null); + static FailureResult canRestart(Throwable failureCause, Duration backoffTime) { + return new FailureResult(failureCause, backoffTime); } /** @@ -322,7 +322,7 @@ static FailureResult canRestart(Duration backoffTime) { * @return FailureResult which does not allow to restart the job */ static FailureResult canNotRestart(Throwable failureCause) { - return new FailureResult(null, failureCause); + return new FailureResult(failureCause, null); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepoint.java index dcde71c25ceb5..ab4f937f74895 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepoint.java @@ -179,12 +179,14 @@ private void handleAnyFailure(Throwable cause) { final Executing.FailureResult failureResult = context.howToHandleFailure(cause); if (failureResult.canRestart()) { + getLogger().info("Restarting job.", failureResult.getFailureCause()); context.goToRestarting( getExecutionGraph(), getExecutionGraphHandler(), getOperatorCoordinatorHandler(), failureResult.getBackoffTime()); } else { + getLogger().info("Failing job.", failureResult.getFailureCause()); context.goToFailing( getExecutionGraph(), getExecutionGraphHandler(), diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java index df5f2332846ec..2852bdab7281e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java @@ -140,7 +140,7 @@ public void testRecoverableGlobalFailureTransitionsToRestarting() throws Excepti ctx.setExpectRestarting( (restartingArguments -> assertThat(restartingArguments.getBackoffTime(), is(duration)))); - ctx.setHowToHandleFailure((t) -> Executing.FailureResult.canRestart(duration)); + ctx.setHowToHandleFailure((t) -> Executing.FailureResult.canRestart(t, duration)); exec.handleGlobalFailure(new RuntimeException("Recoverable error")); } } @@ -234,7 +234,8 @@ public void testFailureReportedViaUpdateTaskExecutionStateCausesRestart() throws new ExecutingStateBuilder() .setExecutionGraph(returnsFailedStateExecutionGraph) .build(ctx); - ctx.setHowToHandleFailure((ign) -> Executing.FailureResult.canRestart(Duration.ZERO)); + ctx.setHowToHandleFailure( + (throwable) -> Executing.FailureResult.canRestart(throwable, Duration.ZERO)); ctx.setExpectRestarting(assertNonNull()); exec.updateTaskExecutionState(createFailingStateTransition()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java index e0383fa33dcad..e9a1157b430dc 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java @@ -174,7 +174,7 @@ public void testRestartOnGlobalFailureIfRestartConfigured() throws Exception { StopWithSavepoint sws = createStopWithSavepoint(ctx); ctx.setStopWithSavepoint(sws); ctx.setHowToHandleFailure( - (ignore) -> Executing.FailureResult.canRestart(Duration.ZERO)); + (throwable) -> Executing.FailureResult.canRestart(throwable, Duration.ZERO)); ctx.setExpectRestarting(assertNonNull()); @@ -229,7 +229,7 @@ public void testRestartingOnUpdateTaskExecutionStateWithRestart() throws Excepti createStopWithSavepoint(ctx, new StateTrackingMockExecutionGraph()); ctx.setStopWithSavepoint(sws); ctx.setHowToHandleFailure( - (ignore) -> Executing.FailureResult.canRestart(Duration.ZERO)); + (throwable) -> Executing.FailureResult.canRestart(throwable, Duration.ZERO)); ctx.setExpectRestarting(assertNonNull()); @@ -277,7 +277,7 @@ public void testRestartOnTaskFailureAfterSavepointCompletion() throws Exception ctx.setStopWithSavepoint(sws); ctx.setHowToHandleFailure( - (ignore) -> Executing.FailureResult.canRestart(Duration.ZERO)); + (throwable) -> Executing.FailureResult.canRestart(throwable, Duration.ZERO)); ctx.setExpectRestarting(assertNonNull()); From a5f95ae1941afe1fd16f622d5ef18e4dad158562 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Fri, 23 Apr 2021 16:12:17 +0200 Subject: [PATCH 2/3] [hotfix] Add debug logging to the states of the AdaptiveScheduler This commit adds debug log statements to the states of the AdaptiveScheduler to log whenever we ignore a global failure. --- .../scheduler/adaptive/AdaptiveScheduler.java | 16 +++++++++++++--- .../runtime/scheduler/adaptive/Canceling.java | 6 +++++- .../runtime/scheduler/adaptive/Failing.java | 6 +++++- .../runtime/scheduler/adaptive/Finished.java | 7 ++++++- .../runtime/scheduler/adaptive/Restarting.java | 6 +++++- .../adaptive/StateWithExecutionGraph.java | 4 ++++ 6 files changed, 38 insertions(+), 7 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java index c7da6302b3bfc..9a1a252fcedf7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java @@ -1035,14 +1035,24 @@ private static int getCumulativeParallelism(VertexParallelism potentialNewParall @Override public void onFinished(ArchivedExecutionGraph archivedExecutionGraph) { + + @Nullable + final Throwable optionalFailure = + archivedExecutionGraph.getFailureInfo() != null + ? archivedExecutionGraph.getFailureInfo().getException() + : null; + LOG.info( + "Job {} reached terminal state {}.", + archivedExecutionGraph.getJobID(), + archivedExecutionGraph.getState(), + optionalFailure); + if (jobStatusListener != null) { jobStatusListener.jobStatusChanges( jobInformation.getJobID(), archivedExecutionGraph.getState(), archivedExecutionGraph.getStatusTimestamp(archivedExecutionGraph.getState()), - archivedExecutionGraph.getFailureInfo() != null - ? archivedExecutionGraph.getFailureInfo().getException() - : null); + optionalFailure); } jobTerminationFuture.complete(archivedExecutionGraph.getState()); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Canceling.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Canceling.java index 54ddabcc7b717..f5d82a1d39996 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Canceling.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Canceling.java @@ -56,7 +56,11 @@ public void cancel() { @Override public void handleGlobalFailure(Throwable cause) { - // ignore global failures + getLogger() + .debug( + "Ignored global failure because we are already canceling the job {}.", + getJobId(), + cause); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Failing.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Failing.java index dc44c3c081dac..c8361617b733c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Failing.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Failing.java @@ -58,7 +58,11 @@ public void cancel() { @Override public void handleGlobalFailure(Throwable cause) { - // nothing to do since we are already failing + getLogger() + .debug( + "Ignored global failure because we are already failing the job {}.", + getJobId(), + cause); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Finished.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Finished.java index 7a7c9f236caa7..f36a98b1dd258 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Finished.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Finished.java @@ -54,7 +54,12 @@ public ArchivedExecutionGraph getJob() { } @Override - public void handleGlobalFailure(Throwable cause) {} + public void handleGlobalFailure(Throwable cause) { + logger.debug( + "Ignore global failure because we already finished the job {}.", + archivedExecutionGraph.getJobID(), + cause); + } @Override public Logger getLogger() { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Restarting.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Restarting.java index ffd7eb6ed68e4..ed5146ac05b1a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Restarting.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Restarting.java @@ -77,7 +77,11 @@ public void cancel() { @Override public void handleGlobalFailure(Throwable cause) { - // don't do anything + getLogger() + .debug( + "Ignored global failure because we are already restarting the job {}.", + getJobId(), + cause); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java index a85caac413550..9962c78d60532 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java @@ -112,6 +112,10 @@ ExecutionGraph getExecutionGraph() { return executionGraph; } + JobID getJobId() { + return executionGraph.getJobID(); + } + protected OperatorCoordinatorHandler getOperatorCoordinatorHandler() { return operatorCoordinatorHandler; } From 816be9988521458dc0e2e56c5a0739e3e01efe82 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Fri, 23 Apr 2021 17:50:15 +0200 Subject: [PATCH 3/3] [hotfix] Harden against FLINK-21376 by checking for null failure cause In order to harden the AdaptiveScheduler against FLINK-21376, this commit checks whether a task failure cause is null or not. In case of null, it will replace the failure with a generic cause. --- .../apache/flink/runtime/scheduler/adaptive/Executing.java | 7 ++++++- .../runtime/scheduler/adaptive/StopWithSavepoint.java | 6 +++++- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java index 83beda3273d8a..6b44c51dc9612 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java @@ -31,6 +31,7 @@ import org.apache.flink.runtime.scheduler.ExecutionGraphHandler; import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler; import org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointTerminationManager; +import org.apache.flink.util.FlinkException; import org.apache.flink.util.Preconditions; import org.slf4j.Logger; @@ -108,7 +109,11 @@ boolean updateTaskExecutionState(TaskExecutionStateTransition taskExecutionState if (successfulUpdate) { if (taskExecutionState.getExecutionState() == ExecutionState.FAILED) { Throwable cause = taskExecutionState.getError(userCodeClassLoader); - handleAnyFailure(cause); + handleAnyFailure( + cause == null + ? new FlinkException( + "Unknown failure cause. Probably related to FLINK-21376.") + : cause); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepoint.java index ab4f937f74895..6c8dc4fe9570b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepoint.java @@ -147,7 +147,11 @@ boolean updateTaskExecutionState(TaskExecutionStateTransition taskExecutionState if (successfulUpdate) { if (taskExecutionStateTransition.getExecutionState() == ExecutionState.FAILED) { Throwable cause = taskExecutionStateTransition.getError(userCodeClassLoader); - handleAnyFailure(cause); + handleAnyFailure( + cause == null + ? new FlinkException( + "Unknown failure cause. Probably related to FLINK-21376.") + : cause); } }