@@ -139,6 +139,11 @@ private[spark] class BlockManager(
139139 private val broadcastCleaner = new MetadataCleaner (
140140 MetadataCleanerType .BROADCAST_VARS , this .dropOldBroadcastBlocks, conf)
141141
142+ // Fields related to peer block managers that are necessary for block replication
// Most recently fetched list of peers; left unset (null) until getPeers performs its first fetch.
143+ @ volatile private var cachedPeers : Seq [BlockManagerId ] = _
// Monitor that getPeers synchronizes on while checking and refreshing the cached peer list.
144+ private val peerFetchLock = new Object
// System.currentTimeMillis value recorded after the last fetch from the master; starts at 0.
145+ private var lastPeerFetchTime = 0L
146+
142147 initialize()
143148
144149 /* The compression codec to use. Note that the "lazy" val is necessary because we want to delay
@@ -822,28 +827,111 @@ private[spark] class BlockManager(
822827 }
823828
824829 /**
825- * Replicate block to another node.
830+ * Get peer block managers in the system.
831+ */
private def getPeers(forceFetch: Boolean): Seq[BlockManagerId] = {
  // Returns the peer block managers known to the master, served from a local cache.
  //
  // The cache is refreshed from the master when any of the following holds:
  //   - nothing has been fetched yet (cachedPeers is still null),
  //   - the caller passes forceFetch = true,
  //   - the cached list is older than `spark.storage.cachedPeersTtl` milliseconds (default 60000).
  //
  // The check-and-refresh runs under peerFetchLock so only one thread at a time talks to the
  // master and updates the cache fields.
  peerFetchLock.synchronized {
    val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 * 1000) // milliseconds
    val timeout = System.currentTimeMillis - lastPeerFetchTime > cachedPeersTtl
    if (cachedPeers == null || forceFetch || timeout) {
      // Sort by hash code so that the list has a stable order between fetches; replicate()
      // picks peers by index from a seeded random generator.
      cachedPeers = master.getPeers(blockManagerId).sortBy(_.hashCode)
      lastPeerFetchTime = System.currentTimeMillis
      logDebug("Fetched peers from master: " + cachedPeers.mkString("[", ",", "]"))
    }
    cachedPeers
  }
}
844+
845+ /**
846+ * Replicate block to another node. Note that this is a blocking call that returns after
847+ * the block has been replicated.
826848 */
827- @ volatile var cachedPeers : Seq [BlockManagerId ] = null
private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel): Unit = {
  // Uploads `data` for `blockId` to (level.replication - 1) peer block managers, one peer at a
  // time, and returns only once replication has finished or been given up on.
  //
  //   blockId - id of the block being replicated; also seeds the peer selection
  //   data    - block contents; rewound before every upload attempt
  //   level   - storage level of the block; peers store it with the same flags but replication 1
  //
  // Shortfalls are reported through logWarning; nothing is returned to the caller.
  val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1)
  val numPeersToReplicateTo = level.replication - 1
  val peersForReplication = new ArrayBuffer[BlockManagerId]
  val peersReplicatedTo = new ArrayBuffer[BlockManagerId]
  val peersFailedToReplicateTo = new ArrayBuffer[BlockManagerId]
  val tLevel = StorageLevel(
    level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 1)
  val startTime = System.currentTimeMillis
  val random = new Random(blockId.hashCode)

  var replicationFailed = false
  var failures = 0
  // Nothing to do when no additional replica is requested; without this guard the loop below
  // would never reach its target count and would upload to every available peer.
  var done = numPeersToReplicateTo <= 0

  // Get cached list of peers
  peersForReplication ++= getPeers(forceFetch = false)

  // Get a random peer. Note that this selection of a peer is deterministic on the block id.
  // So assuming the list of peers does not change and no replication failures,
  // if there are multiple attempts in the same node to replicate the same block,
  // the same set of peers will be selected.
  def getRandomPeer(): Option[BlockManagerId] = {
    // If replication had failed, then force update the cached list of peers and remove the peers
    // that have been already used
    if (replicationFailed) {
      peersForReplication.clear()
      peersForReplication ++= getPeers(forceFetch = true)
      peersForReplication --= peersReplicatedTo
      peersForReplication --= peersFailedToReplicateTo
    }
    if (peersForReplication.nonEmpty) {
      Some(peersForReplication(random.nextInt(peersForReplication.size)))
    } else {
      None
    }
  }

  // One by one choose a random peer and try uploading the block to it.
  // If replication fails (e.g., target peer is down), force the list of cached peers
  // to be re-fetched from the driver and then pick another random peer for replication. Also
  // temporarily blacklist the peer for which replication failed.
  //
  // This selection of a peer and replication is continued in a loop until one of the
  // following 3 conditions is fulfilled:
  // (i) specified number of peers have been replicated to
  // (ii) too many failures in replicating to peers
  // (iii) no peer left to replicate to
  //
  while (!done) {
    getRandomPeer() match {
      case Some(peer) =>
        val onePeerStartTime = System.currentTimeMillis
        data.rewind()
        logTrace(s"Trying to replicate $blockId of ${data.limit()} bytes to $peer")
        val putBlock = PutBlock(blockId, data, tLevel)
        val cmId = new ConnectionManagerId(peer.host, peer.port)
        val syncPutBlockSuccess = BlockManagerWorker.syncPutBlock(putBlock, cmId)
        if (syncPutBlockSuccess) {
          // Plain interpolation: the message is never used as a format string, so a '%' in
          // the block id or peer text cannot break it.
          logTrace(s"Replicated $blockId of ${data.limit()} bytes to $peer in " +
            s"${System.currentTimeMillis - onePeerStartTime} ms")
          peersReplicatedTo += peer
          peersForReplication -= peer
          replicationFailed = false
          if (peersReplicatedTo.size >= numPeersToReplicateTo) {
            done = true // specified number of peers have been replicated to
          }
        } else {
          // Count the failure before logging so that the first failure is reported as #1.
          failures += 1
          logWarning(s"Failed to replicate $blockId to $peer, failure #$failures")
          replicationFailed = true
          peersFailedToReplicateTo += peer
          if (failures > maxReplicationFailures) { // too many failures in replicating to peers
            done = true
          }
        }
      case None => // no peer left to replicate to
        done = true
    }
  }
  val timeTakenMs = System.currentTimeMillis - startTime
  logTrace(s"Replicating $blockId of ${data.limit()} bytes to " +
    s"${peersReplicatedTo.size} peer(s) took $timeTakenMs ms")
  if (peersReplicatedTo.size < numPeersToReplicateTo) {
    logWarning(s"Block $blockId replicated to only " +
      s"${peersReplicatedTo.size} peer(s) instead of $numPeersToReplicateTo peers")
  }
}
849937
0 commit comments