@@ -32,6 +32,7 @@ private sealed trait CleanupTask
/** Cleanup task for the RDD identified by `rddId`. */
private case class CleanRDD(rddId: Int) extends CleanupTask

/** Cleanup task for the shuffle identified by `shuffleId`. */
private case class CleanShuffle(shuffleId: Int) extends CleanupTask

/** Cleanup task for the broadcast variable identified by `broadcastId`. */
private case class CleanBroadcast(broadcastId: Long) extends CleanupTask

/** Cleanup task for the accumulator identified by `accId`. */
private case class CleanAccum(accId: Long) extends CleanupTask
3536
3637/**
3738 * A WeakReference associated with a CleanupTask.
@@ -104,16 +105,30 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
104105 cleaningThread.start()
105106 }
106107
/**
 * Stop the cleaning thread and wait until the thread has finished running its current task.
 *
 * Sets the `stopped` flag, interrupts the cleaning thread while holding this object's
 * monitor, and then joins the thread, so the call returns only after the thread has exited.
 */
def stop(): Unit = {
  stopped = true
  // Interrupt the cleaning thread, but wait until the current task has finished before
  // doing so. This guards against the race condition where a cleaning thread may
  // potentially clean similarly named variables created by a different SparkContext,
  // resulting in otherwise inexplicable block-not-found exceptions (SPARK-6132).
  // NOTE(review): this relies on the cleaning loop running each task inside a
  // `synchronized` block on the same monitor -- confirm against the loop body.
  synchronized {
    cleaningThread.interrupt()
  }
  cleaningThread.join()
}
111122
/** Register a RDD for cleanup when it is garbage collected. */
def registerRDDForCleanup(rdd: RDD[_]): Unit = {
  registerForCleanup(rdd, CleanRDD(rdd.id))
}
116127
/**
 * Register an Accumulable for cleanup when it is garbage collected.
 *
 * The accumulable is registered together with a `CleanAccum` task carrying its id.
 */
def registerAccumulatorForCleanup(a: Accumulable[_, _]): Unit = {
  registerForCleanup(a, CleanAccum(a.id))
}
131+
117132 /** Register a ShuffleDependency for cleanup when it is garbage collected. */
118133 def registerShuffleForCleanup (shuffleDependency : ShuffleDependency [_, _, _]) {
119134 registerForCleanup(shuffleDependency, CleanShuffle (shuffleDependency.shuffleId))
@@ -135,19 +150,25 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
135150 try {
136151 val reference = Option (referenceQueue.remove(ContextCleaner .REF_QUEUE_POLL_TIMEOUT ))
137152 .map(_.asInstanceOf [CleanupTaskWeakReference ])
138- reference.map(_.task).foreach { task =>
139- logDebug(" Got cleaning task " + task)
140- referenceBuffer -= reference.get
141- task match {
142- case CleanRDD (rddId) =>
143- doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
144- case CleanShuffle (shuffleId) =>
145- doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
146- case CleanBroadcast (broadcastId) =>
147- doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
153+ // Synchronize here to avoid being interrupted on stop()
154+ synchronized {
155+ reference.map(_.task).foreach { task =>
156+ logDebug(" Got cleaning task " + task)
157+ referenceBuffer -= reference.get
158+ task match {
159+ case CleanRDD (rddId) =>
160+ doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
161+ case CleanShuffle (shuffleId) =>
162+ doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
163+ case CleanBroadcast (broadcastId) =>
164+ doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
165+ case CleanAccum (accId) =>
166+ doCleanupAccum(accId, blocking = blockOnCleanupTasks)
167+ }
148168 }
149169 }
150170 } catch {
171+ case ie : InterruptedException if stopped => // ignore
151172 case e : Exception => logError(" Error in cleaning thread" , e)
152173 }
153174 }
@@ -181,15 +202,27 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
/**
 * Perform broadcast cleanup.
 *
 * Asks the broadcast manager to unbroadcast the given broadcast and then notifies every
 * registered listener via `broadcastCleaned`. Any `Exception` raised along the way is
 * logged and not rethrown.
 *
 * @param broadcastId id of the broadcast to clean up
 * @param blocking forwarded as the last argument of `broadcastManager.unbroadcast`
 */
def doCleanupBroadcast(broadcastId: Long, blocking: Boolean): Unit = {
  try {
    logDebug(s"Cleaning broadcast $broadcastId")
    // NOTE(review): the literal `true` presumably also removes the driver-side copy --
    // confirm against the signature of `unbroadcast`.
    broadcastManager.unbroadcast(broadcastId, true, blocking)
    listeners.foreach(_.broadcastCleaned(broadcastId))
    logDebug(s"Cleaned broadcast $broadcastId")
  } catch {
    case e: Exception => logError(s"Error cleaning broadcast $broadcastId", e)
  }
}
192213
/**
 * Perform accumulator cleanup.
 *
 * Removes the accumulator from `Accumulators` and then notifies every registered listener
 * via `accumCleaned`. Any `Exception` raised along the way is logged and not rethrown.
 *
 * @param accId id of the accumulator to clean up
 * @param blocking not read by this method; kept so the signature matches the other
 *                 `doCleanup*` methods
 */
def doCleanupAccum(accId: Long, blocking: Boolean): Unit = {
  try {
    logDebug(s"Cleaning accumulator $accId")
    Accumulators.remove(accId)
    listeners.foreach(_.accumCleaned(accId))
    // Logged at debug level, matching the completion message of doCleanupBroadcast.
    logDebug(s"Cleaned accumulator $accId")
  } catch {
    case e: Exception => logError(s"Error cleaning accumulator $accId", e)
  }
}
225+
// Shortcuts to services reachable through the SparkContext's environment. They are `def`s,
// so `sc.env` is read again on every call rather than captured once.
private def blockManagerMaster = sc.env.blockManager.master
private def broadcastManager = sc.env.broadcastManager
private def mapOutputTrackerMaster = sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
@@ -206,4 +239,5 @@ private[spark] trait CleanerListener {
/** Callback for a cleaned RDD; presumably invoked once its cleanup has run (call site not shown here). */
def rddCleaned(rddId: Int): Unit

/** Callback for a cleaned shuffle; presumably invoked once its cleanup has run (call site not shown here). */
def shuffleCleaned(shuffleId: Int): Unit

/** Invoked by the cleaner after the broadcast with the given id has been unbroadcast. */
def broadcastCleaned(broadcastId: Long): Unit

/** Invoked by the cleaner after the accumulator with the given id has been removed. */
def accumCleaned(accId: Long): Unit
209243}
0 commit comments