[SPARK-3546] InputStream of ManagedBuffer is not closed and causes running out of file descriptor #2408
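For context, the failure mode behind this PR is file-descriptor exhaustion: every `InputStream` or `FileChannel` that is opened but never closed pins one descriptor until the process hits its limit. A minimal standalone sketch of that failure mode (hypothetical demo code, not from this PR; `/etc/hosts` stands in for any readable file):

```scala
import java.io.{File, FileInputStream, FileNotFoundException, InputStream}
import scala.collection.mutable.ArrayBuffer

object FdLeakDemo {
  def main(args: Array[String]): Unit = {
    val file = new File("/etc/hosts")  // any readable file works
    val leaked = ArrayBuffer.empty[InputStream]
    try {
      // Each unclosed FileInputStream holds one file descriptor, so the
      // process eventually hits its fd limit (ulimit -n).
      while (true) {
        leaked += new FileInputStream(file)
      }
    } catch {
      // On fd exhaustion the JVM fails the open with a
      // FileNotFoundException whose message ends in "Too many open files".
      case e: FileNotFoundException =>
        println(s"Opened ${leaked.size} streams before failing: ${e.getMessage}")
        leaked.foreach(_.close())
    }
  }
}
```

The changes below close these resources on all paths in the two places that leaked them.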
In `org.apache.spark.network` (class `FileSegmentManagedBuffer`):

```diff
@@ -19,6 +19,7 @@ package org.apache.spark.network
 import java.io.{FileInputStream, RandomAccessFile, File, InputStream}
 import java.nio.ByteBuffer
+import java.nio.channels.FileChannel
 import java.nio.channels.FileChannel.MapMode
 
 import com.google.common.io.ByteStreams
@@ -66,8 +67,13 @@ final class FileSegmentManagedBuffer(val file: File, val offset: Long, val length: Long)
   override def size: Long = length
 
   override def nioByteBuffer(): ByteBuffer = {
-    val channel = new RandomAccessFile(file, "r").getChannel
-    channel.map(MapMode.READ_ONLY, offset, length)
+    var channel: FileChannel = null
+    try {
+      channel = new RandomAccessFile(file, "r").getChannel
+      channel.map(MapMode.READ_ONLY, offset, length)
+    } finally {
+      channel.close()
+    }
   }
 
   override def inputStream(): InputStream = {
```
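Closing the channel in `finally` is safe even though the mapped buffer escapes the method: per the `FileChannel.map` contract, a mapping, once established, is not dependent on the channel used to create it, so `channel.close()` does not invalidate the returned `MappedByteBuffer`.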
|
Member: This would throw an NPE if an error occurred in `new RandomAccessFile(file, "r").getChannel`, since `channel` would still be `null` when the `finally` block calls `channel.close()`.

Member (Author): Originally I was going to check whether `channel` is null before closing it, but I forgot in the previous PR.
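For illustration, a null-guarded variant along the lines the reviewer is asking for (a sketch of the suggested fix, not necessarily the exact code eventually merged):

```scala
override def nioByteBuffer(): ByteBuffer = {
  var channel: FileChannel = null
  try {
    channel = new RandomAccessFile(file, "r").getChannel
    channel.map(MapMode.READ_ONLY, offset, length)
  } finally {
    // The RandomAccessFile constructor or getChannel can throw before
    // `channel` is assigned; only close it if it was actually opened.
    if (channel != null) {
      channel.close()
    }
  }
}
```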
In `org.apache.spark.storage` (class `ShuffleBlockFetcherIterator`):

```diff
@@ -17,13 +17,14 @@
 
 package org.apache.spark.storage
 
+import java.io.InputStream
 import java.util.concurrent.LinkedBlockingQueue
 
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.mutable.HashSet
 import scala.collection.mutable.Queue
 
-import org.apache.spark.{TaskContext, Logging, SparkException}
+import org.apache.spark.{TaskContext, Logging}
 import org.apache.spark.network.{ManagedBuffer, BlockFetchingListener, BlockTransferService}
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.util.Utils
@@ -111,13 +112,21 @@ final class ShuffleBlockFetcherIterator(
     blockTransferService.fetchBlocks(req.address.host, req.address.port, blockIds,
       new BlockFetchingListener {
         override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit = {
-          results.put(new FetchResult(BlockId(blockId), sizeMap(blockId),
-            () => serializer.newInstance().deserializeStream(
-              blockManager.wrapForCompression(BlockId(blockId), data.inputStream())).asIterator
-          ))
-          shuffleMetrics.remoteBytesRead += data.size
-          shuffleMetrics.remoteBlocksFetched += 1
-          logDebug("Got remote block " + blockId + " after " + Utils.getUsedTimeMs(startTime))
+          var is: InputStream = null
+          try {
+            is = data.inputStream()
+            results.put(new FetchResult(BlockId(blockId), sizeMap(blockId),
+              () => serializer.newInstance().deserializeStream(
+                blockManager.wrapForCompression(BlockId(blockId), is)).asIterator
+            ))
+            shuffleMetrics.remoteBytesRead += data.size
+            shuffleMetrics.remoteBlocksFetched += 1
+            logDebug("Got remote block " + blockId + " after " + Utils.getUsedTimeMs(startTime))
+          } finally {
+            if (is != null) {
+              is.close()
+            }
+          }
         }
 
         override def onBlockFetchFailure(e: Throwable): Unit = {
```
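Both hunks share the same acquire/use/close-on-all-paths shape. As an aside, that shape can be factored into a small loan-pattern helper; the sketch below is a hypothetical illustration, not part of this PR:

```scala
import java.io.Closeable

object ResourceUtils {
  // Runs `body` with a freshly acquired resource and guarantees close()
  // on every exit path; skips close() if acquisition itself threw.
  def withResource[R <: Closeable, T](acquire: => R)(body: R => T): T = {
    var resource: Option[R] = None
    try {
      val r = acquire
      resource = Some(r)
      body(r)
    } finally {
      resource.foreach(_.close())
    }
  }
}

// Usage, mirroring the onBlockFetchSuccess change:
//   ResourceUtils.withResource(data.inputStream()) { is =>
//     ... build the FetchResult and update the shuffle metrics ...
//   }
```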
Reviewer: This part looks good to me.