Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,18 @@ private[spark] abstract class MemoryManager(
offHeapStorageMemoryPool.incrementPoolSize(offHeapStorageMemory)

/**
* Total available memory for storage, in bytes. This amount can vary over time, depending on
* the MemoryManager implementation.
* Total available on heap memory for storage, in bytes. This amount can vary over time,
* depending on the MemoryManager implementation.
* In this model, this is equivalent to the amount of memory not occupied by execution.
*/
def maxOnHeapStorageMemory: Long

/**
* Total available off heap memory for storage, in bytes. This amount can vary over time,
* depending on the MemoryManager implementation.
*/
def maxOffHeapStorageMemory: Long

/**
* Set the [[MemoryStore]] used by this manager to evict cached blocks.
* This must be set after construction due to initialization ordering constraints.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ private[spark] class StaticMemoryManager(
(maxOnHeapStorageMemory * conf.getDouble("spark.storage.unrollFraction", 0.2)).toLong
}

override def maxOffHeapStorageMemory: Long = 0L

override def acquireStorageMemory(
blockId: BlockId,
numBytes: Long,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ private[spark] class UnifiedMemoryManager private[memory] (
maxHeapMemory - onHeapExecutionMemoryPool.memoryUsed
}

override def maxOffHeapStorageMemory: Long = synchronized {
maxOffHeapMemory - offHeapExecutionMemoryPool.memoryUsed
}

/**
* Try to acquire up to `numBytes` of execution memory for the current task and return the
* number of bytes obtained, or 0 if none can be allocated.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ private[spark] class BlockManager(
// However, since we use this only for reporting and logging, what we actually want here is
// the absolute maximum value that `maxMemory` can ever possibly reach. We may need
// to revisit whether reporting this value as the "max" is intuitive to the user.
private val maxMemory = memoryManager.maxOnHeapStorageMemory
private val maxMemory =
memoryManager.maxOnHeapStorageMemory + memoryManager.maxOffHeapStorageMemory

// Port used by the external shuffle service. In Yarn mode, this may be already be
// set through the Hadoop configuration as the server is launched in the Yarn NM.
Expand Down Expand Up @@ -802,7 +803,7 @@ private[spark] class BlockManager(
val putBlockStatus = getCurrentBlockStatus(blockId, info)
val blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid
if (blockWasSuccessfullyStored) {
// Now that the block is in either the memory, externalBlockStore, or disk store,
// Now that the block is in either the memory or disk store,
// tell the master about it.
info.size = size
if (tellMaster) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,9 @@ private[spark] class MemoryStore(
conf.getLong("spark.storage.unrollMemoryThreshold", 1024 * 1024)

/** Total amount of memory available for storage, in bytes. */
private def maxMemory: Long = memoryManager.maxOnHeapStorageMemory
private def maxMemory: Long = {
memoryManager.maxOnHeapStorageMemory + memoryManager.maxOffHeapStorageMemory
}

if (maxMemory < unrollMemoryThreshold) {
logWarning(s"Max memory ${Utils.bytesToString(maxMemory)} is less than the initial memory " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ class TestMemoryManager(conf: SparkConf)
}
override def maxOnHeapStorageMemory: Long = Long.MaxValue

override def maxOffHeapStorageMemory: Long = 0L

private var oomOnce = false
private var available = Long.MaxValue

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,8 +239,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE

// Checking whether blocks are in memory and memory size
val memStatus = master.getMemoryStatus.head._2
assert(memStatus._1 == 20000L, "total memory " + memStatus._1 + " should equal 20000")
assert(memStatus._2 <= 12000L, "remaining memory " + memStatus._2 + " should <= 12000")
assert(memStatus._1 == 40000L, "total memory " + memStatus._1 + " should equal 40000")
assert(memStatus._2 <= 32000L, "remaining memory " + memStatus._2 + " should <= 12000")
assert(store.getSingleAndReleaseLock("a1-to-remove").isDefined, "a1 was not in store")
assert(store.getSingleAndReleaseLock("a2-to-remove").isDefined, "a2 was not in store")
assert(store.getSingleAndReleaseLock("a3-to-remove").isDefined, "a3 was not in store")
Expand Down Expand Up @@ -269,8 +269,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
}
eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
val memStatus = master.getMemoryStatus.head._2
memStatus._1 should equal (20000L)
memStatus._2 should equal (20000L)
memStatus._1 should equal (40000L)
memStatus._2 should equal (40000L)
}
}

Expand Down