Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,14 @@ package object config {
.intConf
.createWithDefault(1)

// Feature flag for block manager decommissioning; disabled by default.
private[spark] val STORAGE_DECOMMISSION_ENABLED =
  ConfigBuilder("spark.storage.decommission.enabled")
    .doc("Whether to decommission the block manager when decommissioning an executor. " +
      "When enabled, the block manager of a decommissioned executor tries to offload " +
      "its cached RDD blocks to peer block managers.")
    .booleanConf
    .createWithDefault(false)
Comment thread
prakharjain09 marked this conversation as resolved.
Outdated

// Per-block replication failure budget used while offloading cached RDD blocks
// from a decommissioning block manager.
private[spark] val STORAGE_DECOMMISSION_MAX_REPLICATION_FAILURE =
  ConfigBuilder("spark.storage.decommission.maxReplicationFailures")
    .doc("Maximum number of failed replication attempts tolerated for a single cached RDD " +
      "block before giving up on that block while decommissioning a block manager.")
    .intConf
    .createWithDefault(3)
Comment thread
prakharjain09 marked this conversation as resolved.
Outdated

private[spark] val STORAGE_REPLICATION_TOPOLOGY_FILE =
ConfigBuilder("spark.storage.replication.topologyFile")
.version("2.1.0")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,19 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
logError(s"Unexpected error during decommissioning ${e.toString}", e)
}
logInfo(s"Finished decommissioning executor $executorId.")

