Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions core/src/main/scala/org/apache/spark/ContextCleaner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,15 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {

/**
* Whether the cleaning thread will block on cleanup tasks.
* This is set to true only for tests.
*
* Due to SPARK-3015, this is set to true by default. This is intended to be only a temporary
* workaround for the issue, which is ultimately caused by the way the BlockManager actors
* issue inter-dependent blocking Akka messages to each other at high frequencies. This happens,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The changes will not solve the problem here. see.
BlockManagerMasterActor.scala#L165

  private def removeShuffle(shuffleId: Int): Future[Seq[Boolean]] = {
    // Nothing to do in the BlockManagerMasterActor data structures
    import context.dispatcher
    val removeMsg = RemoveShuffle(shuffleId)
    Future.sequence(
      blockManagerInfo.values.map { bm =>
        // Here has set the akkaTimeout
        bm.slaveActor.ask(removeMsg)(akkaTimeout).mapTo[Boolean]
      }.toSeq
    )
  }

* for instance, when the driver performs a GC and cleans up all broadcast blocks that are no
* longer in scope.
*/
private val blockOnCleanupTasks = sc.conf.getBoolean(
"spark.cleaner.referenceTracking.blocking", false)
"spark.cleaner.referenceTracking.blocking", true)

@volatile private var stopped = false

Expand Down Expand Up @@ -174,9 +179,6 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
private def blockManagerMaster = sc.env.blockManager.master
private def broadcastManager = sc.env.broadcastManager
private def mapOutputTrackerMaster = sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]

// Used for testing. These methods explicitly blocks until cleanup is completed
// to ensure that more reliable testing.
}

private object ContextCleaner {
Expand Down