5151import org .apache .hadoop .fs .impl .CombinedFileRange ;
5252import org .apache .hadoop .fs .VectoredReadUtils ;
5353import org .apache .hadoop .fs .s3a .impl .ChangeTracker ;
54+ import org .apache .hadoop .fs .s3a .impl .InternalConstants ;
55+ import org .apache .hadoop .fs .s3a .impl .SDKStreamDrainer ;
5456import org .apache .hadoop .fs .s3a .statistics .S3AInputStreamStatistics ;
5557import org .apache .hadoop .fs .statistics .DurationTracker ;
5658import org .apache .hadoop .fs .statistics .IOStatistics ;
6567import static org .apache .hadoop .fs .VectoredReadUtils .mergeSortedRanges ;
6668import static org .apache .hadoop .fs .VectoredReadUtils .validateNonOverlappingAndReturnSortedRanges ;
6769import static org .apache .hadoop .fs .s3a .Invoker .onceTrackingDuration ;
68- import static org .apache .hadoop .fs .statistics .impl .IOStatisticsBinding .invokeTrackingDuration ;
6970import static org .apache .hadoop .util .StringUtils .toLowerCase ;
7071import static org .apache .hadoop .util .functional .FutureIO .awaitFuture ;
7172
@@ -97,10 +98,6 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
9798 public static final String OPERATION_OPEN = "open" ;
9899 public static final String OPERATION_REOPEN = "re-open" ;
99100
100- /**
101- * size of a buffer to create when draining the stream.
102- */
103- private static final int DRAIN_BUFFER_SIZE = 16384 ;
104101 /**
105102 * This is the maximum temporary buffer size we use while
106103 * populating the data in direct byte buffers during a vectored IO
@@ -242,6 +239,15 @@ private void setInputPolicy(S3AInputPolicy inputPolicy) {
242239 streamStatistics .inputPolicySet (inputPolicy .ordinal ());
243240 }
244241
242+ /**
243+ * Get the current input policy.
244+ * @return input policy.
245+ */
246+ @ VisibleForTesting
247+ public S3AInputPolicy getInputPolicy () {
248+ return inputPolicy ;
249+ }
250+
245251 /**
246252 * Opens up the stream at specified target position and for given length.
247253 *
@@ -604,7 +610,7 @@ public synchronized void close() throws IOException {
604610 try {
605611 stopVectoredIOOperations .set (true );
606612 // close or abort the stream; blocking
607- awaitFuture ( closeStream ("close() operation" , false , true ) );
613+ closeStream ("close() operation" , false , true );
608614 // end the client+audit span.
609615 client .close ();
610616 // this is actually a no-op
@@ -664,18 +670,25 @@ private CompletableFuture<Boolean> closeStream(
664670 forceAbort ? "abort" : "soft" );
665671 boolean shouldAbort = forceAbort || remaining > readahead ;
666672 CompletableFuture <Boolean > operation ;
673+ SDKStreamDrainer drainer = new SDKStreamDrainer (
674+ uri ,
675+ object ,
676+ wrappedStream ,
677+ shouldAbort ,
678+ (int ) remaining ,
679+ streamStatistics ,
680+ reason );
667681
668682 if (blocking || shouldAbort || remaining <= asyncDrainThreshold ) {
669- // don't bother with async io.
670- operation = CompletableFuture .completedFuture (
671- drain (shouldAbort , reason , remaining , object , wrappedStream ));
683+ // don't bother with async IO if the caller plans to wait for
684+ // the result, there's an abort (which is fast), or
685+ // there is not much data to read.
686+ operation = CompletableFuture .completedFuture (drainer .apply ());
672687
673688 } else {
674689 LOG .debug ("initiating asynchronous drain of {} bytes" , remaining );
675- // schedule an async drain/abort with references to the fields so they
676- // can be reused
677- operation = client .submit (
678- () -> drain (false , reason , remaining , object , wrappedStream ));
690+ // schedule an async drain/abort
691+ operation = client .submit (drainer );
679692 }
680693
681694 // either the stream is closed in the blocking call or the async call is
@@ -685,117 +698,6 @@ private CompletableFuture<Boolean> closeStream(
685698 return operation ;
686699 }
687700
688- /**
689- * drain the stream. This method is intended to be
690- * used directly or asynchronously, and measures the
691- * duration of the operation in the stream statistics.
692- * @param shouldAbort force an abort; used if explicitly requested.
693- * @param reason reason for stream being closed; used in messages
694- * @param remaining remaining bytes
695- * @param requestObject http request object; needed to avoid GC issues.
696- * @param inner stream to close.
697- * @return was the stream aborted?
698- */
699- private boolean drain (
700- final boolean shouldAbort ,
701- final String reason ,
702- final long remaining ,
703- final S3Object requestObject ,
704- final S3ObjectInputStream inner ) {
705-
706- try {
707- return invokeTrackingDuration (
708- streamStatistics .initiateInnerStreamClose (shouldAbort ),
709- () -> drainOrAbortHttpStream (
710- shouldAbort ,
711- reason ,
712- remaining ,
713- requestObject ,
714- inner ));
715- } catch (IOException e ) {
716- // this is only here because invokeTrackingDuration() has it in its
717- // signature
718- return shouldAbort ;
719- }
720- }
721-
722- /**
723- * Drain or abort the inner stream.
724- * Exceptions are swallowed.
725- * If a close() is attempted and fails, the operation escalates to
726- * an abort.
727- *
728- * This does not set the {@link #closed} flag.
729- *
730- * A reference to the stream is passed in so that the instance
731- * {@link #wrappedStream} field can be reused as soon as this
732- * method is submitted;
733- * @param shouldAbort force an abort; used if explicitly requested.
734- * @param reason reason for stream being closed; used in messages
735- * @param remaining remaining bytes
736- * @param requestObject http request object; needed to avoid GC issues.
737- * @param inner stream to close.
738- * @return was the stream aborted?
739- */
740- private boolean drainOrAbortHttpStream (
741- boolean shouldAbort ,
742- final String reason ,
743- final long remaining ,
744- final S3Object requestObject ,
745- final S3ObjectInputStream inner ) {
746- // force a use of the request object so IDEs don't warn of
747- // lack of use.
748- requireNonNull (requestObject );
749-
750- if (!shouldAbort ) {
751- try {
752- // clean close. This will read to the end of the stream,
753- // so, while cleaner, can be pathological on a multi-GB object
754-
755- // explicitly drain the stream
756- long drained = 0 ;
757- byte [] buffer = new byte [DRAIN_BUFFER_SIZE ];
758- while (true ) {
759- final int count = inner .read (buffer );
760- if (count < 0 ) {
761- // no more data is left
762- break ;
763- }
764- drained += count ;
765- }
766- LOG .debug ("Drained stream of {} bytes" , drained );
767-
768- // now close it
769- inner .close ();
770- // this MUST come after the close, so that if the IO operations fail
771- // and an abort is triggered, the initial attempt's statistics
772- // aren't collected.
773- streamStatistics .streamClose (false , drained );
774- } catch (Exception e ) {
775- // exception escalates to an abort
776- LOG .debug ("When closing {} stream for {}, will abort the stream" ,
777- uri , reason , e );
778- shouldAbort = true ;
779- }
780- }
781- if (shouldAbort ) {
782- // Abort, rather than just close, the underlying stream. Otherwise, the
783- // remaining object payload is read from S3 while closing the stream.
784- LOG .debug ("Aborting stream {}" , uri );
785- try {
786- inner .abort ();
787- } catch (Exception e ) {
788- LOG .warn ("When aborting {} stream after failing to close it for {}" ,
789- uri , reason , e );
790- }
791- streamStatistics .streamClose (true , remaining );
792- }
793- LOG .debug ("Stream {} {}: {}; remaining={}" ,
794- uri , (shouldAbort ? "aborted" : "closed" ), reason ,
795- remaining );
796- return shouldAbort ;
797- }
798-
799701 /**
800702 * Forcibly reset the stream, by aborting the connection. The next
801703 * {@code read()} operation will trigger the opening of a new HTTPS
@@ -1080,8 +982,8 @@ private void drainUnnecessaryData(S3ObjectInputStream objectContent, long drainQ
1080982 int drainBytes = 0 ;
1081983 int readCount ;
1082984 while (drainBytes < drainQuantity ) {
1083- if (drainBytes + DRAIN_BUFFER_SIZE <= drainQuantity ) {
1084- byte [] drainBuffer = new byte [DRAIN_BUFFER_SIZE ];
985+ if (drainBytes + InternalConstants . DRAIN_BUFFER_SIZE <= drainQuantity ) {
986+ byte [] drainBuffer = new byte [InternalConstants . DRAIN_BUFFER_SIZE ];
1085987 readCount = objectContent .read (drainBuffer );
1086988 } else {
1087989 byte [] drainBuffer = new byte [(int ) (drainQuantity - drainBytes )];
@@ -1345,6 +1247,11 @@ public synchronized void unbuffer() {
13451247 closeStream ("unbuffer()" , false , false );
13461248 } finally {
13471249 streamStatistics .unbuffered ();
1250+ if (inputPolicy .isAdaptive ()) {
1251+ S3AInputPolicy policy = S3AInputPolicy .Random ;
1252+ LOG .debug ("Switching to seek policy {} after unbuffer() invoked" , policy );
1253+ setInputPolicy (policy );
1254+ }
13481255 }
13491256 }
13501257
0 commit comments