if (conf.get(STORAGE_DECOMMISSION_ENABLED)) {
try {
logInfo(s"Starting decommissioning block manager corresponding to " +
Comment thread
prakharjain09 marked this conversation as resolved.
Outdated
s"executor $executorId.")
scheduler.sc.env.blockManager.master.decommissionBlockManagers(Seq(executorId))
} catch {
case e: Exception =>
logError(s"Unexpected error during block manager " +
Comment thread
prakharjain09 marked this conversation as resolved.
Outdated
s"decommissioning for executor $executorId: ${e.toString}", e)
}
logInfo(s"Finished decommissioning block manager corresponding to $executorId.")
Comment thread
prakharjain09 marked this conversation as resolved.
Outdated
}
} else {
logInfo(s"Skipping decommissioning of executor $executorId.")
}
Expand Down Expand Up @@ -568,7 +581,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
*/
private[spark] def decommissionExecutor(executorId: String): Unit = {
if (driverEndpoint != null) {
logInfo("Propegating executor decommission to driver.")
logInfo("Propogating executor decommission to driver.")
Comment thread
prakharjain09 marked this conversation as resolved.
Outdated
driverEndpoint.send(DecommissionExecutor(executorId))
}
}
Expand Down Expand Up @@ -652,7 +665,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
/**
* Update the cluster manager on our scheduling needs. Three bits of information are included
* to help it make decisions.
* @param resourceProfileToNumExecutors The total number of executors we'd like to have per
Comment thread
prakharjain09 marked this conversation as resolved.
* @param resourceProfileIdToNumExecutors The total number of executors we'd like to have per
* ResourceProfile. The cluster manager shouldn't kill any
* running executor to reach this number, but, if all
* existing executors were to die, this is the number
Expand Down
122 changes: 113 additions & 9 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ import org.apache.spark.rpc.RpcEnv
import org.apache.spark.scheduler.ExecutorCacheTaskLocation
import org.apache.spark.serializer.{SerializerInstance, SerializerManager}
import org.apache.spark.shuffle.{ShuffleManager, ShuffleWriteMetricsReporter}
import org.apache.spark.storage.BlockManagerMessages.ReplicateBlock
import org.apache.spark.storage.memory._
import org.apache.spark.unsafe.Platform
import org.apache.spark.util._
Expand Down Expand Up @@ -241,6 +242,9 @@ private[spark] class BlockManager(

private var blockReplicationPolicy: BlockReplicationPolicy = _

// Flipped to true by decommissionBlockManager(); also read by the cache replication thread's
// loop condition.
private var blockManagerDecommissioning: Boolean = false
// Owner of the cache replication thread. Populated when decommissioning starts and
// stopped from stop().
private var decommissionManager: Option[BlockManagerDecommissionManager] = None

// A DownloadFileManager used to track all the files of remote blocks which are above the
// specified memory threshold. Files will be deleted automatically based on weak reference.
// Exposed for test
Expand Down Expand Up @@ -1281,6 +1285,9 @@ private[spark] class BlockManager(

require(blockId != null, "BlockId is null")
require(level != null && level.isValid, "StorageLevel is null or invalid")
if (blockManagerDecommissioning && blockId.isRDD) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we should reject the block?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added overall design to explain why I went ahead with this - #27864 (comment) .

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still not sure about this part of the design. I'd like to see a test to make sure this won't cause task failures.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@holdenk Fixed this. Removed the code that rejected new RDD cache blocks on a decommissioning executor. Also added a test, as part of BlockManagerDecommissionSuite, to show that ongoing tasks which are going to cache data in the future still succeed.

throw new RDDBlockSavedOnDecommissionedBlockManagerException(blockId.asRDDId.get)
}

val putBlockInfo = {
val newInfo = new BlockInfo(level, classTag, tellMaster)
Expand Down Expand Up @@ -1560,26 +1567,30 @@ private[spark] class BlockManager(
def replicateBlock(
blockId: BlockId,
existingReplicas: Set[BlockManagerId],
maxReplicas: Int): Unit = {
maxReplicas: Int,
forceFetch: Boolean = true,
Comment thread
prakharjain09 marked this conversation as resolved.
Outdated
maxReplicationFailures: Option[Int] = None): Boolean = {
var replicatedSuccessfully = true
Comment thread
prakharjain09 marked this conversation as resolved.
Outdated
logInfo(s"Using $blockManagerId to pro-actively replicate $blockId")
blockInfoManager.lockForReading(blockId).foreach { info =>
replicatedSuccessfully = false
val data = doGetLocalBytes(blockId, info)
val storageLevel = StorageLevel(
useDisk = info.level.useDisk,
useMemory = info.level.useMemory,
useOffHeap = info.level.useOffHeap,
deserialized = info.level.deserialized,
replication = maxReplicas)
// we know we are called as a result of an executor removal, so we refresh peer cache
// this way, we won't try to replicate to a missing executor with a stale reference
getPeers(forceFetch = true)
getPeers(forceFetch)
try {
replicate(blockId, data, storageLevel, info.classTag, existingReplicas)
replicatedSuccessfully = replicate(
blockId, data, storageLevel, info.classTag, existingReplicas, maxReplicationFailures)
} finally {
logDebug(s"Releasing lock for $blockId")
releaseLockAndDispose(blockId, data)
}
}
replicatedSuccessfully
}

/**
Expand All @@ -1591,9 +1602,11 @@ private[spark] class BlockManager(
data: BlockData,
level: StorageLevel,
classTag: ClassTag[_],
existingReplicas: Set[BlockManagerId] = Set.empty): Unit = {
existingReplicas: Set[BlockManagerId] = Set.empty,
maxReplicationFailures: Option[Int] = None): Boolean = {

val maxReplicationFailures = conf.get(config.STORAGE_MAX_REPLICATION_FAILURE)
val maxReplicationFailureCount = maxReplicationFailures.getOrElse(
conf.get(config.STORAGE_MAX_REPLICATION_FAILURE))
val tLevel = StorageLevel(
useDisk = level.useDisk,
useMemory = level.useMemory,
Expand All @@ -1617,7 +1630,7 @@ private[spark] class BlockManager(
blockId,
numPeersToReplicateTo)

while(numFailures <= maxReplicationFailures &&
while(numFailures <= maxReplicationFailureCount &&
!peersForReplication.isEmpty &&
peersReplicatedTo.size < numPeersToReplicateTo) {
val peer = peersForReplication.head
Expand Down Expand Up @@ -1665,9 +1678,11 @@ private[spark] class BlockManager(
if (peersReplicatedTo.size < numPeersToReplicateTo) {
logWarning(s"Block $blockId replicated to only " +
s"${peersReplicatedTo.size} peer(s) instead of $numPeersToReplicateTo peers")
return false
}

logDebug(s"block $blockId replicated to ${peersReplicatedTo.mkString(", ")}")
logInfo(s"block $blockId replicated to ${peersReplicatedTo.mkString(", ")}")
Comment thread
prakharjain09 marked this conversation as resolved.
Outdated
return true
}

/**
Expand Down Expand Up @@ -1761,6 +1776,57 @@ private[spark] class BlockManager(
blocksToRemove.size
}

def decommissionBlockManager(): Unit = {
if (!blockManagerDecommissioning) {
Comment thread
prakharjain09 marked this conversation as resolved.
logInfo("Starting block manager decommissioning process")
blockManagerDecommissioning = true
decommissionManager = Some(new BlockManagerDecommissionManager)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess I'm not sure why we're doing this in this manner? It seems like if we did this as a blocking call and returned true when we were done, we could move on to the next phase of decommissioning once the blocks were replicated in the driver?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@holdenk I have added the overall design for this change. Please review the same and provide your suggestions/feedback.

decommissionManager.foreach(_.start())
}
}

/**
* Tries to offload all cached RDD blocks from this BlockManager to peer BlockManagers
* Visible for testing
*/
def offloadRddCacheBlocks(): Unit = {
val replicateBlocksInfo = master.getReplicateInfoForRDDBlocks(blockManagerId)

if (replicateBlocksInfo.nonEmpty) {
logInfo(s"Need to replicate ${replicateBlocksInfo.size} blocks " +
s"for block manager decommissioning")
Comment thread
prakharjain09 marked this conversation as resolved.
Outdated
// Refresh peer list once before starting replication
getPeers(true)
}

// Maximum number of storage replication failures which replicateBlock can handle
// before giving up for one block

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. We had better have this message at the configuration declaration part.
  2. We may need to revise the conf name to explicitly show this clause, before giving up for one block.

@prakharjain09 prakharjain09 Apr 2, 2020

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dongjoon-hyun

  1. Added message as part of description of conf.
  2. Renamed to "spark.storage.decommission.maxReplicationFailuresPerBlock". Any suggestions on some other name?

val maxReplicationFailures = conf.get(
config.STORAGE_DECOMMISSION_MAX_REPLICATION_FAILURE)

val blocksFailedReplication = replicateBlocksInfo.filterNot {
case ReplicateBlock(blockId, existingReplicas, maxReplicas) =>
val replicatedSuccessfully = replicateBlock(

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know if we need it, but replicateBlock is blocking and it seems like maybe async + futures might help us migrate more blocks? Especially if one host is under load, we might block on sending a block to that host before we move forward.

@prakharjain09 prakharjain09 Apr 23, 2020

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@holdenk I am not sure how this will behave when multiple executors on same host machine are decommissioning. And each one of them is doing it in parallel - may cause some sort of network congestion? I have updated code to do replication in ThreadPool of size 4. Maybe we should make this configurable? any suggestions?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Network congestion is certainly a possibility. I think that for now this strikes a good balance between simple code and avoiding hanging all transfers if we have one slow target host. We can revisit this in the future if it turns out we need more control in production environments. Sound good?

blockId,
existingReplicas.toSet,
maxReplicas,
forceFetch = false,
maxReplicationFailures = Some(maxReplicationFailures))
if (replicatedSuccessfully) {
logInfo(s"Block $blockId offloaded successfully, Removing block now")
removeBlock(blockId)
logInfo(s"Block $blockId removed")
} else {
logWarning(s"Failed to offload block $blockId")
}
replicatedSuccessfully
}
if (blocksFailedReplication.nonEmpty) {
logWarning(s"Blocks failed replication in cache decommissioning " +
s"process: ${blocksFailedReplication.mkString(",")}")
}
}

