@@ -82,7 +82,7 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
8282
8383 /**
8484 * Creates an input stream to read data from this ChunkedByteBuffer.
85- *
85+ *
8686 * @param dispose if true, [[dispose() ]] will be called at the end of the stream
8787 * in order to close any memory-mapped files which back this buffer.
8888 */
@@ -149,10 +149,37 @@ private class ChunkedByteBufferInputStream(
149149 }
150150 }
151151
152- // TODO(josh): implement
153- // override def read(b: Array[Byte]): Int = super.read(b)
154- // override def read(b: Array[Byte], off: Int, len: Int): Int = super.read(b, off, len)
155- // override def skip(n: Long): Long = super.skip(n)
152+ override def read (dest : Array [Byte ], offset : Int , length : Int ): Int = {
153+ if (currentChunk != null && ! currentChunk.hasRemaining && chunks.hasNext) {
154+ StorageUtils .dispose(currentChunk)
155+ currentChunk = chunks.next()
156+ }
157+ if (currentChunk != null && currentChunk.hasRemaining) {
158+ val amountToGet = math.min(currentChunk.remaining(), length)
159+ currentChunk.get(dest, offset, amountToGet)
160+ amountToGet
161+ } else {
162+ close()
163+ - 1
164+ }
165+ }
166+
167+ override def skip (bytes : Long ): Long = {
168+ if (currentChunk != null ) {
169+ val amountToSkip = math.min(bytes, currentChunk.remaining).toInt
170+ currentChunk.position(currentChunk.position + amountToSkip)
171+ if (currentChunk.remaining() == 0 ) {
172+ if (chunks.hasNext) {
173+ currentChunk = chunks.next()
174+ } else {
175+ close()
176+ }
177+ }
178+ amountToSkip
179+ } else {
180+ 0L
181+ }
182+ }
156183
157184 override def close (): Unit = {
158185 if (currentChunk != null ) {
0 commit comments