Skip to content

Commit a8eddfe

Browse files
committed
[SPARK-1912] Lazily initialize buffers for local shuffle blocks.
This is a simplified fix for SPARK-1912.
1 parent 3901245 commit a8eddfe

2 files changed

Lines changed: 3 additions & 24 deletions

File tree

core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -196,11 +196,8 @@ object BlockFetcherIterator {
196196
// any memory that might exceed our maxBytesInFlight
197197
for (id <- localBlocksToFetch) {
198198
try {
199-
// getLocalFromDisk never return None but throws BlockException
200-
val iter = getLocalFromDisk(id, serializer).get
201-
// Pass 0 as size since it's not in flight
202199
readMetrics.localBlocksFetched += 1
203-
results.put(new FetchResult(id, 0, () => iter))
200+
results.put(new FetchResult(id, 0, () => getLocalFromDisk(id, serializer).get))
204201
logDebug("Got local block " + id)
205202
} catch {
206203
case e: Exception => {

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 2 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1039,26 +1039,8 @@ private[spark] class BlockManager(
10391039
bytes: ByteBuffer,
10401040
serializer: Serializer = defaultSerializer): Iterator[Any] = {
10411041
bytes.rewind()
1042-
1043-
def getIterator: Iterator[Any] = {
1044-
val stream = wrapForCompression(blockId, new ByteBufferInputStream(bytes, true))
1045-
serializer.newInstance().deserializeStream(stream).asIterator
1046-
}
1047-
1048-
if (blockId.isShuffle) {
1049-
/* Reducer may need to read many local shuffle blocks and will wrap them into Iterators
1050-
* at the beginning. The wrapping will cost some memory (compression instance
1051-
* initialization, etc.). Reducer reads shuffle blocks one by one so we could do the
1052-
* wrapping lazily to save memory. */
1053-
class LazyProxyIterator(f: => Iterator[Any]) extends Iterator[Any] {
1054-
lazy val proxy = f
1055-
override def hasNext: Boolean = proxy.hasNext
1056-
override def next(): Any = proxy.next()
1057-
}
1058-
new LazyProxyIterator(getIterator)
1059-
} else {
1060-
getIterator
1061-
}
1042+
val stream = wrapForCompression(blockId, new ByteBufferInputStream(bytes, true))
1043+
serializer.newInstance().deserializeStream(stream).asIterator
10621044
}
10631045

10641046
def stop(): Unit = {

0 commit comments

Comments
 (0)