Skip to content

Commit d675ce8

Browse files
Add support for readFully at the S3SeekableInputStream level
1 parent b89d41e commit d675ce8

5 files changed

Lines changed: 155 additions & 0 deletions

File tree

input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/S3SeekableInputStream.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package software.amazon.s3.analyticsaccelerator;
1717

18+
import java.io.EOFException;
1819
import java.io.IOException;
1920
import java.nio.ByteBuffer;
2021
import java.util.List;
@@ -234,6 +235,41 @@ public void readVectored(
234235
logicalIO.readVectored(ranges, allocate);
235236
}
236237

238+
/**
239+
* Fill the provided buffer with the contents of the input source starting at {@code position} for
240+
* the given {@code offset} and {@code length}.
241+
*
242+
* @param position start position of the read
243+
* @param buffer target buffer to copy data
244+
* @param offset offset in the buffer to copy the data
245+
* @param length size of the read
246+
* @throws IOException if an I/O error occurs
247+
*/
248+
public void readFully(long position, byte[] buffer, int offset, int length) throws IOException {
249+
throwIfClosed("cannot read from closed stream");
250+
validatePositionedReadArgs(position, buffer, offset, length);
251+
252+
if (length == 0) {
253+
return;
254+
}
255+
256+
this.telemetry.measureVerbose(
257+
() ->
258+
Operation.builder()
259+
.name(OPERATION_READ)
260+
.attribute(StreamAttributes.uri(this.s3URI))
261+
.attribute(StreamAttributes.etag(this.logicalIO.metadata().getEtag()))
262+
.attribute(StreamAttributes.range(position, position + length - 1))
263+
.build(),
264+
() -> {
265+
int bytesRead = this.logicalIO.read(buffer, offset, length, position);
266+
if (bytesRead < length) {
267+
throw new EOFException(
268+
"Reached the end of stream with " + (length - bytesRead) + " bytes left to read");
269+
}
270+
});
271+
}
272+
237273
/**
238274
* Releases all resources associated with the {@link S3SeekableInputStream}.
239275
*

input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/SeekableInputStream.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,19 @@ public abstract void readVectored(
8080
Consumer<ByteBuffer> release)
8181
throws IOException;
8282

83+
/**
84+
* Fill the provided buffer with the contents of the input source starting at {@code position} for
85+
* the given {@code offset} and {@code length}.
86+
*
87+
* @param position start position of the read
88+
* @param buffer target buffer to copy data
89+
* @param offset offset in the buffer to copy the data
90+
* @param length size of the read
91+
* @throws IOException if an I/O error occurs
92+
*/
93+
public abstract void readFully(long position, byte[] buffer, int offset, int length)
94+
throws IOException;
95+
8396
/**
8497
* Validates the arguments for a read operation. This method is available to use in all subclasses
8598
* to ensure consistency.

input-stream/src/referenceTest/java/software/amazon/s3/analyticsaccelerator/model/InMemorySeekableStream.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package software.amazon.s3.analyticsaccelerator.model;
1717

18+
import java.io.IOException;
1819
import java.nio.ByteBuffer;
1920
import java.util.List;
2021
import java.util.function.Consumer;
@@ -84,6 +85,28 @@ public void readVectored(
8485
}
8586
}
8687

88+
@Override
89+
public void readFully(long position, byte[] buffer, int offset, int length) throws IOException {
90+
// Save current position of stream
91+
long prevPosition = this.position;
92+
if (position >= this.contentLength) {
93+
throw new IOException("Position is beyond end of stream");
94+
}
95+
96+
data.position((int) position);
97+
int bytesAvailable = this.contentLength - (int) position;
98+
int bytesToRead = Math.min(length, bytesAvailable);
99+
data.get(buffer, offset, bytesToRead);
100+
if (bytesToRead < length) {
101+
throw new IOException(
102+
"Reached the end of stream with " + (length - bytesToRead) + " bytes left to read");
103+
}
104+
105+
// Restore original position
106+
this.position = prevPosition;
107+
data.position((int) this.position);
108+
}
109+
87110
@Override
88111
public int read() {
89112
if (this.position >= this.contentLength) {

input-stream/src/referenceTest/java/software/amazon/s3/analyticsaccelerator/property/InMemoryS3SeekableInputStream.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,11 @@ public void readVectored(
131131
this.delegate.readVectored(ranges, allocate, release);
132132
}
133133

134+
@Override
135+
public void readFully(long position, byte[] buffer, int offset, int length) throws IOException {
136+
this.delegate.readFully(position, buffer, offset, length);
137+
}
138+
134139
@Override
135140
public int read() throws IOException {
136141
return this.delegate.read();

input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/S3SeekableInputStreamTest.java

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -448,6 +448,84 @@ public void testInsufficientBuffer() throws IOException {
448448
IndexOutOfBoundsException.class, () -> seekableInputStream.readTail(new byte[0], 0, 8), -1);
449449
SpotBugsLambdaWorkaround.assertReadResult(
450450
IndexOutOfBoundsException.class, () -> seekableInputStream.readTail(new byte[0], 0, 8), -1);
451+
assertThrows(
452+
IndexOutOfBoundsException.class, () -> seekableInputStream.readFully(0, new byte[0], 0, 8));
453+
}
454+
455+
@Test
456+
void testReadFullyWithInvalidArgument() throws IOException {
457+
// Given: seekable stream
458+
try (S3SeekableInputStream stream = getTestStream()) {
459+
// When & Then: reading with invalid arguments, exception is thrown
460+
// -1 is invalid position
461+
assertThrows(IllegalArgumentException.class, () -> stream.readFully(-1, new byte[10], 0, 5));
462+
// -1 is invalid length
463+
assertThrows(IllegalArgumentException.class, () -> stream.readFully(0, new byte[10], 0, -1));
464+
// Requesting more data than byte buffer size
465+
assertThrows(IndexOutOfBoundsException.class, () -> stream.readFully(0, new byte[5], 0, 10));
466+
}
467+
}
468+
469+
@Test
470+
void testReadFullyHappyCase() throws IOException {
471+
// Given: seekable stream
472+
try (S3SeekableInputStream stream = getTestStream()) {
473+
// When: reading 5 bytes from position 3
474+
byte[] buf = new byte[5];
475+
stream.readFully(3, buf, 0, 5);
476+
477+
// Then: buffer contains the expected 5 bytes from position 3
478+
byte[] expected = TEST_DATA.substring(3, 8).getBytes(StandardCharsets.UTF_8);
479+
assertArrayEquals(expected, buf);
480+
481+
// Position should remain unchanged after readFully
482+
assertEquals(0, stream.getPos());
483+
}
484+
}
485+
486+
@Test
487+
void testReadFullyDoesNotAlterPosition() throws IOException {
488+
// Given: seekable stream with data "test-data12345678910"
489+
try (S3SeekableInputStream stream = getTestStream()) {
490+
// When:
491+
// 1) Reading first 5 bytes from position 0 (should be "test-")
492+
// 2) Reading 5 bytes from position 10 using readFully (should be "23456")
493+
// 3) Reading next 5 bytes from current position (should be "data1")
494+
byte[] one = new byte[5];
495+
byte[] two = new byte[5];
496+
byte[] three = new byte[5];
497+
498+
int numBytesRead1 = stream.read(one, 0, one.length);
499+
stream.readFully(10, two, 0, two.length);
500+
int numBytesRead3 = stream.read(three, 0, three.length);
501+
502+
// Then: readFully did not alter the position and reads #1 and #3 return subsequent bytes
503+
// First read should return 5 bytes
504+
assertEquals(5, numBytesRead1);
505+
// Third read should also return 5 bytes, continuing from where first read left off
506+
assertEquals(5, numBytesRead3);
507+
508+
// Verify the actual content of each buffer
509+
assertEquals("test-", new String(one, StandardCharsets.UTF_8));
510+
assertEquals("data1", new String(three, StandardCharsets.UTF_8));
511+
assertEquals("23456", new String(two, StandardCharsets.UTF_8));
512+
513+
// Verify the stream position is at 10 (5 + 5) after all reads
514+
assertEquals(10, stream.getPos());
515+
}
516+
}
517+
518+
@Test
519+
public void testReadFullyOnClosedStream() throws IOException {
520+
S3SeekableInputStream seekableInputStream = getTestStream();
521+
seekableInputStream.close();
522+
assertThrows(IOException.class, () -> seekableInputStream.readFully(0, new byte[8], 0, 8));
523+
}
524+
525+
@Test
526+
public void testZeroLengthReadFully() throws IOException {
527+
S3SeekableInputStream seekableInputStream = getTestStream();
528+
assertDoesNotThrow(() -> seekableInputStream.readFully(0, new byte[0], 0, 0));
451529
}
452530

453531
private S3SeekableInputStream getTestStream() {

0 commit comments

Comments
 (0)