56 changes: 47 additions & 9 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1130,15 +1130,48 @@ private[spark] class BlockManager(
}
}

/**
* Called for pro-active replenishment of blocks lost due to executor failures
*
* @param blockId blockId being replicate
* @param replicas existing block managers that have a replica
* @param maxReps maximum replicas needed
* @return
Contributor

You can omit this @return since this method doesn't have a return value.

*/
def replicateBlock(blockId: BlockId, replicas: Set[BlockManagerId], maxReps: Int): Boolean = {
Member

How about something like this for better readability?

def replicateBlock(blockId: BlockId, existingReplicas: Set[BlockManagerId], maxReplicas: Int)

Also, is there a reason this returns a Boolean?

Contributor Author

This doesn't need to return a boolean. Changing the return type to Unit. Also changing the variable names.

Member

nit: this still needs fixing

Contributor Author

@sameeragarwal This code is being removed as a part of this PR. Code replacing this has this fixed.

logInfo(s"Pro-actively replicating $blockId")
val infoForReplication = blockInfoManager.lockForReading(blockId).map { info =>
Contributor

This call acquires a read lock on the block, but when is that lock released? Per the Scaladoc of doGetLocalBytes, you need to be holding a read lock before calling that method, but upon successful return from that method the read lock will still be held by the caller.

I think what you want to do is acquire the lock, immediately call doGetLocalBytes, then begin a try-finally statement to call replicate() and unlock / release the lock in the finally block.

Contributor

Also, I don't think there's a need to have separate .map and .foreach calls over the option. Instead, I think it would be clearer to avoid the assignment to the infoForReplication variable and just perform all of the work inside of a .foreach call on the Option with the block info.

Contributor Author

Yes, that makes sense.

Member

Nice catch! Can we also assert that all locks are released somewhere in testProactiveReplication?

Contributor

I think that we can set spark.storage.exceptionOnPinLeak to true in SparkConf to do this.
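
The locking pattern suggested earlier in this thread can be sketched as follows: take the read lock, do all of the work inside a single `foreach` over the `Option`, and release the lock in a `finally` block. This is only an illustration under stated assumptions. `BlockStore`, `lockForReading`, `unlock`, and `heldLocks` are hypothetical stand-ins and are not Spark's `BlockInfoManager` API.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a block store with per-block read-lock counts.
// None of these names are Spark's real API; they only illustrate the pattern.
object LockSketch {
  final class BlockStore(blocks: Map[String, Array[Byte]]) {
    private val readLocks = mutable.Map.empty[String, Int].withDefaultValue(0)

    // Returns the block's data and takes a read lock, or None if the block is missing.
    def lockForReading(id: String): Option[Array[Byte]] =
      blocks.get(id).map { data => readLocks(id) += 1; data }

    def unlock(id: String): Unit = readLocks(id) -= 1

    // Total number of read locks currently held, useful for leak checks in tests.
    def heldLocks: Int = readLocks.values.sum
  }

  // All work happens inside one foreach over the Option; the finally block
  // releases the lock even when `replicate` throws.
  def replicateBlock(store: BlockStore, id: String)(replicate: Array[Byte] => Unit): Unit =
    store.lockForReading(id).foreach { data =>
      try replicate(data)
      finally store.unlock(id)
    }
}
```

A test can then assert that `heldLocks` is zero after `replicateBlock` returns or throws, which is the kind of check the thread asks for in `testProactiveReplication`.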

val data = doGetLocalBytes(blockId, info)
val storageLevel = StorageLevel(
info.level.useDisk,
Contributor

Minor nit, but a problem with the StorageLevel constructor is that it has a bunch of adjacent boolean parameters, so in such cases I'd usually prefer to name all of the parameters explicitly at the call site in order to avoid errors should these lines get permuted / to convince readers that the API is being used correctly.

Thus I'd probably write each line like useDisk = info.level.useDisk, etc.

Contributor Author

Done
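
As an illustration of the named-argument suggestion above, here is a minimal sketch. `Level` is a hypothetical case class that only mirrors the shape of a constructor with several adjacent `Boolean` parameters; it is not Spark's `StorageLevel`.

```scala
// Hypothetical case class with adjacent Boolean parameters (an assumption for
// illustration, not Spark's StorageLevel).
object NamedArgsSketch {
  final case class Level(
      useDisk: Boolean,
      useMemory: Boolean,
      useOffHeap: Boolean,
      deserialized: Boolean,
      replication: Int)

  // Copies the flags of `src` and overrides only the replication count.
  // Naming each argument keeps the call correct even if the lines are reordered.
  def withReplication(src: Level, maxReplicas: Int): Level =
    Level(
      useDisk = src.useDisk,
      useMemory = src.useMemory,
      useOffHeap = src.useOffHeap,
      deserialized = src.deserialized,
      replication = maxReplicas)
}
```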

info.level.useMemory,
info.level.useOffHeap,
info.level.deserialized,
maxReps)
(data, storageLevel, info.classTag)
}
infoForReplication.foreach { case (data, storageLevel, classTag) =>
replicate(blockId, data, storageLevel, classTag, replicas)
}
true
}

/**
* Replicate block to another node. Note that this is a blocking call that returns after
* the block has been replicated.
*
* @param blockId
* @param data
* @param level
* @param classTag
* @param existingReplicas
Member

Let's either document these params or just remove them

Contributor Author

Removing these.

*/
private def replicate(
blockId: BlockId,
data: ChunkedByteBuffer,
level: StorageLevel,
classTag: ClassTag[_]): Unit = {
blockId: BlockId,
data: ChunkedByteBuffer,
level: StorageLevel,
classTag: ClassTag[_],
existingReplicas: Set[BlockManagerId] = Set.empty): Unit = {

val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1)
val tLevel = StorageLevel(
@@ -1152,20 +1185,25 @@ private[spark] class BlockManager(

val startTime = System.nanoTime

var peersReplicatedTo = mutable.HashSet.empty[BlockManagerId]
var peersReplicatedTo = mutable.HashSet.empty ++ existingReplicas
var peersFailedToReplicateTo = mutable.HashSet.empty[BlockManagerId]
var numFailures = 0

val initialPeers = {
Member
@sameeragarwal, Jan 31, 2017

can this not be just:

val initialPeers = getPeers(false).filterNot(existingReplicas.contains(_))

Contributor Author

Done
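
The simplification agreed on above relies on `filterNot` returning every element when the excluded set is empty, so the `isEmpty` special case is redundant. A small sketch with hypothetical names (`verbose`, `concise`) and plain strings standing in for `BlockManagerId`:

```scala
object PeerFilterSketch {
  // Original form: special-cases the empty set before filtering.
  def verbose[A](peers: Seq[A], existing: Set[A]): Seq[A] =
    if (existing.isEmpty) peers else peers.filter(p => !existing.contains(p))

  // Suggested form: filterNot already keeps every peer when `existing` is empty.
  def concise[A](peers: Seq[A], existing: Set[A]): Seq[A] =
    peers.filterNot(existing.contains)
}
```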

val peers = getPeers(false)
if(existingReplicas.isEmpty) peers else peers.filter(!existingReplicas.contains(_))
}

var peersForReplication = blockReplicationPolicy.prioritize(
blockManagerId,
getPeers(false),
mutable.HashSet.empty,
initialPeers,
peersReplicatedTo,
blockId,
numPeersToReplicateTo)

while(numFailures <= maxReplicationFailures &&
!peersForReplication.isEmpty &&
peersReplicatedTo.size != numPeersToReplicateTo) {
!peersForReplication.isEmpty &&
peersReplicatedTo.size < numPeersToReplicateTo) {
Member

I think it's still valid to replace the inequality with a strictly-less-than check, but just out of curiosity, can the size of peersReplicatedTo ever exceed numPeersToReplicateTo?

Contributor Author

One scenario I can think of is an executor holding the block being lost (due to, say, a delayed heartbeat) and then joining back again. The current implementation would recognize that the block manager needs to re-register, and it would then report all of its blocks. The probability of this happening increases with pro-active replication, I think.
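
To make the difference between the two loop conditions concrete, here is a toy model (not the actual replication loop). It counts attempts under the assumption that every attempt succeeds; `attempts` and its parameters are hypothetical names.

```scala
object LoopBoundSketch {
  // Counts replication attempts until `keepGoing(replicated, target)` is false,
  // stopping after `peersAvailable` attempts because no peers remain.
  def attempts(alreadyReplicated: Int, target: Int, peersAvailable: Int)(
      keepGoing: (Int, Int) => Boolean): Int = {
    var replicated = alreadyReplicated
    var tried = 0
    while (tried < peersAvailable && keepGoing(replicated, target)) {
      replicated += 1 // assumption: every attempt succeeds
      tried += 1
    }
    tried
  }
}
```

When the replica count starts above the target, `!=` never becomes false and the loop runs until peers are exhausted, while `<` stops immediately. When the count starts below the target, both conditions behave the same.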

val peer = peersForReplication.head
try {
val onePeerStartTime = System.nanoTime
@@ -22,6 +22,7 @@ import java.util.{HashMap => JHashMap}
import scala.collection.mutable
import scala.collection.JavaConverters._
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Random

import org.apache.spark.SparkConf
import org.apache.spark.annotation.DeveloperApi
@@ -188,24 +189,45 @@ class BlockManagerMasterEndpoint(
}

private def removeBlockManager(blockManagerId: BlockManagerId) {
val proactivelyReplicate = conf.get("spark.storage.replication.proactive", "false").toBoolean
Member

can probably move this out of the function block

Contributor Author

Removed from the function.


val info = blockManagerInfo(blockManagerId)

// Remove the block manager from blockManagerIdByExecutor.
blockManagerIdByExecutor -= blockManagerId.executorId
Member
@sameeragarwal, Jan 31, 2017

why did you move this to the end (i.e., after replicating the blocks and updating blockLocations)?


// Remove it from blockManagerInfo and remove all the blocks.
blockManagerInfo.remove(blockManagerId)
val iterator = info.blocks.keySet.iterator
while (iterator.hasNext) {
val blockId = iterator.next
val locations = blockLocations.get(blockId)
locations -= blockManagerId
if (locations.size == 0) {
blockLocations.remove(blockId)
logWarning(s"No more replicas available for $blockId !")
} else if ((blockId.isRDD || blockId.isInstanceOf[TestBlockId]) && proactivelyReplicate) {
// we only need to proactively replicate RDD blocks
Member

I think it makes sense overall, but it'd be great to add some more comments about why we are only concerned with RDD blocks and not others.

Member

Also, might be nicer to make proactivelyReplicate the first check for better short-circuiting.

// we also need to replicate this behavior for test blocks for unit tests
// we send a message to a randomly chosen executor location to replicate block
// assuming single executor failure, we find out how many replicas existed before failure
val maxReplicas = locations.size + 1
Member

What happens if multiple executors are removed simultaneously? Depending on the invocation sequence, is it possible for maxReplicas to be significantly less than the original number of replicas?

Contributor Author

Yes, that's a tough one. The way replication is implemented, the correct storage level is only available with one of the blocks at the BlockManager layer (we don't have access to the RDD that this block is a part of, so we can't extract the information from there). The remaining blocks all have storage levels with replication set to 1. So I use the number of locations as an approximation of the replication factor.
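
The concern raised above can be worked through with a small model of the `locations.size + 1` estimate. This sketch assumes executors are removed one at a time and that no re-replication completes between removals; `estimates` is a hypothetical helper and strings stand in for block manager ids.

```scala
object MaxReplicasSketch {
  // Removes `failed` executors one at a time from `locations` and records the
  // estimate `remaining + 1` computed after each removal. Removals that leave
  // no replica produce no estimate, since the block is simply dropped.
  def estimates(locations: Set[String], failed: Seq[String]): Seq[Int] =
    failed.scanLeft(locations)(_ - _).tail.filter(_.nonEmpty).map(_.size + 1)
}
```

With three replicas and two back-to-back failures, the first estimate is 3 but the second is only 2, so under these assumptions the approximation can undershoot the original replication factor.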


val i = (new Random(blockId.hashCode)).nextInt(locations.size)
Contributor

Why do we need to use a fixed random seed here? Testing?

Also, isn't there a Random.choice() that you can use for this? Or a method like that in our own Utils class?

Contributor Author

Scala's Random API doesn't have a choice method, and Spark's Utils class has methods to shuffle but not to pick a random element.
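
Since no ready-made helper is available, a random pick can be written in a couple of lines. The `choice` helper below is hypothetical, not an existing Scala or Spark method. Seeding the generator, as the diff does with `blockId.hashCode`, makes the pick repeatable for a given seed.

```scala
import scala.util.Random

object RandomChoiceSketch {
  // Hypothetical helper: picks one element uniformly at random,
  // or returns None for an empty sequence.
  def choice[A](items: IndexedSeq[A], rng: Random): Option[A] =
    if (items.isEmpty) None else Some(items(rng.nextInt(items.size)))
}
```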

val blockLocations = locations.toSeq
val candidateBMId = blockLocations(i)
val blockManager = blockManagerInfo.get(candidateBMId)
if(blockManager.isDefined) {
Contributor

Nit: space after if.

Contributor

If you're not going to have an else branch here then you might as well just foreach over the result of blockManagerInfo.get(candidateBMId).
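
A minimal sketch of that suggestion, using a plain `Map` in place of `blockManagerInfo`. `sendIfPresent` is a hypothetical name introduced only for this example.

```scala
object OptionForeachSketch {
  // Looks up `key` and runs `send` on the value only when it is present,
  // replacing an `if (opt.isDefined) { ... opt.get ... }` block.
  def sendIfPresent[K, V](registry: Map[K, V], key: K)(send: V => Unit): Unit =
    registry.get(key).foreach(send)
}
```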

val remainingLocations = locations.toSeq.filter(bm => bm != candidateBMId)
Contributor

Is it possible for this list to be empty in certain corner-cases? What happens if ReplicateBlock is called with an empty set of locations? Is it just a no-op in that case?

Contributor Author

If we are at this point, there is at least one location with the block, and it will be chosen as the candidate here. remainingLocations just tells the replication logic where other replicas are present (if any, so it can be an empty set), so that it can use that information while choosing candidate executors for replication.

val replicateMsg = ReplicateBlock(blockId, remainingLocations, maxReplicas)
blockManager.get.slaveEndpoint.ask[Boolean](replicateMsg)
}
}
}
// Remove it from blockManagerInfo and remove all the blocks.
blockManagerInfo.remove(blockManagerId)

listenerBus.post(SparkListenerBlockManagerRemoved(System.currentTimeMillis(), blockManagerId))
logInfo(s"Removing block manager $blockManagerId")

}

private def removeExecutor(execId: String) {
@@ -32,6 +32,10 @@ private[spark] object BlockManagerMessages {
// blocks that the master knows about.
case class RemoveBlock(blockId: BlockId) extends ToBlockManagerSlave

// Replicate blocks that were lost due to executor failure
case class ReplicateBlock(blockId: BlockId, replicas: Seq[BlockManagerId], maxReplicas: Int)
extends ToBlockManagerSlave

// Remove all blocks belonging to a specific RDD.
case class RemoveRdd(rddId: Int) extends ToBlockManagerSlave

@@ -74,6 +74,10 @@ class BlockManagerSlaveEndpoint(

case TriggerThreadDump =>
context.reply(Utils.getThreadDump())

case ReplicateBlock(blockId, replicas, maxReplicas) =>
context.reply(blockManager.replicateBlock(blockId, replicas.toSet, maxReplicas))

}

private def doAsync[T](actionMessage: String, context: RpcCallContext)(body: => T) {
@@ -37,32 +37,31 @@ import org.apache.spark.serializer.{KryoSerializer, SerializerManager}
import org.apache.spark.shuffle.sort.SortShuffleManager
import org.apache.spark.storage.StorageLevel._

/** Testsuite that tests block replication in BlockManager */
class BlockManagerReplicationSuite extends SparkFunSuite
with Matchers
with BeforeAndAfter
with LocalSparkContext {

private val conf = new SparkConf(false).set("spark.app.id", "test")
private var rpcEnv: RpcEnv = null
private var master: BlockManagerMaster = null
private val securityMgr = new SecurityManager(conf)
private val bcastManager = new BroadcastManager(true, conf, securityMgr)
private val mapOutputTracker = new MapOutputTrackerMaster(conf, bcastManager, true)
private val shuffleManager = new SortShuffleManager(conf)
trait BlockManagerReplicationBehavior extends SparkFunSuite
with Matchers
with BeforeAndAfter
with LocalSparkContext {

val conf: SparkConf
protected var rpcEnv: RpcEnv = null
protected var master: BlockManagerMaster = null
protected lazy val securityMgr = new SecurityManager(conf)
protected lazy val bcastManager = new BroadcastManager(true, conf, securityMgr)
protected lazy val mapOutputTracker = new MapOutputTrackerMaster(conf, bcastManager, true)
protected lazy val shuffleManager = new SortShuffleManager(conf)

// List of block manager created during an unit test, so that all of the them can be stopped
// after the unit test.
private val allStores = new ArrayBuffer[BlockManager]
protected val allStores = new ArrayBuffer[BlockManager]

// Reuse a serializer across tests to avoid creating a new thread-local buffer on each test
conf.set("spark.kryoserializer.buffer", "1m")
private val serializer = new KryoSerializer(conf)

protected lazy val serializer = new KryoSerializer(conf)

// Implicitly convert strings to BlockIds for test clarity.
private implicit def StringToBlockId(value: String): BlockId = new TestBlockId(value)
protected implicit def StringToBlockId(value: String): BlockId = new TestBlockId(value)

private def makeBlockManager(
protected def makeBlockManager(
maxMem: Long,
name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
conf.set("spark.testing.memory", maxMem.toString)
@@ -355,7 +354,7 @@ class BlockManagerReplicationSuite extends SparkFunSuite
* is correct. Then it also drops the block from memory of each store (using LRU) and
* again checks whether the master's knowledge gets updated.
*/
private def testReplication(maxReplication: Int, storageLevels: Seq[StorageLevel]) {
protected def testReplication(maxReplication: Int, storageLevels: Seq[StorageLevel]) {
import org.apache.spark.storage.StorageLevel._

assert(maxReplication > 1,
@@ -448,3 +447,52 @@ class BlockManagerReplicationSuite extends SparkFunSuite
}
}
}

class BlockManagerReplicationSuite extends BlockManagerReplicationBehavior {
val conf = new SparkConf(false).set("spark.app.id", "test")
conf.set("spark.kryoserializer.buffer", "1m")
}

class BlockManagerProactiveReplicationSuite extends BlockManagerReplicationBehavior {
val conf = new SparkConf(false).set("spark.app.id", "test")
conf.set("spark.kryoserializer.buffer", "1m")
conf.set("spark.storage.replication.proactive", "true")

(2 to 5).foreach{ i =>
test(s"proactive block replication - $i replicas - ${i - 1} block manager deletions") {
testProactiveReplication(i)
}
}

def testProactiveReplication(replicationFactor: Int) {
val blockSize = 1000
val storeSize = 10000
val initialStores = (1 to 10).map { i => makeBlockManager(storeSize, s"store$i") }

val blockId = "a1"

val storageLevel = StorageLevel(true, true, false, true, replicationFactor)
initialStores.head.putSingle(blockId, new Array[Byte](blockSize), storageLevel)

val blockLocations = master.getLocations(blockId)
logInfo(s"Initial locations : $blockLocations")

assert(blockLocations.size === replicationFactor)

// remove a random blockManager
val executorsToRemove = blockLocations.take(replicationFactor - 1)
logInfo(s"Removing $executorsToRemove")
executorsToRemove.foreach{exec =>
master.removeExecutor(exec.executorId)
// giving enough time for replication to happen and new block be reported to master
Thread.sleep(200)
}

val newLocations = master.getLocations(blockId).toSet
logInfo(s"New locations : $newLocations")
assert(newLocations.size === replicationFactor)
// there should only be one common block manager between initial and new locations
assert(newLocations.intersect(blockLocations.toSet).size === 1)

}
}