diff --git a/core/src/main/java/org/apache/spark/io/NioBufferedFileInputStream.java b/core/src/main/java/org/apache/spark/io/NioBufferedFileInputStream.java index 4e251a1c2901..412d612c7f1d 100644 --- a/core/src/main/java/org/apache/spark/io/NioBufferedFileInputStream.java +++ b/core/src/main/java/org/apache/spark/io/NioBufferedFileInputStream.java @@ -17,6 +17,7 @@ package org.apache.spark.io; import org.apache.spark.storage.StorageUtils; +import org.apache.spark.unsafe.Platform; import java.io.File; import java.io.IOException; @@ -47,7 +48,7 @@ public final class NioBufferedFileInputStream extends InputStream { private final FileChannel fileChannel; public NioBufferedFileInputStream(File file, int bufferSizeInBytes) throws IOException { - byteBuffer = ByteBuffer.allocateDirect(bufferSizeInBytes); + byteBuffer = Platform.allocateDirectBuffer(bufferSizeInBytes); fileChannel = FileChannel.open(file.toPath(), StandardOpenOption.READ); byteBuffer.flip(); this.cleanable = CLEANER.register(this, new ResourceCleaner(fileChannel, byteBuffer)); diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala index 1498b224b0c9..3e57094b36a7 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala @@ -35,6 +35,7 @@ import org.apache.spark.internal.LogKeys._ import org.apache.spark.network.buffer.ManagedBuffer import org.apache.spark.network.util.{AbstractFileRegion, JavaUtils} import org.apache.spark.security.CryptoStreamUtils +import org.apache.spark.unsafe.Platform import org.apache.spark.unsafe.array.ByteArrayMethods import org.apache.spark.util.Utils import org.apache.spark.util.io.ChunkedByteBuffer @@ -324,7 +325,7 @@ private class ReadableChannelFileRegion(source: ReadableByteChannel, blockSize: private var _transferred = 0L - private val buffer = ByteBuffer.allocateDirect(64 * 1024) + private val buffer = Platform.allocateDirectBuffer(64 * 1024) buffer.flip() override def count(): Long = blockSize diff --git a/core/src/main/scala/org/apache/spark/util/DirectByteBufferOutputStream.scala b/core/src/main/scala/org/apache/spark/util/DirectByteBufferOutputStream.scala index a4145bb36acc..1683e892511f 100644 --- a/core/src/main/scala/org/apache/spark/util/DirectByteBufferOutputStream.scala +++ b/core/src/main/scala/org/apache/spark/util/DirectByteBufferOutputStream.scala @@ -57,7 +57,7 @@ private[spark] class DirectByteBufferOutputStream(capacity: Int) extends OutputS if (newCapacity < minCapacity) newCapacity = minCapacity val oldBuffer = buffer oldBuffer.flip() - val newBuffer = ByteBuffer.allocateDirect(newCapacity) + val newBuffer = Platform.allocateDirectBuffer(newCapacity) newBuffer.put(oldBuffer) StorageUtils.dispose(oldBuffer) buffer = newBuffer