Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 54 additions & 20 deletions core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.spark.memory

import scala.collection.mutable

import org.apache.spark.Logging
import org.apache.spark.storage.{BlockId, BlockStatus, MemoryStore}


Expand All @@ -29,7 +30,7 @@ import org.apache.spark.storage.{BlockId, BlockStatus, MemoryStore}
* sorts and aggregations, while storage memory refers to that used for caching and propagating
* internal data across the cluster. There exists one of these per JVM.
*/
private[spark] abstract class MemoryManager {
private[spark] abstract class MemoryManager extends Logging {

// The memory store used to evict cached blocks
private var _memoryStore: MemoryStore = _
Expand All @@ -40,6 +41,10 @@ private[spark] abstract class MemoryManager {
_memoryStore
}

// Amount of execution/storage memory in use, accesses must be synchronized on `this`
protected var _executionMemoryUsed: Long = 0
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why here is protected?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pzz2011 you're making several comments on old PRs. Generally people won't see that and it's not the place for discussion anyway. If you can formulate a specific question beyond "why is the code this way?" ask on user@.

protected var _storageMemoryUsed: Long = 0

/**
* Set the [[MemoryStore]] used by this manager to evict cached blocks.
* This must be set after construction due to initialization ordering constraints.
Expand All @@ -49,10 +54,23 @@ private[spark] abstract class MemoryManager {
}

/**
* Acquire N bytes of memory for execution.
* Total available memory for execution, in bytes.
*/
def maxExecutionMemory: Long

/**
* Total available memory for storage, in bytes.
*/
def maxStorageMemory: Long

/**
* Acquire N bytes of memory for execution, evicting cached blocks if necessary.
* Blocks evicted in the process, if any, are added to `evictedBlocks`.
* @return number of bytes successfully granted (<= N).
*/
def acquireExecutionMemory(numBytes: Long): Long
def acquireExecutionMemory(
numBytes: Long,
evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Long

/**
* Acquire N bytes of memory to cache the given block, evicting existing ones if necessary.
Expand All @@ -72,46 +90,62 @@ private[spark] abstract class MemoryManager {
def acquireUnrollMemory(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that acquireUnrollMemory appears to act as a synonym for acquireStorageMemory in the current implementation, it might be worth adding a brief comment above this method to explain that this extra method exists in order to give us the future flexibility to account for unroll memory differently in the future.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually, it's more than a synonym. In StaticMemoryManager it's required to preserve existing behavior where unrolling doesn't evict all the blocks.

blockId: BlockId,
numBytes: Long,
evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean
evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = {
acquireStorageMemory(blockId, numBytes, evictedBlocks)
}

/**
* Release N bytes of execution memory.
*/
def releaseExecutionMemory(numBytes: Long): Unit
def releaseExecutionMemory(numBytes: Long): Unit = synchronized {
if (numBytes > _executionMemoryUsed) {
logWarning(s"Attempted to release $numBytes bytes of execution " +
s"memory when we only have ${_executionMemoryUsed} bytes")
_executionMemoryUsed = 0
} else {
_executionMemoryUsed -= numBytes
}
}

/**
* Release N bytes of storage memory.
*/
def releaseStorageMemory(numBytes: Long): Unit
def releaseStorageMemory(numBytes: Long): Unit = synchronized {
if (numBytes > _storageMemoryUsed) {
logWarning(s"Attempted to release $numBytes bytes of storage " +
s"memory when we only have ${_storageMemoryUsed} bytes")
_storageMemoryUsed = 0
} else {
_storageMemoryUsed -= numBytes
}
}

/**
* Release all storage memory acquired.
*/
def releaseStorageMemory(): Unit
def releaseStorageMemory(): Unit = synchronized {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor nit (which I guess existed in the old code), but what do you think about naming this releaseAllStorageMemory?

_storageMemoryUsed = 0
}

/**
* Release N bytes of unroll memory.
*/
def releaseUnrollMemory(numBytes: Long): Unit

/**
* Total available memory for execution, in bytes.
*/
def maxExecutionMemory: Long

/**
* Total available memory for storage, in bytes.
*/
def maxStorageMemory: Long
def releaseUnrollMemory(numBytes: Long): Unit = {
releaseStorageMemory(numBytes)
}

/**
* Execution memory currently in use, in bytes.
*/
def executionMemoryUsed: Long
final def executionMemoryUsed: Long = synchronized {
_executionMemoryUsed
}

/**
* Storage memory currently in use, in bytes.
*/
def storageMemoryUsed: Long
final def storageMemoryUsed: Long = synchronized {
_storageMemoryUsed
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.apache.spark.memory

import scala.collection.mutable

import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.SparkConf
import org.apache.spark.storage.{BlockId, BlockStatus}


Expand All @@ -34,17 +34,7 @@ private[spark] class StaticMemoryManager(
conf: SparkConf,
override val maxExecutionMemory: Long,
override val maxStorageMemory: Long)
extends MemoryManager with Logging {

// Max number of bytes worth of blocks to evict when unrolling
private val maxMemoryToEvictForUnroll: Long = {
(maxStorageMemory * conf.getDouble("spark.storage.unrollFraction", 0.2)).toLong
}

// Amount of execution / storage memory in use
// Accesses must be synchronized on `this`
private var _executionMemoryUsed: Long = 0
private var _storageMemoryUsed: Long = 0
extends MemoryManager {

def this(conf: SparkConf) {
this(
Expand All @@ -53,11 +43,19 @@ private[spark] class StaticMemoryManager(
StaticMemoryManager.getMaxStorageMemory(conf))
}

// Max number of bytes worth of blocks to evict when unrolling
private val maxMemoryToEvictForUnroll: Long = {
(maxStorageMemory * conf.getDouble("spark.storage.unrollFraction", 0.2)).toLong
}

/**
* Acquire N bytes of memory for execution.
* @return number of bytes successfully granted (<= N).
*/
override def acquireExecutionMemory(numBytes: Long): Long = synchronized {
override def acquireExecutionMemory(
numBytes: Long,
evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Long = {
assert(numBytes >= 0)
assert(_executionMemoryUsed <= maxExecutionMemory)
val bytesToGrant = math.min(numBytes, maxExecutionMemory - _executionMemoryUsed)
_executionMemoryUsed += bytesToGrant
Expand Down Expand Up @@ -109,6 +107,8 @@ private[spark] class StaticMemoryManager(
numBytesToAcquire: Long,
numBytesToFree: Long,
evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = {
assert(numBytesToAcquire >= 0)
assert(numBytesToFree >= 0)
// Note: Keep this outside synchronized block to avoid potential deadlocks!
memoryStore.ensureFreeSpace(blockId, numBytesToFree, evictedBlocks)
synchronized {
Expand All @@ -121,60 +121,6 @@ private[spark] class StaticMemoryManager(
}
}

/**
* Release N bytes of execution memory.
*/
override def releaseExecutionMemory(numBytes: Long): Unit = synchronized {
if (numBytes > _executionMemoryUsed) {
logWarning(s"Attempted to release $numBytes bytes of execution " +
s"memory when we only have ${_executionMemoryUsed} bytes")
_executionMemoryUsed = 0
} else {
_executionMemoryUsed -= numBytes
}
}

/**
* Release N bytes of storage memory.
*/
override def releaseStorageMemory(numBytes: Long): Unit = synchronized {
if (numBytes > _storageMemoryUsed) {
logWarning(s"Attempted to release $numBytes bytes of storage " +
s"memory when we only have ${_storageMemoryUsed} bytes")
_storageMemoryUsed = 0
} else {
_storageMemoryUsed -= numBytes
}
}

/**
* Release all storage memory acquired.
*/
override def releaseStorageMemory(): Unit = synchronized {
_storageMemoryUsed = 0
}

/**
* Release N bytes of unroll memory.
*/
override def releaseUnrollMemory(numBytes: Long): Unit = {
releaseStorageMemory(numBytes)
}

/**
* Amount of execution memory currently in use, in bytes.
*/
override def executionMemoryUsed: Long = synchronized {
_executionMemoryUsed
}

/**
* Amount of storage memory currently in use, in bytes.
*/
override def storageMemoryUsed: Long = synchronized {
_storageMemoryUsed
}

}


Expand Down
Loading