Skip to content

Commit 55db262

Browse files
ajbozarth authored and Tom Graves committed
[SPARK-15083][WEB UI] History Server can OOM due to unlimited TaskUIData
## What changes were proposed in this pull request? This is a back port of #14673 addressing merge conflicts in package.scala that prevented a cherry-pick to `branch-2.0` when it was merged to `master` Since the History Server currently loads all application's data it can OOM if too many applications have a significant task count. This trims tasks by `spark.ui.retainedTasks` (default: 100000) ## How was this patch tested? Manual testing and dev/run-tests Author: Alex Bozarth <ajbozart@us.ibm.com> Closes #14794 from ajbozarth/spark15083-branch-2.0.
1 parent 356a359 commit 55db262

9 files changed

Lines changed: 256 additions & 228 deletions

File tree

core/src/main/scala/org/apache/spark/internal/config/package.scala

Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -103,4 +103,9 @@ package object config {
103103
.stringConf
104104
.checkValues(Set("hive", "in-memory"))
105105
.createWithDefault("in-memory")
106+
107+
// To limit memory usage, we only track information for a fixed number of tasks
108+
private[spark] val UI_RETAINED_TASKS = ConfigBuilder("spark.ui.retainedTasks")
109+
.intConf
110+
.createWithDefault(100000)
106111
}

core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala

Lines changed: 8 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -19,12 +19,13 @@ package org.apache.spark.ui.jobs
1919

2020
import java.util.concurrent.TimeoutException
2121

22-
import scala.collection.mutable.{HashMap, HashSet, ListBuffer}
22+
import scala.collection.mutable.{HashMap, HashSet, LinkedHashMap, ListBuffer}
2323

2424
import org.apache.spark._
2525
import org.apache.spark.annotation.DeveloperApi
2626
import org.apache.spark.executor.TaskMetrics
2727
import org.apache.spark.internal.Logging
28+
import org.apache.spark.internal.config._
2829
import org.apache.spark.scheduler._
2930
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
3031
import org.apache.spark.storage.BlockManagerId
@@ -93,6 +94,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
9394

9495
val retainedStages = conf.getInt("spark.ui.retainedStages", SparkUI.DEFAULT_RETAINED_STAGES)
9596
val retainedJobs = conf.getInt("spark.ui.retainedJobs", SparkUI.DEFAULT_RETAINED_JOBS)
97+
val retainedTasks = conf.get(UI_RETAINED_TASKS)
9698

9799
// We can test for memory leaks by ensuring that collections that track non-active jobs and
98100
// stages do not grow without bound and that collections for active jobs/stages eventually become
@@ -400,6 +402,11 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
400402
taskData.updateTaskMetrics(taskMetrics)
401403
taskData.errorMessage = errorMessage
402404

405+
// If Tasks is too large, remove and garbage collect old tasks
406+
if (stageData.taskData.size > retainedTasks) {
407+
stageData.taskData = stageData.taskData.drop(stageData.taskData.size - retainedTasks)
408+
}
409+
403410
for (
404411
activeJobsDependentOnStage <- stageIdToActiveJobIds.get(taskEnd.stageId);
405412
jobId <- activeJobsDependentOnStage;

core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala

Lines changed: 10 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -131,7 +131,14 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
131131

132132
val stageData = stageDataOption.get
133133
val tasks = stageData.taskData.values.toSeq.sortBy(_.taskInfo.launchTime)
134-
val numCompleted = tasks.count(_.taskInfo.finished)
134+
val numCompleted = stageData.numCompleteTasks
135+
val totalTasks = stageData.numActiveTasks +
136+
stageData.numCompleteTasks + stageData.numFailedTasks
137+
val totalTasksNumStr = if (totalTasks == tasks.size) {
138+
s"$totalTasks"
139+
} else {
140+
s"$totalTasks, showing ${tasks.size}"
141+
}
135142

136143
val allAccumulables = progressListener.stageIdToData((stageId, stageAttemptId)).accumulables
137144
val externalAccumulables = allAccumulables.values.filter { acc => !acc.internal }
@@ -576,7 +583,8 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
576583
<div>{summaryTable.getOrElse("No tasks have reported metrics yet.")}</div> ++
577584
<h4>Aggregated Metrics by Executor</h4> ++ executorTable.toNodeSeq ++
578585
maybeAccumulableTable ++
579-
<h4 id="tasks-section">Tasks</h4> ++ taskTableHTML ++ jsForScrollingDownToTaskTable
586+
<h4 id="tasks-section">Tasks ({totalTasksNumStr})</h4> ++
587+
taskTableHTML ++ jsForScrollingDownToTaskTable
580588
UIUtils.headerSparkPage(stageHeader, content, parent, showVisualization = true)
581589
}
582590
}

core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -18,7 +18,7 @@
1818
package org.apache.spark.ui.jobs
1919

2020
import scala.collection.mutable
21-
import scala.collection.mutable.HashMap
21+
import scala.collection.mutable.{HashMap, LinkedHashMap}
2222

2323
import org.apache.spark.JobExecutionStatus
2424
import org.apache.spark.executor.{ShuffleReadMetrics, ShuffleWriteMetrics, TaskMetrics}
@@ -94,7 +94,7 @@ private[spark] object UIData {
9494
var description: Option[String] = None
9595

9696
var accumulables = new HashMap[Long, AccumulableInfo]
97-
var taskData = new HashMap[Long, TaskUIData]
97+
var taskData = new LinkedHashMap[Long, TaskUIData]
9898
var executorSummary = new HashMap[String, ExecutorSummary]
9999

100100
def hasInput: Boolean = inputBytes > 0

0 commit comments

Comments (0)