[SPARK-15355] [CORE] Proactive block replication #14412
Changes from 4 commits
BlockManager.scala

@@ -1130,6 +1130,34 @@ private[spark] class BlockManager(
     }
   }

+  /**
+   * Called for pro-active replenishment of blocks lost due to executor failures
+   *
+   * @param blockId blockId being replicated
+   * @param existingReplicas existing block managers that have a replica
+   * @param maxReplicas maximum replicas needed
+   */
+  def replicateBlock(
+      blockId: BlockId,
+      existingReplicas: Set[BlockManagerId],
+      maxReplicas: Int): Unit = {
+    logInfo(s"Pro-actively replicating $blockId")
+    blockInfoManager.lockForReading(blockId).foreach { info =>
+      val data = doGetLocalBytes(blockId, info)
+      val storageLevel = StorageLevel(
+        useDisk = info.level.useDisk,
+        useMemory = info.level.useMemory,
+        useOffHeap = info.level.useOffHeap,
+        deserialized = info.level.deserialized,
+        replication = maxReplicas)
+      try {
+        replicate(blockId, data, storageLevel, info.classTag, existingReplicas)
+      } finally {
+        releaseLock(blockId)
+      }
+    }
+  }
+
   /**
    * Replicate block to another node. Note that this is a blocking call that returns after
    * the block has been replicated.

@@ -1138,7 +1166,8 @@ private[spark] class BlockManager(
       blockId: BlockId,
       data: ChunkedByteBuffer,
       level: StorageLevel,
-      classTag: ClassTag[_]): Unit = {
+      classTag: ClassTag[_],
+      existingReplicas: Set[BlockManagerId] = Set.empty): Unit = {

     val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1)
     val tLevel = StorageLevel(

@@ -1152,20 +1181,22 @@ private[spark] class BlockManager(
     val startTime = System.nanoTime

-    var peersReplicatedTo = mutable.HashSet.empty[BlockManagerId]
+    var peersReplicatedTo = mutable.HashSet.empty ++ existingReplicas
     var peersFailedToReplicateTo = mutable.HashSet.empty[BlockManagerId]
     var numFailures = 0

+    val initialPeers = getPeers(false).filterNot(existingReplicas.contains(_))
+
     var peersForReplication = blockReplicationPolicy.prioritize(
       blockManagerId,
-      getPeers(false),
-      mutable.HashSet.empty,
+      initialPeers,
+      peersReplicatedTo,
       blockId,
       numPeersToReplicateTo)

     while(numFailures <= maxReplicationFailures &&
-      !peersForReplication.isEmpty &&
-      peersReplicatedTo.size != numPeersToReplicateTo) {
+        !peersForReplication.isEmpty &&
+        peersReplicatedTo.size < numPeersToReplicateTo) {
Member

While I think it's still valid to replace the inequality with a strictly-less-than check, just out of curiosity: can the number of peers replicated to ever exceed numPeersToReplicateTo?

Contributor (Author)

One scenario I can think of is if an executor with the block being replicated is lost (due to, say, a delayed heartbeat) and then joins back again. The current implementation would recognize that the block manager needs to re-register and will report all of its blocks. The probability of this happening increases with pro-active replication, I think.
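As a side note (not part of the diff), here is a minimal sketch of why the strictly-less-than check matters once peersReplicatedTo is seeded with existingReplicas; the block manager ids and target value below are hypothetical.

```scala
import scala.collection.mutable

// Hypothetical ids: replicas that already exist before proactive replication runs.
val existingReplicas = Set("bm-1", "bm-2", "bm-3")
val numPeersToReplicateTo = 2

// Mirrors the diff: the replicated-to set starts out seeded with existing replicas.
val peersReplicatedTo = mutable.HashSet.empty[String] ++ existingReplicas

// With `!=` this condition would be true and the loop would keep replicating;
// with `<` it is false, so no extra copies are attempted.
println(peersReplicatedTo.size < numPeersToReplicateTo)  // prints: false
```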
       val peer = peersForReplication.head
       try {
         val onePeerStartTime = System.nanoTime

BlockManagerMasterEndpoint.scala
@@ -22,6 +22,7 @@ import java.util.{HashMap => JHashMap}
 import scala.collection.mutable
 import scala.collection.JavaConverters._
 import scala.concurrent.{ExecutionContext, Future}
+import scala.util.Random

 import org.apache.spark.SparkConf
 import org.apache.spark.annotation.DeveloperApi

@@ -65,6 +66,8 @@ class BlockManagerMasterEndpoint(
     mapper
   }

+  val proactivelyReplicate = conf.get("spark.storage.replication.proactive", "false").toBoolean
+
Contributor

Please document this new configuration in the user-facing docs as well.
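For reference, enabling the new flag from user code would look something like the sketch below; the application name is hypothetical, and the default of "false" matches the diff above.

```scala
import org.apache.spark.SparkConf

// Proactive replication is off by default; it can be enabled via SparkConf
// when building the SparkContext.
val conf = new SparkConf()
  .setAppName("proactive-replication-demo")  // hypothetical app name
  .set("spark.storage.replication.proactive", "true")
```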
   logInfo("BlockManagerMasterEndpoint up")

   override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {

@@ -195,17 +198,38 @@ class BlockManagerMasterEndpoint(
     // Remove it from blockManagerInfo and remove all the blocks.
     blockManagerInfo.remove(blockManagerId)

     val iterator = info.blocks.keySet.iterator
     while (iterator.hasNext) {
       val blockId = iterator.next
       val locations = blockLocations.get(blockId)
       locations -= blockManagerId
+      // De-register the block if none of the block managers have it. Otherwise, if pro-active
+      // replication is enabled, and a block is either an RDD or a test block (the latter is used
+      // for unit testing), we send a message to a randomly chosen executor location to replicate
+      // the given block. Note that we ignore other block types (such as broadcast/shuffle blocks
+      // etc.) as replication doesn't make much sense in that context.
       if (locations.size == 0) {
         blockLocations.remove(blockId)
+        logWarning(s"No more replicas available for $blockId !")
+      } else if (proactivelyReplicate && (blockId.isRDD || blockId.isInstanceOf[TestBlockId])) {
+        // As a heuristic, assume single executor failure to find out the number of replicas that
+        // existed before failure
+        val maxReplicas = locations.size + 1
Member

What happens if multiple executors are removed simultaneously? Depending on the invocation sequence, is it possible for locations.size + 1 to underestimate the number of replicas that existed before the failures?

Contributor (Author)

Yes, that's a tough one. The way replication is implemented, the correct storage level is only available with one of the blocks at the BlockManager layer (we don't have access to the RDD that this block is a part of, so we can't extract the information from there). The remaining blocks all have storage levels with replication set to 1. So I use the locations size to get an approximation of the storage level.
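A worked example of the heuristic being discussed (block manager ids are hypothetical), including the multi-failure case where it underestimates:

```scala
// A block stored with replication 3 lives on 3 executors. After a single
// executor failure the master still sees 2 locations, so it infers that
// 2 + 1 = 3 replicas existed before the failure.
val survivingLocations = Seq("bm-1", "bm-2")
val maxReplicas = survivingLocations.size + 1  // == 3

// If two of the three executors fail at (roughly) the same time, the same
// arithmetic gives 1 + 1 = 2, i.e. the original level is underestimated.
val afterDoubleFailure = Seq("bm-1")
val underestimated = afterDoubleFailure.size + 1  // == 2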
+        val i = (new Random(blockId.hashCode)).nextInt(locations.size)
Contributor

Why do we need to use a fixed random seed here? Testing? Also, isn't there a utility method somewhere for picking a random element?

Contributor (Author)

The Scala Random API doesn't have a choice method, and Spark's Utils class has methods to shuffle a collection, but not to pick a random element.
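A small sketch of the pattern being discussed (ids and seed are hypothetical): since Scala's Random has no choice-style helper, a random element is picked by indexing with nextInt, and a fixed seed (blockId.hashCode in the diff) makes the pick deterministic, which helps in tests.

```scala
import scala.util.Random

val locations = Seq("bm-1", "bm-2", "bm-3")  // hypothetical block manager ids
val rng = new Random(42)                     // hypothetical fixed seed
val candidate = locations(rng.nextInt(locations.size))
println(candidate)
```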
+        val blockLocations = locations.toSeq
+        val candidateBMId = blockLocations(i)
+        blockManagerInfo.get(candidateBMId).foreach { bm =>
+          val remainingLocations = locations.toSeq.filter(bm => bm != candidateBMId)
Contributor

Is it possible for this list to be empty in certain corner cases? What happens if it is?

Contributor (Author)

If we are at this point, there would be at least one location with the block, which will get chosen as the candidate here.
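A sketch of the corner case raised above (hypothetical ids): with a single surviving location, that location becomes the candidate and the list of remaining locations is empty, so the replicate call simply starts from zero known peers.

```scala
val locations = Seq("bm-1")  // only one replica left after the failure
val candidateBMId = locations.head
val remainingLocations = locations.filter(_ != candidateBMId)
assert(remainingLocations.isEmpty)  // an empty existingReplicas set is still valid input
```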
+          val replicateMsg = ReplicateBlock(blockId, remainingLocations, maxReplicas)
+          bm.slaveEndpoint.ask[Boolean](replicateMsg)
+        }
+      }
     }

     listenerBus.post(SparkListenerBlockManagerRemoved(System.currentTimeMillis(), blockManagerId))
     logInfo(s"Removing block manager $blockManagerId")
   }

   private def removeExecutor(execId: String) {
nit: 4 spaces (https://github.com/databricks/scala-style-guide/#spacing-and-indentation)

nit: this still needs fixing

@sameeragarwal This code is being removed as a part of this PR. The code that replaces it has this fixed.