Skip to content
Closed
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 69 additions & 5 deletions core/src/main/scala/org/apache/spark/ContextCleaner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.spark

import java.lang.ref.{ReferenceQueue, WeakReference}
import java.lang.reflect.Field

import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}

Expand Down Expand Up @@ -64,12 +65,38 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {

private val cleaningThread = new Thread() { override def run() { keepCleaning() }}

/**
* Keep track of the reference queue length and log an error if this exceeds a certain capacity.
* Unfortunately, Java's ReferenceQueue exposes neither the queue length nor the enqueue method,
* so we have to do this through reflection. This is expensive, however, so we should access
* this field only once in a while.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The changes will not solve the problem here. see.
BlockManagerMasterActor.scala#L165

  private def removeShuffle(shuffleId: Int): Future[Seq[Boolean]] = {
    // Nothing to do in the BlockManagerMasterActor data structures
    import context.dispatcher
    val removeMsg = RemoveShuffle(shuffleId)
    Future.sequence(
      blockManagerInfo.values.map { bm =>
        // Here has set the akkaTimeout
        bm.slaveActor.ask(removeMsg)(akkaTimeout).mapTo[Boolean]
      }.toSeq
    )
  }

*/
private val queueCapacity = 10000
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, this is not the capacity. It is just a warning threshold. it should be named accordingly.

private var queueFullErrorMessageLogged = false
private val queueLengthAccessor: Option[Field] = {
try {
val f = classOf[ReferenceQueue[AnyRef]].getDeclaredField("queueLength")
f.setAccessible(true)
Some(f)
} catch {
case e: Exception =>
logDebug("Failed to expose java.lang.ref.ReferenceQueue's queueLength field: " + e)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar to the comment below, add a note.

None
}
}
private val logQueueLengthInterval = 1000

/**
* Whether the cleaning thread will block on cleanup tasks.
* This is set to true only for tests.
*
* Due to SPARK-3015, this is set to true by default. This is intended to be only a temporary
* workaround for the issue, which is ultimately caused by the way the BlockManager actors
* issue inter-dependent blocking Akka messages to each other at high frequencies. This happens,
* for instance, when the driver performs a GC and cleans up all broadcast blocks that are no
* longer in scope.
*/
private val blockOnCleanupTasks = sc.conf.getBoolean(
"spark.cleaner.referenceTracking.blocking", false)
"spark.cleaner.referenceTracking.blocking", true)

@volatile private var stopped = false

Expand Down Expand Up @@ -112,6 +139,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {

/** Keep cleaning RDD, shuffle, and broadcast state. */
private def keepCleaning(): Unit = Utils.logUncaughtExceptions {
var iteration = 0
while (!stopped) {
try {
val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
Expand All @@ -127,10 +155,14 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
case CleanBroadcast(broadcastId) =>
doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
}
if (iteration % logQueueLengthInterval == 0) {
logQueueLength()
}
}
} catch {
case e: Exception => logError("Error in cleaning thread", e)
}
iteration += 1
}
}

Expand Down Expand Up @@ -171,12 +203,44 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
}
}

/**
* Log the length of the reference queue through reflection.
* This is an expensive operation and should be called sparingly.
*/
private def logQueueLength(): Unit = {
try {
queueLengthAccessor.foreach { field =>
val length = field.getLong(referenceQueue)
logDebug("Reference queue size is " + length)
if (length > queueCapacity) {
logQueueFullErrorMessage()
}
}
} catch {
case e: Exception =>
logDebug("Failed to access reference queue's length through reflection: " + e)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a note on why this is logDebug and not logWarning/logError.

}
}

/**
* Log an error message to indicate that the queue has exceeded its capacity. Do this only once.
*/
private def logQueueFullErrorMessage(): Unit = {
if (!queueFullErrorMessageLogged) {
queueFullErrorMessageLogged = true
logError(s"Reference queue size in ContextCleaner has exceeded $queueCapacity! " +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure whether this should be logError. Its not like the system is immediately tipping over because of it reached this capacity. I think it should be a logWarning.

"This means the rate at which we clean up RDDs, shuffles, and/or broadcasts is too slow.")
if (blockOnCleanupTasks) {
logError("Consider setting spark.cleaner.referenceTracking.blocking to false. " +
"Note that there is a known issue (SPARK-3015) in disabling blocking, especially if " +
"the workload involves creating many RDDs in quick successions.")
}
}
}

private def blockManagerMaster = sc.env.blockManager.master
private def broadcastManager = sc.env.broadcastManager
private def mapOutputTrackerMaster = sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]

// Used for testing. These methods explicitly blocks until cleanup is completed
// to ensure that more reliable testing.
}

private object ContextCleaner {
Expand Down