Skip to content

Commit 6c5c1d4

Browse files
committed
read local
1 parent 17b13c5 commit 6c5c1d4

14 files changed

Lines changed: 230 additions & 107 deletions

core/src/main/scala/org/apache/spark/network/BlockDataManager.scala

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
package org.apache.spark.network
1919

2020
import org.apache.spark.network.buffer.ManagedBuffer
21-
import org.apache.spark.storage.{BlockId, StorageLevel}
21+
import org.apache.spark.storage.{BlockManagerId, BlockId, StorageLevel}
2222

2323
private[spark]
2424
trait BlockDataManager {
@@ -29,6 +29,12 @@ trait BlockDataManager {
2929
*/
3030
def getBlockData(blockId: BlockId): ManagedBuffer
3131

32+
/**
33+
* Interface to get the block data of another executor on the same node, identified by blockManagerId. Throws
34+
* an exception if the block cannot be found or cannot be read successfully.
35+
*/
36+
def getBlockData(blockId: BlockId, blockManagerId: BlockManagerId): ManagedBuffer
37+
3238
/**
3339
* Put the block locally, using the given storage level.
3440
*/

core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ class FileShuffleBlockManager(conf: SparkConf)
162162
val fileId = shuffleState.nextFileId.getAndIncrement()
163163
val files = Array.tabulate[File](numBuckets) { bucketId =>
164164
val filename = physicalFileName(shuffleId, bucketId, fileId)
165-
blockManager.diskBlockManager.getFile(filename)
165+
blockManager.diskBlockManager.getFile(filename, blockManager.blockManagerId)
166166
}
167167
val fileGroup = new ShuffleFileGroup(shuffleId, fileId, files)
168168
shuffleState.allFileGroups.add(fileGroup)
@@ -180,7 +180,8 @@ class FileShuffleBlockManager(conf: SparkConf)
180180
Some(segment.nioByteBuffer())
181181
}
182182

183-
override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = {
183+
override def getBlockData(blockId: ShuffleBlockId,
184+
blockManagerId: BlockManagerId = blockManager.blockManagerId): ManagedBuffer = {
184185
if (consolidateShuffleFiles) {
185186
// Search all file groups associated with this shuffle.
186187
val shuffleState = shuffleStates(blockId.shuffleId)

core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -52,12 +52,16 @@ class IndexShuffleBlockManager(conf: SparkConf) extends ShuffleBlockManager {
5252
ShuffleBlockId(shuffleId, mapId, 0)
5353
}
5454

55-
def getDataFile(shuffleId: Int, mapId: Int): File = {
56-
blockManager.diskBlockManager.getFile(ShuffleDataBlockId(shuffleId, mapId, 0))
55+
def getDataFile(shuffleId: Int,
56+
mapId: Int,
57+
blockManagerId: BlockManagerId = blockManager.blockManagerId): File = {
58+
blockManager.diskBlockManager.getFile(ShuffleDataBlockId(shuffleId, mapId, 0), blockManagerId)
5759
}
5860

59-
private def getIndexFile(shuffleId: Int, mapId: Int): File = {
60-
blockManager.diskBlockManager.getFile(ShuffleIndexBlockId(shuffleId, mapId, 0))
61+
private def getIndexFile(shuffleId: Int,
62+
mapId: Int,
63+
blockManagerId: BlockManagerId = blockManager.blockManagerId): File = {
64+
blockManager.diskBlockManager.getFile(ShuffleIndexBlockId(shuffleId, mapId, 0), blockManagerId)
6165
}
6266

6367
/**
@@ -101,10 +105,11 @@ class IndexShuffleBlockManager(conf: SparkConf) extends ShuffleBlockManager {
101105
Some(getBlockData(blockId).nioByteBuffer())
102106
}
103107

104-
override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = {
108+
override def getBlockData(blockId: ShuffleBlockId,
109+
blockManagerId: BlockManagerId = blockManager.blockManagerId): ManagedBuffer = {
105110
// The block is actually going to be a range of a single map output file for this map, so
106111
// find out the consolidated file, then the offset within that from our index
107-
val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId)
112+
val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId, blockManagerId)
108113

109114
val in = new DataInputStream(new FileInputStream(indexFile))
110115
try {
@@ -113,7 +118,7 @@ class IndexShuffleBlockManager(conf: SparkConf) extends ShuffleBlockManager {
113118
val nextOffset = in.readLong()
114119
new FileSegmentManagedBuffer(
115120
transportConf,
116-
getDataFile(blockId.shuffleId, blockId.mapId),
121+
getDataFile(blockId.shuffleId, blockId.mapId, blockManagerId),
117122
offset,
118123
nextOffset - offset)
119124
} finally {

core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockManager.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ package org.apache.spark.shuffle
1919

2020
import java.nio.ByteBuffer
2121
import org.apache.spark.network.buffer.ManagedBuffer
22-
import org.apache.spark.storage.ShuffleBlockId
22+
import org.apache.spark.storage.{BlockManagerId, ShuffleBlockId}
2323

2424
private[spark]
2525
trait ShuffleBlockManager {
@@ -31,7 +31,7 @@ trait ShuffleBlockManager {
3131
*/
3232
def getBytes(blockId: ShuffleBlockId): Option[ByteBuffer]
3333

34-
def getBlockData(blockId: ShuffleBlockId): ManagedBuffer
34+
def getBlockData(blockId: ShuffleBlockId, blockManagerId: BlockManagerId): ManagedBuffer
3535

3636
def stop(): Unit
3737
}

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,8 @@ private[spark] class BlockManager(
202202
blockManagerId
203203
}
204204

205-
master.registerBlockManager(blockManagerId, maxMemory, slaveActor)
205+
master.registerBlockManager(
206+
blockManagerId, maxMemory, slaveActor, diskBlockManager.getLocalDirsPath())
206207

207208
// Register Executors' configuration with the local shuffle service, if one should exist.
208209
if (externalShuffleServiceEnabled && !blockManagerId.isDriver) {
@@ -265,7 +266,8 @@ private[spark] class BlockManager(
265266
def reregister(): Unit = {
266267
// TODO: We might need to rate limit re-registering.
267268
logInfo("BlockManager re-registering with master")
268-
master.registerBlockManager(blockManagerId, maxMemory, slaveActor)
269+
master.registerBlockManager(
270+
blockManagerId, maxMemory, slaveActor, diskBlockManager.getLocalDirsPath())
269271
reportAllBlocks()
270272
}
271273

@@ -295,13 +297,18 @@ private[spark] class BlockManager(
295297
}
296298
}
297299

300+
override def getBlockData(blockId: BlockId): ManagedBuffer = {
301+
getBlockData(blockId, blockManagerId)
302+
}
303+
298304
/**
299-
* Interface to get local block data. Throws an exception if the block cannot be found or
300-
* cannot be read successfully.
305+
* Interface to get the block data of another executor on the same node, identified by blockManagerId.
306+
* Throws an exception if the block cannot be found or cannot be read successfully.
301307
*/
302-
override def getBlockData(blockId: BlockId): ManagedBuffer = {
308+
override def getBlockData(blockId: BlockId, blockManagerId: BlockManagerId): ManagedBuffer = {
303309
if (blockId.isShuffle) {
304-
shuffleManager.shuffleBlockManager.getBlockData(blockId.asInstanceOf[ShuffleBlockId])
310+
shuffleManager.shuffleBlockManager.getBlockData(
311+
blockId.asInstanceOf[ShuffleBlockId], blockManagerId)
305312
} else {
306313
val blockBytesOpt = doGetLocal(blockId, asBlockResult = false)
307314
.asInstanceOf[Option[ByteBuffer]]

core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,12 @@ class BlockManagerMaster(
4646
}
4747

4848
/** Register the BlockManager's id with the driver. */
49-
def registerBlockManager(blockManagerId: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) {
49+
def registerBlockManager(blockManagerId: BlockManagerId,
50+
maxMemSize: Long,
51+
slaveActor: ActorRef,
52+
localDirs: Array[String]) {
5053
logInfo("Trying to register BlockManager")
51-
tell(RegisterBlockManager(blockManagerId, maxMemSize, slaveActor))
54+
tell(RegisterBlockManager(blockManagerId, maxMemSize, slaveActor, localDirs))
5255
logInfo("Registered BlockManager")
5356
}
5457

@@ -75,6 +78,11 @@ class BlockManagerMaster(
7578
askDriverWithReply[Seq[Seq[BlockManagerId]]](GetLocationsMultipleBlockIds(blockIds))
7679
}
7780

81+
/** Return the local dirs of the other block managers on the same machine as blockManagerId. */
82+
def getLocalDirsPath(blockManagerId: BlockManagerId): Map[BlockManagerId, Array[String]] = {
83+
askDriverWithReply[Map[BlockManagerId, Array[String]]](GetLocalDirsPath(blockManagerId))
84+
}
85+
7886
/**
7987
* Check if block manager master has a block. Note that this can be used to check for only
8088
* those blocks that are reported to block manager master.

core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,8 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
5353
private val akkaTimeout = AkkaUtils.askTimeout(conf)
5454

5555
override def receiveWithLogging: PartialFunction[Any, Unit] = {
56-
case RegisterBlockManager(blockManagerId, maxMemSize, slaveActor) =>
57-
register(blockManagerId, maxMemSize, slaveActor)
56+
case RegisterBlockManager(blockManagerId, maxMemSize, slaveActor, localDirsPath) =>
57+
register(blockManagerId, maxMemSize, slaveActor, localDirsPath)
5858
sender ! true
5959

6060
case UpdateBlockInfo(
@@ -77,6 +77,9 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
7777
case GetMemoryStatus =>
7878
sender ! memoryStatus
7979

80+
case GetLocalDirsPath(blockManagerId) =>
81+
sender ! getLocalDirsPath(blockManagerId)
82+
8083
case GetStorageStatus =>
8184
sender ! storageStatus
8285

@@ -223,6 +226,15 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
223226
}
224227
}
225228

229+
// Return the local dirs of the other block managers on the same machine as blockManagerId
230+
private def getLocalDirsPath(
231+
blockManagerId: BlockManagerId): Map[BlockManagerId, Array[String]] = {
232+
blockManagerInfo
233+
.filter { case(id, _) => (id != blockManagerId && id.host == blockManagerId.host)}
234+
.mapValues { info => info.localDirsPath }
235+
.toMap
236+
}
237+
226238
// Return a map from the block manager id to max memory and remaining memory.
227239
private def memoryStatus: Map[BlockManagerId, (Long, Long)] = {
228240
blockManagerInfo.map { case(blockManagerId, info) =>
@@ -291,7 +303,10 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
291303
).map(_.flatten.toSeq)
292304
}
293305

294-
private def register(id: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) {
306+
private def register(
307+
id: BlockManagerId,
308+
maxMemSize: Long,
309+
slaveActor: ActorRef, localDirsPath: Array[String]) {
295310
val time = System.currentTimeMillis()
296311
if (!blockManagerInfo.contains(id)) {
297312
blockManagerIdByExecutor.get(id.executorId) match {
@@ -308,7 +323,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
308323
blockManagerIdByExecutor(id.executorId) = id
309324

310325
blockManagerInfo(id) = new BlockManagerInfo(
311-
id, System.currentTimeMillis(), maxMemSize, slaveActor)
326+
id, System.currentTimeMillis(), maxMemSize, slaveActor, localDirsPath)
312327
}
313328
listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxMemSize))
314329
}
@@ -320,7 +335,6 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
320335
memSize: Long,
321336
diskSize: Long,
322337
tachyonSize: Long): Boolean = {
323-
324338
if (!blockManagerInfo.contains(blockManagerId)) {
325339
if (blockManagerId.isDriver && !isLocal) {
326340
// We intentionally do not register the master (except in local mode),
@@ -412,7 +426,8 @@ private[spark] class BlockManagerInfo(
412426
val blockManagerId: BlockManagerId,
413427
timeMs: Long,
414428
val maxMem: Long,
415-
val slaveActor: ActorRef)
429+
val slaveActor: ActorRef,
430+
val localDirsPath: Array[String])
416431
extends Logging {
417432

418433
private var _lastSeenMs: Long = timeMs

core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,8 @@ private[spark] object BlockManagerMessages {
5252
case class RegisterBlockManager(
5353
blockManagerId: BlockManagerId,
5454
maxMemSize: Long,
55-
sender: ActorRef)
55+
sender: ActorRef,
56+
subDirs: Array[String])
5657
extends ToBlockManagerMaster
5758

5859
case class UpdateBlockInfo(
@@ -109,4 +110,6 @@ private[spark] object BlockManagerMessages {
109110
extends ToBlockManagerMaster
110111

111112
case class BlockManagerHeartbeat(blockManagerId: BlockManagerId) extends ToBlockManagerMaster
113+
114+
case class GetLocalDirsPath(blockManagerId: BlockManagerId) extends ToBlockManagerMaster
112115
}

core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala

Lines changed: 60 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ import org.apache.spark.{SparkConf, Logging}
2424
import org.apache.spark.executor.ExecutorExitCode
2525
import org.apache.spark.util.Utils
2626

27+
import scala.collection.mutable
28+
2729
/**
2830
* Creates and maintains the logical mapping between logical blocks and physical on-disk
2931
* locations. By default, one block is mapped to one file with a name given by its BlockId.
@@ -51,40 +53,74 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon
5153
// of subDirs(i) is protected by the lock of subDirs(i)
5254
private val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir))
5355

56+
private val localDirsByBlkMgr = new mutable.HashMap[BlockManagerId, Array[String]]
57+
58+
def getLocalDirsPath(): Array[String] = {
59+
localDirs.map(file => file.getAbsolutePath)
60+
}
61+
5462
private val shutdownHook = addShutdownHook()
5563

56-
/** Looks up a file by hashing it into one of our local subdirectories. */
57-
// This method should be kept in sync with
58-
// org.apache.spark.network.shuffle.StandaloneShuffleBlockManager#getFile().
59-
def getFile(filename: String): File = {
60-
// Figure out which local directory it hashes to, and which subdirectory in that
61-
val hash = Utils.nonNegativeHash(filename)
62-
val dirId = hash % localDirs.length
63-
val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
64-
65-
// Create the subdirectory if it doesn't already exist
66-
val subDir = subDirs(dirId).synchronized {
67-
val old = subDirs(dirId)(subDirId)
68-
if (old != null) {
69-
old
70-
} else {
71-
val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
72-
if (!newDir.exists() && !newDir.mkdir()) {
73-
throw new IOException(s"Failed to create local dir in $newDir.")
64+
def getFile(
65+
fileName: String,
66+
blockManagerId: BlockManagerId): File = {
67+
val hash = Utils.nonNegativeHash(fileName)
68+
val createDirIfAbsent =
69+
blockManagerId.executorId == blockManager.blockManagerId.executorId
70+
71+
if (createDirIfAbsent) {
72+
val dirId = hash % localDirs.length
73+
val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
74+
75+
// Create the subdirectory if it doesn't already exist
76+
var subDir = subDirs(dirId)(subDirId)
77+
if (subDir == null) {
78+
subDir = subDirs(dirId).synchronized {
79+
val old = subDirs(dirId)(subDirId)
80+
if (old != null) {
81+
old
82+
} else {
83+
val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
84+
if (!newDir.exists() && !newDir.mkdir()) {
85+
throw new IOException(s"Failed to create local dir in $newDir.")
86+
}
87+
subDirs(dirId)(subDirId) = newDir
88+
newDir
89+
}
90+
}
91+
}
92+
new File(subDir, fileName)
93+
} else {
94+
var tmpLocalDirs = localDirsByBlkMgr.get(blockManagerId)
95+
if (!tmpLocalDirs.isDefined) {
96+
tmpLocalDirs = localDirsByBlkMgr.synchronized {
97+
val old = localDirsByBlkMgr.get(blockManagerId)
98+
if(old.isDefined) {
99+
old
100+
} else {
101+
localDirsByBlkMgr ++= blockManager.master.getLocalDirsPath(blockManager.blockManagerId)
102+
localDirsByBlkMgr.get(blockManagerId)
103+
}
74104
}
75-
subDirs(dirId)(subDirId) = newDir
76-
newDir
77105
}
78-
}
79106

80-
new File(subDir, filename)
107+
val dirId = hash % tmpLocalDirs.get.length
108+
val subDirId = (hash / tmpLocalDirs.get.length) % subDirsPerLocalDir
109+
new File(tmpLocalDirs.get(dirId) + "/" + "%02x".format(subDirId), fileName)
110+
}
81111
}
82112

83-
def getFile(blockId: BlockId): File = getFile(blockId.name)
113+
def getFile(
114+
blockId: BlockId,
115+
blockManagerId: BlockManagerId = blockManager.blockManagerId): File = {
116+
// val getFromThisExecutor = blockManagerId == blockManager.blockManagerId
117+
// val dirs = if (getFromThisExecutor) getLocalDirsPath else localDirsByBlkMgr(blockManagerId)
118+
getFile(blockId.name, blockManagerId)
119+
}
84120

85121
/** Check if disk block manager has a block. */
86122
def containsBlock(blockId: BlockId): Boolean = {
87-
getFile(blockId.name).exists()
123+
getFile(blockId).exists()
88124
}
89125

90126
/** List all the files currently stored on disk by the disk manager. */

0 commit comments

Comments
 (0)