Skip to content

Commit 60e0c57

Browse files
committed
Fixing issues (formatting, variable names, etc.) from review comments
1 parent 8aa31cd commit 60e0c57

6 files changed

Lines changed: 52 additions & 52 deletions

File tree

core/src/main/scala/org/apache/spark/CacheManager.scala

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,15 +72,24 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
7272
// Persist the result, so long as the task is not running locally
7373
if (context.runningLocally) { return computedValues }
7474
if (storageLevel.useDisk && !storageLevel.useMemory) {
75+
// In the case that this RDD is to be persisted using DISK_ONLY
76+
// the iterator will be passed directly to the blockManager (rather then
77+
// caching it to an ArrayBuffer first), then the resulting block data iterator
78+
// will be passed back to the user. If the iterator generates a lot of data,
79+
// this means that it doesn't all have to be held in memory at one time.
80+
// This could also apply to MEMORY_ONLY_SER storage, but we need to make sure
81+
// blocks aren't dropped by the block store before enabling that.
7582
blockManager.put(key, computedValues, storageLevel, tellMaster = true)
7683
return blockManager.get(key) match {
7784
case Some(values) =>
7885
return new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]])
7986
case None =>
8087
logInfo("Failure to store %s".format(key))
81-
return null
88+
throw new Exception("Block manager failed to return persisted valued")
8289
}
8390
} else {
91+
// In this case the RDD is cached to an array buffer. This will save the results
92+
// if we're dealing with a 'one-time' iterator
8493
val elements = new ArrayBuffer[Any]
8594
elements ++= computedValues
8695
blockManager.put(key, elements, storageLevel, tellMaster = true)

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -540,7 +540,7 @@ private[spark] class BlockManager(
540540
// If we're storing bytes, then initiate the replication before storing them locally.
541541
// This is faster as data is already serialized and ready to send.
542542
val replicationFuture = if (data.isInstanceOf[ByteBufferValues] && level.replication > 1) {
543-
//Duplicate doesn't copy the bytes, just creates a wrapper
543+
// Duplicate doesn't copy the bytes, just creates a wrapper
544544
val bufferView = data.asInstanceOf[ByteBufferValues].buffer.duplicate()
545545
Future {
546546
replicate(blockId, bufferView, level)
@@ -559,13 +559,13 @@ private[spark] class BlockManager(
559559
// Save it just to memory first, even if it also has useDisk set to true; we will
560560
// drop it to disk later if the memory store can't hold it.
561561
val res = data match {
562-
case IteratorValues(values_i) =>
563-
memoryStore.putValues(blockId, values_i, level, true)
564-
case ArrayBufferValues(values_a) =>
565-
memoryStore.putValues(blockId, values_a, level, true)
566-
case ByteBufferValues(value_bytes) => {
567-
value_bytes.rewind();
568-
memoryStore.putBytes(blockId, value_bytes, level)
562+
case IteratorValues(iterator) =>
563+
memoryStore.putValues(blockId, iterator, level, true)
564+
case ArrayBufferValues(array) =>
565+
memoryStore.putValues(blockId, array, level, true)
566+
case ByteBufferValues(bytes) => {
567+
bytes.rewind();
568+
memoryStore.putBytes(blockId, bytes, level)
569569
}
570570
}
571571
size = res.size
@@ -579,13 +579,13 @@ private[spark] class BlockManager(
579579
val askForBytes = level.replication > 1
580580

581581
val res = data match {
582-
case IteratorValues(values_i) =>
583-
diskStore.putValues(blockId, values_i, level, askForBytes)
584-
case ArrayBufferValues(values_a) =>
585-
diskStore.putValues(blockId, values_a, level, askForBytes)
586-
case ByteBufferValues(value_bytes) => {
587-
value_bytes.rewind();
588-
diskStore.putBytes(blockId, value_bytes, level)
582+
case IteratorValues(iterator) =>
583+
diskStore.putValues(blockId, iterator, level, askForBytes)
584+
case ArrayBufferValues(array) =>
585+
diskStore.putValues(blockId, array, level, askForBytes)
586+
case ByteBufferValues(bytes) => {
587+
bytes.rewind();
588+
diskStore.putBytes(blockId, bytes, level)
589589
}
590590
}
591591
size = res.size

core/src/main/scala/org/apache/spark/storage/BlockStore.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ abstract class BlockStore(val blockManager: BlockManager) extends Logging {
4141
returnValues: Boolean) : PutResult
4242

4343
def putValues(blockId: BlockId, values: ArrayBuffer[Any], level: StorageLevel,
44-
returnValues: Boolean) : PutResult
44+
returnValues: Boolean) : PutResult
4545

4646
/**
4747
* Return the size of a block in bytes.

core/src/main/scala/org/apache/spark/storage/DiskStore.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -56,10 +56,10 @@ private class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManage
5656
}
5757

5858
override def putValues(
59-
blockId: BlockId,
60-
values: ArrayBuffer[Any],
61-
level: StorageLevel,
62-
returnValues: Boolean)
59+
blockId: BlockId,
60+
values: ArrayBuffer[Any],
61+
level: StorageLevel,
62+
returnValues: Boolean)
6363
: PutResult = {
6464
return putValues(blockId, values.toIterator, level, returnValues)
6565
}

core/src/main/scala/org/apache/spark/storage/MemoryStore.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -67,10 +67,10 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
6767
}
6868

6969
override def putValues(
70-
blockId: BlockId,
71-
values: ArrayBuffer[Any],
72-
level: StorageLevel,
73-
returnValues: Boolean)
70+
blockId: BlockId,
71+
values: ArrayBuffer[Any],
72+
level: StorageLevel,
73+
returnValues: Boolean)
7474
: PutResult = {
7575
if (level.deserialized) {
7676
val sizeEstimate = SizeEstimator.estimate(values.asInstanceOf[AnyRef])

core/src/test/scala/org/apache/spark/storage/FlatmapIteratorSuite.scala

Lines changed: 18 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -36,48 +36,39 @@ class FlatmapIteratorSuite extends FunSuite with LocalSparkContext {
3636
val sconf = new SparkConf().setMaster("local-cluster[1,1,512]")
3737
.setAppName("iterator_to_disk_test")
3838
sc = new SparkContext(sconf)
39-
try {
40-
val expand_size = 100
41-
val data = sc.parallelize( (1 to 5).toSeq ).
42-
flatMap( x => Stream.range(0, expand_size) )
43-
var persisted = data.persist(StorageLevel.DISK_ONLY)
44-
println(persisted.count())
45-
assert( persisted.count() == 500)
46-
assert( persisted.filter( _==1 ).count() == 5 )
47-
} catch {
48-
case _ : OutOfMemoryError => assert(false)
49-
}
39+
val expand_size = 100
40+
val data = sc.parallelize((1 to 5).toSeq).
41+
flatMap( x => Stream.range(0, expand_size))
42+
var persisted = data.persist(StorageLevel.DISK_ONLY)
43+
println(persisted.count())
44+
assert(persisted.count()===500)
45+
assert(persisted.filter(_==1).count()===5)
5046
}
5147

5248
test("Flatmap Iterator to Memory") {
5349
val sconf = new SparkConf().setMaster("local-cluster[1,1,512]")
5450
.setAppName("iterator_to_disk_test")
5551
sc = new SparkContext(sconf)
56-
try {
57-
val expand_size = 100
58-
val data = sc.parallelize( (1 to 5).toSeq ).
59-
flatMap( x => Stream.range(0, expand_size) )
60-
var persisted = data.persist(StorageLevel.MEMORY_ONLY)
61-
println(persisted.count())
62-
assert( persisted.count() == 500)
63-
assert( persisted.filter( _==1 ).count() == 5 )
64-
} catch {
65-
case _ : OutOfMemoryError => assert(false)
66-
}
52+
val expand_size = 100
53+
val data = sc.parallelize((1 to 5).toSeq).
54+
flatMap(x => Stream.range(0, expand_size))
55+
var persisted = data.persist(StorageLevel.MEMORY_ONLY)
56+
println(persisted.count())
57+
assert(persisted.count()===500)
58+
assert(persisted.filter(_==1).count()===5)
6759
}
6860

6961
test("Serializer Reset") {
7062
val sconf = new SparkConf().setMaster("local-cluster[1,1,512]")
7163
.setAppName("serializer_reset_test")
7264
.set("spark.serializer.objectStreamReset", "10")
73-
7465
sc = new SparkContext(sconf)
7566
val expand_size = 500
76-
val data = sc.parallelize( Seq(1,2) ).
77-
flatMap( x => Stream.range(1, expand_size).
78-
map( y => "%d: string test %d".format(y,x) ) )
67+
val data = sc.parallelize(Seq(1,2)).
68+
flatMap(x => Stream.range(1, expand_size).
69+
map(y => "%d: string test %d".format(y,x)))
7970
var persisted = data.persist(StorageLevel.MEMORY_ONLY_SER)
80-
assert( persisted.filter( _.startsWith("1:") ).count() == 2 )
71+
assert(persisted.filter(_.startsWith("1:")).count()===2)
8172
}
8273

8374
}

0 commit comments

Comments
 (0)