@@ -19,7 +19,7 @@ package org.apache.spark
1919
2020import java .lang .ref .{ReferenceQueue , WeakReference }
2121import java .util .Collections
22- import java .util .concurrent .{ConcurrentHashMap , ConcurrentLinkedQueue , ScheduledExecutorService , TimeUnit }
22+ import java .util .concurrent .{ConcurrentHashMap , ConcurrentLinkedQueue , ExecutorService , ScheduledExecutorService , TimeUnit }
2323
2424import scala .collection .JavaConverters ._
2525
@@ -28,6 +28,7 @@ import org.apache.spark.internal.Logging
2828import org .apache .spark .rdd .{RDD , ReliableRDDCheckpointData }
2929import org .apache .spark .util .{AccumulatorContext , AccumulatorV2 , ThreadUtils , Utils }
3030
31+
3132/**
3233 * Classes that represent cleaning tasks.
3334 */
@@ -112,6 +113,15 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
112113 private val blockOnShuffleCleanupTasks = sc.conf.getBoolean(
113114 " spark.cleaner.referenceTracking.blocking.shuffle" , false )
114115
/**
 * Number of threads in the pool that runs cleanup tasks, read from
 * `spark.cleaner.referenceTracking.cleanupThreadNumber` (default 100).
 */
private val cleanupTaskThreads = {
  val configured = sc.conf.getInt("spark.cleaner.referenceTracking.cleanupThreadNumber", 100)
  // Fail with a message naming the setting rather than an opaque error from the pool factory.
  require(configured > 0,
    s"spark.cleaner.referenceTracking.cleanupThreadNumber must be positive, got $configured")
  configured
}

/**
 * Fixed-size pool, created through `ThreadUtils.newDaemonFixedThreadPool`, that cleanup
 * tasks are handed to.
 * NOTE(review): nothing in view shuts this pool down -- confirm that stop() does so.
 */
private val cleanupExecutorPool: ExecutorService =
  ThreadUtils.newDaemonFixedThreadPool(cleanupTaskThreads, "cleanup")
124+
115125 @ volatile private var stopped = false
116126
117127 /** Attach a listener object to get information of when objects are cleaned. */
@@ -178,32 +188,41 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
private def keepCleaning(): Unit = Utils.tryOrStopSparkContext(sc) {
  // Loop until `stopped` is set: poll the reference queue and hand every dequeued
  // cleanup reference to runtCleanTask.
  while (!stopped) {
    try {
      // remove(timeout) may return null, which Option(...) turns into None.
      Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
        .map(_.asInstanceOf[CleanupTaskWeakReference])
        .foreach { ref =>
          // Keep the per-task debug trace that the inline implementation emitted.
          logDebug("Got cleaning task " + ref.task)
          referenceBuffer.remove(ref)
          runtCleanTask(ref)
        }
    } catch {
      // An interrupt that arrives once `stopped` is set is ignored; the loop condition ends the loop.
      case ie: InterruptedException if stopped => // ignore
      case e: Exception => logError("Error in cleaning main thread", e)
    }
  }
}
203+
/**
 * Runs the cleanup task carried by `ref` on `cleanupExecutorPool`, dispatching on the
 * task type to the matching `doCleanup*` / `doCleanCheckpoint` method.
 *
 * @param ref reference whose `task` describes what to clean up
 */
private def runtCleanTask(ref: CleanupTaskWeakReference): Unit = {
  // execute() rather than submit(): submit() wraps the Runnable in a Future that nobody
  // inspects, so any Throwable not caught below would be captured there and silently lost.
  cleanupExecutorPool.execute(new Runnable {
    override def run(): Unit = {
      try {
        ref.task match {
          case CleanRDD(rddId) =>
            doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
          case CleanShuffle(shuffleId) =>
            doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
          case CleanBroadcast(broadcastId) =>
            doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
          case CleanAccum(accId) =>
            doCleanupAccum(accId, blocking = blockOnCleanupTasks)
          case CleanCheckpoint(rddId) =>
            doCleanCheckpoint(rddId)
        }
      } catch {
        // An interrupt that arrives once `stopped` is set is ignored.
        case ie: InterruptedException if stopped => // ignore
        case e: Exception => logError("Error in cleaning thread", e)
      }
    }
  })
}
208227
209228 /** Perform RDD cleanup. */
0 commit comments