Skip to content

Commit 6dcafa7

Browse files
Nathan Kronenfeld authored and JoshRosen committed
[SPARK-4772] Clear local copies of accumulators as soon as we're done with them
Accumulators keep thread-local copies of themselves. These copies were only cleared at the beginning of a task. This meant that (a) the memory they used was tied up until the next task ran on that thread, and (b) if a thread died, the memory it had used for accumulators was locked up forever on that worker. This PR clears the thread-local copies of accumulators at the end of each task, in the task's finally block, to make sure they are cleaned up between tasks. It also stores them in a ThreadLocal object, so that if, for some reason, the thread dies, any memory they are using at the time should be freed up. Author: Nathan Kronenfeld <[email protected]> Closes #3570 from nkronenfeld/Accumulator-Improvements and squashes the following commits: a581f3f [Nathan Kronenfeld] Change Accumulators to private[spark] instead of adding mima exclude to get around false positive in mima tests b6c2180 [Nathan Kronenfeld] Include MiMa exclude as per build error instructions - this version incompatibility should be irrelevant, as it will only surface if a master is talking to a worker running a different version of Spark. 537baad [Nathan Kronenfeld] Fuller refactoring as intended, incorporating JR's suggestions for ThreadLocal localAccums, and keeping clear(), but also calling it in tasks' finally block, rather than just at the beginning of the task. 39a82f2 [Nathan Kronenfeld] Clear local copies of accumulators as soon as we're done with them (cherry picked from commit 94b377f) Signed-off-by: Josh Rosen <[email protected]> Conflicts: core/src/main/scala/org/apache/spark/Accumulators.scala core/src/main/scala/org/apache/spark/executor/Executor.scala
1 parent 9b99237 commit 6dcafa7

2 files changed

Lines changed: 10 additions & 7 deletions

File tree

core/src/main/scala/org/apache/spark/Accumulators.scala

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.spark
1919

2020
import java.io.{ObjectInputStream, Serializable}
21+
import java.lang.ThreadLocal
2122

2223
import scala.collection.generic.Growable
2324
import scala.collection.mutable.Map
@@ -246,10 +247,12 @@ trait AccumulatorParam[T] extends AccumulableParam[T, T] {
246247

247248
// TODO: The multi-thread support in accumulators is kind of lame; check
248249
// if there's a more intuitive way of doing it right
249-
private object Accumulators {
250+
private[spark] object Accumulators {
250251
// TODO: Use soft references? => need to make readObject work properly then
251252
val originals = Map[Long, Accumulable[_, _]]()
252-
val localAccums = Map[Thread, Map[Long, Accumulable[_, _]]]()
253+
val localAccums = new ThreadLocal[Map[Long, Accumulable[_, _]]]() {
254+
override protected def initialValue() = Map[Long, Accumulable[_, _]]()
255+
}
253256
var lastId: Long = 0
254257

255258
def newId: Long = synchronized {
@@ -261,22 +264,21 @@ private object Accumulators {
261264
if (original) {
262265
originals(a.id) = a
263266
} else {
264-
val accums = localAccums.getOrElseUpdate(Thread.currentThread, Map())
265-
accums(a.id) = a
267+
localAccums.get()(a.id) = a
266268
}
267269
}
268270

269271
// Clear the local (non-original) accumulators for the current thread
270272
def clear() {
271273
synchronized {
272-
localAccums.remove(Thread.currentThread)
274+
localAccums.get.clear
273275
}
274276
}
275277

276278
// Get the values of the local accumulators for the current thread (by ID)
277279
def values: Map[Long, Any] = synchronized {
278280
val ret = Map[Long, Any]()
279-
for ((id, accum) <- localAccums.getOrElse(Thread.currentThread, Map())) {
281+
for ((id, accum) <- localAccums.get) {
280282
ret(id) = accum.localValue
281283
}
282284
return ret

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,6 @@ private[spark] class Executor(
154154

155155
try {
156156
SparkEnv.set(env)
157-
Accumulators.clear()
158157
val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask)
159158
updateDependencies(taskFiles, taskJars)
160159
task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)
@@ -258,6 +257,8 @@ private[spark] class Executor(
258257
env.shuffleMemoryManager.releaseMemoryForThisThread()
259258
// Release memory used by this thread for unrolling blocks
260259
env.blockManager.memoryStore.releaseUnrollMemoryForThisThread()
260+
// Release memory used by this thread for accumulators
261+
Accumulators.clear()
261262
runningTasks.remove(taskId)
262263
}
263264
}

0 commit comments

Comments
 (0)