Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.StringJoiner;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.IntFunction;

import org.apache.hadoop.classification.InterfaceAudience;
Expand Down Expand Up @@ -181,4 +182,11 @@ public void readVectored(List<? extends FileRange> ranges,
IntFunction<ByteBuffer> allocate) throws IOException {
((PositionedReadable) in).readVectored(ranges, allocate);
}

// Vectored read with a release callback: the request is handed unchanged
// to the wrapped stream, which is cast to PositionedReadable first.
@Override
public void readVectored(final List<? extends FileRange> ranges,
    final IntFunction<ByteBuffer> allocate,
    final Consumer<ByteBuffer> release) throws IOException {
  final PositionedReadable wrapped = (PositionedReadable) in;
  wrapped.readVectored(ranges, allocate, release);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Consumer;
import java.util.function.IntFunction;
import java.util.zip.CRC32;

Expand Down Expand Up @@ -438,6 +439,13 @@ static ByteBuffer checkBytes(ByteBuffer sumsBytes,
// Two-argument vectored read: forwards to the three-argument overload
// with a release callback that does nothing.
@Override
public void readVectored(List<? extends FileRange> ranges,
    IntFunction<ByteBuffer> allocate) throws IOException {
  final Consumer<ByteBuffer> noopRelease = buffer -> { };
  readVectored(ranges, allocate, noopRelease);
}

@Override
public void readVectored(final List<? extends FileRange> ranges,
final IntFunction<ByteBuffer> allocate,
final Consumer<ByteBuffer> release) throws IOException {

// If the stream doesn't have checksums, just delegate.
if (sums == null) {
Expand All @@ -462,8 +470,8 @@ public void readVectored(List<? extends FileRange> ranges,
}
List<CombinedFileRange> checksumRanges = findChecksumRanges(dataRanges,
bytesPerSum, minSeek, maxSize);
sums.readVectored(checksumRanges, allocate);
datas.readVectored(dataRanges, allocate);
sums.readVectored(checksumRanges, allocate, release);
datas.readVectored(dataRanges, allocate, release);
for(CombinedFileRange checksumRange: checksumRanges) {
for(FileRange dataRange: checksumRange.getUnderlying()) {
// when we have both the ranges, validate the checksum
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.nio.ByteBuffer;
import java.util.EnumSet;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.IntFunction;

import org.apache.hadoop.classification.InterfaceAudience;
Expand Down Expand Up @@ -306,4 +307,11 @@ public void readVectored(List<? extends FileRange> ranges,
IntFunction<ByteBuffer> allocate) throws IOException {
((PositionedReadable) in).readVectored(ranges, allocate);
}

@Override
public void readVectored(final List<? extends FileRange> ranges,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will do

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

final IntFunction<ByteBuffer> allocate,
final Consumer<ByteBuffer> release) throws IOException {
((PositionedReadable) in).readVectored(ranges, allocate, release);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.IntFunction;

import org.apache.hadoop.classification.InterfaceAudience;
Expand Down Expand Up @@ -136,4 +137,30 @@ default void readVectored(List<? extends FileRange> ranges,
IntFunction<ByteBuffer> allocate) throws IOException {
VectoredReadUtils.readVectored(this, ranges, allocate);
}

/**
* Variant of {@link #readVectored(List, IntFunction)} where a release() function
* may be invoked if problems surface during reads: the release function is called to
* try to return any allocated buffer which has not been read yet.
* Buffers which have successfully been read and returned to the caller do not
* get released: this is for failures only.
* <p>
* The default implementation calls readVectored/2 so as to ensure that
* if an existing stream implementation does not implement this method
* all is good.
* <p>
* Implementations SHOULD override this method if they can release buffers as
* part of their error handling.
* @param ranges the byte ranges to read
* @param allocate the function to allocate ByteBuffer
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: remove "the"

* @param release the function to release a ByteBuffer.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: remove "the"

* @throws IOException any IOE.
* @throws IllegalArgumentException if any of the ranges are invalid, or they overlap.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: seems like some words are out of order. Maybe "if any of the ranges..."?

*/
default void readVectored(List<? extends FileRange> ranges,
    IntFunction<ByteBuffer> allocate,
    Consumer<ByteBuffer> release) throws IOException {
  // The release callback is not used here: the request is served by the
  // two-argument overload.
  this.readVectored(ranges, allocate);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.IntFunction;

import org.apache.hadoop.classification.InterfaceAudience;
Expand All @@ -68,6 +69,7 @@
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.StringUtils;

import static org.apache.hadoop.fs.VectoredReadUtils.LOG_BYTE_BUFFER_RELEASED;
import static org.apache.hadoop.fs.VectoredReadUtils.sortRangeList;
import static org.apache.hadoop.fs.VectoredReadUtils.validateRangeRequest;
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
Expand Down Expand Up @@ -319,74 +321,125 @@ AsynchronousFileChannel getAsyncChannel() throws IOException {
// Two-argument vectored read: forwards to the three-argument overload,
// supplying LOG_BYTE_BUFFER_RELEASED as the release callback.
@Override
public void readVectored(List<? extends FileRange> ranges,
    IntFunction<ByteBuffer> allocate) throws IOException {
  final Consumer<ByteBuffer> releaser = LOG_BYTE_BUFFER_RELEASED;
  readVectored(ranges, allocate, releaser);
}

@Override
public void readVectored(final List<? extends FileRange> ranges,
final IntFunction<ByteBuffer> allocate,
final Consumer<ByteBuffer> release) throws IOException {

// Validate, but do not pass in a file length as it may change.
List<? extends FileRange> sortedRanges = sortRangeList(ranges);
// Set up all of the futures, so that we can use them if things fail
for(FileRange range: sortedRanges) {
// Set up all of the futures, so that the caller can await on
// their completion.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: "completion"

for (FileRange range: sortedRanges) {
validateRangeRequest(range);
range.setData(new CompletableFuture<>());
}
try {
AsynchronousFileChannel channel = getAsyncChannel();
ByteBuffer[] buffers = new ByteBuffer[sortedRanges.size()];
AsyncHandler asyncHandler = new AsyncHandler(channel, sortedRanges, buffers);
for(int i = 0; i < sortedRanges.size(); ++i) {
FileRange range = sortedRanges.get(i);
buffers[i] = allocate.apply(range.getLength());
channel.read(buffers[i], range.getOffset(), i, asyncHandler);
}
} catch (IOException ioe) {
LOG.debug("Exception occurred during vectored read ", ioe);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems like this was unnecessary. Any failure will automatically be caught in failed() method.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there is no IOE to be raised any more

for(FileRange range: sortedRanges) {
range.getData().completeExceptionally(ioe);
}
}
// Initiate the asynchronous reads.
new AsyncHandler(getAsyncChannel(),
sortedRanges,
allocate,
release)
.initiateRead();
}
}

/**
* A CompletionHandler that implements readFully and translates back
* into the form of CompletionHandler that our users expect.
* <p>
* All reads are started in {@link #initiateRead()};
* the handler then receives callbacks on success
* {@link #completed(Integer, Integer)}, and on failure
* by {@link #failed(Throwable, Integer)}.
* These are mapped to the specific range in the read, and its
* outcome updated.
*/
static class AsyncHandler implements CompletionHandler<Integer, Integer> {
private static class AsyncHandler implements CompletionHandler<Integer, Integer> {
/** File channel to read from. */
private final AsynchronousFileChannel channel;

/** Ranges to fetch. */
private final List<? extends FileRange> ranges;

/** Allocate operation. */
private final IntFunction<ByteBuffer> allocate;

/** Release operation. */
private final Consumer<ByteBuffer> release;

/** Buffers being read. */
private final ByteBuffer[] buffers;

AsyncHandler(AsynchronousFileChannel channel,
List<? extends FileRange> ranges,
ByteBuffer[] buffers) {
AsyncHandler(
final AsynchronousFileChannel channel,
final List<? extends FileRange> ranges,
final IntFunction<ByteBuffer> allocate,
final Consumer<ByteBuffer> release) {
this.channel = channel;
this.ranges = ranges;
this.buffers = buffers;
this.allocate = allocate;
this.release = release;
this.buffers = new ByteBuffer[ranges.size()];
}

/**
 * Start every read in the request.
 * <p>
 * For each range, in list order: obtain a buffer of the range's length
 * from the allocate function, store it in the buffers array at the
 * range's index, then queue a read on the channel at the range's offset.
 * The index is passed as the attachment and this object as the handler.
 * <p>
 * NOTE(review): nothing is caught here; an exception thrown by the
 * allocator or while queueing a read propagates to the caller.
 */
private void initiateRead() {
  for (int index = 0; index < ranges.size(); index++) {
    final FileRange current = ranges.get(index);
    final ByteBuffer target = allocate.apply(current.getLength());
    buffers[index] = target;
    channel.read(target, current.getOffset(), index, this);
  }
}

/**
* Successful read, though for an EOF the number of bytes may be -1.
* That is mapped to a {@link #failed(Throwable, Integer)} outcome.
* @param result The bytes read or -1
* @param rangeIndex range index within the range list.
*/

@Override
public void completed(Integer result, Integer r) {
FileRange range = ranges.get(r);
ByteBuffer buffer = buffers[r];
public void completed(Integer result, Integer rangeIndex) {
FileRange range = ranges.get(rangeIndex);
ByteBuffer buffer = buffers[rangeIndex];
if (result == -1) {
failed(new EOFException("Read past End of File"), r);
// no data was read back.
failed(new EOFException("Read past End of File"), rangeIndex);
} else {
if (buffer.remaining() > 0) {
// issue a read for the rest of the buffer
// QQ: What if this fails? It has the same handler.
channel.read(buffer, range.getOffset() + buffer.position(), r, this);
channel.read(buffer, range.getOffset() + buffer.position(), rangeIndex, this);
} else {
// QQ: Why is this required? I think because we don't want the
// user to read data beyond limit.
// Flip the buffer and declare success.
buffer.flip();
range.getData().complete(buffer);
}
}
}

/**
* The read of the range failed.
* <p>
* Release the buffer supplied for this range.
* @param exc exception.
* @param rangeIndex range index within the range list.
*/
@Override
public void failed(Throwable exc, Integer r) {
LOG.debug("Failed while reading range {} ", r, exc);
ranges.get(r).getData().completeExceptionally(exc);
public void failed(Throwable exc, Integer rangeIndex) {
LOG.debug("Failed while reading range {} ", rangeIndex, exc);
// release the buffer
release.accept(buffers[rangeIndex]);
// report the failure.
ranges.get(rangeIndex).getData().completeExceptionally(exc);
}

}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.IntFunction;

import org.slf4j.Logger;
Expand All @@ -52,6 +53,15 @@
private static final Logger LOG =
LoggerFactory.getLogger(VectoredReadUtils.class);

/**
* This releaser just logs at debug that the buffer
* was released.
*/
public static final Consumer<ByteBuffer> LOG_BYTE_BUFFER_RELEASED =
(buffer) -> {
LOG.debug("release buffer {}", buffer.toString());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we parameterize and add. releasing buffer for range[x-y] ?

};

/**
* Validate a single range.
* @param range range to validate.
Expand Down Expand Up @@ -98,8 +108,25 @@
public static void readVectored(PositionedReadable stream,
    List<? extends FileRange> ranges,
    IntFunction<ByteBuffer> allocate) throws EOFException {
  // No release function supplied by the caller: use the one which only
  // logs the buffer at debug level.
  final Consumer<ByteBuffer> releaser = LOG_BYTE_BUFFER_RELEASED;
  readVectored(stream, ranges, allocate, releaser);
}

/**
* Variant of {@link #readVectored(PositionedReadable, List, IntFunction)}
* where a release() function is invoked if problems surface during reads.
* @param stream the stream to read from
* @param ranges the byte ranges to read
* @param allocate the function to allocate ByteBuffer
* @param release the function to release a ByteBuffer.
* @throws IllegalArgumentException if any of the ranges are invalid, or they overlap.
* @throws EOFException the range offset is negative
*/
public static void readVectored(PositionedReadable stream,

Check failure on line 123 in hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/VectoredReadUtils.java

View check run for this annotation

ASF Cloudbees Jenkins ci-hadoop / Apache Yetus

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/VectoredReadUtils.java#L123

javadoc: warning: no @param for stream
List<? extends FileRange> ranges,
IntFunction<ByteBuffer> allocate,
Consumer<ByteBuffer> release) throws EOFException {

for (FileRange range: validateAndSortRanges(ranges, Optional.empty())) {
range.setData(readRangeFrom(stream, range, allocate));
range.setData(readRangeFrom(stream, range, allocate, release));
}
}

Expand All @@ -118,11 +145,31 @@
PositionedReadable stream,
FileRange range,
IntFunction<ByteBuffer> allocate) throws EOFException {
return readRangeFrom(stream, range, allocate, LOG_BYTE_BUFFER_RELEASED);
}

/**
* Synchronously reads a range from the stream dealing with the combinations
* of ByteBuffers buffers and PositionedReadable streams.
* @param stream the stream to read from
* @param range the range to read
* @param allocate the function to allocate ByteBuffers
* @param release the function to release a ByteBuffer.
* @return the CompletableFuture that contains the read data or an exception.
* @throws IllegalArgumentException the range is invalid other than by offset or being null.
* @throws EOFException the range offset is negative
* @throws NullPointerException if the range is null.
*/
public static CompletableFuture<ByteBuffer> readRangeFrom(
PositionedReadable stream,
FileRange range,
IntFunction<ByteBuffer> allocate,
Consumer<ByteBuffer> release) throws EOFException {

validateRangeRequest(range);
CompletableFuture<ByteBuffer> result = new CompletableFuture<>();
ByteBuffer buffer = allocate.apply(range.getLength());
try {
ByteBuffer buffer = allocate.apply(range.getLength());
if (stream instanceof ByteBufferPositionedReadable) {
LOG.debug("ByteBufferPositionedReadable.readFully of {}", range);
((ByteBufferPositionedReadable) stream).readFully(range.getOffset(),
Expand All @@ -136,6 +183,7 @@
result.complete(buffer);
} catch (IOException ioe) {
LOG.debug("Failed to read {}", range, ioe);
release.accept(buffer);
result.completeExceptionally(ioe);
}
return result;
Expand All @@ -147,6 +195,8 @@
* @param range file range
* @param buffer destination buffer
* @throws IOException IO problems.
* @throws EOFException the end of the data was reached before
* the read operation completed
*/
private static void readNonByteBufferPositionedReadable(
PositionedReadable stream,
Expand Down
Loading