14 changes: 10 additions & 4 deletions core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -17,12 +17,12 @@

package org.apache.spark

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.rdd.RDD
import org.apache.spark.storage._

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

/**
* Spark class responsible for passing RDDs partition contents to the BlockManager and making
* sure a node doesn't load two copies of an RDD at once.
@@ -174,7 +174,13 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
updatedBlocks ++=
blockManager.putArray(key, arr, level, tellMaster = true, effectiveStorageLevel)
arr.iterator.asInstanceOf[Iterator[T]]
case Right(it) =>

case Right((it, false)) =>
// An over-sized block was detected while unrolling; return the values without caching them
val returnValues = it.asInstanceOf[Iterator[T]]
returnValues

case Right((it, true)) =>
// There is not enough space to cache this partition in memory
val returnValues = it.asInstanceOf[Iterator[T]]
if (putLevel.useDisk) {
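The CacheManager changes above consume the new result shape of MemoryStore.unrollSafely. Below is a minimal, self-contained sketch of that three-way contract — Left(array) on a full unroll, Right((iterator, true)) when unroll memory ran out but the block may still go to disk, Right((iterator, false)) when an over-sized block should not be cached at all. The object and method names are illustrative, not part of the patch.

```scala
object UnrollResultExample {
  // Mirrors the Either[Array[Any], (Iterator[Any], Boolean)] shape returned by unrollSafely.
  def describe(result: Either[Array[Any], (Iterator[Any], Boolean)]): String = result match {
    case Left(arr)         => s"fully unrolled ${arr.length} elements; cache the array in memory"
    case Right((_, true))  => "did not fit in unroll memory; caller may fall back to disk"
    case Right((_, false)) => "over-sized block detected; do not cache it at all"
  }

  def main(args: Array[String]): Unit = {
    println(describe(Left(Array[Any](1, 2, 3))))
    println(describe(Right((Iterator(1, 2), true))))
    println(describe(Right((Iterator(1, 2), false))))
  }
}
```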
@@ -509,7 +509,14 @@ private[spark] class BlockManager(
logDebug(s"Getting block $blockId from disk")
val bytes: ByteBuffer = if (diskStore.contains(blockId)) {
// DiskStore.getBytes() always returns Some, so this .get() is guaranteed to be safe
diskStore.getBytes(blockId).get
try {
diskStore.getBytes(blockId).get
} catch {
case t: Throwable =>
logError(s"diskStore.getBytes($blockId).get failed")
throw t
}

} else {
// Remove the missing block so that its unavailability is reported to the driver
removeBlock(blockId)
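The try/catch added above only records which block's read failed before rethrowing. A self-contained sketch of the same log-and-rethrow pattern as a reusable helper (the helper name is illustrative, and a println stands in for Spark's Logging trait):

```scala
object LogOnFailureExample {
  private def logError(msg: String): Unit = System.err.println(s"ERROR: $msg")

  /** Run `body`; if it throws, log `context` and rethrow the original exception. */
  def logOnFailure[T](context: String)(body: => T): T =
    try {
      body
    } catch {
      case t: Throwable =>
        logError(s"$context failed: ${t.getMessage}")
        throw t
    }

  def main(args: Array[String]): Unit = {
    println(logOnFailure("parse")("42".toInt)) // prints 42
    // logOnFailure("parse")("oops".toInt)     // would log the context, then rethrow
  }
}
```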
109 changes: 96 additions & 13 deletions core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
@@ -20,13 +20,13 @@ package org.apache.spark.storage
import java.nio.ByteBuffer
import java.util.LinkedHashMap

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.TaskContext
import org.apache.spark.memory.MemoryManager
import org.apache.spark.util.{SizeEstimator, Utils}
import org.apache.spark.util.collection.SizeTrackingVector
import org.apache.spark.util.{SizeEstimator, Utils}

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

private case class MemoryEntry(value: Any, size: Long, deserialized: Boolean)

@@ -59,6 +59,11 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
private val unrollMemoryThreshold: Long =
conf.getLong("spark.storage.unrollMemoryThreshold", 1024 * 1024)

// CSD setting controlling whether to apply CSD's caching block size policy (values <= 0 disable it)
private val csdCacheBlockSizeLimit: Long =
conf.getLong("spark.storage.MemoryStore.csdCacheBlockSizeLimit", Integer.MAX_VALUE.toLong)
assert(csdCacheBlockSizeLimit <= Integer.MAX_VALUE)
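For reference, a sketch of how the new setting could be supplied from application code; the 512 MB value and object name are only examples, and the same key can equally be passed with --conf on spark-submit. Values <= 0 disable the policy, per the checks later in this file.

```scala
import org.apache.spark.SparkConf

object CsdCacheLimitConfigExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("csd-cache-limit-example")
      // Cap cached block size at 512 MB instead of the Integer.MAX_VALUE default.
      .set("spark.storage.MemoryStore.csdCacheBlockSizeLimit", (512L * 1024 * 1024).toString)
    println(conf.get("spark.storage.MemoryStore.csdCacheBlockSizeLimit"))
  }
}
```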

/** Total amount of memory available for storage, in bytes. */
private def maxMemory: Long = memoryManager.maxStorageMemory

@@ -173,11 +178,15 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
val res = putArray(blockId, arrayValues, level, returnValues)
droppedBlocks ++= res.droppedBlocks
PutResult(res.size, res.data, droppedBlocks)
case Right(iteratorValues) =>
case Right((iteratorValues, false)) =>
// An over-sized block was detected while unrolling; return the iterator without caching it anywhere
PutResult(0, Left(iteratorValues), droppedBlocks)
case Right((iteratorValues, true)) =>
// Not enough space to unroll this block; drop to disk if applicable
if (level.useDisk && allowPersistToDisk) {
logWarning(s"Persisting block $blockId to disk instead.")
val res = blockManager.diskStore.putIterator(blockId, iteratorValues, level, returnValues)
val res =
blockManager.diskStore.putIterator(blockId, iteratorValues, level, returnValues)
PutResult(res.size, res.data, droppedBlocks)
} else {
PutResult(0, Left(iteratorValues), droppedBlocks)
@@ -234,6 +243,46 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
logInfo("MemoryStore cleared")
}

/**
* This method is used by CSD as a post-processing step of [[unrollSafely]] to detect
* partitions whose estimated object size exceeds 2G when there is not enough memory for
* unrolling.
* It keeps fetching until we "see" 2G worth of objects or the values are exhausted,
* assuming that an amount of memory up to csdCacheBlockSizeLimit is available
* in Spark's user memory space per thread and per operator/RDD compute.
*
* Because csdCacheBlockSizeLimit is upper bounded by Integer.MAX_VALUE, at most 2G of
* user memory space is needed per thread in the worst case.
*/
private[this] def fetchUntilCsdBlockSizeLimit[T](
blockId: BlockId,
inputValues: Iterator[T],
valuesSeen: SizeTrackingVector[Any]): Boolean = {
// If the limit is disabled, report the block as cacheable without examining it
if (csdCacheBlockSizeLimit <= 0) {
true
} else {
val start = System.currentTimeMillis
var currentEstimatedSize = valuesSeen.estimateSize()
try {
var elementsExamined = 0L
val memoryCheckPeriod = 16
while (inputValues.hasNext && currentEstimatedSize <= csdCacheBlockSizeLimit) {
valuesSeen += inputValues.next()
elementsExamined += 1
if (elementsExamined % memoryCheckPeriod == 0) {
currentEstimatedSize = valuesSeen.estimateSize()
}
}
(currentEstimatedSize <= csdCacheBlockSizeLimit)
} finally {
logWarning(s"fetchUntilCsdBlockSizeLimit($blockId) duration: " +
s"${Utils.msDurationToString(System.currentTimeMillis - start)}")
}
}
}
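fetchUntilCsdBlockSizeLimit amortizes the cost of size estimation by re-estimating only every memoryCheckPeriod elements. A self-contained sketch of that technique outside Spark (no SizeTrackingVector or SizeEstimator; the names and the 8-bytes-per-element cost model are illustrative):

```scala
import scala.collection.mutable.ArrayBuffer

object PeriodicSizeCheckExample {
  /**
   * Drain `values` into `seen`, re-estimating the accumulated size only every
   * `checkPeriod` elements because estimation is assumed to be expensive, and
   * report whether the last estimate stayed within `sizeLimitBytes`.
   */
  def fitsWithinLimit[T](
      values: Iterator[T],
      seen: ArrayBuffer[T],
      sizeLimitBytes: Long,
      estimateBytes: ArrayBuffer[T] => Long,
      checkPeriod: Int = 16): Boolean = {
    var estimated = estimateBytes(seen)
    var examined = 0L
    while (values.hasNext && estimated <= sizeLimitBytes) {
      seen += values.next()
      examined += 1
      if (examined % checkPeriod == 0) {
        estimated = estimateBytes(seen) // the expensive call, amortized over checkPeriod elements
      }
    }
    estimated <= sizeLimitBytes
  }

  def main(args: Array[String]): Unit = {
    // Pretend each element costs 8 bytes; 1000 elements blow past a 100-byte limit.
    val buf = ArrayBuffer.empty[Int]
    val fits = fitsWithinLimit[Int](Iterator.range(0, 1000), buf, 100L, b => 8L * b.size)
    println(s"fits within limit: $fits") // false
  }
}
```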

/**
* Unroll the given block in memory safely.
*
@@ -245,12 +294,19 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
*
* This method returns either an array with the contents of the entire block or an iterator
* containing the values of the block (if the array would have exceeded available memory).
*
* SPY-1394: CSD modified this API in the following way:
* 1. When short of memory, it returns a tuple (iterator, boolean). The boolean indicates
*    whether the caller should cache to disk, based on detection of an over-sized block.
* 2. When an over-sized block is detected, terminate the unroll and tell the caller not to
*    cache at all.
*/
def unrollSafely(
blockId: BlockId,
values: Iterator[Any],
droppedBlocks: ArrayBuffer[(BlockId, BlockStatus)])
: Either[Array[Any], Iterator[Any]] = {
: Either[Array[Any], (Iterator[Any], Boolean)] = {

// Number of elements unrolled so far
var elementsUnrolled = 0
@@ -281,11 +337,18 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo

// Unroll this block safely, checking whether we have exceeded our threshold periodically
try {
while (values.hasNext && keepUnrolling) {
var currentSize = 0L
var shouldCache = true
while (values.hasNext && keepUnrolling && (csdCacheBlockSizeLimit <= 0 || shouldCache)) {
vector += values.next()
if (elementsUnrolled % memoryCheckPeriod == 0) {
// If our vector's size has exceeded the threshold, request more memory
val currentSize = vector.estimateSize()
currentSize = vector.estimateSize()

if (csdCacheBlockSizeLimit > 0 && shouldCache && currentSize > csdCacheBlockSizeLimit) {
shouldCache = false
}

if (currentSize >= memoryThreshold) {
val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong
keepUnrolling = reserveUnrollMemoryForThisTask(
@@ -300,13 +363,25 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
elementsUnrolled += 1
}

if (keepUnrolling) {
if (keepUnrolling && shouldCache) {
// We successfully unrolled the entirety of this block
Left(vector.toArray)
} else {
// We ran out of space while unrolling the values for this block
logUnrollFailureMessage(blockId, vector.estimateSize())
Right(vector.iterator ++ values)
if (!shouldCache) {
logBlockSizeLimitMessage(blockId, currentSize)
Right((vector.iterator ++ values, shouldCache))
} else {
// This could be a false positive because we have not yet seen enough of the values;
// continue fetching using the task's user memory space
shouldCache = fetchUntilCsdBlockSizeLimit(blockId, values, vector)
if (!shouldCache) {
logBlockSizeLimitMessage(blockId, vector.estimateSize())
} else {
// We ran out of space while unrolling the values for this block

Review comment: not sure I understand this comment for this case. Wouldn't this be the case where it's still cacheable?
Reply: oic, this is the original comment. Maybe need to put a line before this to separate cache from unroll.

logUnrollFailureMessage(blockId, vector.estimateSize())
}
Right((vector.iterator ++ values, shouldCache))
}
}

} finally {
@@ -583,4 +658,12 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
)
logMemoryUsage()
}

private def logBlockSizeLimitMessage(blockId: BlockId, currentSize: Long): Unit = {
logWarning(
s"Block size limit reached: $blockId! " +
s"(computed ${Utils.bytesToString(currentSize)} so far)"
)
logMemoryUsage()
}
}
@@ -1146,13 +1146,13 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
*/
private def verifyUnroll(
expected: Iterator[Any],
result: Either[Array[Any], Iterator[Any]],
result: Either[Array[Any], (Iterator[Any], Boolean)],
shouldBeArray: Boolean): Unit = {
val actual: Iterator[Any] = result match {
case Left(arr: Array[Any]) =>
assert(shouldBeArray, "expected iterator from unroll!")
arr.iterator
case Right(it: Iterator[Any]) =>
case Right((it: Iterator[Any], shouldCache: Boolean)) =>
assert(!shouldBeArray, "expected array from unroll!")
it
case _ =>
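For context, hypothetical call sites (not part of this diff) inside BlockManagerSuite, illustrating both result shapes the updated verifyUnroll signature accepts:

```scala
// Assumes these lines run inside BlockManagerSuite, where verifyUnroll is defined.
val fits: Either[Array[Any], (Iterator[Any], Boolean)] = Left(Array[Any](1, 2, 3))
val spills: Either[Array[Any], (Iterator[Any], Boolean)] = Right((Iterator[Any](1, 2, 3), true))
verifyUnroll(Iterator(1, 2, 3), fits, shouldBeArray = true)
verifyUnroll(Iterator(1, 2, 3), spills, shouldBeArray = false)
```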