diff --git a/core/pom.xml b/core/pom.xml
index 5738b7406f..0fee6e3281 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -110,6 +110,10 @@
      <groupId>net.liftweb</groupId>
      <artifactId>lift-json_2.9.2</artifactId>
    </dependency>
+    <dependency>
+      <groupId>com.codahale.metrics</groupId>
+      <artifactId>metrics-ganglia</artifactId>
+    </dependency>
    <dependency>
      <groupId>it.unimi.dsi</groupId>
      <artifactId>fastutil</artifactId>
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index ae7cf2a893..73a82a63fb 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -32,7 +32,7 @@ import akka.util.Duration
import org.apache.spark.scheduler.MapStatus
import org.apache.spark.storage.BlockManagerId
-import org.apache.spark.util.{Utils, MetadataCleaner, TimeStampedHashMap}
+import org.apache.spark.util.{Utils, MetadataCleaner, MetadataCleanerType, TimeStampedHashMap}
private[spark] sealed trait MapOutputTrackerMessage
@@ -71,7 +71,7 @@ private[spark] class MapOutputTracker extends Logging {
var cacheEpoch = epoch
private val cachedSerializedStatuses = new TimeStampedHashMap[Int, Array[Byte]]
- val metadataCleaner = new MetadataCleaner("MapOutputTracker", this.cleanup)
+ val metadataCleaner = new MetadataCleaner(MetadataCleanerType.MAP_OUTPUT_TRACKER, "MapOutputTracker", this.cleanup)
// Send a message to the trackerActor and get its result within a default timeout, or
// throw a SparkException if this fails.
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 29407bcd30..fc399dbb0d 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -62,7 +62,7 @@ import org.apache.spark.scheduler.local.LocalScheduler
import org.apache.spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
import org.apache.spark.storage.{StorageUtils, BlockManagerSource}
import org.apache.spark.ui.SparkUI
-import org.apache.spark.util.{ClosureCleaner, Utils, MetadataCleaner, TimeStampedHashMap}
+import org.apache.spark.util.{ClosureCleaner, Utils, MetadataCleaner, MetadataCleanerType, TimeStampedHashMap}
import org.apache.spark.scheduler.StageInfo
import org.apache.spark.storage.RDDInfo
import org.apache.spark.storage.StorageStatus
@@ -117,7 +117,7 @@ class SparkContext(
// Keeps track of all persisted RDDs
private[spark] val persistentRdds = new TimeStampedHashMap[Int, RDD[_]]
- private[spark] val metadataCleaner = new MetadataCleaner("SparkContext", this.cleanup)
+ private[spark] val metadataCleaner = new MetadataCleaner(MetadataCleanerType.SPARK_CONTEXT, "SparkContext", this.cleanup)
// Initalize the Spark UI
private[spark] val ui = new SparkUI(this)
diff --git a/core/src/main/scala/org/apache/spark/broadcast/BitTorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/BitTorrentBroadcast.scala
index 93e7815ab5..fa4c6fcc03 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/BitTorrentBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/BitTorrentBroadcast.scala
@@ -39,7 +39,7 @@ private[spark] class BitTorrentBroadcast[T](@transient var value_ : T, isLocal:
def blockId: String = "broadcast_" + id
MultiTracker.synchronized {
- SparkEnv.get.blockManager.putSingle(blockId, value_, StorageLevel.MEMORY_AND_DISK, false)
+ SparkEnv.get.blockManager.putSingle(blockId, value_, StorageLevel.MEMORY_AND_DISK, false, true)
}
@transient var arrayOfBlocks: Array[BroadcastBlock] = null
@@ -158,7 +158,7 @@ private[spark] class BitTorrentBroadcast[T](@transient var value_ : T, isLocal:
if (receptionSucceeded) {
value_ = MultiTracker.unBlockifyObject[T](arrayOfBlocks, totalBytes, totalBlocks)
SparkEnv.get.blockManager.putSingle(
- blockId, value_, StorageLevel.MEMORY_AND_DISK, false)
+ blockId, value_, StorageLevel.MEMORY_AND_DISK, false, true)
} else {
logError("Reading broadcast variable " + id + " failed")
}
diff --git a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
index 9db26ae6de..9614aa4e24 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
@@ -26,7 +26,7 @@ import it.unimi.dsi.fastutil.io.FastBufferedOutputStream
import org.apache.spark.{HttpServer, Logging, SparkEnv}
import org.apache.spark.io.CompressionCodec
import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.{Utils, MetadataCleaner, TimeStampedHashSet}
+import org.apache.spark.util.{Utils, MetadataCleaner, MetadataCleanerType, TimeStampedHashSet}
private[spark] class HttpBroadcast[T](@transient var value_ : T, isLocal: Boolean, id: Long)
@@ -37,7 +37,7 @@ private[spark] class HttpBroadcast[T](@transient var value_ : T, isLocal: Boolea
def blockId: String = "broadcast_" + id
HttpBroadcast.synchronized {
- SparkEnv.get.blockManager.putSingle(blockId, value_, StorageLevel.MEMORY_AND_DISK, false)
+ SparkEnv.get.blockManager.putSingle(blockId, value_, StorageLevel.MEMORY_AND_DISK, false, true)
}
if (!isLocal) {
@@ -54,7 +54,7 @@ private[spark] class HttpBroadcast[T](@transient var value_ : T, isLocal: Boolea
logInfo("Started reading broadcast variable " + id)
val start = System.nanoTime
value_ = HttpBroadcast.read[T](id)
- SparkEnv.get.blockManager.putSingle(blockId, value_, StorageLevel.MEMORY_AND_DISK, false)
+ SparkEnv.get.blockManager.putSingle(blockId, value_, StorageLevel.MEMORY_AND_DISK, false, true)
val time = (System.nanoTime - start) / 1e9
logInfo("Reading broadcast variable " + id + " took " + time + " s")
}
@@ -82,7 +82,7 @@ private object HttpBroadcast extends Logging {
private var server: HttpServer = null
private val files = new TimeStampedHashSet[String]
- private val cleaner = new MetadataCleaner("HttpBroadcast", cleanup)
+ private val cleaner = new MetadataCleaner(MetadataCleanerType.HTTP_BROADCAST, "HttpBroadcast", cleanup)
private lazy val compressionCodec = CompressionCodec.createCodec()
diff --git a/core/src/main/scala/org/apache/spark/broadcast/TreeBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TreeBroadcast.scala
index 80c97ca073..82735c3eb4 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TreeBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TreeBroadcast.scala
@@ -36,7 +36,7 @@ extends Broadcast[T](id) with Logging with Serializable {
def blockId = "broadcast_" + id
MultiTracker.synchronized {
- SparkEnv.get.blockManager.putSingle(blockId, value_, StorageLevel.MEMORY_AND_DISK, false)
+ SparkEnv.get.blockManager.putSingle(blockId, value_, StorageLevel.MEMORY_AND_DISK, false, true)
}
@transient var arrayOfBlocks: Array[BroadcastBlock] = null
@@ -133,7 +133,7 @@ extends Broadcast[T](id) with Logging with Serializable {
if (receptionSucceeded) {
value_ = MultiTracker.unBlockifyObject[T](arrayOfBlocks, totalBytes, totalBlocks)
SparkEnv.get.blockManager.putSingle(
- blockId, value_, StorageLevel.MEMORY_AND_DISK, false)
+ blockId, value_, StorageLevel.MEMORY_AND_DISK, false, true)
} else {
logError("Reading broadcast variable " + id + " failed")
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 3e3f04f087..148e3d12e7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -30,7 +30,7 @@ import org.apache.spark.executor.TaskMetrics
import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult}
import org.apache.spark.scheduler.cluster.TaskInfo
import org.apache.spark.storage.{BlockManager, BlockManagerMaster}
-import org.apache.spark.util.{MetadataCleaner, TimeStampedHashMap}
+import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, TimeStampedHashMap}
/**
* The high-level scheduling layer that implements stage-oriented scheduling. It computes a DAG of
@@ -139,7 +139,7 @@ class DAGScheduler(
val activeJobs = new HashSet[ActiveJob]
val resultStageToJob = new HashMap[Stage, ActiveJob]
- val metadataCleaner = new MetadataCleaner("DAGScheduler", this.cleanup)
+ val metadataCleaner = new MetadataCleaner(MetadataCleanerType.DAG_SCHEDULER, "DAGScheduler", this.cleanup)
// Start a thread to run the DAGScheduler event loop
def start() {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
index 07e8317e3a..83fd680570 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
@@ -23,7 +23,7 @@ import java.util.zip.{GZIPInputStream, GZIPOutputStream}
import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.RDDCheckpointData
-import org.apache.spark.util.{MetadataCleaner, TimeStampedHashMap}
+import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, TimeStampedHashMap}
private[spark] object ResultTask {
@@ -32,7 +32,7 @@ private[spark] object ResultTask {
// expensive on the master node if it needs to launch thousands of tasks.
val serializedInfoCache = new TimeStampedHashMap[Int, Array[Byte]]
- val metadataCleaner = new MetadataCleaner("ResultTask", serializedInfoCache.clearOldValues)
+ val metadataCleaner = new MetadataCleaner(MetadataCleanerType.RESULT_TASK, "ResultTask", serializedInfoCache.clearOldValues)
def serializeInfo(stageId: Int, rdd: RDD[_], func: (TaskContext, Iterator[_]) => _): Array[Byte] = {
synchronized {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
index d23df0dd2b..bd8ddfe9ae 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
@@ -25,7 +25,7 @@ import scala.collection.mutable.HashMap
import org.apache.spark._
import org.apache.spark.executor.ShuffleWriteMetrics
import org.apache.spark.storage._
-import org.apache.spark.util.{TimeStampedHashMap, MetadataCleaner}
+import org.apache.spark.util.{TimeStampedHashMap, MetadataCleaner, MetadataCleanerType}
import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.RDDCheckpointData
@@ -37,7 +37,7 @@ private[spark] object ShuffleMapTask {
// expensive on the master node if it needs to launch thousands of tasks.
val serializedInfoCache = new TimeStampedHashMap[Int, Array[Byte]]
- val metadataCleaner = new MetadataCleaner("ShuffleMapTask", serializedInfoCache.clearOldValues)
+ val metadataCleaner = new MetadataCleaner(MetadataCleanerType.SHUFFLE_MAP_TASK, "ShuffleMapTask", serializedInfoCache.clearOldValues)
def serializeInfo(stageId: Int, rdd: RDD[_], dep: ShuffleDependency[_,_]): Array[Byte] = {
synchronized {
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 60fdc5f2ee..828887aec8 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -46,7 +46,7 @@ private[spark] class BlockManager(
maxMemory: Long)
extends Logging {
- private class BlockInfo(val level: StorageLevel, val tellMaster: Boolean) {
+ private class BlockInfo(val level: StorageLevel, val tellMaster: Boolean, val broadcastBlock: Boolean) {
@volatile var pending: Boolean = true
@volatile var size: Long = -1L
@volatile var initThread: Thread = null
@@ -154,7 +154,8 @@ private[spark] class BlockManager(
var heartBeatTask: Cancellable = null
- val metadataCleaner = new MetadataCleaner("BlockManager", this.dropOldBlocks)
+ private val metadataCleaner = new MetadataCleaner(MetadataCleanerType.BLOCK_MANAGER, "BlockManager", this.dropOldNonBroadcastBlocks)
+ private val broadcastCleaner = new MetadataCleaner(MetadataCleanerType.BROADCAST_VARS, "BroadcastVars", this.dropOldBroadcastBlocks)
initialize()
// The compression codec to use. Note that the "lazy" val is necessary because we want to delay
@@ -522,11 +523,14 @@ private[spark] class BlockManager(
iter
}
- def put(blockId: String, values: Iterator[Any], level: StorageLevel, tellMaster: Boolean)
- : Long = {
+ def put(blockId: String, values: Iterator[Any], level: StorageLevel, tellMaster: Boolean): Long = {
+ put(blockId, values, level, tellMaster, false)
+ }
+
+ def put(blockId: String, values: Iterator[Any], level: StorageLevel, tellMaster: Boolean, broadcastBlock: Boolean): Long = {
val elements = new ArrayBuffer[Any]
elements ++= values
- put(blockId, elements, level, tellMaster)
+ put(blockId, elements, level, tellMaster, broadcastBlock)
}
/**
@@ -538,7 +542,7 @@ private[spark] class BlockManager(
: BlockObjectWriter = {
val writer = diskStore.getBlockWriter(blockId, serializer, bufferSize)
writer.registerCloseEventHandler(() => {
- val myInfo = new BlockInfo(StorageLevel.DISK_ONLY, false)
+ val myInfo = new BlockInfo(StorageLevel.DISK_ONLY, false, false)
blockInfo.put(blockId, myInfo)
myInfo.markReady(writer.size())
})
@@ -548,9 +552,12 @@ private[spark] class BlockManager(
/**
* Put a new block of values to the block manager. Returns its (estimated) size in bytes.
*/
- def put(blockId: String, values: ArrayBuffer[Any], level: StorageLevel,
- tellMaster: Boolean = true) : Long = {
+  def put(blockId: String, values: ArrayBuffer[Any], level: StorageLevel, tellMaster: Boolean = true): Long = {
+    put(blockId, values, level, tellMaster, false)
+  }
+
+  def put(blockId: String, values: ArrayBuffer[Any], level: StorageLevel, tellMaster: Boolean, broadcastBlock: Boolean): Long = {
if (blockId == null) {
throw new IllegalArgumentException("Block Id is null")
}
@@ -565,7 +572,7 @@ private[spark] class BlockManager(
// to be dropped right after it got put into memory. Note, however, that other threads will
// not be able to get() this block until we call markReady on its BlockInfo.
val myInfo = {
- val tinfo = new BlockInfo(level, tellMaster)
+ val tinfo = new BlockInfo(level, tellMaster, broadcastBlock)
// Do atomically !
val oldBlockOpt = blockInfo.putIfAbsent(blockId, tinfo)
@@ -667,9 +674,12 @@ private[spark] class BlockManager(
/**
* Put a new block of serialized bytes to the block manager.
*/
- def putBytes(
- blockId: String, bytes: ByteBuffer, level: StorageLevel, tellMaster: Boolean = true) {
+ def putBytes(blockId: String, bytes: ByteBuffer, level: StorageLevel, tellMaster: Boolean = true) {
+ putBytes(blockId, bytes, level, tellMaster, false)
+ }
+
+ def putBytes(blockId: String, bytes: ByteBuffer, level: StorageLevel, tellMaster: Boolean, broadcastBlock: Boolean) {
if (blockId == null) {
throw new IllegalArgumentException("Block Id is null")
}
@@ -684,7 +694,7 @@ private[spark] class BlockManager(
// to be dropped right after it got put into memory. Note, however, that other threads will
// not be able to get() this block until we call markReady on its BlockInfo.
val myInfo = {
- val tinfo = new BlockInfo(level, tellMaster)
+ val tinfo = new BlockInfo(level, tellMaster, broadcastBlock)
// Do atomically !
val oldBlockOpt = blockInfo.putIfAbsent(blockId, tinfo)
@@ -800,7 +810,11 @@ private[spark] class BlockManager(
* Write a block consisting of a single object.
*/
def putSingle(blockId: String, value: Any, level: StorageLevel, tellMaster: Boolean = true) {
- put(blockId, Iterator(value), level, tellMaster)
+ putSingle(blockId, value, level, tellMaster, false)
+ }
+
+  def putSingle(blockId: String, value: Any, level: StorageLevel, tellMaster: Boolean, broadcastBlock: Boolean) {
+    put(blockId, Iterator(value), level, tellMaster, broadcastBlock)
}
/**
@@ -886,13 +900,36 @@ private[spark] class BlockManager(
}
}
- def dropOldBlocks(cleanupTime: Long) {
- logInfo("Dropping blocks older than " + cleanupTime)
+ def dropOldNonBroadcastBlocks(cleanupTime: Long) {
+ logInfo("Dropping non broadcast blocks older than " + cleanupTime)
+ val iterator = blockInfo.internalMap.entrySet().iterator()
+ while (iterator.hasNext) {
+ val entry = iterator.next()
+ val (id, info, time) = (entry.getKey, entry.getValue._1, entry.getValue._2)
+ if (time < cleanupTime && ! info.broadcastBlock) {
+ info.synchronized {
+ val level = info.level
+ if (level.useMemory) {
+ memoryStore.remove(id)
+ }
+ if (level.useDisk) {
+ diskStore.remove(id)
+ }
+ iterator.remove()
+ logInfo("Dropped block " + id)
+ }
+ reportBlockStatus(id, info)
+ }
+ }
+ }
+
+ def dropOldBroadcastBlocks(cleanupTime: Long) {
+ logInfo("Dropping broaddcast blocks older than " + cleanupTime)
val iterator = blockInfo.internalMap.entrySet().iterator()
while (iterator.hasNext) {
val entry = iterator.next()
val (id, info, time) = (entry.getKey, entry.getValue._1, entry.getValue._2)
- if (time < cleanupTime) {
+ if (time < cleanupTime && info.broadcastBlock) {
info.synchronized {
val level = info.level
if (level.useMemory) {
diff --git a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala b/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
index a430a75451..da15c0376f 100644
--- a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
@@ -20,13 +20,14 @@ package org.apache.spark.util
import java.util.concurrent.{TimeUnit, ScheduledFuture, Executors}
import java.util.{TimerTask, Timer}
import org.apache.spark.Logging
+import scala.collection.mutable.HashMap
/**
* Runs a timer task to periodically clean up metadata (e.g. old files or hashtable entries)
*/
-class MetadataCleaner(name: String, cleanupFunc: (Long) => Unit) extends Logging {
- private val delaySeconds = MetadataCleaner.getDelaySeconds
+class MetadataCleaner(cleanerType: MetadataCleanerType.MetadataCleanerType, name: String, cleanupFunc: (Long) => Unit) extends Logging {
+ private val delaySeconds = MetadataCleaner.getDelaySeconds(cleanerType)
private val periodSeconds = math.max(10, delaySeconds / 10)
private val timer = new Timer(name + " cleanup timer", true)
@@ -53,9 +54,39 @@ class MetadataCleaner(name: String, cleanupFunc: (Long) => Unit) extends Logging
}
}
+object MetadataCleanerType extends Enumeration("map_output_tracker", "spark_context", "http_broadcast", "dag_scheduler", "result_task",
+ "shuffle_map_task", "block_manager", "broadcast_vars") {
+
+ val MAP_OUTPUT_TRACKER, SPARK_CONTEXT, HTTP_BROADCAST, DAG_SCHEDULER, RESULT_TASK, SHUFFLE_MAP_TASK, BLOCK_MANAGER, BROADCAST_VARS = Value
+
+ type MetadataCleanerType = Value
+
+ def systemProperty(which: MetadataCleanerType.MetadataCleanerType) = "spark.cleaner.ttl." + which.toString
+}
+
object MetadataCleaner {
+
+
+  // Using only system properties for now, so that workers can also read these settings while preserving the earlier behavior.
def getDelaySeconds = System.getProperty("spark.cleaner.ttl", "-1").toInt
- def setDelaySeconds(delay: Int) { System.setProperty("spark.cleaner.ttl", delay.toString) }
+
+ def getDelaySeconds(cleanerType: MetadataCleanerType.MetadataCleanerType): Int = {
+ System.getProperty(MetadataCleanerType.systemProperty(cleanerType), getDelaySeconds.toString).toInt
+ }
+
+ def setDelaySeconds(cleanerType: MetadataCleanerType.MetadataCleanerType, delay: Int) {
+ System.setProperty(MetadataCleanerType.systemProperty(cleanerType), delay.toString)
+ }
+
+ def setDelaySeconds(delay: Int, resetAll: Boolean = true) {
+    // Set the global TTL; if resetAll, clear any per-cleaner overrides so every cleaner falls back to this value.
+ System.setProperty("spark.cleaner.ttl", delay.toString)
+ if (resetAll) {
+ for (cleanerType <- MetadataCleanerType.values) {
+ System.clearProperty(MetadataCleanerType.systemProperty(cleanerType))
+ }
+ }
+ }
}
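The new MetadataCleanerType plumbing resolves each cleaner's TTL from a per-cleaner system property (spark.cleaner.ttl.<type>, as built by MetadataCleanerType.systemProperty) and falls back to the global spark.cleaner.ttl, whose default of -1 leaves a cleaner disabled. A small standalone sketch of that lookup order follows; CleanerTtlSketch and ttlFor are hypothetical names used only to illustrate the fallback, not part of the patch.

```scala
object CleanerTtlSketch {
  // Global default, matching MetadataCleaner.getDelaySeconds: -1 means the cleaner stays disabled.
  private def globalTtl: Int = System.getProperty("spark.cleaner.ttl", "-1").toInt

  // Per-cleaner lookup with fallback to the global value, mirroring getDelaySeconds(cleanerType).
  private def ttlFor(cleaner: String): Int =
    System.getProperty("spark.cleaner.ttl." + cleaner, globalTtl.toString).toInt

  def main(args: Array[String]) {
    System.setProperty("spark.cleaner.ttl", "3600")                  // global: one hour
    System.setProperty("spark.cleaner.ttl.broadcast_vars", "86400")  // keep broadcast blocks for a day
    println(ttlFor("block_manager"))   // 3600  (falls back to the global TTL)
    println(ttlFor("broadcast_vars"))  // 86400 (per-cleaner override)
    println(ttlFor("dag_scheduler"))   // 3600
  }
}
```

With this scheme, broadcast blocks can be kept longer (or shorter) than other block-manager metadata without touching the global setting, which is what the separate BROADCAST_VARS cleaner in BlockManager relies on.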