
Conversation

@liuzqt
Contributor

@liuzqt liuzqt commented Oct 1, 2022

What changes were proposed in this pull request?

A single task result must currently fit in 2GB, because we use a byte array or a ByteBuffer (which is also backed by a byte array) and are therefore subject to the 2GB Java array size limit on byte[].

This PR fixes this by replacing the byte array with ChunkedByteBuffer.
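For intuition only, here is a standalone sketch (not the actual Spark ChunkedByteBuffer API; the object and method names are assumptions) of how splitting a payload across fixed-size ByteBuffer chunks sidesteps the per-array 2GB ceiling:

import java.nio.ByteBuffer

// Sketch only: a single Array[Byte]/ByteBuffer can hold at most ~2GB (Int.MaxValue bytes),
// but a sequence of fixed-size chunks has no such aggregate limit.
object ChunkingSketch {
  val chunkSize: Int = 128 * 1024 * 1024  // 128MB per chunk (arbitrary choice)

  def chunksFor(totalBytes: Long): Seq[ByteBuffer] = {
    val fullChunks = (totalBytes / chunkSize).toInt
    val remainder  = (totalBytes % chunkSize).toInt
    val sizes = Seq.fill(fullChunks)(chunkSize) ++ (if (remainder > 0) Seq(remainder) else Nil)
    sizes.map(n => ByteBuffer.allocate(n))  // aggregate capacity can exceed Int.MaxValue
  }
}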

Why are the changes needed?

To overcome the 2GB limit on a single task's result.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Unit test

@AmplabJenkins

Can one of the admins verify this patch?

@liuzqt liuzqt marked this pull request as ready for review October 3, 2022 18:36
s"dropping it.")
ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))
SerializerHelper.serializeToChunkedBuffer(ser,
new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))
Member

The object IndirectTaskResult should be small. Do we also want to use chunked buffer for it?

Contributor Author
@liuzqt liuzqt Oct 10, 2022

Good point. The serializedDirectResult at line 668 is the one that might exceed 2GB, and that is the one we want to replace with ChunkedByteBuffer.

The serializedResult here, on the other hand, is guaranteed to be smaller than 2GB in all cases (a rough paraphrase of this decision logic is sketched below):

  • larger than maxResultSize: we drop the result and send an IndirectTaskResult indicating the failure, which is small
  • larger than maxDirectResultSize: we send an IndirectTaskResult with the blockId, which is small
  • otherwise we send the result directly back to the driver, and it is always smaller than 2GB, guarded by the direct-result size check I added here

So let's just use ByteBuffer for serializedResult.

Thanks for pointing this out!
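For context, a self-contained paraphrase of the three-way decision described above (the case names and the choose function are illustrative assumptions; this is a sketch, not the actual Executor code):

import java.nio.ByteBuffer

object ResultRoutingSketch {
  sealed trait TaskResultToSend
  case class Direct(bytes: ByteBuffer) extends TaskResultToSend          // small enough to send inline
  case class IndirectDropped(resultSize: Long) extends TaskResultToSend  // too big: dropped, only the size is reported
  case class IndirectViaBlock(blockId: String) extends TaskResultToSend  // stored in the block manager, only the id is sent

  def choose(resultSize: Long, maxResultSize: Long, maxDirectResultSize: Long,
             directBytes: => ByteBuffer, blockId: String): TaskResultToSend = {
    if (maxResultSize > 0 && resultSize > maxResultSize) {
      IndirectDropped(resultSize)   // case 1: exceeds maxResultSize
    } else if (resultSize > maxDirectResultSize) {
      IndirectViaBlock(blockId)     // case 2: exceeds maxDirectResultSize
    } else {
      Direct(directBytes)           // case 3: small enough, always < 2GB here
    }
  }
}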

Comment on lines 101 to 117
if (chunk.hasArray) {
  // zero copy if the bytebuffer is backed by array
  out.write(chunk.array(), chunk.arrayOffset(), chunk.limit())
} else {
  // fallback to copy approach
  if (buffer == null) {
    buffer = new Array[Byte](bufferLen)
  }
  val originalPos = chunk.position()
  chunk.rewind()
  var bytesToRead = Math.min(chunk.remaining(), bufferLen)
  while (bytesToRead > 0) {
    chunk.get(buffer, 0, bytesToRead)
    out.write(buffer, 0, bytesToRead)
    bytesToRead = Math.min(chunk.remaining(), bufferLen)
  }
  chunk.position(originalPos)
Member

Shall we extract this common code block to a function?

Contributor Author

Good idea! Extracted it into a writeBufferToDest helper function in the companion object.

Contributor
@mridulm mridulm left a comment

Took a quick pass, yet to go over the tests.

val serializedDirectResult = ser.serialize(directResult)
val resultSize = serializedDirectResult.limit()
val directResult = new DirectTaskResult(valueByteBuffer, accumUpdates, metricPeaks)
val serializedDirectResult = SerializerHelper.serializeToChunkedBuffer(ser, directResult)
Contributor

Unlike the earlier invocation of serializeToChunkedBuffer (L#599), here we have a good estimate of the size - something to leverage to minimize the cost of serializeToChunkedBuffer.

Contributor Author

Do you mean we can explicitly choose the chunk size here to avoid too-small/too-large chunks?

Contributor

Yes, we do know valueByteBuffer.size - and we know what the serialization overhead of DirectTaskResult is.
That will give us a much tighter bound on the size.
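For illustration, a minimal standalone sketch of deriving a chunk size from a known payload size (the names ChunkSizeEstimate/chunkSizeFor and the overhead constant are assumptions, not Spark APIs):

object ChunkSizeEstimate {
  val minChunkBytes: Int = 4 * 1024           // floor, so tiny results don't allocate large chunks
  val maxChunkBytes: Int = 128 * 1024 * 1024  // cap, keeping each chunk well under Int.MaxValue

  def chunkSizeFor(payloadBytes: Long, overheadBytes: Long = 64L): Int = {
    // payload size plus an assumed fixed serialization overhead, bounded to sane limits
    val estimated = payloadBytes + overheadBytes
    math.min(math.max(estimated, minChunkBytes.toLong), maxChunkBytes.toLong).toInt
  }
}

With such an estimate, the first allocated chunk can already hold the whole serialized DirectTaskResult in the common case.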

val chunksNum = in.readInt()
val indices = 0 until chunksNum
val chunksSize = indices.map(_ => in.readInt())
val chunksDirect = indices.map(_ => in.readBoolean())
Contributor

Do we have cases where we want to preserve whether the buffer was direct or not across VMs? The current use case does not require it?
+CC @Ngone51?

If not, drop this and simplify the impl?

Member

Do we have cases where we want to preserve whether the buffer was direct or not across VM ?

I don't have such cases in mind.

Contributor Author

Shall we default to an on-heap buffer in deserialization, regardless of what it was before serialization? @mridulm @Ngone51

Contributor

Yes, let us do that - if some specific reason comes up in the future, we can add support as required.
It will make the implementation much simpler.

Contributor Author

Thanks for the suggestion, updated.

Contributor
@mridulm mridulm Oct 26, 2022

Looks like we are still preserving chunksDirect - remove it? And always deserialize to a heap buffer via ByteBuffer.wrap?
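A sketch of the simplified read path being suggested here (the object name, method name, and on-the-wire layout are assumptions based on the snippet above, not the final implementation): every chunk is reconstructed as an on-heap buffer via ByteBuffer.wrap, so no per-chunk "direct" flag needs to be serialized.

import java.io.ObjectInput
import java.nio.ByteBuffer

object ReadChunksSketch {
  def readChunks(in: ObjectInput): Array[ByteBuffer] = {
    val chunksNum = in.readInt()
    val chunkSizes = Array.fill(chunksNum)(in.readInt())  // read all chunk lengths first
    chunkSizes.map { size =>
      val bytes = new Array[Byte](size)
      in.readFully(bytes)
      ByteBuffer.wrap(bytes)  // always on-heap; no per-chunk isDirect flag needed
    }
  }
}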

@liuzqt
Contributor Author

liuzqt commented Oct 12, 2022

Some general comments about the performance implications of replacing Array[Byte] and ByteBuffer (backed by Array[Byte]) with ChunkedByteBuffer:

  • when reading from a stream (i.e., ByteArrayInputStream vs ChunkedByteBufferInputStream), there is not much difference: ByteArrayInputStream may win a little on cache locality because of contiguous memory, but ChunkedByteBuffer won't be much worse as long as the chunk size is reasonable
  • when writing to a stream (i.e., ByteArrayOutputStream vs ChunkedByteBufferOutputStream):
    • ByteArrayOutputStream starts with a small buffer (32 bytes), grows exponentially by 2x, and has to copy the array on every growth
    • ChunkedByteBufferOutputStream grows by a fixed chunk size (which you can specify when you create the stream), and growth appends a new chunk instead of copying
    • in some manual benchmarks on large data, ChunkedByteBufferOutputStream is much faster (I tried data sizes from 100MB to 1GB and chunk sizes from 1KB to 1MB and saw at least a ~2x speedup), which I would attribute mostly to the array-copy overhead; a standalone sketch of the two growth strategies follows this list
    • when eventually dumping to a ByteBuffer (or raw byte array) vs. a ChunkedByteBuffer, the latter might waste some memory in the last chunk, but I believe that's not a big deal, and for serialization they're the same
    • after all, result collection is a small portion of the whole end-to-end query
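For reference, a minimal standalone approximation of the two growth strategies (plain JDK/Scala only, not the Spark classes; the payload size, piece size, and chunk size are arbitrary, and results will vary by machine):

import java.io.ByteArrayOutputStream
import java.nio.ByteBuffer
import scala.collection.mutable.ArrayBuffer

// Rough micro-benchmark sketch; run with a few GB of heap (e.g. -Xmx4g).
object GrowthStrategySketch {
  def main(args: Array[String]): Unit = {
    val total = 256 * 1024 * 1024          // 256MB of payload
    val piece = new Array[Byte](64 * 1024) // written in 64KB pieces

    // Strategy 1: one growing array (copy-on-grow), like ByteArrayOutputStream.
    var t0 = System.nanoTime()
    val baos = new ByteArrayOutputStream(32)
    var written = 0
    while (written < total) { baos.write(piece); written += piece.length }
    println(s"copy-on-grow:   ${(System.nanoTime() - t0) / 1e6} ms")

    // Strategy 2: fixed-size chunks appended to a list, like ChunkedByteBufferOutputStream.
    t0 = System.nanoTime()
    val chunkSize = 4 * 1024 * 1024        // 4MB chunks (arbitrary)
    val chunks = ArrayBuffer.empty[ByteBuffer]
    written = 0
    while (written < total) {
      if (chunks.isEmpty || !chunks.last.hasRemaining) chunks += ByteBuffer.allocate(chunkSize)
      val n = math.min(piece.length, chunks.last.remaining())
      chunks.last.put(piece, 0, n)
      written += n
    }
    println(s"chunked append: ${(System.nanoTime() - t0) / 1e6} ms")
  }
}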

tasks.foreach { task =>
val result = new DirectTaskResult[Int](valueSer.serialize(task.taskId), Seq(), Array())
val result = new DirectTaskResult[Int](valueSer.serialize(task.taskId), Seq(),
Array[Long]())
Member

Is it a required change (adding [Long]) now? Seems like it was Long already.

Contributor Author

If I remove [Long], I get a compile error:

overloaded method constructor DirectTaskResult with alternatives:
  (valueByteBuffer: java.nio.ByteBuffer,accumUpdates: Seq[org.apache.spark.util.AccumulatorV2[_, _]],metricPeaks: Array[Long])org.apache.spark.scheduler.DirectTaskResult[Int] <and>
  (valueByteBuffer: org.apache.spark.util.io.ChunkedByteBuffer,accumUpdates: Seq[org.apache.spark.util.AccumulatorV2[_, _]],metricPeaks: Array[Long])org.apache.spark.scheduler.DirectTaskResult[Int]
 cannot be applied to (java.nio.ByteBuffer, Seq[Nothing], Array[Nothing])
          val result = new DirectTaskResult[Int](valueSer.serialize(task.taskId), Seq(),

The compiler somehow failed to infer the type. This happened after I added the overloaded constructor in DirectTaskResult.

My guess is that overload resolution has to happen before the expected parameter types that drive type inference are known, while overload resolution itself can't happen without knowing the types of the arguments... I haven't figured out a way to solve this.
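For anyone curious, a minimal standalone reproduction of this interplay (hypothetical names, not the Spark classes): with a single method the expected type Array[Long] drives inference of the empty Array(), but once an overload exists the argument is typed as Array[Nothing] first, and since Array is invariant that matches neither alternative.

import java.nio.ByteBuffer

object OverloadInferenceRepro {
  class Result(val peaks: Array[Long])

  def make(v: ByteBuffer, peaks: Array[Long]): Result = new Result(peaks)
  def make(v: String, peaks: Array[Long]): Result = new Result(peaks)

  // Does not compile: Array() is typed as Array[Nothing] before an overload is picked,
  // and Array[Nothing] does not conform to Array[Long] for either alternative.
  // val bad = make(ByteBuffer.allocate(0), Array())

  // Compiles: the element type is stated explicitly, so both the overload and the
  // argument type are unambiguous.
  val ok: Result = make(ByteBuffer.allocate(0), Array[Long]())
}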

// create data of size ~3GB in single partition, which exceeds the byte array limit
// random gen to make sure it's poorly compressed
val df = spark.range(0, 3000, 1, 1).selectExpr("id", s"genData(id, 1000000) as data")
val res = df.queryExecution.executedPlan.executeCollect()
Member

Will this take much memory on the driver?

Contributor Author

Yes. We want to test that both serialization (executor side) and deserialization (driver side) handle large data (beyond the 2GB limit) properly.

@mridulm
Contributor

mridulm commented Oct 13, 2022

@liuzqt Most task results are very small.
We will now be over-provisioning by a few orders of magnitude when moving to ChunkedByteBufferOutputStream, while a vanishingly small set of cases actually hits the large-buffer case.
This can potentially have an impact on memory utilization at the executor, so if possible look at ways to mitigate it - particularly, for example, when we have a good estimate of the output size.

This is not to say I have serious concerns (we do use ChunkedByteBufferOutputStream with precisely that size everywhere else!) - but it is not without tradeoff.

private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) extends Externalizable {
require(chunks != null, "chunks must not be null")
require(!chunks.contains(null), "chunks must not contain null")
Contributor
@mridulm mridulm Oct 13, 2022

The next line should have implicitly checked for it - was this getting triggered?

Contributor Author

The ChunkedByteBuffer has a constructor overload def this(byteBuffer: ByteBuffer), which is invoked with null in the DirectTaskResult empty constructor.

It was triggered in some temporary code during my development, but it should not have any usage in the current code base (at least none that can be found explicitly). That empty constructor might serve as a default constructor for some serialization code, I guess. After all, the code in ChunkedByteBuffer assumes chunks does not contain null, so I think there's no harm in doing this check at the beginning.

Contributor

It is another iteration over chunks as part of construction. Not a big deal, but I was looking to minimize the impact of the change.

chunk.get(buffer, 0, bytesToRead)
out.write(buffer, 0, bytesToRead)
bytesToRead = Math.min(chunk.remaining(), bufferLen)
}
Contributor

nit: Pull this out into a separate method?
I will need to search our codebase, but I would have expected this snippet to already exist somewhere.

Contributor Author

There should be... but I was not able to locate it; I'm not very familiar with the code base. Feel free to let me know if you find it.

Contributor

There is Utils.writeByteBuffer, which writes a buffer to an OutputStream.
It does not do a very good job, since it allocates a buffer the same size as remaining() - instead, we should enhance it to do what this method is doing.

Additionally, we can use a ThreadLocal[Array[Byte]] in Utils for use with this copy (the buffer here).

Contributor Author

I tried to reuse Utils.writeByteBuffer, and I noticed that there are two versions of Utils.writeByteBuffer, for OutputStream and DataOutput respectively, with identical code, so I added a Utils.writeByteBufferImpl to extract the common logic and also added the ThreadLocal[Array[Byte]].
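As an illustration only, here is a self-contained sketch of a copy helper along these lines (the object and method names are hypothetical; the actual Utils.writeByteBufferImpl may differ in signature and details):

import java.io.OutputStream
import java.nio.ByteBuffer

object ByteBufferWriteSketch {
  // Thread-local scratch buffer so concurrent task threads don't share mutable state.
  private val scratch = new ThreadLocal[Array[Byte]] {
    override def initialValue(): Array[Byte] = new Array[Byte](64 * 1024)
  }

  /** Write the remaining bytes of `bb` to `out` without permanently moving its position. */
  def writeFully(bb: ByteBuffer, out: OutputStream): Unit = {
    if (bb.hasArray) {
      // Zero-copy path: the buffer is backed by an accessible byte array.
      out.write(bb.array(), bb.arrayOffset() + bb.position(), bb.remaining())
    } else {
      // Copy path (e.g. direct buffers): stage through the thread-local scratch array.
      val buf = scratch.get()
      val originalPos = bb.position()
      while (bb.hasRemaining) {
        val n = math.min(bb.remaining(), buf.length)
        bb.get(buf, 0, n)
        out.write(buf, 0, n)
      }
      bb.position(originalPos)
    }
  }
}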

@liuzqt
Contributor Author

liuzqt commented Nov 9, 2022

I tried reducing the large-result test to ~2.1GB, as well as the local-cluster approach, but neither works... Since I've verified that test locally, I removed it.

@mridulm
Contributor

mridulm commented Nov 11, 2022

Ideally, we would want to make sure there are no regressions as the codebase evolves - having a unit test to capture the case would help with that.
Does bringing up a local cluster with a higher memory limit help to test this?

@Ngone51
Member

Ngone51 commented Nov 11, 2022

Should the PR title be changed to something like "Remove the limitation of a single task result must fit in 2GB"?

@liuzqt
Contributor Author

liuzqt commented Nov 11, 2022

@mridulm I've tried local-cluster[1,1,3072] but it doesn't help. Is there any way to increase the JVM memory in the GitHub Actions job?

@liuzqt liuzqt changed the title [SPARK-40622][SQL][CORE]Result of a single task in collect() must fit in 2GB [SPARK-40622][SQL][CORE]Remove the limitation that single task result must fit in 2GB Nov 11, 2022
Contributor
@jiangxb1987 jiangxb1987 left a comment

LGTM, thanks for working on it!

@mridulm
Contributor

mridulm commented Nov 14, 2022

+CC @Yikun, @HyukjinKwon - any suggestions for @liuzqt's query above? Thanks

@Yikun
Member

Yikun commented Nov 14, 2022

The 2GB should not be a limit of GitHub Actions; there is only a 7GB limit on total memory.

I'm not sure whether adding some JVM memory options via the SBT opts would help or not.

@liuzqt
Contributor Author

liuzqt commented Nov 15, 2022

Increasing the JVM memory to 6GB leads to the TPCDS test getting killed: https://github.com/liuzqt/spark/actions/runs/3464533302/jobs/5786183673

Let me try 5GB...

Member
@HyukjinKwon HyukjinKwon left a comment

I actually don't think we can have a test with 2GB, because GA is already using a lot of memory. One way would be to add the test and mark it as ignore so other people can verify it manually in the future.

Another way is to skip the test in the GitHub Actions environment (e.g., assume(!sys.env.contains("GITHUB_ACTIONS"))).

The change looks very nice to have, BTW. Thanks for fixing this.

@mridulm
Contributor

mridulm commented Nov 15, 2022

Looks like TPCDS is still dying...
Can we try @HyukjinKwon's suggestion, @liuzqt? It will ensure we are testing this during local builds and release votes, while not causing GA to fail.
TPCDSQuerySuite already uses this for not executing q72.

@asfgit asfgit closed this in c23245d Nov 16, 2022
@mridulm
Contributor

mridulm commented Nov 16, 2022

Merged to master.
Thanks for fixing this @liuzqt !
Thanks for the reviews @Ngone51, @sadikovi, @jiangxb1987 :-)
And thanks for help with GA @HyukjinKwon and @Yikun !

test("collect data with single partition larger than 2GB bytes array limit") {
// This test requires large memory and leads to OOM in Github Action so we skip it. Developer
// should verify it in local build.
assume(!sys.env.contains("GITHUB_ACTIONS"))
Contributor
@LuciferYang LuciferYang Nov 18, 2022

hmm... I tested this suite with Java 8/11/17 on Linux/macOS on Apple Silicon with the following commands:

  • Maven:
build/mvn clean install -DskipTests -pl sql/core -am
build/mvn clean test -pl sql/core -Dtest=none -DwildcardSuites=org.apache.spark.sql.DatasetLargeResultCollectingSuite

and

dev/change-scala-version.sh 2.13
build/mvn clean install -DskipTests -pl sql/core -am -Pscala-2.13
build/mvn clean test -pl sql/core -Pscala-2.13 -Dtest=none -DwildcardSuites=org.apache.spark.sql.DatasetLargeResultCollectingSuite

  • SBT:
build/sbt clean "sql/testOnly org.apache.spark.sql.DatasetLargeResultCollectingSuite"
dev/change-scala-version.sh 2.13
build/sbt clean "sql/testOnly org.apache.spark.sql.DatasetLargeResultCollectingSuite" -Pscala-2.13

All tests failed with java.lang.OutOfMemoryError: Java heap space as follows:

10:19:56.910 ERROR org.apache.spark.executor.Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.OutOfMemoryError: Java heap space
        at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
        at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
        at org.apache.spark.serializer.SerializerHelper$.$anonfun$serializeToChunkedBuffer$1(SerializerHelper.scala:40)
        at org.apache.spark.serializer.SerializerHelper$.$anonfun$serializeToChunkedBuffer$1$adapted(SerializerHelper.scala:40)
        at org.apache.spark.serializer.SerializerHelper$$$Lambda$2321/1995130077.apply(Unknown Source)
        at org.apache.spark.util.io.ChunkedByteBufferOutputStream.allocateNewChunkIfNeeded(ChunkedByteBufferOutputStream.scala:87)
        at org.apache.spark.util.io.ChunkedByteBufferOutputStream.write(ChunkedByteBufferOutputStream.scala:75)
        at java.io.ObjectOutputStream$BlockDataOutputStream.write(ObjectOutputStream.java:1853)
        at java.io.ObjectOutputStream.write(ObjectOutputStream.java:709)
        at org.apache.spark.util.Utils$.$anonfun$writeByteBuffer$1(Utils.scala:271)
        at org.apache.spark.util.Utils$.$anonfun$writeByteBuffer$1$adapted(Utils.scala:271)
        at org.apache.spark.util.Utils$$$Lambda$2324/69671223.apply(Unknown Source)
        at org.apache.spark.util.Utils$.writeByteBufferImpl(Utils.scala:249)
        at org.apache.spark.util.Utils$.writeByteBuffer(Utils.scala:271)
        at org.apache.spark.util.io.ChunkedByteBuffer.$anonfun$writeExternal$2(ChunkedByteBuffer.scala:103)
        at org.apache.spark.util.io.ChunkedByteBuffer.$anonfun$writeExternal$2$adapted(ChunkedByteBuffer.scala:103)
        at org.apache.spark.util.io.ChunkedByteBuffer$$Lambda$2323/1073743200.apply(Unknown Source)
        at scala.collection.ArrayOps$.foreach$extension(ArrayOps.scala:1328)
        at org.apache.spark.util.io.ChunkedByteBuffer.writeExternal(ChunkedByteBuffer.scala:103)
        at java.io.ObjectOutputStream.writeExternalData(ObjectOutputStream.java:1459)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1430)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
        at org.apache.spark.serializer.SerializerHelper$.serializeToChunkedBuffer(SerializerHelper.scala:42)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:599)

Contributor

@liuzqt Could you tell me how to run the local test successfully with Spark default Java options?

Contributor

@HyukjinKwon @mridulm @jiangxb1987 @sadikovi @Ngone51 I failed to run this case locally with Spark's default Java options. Were you able to run it successfully? Or should we ignore this case by default?

Member

Ah, this is tricky. Let's probably just mark this test as ignore for now then, adding a comment that some custom memory configuration would have to be applied...
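For illustration, this is roughly what marking the test as ignored could look like in ScalaTest (the class name below is hypothetical; the real test lives in DatasetLargeResultCollectingSuite and the actual change is in #38704):

import org.scalatest.funsuite.AnyFunSuite

// Hypothetical standalone suite for illustration; the real test extends Spark's own test base classes.
class LargeResultManualSuite extends AnyFunSuite {
  // Ignored by default: needs a multi-GB JVM heap, so developers should enable and run it
  // manually with custom memory settings when touching this code path.
  ignore("collect data with single partition larger than 2GB bytes array limit") {
    // manual verification only
  }
}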

Contributor

OK

Contributor

#38704 ignores this case by default

Member

Thanks man

beliefer pushed a commit to beliefer/spark that referenced this pull request Nov 24, 2022
…t must fit in 2GB

### What changes were proposed in this pull request?

Single task result must fit in 2GB, because we're using byte array or `ByteBuffer`(which is backed by byte array as well), and thus has a limit of 2GB(java array size limit on `byte[]`).

This PR is trying to fix this by replacing byte array with `ChunkedByteBuffer`.

### Why are the changes needed?

To overcome the 2GB limit for single task.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Unit test

Closes apache#38064 from liuzqt/SPARK-40622.

Authored-by: Ziqi Liu <[email protected]>
Signed-off-by: Mridul <mridul<at>gmail.com>
SandishKumarHN pushed a commit to SandishKumarHN/spark that referenced this pull request Dec 12, 2022
beliefer pushed a commit to beliefer/spark that referenced this pull request Dec 15, 2022
beliefer pushed a commit to beliefer/spark that referenced this pull request Dec 18, 2022