Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -1606,8 +1606,11 @@ public <T> CompletableFuture<T> submit(final CallableRaisingIOE<T> operation) {
CompletableFuture<T> result = new CompletableFuture<>();
unboundedThreadPool.submit(() ->
LambdaUtils.eval(result, () -> {
LOG.debug("Starting submitted operation in {}", auditSpan.getSpanId());
try (AuditSpan span = auditSpan.activate()) {
return operation.apply();
} finally {
LOG.debug("Completed submitted operation in {}", auditSpan.getSpanId());
}
}));
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@
import org.apache.hadoop.fs.impl.CombinedFileRange;
import org.apache.hadoop.fs.VectoredReadUtils;
import org.apache.hadoop.fs.s3a.impl.ChangeTracker;
import org.apache.hadoop.fs.s3a.impl.InternalConstants;
import org.apache.hadoop.fs.s3a.impl.SDKStreamDrainer;
import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
import org.apache.hadoop.fs.statistics.DurationTracker;
import org.apache.hadoop.fs.statistics.IOStatistics;
Expand All @@ -65,7 +67,6 @@
import static org.apache.hadoop.fs.VectoredReadUtils.mergeSortedRanges;
import static org.apache.hadoop.fs.VectoredReadUtils.validateNonOverlappingAndReturnSortedRanges;
import static org.apache.hadoop.fs.s3a.Invoker.onceTrackingDuration;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.invokeTrackingDuration;
import static org.apache.hadoop.util.StringUtils.toLowerCase;
import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;

Expand Down Expand Up @@ -97,10 +98,6 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
public static final String OPERATION_OPEN = "open";
public static final String OPERATION_REOPEN = "re-open";

/**
* size of a buffer to create when draining the stream.
*/
private static final int DRAIN_BUFFER_SIZE = 16384;
/**
* This is the maximum temporary buffer size we use while
* populating the data in direct byte buffers during a vectored IO
Expand Down Expand Up @@ -242,6 +239,15 @@ private void setInputPolicy(S3AInputPolicy inputPolicy) {
streamStatistics.inputPolicySet(inputPolicy.ordinal());
}

/**
* Get the current input policy.
* @return input policy.
*/
@VisibleForTesting
public S3AInputPolicy getInputPolicy() {
return inputPolicy;
}

/**
* Opens up the stream at specified target position and for given length.
*
Expand Down Expand Up @@ -604,7 +610,7 @@ public synchronized void close() throws IOException {
try {
stopVectoredIOOperations.set(true);
// close or abort the stream; blocking
awaitFuture(closeStream("close() operation", false, true));
closeStream("close() operation", false, true);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because this is blocking here's no need for that await future, but i think i will reinstate it for safety

// end the client+audit span.
client.close();
// this is actually a no-op
Expand Down Expand Up @@ -664,18 +670,25 @@ private CompletableFuture<Boolean> closeStream(
forceAbort ? "abort" : "soft");
boolean shouldAbort = forceAbort || remaining > readahead;
CompletableFuture<Boolean> operation;
SDKStreamDrainer drainer = new SDKStreamDrainer(
uri,
object,
wrappedStream,
shouldAbort,
(int) remaining,
streamStatistics,
reason);

if (blocking || shouldAbort || remaining <= asyncDrainThreshold) {
// don't bother with async io.
operation = CompletableFuture.completedFuture(
drain(shouldAbort, reason, remaining, object, wrappedStream));
// don't bother with async IO if the caller plans to wait for
// the result, there's an abort (which is fast), or
// there is not much data to read.
operation = CompletableFuture.completedFuture(drainer.apply());

} else {
LOG.debug("initiating asynchronous drain of {} bytes", remaining);
// schedule an async drain/abort with references to the fields so they
// can be reused
operation = client.submit(
() -> drain(false, reason, remaining, object, wrappedStream));
// schedule an async drain/abort
operation = client.submit(drainer);
}

// either the stream is closed in the blocking call or the async call is
Expand All @@ -685,117 +698,6 @@ private CompletableFuture<Boolean> closeStream(
return operation;
}

/**
* drain the stream. This method is intended to be
* used directly or asynchronously, and measures the
* duration of the operation in the stream statistics.
* @param shouldAbort force an abort; used if explicitly requested.
* @param reason reason for stream being closed; used in messages
* @param remaining remaining bytes
* @param requestObject http request object; needed to avoid GC issues.
* @param inner stream to close.
* @return was the stream aborted?
*/
private boolean drain(
final boolean shouldAbort,
final String reason,
final long remaining,
final S3Object requestObject,
final S3ObjectInputStream inner) {

try {
return invokeTrackingDuration(
streamStatistics.initiateInnerStreamClose(shouldAbort),
() -> drainOrAbortHttpStream(
shouldAbort,
reason,
remaining,
requestObject,
inner));
} catch (IOException e) {
// this is only here because invokeTrackingDuration() has it in its
// signature
return shouldAbort;
}
}

/**
* Drain or abort the inner stream.
* Exceptions are swallowed.
* If a close() is attempted and fails, the operation escalates to
* an abort.
*
* This does not set the {@link #closed} flag.
*
* A reference to the stream is passed in so that the instance
* {@link #wrappedStream} field can be reused as soon as this
* method is submitted;
* @param shouldAbort force an abort; used if explicitly requested.
* @param reason reason for stream being closed; used in messages
* @param remaining remaining bytes
* @param requestObject http request object; needed to avoid GC issues.
* @param inner stream to close.
* @return was the stream aborted?
*/
private boolean drainOrAbortHttpStream(
boolean shouldAbort,
final String reason,
final long remaining,
final S3Object requestObject,
final S3ObjectInputStream inner) {
// force a use of the request object so IDEs don't warn of
// lack of use.
requireNonNull(requestObject);

if (!shouldAbort) {
try {
// clean close. This will read to the end of the stream,
// so, while cleaner, can be pathological on a multi-GB object

// explicitly drain the stream
long drained = 0;
byte[] buffer = new byte[DRAIN_BUFFER_SIZE];
while (true) {
final int count = inner.read(buffer);
if (count < 0) {
// no more data is left
break;
}
drained += count;
}
LOG.debug("Drained stream of {} bytes", drained);

// now close it
inner.close();
// this MUST come after the close, so that if the IO operations fail
// and an abort is triggered, the initial attempt's statistics
// aren't collected.
streamStatistics.streamClose(false, drained);
} catch (Exception e) {
// exception escalates to an abort
LOG.debug("When closing {} stream for {}, will abort the stream",
uri, reason, e);
shouldAbort = true;
}
}
if (shouldAbort) {
// Abort, rather than just close, the underlying stream. Otherwise, the
// remaining object payload is read from S3 while closing the stream.
LOG.debug("Aborting stream {}", uri);
try {
inner.abort();
} catch (Exception e) {
LOG.warn("When aborting {} stream after failing to close it for {}",
uri, reason, e);
}
streamStatistics.streamClose(true, remaining);
}
LOG.debug("Stream {} {}: {}; remaining={}",
uri, (shouldAbort ? "aborted" : "closed"), reason,
remaining);
return shouldAbort;
}

/**
* Forcibly reset the stream, by aborting the connection. The next
* {@code read()} operation will trigger the opening of a new HTTPS
Expand Down Expand Up @@ -1080,8 +982,8 @@ private void drainUnnecessaryData(S3ObjectInputStream objectContent, long drainQ
int drainBytes = 0;
int readCount;
while (drainBytes < drainQuantity) {
if (drainBytes + DRAIN_BUFFER_SIZE <= drainQuantity) {
byte[] drainBuffer = new byte[DRAIN_BUFFER_SIZE];
if (drainBytes + InternalConstants.DRAIN_BUFFER_SIZE <= drainQuantity) {
byte[] drainBuffer = new byte[InternalConstants.DRAIN_BUFFER_SIZE];
readCount = objectContent.read(drainBuffer);
} else {
byte[] drainBuffer = new byte[(int) (drainQuantity - drainBytes)];
Expand Down Expand Up @@ -1345,6 +1247,11 @@ public synchronized void unbuffer() {
closeStream("unbuffer()", false, false);
} finally {
streamStatistics.unbuffered();
if (inputPolicy.isAdaptive()) {
S3AInputPolicy policy = S3AInputPolicy.Random;
LOG.debug("Switching to seek policy {} after unbuffer() invoked", policy);
setInputPolicy(policy);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,7 @@ private static InterruptedIOException translateInterruptedException(
} else {
String name = innerCause.getClass().getName();
if (name.endsWith(".ConnectTimeoutException")
|| name.endsWith(".ConnectionPoolTimeoutException")
|| name.endsWith("$ConnectTimeoutException")) {
// TCP connection http timeout from the shaded or unshaded filenames
// com.amazonaws.thirdparty.apache.http.conn.ConnectTimeoutException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,10 @@ public enum Statistic {
StreamStatisticNames.STREAM_READ_TOTAL_BYTES,
"Total count of bytes read from an input stream",
TYPE_COUNTER),
STREAM_READ_UNBUFFERED(
StreamStatisticNames.STREAM_READ_UNBUFFERED,
"Total count of input stream unbuffering operations",
TYPE_COUNTER),
STREAM_READ_BLOCKS_IN_FILE_CACHE(
StreamStatisticNames.STREAM_READ_BLOCKS_IN_FILE_CACHE,
"Gauge of blocks in disk cache",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ public final class InternalConstants {
*/
public static final boolean DELETE_CONSIDERED_IDEMPOTENT = true;

/**
* size of a buffer to create when draining the stream.
*/
public static final int DRAIN_BUFFER_SIZE = 16384;

private InternalConstants() {
}

Expand Down Expand Up @@ -97,6 +102,7 @@ private InternalConstants() {

static {
Set<String> keys = Stream.of(
Constants.ASYNC_DRAIN_THRESHOLD,
Constants.INPUT_FADVISE,
Constants.READAHEAD_RANGE)
.collect(Collectors.toSet());
Expand Down
Loading