Skip to content

Commit 2ed6c3e

Browse files
cxzl25 authored and dongjoon-hyun committed
[SPARK-49509][CORE] Use Platform.allocateDirectBuffer instead of ByteBuffer.allocateDirect
### What changes were proposed in this pull request? This PR aims to use `Platform.allocateDirectBuffer` instead of `ByteBuffer.allocateDirect`. ### Why are the changes needed? apache#47733 (review) Allocating off-heap memory should use the `allocateDirectBuffer` API provided `by Platform`. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? GA ### Was this patch authored or co-authored using generative AI tooling? No Closes apache#47987 from cxzl25/SPARK-49509. Authored-by: sychen <sychen@ctrip.com> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
1 parent c5293ec commit 2ed6c3e

3 files changed

Lines changed: 5 additions & 3 deletions

File tree

core/src/main/java/org/apache/spark/io/NioBufferedFileInputStream.java

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -17,6 +17,7 @@
1717
package org.apache.spark.io;
1818

1919
import org.apache.spark.storage.StorageUtils;
20+
import org.apache.spark.unsafe.Platform;
2021

2122
import java.io.File;
2223
import java.io.IOException;
@@ -47,7 +48,7 @@ public final class NioBufferedFileInputStream extends InputStream {
4748
private final FileChannel fileChannel;
4849

4950
public NioBufferedFileInputStream(File file, int bufferSizeInBytes) throws IOException {
50-
byteBuffer = ByteBuffer.allocateDirect(bufferSizeInBytes);
51+
byteBuffer = Platform.allocateDirectBuffer(bufferSizeInBytes);
5152
fileChannel = FileChannel.open(file.toPath(), StandardOpenOption.READ);
5253
byteBuffer.flip();
5354
this.cleanable = CLEANER.register(this, new ResourceCleaner(fileChannel, byteBuffer));

core/src/main/scala/org/apache/spark/storage/DiskStore.scala

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -35,6 +35,7 @@ import org.apache.spark.internal.LogKeys._
3535
import org.apache.spark.network.buffer.ManagedBuffer
3636
import org.apache.spark.network.util.{AbstractFileRegion, JavaUtils}
3737
import org.apache.spark.security.CryptoStreamUtils
38+
import org.apache.spark.unsafe.Platform
3839
import org.apache.spark.unsafe.array.ByteArrayMethods
3940
import org.apache.spark.util.Utils
4041
import org.apache.spark.util.io.ChunkedByteBuffer
@@ -324,7 +325,7 @@ private class ReadableChannelFileRegion(source: ReadableByteChannel, blockSize:
324325

325326
private var _transferred = 0L
326327

327-
private val buffer = ByteBuffer.allocateDirect(64 * 1024)
328+
private val buffer = Platform.allocateDirectBuffer(64 * 1024)
328329
buffer.flip()
329330

330331
override def count(): Long = blockSize

core/src/main/scala/org/apache/spark/util/DirectByteBufferOutputStream.scala

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -57,7 +57,7 @@ private[spark] class DirectByteBufferOutputStream(capacity: Int) extends OutputS
5757
if (newCapacity < minCapacity) newCapacity = minCapacity
5858
val oldBuffer = buffer
5959
oldBuffer.flip()
60-
val newBuffer = ByteBuffer.allocateDirect(newCapacity)
60+
val newBuffer = Platform.allocateDirectBuffer(newCapacity)
6161
newBuffer.put(oldBuffer)
6262
StorageUtils.dispose(oldBuffer)
6363
buffer = newBuffer

0 commit comments

Comments (0)