Skip to content

Commit caf5893

Browse files
committed
HADOOP-18410. S3AInputStream.unbuffer() does not release http connections - prefetch changes (#4766)
Changes in HADOOP-18410 which are needed for the S3A prefetching stream; needed as part of the HADOOP-18703 backport Change-Id: Ib403ca793e29a4416e5d892f9081de5832da3b68
1 parent 2c6f01c commit caf5893

2 files changed

Lines changed: 17 additions & 93 deletions

File tree

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ARemoteObject.java

Lines changed: 15 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727

2828
import com.amazonaws.services.s3.model.GetObjectRequest;
2929
import com.amazonaws.services.s3.model.S3Object;
30+
import com.amazonaws.services.s3.model.S3ObjectInputStream;
3031
import org.slf4j.Logger;
3132
import org.slf4j.LoggerFactory;
3233

@@ -36,12 +37,10 @@
3637
import org.apache.hadoop.fs.s3a.S3AReadOpContext;
3738
import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
3839
import org.apache.hadoop.fs.s3a.impl.ChangeTracker;
40+
import org.apache.hadoop.fs.s3a.impl.SDKStreamDrainer;
3941
import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
4042
import org.apache.hadoop.fs.statistics.DurationTracker;
4143

42-
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.invokeTrackingDuration;
43-
import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
44-
4544
/**
4645
* Encapsulates low level interactions with S3 object on AWS.
4746
*/
@@ -79,7 +78,7 @@ public class S3ARemoteObject {
7978
* Maps a stream returned by openForRead() to the associated S3 object.
8079
* That allows us to close the object when closing the stream.
8180
*/
82-
private Map<InputStream, S3Object> s3Objects;
81+
private final Map<InputStream, S3Object> s3Objects;
8382

8483
/**
8584
* uri of the object being read.
@@ -225,104 +224,27 @@ public InputStream openForRead(long offset, int size) throws IOException {
225224
void close(InputStream inputStream, int numRemainingBytes) {
226225
S3Object obj;
227226
synchronized (s3Objects) {
228-
obj = s3Objects.get(inputStream);
227+
obj = s3Objects.remove(inputStream);
229228
if (obj == null) {
230229
throw new IllegalArgumentException("inputStream not found");
231230
}
232-
s3Objects.remove(inputStream);
233231
}
234-
232+
SDKStreamDrainer drainer = new SDKStreamDrainer(
233+
uri,
234+
obj,
235+
(S3ObjectInputStream)inputStream,
236+
false,
237+
numRemainingBytes,
238+
streamStatistics,
239+
"close() operation");
235240
if (numRemainingBytes <= context.getAsyncDrainThreshold()) {
236241
// don't bother with async io.
237-
drain(false, "close() operation", numRemainingBytes, obj, inputStream);
242+
drainer.apply();
238243
} else {
239244
LOG.debug("initiating asynchronous drain of {} bytes", numRemainingBytes);
240-
// schedule an async drain/abort with references to the fields so they
241-
// can be reused
242-
client.submit(
243-
() -> drain(false, "close() operation", numRemainingBytes, obj,
244-
inputStream));
245+
// schedule an async drain/abort
246+
client.submit(drainer);
245247
}
246248
}
247249

248-
/**
249-
* drain the stream. This method is intended to be
250-
* used directly or asynchronously, and measures the
251-
* duration of the operation in the stream statistics.
252-
*
253-
* @param shouldAbort force an abort; used if explicitly requested.
254-
* @param reason reason for stream being closed; used in messages
255-
* @param remaining remaining bytes
256-
* @param requestObject http request object;
257-
* @param inputStream stream to close.
258-
* @return was the stream aborted?
259-
*/
260-
private boolean drain(
261-
final boolean shouldAbort,
262-
final String reason,
263-
final long remaining,
264-
final S3Object requestObject,
265-
final InputStream inputStream) {
266-
267-
try {
268-
return invokeTrackingDuration(
269-
streamStatistics.initiateInnerStreamClose(shouldAbort),
270-
() -> drainOrAbortHttpStream(shouldAbort, reason, remaining,
271-
requestObject, inputStream));
272-
} catch (IOException e) {
273-
// this is only here because invokeTrackingDuration() has it in its
274-
// signature
275-
return shouldAbort;
276-
}
277-
}
278-
279-
/**
280-
* Drain or abort the inner stream.
281-
* Exceptions are swallowed.
282-
* If a close() is attempted and fails, the operation escalates to
283-
* an abort.
284-
*
285-
* @param shouldAbort force an abort; used if explicitly requested.
286-
* @param reason reason for stream being closed; used in messages
287-
* @param remaining remaining bytes
288-
* @param requestObject http request object
289-
* @param inputStream stream to close.
290-
* @return was the stream aborted?
291-
*/
292-
private boolean drainOrAbortHttpStream(
293-
boolean shouldAbort,
294-
final String reason,
295-
final long remaining,
296-
final S3Object requestObject,
297-
final InputStream inputStream) {
298-
299-
if (!shouldAbort && remaining > 0) {
300-
try {
301-
long drained = 0;
302-
byte[] buffer = new byte[DRAIN_BUFFER_SIZE];
303-
while (true) {
304-
final int count = inputStream.read(buffer);
305-
if (count < 0) {
306-
// no more data is left
307-
break;
308-
}
309-
drained += count;
310-
}
311-
LOG.debug("Drained stream of {} bytes", drained);
312-
} catch (Exception e) {
313-
// exception escalates to an abort
314-
LOG.debug("When closing {} stream for {}, will abort the stream", uri,
315-
reason, e);
316-
shouldAbort = true;
317-
}
318-
}
319-
cleanupWithLogger(LOG, inputStream);
320-
cleanupWithLogger(LOG, requestObject);
321-
streamStatistics.streamClose(shouldAbort, remaining);
322-
323-
LOG.debug("Stream {} {}: {}; remaining={}", uri,
324-
(shouldAbort ? "aborted" : "closed"), reason,
325-
remaining);
326-
return shouldAbort;
327-
}
328250
}

hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestUnbufferDraining.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADVISE;
4646
import static org.apache.hadoop.fs.s3a.Constants.MAXIMUM_CONNECTIONS;
4747
import static org.apache.hadoop.fs.s3a.Constants.MAX_ERROR_RETRIES;
48+
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
4849
import static org.apache.hadoop.fs.s3a.Constants.READAHEAD_RANGE;
4950
import static org.apache.hadoop.fs.s3a.Constants.REQUEST_TIMEOUT;
5051
import static org.apache.hadoop.fs.s3a.Constants.RETRY_LIMIT;
@@ -102,6 +103,7 @@ public Configuration createConfiguration() {
102103
INPUT_FADVISE,
103104
MAX_ERROR_RETRIES,
104105
MAXIMUM_CONNECTIONS,
106+
PREFETCH_ENABLED_KEY,
105107
READAHEAD_RANGE,
106108
REQUEST_TIMEOUT,
107109
RETRY_LIMIT,

0 commit comments

Comments (0)