From b7a47c2e99c5728f9df0678a9f62ce08fd62f9f3 Mon Sep 17 00:00:00 2001 From: Shahid Date: Wed, 14 Nov 2018 03:54:04 +0530 Subject: [PATCH 01/13] Stages page doesn't show the right number of the total tasks --- .../scala/org/apache/spark/status/AppStatusListener.scala | 7 ++++++- .../main/scala/org/apache/spark/status/LiveEntity.scala | 2 ++ .../main/scala/org/apache/spark/status/api/v1/api.scala | 1 + .../blacklisting_for_stage_expectation.json | 1 + .../blacklisting_node_for_stage_expectation.json | 1 + .../one_stage_attempt_json_expectation.json | 1 + .../one_stage_json_expectation.json | 1 + .../stage_with_accumulable_json_expectation.json | 1 + 8 files changed, 14 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala index 36aaf67b57298..164423e2f1c43 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala @@ -452,6 +452,10 @@ private[spark] class AppStatusListener( maybeUpdate(job, now) } + val esummary = stage.executorSummary(event.taskInfo.executorId) + esummary.activeTasks += 1 + maybeUpdate(esummary, now) + if (stage.savedTasks.incrementAndGet() > maxTasksPerStage && !stage.cleaning) { stage.cleaning = true kvstore.doAsync { @@ -558,6 +562,7 @@ private[spark] class AppStatusListener( } val esummary = stage.executorSummary(event.taskInfo.executorId) + esummary.activeTasks -= 1 esummary.taskTime += event.taskInfo.duration esummary.succeededTasks += completedDelta esummary.failedTasks += failedDelta @@ -565,7 +570,7 @@ private[spark] class AppStatusListener( if (metricsDelta != null) { esummary.metrics = LiveEntityHelpers.addMetrics(esummary.metrics, metricsDelta) } - conditionalLiveUpdate(esummary, now, removeStage) + conditionalLiveUpdate(esummary, now, esummary.activeTasks == 0) if (!stage.cleaning && stage.savedTasks.get() > maxTasksPerStage) { stage.cleaning = true diff --git a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala index 8708e64db3c17..d085da66fea52 100644 --- a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala +++ b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala @@ -318,6 +318,7 @@ private class LiveExecutorStageSummary( import LiveEntityHelpers._ var taskTime = 0L + var activeTasks = 0 var succeededTasks = 0 var failedTasks = 0 var killedTasks = 0 @@ -328,6 +329,7 @@ private class LiveExecutorStageSummary( override protected def doUpdate(): Any = { val info = new v1.ExecutorStageSummary( taskTime, + activeTasks, failedTasks, succeededTasks, killedTasks, diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala index 77466b62ff6ed..b17f0187c1b50 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala @@ -63,6 +63,7 @@ case class ApplicationAttemptInfo private[spark]( class ExecutorStageSummary private[spark]( val taskTime : Long, + val activeTasks: Int, val failedTasks : Int, val succeededTasks : Int, val killedTasks : Int, diff --git a/core/src/test/resources/HistoryServerExpectations/blacklisting_for_stage_expectation.json b/core/src/test/resources/HistoryServerExpectations/blacklisting_for_stage_expectation.json index 5e9e8230e2745..0e5a9a874d5da 100644 --- a/core/src/test/resources/HistoryServerExpectations/blacklisting_for_stage_expectation.json +++ b/core/src/test/resources/HistoryServerExpectations/blacklisting_for_stage_expectation.json @@ -602,6 +602,7 @@ "executorSummary": { "0": { "taskTime": 589, + "activeTasks" : 0, "failedTasks": 2, "succeededTasks": 0, "killedTasks": 0, diff --git a/core/src/test/resources/HistoryServerExpectations/blacklisting_node_for_stage_expectation.json b/core/src/test/resources/HistoryServerExpectations/blacklisting_node_for_stage_expectation.json index acd4cc53de6cd..01380369fab93 100644 --- a/core/src/test/resources/HistoryServerExpectations/blacklisting_node_for_stage_expectation.json +++ b/core/src/test/resources/HistoryServerExpectations/blacklisting_node_for_stage_expectation.json @@ -695,6 +695,7 @@ "executorSummary" : { "4" : { "taskTime" : 1589, + "activeTasks" : 0, "failedTasks" : 2, "succeededTasks" : 0, "killedTasks" : 0, diff --git a/core/src/test/resources/HistoryServerExpectations/one_stage_attempt_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/one_stage_attempt_json_expectation.json index 03f886afa5413..83e709006f42d 100644 --- a/core/src/test/resources/HistoryServerExpectations/one_stage_attempt_json_expectation.json +++ b/core/src/test/resources/HistoryServerExpectations/one_stage_attempt_json_expectation.json @@ -409,6 +409,7 @@ "executorSummary" : { "" : { "taskTime" : 3624, + "activeTasks" : 0, "failedTasks" : 0, "succeededTasks" : 8, "killedTasks" : 0, diff --git a/core/src/test/resources/HistoryServerExpectations/one_stage_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/one_stage_json_expectation.json index 947c89906955d..c05c946191225 100644 --- a/core/src/test/resources/HistoryServerExpectations/one_stage_json_expectation.json +++ b/core/src/test/resources/HistoryServerExpectations/one_stage_json_expectation.json @@ -409,6 +409,7 @@ "executorSummary" : { "" : { "taskTime" : 3624, + "activeTasks" : 0, "failedTasks" : 0, "succeededTasks" : 8, "killedTasks" : 0, diff --git a/core/src/test/resources/HistoryServerExpectations/stage_with_accumulable_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/stage_with_accumulable_json_expectation.json index 963f010968b62..0c140b8bdef9f 100644 --- a/core/src/test/resources/HistoryServerExpectations/stage_with_accumulable_json_expectation.json +++ b/core/src/test/resources/HistoryServerExpectations/stage_with_accumulable_json_expectation.json @@ -453,6 +453,7 @@ "executorSummary" : { "" : { "taskTime" : 418, + "activeTasks" : 0, "failedTasks" : 0, "succeededTasks" : 8, "killedTasks" : 0, From c53ca48885d14dc6a70412fe27a00c8f71aff561 Mon Sep 17 00:00:00 2001 From: Shahid Date: Thu, 15 Nov 2018 06:39:25 +0530 Subject: [PATCH 02/13] update --- .../blacklisting_for_stage_expectation.json | 1 + .../blacklisting_node_for_stage_expectation.json | 4 ++++ 2 files changed, 5 insertions(+) diff --git a/core/src/test/resources/HistoryServerExpectations/blacklisting_for_stage_expectation.json b/core/src/test/resources/HistoryServerExpectations/blacklisting_for_stage_expectation.json index 0e5a9a874d5da..2a0910923390a 100644 --- a/core/src/test/resources/HistoryServerExpectations/blacklisting_for_stage_expectation.json +++ b/core/src/test/resources/HistoryServerExpectations/blacklisting_for_stage_expectation.json @@ -620,6 +620,7 @@ }, "1": { "taskTime": 708, + "activeTasks" : 0, "failedTasks": 0, "succeededTasks": 10, "killedTasks": 0, diff --git a/core/src/test/resources/HistoryServerExpectations/blacklisting_node_for_stage_expectation.json b/core/src/test/resources/HistoryServerExpectations/blacklisting_node_for_stage_expectation.json index 01380369fab93..ab6a9e205eace 100644 --- a/core/src/test/resources/HistoryServerExpectations/blacklisting_node_for_stage_expectation.json +++ b/core/src/test/resources/HistoryServerExpectations/blacklisting_node_for_stage_expectation.json @@ -713,6 +713,7 @@ }, "5" : { "taskTime" : 1579, + "activeTasks" : 0, "failedTasks" : 2, "succeededTasks" : 0, "killedTasks" : 0, @@ -730,6 +731,7 @@ }, "1" : { "taskTime" : 2411, + "activeTasks" : 0, "failedTasks" : 0, "succeededTasks" : 4, "killedTasks" : 0, @@ -747,6 +749,7 @@ }, "2" : { "taskTime" : 2446, + "activeTasks" : 0, "failedTasks" : 0, "succeededTasks" : 5, "killedTasks" : 0, @@ -764,6 +767,7 @@ }, "3" : { "taskTime" : 1774, + "activeTasks" : 0, "failedTasks" : 0, "succeededTasks" : 1, "killedTasks" : 0, From 805ebb8e6b103cbc0688da64ec27841a1491039f Mon Sep 17 00:00:00 2001 From: Shahid Date: Fri, 16 Nov 2018 07:42:13 +0530 Subject: [PATCH 03/13] address comment --- .../org/apache/spark/status/AppStatusListener.scala | 12 +++++++----- .../scala/org/apache/spark/status/LiveEntity.scala | 4 ++-- .../scala/org/apache/spark/status/api/v1/api.scala | 1 - .../blacklisting_for_stage_expectation.json | 2 -- .../blacklisting_node_for_stage_expectation.json | 5 ----- .../one_stage_attempt_json_expectation.json | 1 - .../one_stage_json_expectation.json | 1 - .../stage_with_accumulable_json_expectation.json | 1 - 8 files changed, 9 insertions(+), 18 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala index 164423e2f1c43..6246529118f71 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala @@ -445,6 +445,7 @@ private[spark] class AppStatusListener( val locality = event.taskInfo.taskLocality.toString() val count = stage.localitySummary.getOrElse(locality, 0L) + 1L stage.localitySummary = stage.localitySummary ++ Map(locality -> count) + stage.activeTaskPerExecutor(event.taskInfo.executorId) += 1 maybeUpdate(stage, now) stage.jobs.foreach { job => @@ -452,9 +453,7 @@ private[spark] class AppStatusListener( maybeUpdate(job, now) } - val esummary = stage.executorSummary(event.taskInfo.executorId) - esummary.activeTasks += 1 - maybeUpdate(esummary, now) + if (stage.savedTasks.incrementAndGet() > maxTasksPerStage && !stage.cleaning) { stage.cleaning = true @@ -534,6 +533,7 @@ private[spark] class AppStatusListener( if (killedDelta > 0) { stage.killedSummary = killedTasksSummary(event.reason, stage.killedSummary) } + stage.activeTaskPerExecutor(event.taskInfo.executorId) -= 1 // [SPARK-24415] Wait for all tasks to finish before removing stage from live list val removeStage = stage.activeTasks == 0 && @@ -562,7 +562,6 @@ private[spark] class AppStatusListener( } val esummary = stage.executorSummary(event.taskInfo.executorId) - esummary.activeTasks -= 1 esummary.taskTime += event.taskInfo.duration esummary.succeededTasks += completedDelta esummary.failedTasks += failedDelta @@ -570,7 +569,10 @@ private[spark] class AppStatusListener( if (metricsDelta != null) { esummary.metrics = LiveEntityHelpers.addMetrics(esummary.metrics, metricsDelta) } - conditionalLiveUpdate(esummary, now, esummary.activeTasks == 0) + + val isLastTask = stage.activeTaskPerExecutor(event.taskInfo.executorId) == 0 + + conditionalLiveUpdate(esummary, now, isLastTask) if (!stage.cleaning && stage.savedTasks.get() > maxTasksPerStage) { stage.cleaning = true diff --git a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala index d085da66fea52..135f71605fcb0 100644 --- a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala +++ b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala @@ -318,7 +318,6 @@ private class LiveExecutorStageSummary( import LiveEntityHelpers._ var taskTime = 0L - var activeTasks = 0 var succeededTasks = 0 var failedTasks = 0 var killedTasks = 0 @@ -329,7 +328,6 @@ private class LiveExecutorStageSummary( override protected def doUpdate(): Any = { val info = new v1.ExecutorStageSummary( taskTime, - activeTasks, failedTasks, succeededTasks, killedTasks, @@ -378,6 +376,8 @@ private class LiveStage extends LiveEntity { val executorSummaries = new HashMap[String, LiveExecutorStageSummary]() + val activeTaskPerExecutor = new HashMap[String, Int]().withDefaultValue(0) + var blackListedExecutors = new HashSet[String]() // Used for cleanup of tasks after they reach the configured limit. Not written to the store. diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala index b17f0187c1b50..77466b62ff6ed 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala @@ -63,7 +63,6 @@ case class ApplicationAttemptInfo private[spark]( class ExecutorStageSummary private[spark]( val taskTime : Long, - val activeTasks: Int, val failedTasks : Int, val succeededTasks : Int, val killedTasks : Int, diff --git a/core/src/test/resources/HistoryServerExpectations/blacklisting_for_stage_expectation.json b/core/src/test/resources/HistoryServerExpectations/blacklisting_for_stage_expectation.json index 2a0910923390a..5e9e8230e2745 100644 --- a/core/src/test/resources/HistoryServerExpectations/blacklisting_for_stage_expectation.json +++ b/core/src/test/resources/HistoryServerExpectations/blacklisting_for_stage_expectation.json @@ -602,7 +602,6 @@ "executorSummary": { "0": { "taskTime": 589, - "activeTasks" : 0, "failedTasks": 2, "succeededTasks": 0, "killedTasks": 0, @@ -620,7 +619,6 @@ }, "1": { "taskTime": 708, - "activeTasks" : 0, "failedTasks": 0, "succeededTasks": 10, "killedTasks": 0, diff --git a/core/src/test/resources/HistoryServerExpectations/blacklisting_node_for_stage_expectation.json b/core/src/test/resources/HistoryServerExpectations/blacklisting_node_for_stage_expectation.json index ab6a9e205eace..acd4cc53de6cd 100644 --- a/core/src/test/resources/HistoryServerExpectations/blacklisting_node_for_stage_expectation.json +++ b/core/src/test/resources/HistoryServerExpectations/blacklisting_node_for_stage_expectation.json @@ -695,7 +695,6 @@ "executorSummary" : { "4" : { "taskTime" : 1589, - "activeTasks" : 0, "failedTasks" : 2, "succeededTasks" : 0, "killedTasks" : 0, @@ -713,7 +712,6 @@ }, "5" : { "taskTime" : 1579, - "activeTasks" : 0, "failedTasks" : 2, "succeededTasks" : 0, "killedTasks" : 0, @@ -731,7 +729,6 @@ }, "1" : { "taskTime" : 2411, - "activeTasks" : 0, "failedTasks" : 0, "succeededTasks" : 4, "killedTasks" : 0, @@ -749,7 +746,6 @@ }, "2" : { "taskTime" : 2446, - "activeTasks" : 0, "failedTasks" : 0, "succeededTasks" : 5, "killedTasks" : 0, @@ -767,7 +763,6 @@ }, "3" : { "taskTime" : 1774, - "activeTasks" : 0, "failedTasks" : 0, "succeededTasks" : 1, "killedTasks" : 0, diff --git a/core/src/test/resources/HistoryServerExpectations/one_stage_attempt_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/one_stage_attempt_json_expectation.json index 83e709006f42d..03f886afa5413 100644 --- a/core/src/test/resources/HistoryServerExpectations/one_stage_attempt_json_expectation.json +++ b/core/src/test/resources/HistoryServerExpectations/one_stage_attempt_json_expectation.json @@ -409,7 +409,6 @@ "executorSummary" : { "" : { "taskTime" : 3624, - "activeTasks" : 0, "failedTasks" : 0, "succeededTasks" : 8, "killedTasks" : 0, diff --git a/core/src/test/resources/HistoryServerExpectations/one_stage_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/one_stage_json_expectation.json index c05c946191225..947c89906955d 100644 --- a/core/src/test/resources/HistoryServerExpectations/one_stage_json_expectation.json +++ b/core/src/test/resources/HistoryServerExpectations/one_stage_json_expectation.json @@ -409,7 +409,6 @@ "executorSummary" : { "" : { "taskTime" : 3624, - "activeTasks" : 0, "failedTasks" : 0, "succeededTasks" : 8, "killedTasks" : 0, diff --git a/core/src/test/resources/HistoryServerExpectations/stage_with_accumulable_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/stage_with_accumulable_json_expectation.json index 0c140b8bdef9f..963f010968b62 100644 --- a/core/src/test/resources/HistoryServerExpectations/stage_with_accumulable_json_expectation.json +++ b/core/src/test/resources/HistoryServerExpectations/stage_with_accumulable_json_expectation.json @@ -453,7 +453,6 @@ "executorSummary" : { "" : { "taskTime" : 418, - "activeTasks" : 0, "failedTasks" : 0, "succeededTasks" : 8, "killedTasks" : 0, From 7c3a80bce0a45131091ce11e80a939e9de6ebf50 Mon Sep 17 00:00:00 2001 From: Shahid Date: Fri, 16 Nov 2018 07:53:17 +0530 Subject: [PATCH 04/13] remove space --- .../main/scala/org/apache/spark/status/AppStatusListener.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala index 6246529118f71..5325a417c030e 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala @@ -453,8 +453,6 @@ private[spark] class AppStatusListener( maybeUpdate(job, now) } - - if (stage.savedTasks.incrementAndGet() > maxTasksPerStage && !stage.cleaning) { stage.cleaning = true kvstore.doAsync { From cbd885a7602692108728aeb64b8bc227c1e69bdb Mon Sep 17 00:00:00 2001 From: Shahid Date: Sat, 17 Nov 2018 01:51:28 +0530 Subject: [PATCH 05/13] update --- .../scala/org/apache/spark/status/AppStatusListener.scala | 6 +++--- .../src/main/scala/org/apache/spark/status/LiveEntity.scala | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala index 5325a417c030e..7b6b818ce503e 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala @@ -445,7 +445,7 @@ private[spark] class AppStatusListener( val locality = event.taskInfo.taskLocality.toString() val count = stage.localitySummary.getOrElse(locality, 0L) + 1L stage.localitySummary = stage.localitySummary ++ Map(locality -> count) - stage.activeTaskPerExecutor(event.taskInfo.executorId) += 1 + stage.activeTasksPerExecutor(event.taskInfo.executorId) += 1 maybeUpdate(stage, now) stage.jobs.foreach { job => @@ -531,7 +531,7 @@ private[spark] class AppStatusListener( if (killedDelta > 0) { stage.killedSummary = killedTasksSummary(event.reason, stage.killedSummary) } - stage.activeTaskPerExecutor(event.taskInfo.executorId) -= 1 + stage.activeTasksPerExecutor(event.taskInfo.executorId) -= 1 // [SPARK-24415] Wait for all tasks to finish before removing stage from live list val removeStage = stage.activeTasks == 0 && @@ -568,7 +568,7 @@ private[spark] class AppStatusListener( esummary.metrics = LiveEntityHelpers.addMetrics(esummary.metrics, metricsDelta) } - val isLastTask = stage.activeTaskPerExecutor(event.taskInfo.executorId) == 0 + val isLastTask = stage.activeTasksPerExecutor(event.taskInfo.executorId) == 0 conditionalLiveUpdate(esummary, now, isLastTask) diff --git a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala index 135f71605fcb0..7452ed9f8e19e 100644 --- a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala +++ b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala @@ -376,7 +376,7 @@ private class LiveStage extends LiveEntity { val executorSummaries = new HashMap[String, LiveExecutorStageSummary]() - val activeTaskPerExecutor = new HashMap[String, Int]().withDefaultValue(0) + val activeTasksPerExecutor = new HashMap[String, Int]().withDefaultValue(0) var blackListedExecutors = new HashSet[String]() From 5b13b7725cc6dff0e6caed1229c05efcf89d9eff Mon Sep 17 00:00:00 2001 From: Shahid Date: Sat, 17 Nov 2018 03:58:59 +0530 Subject: [PATCH 06/13] Add UT --- .../spark/status/AppStatusListenerSuite.scala | 47 +++++++++++++++++++ 1 file changed, 47 insertions(+) diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala index 0b2bbd2fa8a78..e3ec3288f32e8 100644 --- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala @@ -1275,6 +1275,53 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { assert(allJobs.head.numFailedStages == 1) } + test("SPARK-25451: total tasks in the executor summary should match total stage tasks") { + + val testConf = conf.clone() + .set("spark.ui.liveUpdate.period", s"${Int.MaxValue}s") + + val listener = new AppStatusListener(store, testConf, true) + + val stage = new StageInfo(1, 0, "stage1", 4, Nil, Nil, "details1") + + listener.onJobStart(SparkListenerJobStart(1, time, Seq(stage), null)) + + listener.onStageSubmitted(SparkListenerStageSubmitted(stage, new Properties())) + + val tasks = createTasks(4, Array("1", "2")) + tasks.foreach { task => + listener.onTaskStart(SparkListenerTaskStart(stage.stageId, stage.attemptNumber, task)) + } + + tasks.filter(_.index < 2).foreach { task => + time += 1 + var execId = (task.index % 2 + 1).toString + tasks(task.index).markFinished(TaskState.FAILED, time) + listener.onTaskEnd( + SparkListenerTaskEnd(stage.stageId, stage.attemptId, "taskType", + ExecutorLostFailure(execId, true, Some("Lost executor")), tasks(task.index), null)) + } + + stage.failureReason = Some("Failed") + listener.onStageCompleted(SparkListenerStageCompleted(stage)) + time += 1 + listener.onJobEnd(SparkListenerJobEnd(1, time, JobFailed(new RuntimeException("Bad Executor")))) + + tasks.filter(_.index >= 2).foreach { task => + time += 1 + var execId = (task.index % 2 + 1).toString + tasks(task.index).markFinished(TaskState.FAILED, time) + listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptId, "taskType", + ExecutorLostFailure(execId, true, Some("Lost executor")), tasks(task.index), null)) + } + + val esummary = store.view(classOf[ExecutorStageSummaryWrapper]).asScala.map(_.info) + + esummary.foreach { + execSummary => assert(execSummary.failedTasks == 2) + } + } + test("driver logs") { val listener = new AppStatusListener(store, conf, true) From 6e1d2fa7a5dbc247ce69c7576ae92c8a2d0fa4c6 Mon Sep 17 00:00:00 2001 From: Shahid Date: Sat, 17 Nov 2018 04:11:42 +0530 Subject: [PATCH 07/13] remove space --- .../org/apache/spark/status/AppStatusListenerSuite.scala | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala index e3ec3288f32e8..58fa39418966e 100644 --- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala @@ -1276,16 +1276,13 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { } test("SPARK-25451: total tasks in the executor summary should match total stage tasks") { - val testConf = conf.clone() .set("spark.ui.liveUpdate.period", s"${Int.MaxValue}s") val listener = new AppStatusListener(store, testConf, true) - val stage = new StageInfo(1, 0, "stage1", 4, Nil, Nil, "details1") - + val stage = new StageInfo(1, 0, "stage", 4, Nil, Nil, "details") listener.onJobStart(SparkListenerJobStart(1, time, Seq(stage), null)) - listener.onStageSubmitted(SparkListenerStageSubmitted(stage, new Properties())) val tasks = createTasks(4, Array("1", "2")) @@ -1316,7 +1313,6 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { } val esummary = store.view(classOf[ExecutorStageSummaryWrapper]).asScala.map(_.info) - esummary.foreach { execSummary => assert(execSummary.failedTasks == 2) } From 50cc7620b3459589bf8a027a71d9197ee1e269e8 Mon Sep 17 00:00:00 2001 From: Shahid Date: Sat, 17 Nov 2018 04:40:09 +0530 Subject: [PATCH 08/13] minor nit --- .../main/scala/org/apache/spark/status/AppStatusListener.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala index 7b6b818ce503e..09176837be80b 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala @@ -568,7 +568,8 @@ private[spark] class AppStatusListener( esummary.metrics = LiveEntityHelpers.addMetrics(esummary.metrics, metricsDelta) } - val isLastTask = stage.activeTasksPerExecutor(event.taskInfo.executorId) == 0 + val isLastTask = (stage.activeTasksPerExecutor(event.taskInfo.executorId) == 0) && + ((stage.status == v1.StageStatus.COMPLETE) || (stage.status == v1.StageStatus.FAILED)) conditionalLiveUpdate(esummary, now, isLastTask) From 43850357b086796368532dc70f7adacccfa23b45 Mon Sep 17 00:00:00 2001 From: Shahid Date: Sat, 17 Nov 2018 12:12:02 +0530 Subject: [PATCH 09/13] address comments --- .../spark/status/AppStatusListenerSuite.scala | 38 +++++++++---------- 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala index 58fa39418966e..d7a00d9d519d3 100644 --- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala @@ -1276,8 +1276,7 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { } test("SPARK-25451: total tasks in the executor summary should match total stage tasks") { - val testConf = conf.clone() - .set("spark.ui.liveUpdate.period", s"${Int.MaxValue}s") + val testConf = new SparkConf(false).set(LIVE_ENTITY_UPDATE_PERIOD, Long.MaxValue) val listener = new AppStatusListener(store, testConf, true) @@ -1290,31 +1289,32 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { listener.onTaskStart(SparkListenerTaskStart(stage.stageId, stage.attemptNumber, task)) } - tasks.filter(_.index < 2).foreach { task => - time += 1 - var execId = (task.index % 2 + 1).toString - tasks(task.index).markFinished(TaskState.FAILED, time) - listener.onTaskEnd( - SparkListenerTaskEnd(stage.stageId, stage.attemptId, "taskType", - ExecutorLostFailure(execId, true, Some("Lost executor")), tasks(task.index), null)) - } + time += 1 + tasks(0).markFinished(TaskState.FAILED, time) + listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptId, "taskType", + ExecutorLostFailure("1", true, Some("Lost executor")), tasks(0), null)) + time += 1 + tasks(1).markFinished(TaskState.FAILED, time) + listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptId, "taskType", + ExecutorLostFailure("2", true, Some("Lost executor")), tasks(1), null)) stage.failureReason = Some("Failed") listener.onStageCompleted(SparkListenerStageCompleted(stage)) time += 1 listener.onJobEnd(SparkListenerJobEnd(1, time, JobFailed(new RuntimeException("Bad Executor")))) - tasks.filter(_.index >= 2).foreach { task => - time += 1 - var execId = (task.index % 2 + 1).toString - tasks(task.index).markFinished(TaskState.FAILED, time) - listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptId, "taskType", - ExecutorLostFailure(execId, true, Some("Lost executor")), tasks(task.index), null)) - } + time += 1 + tasks(2).markFinished(TaskState.FAILED, time) + listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptId, "taskType", + ExecutorLostFailure("1", true, Some("Lost executor")), tasks(2), null)) + time += 1 + tasks(3).markFinished(TaskState.FAILED, time) + listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptId, "taskType", + ExecutorLostFailure("2", true, Some("Lost executor")), tasks(3), null)) val esummary = store.view(classOf[ExecutorStageSummaryWrapper]).asScala.map(_.info) - esummary.foreach { - execSummary => assert(execSummary.failedTasks == 2) + esummary.foreach { execSummary => + assert(execSummary.failedTasks == 2) } } From ecac386acd671037c21c5f23765ec4b164d87413 Mon Sep 17 00:00:00 2001 From: Shahid Date: Sat, 17 Nov 2018 13:37:57 +0530 Subject: [PATCH 10/13] address comments --- .../main/scala/org/apache/spark/status/AppStatusListener.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala index 09176837be80b..7b6b818ce503e 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala @@ -568,8 +568,7 @@ private[spark] class AppStatusListener( esummary.metrics = LiveEntityHelpers.addMetrics(esummary.metrics, metricsDelta) } - val isLastTask = (stage.activeTasksPerExecutor(event.taskInfo.executorId) == 0) && - ((stage.status == v1.StageStatus.COMPLETE) || (stage.status == v1.StageStatus.FAILED)) + val isLastTask = stage.activeTasksPerExecutor(event.taskInfo.executorId) == 0 conditionalLiveUpdate(esummary, now, isLastTask) From dca941d316543526ea429c2b6a993c2252d09fd6 Mon Sep 17 00:00:00 2001 From: Shahid Date: Sat, 17 Nov 2018 18:41:44 +0530 Subject: [PATCH 11/13] Update for history events --- .../apache/spark/status/AppStatusListener.scala | 14 ++++++++++++-- .../spark/status/AppStatusListenerSuite.scala | 2 +- 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala index 7b6b818ce503e..4244bf1d526a5 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala @@ -556,7 +556,11 @@ private[spark] class AppStatusListener( if (killedDelta > 0) { job.killedSummary = killedTasksSummary(event.reason, job.killedSummary) } - conditionalLiveUpdate(job, now, removeStage) + if (removeStage) { + update(job, now) + } else { + maybeUpdate(job, now) + } } val esummary = stage.executorSummary(event.taskInfo.executorId) @@ -570,7 +574,13 @@ private[spark] class AppStatusListener( val isLastTask = stage.activeTasksPerExecutor(event.taskInfo.executorId) == 0 - conditionalLiveUpdate(esummary, now, isLastTask) + // If the last task of the executor finished, then update the esummary + // for both live and history events. + if (isLastTask) { + update(esummary, now) + } else { + maybeUpdate(esummary, now) + } if (!stage.cleaning && stage.savedTasks.get() > maxTasksPerStage) { stage.cleaning = true diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala index d7a00d9d519d3..ea64f07e518bb 100644 --- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala @@ -1276,7 +1276,7 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { } test("SPARK-25451: total tasks in the executor summary should match total stage tasks") { - val testConf = new SparkConf(false).set(LIVE_ENTITY_UPDATE_PERIOD, Long.MaxValue) + val testConf = new SparkConf().set(LIVE_ENTITY_UPDATE_PERIOD, Long.MaxValue) val listener = new AppStatusListener(store, testConf, true) From a21bc0c3a24a468bf8147c7ee6f7ef12e384c454 Mon Sep 17 00:00:00 2001 From: Shahid Date: Sat, 17 Nov 2018 19:09:29 +0530 Subject: [PATCH 12/13] minor nit --- .../scala/org/apache/spark/status/AppStatusListenerSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala index ea64f07e518bb..8d504a09efa44 100644 --- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala @@ -1276,7 +1276,7 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { } test("SPARK-25451: total tasks in the executor summary should match total stage tasks") { - val testConf = new SparkConf().set(LIVE_ENTITY_UPDATE_PERIOD, Long.MaxValue) + val testConf = conf.clone.set(LIVE_ENTITY_UPDATE_PERIOD, Long.MaxValue) val listener = new AppStatusListener(store, testConf, true) From ed8501667745f8bf606c37ae88ff8b47223124a8 Mon Sep 17 00:00:00 2001 From: Shahid Date: Tue, 20 Nov 2018 00:27:31 +0530 Subject: [PATCH 13/13] update test --- .../apache/spark/status/AppStatusListenerSuite.scala | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala index 8d504a09efa44..59265fb16537c 100644 --- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala @@ -1290,13 +1290,13 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { } time += 1 - tasks(0).markFinished(TaskState.FAILED, time) + tasks(0).markFinished(TaskState.FINISHED, time) listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptId, "taskType", - ExecutorLostFailure("1", true, Some("Lost executor")), tasks(0), null)) + Success, tasks(0), null)) time += 1 - tasks(1).markFinished(TaskState.FAILED, time) + tasks(1).markFinished(TaskState.FINISHED, time) listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptId, "taskType", - ExecutorLostFailure("2", true, Some("Lost executor")), tasks(1), null)) + Success, tasks(1), null)) stage.failureReason = Some("Failed") listener.onStageCompleted(SparkListenerStageCompleted(stage)) @@ -1314,7 +1314,9 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { val esummary = store.view(classOf[ExecutorStageSummaryWrapper]).asScala.map(_.info) esummary.foreach { execSummary => - assert(execSummary.failedTasks == 2) + assert(execSummary.failedTasks === 1) + assert(execSummary.succeededTasks === 1) + assert(execSummary.killedTasks === 0) } }