Changes from 3 commits
@@ -772,6 +772,8 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
array = readLongArray(readBuffer, length)
val pageLength = readLong().toInt
page = readLongArray(readBuffer, pageLength)
// Set cursor because it is used in the write function.
Member:
maybe: Restore cursor variable to make this map able to be serialized again on executors?

cursor = pageLength * 8 + Platform.LONG_ARRAY_OFFSET
}

override def readExternal(in: ObjectInput): Unit = {
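The suggestion above turns on why `cursor` matters: the write path derives how many bytes of `page` are in use from `cursor`, so a deserialized map that never restores `cursor` would serialize an empty page the second time around. Below is a minimal, self-contained model of that failure mode (illustrative only; `TinyMap` and its members are hypothetical stand-ins, not Spark's LongToUnsafeRowMap):

// Hypothetical model of the cursor bug; not Spark's actual classes.
// `cursor` tracks how many bytes of `page` are in use; write() relies on it.
class TinyMap {
  var page: Array[Byte] = Array.empty
  var cursor: Int = 0 // bytes of `page` in use

  def append(bytes: Array[Byte]): Unit = {
    page = page ++ bytes
    cursor += bytes.length
  }

  // Serializes only the used prefix of `page`, derived from `cursor`.
  def write(): Array[Byte] = page.take(cursor)

  // Buggy read: restores `page` but forgets `cursor`.
  def readBuggy(data: Array[Byte]): Unit = { page = data }

  // Fixed read (what the patch does): restore `cursor` too, so a
  // second serialization on an executor sees the real page length.
  def readFixed(data: Array[Byte]): Unit = { page = data; cursor = data.length }
}

object CursorDemo extends App {
  val original = new TinyMap
  original.append(Array[Byte](1, 2, 3))
  val bytes = original.write() // first serialization on the driver: 3 bytes

  val buggy = new TinyMap
  buggy.readBuggy(bytes)
  println(buggy.write().length) // 0 -- re-serialization loses the page

  val fixed = new TinyMap
  fixed.readFixed(bytes)
  println(fixed.write().length) // 3 -- round-trips correctly
}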
@@ -278,6 +278,39 @@ class HashedRelationSuite extends SparkFunSuite with SharedSQLContext {
map.free()
}

test("SPARK-24809: Serializing LongHashedRelation in executor may result in data error") {
Member:
Is it possible to have an end-to-end test for this?

Contributor (author):

I think this UT covers the case I hit.
An end-to-end test is too hard to construct, because the bug only occurs when the executor does not have enough memory to hold the block and the broadcast cache has been removed by the garbage collector.

val unsafeProj = UnsafeProjection.create(Array[DataType](LongType))
val originalMap = new LongToUnsafeRowMap(mm, 1)

val key1 = 1L
val value1 = new Random().nextLong()

val key2 = 2L
val value2 = new Random().nextLong()
Member:
Is it necessary to use Random here? Can we use two arbitrary long values?
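If the suggestion is adopted, the setup reduces to fixed constants (a sketch; the specific values below are arbitrary placeholders, not from the patch):

// Any two fixed longs exercise the same code path and keep the test deterministic.
val key1 = 1L
val value1 = 8589934592L            // arbitrary placeholder
val key2 = 2L
val value2 = -6134238285111672812L  // arbitrary placeholder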


originalMap.append(key1, unsafeProj(InternalRow(value1)))
originalMap.append(key2, unsafeProj(InternalRow(value2)))
originalMap.optimize()

val resultRow = new UnsafeRow(1)
assert(originalMap.getValue(key1, resultRow).getLong(0) === value1)
assert(originalMap.getValue(key2, resultRow).getLong(0) === value2)
Member:
We don't need to test LongToUnsafeRowMap's normal behavior here; we just need to verify that the map still works after two rounds of ser/de.


val ser = new KryoSerializer(
  (new SparkConf).set("spark.kryo.referenceTracking", "false")).newInstance()

Contributor:
we can write sparkContext.env.serializer.newInstance()
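If this suggestion is taken, the construction collapses to one line reusing the SparkEnv serializer (a sketch; it assumes a `sparkContext` is in scope, which SharedSQLContext provides to the suite):

// Suggested form: reuse the environment's serializer instead of
// building a dedicated KryoSerializer for this test.
val ser = sparkContext.env.serializer.newInstance()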

val mapSerializedInDriver = ser.deserialize[LongToUnsafeRowMap](ser.serialize(originalMap))
Member:
nit:

// Simulate serialize/deserialize twice on driver and executor
val firstTimeSerialized = ...
val secondTimeSerialized = ...
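One concrete reading of the nit, filling the ellipses from the ser/de calls already in the test (the variable names are the reviewer's; the calls are the existing ones):

// Simulate serialize/deserialize twice on driver and executor
val firstTimeSerialized = ser.deserialize[LongToUnsafeRowMap](ser.serialize(originalMap))
val secondTimeSerialized =
  ser.deserialize[LongToUnsafeRowMap](ser.serialize(firstTimeSerialized))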

val mapSerializedInExecutor =
ser.deserialize[LongToUnsafeRowMap](ser.serialize(mapSerializedInDriver))

assert(mapSerializedInExecutor.getValue(key1, resultRow).getLong(0) === value1)
assert(mapSerializedInExecutor.getValue(key2, resultRow).getLong(0) === value2)

originalMap.free()
mapSerializedInDriver.free()
mapSerializedInExecutor.free()
}

test("Spark-14521") {
val ser = new KryoSerializer(
(new SparkConf).set("spark.kryo.referenceTracking", "false")).newInstance()