|
18 | 18 | package org.apache.spark.broadcast |
19 | 19 |
|
20 | 20 | import java.io._ |
21 | | -import java.lang.ref.WeakReference |
| 21 | +import java.lang.ref.SoftReference |
22 | 22 | import java.nio.ByteBuffer |
23 | 23 | import java.util.zip.Adler32 |
24 | 24 |
|
@@ -63,10 +63,10 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long) |
63 | 63 | * which builds this value by reading blocks from the driver and/or other executors. |
64 | 64 | * |
65 | 65 | * On the driver, if the value is required, it is read lazily from the block manager. We hold |
66 | | - * a weak reference so that it can be garbage collected if required, as we can always reconstruct |
| 66 | + * a soft reference so that it can be garbage collected if required, as we can always reconstruct |
67 | 67 | * in the future. |
68 | 68 | */ |
69 | | - @transient private var _value: WeakReference[T] = _ |
| 69 | + @transient private var _value: SoftReference[T] = _ |
70 | 70 |
|
71 | 71 | /** The compression codec to use, or None if compression is disabled */ |
72 | 72 | @transient private var compressionCodec: Option[CompressionCodec] = _ |
@@ -95,13 +95,13 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long) |
95 | 95 | /** The checksum for all the blocks. */ |
96 | 96 | private var checksums: Array[Int] = _ |
97 | 97 |
|
98 | | - override protected def getValue() = { |
| 98 | + override protected def getValue() = synchronized { |
99 | 99 | val memoized: T = if (_value == null) null.asInstanceOf[T] else _value.get |
100 | 100 | if (memoized != null) { |
101 | 101 | memoized |
102 | 102 | } else { |
103 | 103 | val newlyRead = readBroadcastBlock() |
104 | | - _value = new WeakReference[T](newlyRead) |
| 104 | + _value = new SoftReference[T](newlyRead) |
105 | 105 | newlyRead |
106 | 106 | } |
107 | 107 | } |
|
0 commit comments