Changes from 11 commits (21 commits total)
ab0e38c
[SPARK-20732][CORE] Decommission cache blocks to other executors when…
prakharjain09 Mar 10, 2020
6a47615
changes
prakharjain09 Mar 10, 2020
622e1ba
minor changes
prakharjain09 Mar 13, 2020
d792092
review comments addressed
prakharjain09 Mar 13, 2020
b9906c2
review comments
prakharjain09 Mar 29, 2020
076dd67
fix comment
prakharjain09 Apr 2, 2020
d12dbff
accept new blocks even on a decommissiong block manager
prakharjain09 Apr 6, 2020
4c67660
Merge remote-tracking branch 'apache/master' into SPARK-20732-rddcache-1
prakharjain09 Apr 7, 2020
f6b4f7c
review comments addressed
prakharjain09 Apr 8, 2020
9c6bdb6
remove extra space
prakharjain09 Apr 8, 2020
bb324f9
parallely replicate blocks
prakharjain09 Apr 23, 2020
12e865c
Merge remote-tracking branch 'apache/master' into SPARK-20732-rddcache-1
prakharjain09 Apr 25, 2020
5847c1c
Merge remote-tracking branch 'apache/master' into SPARK-20732-rddcache-1
prakharjain09 Apr 28, 2020
a2a81f6
commenting code to debug test failure
prakharjain09 Apr 30, 2020
3a14320
Merge remote-tracking branch 'apache/master' into SPARK-20732-rddcache-1
prakharjain09 May 2, 2020
6ab11e3
more changes done to debug test failures
prakharjain09 May 2, 2020
c645582
test debug: use two execs instead of 3
prakharjain09 May 3, 2020
37ad189
cherry-pick fixes from https://github.com/holdenk/spark/tree/SPARK-20…
prakharjain09 May 14, 2020
b365921
empty commit to trigger test
prakharjain09 May 14, 2020
75d6daa
added Thread.interrupted check and max failure limit
prakharjain09 May 14, 2020
c343056
Add set -ex in dev/test-dependencies.sh for debugging
prakharjain09 May 14, 2020
28 changes: 28 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -413,6 +413,34 @@ package object config {
.intConf
.createWithDefault(1)

private[spark] val STORAGE_DECOMMISSION_ENABLED =
ConfigBuilder("spark.storage.decommission.enabled")
.doc("Whether to decommission the block manager when decommissioning executor")
.version("3.1.0")
.booleanConf
.createWithDefault(false)

private[spark] val STORAGE_DECOMMISSION_MAX_REPLICATION_FAILURE_PER_BLOCK =
ConfigBuilder("spark.storage.decommission.maxReplicationFailuresPerBlock")
Member:
I think we can just use spark.storage.maxReplicationFailures directly. Fewer configurations contribute to better UX.

Contributor:
So I'm not sure that's a great idea. Looking at maxReplicationFailures, the default is set to one, which certainly makes sense in the situation where we don't expect the host to be exiting. But this situation is different: we know the current block is going to disappear soon, so it makes sense to try more aggressively to copy the block.

Member:
I see, thanks for your explanation.

.internal()
.doc("Maximum number of failures which can be handled for the replication of " +
"one RDD block when block manager is decommissioning and trying to move its " +
"existing blocks.")
.version("3.1.0")
.intConf
.createWithDefault(3)

private[spark] val STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL =
ConfigBuilder("spark.storage.decommission.replicationReattemptInterval")
.internal()
.doc("The interval of time between consecutive cache block replication reattempts " +
"happening on each decommissioning executor (due to storage decommissioning).")
.version("3.1.0")
.timeConf(TimeUnit.MILLISECONDS)
.checkValue(_ > 0, "Time interval between two consecutive attempts of " +
"cache block replication should be positive.")
.createWithDefaultString("30s")

private[spark] val STORAGE_REPLICATION_TOPOLOGY_FILE =
ConfigBuilder("spark.storage.replication.topologyFile")
.version("2.1.0")
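As an illustration of how these new settings fit together (not part of the diff): the keys and defaults below come straight from the config definitions above, while the SparkConf wiring is just a sketch; note that the two tuning keys are marked .internal() in the diff.

```scala
import org.apache.spark.SparkConf

// Sketch: enabling block manager decommissioning as introduced by this PR.
val conf = new SparkConf()
  .set("spark.storage.decommission.enabled", "true")
  // Internal tuning knobs from this diff; 3 and "30s" are already their defaults.
  .set("spark.storage.decommission.maxReplicationFailuresPerBlock", "3")
  .set("spark.storage.decommission.replicationReattemptInterval", "30s")
```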
@@ -438,6 +438,19 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
logError(s"Unexpected error during decommissioning ${e.toString}", e)
}
logInfo(s"Finished decommissioning executor $executorId.")

if (conf.get(STORAGE_DECOMMISSION_ENABLED)) {
try {
logInfo("Starting decommissioning block manager corresponding to " +
s"executor $executorId.")
scheduler.sc.env.blockManager.master.decommissionBlockManagers(Seq(executorId))
} catch {
case e: Exception =>
logError("Unexpected error during block manager " +
s"decommissioning for executor $executorId: ${e.toString}", e)
}
logInfo(s"Acknowledged decommissioning block manager corresponding to $executorId.")
}
} else {
logInfo(s"Skipping decommissioning of executor $executorId.")
}
@@ -574,7 +587,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
*/
private[spark] def decommissionExecutor(executorId: String): Unit = {
if (driverEndpoint != null) {
logInfo("Propegating executor decommission to driver.")
logInfo("Propagating executor decommission to driver.")
driverEndpoint.send(DecommissionExecutor(executorId))
}
}
@@ -658,7 +671,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
/**
* Update the cluster manager on our scheduling needs. Three bits of information are included
* to help it make decisions.
* @param resourceProfileToNumExecutors The total number of executors we'd like to have per
* @param resourceProfileIdToNumExecutors The total number of executors we'd like to have per
* ResourceProfile. The cluster manager shouldn't kill any
* running executor to reach this number, but, if all
* existing executors were to die, this is the number
129 changes: 120 additions & 9 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -54,6 +54,7 @@ import org.apache.spark.rpc.RpcEnv
import org.apache.spark.scheduler.ExecutorCacheTaskLocation
import org.apache.spark.serializer.{SerializerInstance, SerializerManager}
import org.apache.spark.shuffle.{ShuffleManager, ShuffleWriteMetricsReporter}
import org.apache.spark.storage.BlockManagerMessages.ReplicateBlock
import org.apache.spark.storage.memory._
import org.apache.spark.unsafe.Platform
import org.apache.spark.util._
@@ -241,6 +242,9 @@ private[spark] class BlockManager(

private var blockReplicationPolicy: BlockReplicationPolicy = _

private var blockManagerDecommissioning: Boolean = false
private var decommissionManager: Option[BlockManagerDecommissionManager] = None

// A DownloadFileManager used to track all the files of remote blocks which are above the
// specified memory threshold. Files will be deleted automatically based on weak reference.
// Exposed for test
@@ -1551,30 +1555,36 @@ private[spark] class BlockManager(
}

/**
* Called for pro-active replenishment of blocks lost due to executor failures
* Replicates a block to peer block managers based on existingReplicas and maxReplicas
*
* @param blockId blockId being replicated
* @param existingReplicas existing block managers that have a replica
* @param maxReplicas maximum replicas needed
* @param maxReplicationFailures number of replication failures to tolerate before
* giving up.
* @return whether block was successfully replicated or not
*/
def replicateBlock(
@agrawaldevesh (Jun 11, 2020):
@prakharjain09 / @holdenk, thank you for this improvement. I had a question, please:

(I am still new to these code paths and I am not totally sure of what I am talking about, so if there is something I am missing please help me fill the gaps :-).)

I notice that replicateBlock is already called during the executor removal codepath, i.e. org.apache.spark.storage.BlockManagerMasterEndpoint#removeBlockManager does the replication to other peers if spark.storage.replication.proactive=true. This seems to have been implemented in SPARK-15355, and org.apache.spark.storage.BlockManagerMasterEndpoint#removeBlockManager is triggered when the executor is "eventually" lost.

I understand it is a bit late to do the replication when the executor is indeed lost: decommissioning as implemented in #26440 does not really trigger eager executor loss. We instead merely stop scheduling on the decommissioned executor and let it be shot down out of band, which means that the replication triggered in SPARK-15355 would be too late.

I like the approach taken in this PR of eagerly telling the executor (block manager) to start replication when the decommission is first initiated, to give it more time to be useful. But I wonder if you considered implementing this by leveraging the existing eager replication loop?

Thanks!

Contributor:
So the existing block replication is for the case where blocks were stored on two machines and, due to executor loss, are now down to one machine, so they get replicated. It's not useless, but it doesn't solve the same core problem.
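To make the distinction in this exchange concrete, a small configuration sketch (illustrative only, not from the diff) contrasting the two mechanisms: proactive re-replication after an executor is lost (SPARK-15355) versus eager offloading while the executor is being decommissioned (this PR).

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  // React after loss: re-replicate blocks that dropped below their replication level.
  .set("spark.storage.replication.proactive", "true")
  // Act before loss: offload cached blocks while the executor is decommissioning.
  .set("spark.storage.decommission.enabled", "true")
```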

blockId: BlockId,
existingReplicas: Set[BlockManagerId],
maxReplicas: Int): Unit = {
maxReplicas: Int,
maxReplicationFailures: Option[Int] = None): Boolean = {
logInfo(s"Using $blockManagerId to pro-actively replicate $blockId")
blockInfoManager.lockForReading(blockId).foreach { info =>
blockInfoManager.lockForReading(blockId).forall { info =>
Member:
use map?

Contributor:
Using map would give us back an Option[Boolean] and we just want a boolean.

Member:
I see.
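For readers following the map-versus-forall exchange, a small snippet of the relevant standard Scala Option semantics (not code from this PR): forall collapses directly to a Boolean, and an empty Option is vacuously true.

```scala
val some: Option[Int] = Some(2)
val none: Option[Int] = None

some.forall(_ > 1)  // true: predicate applied to the value
none.forall(_ > 1)  // true: vacuously true when empty

some.map(_ > 1)     // Some(true): an Option[Boolean] the caller would still have to unwrap
none.map(_ > 1)     // None
```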

val data = doGetLocalBytes(blockId, info)
val storageLevel = StorageLevel(
useDisk = info.level.useDisk,
useMemory = info.level.useMemory,
useOffHeap = info.level.useOffHeap,
deserialized = info.level.deserialized,
replication = maxReplicas)
// we know we are called as a result of an executor removal, so we refresh peer cache
// this way, we won't try to replicate to a missing executor with a stale reference
// we know we are called as a result of an executor removal or because the current executor
// is getting decommissioned. so we refresh peer cache before trying replication, we won't
// try to replicate to a missing executor/another decommissioning executor
getPeers(forceFetch = true)
try {
replicate(blockId, data, storageLevel, info.classTag, existingReplicas)
replicate(
blockId, data, storageLevel, info.classTag, existingReplicas, maxReplicationFailures)
} finally {
logDebug(s"Releasing lock for $blockId")
releaseLockAndDispose(blockId, data)
@@ -1591,9 +1601,11 @@ private[spark] class BlockManager(
data: BlockData,
level: StorageLevel,
classTag: ClassTag[_],
existingReplicas: Set[BlockManagerId] = Set.empty): Unit = {
existingReplicas: Set[BlockManagerId] = Set.empty,
maxReplicationFailures: Option[Int] = None): Boolean = {

val maxReplicationFailures = conf.get(config.STORAGE_MAX_REPLICATION_FAILURE)
val maxReplicationFailureCount = maxReplicationFailures.getOrElse(
conf.get(config.STORAGE_MAX_REPLICATION_FAILURE))
val tLevel = StorageLevel(
useDisk = level.useDisk,
useMemory = level.useMemory,
@@ -1617,7 +1629,7 @@ private[spark] class BlockManager(
blockId,
numPeersToReplicateTo)

while(numFailures <= maxReplicationFailures &&
while(numFailures <= maxReplicationFailureCount &&
!peersForReplication.isEmpty &&
peersReplicatedTo.size < numPeersToReplicateTo) {
val peer = peersForReplication.head
@@ -1665,9 +1677,11 @@ private[spark] class BlockManager(
if (peersReplicatedTo.size < numPeersToReplicateTo) {
logWarning(s"Block $blockId replicated to only " +
s"${peersReplicatedTo.size} peer(s) instead of $numPeersToReplicateTo peers")
return false
}

logDebug(s"block $blockId replicated to ${peersReplicatedTo.mkString(", ")}")
return true
}

/**
@@ -1761,6 +1775,58 @@ private[spark] class BlockManager(
blocksToRemove.size
}

def decommissionBlockManager(): Unit = {
if (!blockManagerDecommissioning) {
logInfo("Starting block manager decommissioning process")
blockManagerDecommissioning = true
decommissionManager = Some(new BlockManagerDecommissionManager(conf))
decommissionManager.foreach(_.start())
} else {
logDebug("Block manager already in decommissioning state")
}
}

/**
* Tries to offload all cached RDD blocks from this BlockManager to peer BlockManagers
* Visible for testing
*/
def decommissionRddCacheBlocks(): Unit = {
val replicateBlocksInfo = master.getReplicateInfoForRDDBlocks(blockManagerId)

if (replicateBlocksInfo.nonEmpty) {
logInfo(s"Need to replicate ${replicateBlocksInfo.size} blocks " +
"for block manager decommissioning")
}

// Maximum number of storage replication failure which replicateBlock can handle
val maxReplicationFailures = conf.get(
config.STORAGE_DECOMMISSION_MAX_REPLICATION_FAILURE_PER_BLOCK)

// TODO: We can sort these blocks based on some policy (LRU/blockSize etc)
// so that we end up prioritize them over each other
val blocksFailedReplication = ThreadUtils.parmap(
replicateBlocksInfo, "decommissionRddCacheBlocks", 4) {
case ReplicateBlock(blockId, existingReplicas, maxReplicas) =>
val replicatedSuccessfully = replicateBlock(
blockId,
existingReplicas.toSet,
maxReplicas,
maxReplicationFailures = Some(maxReplicationFailures))
if (replicatedSuccessfully) {
logInfo(s"Block $blockId offloaded successfully, Removing block now")
removeBlock(blockId)
logInfo(s"Block $blockId removed")
} else {
logWarning(s"Failed to offload block $blockId")
}
(blockId, replicatedSuccessfully)
}.filterNot(_._2).map(_._1)
if (blocksFailedReplication.nonEmpty) {
logWarning("Blocks failed replication in cache decommissioning " +
s"process: ${blocksFailedReplication.mkString(",")}")
}
}
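Since ThreadUtils.parmap may be unfamiliar, a tiny standalone illustration of the call shape used above (the numbers and thread-name prefix are arbitrary):

```scala
import org.apache.spark.util.ThreadUtils

// Applies the function to each element using up to 2 daemon threads whose
// names start with the given prefix, and collects the results.
val doubled: Seq[Int] =
  ThreadUtils.parmap(Seq(1, 2, 3, 4), "demo-parmap", 2) { i => i * 2 }
```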

/**
* Remove all blocks belonging to the given broadcast.
*/
Expand Down Expand Up @@ -1829,7 +1895,52 @@ private[spark] class BlockManager(
data.dispose()
}

/**
* Class to handle block manager decommissioning retries
* It creates a Thread to retry offloading all RDD cache blocks
*/
private class BlockManagerDecommissionManager(conf: SparkConf) {
Member:
Do we really need a wrapped manager class? It seems overkill to me.

Contributor:
For the first part I'm ambivalent, but given that we also want to migrate shuffle blocks afterwards, I think having a manager is ok.

Member:
We should implement it step by step and we can always refactor later. Or, we should at least add a TODO ticket to explain why we need this and what we plan to do next. Otherwise, I am really -1 on this kind of change.

Contributor:
So there are two conversations I want to have about this with you, @Ngone51, to make sure I'm understanding what you're trying to express.

There is already a second follow-up PR that extends the BlockManagerDecommissionManager, so I'm not sure I agree with your reasoning. If it were only for some possible future implementation that didn't already exist, I'd be more inclined to simplify. Maybe you can take a look at https://issues.apache.org/jira/browse/SPARK-20624, https://issues.apache.org/jira/browse/SPARK-20629 and #28331 for context.

I want to understand your -1 here, because that has some pretty strong meanings in the context of a code change. A -1 is generally viewed as expressing a veto, which I don't believe you have in the project (of course, I was out for a month in the hospital last year, so if you do, please point me to the thread). Even if you don't have a veto in the project, is it your intention to say that if you did have a veto you would block this code change? A veto is generally a very strong expression, and I'm worried I'm not understanding your reasoning, since this seems like a relatively minor issue.

Contributor:
Also, I understand text-only communication can lead to more misunderstandings. If you want to find a time this week when we're both free to jump on a call to clarify this (and we can write back our understanding here so it's recorded for people to understand what we talked about), I'd be more than happy to.

Member:
For a new reviewer (e.g. me) on a big topic, it's not always possible to know every detail (even worse when there's no design doc). So it's the author's responsibility to give more context, for example by leaving TODO JIRA tickets in code comments or replying with more information. But without sufficient context here, I really think "this change", wrapping a manager around a thread, doesn't make sense to me.

As for "-1", it really represents my personal opinion. I should say "I don't like this change" if "-1" means a lot for the community.

Contributor:
As a reviewer it's expected that you would read the issue before asking for a follow-up issue in a blocking manner.

Member:
Of course I did. But I still don't get it, and I think it's not always possible for a reviewer to know that the sub-issue is meant to be a follow-up for some specific code without a design document or code comments around it.

Contributor:
So if you look at the parent issue you can see there is another sub-issue that says "migrate shuffle blocks". It's ok to ask for a follow-up even if there is one (we all miss things in reading), but attempting to vote -1 has a higher bar than just asking for something.

@volatile private var stopped = false
private val blockReplicationThread = new Thread {
override def run(): Unit = {
while (blockManagerDecommissioning && !stopped) {
try {
logDebug("Attempting to replicate all cached RDD blocks")
Member:
Could we add attempt number to the log?

decommissionRddCacheBlocks()
Member:
Don't you need to set stop=true here?

Or do you mean we need to call decommissionRddCacheBlocks multiple times? If so, why do we need to do it multiple times? There should be no RDD block changes after decommissioning?

Contributor:
We don't set stop=true here because we loop through this multiple times. It is possible that not all blocks will replicate in the first iteration, and it is also possible that more blocks are stored while we're decommissioning (e.g. any existing running tasks which have a persist).

logInfo("Attempt to replicate all cached blocks done")
Member:
Attempt number?

Contributor:
I'd say this is fine to do in a follow-up, but if we want to add the attempt number here, go for it (I won't hold off on merging for that).

val sleepInterval = conf.get(
config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL)
Thread.sleep(sleepInterval)
} catch {
case _: InterruptedException =>
// no-op
case NonFatal(e) =>
logError("Error occurred while trying to " +
"replicate cached RDD blocks for block manager decommissioning", e)
}
}
}
}
blockReplicationThread.setDaemon(true)
blockReplicationThread.setName("block-replication-thread")
Member:
Use a Runnable for the decommissioning and ThreadUtils to execute the Runnable?

Contributor:
Looking at our code, we seem to be roughly split on Thread versus Runnable usage. I think Runnable would make more sense if we were submitting this to an execution pool, but since we have a single thread and there is no reason to scale up the number of threads, I don't see the need for that change.

Member:
We always use ThreadUtils.newDaemonSingleThreadExecutor for the single runnable.

Contributor:
grep -r "new Thread" ./core/src/main | wc -l returns 36
grep -r ThreadUtils.newDaemonSingleThreadExecutor ./core/src/main | wc -l returns 4

Member:
Ah, that's a good point, but I'm just wondering how many of them were chosen after knowing about ThreadUtils.newDaemonSingleThreadExecutor.

BTW, you'd better grep "new Thread(" to exclude ThreadLocal declarations.

Contributor:
Even that returns 36 in core.
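Purely to illustrate the alternative being debated above (not the PR's code), a minimal sketch of running the replication loop on ThreadUtils.newDaemonSingleThreadExecutor; the object and parameter names here are hypothetical.

```scala
import java.util.concurrent.TimeUnit
import org.apache.spark.util.ThreadUtils

object ReplicationLoopSketch {
  // Single daemon thread created via Spark's ThreadUtils, as the reviewer suggests.
  private val replicationExecutor =
    ThreadUtils.newDaemonSingleThreadExecutor("block-replication-thread")

  def start(replicateOnce: () => Unit, intervalMs: Long): Unit = {
    replicationExecutor.execute(() => {
      while (!Thread.currentThread().isInterrupted) {
        try {
          replicateOnce()          // stand-in for decommissionRddCacheBlocks()
          Thread.sleep(intervalMs) // stand-in for the configured reattempt interval
        } catch {
          case _: InterruptedException => Thread.currentThread().interrupt()
        }
      }
    })
  }

  def stop(): Unit = {
    replicationExecutor.shutdownNow()                          // interrupts the running task
    replicationExecutor.awaitTermination(30, TimeUnit.SECONDS) // arbitrary bound
  }
}
```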


def start(): Unit = {
logInfo("Starting block replication thread")
blockReplicationThread.start()
}

def stop(): Unit = {
if (!stopped) {
stopped = true
logInfo("Stopping block replication thread")
blockReplicationThread.interrupt()
blockReplicationThread.join()
Contributor:
My thought is that joining the thread here might block if the interrupt didn't work as we want it to. What do you think?

Contributor (Author):
@holdenk This entire BlockManagerDecommissionManager#stop() method is called by decommissionManager.foreach(_.stop()) inside BlockManager#stop().

decommissionManager is None by default (the storage decommissioning feature is behind a config and disabled by default), so this code shouldn't even trigger.

Contributor:
Sure, but when it is turned on (in your test case) this might keep the worker process from exiting when we ask it to stop?

@prakharjain09 (Author, Apr 28, 2020):
@holdenk Yeah, but all the tests that are failing in the Jenkins build are not the ones written in this PR. So that means all those tests must be running with the storage decommissioning flag disabled?

  1. https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/121988/testReport/
  2. https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/121917/testReport/

Is the same Spark application going to get used across all these tests? I was assuming that a new Spark app would be created and destroyed for my specific tests (as BlockManagerDecommissionSuite creates a new SparkContext as part of the test).

Contributor:
So it's hanging with remaining alive workers. Try taking this out and see if it makes a difference.

Comment:
> My thought is joining the thread here might block if the interrupt didn't work as we want it to, what do you think?

I think the initial concern is regarding join waiting infinitely, as the "stop" is not percolated deep enough to stop early... Maybe it needs to be refactored?
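Regarding the concern about join() blocking forever, one common mitigation is a bounded join; this is only an illustrative sketch with a hypothetical helper name and an arbitrary 30-second default, not a change proposed in the PR.

```scala
import java.util.concurrent.TimeUnit

object BoundedStopSketch {
  // Interrupt a worker thread and wait at most `timeoutSeconds` for it to exit,
  // so a stuck replication loop cannot hang shutdown indefinitely.
  def interruptAndJoin(t: Thread, timeoutSeconds: Long = 30): Boolean = {
    t.interrupt()
    t.join(TimeUnit.SECONDS.toMillis(timeoutSeconds))
    !t.isAlive  // true if the thread actually terminated within the bound
  }
}
```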

}
}
}

def stop(): Unit = {
decommissionManager.foreach(_.stop())
blockTransferService.close()
if (blockStoreClient ne blockTransferService) {
// Closing should be idempotent, but maybe not for the NioBlockTransferService.
@@ -43,6 +43,16 @@ class BlockManagerMaster(
logInfo("Removed " + execId + " successfully in removeExecutor")
}

/** Decommission block managers corresponding to given set of executors */
def decommissionBlockManagers(executorIds: Seq[String]): Unit = {
driverEndpoint.ask[Unit](DecommissionBlockManagers(executorIds))
}

/** Get Replication Info for all the RDD blocks stored in given blockManagerId */
def getReplicateInfoForRDDBlocks(blockManagerId: BlockManagerId): Seq[ReplicateBlock] = {
driverEndpoint.askSync[Seq[ReplicateBlock]](GetReplicateInfoForRDDBlocks(blockManagerId))
}

/** Request removal of a dead executor from the driver endpoint.
* This is only called on the driver side. Non-blocking
*/
@@ -65,6 +65,9 @@ class BlockManagerMasterEndpoint(
// Mapping from executor ID to block manager ID.
private val blockManagerIdByExecutor = new mutable.HashMap[String, BlockManagerId]

// Set of block managers which are decommissioning
private val decommissioningBlockManagerSet = new mutable.HashSet[BlockManagerId]

// Mapping from block id to the set of block managers that have the block.
private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]]

@@ -153,6 +156,13 @@ class BlockManagerMasterEndpoint(
removeExecutor(execId)
context.reply(true)

case DecommissionBlockManagers(executorIds) =>
decommissionBlockManagers(executorIds.flatMap(blockManagerIdByExecutor.get))
context.reply(true)

case GetReplicateInfoForRDDBlocks(blockManagerId) =>
context.reply(getReplicateInfoForRDDBlocks(blockManagerId))

case StopBlockManagerMaster =>
context.reply(true)
stop()
@@ -257,6 +267,7 @@ class BlockManagerMasterEndpoint(

// Remove the block manager from blockManagerIdByExecutor.
blockManagerIdByExecutor -= blockManagerId.executorId
decommissioningBlockManagerSet.remove(blockManagerId)

// Remove it from blockManagerInfo and remove all the blocks.
blockManagerInfo.remove(blockManagerId)
@@ -299,6 +310,39 @@ class BlockManagerMasterEndpoint(
blockManagerIdByExecutor.get(execId).foreach(removeBlockManager)
}

/**
* Decommission the given Seq of blockmanagers
* - Adds these block managers to decommissioningBlockManagerSet Set
* - Sends the DecommissionBlockManager message to each of the [[BlockManagerSlaveEndpoint]]
*/
def decommissionBlockManagers(blockManagerIds: Seq[BlockManagerId]): Future[Seq[Unit]] = {
val newBlockManagersToDecommission = blockManagerIds.toSet.diff(decommissioningBlockManagerSet)
val futures = newBlockManagersToDecommission.map { blockManagerId =>
decommissioningBlockManagerSet.add(blockManagerId)
val info = blockManagerInfo(blockManagerId)
info.slaveEndpoint.ask[Unit](DecommissionBlockManager)
}
Future.sequence{ futures.toSeq }
}

/**
* Returns a Seq of ReplicateBlock for each RDD block stored by given blockManagerId
* @param blockManagerId - block manager id for which ReplicateBlock info is needed
* @return Seq of ReplicateBlock
*/
private def getReplicateInfoForRDDBlocks(blockManagerId: BlockManagerId): Seq[ReplicateBlock] = {
val info = blockManagerInfo(blockManagerId)

val rddBlocks = info.blocks.keySet().asScala.filter(_.isRDD)
rddBlocks.map { blockId =>
val currentBlockLocations = blockLocations.get(blockId)
val maxReplicas = currentBlockLocations.size + 1
Member:
Could you please add some comments to explain why we need the "+1"?

Contributor:
I think the fact that we're decommissioning here makes this self-evident.

Member:
The method itself does not declare that it's used for decommissioning.

Contributor:
Reasonable, then, to add a comment.

val remainingLocations = currentBlockLocations.toSeq.filter(bm => bm != blockManagerId)
val replicateMsg = ReplicateBlock(blockId, remainingLocations, maxReplicas)
Comment:
Can we make this an interface/trait that is implemented by some entity which holistically decides the location for replication? I suggest this model because it would help plug in code that a YARN-based cluster can extend based on node decommissioning (this PR deals with executor decommissioning), or a strategy to distribute blocks equally across the other block managers, or to external storage, etc.

Contributor:
Currently that logic lives inside blockReplicationPolicy, so that would be the place to explore it.

Contributor:
It seems to be pluggable, so if you wanted to specify your own policy you could set spark.storage.replication.policy.
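For context on that last point, a minimal sketch of selecting a replication policy via that config key; RandomBlockReplicationPolicy is the default, and BasicBlockReplicationPolicy is the built-in topology-aware variant (class availability should be checked against the Spark version in use).

```scala
import org.apache.spark.SparkConf

// Sketch: choosing the block replication policy by class name. A user-supplied
// class implementing org.apache.spark.storage.BlockReplicationPolicy could be
// given here instead of the built-in one.
val conf = new SparkConf()
  .set("spark.storage.replication.policy",
       "org.apache.spark.storage.BasicBlockReplicationPolicy")
```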

@Ngone51 (May 18, 2020), on lines +339 to +341:
IIUC, there's no need to do replication if remainingLocations is equal to currentBlockLocations after filtering out the blockManagerId, which would mean we already successfully decommissioned the block in a previous decommission round.

Contributor:
So we currently remove the block on successful decommissioning. But it's possible it is somehow already sufficiently replicated and we don't need to do anything, so I've added this to https://issues.apache.org/jira/browse/SPARK-31555 for tracking.

replicateMsg
}.toSeq
}

// Remove a block from the slaves that have it. This can only be used to remove
// blocks that the master knows about.
private def removeBlockFromWorkers(blockId: BlockId): Unit = {
@@ -536,7 +580,11 @@ class BlockManagerMasterEndpoint(
private def getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId] = {
val blockManagerIds = blockManagerInfo.keySet
if (blockManagerIds.contains(blockManagerId)) {
blockManagerIds.filterNot { _.isDriver }.filterNot { _ == blockManagerId }.toSeq
blockManagerIds
.filterNot { _.isDriver }
.filterNot { _ == blockManagerId }
.diff(decommissioningBlockManagerSet)
.toSeq
} else {
Seq.empty
}