[SPARK-25451][SPARK-26100][CORE]Aggregated metrics table doesn't show the right number of the total tasks #23038
Conversation
Test build #98840 has finished for PR 23038 at commit
I'll update the PR for the test failures.
ed98958 to b7a47c2
retest this please
Test build #98849 has finished for PR 23038 at commit
Retest this please
Test build #98850 has finished for PR 23038 at commit
Test build #98856 has finished for PR 23038 at commit
retest this please
Test build #98863 has finished for PR 23038 at commit
class ExecutorStageSummary private[spark](
    val taskTime : Long,
    val activeTasks: Int,
You don't need to expose this in the public API to fix the bug, do you?
Thank you @vanzin for the review.
Actually, my objective is to detect the last task of a particular executorId in the stage: if the corresponding activeTasks == 0, then force an update in the KVStore.
Stages, jobs and executors all have "activeTasks", and using that field the code forces an update on the last task.
conditionalLiveUpdate(exec, now, exec.activeTasks == 0)
stage.activeTasks == 0 &&
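For reference, a minimal sketch of the bookkeeping described above (illustrative only; the class and method names are made up for the example and do not match the real AppStatusListener internals):

```scala
import scala.collection.mutable.HashMap

// Sketch: track how many tasks of a stage are still running on each executor, and
// signal when an executor's last task for the stage has finished so its summary
// can be force-written to the store instead of waiting for stage end.
class StageExecutorTaskTracker {
  private val activeTasksPerExecutor = new HashMap[String, Int]().withDefaultValue(0)

  def onTaskStart(executorId: String): Unit =
    activeTasksPerExecutor(executorId) += 1

  // Returns true when the finished task was that executor's last active task,
  // i.e. the caller should force-update the executor stage summary now.
  def onTaskEnd(executorId: String): Boolean = {
    activeTasksPerExecutor(executorId) -= 1
    activeTasksPerExecutor(executorId) == 0
  }
}
```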
You didn't answer my question.
Okay. I will try without exposing it in the public API.
Hi @vanzin, I have modified it based on your comment. Kindly review.
0d92185 to 805ebb8
Test build #98893 has finished for PR 23038 at commit

Test build #98895 has finished for PR 23038 at commit

Test build #98894 has finished for PR 23038 at commit
vanzin left a comment:
What you had before was fine and probably faster than doing multiple hash lookups like this.
You just did not need to change the public API at all.
Also, is there a unit test that can be written? IIRC the unit tests disable the conditional updates so it may be hard to add one.
val executorSummaries = new HashMap[String, LiveExecutorStageSummary]()

val activeTaskPerExecutor = new HashMap[String, Int]().withDefaultValue(0)
activeTasksPerExecutor
I will add one UT
Hi @vanzin, I have added a UT. Kindly review.
93181aa to 5b13b77
8109396 to 50cc762
| test("SPARK-25451: total tasks in the executor summary should match total stage tasks") { | ||
| val testConf = conf.clone() | ||
| .set("spark.ui.liveUpdate.period", s"${Int.MaxValue}s") |
Use the config constant, like the existing code.
Done.
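A sketch of what the suggestion amounts to, assuming the LIVE_ENTITY_UPDATE_PERIOD constant from org.apache.spark.internal.config.Status (the entry behind "spark.ui.liveUpdate.period"); `conf` is the suite's SparkConf as in the quoted lines above:

```scala
import org.apache.spark.internal.config.Status.LIVE_ENTITY_UPDATE_PERIOD

// Use the config constant instead of the raw string key; a very large period
// effectively disables the periodic live updates in the test. This relies on the
// test living in the Spark test suite, where conf is the suite's SparkConf.
val testConf = conf.clone().set(LIVE_ENTITY_UPDATE_PERIOD, Long.MaxValue)
```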
}

tasks.filter(_.index < 2).foreach { task =>
  time += 1
Whole block is indented too far.
Updated the code
listener.onJobEnd(SparkListenerJobEnd(1, time, JobFailed(new RuntimeException("Bad Executor"))))

tasks.filter(_.index >= 2).foreach { task =>
  time += 1
Same here.
Updated
val esummary = store.view(classOf[ExecutorStageSummaryWrapper]).asScala.map(_.info)
esummary.foreach {
  execSummary => assert(execSummary.failedTasks == 2)
Keep execSummary => on the previous line.
Done
conditionalLiveUpdate(esummary, now, removeStage)

val isLastTask = (stage.activeTasksPerExecutor(event.taskInfo.executorId) == 0) &&
  ((stage.status == v1.StageStatus.COMPLETE) || (stage.status == v1.StageStatus.FAILED))
Not sure why this extra condition is needed?
This issue occurs when the taskEnd event comes after stageEnd, because during the onStageCompleted event we write all the esummary entries to the store. So in onTaskEnd we only need to force a write if the stageCompleted event has already happened.
Yes, the stageCompleted check isn't really required, since here we only update on the last task of each executor. I updated the code.
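A tiny runnable illustration of the ordering case discussed here (a sketch with made-up names, not the AppStatusListener code): even if the stage-completed event is processed first, the per-executor counter alone identifies the straggling last task when its end event arrives.

```scala
import scala.collection.mutable.HashMap

// Sketch: executor "exec-1" has one task still running when the stage-completed
// event is processed. When the late task-end event arrives, the counter dropping
// to zero is enough to know this was the executor's last task, so its stage
// summary must be force-written; no stage-status check is needed.
object OutOfOrderTaskEndDemo extends App {
  val activeTasksPerExecutor = HashMap("exec-1" -> 1).withDefaultValue(0)

  // ... stage-completed event already handled at this point ...

  activeTasksPerExecutor("exec-1") -= 1    // the straggling task-end event
  val isLastTask = activeTasksPerExecutor("exec-1") == 0
  println(s"force-write executor summary for exec-1: $isLastTask")  // true
}
```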
Test build #98931 has finished for PR 23038 at commit
Test build #98935 has finished for PR 23038 at commit
Test build #98956 has finished for PR 23038 at commit
Hi @vanzin, in the History Server both the 'Jobs' table and the 'Aggregated Metrics' table show an incorrect number of total tasks. I have modified the code so that the issue does not happen in either the History or the live UI.
ad30c36 to dca941d
Test build #98968 has finished for PR 23038 at commit
It is a random failure.
Test build #98967 has finished for PR 23038 at commit
Test build #98966 has finished for PR 23038 at commit
val esummary = store.view(classOf[ExecutorStageSummaryWrapper]).asScala.map(_.info)
esummary.foreach { execSummary =>
  assert(execSummary.failedTasks == 2)
Nit: also check succeededTasks and killedTasks
Thanks @gengliangwang, I updated the test.
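A sketch of the extra assertions after the update, extending the quoted test fragment above. Only failedTasks == 2 is quoted in the review; the succeededTasks and killedTasks values shown here are assumptions about this test's scenario, where all tasks fail:

```scala
esummary.foreach { execSummary =>
  assert(execSummary.failedTasks == 2)
  // Assumed expected counts: no tasks succeed or get killed per executor here.
  assert(execSummary.succeededTasks == 0)
  assert(execSummary.killedTasks == 0)
}
```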
Test build #99012 has finished for PR 23038 at commit
It is a random failure. Jenkins, retest this please.
Test build #99025 has finished for PR 23038 at commit
@vanzin, could you please check the updated changes? Thanks.
vanzin left a comment:
Looks ok. conditionalLiveUpdate has a single call left after your changes, so it feels like it should be cleaned up, since apparently it has issues that might also affect the remaining call. Anyway, that can be a separate bug.
Merging to master (will fix the indentation before pushing).
// If the last task of the executor finished, then update the esummary
// for both live and history events.
if (isLastTask) {
  update(esummary, now)
Indentation is off.
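Putting the pieces from this review together, a sketch of how the tail of the task-end handling reads with the indentation fixed. Names follow the quoted diffs; the decrement line, the else branch, and the maybeUpdate helper are assumptions made for the illustration, not quoted from the PR:

```scala
// Per-executor bookkeeping discussed earlier in the review (assumed placement).
stage.activeTasksPerExecutor(event.taskInfo.executorId) -= 1
val isLastTask = stage.activeTasksPerExecutor(event.taskInfo.executorId) == 0

// If the last task of the executor finished, then update the esummary
// for both live and history events.
if (isLastTask) {
  update(esummary, now)
} else {
  maybeUpdate(esummary, now)  // assumed: throttled write for non-final tasks
}
```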
…w the right number of the total tasks
Total tasks in the aggregated table and the tasks table sometimes do not match in the Web UI.
We need to force-update the executor summary of a particular executorId whenever the last task of that executor finishes. Currently the force update happens only based on the last task at stage end, so tasks for some executorIds can be missed at stage end.
Tests to reproduce:
```
bin/spark-shell --master yarn --conf spark.executor.instances=3
sc.parallelize(1 to 10000, 10).map{ x => throw new RuntimeException("Bad executor")}.collect()
```
Before patch: (screenshot)
After patch: (screenshot)
Closes #23038 from shahidki31/SPARK-25451.
Authored-by: Shahid <[email protected]>
Signed-off-by: Marcelo Vanzin <[email protected]>
(cherry picked from commit fbf62b7)
Signed-off-by: Marcelo Vanzin <[email protected]>
(Also merged to 2.4.)

Thank you @vanzin

This seems to have broken the master build, probably some other change that happened since this was last tested. Will send a follow-up.

Oh, I could have rebased and tested locally. Thanks @vanzin for the fix.
…obs in the history server UI

The root cause of the problem is that whenever the taskEnd event comes after the stageCompleted event, execSummary is updated only for the live UI; we need to update it for the history UI too. For the previous discussion, refer to PR apache#23038 and https://issues.apache.org/jira/browse/SPARK-26100.

Added a UT. Manually verified.

Test step to reproduce:
```
bin/spark-shell --master yarn --conf spark.executor.instances=3
sc.parallelize(1 to 10000, 10).map{ x => throw new RuntimeException("Bad executor")}.collect()
```
Open the Executors page from the History UI.

Before patch: (screenshot)
After patch: (screenshot)

Closes apache#23181 from shahidki31/executorUpdate.
Authored-by: Shahid <[email protected]>
Signed-off-by: Marcelo Vanzin <[email protected]>


What changes were proposed in this pull request?
Total tasks in the aggregated table and the tasks table sometimes do not match in the Web UI.
We need to force-update the executor summary of a particular executorId whenever the last task of that executor finishes. Currently the force update happens only based on the last task at stage end, so tasks for some executorIds can be missed at stage end.
How was this patch tested?
Tests to reproduce:
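(Same reproduction snippet as quoted in the commit message above.)
```
bin/spark-shell --master yarn --conf spark.executor.instances=3
sc.parallelize(1 to 10000, 10).map{ x => throw new RuntimeException("Bad executor")}.collect()
```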
Before patch: (screenshot)
After patch: (screenshot)