7777import static org .apache .hadoop .fs .VectoredReadUtils .mergeSortedRanges ;
7878import static org .apache .hadoop .fs .VectoredReadUtils .validateAndSortRanges ;
7979import static org .apache .hadoop .fs .s3a .Invoker .onceTrackingDuration ;
80+ import static org .apache .hadoop .fs .s3a .impl .ErrorTranslation .shouldInputStreamBeAborted ;
81+ import static org .apache .hadoop .fs .s3a .impl .SDKStreamDrainer .abortSdkStream ;
8082import static org .apache .hadoop .util .StringUtils .toLowerCase ;
8183import static org .apache .hadoop .util .functional .FutureIO .awaitFuture ;
8284
@@ -571,11 +573,8 @@ public synchronized int read() throws IOException {
571573 }
572574 try {
573575 b = wrappedStream .read ();
574- } catch (HttpChannelEOFException | SocketTimeoutException e ) {
575- onReadFailure (e , true );
576- throw e ;
577576 } catch (IOException e ) {
578- onReadFailure (e , false );
577+ onReadFailure (e , shouldInputStreamBeAborted ( e ) );
579578 throw e ;
580579 }
581580 return b ;
@@ -718,7 +717,7 @@ public synchronized void close() throws IOException {
718717 if (!closed ) {
719718 closed = true ;
720719 try {
721- stopVectoredIOOperations . set ( true );
720+ stopVectorOperations ( );
722721 // close or abort the stream; blocking
723722 closeStream ("close() operation" , false , true );
724723 // end the client+audit span.
@@ -994,9 +993,7 @@ public void readVectored(final List<? extends FileRange> ranges,
994993 final Consumer <ByteBuffer > release ) throws IOException {
995994 LOG .debug ("Starting vectored read on path {} for ranges {} " , pathStr , ranges );
996995 checkNotClosed ();
997- if (stopVectoredIOOperations .getAndSet (false )) {
998- LOG .debug ("Reinstating vectored read operation for path {} " , pathStr );
999- }
996+ maybeStartVectorOperations ();
1000997 // fail fast on parameters which would otherwise only be checked
1001998 // in threads and/or in failures.
1002999 requireNonNull (allocate , "Null allocator" );
@@ -1021,8 +1018,7 @@ public void readVectored(final List<? extends FileRange> ranges,
10211018 streamStatistics .readVectoredOperationStarted (sortedRanges .size (), sortedRanges .size ());
10221019 for (FileRange range : sortedRanges ) {
10231020 // submit the read operation to the threadpool
1024- // TODO: track wait time.
1025- boundedThreadPool .submit (() -> readSingleRange (range , bufferPool ));
1021+ boundedThreadPool .submit (() -> readSingleRangeWithRetries (range , bufferPool ));
10261022 }
10271023 } else {
10281024 LOG .debug ("Trying to merge the ranges as they are not disjoint" );
@@ -1041,6 +1037,24 @@ public void readVectored(final List<? extends FileRange> ranges,
10411037 " on path {} for ranges {} " , pathStr , ranges );
10421038 }
10431039
1040+ /**
1041+ * Start/restart vector operations if not active.
1042+ * In particular, after an unbuffer(), this performs any
1043+ * initialization required.
1044+ */
1045+ private void maybeStartVectorOperations () {
1046+ if (stopVectoredIOOperations .getAndSet (false )) {
1047+ LOG .debug ("Reinstating vectored read operation for path {} " , pathStr );
1048+ }
1049+ }
1050+
1051+ /**
1052+ * Stop vector operations.
1053+ */
1054+ private void stopVectorOperations () {
1055+ stopVectoredIOOperations .set (true );
1056+ }
1057+
10441058 /**
10451059 * Read the data from S3 for the bigger combined file range and update all the
10461060 * underlying ranges.
@@ -1056,11 +1070,13 @@ private void readCombinedRangeAndUpdateChildren(CombinedFileRange combinedFileRa
10561070 LOG .debug ("Start reading {} from path {} " , combinedFileRange , pathStr );
10571071 ResponseInputStream <GetObjectResponse > rangeContent = null ;
10581072 try {
1059- // issue the GET request; this retries internally.
1073+ // issue the GET request; this retries GET but not reads internally.
10601074 rangeContent = getS3Object ("readCombinedFileRange" ,
10611075 combinedFileRange .getOffset (),
10621076 combinedFileRange .getLength (),
10631077 true );
1078+ // GET has succeeded, make sure request is good to continue
1079+ checkIfVectoredIOStopped ();
10641080 } catch (IOException ex ) {
10651081 // any exception here means that repeated GET requests have failed;
10661082 // consider the request unrecoverable.
@@ -1070,6 +1086,7 @@ private void readCombinedRangeAndUpdateChildren(CombinedFileRange combinedFileRa
10701086 .forEach (f -> f .completeExceptionally (ex ));
10711087 return ;
10721088 }
1089+
10731090 // at this point there is a stream to read from, which
10741091 // MUST be closed in the finally block.
10751092 try {
@@ -1086,12 +1103,23 @@ private void readCombinedRangeAndUpdateChildren(CombinedFileRange combinedFileRa
10861103 .filter (f -> !f .getData ().isDone ())
10871104 .collect (Collectors .toList ());
10881105
1089- // Attempt to recover from the failure by reading each range individually.
1090- // Suboptimal, but pragmatic.
1106+ LOG .debug ("There are {} remaining vector ranges to retrieve" , unreadRanges .size ());
1107+
1108+ // Attempt to recover from the failure by reading each range individually
1109+ // within the current thread.
1110+ // If a single read is unrecoverable, all subsequent range reads are failed
1111+ // with the same exception.
1112+ // this is to process unrecoverable failures faster.
1113+ IOException lastIOE = null ;
10911114 for (FileRange child : unreadRanges ) {
1092- readSingleRange (child , bufferPool );
1115+ if (lastIOE == null ) {
1116+ // all good so far: request the next range
1117+ lastIOE = readSingleRangeWithRetries (child , bufferPool );
1118+ } else {
1119+ // a predecessor failed, do not attempt to recover.
1120+ child .getData ().completeExceptionally (lastIOE );
1121+ }
10931122 }
1094-
10951123 } finally {
10961124 IOUtils .cleanupWithLogger (LOG , rangeContent );
10971125 }
@@ -1112,7 +1140,7 @@ private void readCombinedRangeAndUpdateChildren(CombinedFileRange combinedFileRa
11121140 */
11131141 @ Retries .OnceTranslated
11141142 private void populateChildBuffers (CombinedFileRange combinedFileRange ,
1115- InputStream objectContent ,
1143+ ResponseInputStream < GetObjectResponse > objectContent ,
11161144 ByteBufferPool bufferPool ) throws IOException {
11171145 // If the combined file range just contains a single child
11181146 // range, we only have to fill that one child buffer else
@@ -1142,6 +1170,7 @@ private void populateChildBuffers(CombinedFileRange combinedFileRange,
11421170 // work out how much
11431171 long drainQuantity = child .getOffset () - position ;
11441172 // and drain it.
1173+ // this will raise EOFException if a -1 was returned.
11451174 drainUnnecessaryData (objectContent , position , drainQuantity );
11461175 }
11471176 }
@@ -1151,6 +1180,7 @@ private void populateChildBuffers(CombinedFileRange combinedFileRange,
11511180 } catch (IOException e ) {
11521181 // release the buffer
11531182 bufferPool .putBuffer (buffer );
1183+ // rethrow
11541184 throw e ;
11551185 }
11561186 child .getData ().complete (buffer );
@@ -1170,7 +1200,7 @@ private void populateChildBuffers(CombinedFileRange combinedFileRange,
11701200 */
11711201 @ Retries .OnceTranslated
11721202 private void drainUnnecessaryData (
1173- final InputStream objectContent ,
1203+ final ResponseInputStream < GetObjectResponse > objectContent ,
11741204 final long position ,
11751205 long drainQuantity ) throws IOException {
11761206
@@ -1194,11 +1224,18 @@ private void drainUnnecessaryData(
11941224 "End of stream reached draining data between ranges; expected %,d bytes;"
11951225 + " only drained %,d bytes before -1 returned (position=%,d)" ,
11961226 drainQuantity , drainBytes , position + drainBytes );
1227+ LOG .debug (s );
11971228 throw new EOFException (s );
11981229 }
11991230 drainBytes += readCount ;
12001231 remaining -= readCount ;
12011232 }
1233+ } catch (IOException ex ) {
1234+ if (shouldInputStreamBeAborted (ex )) {
1235+ // abort the stream if the exception indicates this is needed.
1236+ abortSdkStream (uri , objectContent , streamStatistics , "drain failure" );
1237+ }
1238+ throw ex ;
12021239 } finally {
12031240 streamStatistics .readVectoredBytesDiscarded (drainBytes );
12041241 LOG .debug ("{} bytes drained from stream " , drainBytes );
@@ -1207,11 +1244,49 @@ private void drainUnnecessaryData(
12071244
12081245 /**
12091246 * Read data from S3 for this range and populate a buffer.
1247+ * The GET request and single range reads are retried.
1248+ * Any IOException which is propagated by the retry logic is
1249+ * attached to the range as an exceptional failure.
12101250 * @param range range of data to read.
12111251 * @param bufferPool buffer allocator.
1252+ * @return any IOE which resulted in the read being unsuccessful; null on success.
12121253 */
1213- @ Retries .RetryTranslated ("GET is retried; reads are not" )
1214- private void readSingleRange (FileRange range , ByteBufferPool bufferPool ) {
1254+ @ Retries .RetryTranslated
1255+ private IOException readSingleRangeWithRetries (
1256+ FileRange range ,
1257+ ByteBufferPool bufferPool ) {
1258+ try {
1259+ context .getReadInvoker ().retry ("vector read" , uri , true , () ->
1260+ readSingleRange (range , bufferPool ));
1261+ return null ;
1262+ } catch (IOException ex ) {
1263+ // the retry mechanism has stopped retrying, so mark the request as a failure.
1264+ range .getData ().completeExceptionally (ex );
1265+ return ex ;
1266+ }
1267+ }
1268+
1269+ /**
1270+ * Read data from S3 for this range and populate a buffer.
1271+ * If the full read was successful, the range's future is declared
1272+ * complete.
1273+ * <p>
1274+ * If an exception is raised,
1275+ * <ol>
1276+ * <li>The buffer is returned to the pool.</li>
1277+ * <li>The HTTP connection will be aborted if deemed to have failed.</li>
1278+ * <li>The relevant statistics will be updated.</li>
1279+ * <li>The exception is rethrown.</li>
1280+ * </ol>
1281+ * This is to allow the operation to be invoked in a retry() operation.
1282+ * @param range range of data to read.
1283+ * @param bufferPool buffer allocator.
1284+ * @throws IOException failure to GET or read the data.
1285+ */
1286+ @ Retries .OnceTranslated
1287+ private void readSingleRange (FileRange range ,
1288+ ByteBufferPool bufferPool ) throws IOException {
1289+
12151290 LOG .debug ("Start reading {} from {} " , range , pathStr );
12161291 if (range .getLength () == 0 ) {
12171292 ByteBuffer buffer = bufferPool .getBuffer (false , range .getLength ());
@@ -1220,26 +1295,40 @@ private void readSingleRange(FileRange range, ByteBufferPool bufferPool) {
12201295 range .getData ().complete (buffer );
12211296 return ;
12221297 }
1298+ // buffer which will be fetched from the buffer pool and populated,
1299+ // on successful reads this will be returned in the response.
1300+ // on failures it must be returned to the pool.
12231301 ByteBuffer buffer = null ;
1302+ // the contents of the ranged object request.
12241303 ResponseInputStream <GetObjectResponse > objectRange = null ;
12251304 try {
12261305 long position = range .getOffset ();
12271306 int length = range .getLength ();
12281307 // a GET request, which has risk of failing if the file is gone, changed etc.
1229- objectRange = getS3Object ("readSingleRange" , position , length , true );
1308+ objectRange = getS3Object ("readSingleRange" , position , length , false );
1309+
1310+ // GET has succeeded, make sure request is good to continue
1311+ checkIfVectoredIOStopped ();
1312+
12301313 buffer = bufferPool .getBuffer (false , range .getLength ());
12311314
12321315 // read in the data and declare this range successfully read.
12331316 populateBuffer (range , buffer , objectRange );
12341317 range .getData ().complete (buffer );
12351318 LOG .debug ("Finished reading range {} from path {}" , range , pathStr );
12361319 } catch (IOException ex ) {
1320+ // any failure.
1321+ // log the error, return the buffer to the pool, and report a failure.
12371322 LOG .debug ("Exception while reading a range {} from path {}" , range , pathStr , ex );
12381323 if (buffer != null ) {
12391324 // return any buffer to the pool
12401325 bufferPool .putBuffer (buffer );
12411326 }
1242- range .getData ().completeExceptionally (ex );
1327+ if (shouldInputStreamBeAborted (ex )) {
1328+ // abort the stream if the exception indicates this is needed.
1329+ abortSdkStream (uri , objectRange , streamStatistics , "read failure" );
1330+ }
1331+ throw ex ;
12431332 } finally {
12441333 IOUtils .cleanupWithLogger (LOG , objectRange );
12451334 }
@@ -1252,7 +1341,7 @@ private void readSingleRange(FileRange range, ByteBufferPool bufferPool) {
12521341 * @param range vector range to populate.
12531342 * @param buffer buffer to fill.
12541343 * @param objectContent result retrieved from S3 store.
1255- * @throws IOException any IOE.
1344+ * @throws IOException any IOE raised reading the input stream .
12561345 * @throws EOFException if a read() call returns -1
12571346 * @throws InterruptedIOException if vectored IO operation is stopped.
12581347 */
@@ -1306,12 +1395,12 @@ private void readByteArray(InputStream objectContent,
13061395 length - readBytes );
13071396 LOG .debug ("read {} bytes from stream" , readBytesCurr );
13081397 if (readBytesCurr < 0 ) {
1309- // TODO: abort the stream.
1310- throw new EOFException (
1311- String . format ( "HTTP stream closed before all bytes were read."
1312- + " Expected %,d bytes but only read %,d bytes. Current position %,d"
1313- + " (%s)" ,
1314- length , readBytes , position , range ) );
1398+ final String message = String . format ( "HTTP stream closed before all bytes were read."
1399+ + " Expected %,d bytes but only read %,d bytes. Current position %,d"
1400+ + " (%s)" ,
1401+ length , readBytes , position , range );
1402+ LOG . warn ( message );
1403+ throw new EOFException ( message );
13151404 }
13161405 readBytes += readBytesCurr ;
13171406 position += readBytesCurr ;
@@ -1322,13 +1411,15 @@ private void readByteArray(InputStream objectContent,
13221411 }
13231412
13241413 /**
1325- * Read data from S3 with retries for the GET request
1326- * This also handles if file has been changed while the
1414+ * Read data from S3 with retries for the GET request, as part of a vector IO
1415+ * operation.
1416+ * <p>
1417+ * This also handles the file being changed while the
13271418 * http call is getting executed. If the file has been
13281419 * changed RemoteFileChangedException is thrown.
13291420 * <p>
1330- * Also checks if the vectored io operation has been stopped before and after
1331- * the http get request such that we don't waste time populating the buffers.
1421+ * It checks if the vectored io operation has been stopped before
1422+ * the http GET request such that we don't waste time populating the buffers.
13321423 * @param operationName name of the operation for which get object on S3 is called.
13331424 * @param position position of the object to be read from S3.
13341425 * @param length length from position of the object to be read from S3.
@@ -1373,20 +1464,19 @@ private ResponseInputStream<GetObjectResponse> getS3Object(String operationName,
13731464 }
13741465 changeTracker .processResponse (objectRange .response (), operationName ,
13751466 position );
1376- checkIfVectoredIOStopped ();
13771467 return objectRange ;
13781468 }
13791469
13801470 /**
1381- * Check if vectored io operation has been stooped . This happens
1382- * when the stream is closed or unbuffer is called.
1471+ * Check if vectored io operation has been stopped . This happens
1472+ * when the stream is closed or unbuffer() was called during the read .
13831473 * @throws InterruptedIOException throw InterruptedIOException such
13841474 * that all running vectored io is
13851475 * terminated thus releasing resources.
13861476 */
13871477 private void checkIfVectoredIOStopped () throws InterruptedIOException {
13881478 if (stopVectoredIOOperations .get ()) {
1389- throw new InterruptedIOException ("Stream closed or unbuffer is called" );
1479+ throw new InterruptedIOException ("Stream closed or unbuffer() was called during the read " );
13901480 }
13911481 }
13921482
@@ -1484,7 +1574,7 @@ public static long validateReadahead(@Nullable Long readahead) {
14841574 @ Override
14851575 public synchronized void unbuffer () {
14861576 try {
1487- stopVectoredIOOperations . set ( true );
1577+ stopVectorOperations ( );
14881578 closeStream ("unbuffer()" , false , false );
14891579 } finally {
14901580 streamStatistics .unbuffered ();
0 commit comments