/**
* Remove all blocks belonging to the given broadcast.
*/
Expand Down Expand Up @@ -1829,7 +1895,45 @@ private[spark] class BlockManager(
data.dispose()
}

class BlockManagerDecommissionManager {
Comment thread
prakharjain09 marked this conversation as resolved.
Outdated
@volatile private var stopped = false
private val cacheReplicationThread = new Thread {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment about the thread up above, I'm not sure why we need to be in a separate thread (but maybe I've missed something).

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tried to explain why we have created a thread here - #27864 (comment). Basically this BlockManager will keep on retrying offloading of cache blocks every 30 seconds.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense. I guess we can just (later on) send a message back once all the blocks are done migrating.

@prakharjain09 prakharjain09 Apr 6, 2020

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When all the blocks are done migrating, the driver will automatically know. BlockManagerMaster is continuously receiving BlockUpdates from this BlockManager (using the UpdateBlockInfo message). So when all blocks are offloaded, dynamic allocation will automatically know that this executor doesn't have cache data, and if other criteria are met (like executorIdleTimeout etc.), then the executor will automatically be removed by the ExecutorMonitor class of DynamicAllocation.

override def run(): Unit = {
while (blockManagerDecommissioning && !stopped) {
try {
logDebug(s"Attempting to replicate all cached RDD blocks")
Comment thread
prakharjain09 marked this conversation as resolved.
Outdated
offloadRddCacheBlocks()
logInfo(s"Attempt to replicate all cached blocks done")
Comment thread
prakharjain09 marked this conversation as resolved.
Outdated
Thread.sleep(30000)
} catch {
case _: InterruptedException =>
// no-op
Comment thread
prakharjain09 marked this conversation as resolved.
Outdated
case NonFatal(e) =>
logError("Error occurred while trying to " +
"replicate cached RDD blocks for block manager decommissioning", e)
}
}
}
}
cacheReplicationThread.setDaemon(true)
cacheReplicationThread.setName("cache-replication-thread")

