[SPARK-46383] Reduce Driver Heap Usage by Reducing the Lifespan of TaskInfo.accumulables() #44321
Changes from 15 commits
TaskSetManager.scala
@@ -258,6 +258,9 @@ private[spark] class TaskSetManager(
   private[scheduler] var emittedTaskSizeWarning = false
 
+  private[scheduler] val dropTaskInfoAccumulablesOnTaskCompletion =
+    conf.get(DROP_TASK_INFO_ACCUMULABLES_ON_TASK_COMPLETION)
+
   /** Add a task to all the pending-task lists that it should be on. */
   private[spark] def addPendingTask(
       index: Int,
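For context, scheduler flags like this are declared as typed `ConfigEntry`s in Spark's internal config package. A minimal sketch of what the entry behind `DROP_TASK_INFO_ACCUMULABLES_ON_TASK_COMPLETION` might look like; the key string, doc text, and default value are assumptions for illustration, not the PR's actual definition:

```scala
package org.apache.spark.internal.config

// Hypothetical sketch only: the real entry's key, doc text, and default may differ.
// ConfigBuilder is Spark's internal builder for typed ConfigEntry values.
object SchedulerConfSketch {
  val DROP_TASK_INFO_ACCUMULABLES_ON_TASK_COMPLETION =
    ConfigBuilder("spark.scheduler.dropTaskInfoAccumulablesOnTaskCompletion")
      .internal()
      .doc("If true, the TaskSetManager clears TaskInfo.accumulables once a task completes, " +
        "reducing driver heap usage for stages with many tasks.")
      .booleanConf
      .createWithDefault(true)
}
```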
@@ -787,6 +790,11 @@ private[spark] class TaskSetManager(
     // SPARK-37300: when the task was already finished state, just ignore it,
     // so that there won't cause successful and tasksSuccessful wrong result.
     if (info.finished) {
+      if (dropTaskInfoAccumulablesOnTaskCompletion) {
+        // SPARK-46383: Clear out the accumulables for a completed task to reduce accumulable
+        // lifetime.
+        info.setAccumulables(Nil)
+      }
       return
     }
     val index = info.index

Review comments on this hunk:

Contributor: Reading this comment, the partition is already completed, probably by another …

Contributor: I think this branch is handling a rare corner-case where the same …
@@ -804,6 +812,8 @@ private[spark] class TaskSetManager(
       // Handle this task as a killed task
       handleFailedTask(tid, TaskState.KILLED,
         TaskKilled("Finish but did not commit due to another attempt succeeded"))
+      // SPARK-46383: Not clearing the accumulables here because they are already cleared in
+      // handleFailedTask.
       return
     }
@@ -846,11 +856,50 @@ private[spark] class TaskSetManager(
     // "result.value()" in "TaskResultGetter.enqueueSuccessfulTask" before reaching here.
     // Note: "result.value()" only deserializes the value when it's called at the first time, so
     // here "result.value()" just returns the value and won't block other threads.
-    sched.dagScheduler.taskEnded(tasks(index), Success, result.value(), result.accumUpdates,
-      result.metricPeaks, info)
+    emptyTaskInfoAccumulablesAndNotifyDagScheduler(tid, tasks(index), Success, result.value(),
+      result.accumUpdates, result.metricPeaks)
     maybeFinishTaskSet()
   }
 
+  /**
+   * A wrapper around [[DAGScheduler.taskEnded()]] that empties out the accumulables for the
+   * TaskInfo object, corresponding to the completed task, referenced by this class.
+   *
+   * SPARK-46383: For the completed task, we ship the original TaskInfo to the DAGScheduler and
+   * only retain a cloned TaskInfo in this class. We then set the accumulables to Nil for the
+   * TaskInfo object that corresponds to the completed task.
+   * We do this to release references to `TaskInfo.accumulables()` as the TaskInfo
+   * objects held by this class are long-lived and have a heavy memory footprint on the driver.
+   *
+   * This is safe as the TaskInfo accumulables are not needed once they are shipped to the
+   * DAGScheduler where they are aggregated. Additionally, the original TaskInfo, and not a
+   * clone, must be sent to the DAGScheduler as this TaskInfo object is sent to the
+   * DAGScheduler on multiple events during the task's lifetime. Users can install
+   * SparkListeners that compare the TaskInfo objects across these SparkListener events and
+   * thus the TaskInfo object sent to the DAGScheduler must always reference the same TaskInfo
+   * object.
+   */
+  private def emptyTaskInfoAccumulablesAndNotifyDagScheduler(
+      taskId: Long,
+      task: Task[_],
+      reason: TaskEndReason,
+      result: Any,
+      accumUpdates: Seq[AccumulatorV2[_, _]],
+      metricPeaks: Array[Long]): Unit = {
+    val taskInfoWithAccumulables = taskInfos(taskId)
+    if (dropTaskInfoAccumulablesOnTaskCompletion) {
+      val index = taskInfoWithAccumulables.index
+      val clonedTaskInfo = taskInfoWithAccumulables.cloneWithEmptyAccumulables()
+      // Update this task's taskInfo while preserving its position in the list
+      taskAttempts(index) =
+        taskAttempts(index).map { i => if (i eq taskInfoWithAccumulables) clonedTaskInfo else i }
+      taskInfos(taskId) = clonedTaskInfo
+    }
+    sched.dagScheduler.taskEnded(task, reason, result, accumUpdates, metricPeaks,
+      taskInfoWithAccumulables)
+  }
+
   private[scheduler] def markPartitionCompleted(partitionId: Int): Unit = {
     partitionToIndex.get(partitionId).foreach { index =>
       if (!successful(index)) {
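The wrapper above relies on two `TaskInfo` helpers that are outside this diff view: `setAccumulables()` and `cloneWithEmptyAccumulables()`. A minimal sketch of the behaviour they are assumed to provide; this is a stand-in class for illustration, not Spark's actual `TaskInfo`, which carries many more fields:

```scala
import org.apache.spark.scheduler.AccumulableInfo

// Stand-in for TaskInfo, sketching only the two helpers the wrapper relies on.
// Names mirror the calls in the diff; the real implementation may differ.
class TaskInfoSketch(val taskId: Long, val index: Int) {

  private var _accumulables: Seq[AccumulableInfo] = Nil

  def accumulables: Seq[AccumulableInfo] = _accumulables

  // Replace the accumulables; passing Nil drops the references so completed tasks
  // no longer pin accumulator metadata in driver memory.
  def setAccumulables(newAccumulables: Seq[AccumulableInfo]): Unit = {
    _accumulables = newAccumulables
  }

  // Copy of this task info with empty accumulables. The original instance keeps its
  // accumulables (and its identity) so it can still be handed to the DAGScheduler,
  // while the TaskSetManager retains only the lightweight clone.
  def cloneWithEmptyAccumulables(): TaskInfoSketch =
    new TaskInfoSketch(taskId, index) // _accumulables starts out as Nil
}
```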
@@ -874,6 +923,11 @@ private[spark] class TaskSetManager(
     // SPARK-37300: when the task was already finished state, just ignore it,
     // so that there won't cause copiesRunning wrong result.
     if (info.finished) {
+      if (dropTaskInfoAccumulablesOnTaskCompletion) {
+        // SPARK-46383: Clear out the accumulables for a completed task to reduce accumulable
+        // lifetime.
+        info.setAccumulables(Nil)
+      }
       return
     }
     removeRunningTask(tid)
@@ -908,7 +962,8 @@ private[spark] class TaskSetManager(
         if (ef.className == classOf[NotSerializableException].getName) {
           // If the task result wasn't serializable, there's no point in trying to re-execute it.
           logError(s"$task had a not serializable result: ${ef.description}; not retrying")
-          sched.dagScheduler.taskEnded(tasks(index), reason, null, accumUpdates, metricPeaks, info)
+          emptyTaskInfoAccumulablesAndNotifyDagScheduler(tid, tasks(index), reason, null,
+            accumUpdates, metricPeaks)
           abort(s"$task had a not serializable result: ${ef.description}")
           return
         }
@@ -917,7 +972,8 @@ private[spark] class TaskSetManager(
           // re-execute it.
           logError("Task %s in stage %s (TID %d) can not write to output file: %s; not retrying"
             .format(info.id, taskSet.id, tid, ef.description))
-          sched.dagScheduler.taskEnded(tasks(index), reason, null, accumUpdates, metricPeaks, info)
+          emptyTaskInfoAccumulablesAndNotifyDagScheduler(tid, tasks(index), reason, null,
+            accumUpdates, metricPeaks)
           abort("Task %s in stage %s (TID %d) can not write to output file: %s".format(
             info.id, taskSet.id, tid, ef.description))
           return
@@ -970,7 +1026,8 @@ private[spark] class TaskSetManager(
       isZombie = true
     }
 
-    sched.dagScheduler.taskEnded(tasks(index), reason, null, accumUpdates, metricPeaks, info)
+    emptyTaskInfoAccumulablesAndNotifyDagScheduler(tid, tasks(index), reason, null,
+      accumUpdates, metricPeaks)
 
     if (!isZombie && reason.countTowardsTaskFailures) {
       assert (null != failureReason)
@@ -1086,8 +1143,8 @@ private[spark] class TaskSetManager(
           addPendingTask(index)
           // Tell the DAGScheduler that this task was resubmitted so that it doesn't think our
           // stage finishes when a total of tasks.size tasks finish.
-          sched.dagScheduler.taskEnded(
-            tasks(index), Resubmitted, null, Seq.empty, Array.empty, info)
+          emptyTaskInfoAccumulablesAndNotifyDagScheduler(tid,
+            tasks(index), Resubmitted, null, Seq.empty, Array.empty)
         }
       }
     }
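To close out the TaskSetManager changes: since the flag is read from the `SparkConf` (see the first hunk), it can be toggled per application. A rough usage sketch follows; the raw key string is an assumed spelling of the config constant used in the diff, not necessarily the exact key this PR introduces:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// The key string is an assumption for illustration; in Spark code you would reference the
// DROP_TASK_INFO_ACCUMULABLES_ON_TASK_COMPLETION config constant instead of a raw string.
val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("drop-taskinfo-accumulables-demo")
  .set("spark.scheduler.dropTaskInfoAccumulablesOnTaskCompletion", "true")

val sc = new SparkContext(conf)
try {
  // Any job will do: once its tasks complete, their TaskInfo.accumulables are cleared driver-side.
  sc.parallelize(1 to 1000, 8).map(_ * 2).count()
} finally {
  sc.stop()
}
```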
SparkListenerSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.scheduler
 
 import java.io.{Externalizable, ObjectInput, ObjectOutput}
+import java.util.{Collections, IdentityHashMap}
 import java.util.concurrent.Semaphore
 
 import scala.collection.mutable
@@ -289,6 +290,19 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
     stageInfo.rddInfos.forall(_.numPartitions == 4) should be {true}
   }
 
+  test("SPARK-46383: Track TaskInfo objects") {
+    // Test that the same TaskInfo object is sent to the `DAGScheduler` in the `onTaskStart` and
+    // `onTaskEnd` events.
+    val conf = new SparkConf().set(DROP_TASK_INFO_ACCUMULABLES_ON_TASK_COMPLETION, true)
+    sc = new SparkContext("local", "SparkListenerSuite", conf)
+    val listener = new SaveActiveTaskInfos
+    sc.addSparkListener(listener)
+    val rdd1 = sc.parallelize(1 to 100, 4)
+    sc.runJob(rdd1, (items: Iterator[Int]) => items.size, Seq(0, 1))
+    sc.listenerBus.waitUntilEmpty()
+    listener.taskInfos.size should be { 0 }
+  }
+
   test("local metrics") {
     sc = new SparkContext("local", "SparkListenerSuite")
     val listener = new SaveStageAndTaskInfo

Review comments on this test:

Contributor: I am not sure I follow this test, what is it trying to do?

Contributor (author): This test asserts that the same …

Contributor: Isn't that simply an implementation detail? (For example, the resubmission case would break it.) I don't see a harm in keeping it, but want to make sure I am not missing something here.

Contributor (author): I don't mind dropping it. I was just trying to assert one of the ways SparkListeners could be used. The test is more of a general test to ensure that we preserve the behavior of SparkListeners.

Contributor: Functionally that (the right task info is in the event) should be covered already (in use of …
@@ -643,6 +657,27 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
     }
   }
 
+  /**
+   * A simple listener that tracks task infos for all active tasks.
+   */
+  private class SaveActiveTaskInfos extends SparkListener {
+    // Use a set based on IdentityHashMap instead of a HashSet to track unique references of
+    // TaskInfo objects.
+    val taskInfos = Collections.newSetFromMap[TaskInfo](new IdentityHashMap)
+
+    override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
+      val info = taskStart.taskInfo
+      if (info != null) {
+        taskInfos.add(info)
+      }
+    }
+
+    override def onTaskEnd(task: SparkListenerTaskEnd): Unit = {
+      val info = task.taskInfo
+      taskInfos.remove(info)
+    }
+  }
+
   /**
    * A simple listener that saves the task indices for all task events.
    */
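The listener tracks references with a set backed by `IdentityHashMap` rather than an ordinary `HashSet`, so `onTaskEnd` only clears an entry when it receives the very same `TaskInfo` instance that `onTaskStart` saw, which is what the test's `taskInfos.size should be { 0 }` assertion checks. A small standalone sketch of that distinction; the `Event` class and values are made up for illustration:

```scala
import java.util.{Collections, IdentityHashMap}

// Two values that are ==-equal but are distinct objects on the heap.
case class Event(id: Int)

val identitySet = Collections.newSetFromMap[Event](new IdentityHashMap)

val started = Event(1)
val ended = Event(1) // equal to `started`, but a different instance

identitySet.add(started)

// An equality-based set would treat `ended` as a match; the identity-based set does not,
// so the entry disappears only when the exact same reference comes back.
println(identitySet.remove(ended))   // false: different object
println(identitySet.remove(started)) // true: same reference
println(identitySet.isEmpty)         // true
```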