diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala
index 4d20c7369376e..f4d5d8f1c9a16 100644
--- a/core/src/main/scala/org/apache/spark/CacheManager.scala
+++ b/core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -17,12 +17,12 @@
 
 package org.apache.spark
 
-import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
-
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage._
 
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
 /**
  * Spark class responsible for passing RDDs partition contents to the BlockManager and making
  * sure a node doesn't load two copies of an RDD at once.
@@ -174,7 +174,13 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
           updatedBlocks ++=
             blockManager.putArray(key, arr, level, tellMaster = true, effectiveStorageLevel)
           arr.iterator.asInstanceOf[Iterator[T]]
-        case Right(it) =>
+
+        case Right((it, false)) =>
+          // Big block detected while unrolling; return the values without caching
+          val returnValues = it.asInstanceOf[Iterator[T]]
+          returnValues
+
+        case Right((it, true)) =>
           // There is not enough space to cache this partition in memory
           val returnValues = it.asInstanceOf[Iterator[T]]
           if (putLevel.useDisk) {
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 1fc6f3928ecdc..59af3e5a56857 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -509,7 +509,14 @@ private[spark] class BlockManager(
       logDebug(s"Getting block $blockId from disk")
       val bytes: ByteBuffer = if (diskStore.contains(blockId)) {
         // DiskStore.getBytes() always returns Some, so this .get() is guaranteed to be safe
-        diskStore.getBytes(blockId).get
+        try {
+          diskStore.getBytes(blockId).get
+        } catch {
+          case t: Throwable =>
+            logError(s"diskStore.getBytes($blockId).get failed")
+            throw t
+        }
+
       } else {
         // Remove the missing block so that its unavailability is reported to the driver
         removeBlock(blockId)
diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
index 1113160f930e6..3eda4e8f96222 100644
--- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
@@ -20,13 +20,13 @@ package org.apache.spark.storage
 import java.nio.ByteBuffer
 import java.util.LinkedHashMap
 
-import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
-
 import org.apache.spark.TaskContext
 import org.apache.spark.memory.MemoryManager
-import org.apache.spark.util.{SizeEstimator, Utils}
 import org.apache.spark.util.collection.SizeTrackingVector
+import org.apache.spark.util.{SizeEstimator, Utils}
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
 
 private case class MemoryEntry(value: Any, size: Long, deserialized: Boolean)
 
@@ -59,6 +59,11 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
   private val unrollMemoryThreshold: Long =
     conf.getLong("spark.storage.unrollMemoryThreshold", 1024 * 1024)
 
+  // CSD's caching block size limit; a value <= 0 disables the policy
+  private val csdCacheBlockSizeLimit: Long =
+    conf.getLong("spark.storage.MemoryStore.csdCacheBlockSizeLimit", Integer.MAX_VALUE.toLong)
+  assert(csdCacheBlockSizeLimit <= Integer.MAX_VALUE)
+
   /** Total amount of memory available for storage, in bytes. */
   private def maxMemory: Long = memoryManager.maxStorageMemory
 
@@ -173,11 +178,15 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
         val res = putArray(blockId, arrayValues, level, returnValues)
         droppedBlocks ++= res.droppedBlocks
         PutResult(res.size, res.data, droppedBlocks)
-      case Right(iteratorValues) =>
+      case Right((iteratorValues, false)) =>
+        // Big block detected while unrolling; do not cache it
+        PutResult(0, Left(iteratorValues), droppedBlocks)
+      case Right((iteratorValues, true)) =>
         // Not enough space to unroll this block; drop to disk if applicable
         if (level.useDisk && allowPersistToDisk) {
           logWarning(s"Persisting block $blockId to disk instead.")
-          val res = blockManager.diskStore.putIterator(blockId, iteratorValues, level, returnValues)
+          val res =
+            blockManager.diskStore.putIterator(blockId, iteratorValues, level, returnValues)
           PutResult(res.size, res.data, droppedBlocks)
         } else {
           PutResult(0, Left(iteratorValues), droppedBlocks)
@@ -234,6 +243,43 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
     logInfo("MemoryStore cleared")
   }
 
