-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-22033][CORE] BufferHolder, other size checks should account for the specific VM array size limitations #19266
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -30,11 +30,15 @@ public interface HashMapGrowthStrategy { | |
| HashMapGrowthStrategy DOUBLING = new Doubling(); | ||
|
|
||
| class Doubling implements HashMapGrowthStrategy { | ||
|
|
||
| private static final int ARRAY_MAX = Integer.MAX_VALUE - 8; | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Maybe worth adding a comment why this value is chosen as the max |
||
|
|
||
| @Override | ||
| public int nextCapacity(int currentCapacity) { | ||
| assert (currentCapacity > 0); | ||
| int doubleCapacity = currentCapacity * 2; | ||
| // Guard against overflow | ||
| return (currentCapacity * 2 > 0) ? (currentCapacity * 2) : Integer.MAX_VALUE; | ||
| return (doubleCapacity > 0 && doubleCapacity <= ARRAY_MAX) ? doubleCapacity : ARRAY_MAX; | ||
| } | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -126,22 +126,20 @@ private[spark] class CompactBuffer[T: ClassTag] extends Seq[T] with Serializable | |
|
|
||
| /** Increase our size to newSize and grow the backing array if needed. */ | ||
| private def growToSize(newSize: Int): Unit = { | ||
| if (newSize < 0) { | ||
| throw new UnsupportedOperationException("Can't grow buffer past Int.MaxValue elements") | ||
| val arrayMax = Int.MaxValue - 8 | ||
| if (newSize < 0 || newSize - 2 > arrayMax) { | ||
| throw new UnsupportedOperationException(s"Can't grow buffer past $arrayMax elements") | ||
| } | ||
| val capacity = if (otherElements != null) otherElements.length + 2 else 2 | ||
| if (newSize > capacity) { | ||
| var newArrayLen = 8 | ||
| var newArrayLen = 8L | ||
| while (newSize - 2 > newArrayLen) { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I think we can remove There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. ah, I see that it's reserved, wasn't clear to me why |
||
| newArrayLen *= 2 | ||
| if (newArrayLen == Int.MinValue) { | ||
| // Prevent overflow if we double from 2^30 to 2^31, which will become Int.MinValue. | ||
| // Note that we set the new array length to Int.MaxValue - 2 so that our capacity | ||
| // calculation above still gives a positive integer. | ||
| newArrayLen = Int.MaxValue - 2 | ||
| } | ||
| } | ||
| val newArray = new Array[T](newArrayLen) | ||
| if (newArrayLen > arrayMax) { | ||
| newArrayLen = arrayMax | ||
| } | ||
| val newArray = new Array[T](newArrayLen.toInt) | ||
| if (otherElements != null) { | ||
| System.arraycopy(otherElements, 0, newArray, 0, otherElements.length) | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -59,7 +59,7 @@ private[spark] class PartitionedPairBuffer[K, V](initialCapacity: Int = 64) | |
| throw new IllegalStateException(s"Can't insert more than ${MAXIMUM_CAPACITY} elements") | ||
| } | ||
| val newCapacity = | ||
| if (capacity * 2 < 0 || capacity * 2 > MAXIMUM_CAPACITY) { // Overflow | ||
| if (capacity * 2 > MAXIMUM_CAPACITY) { // Overflow | ||
| MAXIMUM_CAPACITY | ||
| } else { | ||
| capacity * 2 | ||
|
|
@@ -96,5 +96,5 @@ private[spark] class PartitionedPairBuffer[K, V](initialCapacity: Int = 64) | |
| } | ||
|
|
||
| private object PartitionedPairBuffer { | ||
| val MAXIMUM_CAPACITY = Int.MaxValue / 2 // 2 ^ 30 - 1 | ||
| val MAXIMUM_CAPACITY = (Int.MaxValue - 8) / 2 | ||
|
||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -344,7 +344,7 @@ class Word2Vec extends Serializable with Logging { | |
| val newSentences = sentences.repartition(numPartitions).cache() | ||
| val initRandom = new XORShiftRandom(seed) | ||
|
|
||
| if (vocabSize.toLong * vectorSize >= Int.MaxValue) { | ||
| if (vocabSize.toLong * vectorSize >= Int.MaxValue - 8) { | ||
|
||
| throw new RuntimeException("Please increase minCount or decrease vectorSize in Word2Vec" + | ||
| " to avoid an OOM. You are highly recommended to make your vocabSize*vectorSize, " + | ||
| "which is " + vocabSize + "*" + vectorSize + " for now, less than `Int.MaxValue`.") | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -304,8 +304,8 @@ class BlockMatrix @Since("1.3.0") ( | |
| s"Int.MaxValue. Currently numRows: ${numRows()}") | ||
| require(numCols() < Int.MaxValue, "The number of columns of this matrix should be less than " + | ||
| s"Int.MaxValue. Currently numCols: ${numCols()}") | ||
| require(numRows() * numCols() < Int.MaxValue, "The length of the values array must be " + | ||
| s"less than Int.MaxValue. Currently numRows * numCols: ${numRows() * numCols()}") | ||
| require(numRows() * numCols() < Int.MaxValue - 8, "The length of the values array must be " + | ||
|
||
| s"less than ${Int.MaxValue - 8}. Currently numRows * numCols: ${numRows() * numCols()}") | ||
| val m = numRows().toInt | ||
| val n = numCols().toInt | ||
| val mem = m * n / 125000 | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
`assert memory.size() <= (long) (Integer.MAX_VALUE - 8) * 8`
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need the same assert below?
spark/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java
Line 53 in c66d64b
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe also add the exact number in the error message instead of
2.1 billion.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wasn't sure whether the JVM array size limit applied here, because this represents a non-native array. Still wanted to fix the comment as I saw it. If an array exists, its length is valid, so didn't think that part of MemoryBlock represented an issue.