def start(): Unit = {
Comment thread
prakharjain09 marked this conversation as resolved.
cacheReplicationThread.start()
}

/**
 * Stops the cache replication thread and waits for it to finish.
 * Does nothing when the manager has already been stopped.
 */
def stop(): Unit = if (!stopped) {
  // Raise the flag before interrupting so the worker's loop condition fails
  // once it wakes up from its sleep.
  stopped = true
  logInfo("Stopping cache replication thread")
  val worker = cacheReplicationThread
  worker.interrupt()
  worker.join()
}
}

def stop(): Unit = {
decommissionManager.foreach(_.stop())
blockTransferService.close()
if (blockStoreClient ne blockTransferService) {
// Closing should be idempotent, but maybe not for the NioBlockTransferService.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,19 @@ class BlockManagerMaster(
logInfo("Removed " + execId + " successfully in removeExecutor")
}

/**
 * Decommission block managers corresponding to given set of executors.
 *
 * Non-blocking: the request is sent to the driver endpoint and the resulting future is
 * discarded, so callers are not told when (or whether) the request was handled.
 *
 * @param executorIds ids of the executors whose block managers should be decommissioned
 */
def decommissionBlockManagers(executorIds: Seq[String]): Unit = {
  // The master endpoint answers this message with `true`, so ask for a Boolean reply.
  driverEndpoint.ask[Boolean](DecommissionBlockManagers(executorIds))
}

/** Get Replication Info for all the RDD blocks stored in given blockManagerId */
def getReplicateInfoForRDDBlocks(
blockManagerId: BlockManagerId): Seq[ReplicateBlock] = {
Comment thread
prakharjain09 marked this conversation as resolved.
Outdated
driverEndpoint.askSync[Seq[ReplicateBlock]](
GetReplicateInfoForRDDBlocks(blockManagerId))
Comment thread
prakharjain09 marked this conversation as resolved.
Outdated
}


Comment thread
prakharjain09 marked this conversation as resolved.
Outdated
/** Request removal of a dead executor from the driver endpoint.
* This is only called on the driver side. Non-blocking
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ class BlockManagerMasterEndpoint(
// Mapping from executor ID to block manager ID.
private val blockManagerIdByExecutor = new mutable.HashMap[String, BlockManagerId]

// Set of block managers which are decommissioning
private val decommissioningBlockManagerSet = new mutable.HashSet[BlockManagerId]


Comment thread
prakharjain09 marked this conversation as resolved.
Outdated
// Mapping from block id to the set of block managers that have the block.
private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]]

Expand Down Expand Up @@ -153,6 +157,14 @@ class BlockManagerMasterEndpoint(
removeExecutor(execId)
context.reply(true)

case DecommissionBlockManagers(executorIds) =>
val bmIds = executorIds.flatMap(blockManagerIdByExecutor.get)
decommissionBlockManagers(bmIds)
Comment thread
prakharjain09 marked this conversation as resolved.
Outdated
context.reply(true)

case GetReplicateInfoForRDDBlocks(blockManagerId) =>
context.reply(getReplicateInfoForRDDBlocks(blockManagerId))

case StopBlockManagerMaster =>
context.reply(true)
stop()
Expand Down Expand Up @@ -257,6 +269,7 @@ class BlockManagerMasterEndpoint(

// Remove the block manager from blockManagerIdByExecutor.
blockManagerIdByExecutor -= blockManagerId.executorId
decommissioningBlockManagerSet.remove(blockManagerId)

// Remove it from blockManagerInfo and remove all the blocks.
blockManagerInfo.remove(blockManagerId)
Expand Down Expand Up @@ -299,6 +312,42 @@ class BlockManagerMasterEndpoint(
blockManagerIdByExecutor.get(execId).foreach(removeBlockManager)
}

/**
* Decommission the given Seq of blockmanagers
* - Adds these block managers to decommissioningBlockManagerSet Set
* - Sends the DecommissionBlockManager message to each of the [[BlockManagerSlaveEndpoint]]
*/
def decommissionBlockManagers(
blockManagerIds: Seq[BlockManagerId]): Future[Seq[Unit]] = {

Comment thread
prakharjain09 marked this conversation as resolved.
Outdated
val newBlockManagersToDecommission = blockManagerIds.toSet.diff(decommissioningBlockManagerSet)
val futures = newBlockManagersToDecommission.map { blockManagerId =>
decommissioningBlockManagerSet.add(blockManagerId)
val info = blockManagerInfo(blockManagerId)
info.slaveEndpoint.ask[Unit](DecommissionBlockManager)
}
Future.sequence{ futures.toSeq }
}

/**
 * Returns a Seq of ReplicateBlock for each RDD block stored by given blockManagerId.
 *
 * Each message lists the other block managers currently holding the block and asks for
 * one more replica than the number of currently known locations.
 *
 * @param blockManagerId - block manager id for which ReplicateBlock info is needed
 * @return Seq of ReplicateBlock; empty when the block manager is not (or no longer)
 *         registered with this endpoint
 */
private def getReplicateInfoForRDDBlocks(
    blockManagerId: BlockManagerId): Seq[ReplicateBlock] = {
  blockManagerInfo.get(blockManagerId) match {
    case Some(info) =>
      info.blocks.keySet().asScala.toSeq
        .filter(_.isRDD)
        .flatMap { blockId =>
          // blockLocations is a java.util.HashMap, so a block without a location entry
          // yields null; skip such blocks instead of dereferencing null.
          Option(blockLocations.get(blockId)).map { currentBlockLocations =>
            val maxReplicas = currentBlockLocations.size + 1
            val remainingLocations = currentBlockLocations.toSeq.filter(_ != blockManagerId)
            ReplicateBlock(blockId, remainingLocations, maxReplicas)
          }
        }
    case None =>
      // The block manager has already been removed, so there is nothing to replicate.
      Seq.empty
  }
}

// Remove a block from the slaves that have it. This can only be used to remove
// blocks that the master knows about.
private def removeBlockFromWorkers(blockId: BlockId): Unit = {
Expand Down Expand Up @@ -536,7 +585,11 @@ class BlockManagerMasterEndpoint(
private def getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId] = {
val blockManagerIds = blockManagerInfo.keySet
if (blockManagerIds.contains(blockManagerId)) {
blockManagerIds.filterNot { _.isDriver }.filterNot { _ == blockManagerId }.toSeq
blockManagerIds
.filterNot { _.isDriver }
.filterNot { _ == blockManagerId }
.diff(decommissioningBlockManagerSet)
.toSeq
} else {
Seq.empty
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ private[spark] object BlockManagerMessages {
// Describes a block to replicate: the block id, the block managers already holding a
// replica, and the replica count to aim for.
case class ReplicateBlock(blockId: BlockId, replicas: Seq[BlockManagerId], maxReplicas: Int)
extends ToBlockManagerSlave

// Tells a block manager to start its decommissioning process.
case object DecommissionBlockManager
extends ToBlockManagerSlave
Comment thread
prakharjain09 marked this conversation as resolved.
Outdated

// Remove all blocks belonging to a specific RDD.
case class RemoveRdd(rddId: Int) extends ToBlockManagerSlave

Expand Down Expand Up @@ -125,6 +128,11 @@ private[spark] object BlockManagerMessages {

case object GetStorageStatus extends ToBlockManagerMaster

// Asks the master to decommission the block managers of the given executors.
case class DecommissionBlockManagers(executorIds: Seq[String]) extends ToBlockManagerMaster

// Asks the master for the replication info of every RDD block stored on the given
// block manager; answered with a Seq[ReplicateBlock].
case class GetReplicateInfoForRDDBlocks(blockManagerId: BlockManagerId)
extends ToBlockManagerMaster

case class GetBlockStatus(blockId: BlockId, askSlaves: Boolean = true)
extends ToBlockManagerMaster

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ class BlockManagerSlaveEndpoint(
SparkEnv.get.shuffleManager.unregisterShuffle(shuffleId)
}

case DecommissionBlockManager =>
context.reply(blockManager.decommissionBlockManager())

case RemoveBroadcast(broadcastId, _) =>
doAsync[Int]("removing broadcast " + broadcastId, context) {
blockManager.removeBroadcast(broadcastId, tellMaster = true)
Expand Down
Loading