+  /**
+   * This API is used by CSD as a post-processing step of [[unrollSafely]] to detect
+   * over-sized partitions when unroll memory runs short.
+   * It keeps fetching into user memory space until the total estimated object size
+   * exceeds csdCacheBlockSizeLimit or inputValues is exhausted.
+   * In practice, we need at least csdCacheBlockSizeLimit bytes of user memory available
+   * per thread to cover the worst case.
+   * We can also tune csdCacheBlockSizeLimit down as needed.
+   */
+  private[this] def fetchUntilCsdBlockSizeLimit[T](
+      blockId: BlockId,
+      inputValues: Iterator[T],
+      valuesSeen: SizeTrackingVector[Any]): Boolean = {
+    // If the switch is off, do nothing and report that the block may be cached
+    if (csdCacheBlockSizeLimit <= 0) {
+      true
+    } else {
+      val start = System.currentTimeMillis
+      var currentEstimatedSize = valuesSeen.estimateSize()
+      try {
+        var elementsExamined = 0L
+        val memoryCheckPeriod = 16
+        while (inputValues.hasNext && currentEstimatedSize <= csdCacheBlockSizeLimit) {
+          valuesSeen += inputValues.next()
+          elementsExamined += 1
+          if (elementsExamined % memoryCheckPeriod == 0) {
+            currentEstimatedSize = valuesSeen.estimateSize()
+          }
+        }
+        (currentEstimatedSize <= csdCacheBlockSizeLimit)
+      } finally {
+        logWarning(s"fetchUntilCsdBlockSizeLimit($blockId) duration: " +
+          s"${Utils.msDurationToString(System.currentTimeMillis - start)}")
+      }
+    }
+  }
+
   /**
    * Unroll the given block in memory safely.
    *
@@ -245,12 +291,19 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
    *
    * This method returns either an array with the contents of the entire block or an iterator
    * containing the values of the block (if the array would have exceeded available memory).
+   *
+   * SPY-1394: CSD modified this API in the following ways:
+   * 1. When memory runs short, it returns a tuple (iterator, boolean).
+   *    The boolean indicates whether the caller should cache to disk, based on
+   *    whether an over-sized block was detected.
+   * 2. When an over-sized block is detected, the unroll is terminated and the caller
+   *    is told not to cache at all.
   */
   def unrollSafely(
       blockId: BlockId,
       values: Iterator[Any],
       droppedBlocks: ArrayBuffer[(BlockId, BlockStatus)])
-    : Either[Array[Any], Iterator[Any]] = {
+    : Either[Array[Any], (Iterator[Any], Boolean)] = {
 
     // Number of elements unrolled so far
     var elementsUnrolled = 0
@@ -281,11 +334,18 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
 
     // Unroll this block safely, checking whether we have exceeded our threshold periodically
     try {
-      while (values.hasNext && keepUnrolling) {
+      var currentSize = 0L
+      var shouldCache = true
+      while (values.hasNext && keepUnrolling && (csdCacheBlockSizeLimit <= 0 || shouldCache)) {
         vector += values.next()
         if (elementsUnrolled % memoryCheckPeriod == 0) {
           // If our vector's size has exceeded the threshold, request more memory
-          val currentSize = vector.estimateSize()
+          currentSize = vector.estimateSize()
+
+          if (csdCacheBlockSizeLimit > 0 && shouldCache && currentSize > csdCacheBlockSizeLimit) {
+            shouldCache = false
+          }
+
           if (currentSize >= memoryThreshold) {
             val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong
             keepUnrolling = reserveUnrollMemoryForThisTask(
@@ -300,13 +360,29 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
         elementsUnrolled += 1
       }
 
-      if (keepUnrolling) {
+      if (keepUnrolling && shouldCache) {
         // We successfully unrolled the entirety of this block
         Left(vector.toArray)
       } else {
-        // We ran out of space while unrolling the values for this block
-        logUnrollFailureMessage(blockId, vector.estimateSize())
-        Right(vector.iterator ++ values)
+        if (!shouldCache) {
+          logBlockSizeLimitMessage(blockId, currentSize)
+          Right((vector.iterator ++ values, shouldCache))
+        } else {
+          // When we ran out of unroll memory from Spark's storage space,
+          // a true value of shouldCache may be a false positive because we have
+          // not seen enough of the values yet.
+          // Try to continue fetching using memory from user space (assuming that
+          // enough memory is available); see [[fetchUntilCsdBlockSizeLimit]].
+          shouldCache = fetchUntilCsdBlockSizeLimit(blockId, values, vector)
+          if (!shouldCache) {
+            logBlockSizeLimitMessage(blockId, vector.estimateSize())
+          } else {
+            // Spark's original logging message indicating insufficient unroll memory;
+            // the caller will consider dropping the block to disk if applicable.
+            logUnrollFailureMessage(blockId, vector.estimateSize())
+          }
+          Right((vector.iterator ++ values, shouldCache))
+        }
       }
 
     } finally {
@@ -583,4 +659,18 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
     )
     logMemoryUsage()
   }
+
+  /**
+   * Log a warning when an over-sized block is detected.
+   *
+   * @param blockId ID of the block we are trying to unroll.
+   * @param finalVectorSize Final size of the vector when the block size limit is reached.
+   */
+  private def logBlockSizeLimitMessage(blockId: BlockId, finalVectorSize: Long): Unit = {
+    logWarning(
+      s"Block size limit reached: $blockId! " +
" + + s"(computed ${Utils.bytesToString(finalVectorSize)} so far)" + ) + logMemoryUsage() + } } diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index fc76b91c186c3..1a8d8a5c98dc1 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -1146,13 +1146,13 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE */ private def verifyUnroll( expected: Iterator[Any], - result: Either[Array[Any], Iterator[Any]], + result: Either[Array[Any], (Iterator[Any], Boolean)], shouldBeArray: Boolean): Unit = { val actual: Iterator[Any] = result match { case Left(arr: Array[Any]) => assert(shouldBeArray, "expected iterator from unroll!") arr.iterator - case Right(it: Iterator[Any]) => + case Right((it: Iterator[Any], shouldCache: Boolean)) => assert(!shouldBeArray, "expected array from unroll!") it case _ =>