4 changes: 4 additions & 0 deletions core/pom.xml
@@ -110,6 +110,10 @@
<groupId>net.liftweb</groupId>
<artifactId>lift-json_2.9.2</artifactId>
</dependency>
<dependency>
<groupId>com.codahale.metrics</groupId>
<artifactId>metrics-ganglia</artifactId>
</dependency>
<dependency>
<groupId>it.unimi.dsi</groupId>
<artifactId>fastutil</artifactId>
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -32,7 +32,7 @@ import akka.util.Duration

import org.apache.spark.scheduler.MapStatus
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util.{Utils, MetadataCleaner, TimeStampedHashMap}
import org.apache.spark.util.{Utils, MetadataCleaner, MetadataCleanerType, TimeStampedHashMap}


private[spark] sealed trait MapOutputTrackerMessage
@@ -71,7 +71,7 @@ private[spark] class MapOutputTracker extends Logging {
var cacheEpoch = epoch
private val cachedSerializedStatuses = new TimeStampedHashMap[Int, Array[Byte]]

val metadataCleaner = new MetadataCleaner("MapOutputTracker", this.cleanup)
val metadataCleaner = new MetadataCleaner(MetadataCleanerType.MAP_OUTPUT_TRACKER, "MapOutputTracker", this.cleanup)
Review comment (Member):
I don't like that these take both an enum type and a string name as arguments -- let's have them take just the enum.

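A minimal sketch of the constructor the comment above is asking for, assuming the human-readable name can be derived from the enum value itself (the exact signature is illustrative, not the merged API):

    class MetadataCleaner(cleanerType: MetadataCleanerType.MetadataCleanerType,
                          cleanupFunc: (Long) => Unit) extends Logging {
      // The display name now comes from the enum instead of a second string argument.
      private val name = cleanerType.toString
      private val delaySeconds = MetadataCleaner.getDelaySeconds(cleanerType)
      private val periodSeconds = math.max(10, delaySeconds / 10)
      private val timer = new Timer(name + " cleanup timer", true)
      // ... timer task and cancel() unchanged from the existing class ...
    }

    // Call sites then shrink to:
    // val metadataCleaner = new MetadataCleaner(MetadataCleanerType.MAP_OUTPUT_TRACKER, this.cleanup)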

// Send a message to the trackerActor and get its result within a default timeout, or
// throw a SparkException if this fails.
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -62,7 +62,7 @@ import org.apache.spark.scheduler.local.LocalScheduler
import org.apache.spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
import org.apache.spark.storage.{StorageUtils, BlockManagerSource}
import org.apache.spark.ui.SparkUI
import org.apache.spark.util.{ClosureCleaner, Utils, MetadataCleaner, TimeStampedHashMap}
import org.apache.spark.util.{ClosureCleaner, Utils, MetadataCleaner, MetadataCleanerType, TimeStampedHashMap}
import org.apache.spark.scheduler.StageInfo
import org.apache.spark.storage.RDDInfo
import org.apache.spark.storage.StorageStatus
@@ -117,7 +117,7 @@ class SparkContext(

// Keeps track of all persisted RDDs
private[spark] val persistentRdds = new TimeStampedHashMap[Int, RDD[_]]
private[spark] val metadataCleaner = new MetadataCleaner("SparkContext", this.cleanup)
private[spark] val metadataCleaner = new MetadataCleaner(MetadataCleanerType.SPARK_CONTEXT, "SparkContext", this.cleanup)

// Initialize the Spark UI
private[spark] val ui = new SparkUI(this)
core/src/main/scala/org/apache/spark/broadcast/BitTorrentBroadcast.scala
@@ -39,7 +39,7 @@ private[spark] class BitTorrentBroadcast[T](@transient var value_ : T, isLocal:
def blockId: String = "broadcast_" + id

MultiTracker.synchronized {
SparkEnv.get.blockManager.putSingle(blockId, value_, StorageLevel.MEMORY_AND_DISK, false)
SparkEnv.get.blockManager.putSingle(blockId, value_, StorageLevel.MEMORY_AND_DISK, false, true)
}

@transient var arrayOfBlocks: Array[BroadcastBlock] = null
@@ -158,7 +158,7 @@ private[spark] class BitTorrentBroadcast[T](@transient var value_ : T, isLocal:
if (receptionSucceeded) {
value_ = MultiTracker.unBlockifyObject[T](arrayOfBlocks, totalBytes, totalBlocks)
SparkEnv.get.blockManager.putSingle(
blockId, value_, StorageLevel.MEMORY_AND_DISK, false)
blockId, value_, StorageLevel.MEMORY_AND_DISK, false, true)
} else {
logError("Reading broadcast variable " + id + " failed")
}
core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
@@ -26,7 +26,7 @@ import it.unimi.dsi.fastutil.io.FastBufferedOutputStream
import org.apache.spark.{HttpServer, Logging, SparkEnv}
import org.apache.spark.io.CompressionCodec
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.{Utils, MetadataCleaner, TimeStampedHashSet}
import org.apache.spark.util.{Utils, MetadataCleaner, MetadataCleanerType, TimeStampedHashSet}


private[spark] class HttpBroadcast[T](@transient var value_ : T, isLocal: Boolean, id: Long)
@@ -37,7 +37,7 @@ private[spark] class HttpBroadcast[T](@transient var value_ : T, isLocal: Boolea
def blockId: String = "broadcast_" + id

HttpBroadcast.synchronized {
SparkEnv.get.blockManager.putSingle(blockId, value_, StorageLevel.MEMORY_AND_DISK, false)
SparkEnv.get.blockManager.putSingle(blockId, value_, StorageLevel.MEMORY_AND_DISK, false, true)
}

if (!isLocal) {
@@ -54,7 +54,7 @@ private[spark] class HttpBroadcast[T](@transient var value_ : T, isLocal: Boolea
logInfo("Started reading broadcast variable " + id)
val start = System.nanoTime
value_ = HttpBroadcast.read[T](id)
SparkEnv.get.blockManager.putSingle(blockId, value_, StorageLevel.MEMORY_AND_DISK, false)
SparkEnv.get.blockManager.putSingle(blockId, value_, StorageLevel.MEMORY_AND_DISK, false, true)
val time = (System.nanoTime - start) / 1e9
logInfo("Reading broadcast variable " + id + " took " + time + " s")
}
@@ -82,7 +82,7 @@ private object HttpBroadcast extends Logging {
private var server: HttpServer = null

private val files = new TimeStampedHashSet[String]
private val cleaner = new MetadataCleaner("HttpBroadcast", cleanup)
private val cleaner = new MetadataCleaner(MetadataCleanerType.HTTP_BROADCAST, "HttpBroadcast", cleanup)

private lazy val compressionCodec = CompressionCodec.createCodec()

core/src/main/scala/org/apache/spark/broadcast/TreeBroadcast.scala
@@ -36,7 +36,7 @@ extends Broadcast[T](id) with Logging with Serializable {
def blockId = "broadcast_" + id

MultiTracker.synchronized {
SparkEnv.get.blockManager.putSingle(blockId, value_, StorageLevel.MEMORY_AND_DISK, false)
SparkEnv.get.blockManager.putSingle(blockId, value_, StorageLevel.MEMORY_AND_DISK, false, true)
}

@transient var arrayOfBlocks: Array[BroadcastBlock] = null
@@ -133,7 +133,7 @@ extends Broadcast[T](id) with Logging with Serializable {
if (receptionSucceeded) {
value_ = MultiTracker.unBlockifyObject[T](arrayOfBlocks, totalBytes, totalBlocks)
SparkEnv.get.blockManager.putSingle(
blockId, value_, StorageLevel.MEMORY_AND_DISK, false)
blockId, value_, StorageLevel.MEMORY_AND_DISK, false, true)
} else {
logError("Reading broadcast variable " + id + " failed")
}
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -30,7 +30,7 @@ import org.apache.spark.executor.TaskMetrics
import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult}
import org.apache.spark.scheduler.cluster.TaskInfo
import org.apache.spark.storage.{BlockManager, BlockManagerMaster}
import org.apache.spark.util.{MetadataCleaner, TimeStampedHashMap}
import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, TimeStampedHashMap}

/**
* The high-level scheduling layer that implements stage-oriented scheduling. It computes a DAG of
@@ -139,7 +139,7 @@ class DAGScheduler(
val activeJobs = new HashSet[ActiveJob]
val resultStageToJob = new HashMap[Stage, ActiveJob]

val metadataCleaner = new MetadataCleaner("DAGScheduler", this.cleanup)
val metadataCleaner = new MetadataCleaner(MetadataCleanerType.DAG_SCHEDULER, "DAGScheduler", this.cleanup)

// Start a thread to run the DAGScheduler event loop
def start() {
core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
@@ -23,7 +23,7 @@ import java.util.zip.{GZIPInputStream, GZIPOutputStream}
import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.RDDCheckpointData
import org.apache.spark.util.{MetadataCleaner, TimeStampedHashMap}
import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, TimeStampedHashMap}

private[spark] object ResultTask {

@@ -32,7 +32,7 @@ private[spark] object ResultTask {
// expensive on the master node if it needs to launch thousands of tasks.
val serializedInfoCache = new TimeStampedHashMap[Int, Array[Byte]]

val metadataCleaner = new MetadataCleaner("ResultTask", serializedInfoCache.clearOldValues)
val metadataCleaner = new MetadataCleaner(MetadataCleanerType.RESULT_TASK, "ResultTask", serializedInfoCache.clearOldValues)

def serializeInfo(stageId: Int, rdd: RDD[_], func: (TaskContext, Iterator[_]) => _): Array[Byte] = {
synchronized {
core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
@@ -25,7 +25,7 @@ import scala.collection.mutable.HashMap
import org.apache.spark._
import org.apache.spark.executor.ShuffleWriteMetrics
import org.apache.spark.storage._
import org.apache.spark.util.{TimeStampedHashMap, MetadataCleaner}
import org.apache.spark.util.{TimeStampedHashMap, MetadataCleaner, MetadataCleanerType}
import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.RDDCheckpointData

@@ -37,7 +37,7 @@ private[spark] object ShuffleMapTask {
// expensive on the master node if it needs to launch thousands of tasks.
val serializedInfoCache = new TimeStampedHashMap[Int, Array[Byte]]

val metadataCleaner = new MetadataCleaner("ShuffleMapTask", serializedInfoCache.clearOldValues)
val metadataCleaner = new MetadataCleaner(MetadataCleanerType.SHUFFLE_MAP_TASK, "ShuffleMapTask", serializedInfoCache.clearOldValues)

def serializeInfo(stageId: Int, rdd: RDD[_], dep: ShuffleDependency[_,_]): Array[Byte] = {
synchronized {
69 changes: 53 additions & 16 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -46,7 +46,7 @@ private[spark] class BlockManager(
maxMemory: Long)
extends Logging {

private class BlockInfo(val level: StorageLevel, val tellMaster: Boolean) {
private class BlockInfo(val level: StorageLevel, val tellMaster: Boolean, val broadcastBlock: Boolean) {
@volatile var pending: Boolean = true
@volatile var size: Long = -1L
@volatile var initThread: Thread = null
@@ -154,7 +154,8 @@

var heartBeatTask: Cancellable = null

val metadataCleaner = new MetadataCleaner("BlockManager", this.dropOldBlocks)
private val metadataCleaner = new MetadataCleaner(MetadataCleanerType.BLOCK_MANAGER, "BlockManager", this.dropOldNonBroadcastBlocks)
private val broadcastCleaner = new MetadataCleaner(MetadataCleanerType.BROADCAST_VARS, "BroadcastVars", this.dropOldBroadcastBlocks)
initialize()

// The compression codec to use. Note that the "lazy" val is necessary because we want to delay
@@ -522,11 +523,14 @@
iter
}

def put(blockId: String, values: Iterator[Any], level: StorageLevel, tellMaster: Boolean)
: Long = {
def put(blockId: String, values: Iterator[Any], level: StorageLevel, tellMaster: Boolean): Long = {
put(blockId, values, level, tellMaster, false)
}

def put(blockId: String, values: Iterator[Any], level: StorageLevel, tellMaster: Boolean, broadcastBlock: Boolean): Long = {
val elements = new ArrayBuffer[Any]
elements ++= values
put(blockId, elements, level, tellMaster)
put(blockId, elements, level, tellMaster, broadcastBlock)
}

/**
@@ -538,7 +542,7 @@
: BlockObjectWriter = {
val writer = diskStore.getBlockWriter(blockId, serializer, bufferSize)
writer.registerCloseEventHandler(() => {
val myInfo = new BlockInfo(StorageLevel.DISK_ONLY, false)
val myInfo = new BlockInfo(StorageLevel.DISK_ONLY, false, false)
blockInfo.put(blockId, myInfo)
myInfo.markReady(writer.size())
})
@@ -548,9 +552,12 @@
/**
* Put a new block of values to the block manager. Returns its (estimated) size in bytes.
*/
def put(blockId: String, values: ArrayBuffer[Any], level: StorageLevel,
tellMaster: Boolean = true) : Long = {
def put(blockId: String, values: ArrayBuffer[Any], level: StorageLevel, tellMaster: Boolean = true) : Long = {

put(blockId, values, level, tellMaster, false)
}

def put(blockId: String, values: ArrayBuffer[Any], level: StorageLevel, tellMaster: Boolean, broadcastBlock: Boolean) : Long = {
Review comment (Member):
So far we've been using the blockID instead of a flag to determine whether a block is a broadcast, an RDD, or something else. Please do it that way here instead of adding a new boolean flag just for broadcasts. In the cleanup part you can just check if the block ID starts with "broadcast_".

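A sketch of the ID-prefix approach described in the comment above, reusing the cleanup loop that appears later in this diff; the shouldDrop helper and its name are illustrative:

    // Shared cleanup loop, parameterized on which block IDs may be dropped.
    private def dropOldBlocks(cleanupTime: Long, shouldDrop: String => Boolean) {
      val iterator = blockInfo.internalMap.entrySet().iterator()
      while (iterator.hasNext) {
        val entry = iterator.next()
        val (id, info, time) = (entry.getKey, entry.getValue._1, entry.getValue._2)
        if (time < cleanupTime && shouldDrop(id)) {
          info.synchronized {
            val level = info.level
            if (level.useMemory) memoryStore.remove(id)
            if (level.useDisk) diskStore.remove(id)
            iterator.remove()
            logInfo("Dropped block " + id)
          }
          reportBlockStatus(id, info)
        }
      }
    }

    // Broadcast blocks are identified purely by their ID prefix, so no extra flag is needed.
    def dropOldNonBroadcastBlocks(cleanupTime: Long) {
      dropOldBlocks(cleanupTime, id => !id.startsWith("broadcast_"))
    }

    def dropOldBroadcastBlocks(cleanupTime: Long) {
      dropOldBlocks(cleanupTime, id => id.startsWith("broadcast_"))
    }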
if (blockId == null) {
throw new IllegalArgumentException("Block Id is null")
}
@@ -565,7 +572,7 @@
// to be dropped right after it got put into memory. Note, however, that other threads will
// not be able to get() this block until we call markReady on its BlockInfo.
val myInfo = {
val tinfo = new BlockInfo(level, tellMaster)
val tinfo = new BlockInfo(level, tellMaster, broadcastBlock)
// Do atomically !
val oldBlockOpt = blockInfo.putIfAbsent(blockId, tinfo)

@@ -667,9 +674,12 @@
/**
* Put a new block of serialized bytes to the block manager.
*/
def putBytes(
blockId: String, bytes: ByteBuffer, level: StorageLevel, tellMaster: Boolean = true) {
def putBytes(blockId: String, bytes: ByteBuffer, level: StorageLevel, tellMaster: Boolean = true) {

putBytes(blockId, bytes, level, tellMaster, false)
}

def putBytes(blockId: String, bytes: ByteBuffer, level: StorageLevel, tellMaster: Boolean, broadcastBlock: Boolean) {
if (blockId == null) {
throw new IllegalArgumentException("Block Id is null")
}
@@ -684,7 +694,7 @@
// to be dropped right after it got put into memory. Note, however, that other threads will
// not be able to get() this block until we call markReady on its BlockInfo.
val myInfo = {
val tinfo = new BlockInfo(level, tellMaster)
val tinfo = new BlockInfo(level, tellMaster, broadcastBlock)
// Do atomically !
val oldBlockOpt = blockInfo.putIfAbsent(blockId, tinfo)

@@ -800,7 +810,11 @@
* Write a block consisting of a single object.
*/
def putSingle(blockId: String, value: Any, level: StorageLevel, tellMaster: Boolean = true) {
put(blockId, Iterator(value), level, tellMaster)
putSingle(blockId, value, level, tellMaster, false)
}

def putSingle(blockId: String, value: Any, level: StorageLevel, tellMaster: Boolean, broadcastVar: Boolean) {
put(blockId, Iterator(value), level, tellMaster, broadcastVar)
}

/**
@@ -886,13 +900,36 @@
}
}

def dropOldBlocks(cleanupTime: Long) {
logInfo("Dropping blocks older than " + cleanupTime)
def dropOldNonBroadcastBlocks(cleanupTime: Long) {
logInfo("Dropping non broadcast blocks older than " + cleanupTime)
val iterator = blockInfo.internalMap.entrySet().iterator()
while (iterator.hasNext) {
val entry = iterator.next()
val (id, info, time) = (entry.getKey, entry.getValue._1, entry.getValue._2)
if (time < cleanupTime && ! info.broadcastBlock) {
info.synchronized {
val level = info.level
if (level.useMemory) {
memoryStore.remove(id)
}
if (level.useDisk) {
diskStore.remove(id)
}
iterator.remove()
logInfo("Dropped block " + id)
}
reportBlockStatus(id, info)
}
}
}

def dropOldBroadcastBlocks(cleanupTime: Long) {
logInfo("Dropping broaddcast blocks older than " + cleanupTime)
val iterator = blockInfo.internalMap.entrySet().iterator()
while (iterator.hasNext) {
val entry = iterator.next()
val (id, info, time) = (entry.getKey, entry.getValue._1, entry.getValue._2)
if (time < cleanupTime) {
if (time < cleanupTime && info.broadcastBlock) {
info.synchronized {
val level = info.level
if (level.useMemory) {
37 changes: 34 additions & 3 deletions core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
@@ -20,13 +20,14 @@ package org.apache.spark.util
import java.util.concurrent.{TimeUnit, ScheduledFuture, Executors}
import java.util.{TimerTask, Timer}
import org.apache.spark.Logging
import scala.collection.mutable.HashMap


/**
* Runs a timer task to periodically clean up metadata (e.g. old files or hashtable entries)
*/
class MetadataCleaner(name: String, cleanupFunc: (Long) => Unit) extends Logging {
private val delaySeconds = MetadataCleaner.getDelaySeconds
class MetadataCleaner(cleanerType: MetadataCleanerType.MetadataCleanerType, name: String, cleanupFunc: (Long) => Unit) extends Logging {
private val delaySeconds = MetadataCleaner.getDelaySeconds(cleanerType)
private val periodSeconds = math.max(10, delaySeconds / 10)
private val timer = new Timer(name + " cleanup timer", true)

@@ -53,9 +54,39 @@ class MetadataCleaner(name: String, cleanupFunc: (Long) => Unit) extends Logging
}
}

object MetadataCleanerType extends Enumeration("map_output_tracker", "spark_context", "http_broadcast", "dag_scheduler", "result_task",
"shuffle_map_task", "block_manager", "broadcast_vars") {

val MAP_OUTPUT_TRACKER, SPARK_CONTEXT, HTTP_BROADCAST, DAG_SCHEDULER, RESULT_TASK, SHUFFLE_MAP_TASK, BLOCK_MANAGER, BROADCAST_VARS = Value

type MetadataCleanerType = Value

def systemProperty(which: MetadataCleanerType.MetadataCleanerType) = "spark.cleaner.ttl." + which.toString
Review comment (Member):
Avoid underscores in the system properties -- use camelCase to match our other ones. I'd also be okay if these were full class names (MapOutputTracker, SparkContext, etc).

}
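One way to follow the naming comment above, assuming the enumeration labels are switched to the owning class names so the keys read spark.cleaner.ttl.MapOutputTracker, spark.cleaner.ttl.BlockManager, and so on (labels here are illustrative):

    object MetadataCleanerType extends Enumeration(
        "MapOutputTracker", "SparkContext", "HttpBroadcast", "DagScheduler",
        "ResultTask", "ShuffleMapTask", "BlockManager", "BroadcastVars") {

      val MAP_OUTPUT_TRACKER, SPARK_CONTEXT, HTTP_BROADCAST, DAG_SCHEDULER,
          RESULT_TASK, SHUFFLE_MAP_TASK, BLOCK_MANAGER, BROADCAST_VARS = Value

      type MetadataCleanerType = Value

      // Property keys then become e.g. "spark.cleaner.ttl.MapOutputTracker".
      def systemProperty(which: MetadataCleanerType.MetadataCleanerType) =
        "spark.cleaner.ttl." + which.toString
    }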


object MetadataCleaner {


// using only sys props for now : so that workers can also get to it while preserving earlier behavior.
def getDelaySeconds = System.getProperty("spark.cleaner.ttl", "-1").toInt
Review comment (Member):
Let's call this getDefaultDelaySeconds or something like that; I'm not sure it gets called directly anymore after this change

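A one-line sketch of the rename proposed above; getDefaultDelaySeconds is the reviewer's suggested name, not code from this PR:

    // Global fallback, consulted only when no per-type property is set.
    def getDefaultDelaySeconds = System.getProperty("spark.cleaner.ttl", "-1").toInt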
def setDelaySeconds(delay: Int) { System.setProperty("spark.cleaner.ttl", delay.toString) }

def getDelaySeconds(cleanerType: MetadataCleanerType.MetadataCleanerType): Int = {
System.getProperty(MetadataCleanerType.systemProperty(cleanerType), getDelaySeconds.toString).toInt
}

def setDelaySeconds(cleanerType: MetadataCleanerType.MetadataCleanerType, delay: Int) {
System.setProperty(MetadataCleanerType.systemProperty(cleanerType), delay.toString)
}

def setDelaySeconds(delay: Int, resetAll: Boolean = true) {
// override for all ?
System.setProperty("spark.cleaner.ttl", delay.toString)
if (resetAll) {
for (cleanerType <- MetadataCleanerType.values) {
System.clearProperty(MetadataCleanerType.systemProperty(cleanerType))
}
}
}
}
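For reference, a brief usage sketch (the values are made up for illustration) of how a per-type TTL is meant to interact with the global spark.cleaner.ttl under this change:

    // Global TTL of one hour for every cleaner...
    System.setProperty("spark.cleaner.ttl", "3600")
    // ...but keep broadcast blocks around for a day by overriding just that cleaner.
    MetadataCleaner.setDelaySeconds(MetadataCleanerType.BROADCAST_VARS, 86400)

    MetadataCleaner.getDelaySeconds(MetadataCleanerType.BROADCAST_VARS)  // 86400: per-type property wins
    MetadataCleaner.getDelaySeconds(MetadataCleanerType.BLOCK_MANAGER)   // 3600: falls back to spark.cleaner.ttl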