Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1164,6 +1164,7 @@ private void checkIfVectoredIOStopped() throws InterruptedIOException {
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
@VisibleForTesting
public S3AInputStreamStatistics getS3AStreamStatistics() {
return streamStatistics;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.CanSetReadahead;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.FSInputStream;
Expand Down Expand Up @@ -56,6 +57,21 @@ public class S3APrefetchingInputStream
*/
private S3ARemoteInputStream inputStream;

/**
* To be only used by synchronized getPos().
*/
private long lastReadCurrentPos = 0;

/**
* To be only used by getIOStatistics().
*/
private IOStatistics ioStatistics = null;

/**
* To be only used by getS3AStreamStatistics().
*/
private S3AInputStreamStatistics inputStreamStatistics = null;

/**
* Initializes a new instance of the {@code S3APrefetchingInputStream} class.
*
Expand Down Expand Up @@ -115,14 +131,20 @@ public synchronized int available() throws IOException {
}

/**
* Gets the current position.
* Gets the current position. If the underlying S3 input stream is closed,
* it returns last read current position from the underlying steam. If the
* current position was never read and the underlying input stream is closed,
* this would return 0.
*
* @return the current position.
* @throws IOException if there is an IO error during this operation.
*/
@Override
public synchronized long getPos() throws IOException {
return isClosed() ? 0 : inputStream.getPos();
if (!isClosed()) {
lastReadCurrentPos = inputStream.getPos();
}
return lastReadCurrentPos;
}

/**
Expand Down Expand Up @@ -215,11 +237,12 @@ public boolean hasCapability(String capability) {
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
@VisibleForTesting
public S3AInputStreamStatistics getS3AStreamStatistics() {
if (isClosed()) {
return null;
if (!isClosed()) {
inputStreamStatistics = inputStream.getS3AStreamStatistics();
}
return inputStream.getS3AStreamStatistics();
return inputStreamStatistics;
}

/**
Expand All @@ -229,10 +252,10 @@ public S3AInputStreamStatistics getS3AStreamStatistics() {
*/
@Override
public IOStatistics getIOStatistics() {
if (isClosed()) {
return null;
if (!isClosed()) {
ioStatistics = inputStream.getIOStatistics();
}
return inputStream.getIOStatistics();
return ioStatistics;
}

protected boolean isClosed() {
Expand All @@ -249,7 +272,6 @@ protected void throwIfClosed() throws IOException {

@Override
public boolean seekToNewSource(long targetPos) throws IOException {
throwIfClosed();
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest;
import org.apache.hadoop.fs.s3a.prefetch.S3APrefetchingInputStream;
import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
import org.apache.hadoop.fs.statistics.IOStatistics;

import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_DEFAULT_SIZE;
Expand Down Expand Up @@ -240,4 +242,56 @@ public void testRandomReadSmallFile() throws Throwable {
}
}

@Test
public void testStatusProbesAfterClosingStream() throws Throwable {
describe("When the underlying input stream is closed, the prefetch input stream"
+ " should still support some status probes");

byte[] data = ContractTestUtils.dataset(SMALL_FILE_SIZE, 'a', 26);
Path smallFile = path("testStatusProbesAfterClosingStream");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can just use path() and one is built up from the method name. this is better as it avoids cut and paste bugs and if a build is parameterized, the parameterized string is used in the path (though tests fail if that string isn't a valid path any more)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah my bad, should have used methodPath(). You meant methodPath() only, correct?

ContractTestUtils.writeDataset(getFileSystem(), smallFile, data, data.length, 16, true);

FSDataInputStream in = getFileSystem().open(smallFile);

byte[] buffer = new byte[SMALL_FILE_SIZE];
in.read(buffer, 0, S_1K * 4);
in.seek(S_1K * 12);
in.read(buffer, 0, S_1K * 4);

long pos = in.getPos();
IOStatistics ioStats = in.getIOStatistics();
S3AInputStreamStatistics inputStreamStatistics =
((S3APrefetchingInputStream) (in.getWrappedStream())).getS3AStreamStatistics();

assertNotNull("Prefetching input IO stats should not be null", ioStats);
assertNotNull("Prefetching input stream stats should not be null", inputStreamStatistics);
assertNotEquals("Position retrieved from prefetching input stream should be greater than 0", 0,
pos);

in.close();

// status probes after closing the input stream
long newPos = in.getPos();
IOStatistics newIoStats = in.getIOStatistics();
S3AInputStreamStatistics newInputStreamStatistics =
((S3APrefetchingInputStream) (in.getWrappedStream())).getS3AStreamStatistics();

assertNotNull("Prefetching input IO stats should not be null", newIoStats);
assertNotNull("Prefetching input stream stats should not be null", newInputStreamStatistics);
assertNotEquals("Position retrieved from prefetching input stream should be greater than 0", 0,
newPos);

// compare status probes after closing of the stream with status probes done before
// closing the stream
assertEquals("Position retrieved through stream before and after closing should match", pos,
newPos);
assertEquals("IO stats retrieved through stream before and after closing should match", ioStats,
newIoStats);
assertEquals("Stream stats retrieved through stream before and after closing should match",
inputStreamStatistics, newInputStreamStatistics);

assertFalse("Not supported with prefetch", in.seekToNewSource(10));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can you have the error message include "seekToNewSource()".


}

}