@@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical.BroadcastMode
 import org.apache.spark.sql.types.LongType
 import org.apache.spark.unsafe.Platform
+import org.apache.spark.unsafe.array.ByteArrayMethods
Contributor: not needed
 import org.apache.spark.unsafe.map.BytesToBytesMap
 import org.apache.spark.util.{KnownSizeEstimation, Utils}

@@ -557,7 +558,7 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
   def append(key: Long, row: UnsafeRow): Unit = {
     val sizeInBytes = row.getSizeInBytes
     if (sizeInBytes >= (1 << SIZE_BITS)) {
-      sys.error("Does not support row that is larger than 256M")
+      throw new UnsupportedOperationException("Does not support row that is larger than 256M")
     }

     if (key < minKey) {
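A quick note on the limit (an aside, not part of the diff): the 256M in the message is consistent with SIZE_BITS = 28, since 1 << 28 = 268,435,456 bytes = 256 MB. Presumably the row size is packed into the low SIZE_BITS bits of the value's address word, which is why rows at or above that size cannot be represented.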
@@ -567,19 +568,7 @@
       maxKey = key
     }

-    // There is 8 bytes for the pointer to next value
-    if (cursor + 8 + row.getSizeInBytes > page.length * 8L + Platform.LONG_ARRAY_OFFSET) {
-      val used = page.length
-      if (used >= (1 << 30)) {
-        sys.error("Can not build a HashedRelation that is larger than 8G")
-      }
-      ensureAcquireMemory(used * 8L * 2)
Contributor: Doubling the size when growing is very typical; it seems what you want to address is the case where the memory is enough for the requested size but not enough for doubling the size. I'd suggest we double the size most of the time, as long as there is enough memory.

Contributor (author): OK, doubling the size when growing.
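The policy agreed on here, double by default but satisfy the exact requested size when doubling is not enough, and never exceed the cap, can be sketched as follows (a minimal illustration in the spirit of the new grow method below, not the PR's code; the function name and signature are made up):

    // Growth policy sketch: prefer doubling the current size, but always
    // satisfy the request, and never exceed the hard cap.
    def newSizeWords(currentWords: Int, neededWords: Long, capWords: Long = 1L << 30): Long = {
      require(neededWords <= capWords, "request exceeds the page size cap")
      math.max(neededWords, math.min(currentWords * 2L, capWords))
    }

    newSizeWords(1 << 17, 1L << 10)  // 262144: doubling wins for small requests
    newSizeWords(1 << 17, 1L << 19)  // 524288: the request exceeds one doubling, so it wins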

-      val newPage = new Array[Long](used * 2)
-      Platform.copyMemory(page, Platform.LONG_ARRAY_OFFSET, newPage, Platform.LONG_ARRAY_OFFSET,
-        cursor - Platform.LONG_ARRAY_OFFSET)
-      page = newPage
-      freeMemory(used * 8L)
-    }
+    grow(row.getSizeInBytes)

     // copy the bytes of UnsafeRow
     val offset = cursor
@@ -615,7 +604,8 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
         growArray()
       } else if (numKeys > array.length / 2 * 0.75) {
         // The fill ratio should be less than 0.75
-        sys.error("Cannot build HashedRelation with more than 1/3 billions unique keys")
+        throw new UnsupportedOperationException(
+          "Cannot build HashedRelation with more than 1/3 billions unique keys")
       }
     }
   } else {
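As a rough check on that message (an aside, assuming the pointer array tops out at 1 << 30 longs, matching the page cap in grow below): each key takes two slots, so capacity is array.length / 2 = 2^29 entries, and the 0.75 fill ratio bounds the key count by 0.75 x 2^29, about 4.0 x 10^8, which is the order of the "1/3 billions unique keys" in the error.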
@@ -626,6 +616,25 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
     }
   }

+  private def grow(inputRowSize: Int): Unit = {
+    // There is 8 bytes for the pointer to next value
+    val neededNumWords = (cursor - Platform.LONG_ARRAY_OFFSET + 8 + inputRowSize + 7) / 8
Contributor: don't forget the comment for the 8-byte pointer
+    if (neededNumWords > page.length) {
+      if (neededNumWords > (1 << 30)) {
+        throw new UnsupportedOperationException(
+          "Can not build a HashedRelation that is larger than 8G")
+      }
+      val newNumWords = math.max(neededNumWords, math.min(page.length * 2, 1 << 30))
+      ensureAcquireMemory(newNumWords * 8L)
+      val newPage = new Array[Long](newNumWords.toInt)
+      Platform.copyMemory(page, Platform.LONG_ARRAY_OFFSET, newPage, Platform.LONG_ARRAY_OFFSET,
+        cursor - Platform.LONG_ARRAY_OFFSET)
+      val used = page.length
+      page = newPage
+      freeMemory(used * 8L)
+    }
+  }
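Two arithmetic notes on the new grow method (not part of the diff): the + 7 before dividing by 8 is a ceiling division from bytes to 8-byte words. For example (numbers invented), with cursor - Platform.LONG_ARRAY_OFFSET = 24 bytes already written and a 20-byte row, the need is 24 + 8 + 20 = 52 bytes, and (52 + 7) / 8 = 7 words, i.e. 56 bytes. The 1 << 30 word cap likewise corresponds to 2^30 x 8 bytes = 8 GB, matching the error message.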

   private def growArray(): Unit = {
     var old_array = array
     val n = array.length
@@ -254,6 +254,30 @@ class HashedRelationSuite extends SparkFunSuite with SharedSQLContext {
     map.free()
   }

test("LongToUnsafeRowMap with big values") {
val taskMemoryManager = new TaskMemoryManager(
new StaticMemoryManager(
new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
Long.MaxValue,
Long.MaxValue,
1),
0)
val unsafeProj = UnsafeProjection.create(Seq(BoundReference(0, StringType, false)))
Contributor: nit: UnsafeProjection.create(Array(StringType))
+    val keys = Seq(0L)
+    val map = new LongToUnsafeRowMap(taskMemoryManager, 1)
+    val bigStr = UTF8String.fromString("x" * 1024 * 1024 * 2)
Contributor @cloud-fan (May 22, 2018): let's add a comment to say: the page array is initialized with length 1 << 17, so here we need a value larger than 1 << 18 to trigger the bug.

Contributor: nit: can we just do "x" * (1 << 19) here?
+    keys.foreach { k =>
Contributor: we have just one key, so why use a loop?
+      map.append(k, unsafeProj(InternalRow(bigStr)))
+    }
+    map.optimize()
+    val row = unsafeProj(InternalRow(bigStr)).copy()
Contributor: val resultRow = new UnsafeRow(1)

+    keys.foreach { k =>
+      assert(map.getValue(k, row) eq row)
+      assert(row.getUTF8String(0) === bigStr)
+    }
+    map.free()
+  }
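For reference, folding in the review suggestions above (create the projection from an Array of types, drop the single-key loop, read into a fresh UnsafeRow, and comment the size choice), the test might end up roughly like this. This is only a sketch of where the thread seems headed, not the committed version; it keeps the original 2 MB value since the suggested 1 << 19 size was left as a nit:

      test("LongToUnsafeRowMap with big values") {
        val taskMemoryManager = new TaskMemoryManager(
          new StaticMemoryManager(
            new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
            Long.MaxValue,
            Long.MaxValue,
            1),
          0)
        val unsafeProj = UnsafeProjection.create(Array[DataType](StringType))
        val map = new LongToUnsafeRowMap(taskMemoryManager, 1)
        // Per the review: the page array starts at length 1 << 17, so the value
        // must be large enough to outgrow a single doubling of the page.
        val bigStr = UTF8String.fromString("x" * 1024 * 1024 * 2)
        map.append(0L, unsafeProj(InternalRow(bigStr)))
        map.optimize()
        val resultRow = new UnsafeRow(1)
        assert(map.getValue(0L, resultRow) eq resultRow)
        assert(resultRow.getUTF8String(0) === bigStr)
        map.free()
      }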

test("Spark-14521") {
val ser = new KryoSerializer(
(new SparkConf).set("spark.kryo.referenceTracking", "false")).newInstance()