Skip to content

Commit 01fd8be

Browse files
committed
Store WeakReference for serializedOnly broadcast variables
1 parent 75ab18e commit 01fd8be

File tree

1 file changed

+11
-6
lines changed

1 file changed

+11
-6
lines changed

core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
package org.apache.spark.broadcast
1919

2020
import java.io._
21-
import java.lang.ref.SoftReference
21+
import java.lang.ref.{Reference, SoftReference, WeakReference}
2222
import java.nio.ByteBuffer
2323
import java.util.zip.Adler32
2424

@@ -65,9 +65,10 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long, serializedO
6565
*
6666
* On the driver, if the value is required, it is read lazily from the block manager. We hold
6767
* a soft reference so that it can be garbage collected if required, as we can always reconstruct it
68-
* in the future.
68+
* in the future. For internal broadcast variables where `serializedOnly = true`, we hold a
69+
* WeakReference to allow the value to be reclaimed more aggressively.
6970
*/
70-
@transient private var _value: SoftReference[T] = _
71+
@transient private var _value: Reference[T] = _
7172

7273
/** The compression codec to use, or None if compression is disabled */
7374
@transient private var compressionCodec: Option[CompressionCodec] = _
@@ -106,7 +107,11 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long, serializedO
106107
memoized
107108
} else {
108109
val newlyRead = readBroadcastBlock()
109-
_value = new SoftReference[T](newlyRead)
110+
_value = if (serializedOnly) {
111+
new WeakReference[T](newlyRead)
112+
} else {
113+
new SoftReference[T](newlyRead)
114+
}
110115
newlyRead
111116
}
112117
}
@@ -140,9 +145,9 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long, serializedO
140145
// skipping the store reduces driver memory pressure because we don't add a long-lived
141146
// reference to the broadcasted object. However, this optimization cannot be applied for
142147
// local mode (since tasks might run on the driver). To guard against performance
143-
// regressions if an internal broadcast is accessed on the driver, we store a soft
148+
// regressions if an internal broadcast is accessed on the driver, we store a weak
144149
// reference to the broadcasted value:
145-
_value = new SoftReference[T](value)
150+
_value = new WeakReference[T](value)
146151
} else {
147152
// Store a copy of the broadcast variable in the driver so that tasks run on the driver
148153
// do not create a duplicate copy of the broadcast variable's value.

0 commit comments

Comments
 (0)