Closed
@@ -568,13 +568,16 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
}

// There is 8 bytes for the pointer to next value
- if (cursor + 8 + row.getSizeInBytes > page.length * 8L + Platform.LONG_ARRAY_OFFSET) {
+ val needSize = cursor + 8 + row.getSizeInBytes
+ val nowSize = page.length * 8L + Platform.LONG_ARRAY_OFFSET
+ if (needSize > nowSize) {
val used = page.length
if (used >= (1 << 30)) {
sys.error("Can not build a HashedRelation that is larger than 8G")
Member

This is unrelated to this PR, but why sys.error here instead of UnsupportedOperationException?

Contributor Author

OK, sys.error instead of UnsupportedOperationException.

}
- ensureAcquireMemory(used * 8L * 2)
Contributor

Doubling the size when growing is very typical. It seems that what you want to address is the case where there is enough memory for the requested size but not enough for doubling it. I'd suggest we double the size most of the time, as long as there is enough memory.

Contributor Author

OK, doubling the size when growing.
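The growth rule discussed in this thread can be looked at in isolation. The following is a minimal sketch, not the PR's code: `GrowthSketch`, `nextLength`, and its parameters are hypothetical names, lengths are counted in 8-byte words as for `page`, and the array header offset is ignored for simplicity. It doubles by default and picks a larger multiple only when the pending row would not fit in a doubled page.

```scala
object GrowthSketch {
  // Hypothetical helper; none of these names are Spark APIs.
  // usedWords: current page length in 8-byte words.
  // needBytes: bytes required for the existing data plus the pending row.
  def nextLength(usedWords: Long, needBytes: Long): Long = {
    val usedBytes = usedWords * 8L
    // Smallest integer multiple of the current size that covers needBytes
    // (integer ceiling division, avoiding a round trip through Double).
    val fit = (needBytes + usedBytes - 1) / usedBytes
    // Never grow by less than 2x, so repeated appends stay amortized O(1).
    usedWords * math.max(fit, 2L)
  }
}
```

Whether enough memory is available for the chosen length is a separate question that this sketch leaves to the caller.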

- val newPage = new Array[Long](used * 2)
+ val multiples = math.max(math.ceil(needSize.toDouble / (used * 8L)).toInt, 2)
+ ensureAcquireMemory(used * 8L * multiples)
Member (@kiszk, May 14, 2018)

Should we move the size check before ensureAcquireMemory()? IIUC, we now have to check `used * multiples <= ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH`.

Member

How about shaping this logic like the other similar ones (splitting this func into two parts: grow/append)? e.g., UTF8StringBuilder https://github.com/apache/spark/blob/master/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UTF8StringBuilder.java#L43

Contributor

+1 on grow/append

Contributor Author

OK, splitting the append func into two parts: grow/append.

+ val newPage = new Array[Long](used * multiples)
Platform.copyMemory(page, Platform.LONG_ARRAY_OFFSET, newPage, Platform.LONG_ARRAY_OFFSET,
cursor - Platform.LONG_ARRAY_OFFSET)
page = newPage
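The grow/append split suggested in the review, with the size check done before any allocation, might look roughly like the sketch below. It is an illustration under stated assumptions rather than the PR's implementation: it uses a plain `Array[Long]` instead of Spark's page and memory manager, `WordBuffer` and all of its members are hypothetical names, and `maxWords` stands in for whatever array-length limit applies.

```scala
// Hypothetical on-heap buffer illustrating the grow/append split.
final class WordBuffer(initialWords: Int, maxWords: Int) {
  private var page = new Array[Long](initialWords)
  private var cursor = 0 // next free index, counted in words

  def size: Int = cursor
  def capacity: Int = page.length
  def apply(i: Int): Long = page(i)

  // grow: make room for `neededWords` more words, checking the limit first
  // so that nothing is allocated when the request can never succeed.
  private def grow(neededWords: Int): Unit = {
    val required = cursor.toLong + neededWords
    if (required > maxWords) {
      throw new UnsupportedOperationException(
        s"Cannot grow buffer to $required words; limit is $maxWords")
    }
    if (required > page.length) {
      // Double by default, but never below what is required or above the limit.
      val target = math.min(math.max(page.length * 2L, required), maxWords.toLong).toInt
      val newPage = new Array[Long](target)
      System.arraycopy(page, 0, newPage, 0, cursor)
      page = newPage
    }
  }

  // append: only copies data; every sizing decision lives in grow.
  def append(values: Array[Long]): Unit = {
    grow(values.length)
    System.arraycopy(values, 0, page, cursor, values.length)
    cursor += values.length
  }
}
```

Keeping the limit check and the resize in `grow` means `append` has a single code path, and the overflow condition is tested in one place.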
@@ -254,6 +254,30 @@ class HashedRelationSuite extends SparkFunSuite with SharedSQLContext {
map.free()
}

test("LongToUnsafeRowMap with big values") {
val taskMemoryManager = new TaskMemoryManager(
new StaticMemoryManager(
new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
Long.MaxValue,
Long.MaxValue,
1),
0)
val unsafeProj = UnsafeProjection.create(Seq(BoundReference(0, StringType, false)))
Contributor

nit: UnsafeProjection.create(Array(StringType))

val keys = Seq(0L)
val map = new LongToUnsafeRowMap(taskMemoryManager, 1)
val bigStr = UTF8String.fromString("x" * 1024 * 1024 * 2)
Contributor (@cloud-fan, May 22, 2018)

Let's add a comment saying that the page array is initialized with length 1 << 17, so we need a value larger than 1 << 18 here to trigger the bug.

Contributor

nit: can we just do "x" * (1 << 19) here?
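The size arithmetic behind the test value can be written out explicitly. This is a small sketch with hypothetical names (`PageSizeCheck` and its fields). It takes the initial page length of `1 << 17` from the comment above, and assumes 8 bytes per array element and one byte per ASCII character in the UTF-8 string.

```scala
object PageSizeCheck {
  // Initial page length in 8-byte words, as stated in the review comment.
  val initialWords: Long = 1L << 17
  // Capacity of the initial page in bytes (1 MiB).
  val initialBytes: Long = initialWords * 8L
  // Capacity after a single doubling (2 MiB).
  val doubledBytes: Long = initialBytes * 2L

  // Payload of the test string "x" * 1024 * 1024 * 2: one byte per character.
  val payloadBytes: Long = 1024L * 1024L * 2L

  // The stored row adds its own header on top of the payload, so a payload
  // that already reaches the doubled capacity cannot fit after one doubling.
  val exceedsDoubledPage: Boolean = payloadBytes >= doubledBytes
}
```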

keys.foreach { k =>
Contributor

We just have one key, so why use a loop?

map.append(k, unsafeProj(InternalRow(bigStr)))
}
map.optimize()
val row = unsafeProj(InternalRow(bigStr)).copy()
Contributor

val resultRow = new UnsafeRow(1)

keys.foreach { k =>
assert(map.getValue(k, row) eq row)
assert(row.getUTF8String(0) === bigStr)
}
map.free()
}

test("Spark-14521") {
val ser = new KryoSerializer(
(new SparkConf).set("spark.kryo.referenceTracking", "false")).newInstance()