Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.StringUtils;

import static org.apache.hadoop.fs.VectoredReadUtils.validateAndSortRanges;
import static org.apache.hadoop.fs.VectoredReadUtils.sortRangeList;
import static org.apache.hadoop.fs.VectoredReadUtils.validateRangeRequest;
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BYTES;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_EXCEPTIONS;
Expand Down Expand Up @@ -320,10 +321,10 @@ public void readVectored(List<? extends FileRange> ranges,
IntFunction<ByteBuffer> allocate) throws IOException {

// Validate, but do not pass in a file length as it may change.
List<? extends FileRange> sortedRanges = validateAndSortRanges(ranges,
Optional.empty());
List<? extends FileRange> sortedRanges = sortRangeList(ranges);
// Set up all of the futures, so that we can use them if things fail
for(FileRange range: sortedRanges) {
validateRangeRequest(range);
range.setData(new CompletableFuture<>());
}
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -623,8 +623,13 @@ support -and fallback everywhere else.

The restriction "no overlapping ranges" was only initially enforced in
the S3A connector, which would raise `UnsupportedOperationException`.
Adding the range check as a precondition for all implementations guarantees
consistent behavior everywhere.
Adding the range check as a precondition for all implementations (Raw Local
being an exception) guarantees consistent behavior everywhere.
The reason Raw Local doesn't have this precondition is ChecksumFileSystem
creates the chunked ranges based on the checksum chunk size and then calls
readVectored on Raw Local which may lead to overlapping ranges in some cases.
For details see [HADOOP-19291](https://issues.apache.org/jira/browse/HADOOP-19291)

For reliable use with older hadoop releases with the API: sort the list of ranges
and check for overlaps before calling `readVectored()`.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,23 +270,42 @@ public void testSomeRangesMergedSomeUnmerged() throws Exception {
}

/**
* Vectored IO doesn't support overlapping ranges.
* Most file systems won't support overlapping ranges.
* Currently, only Raw Local supports it.
*/
@Test
public void testOverlappingRanges() throws Exception {
verifyExceptionalVectoredRead(
getSampleOverlappingRanges(),
IllegalArgumentException.class);
if (!isSupported(VECTOR_IO_OVERLAPPING_RANGES)) {
verifyExceptionalVectoredRead(
getSampleOverlappingRanges(),
IllegalArgumentException.class);
} else {
try (FSDataInputStream in = openVectorFile()) {
List<FileRange> fileRanges = getSampleOverlappingRanges();
in.readVectored(fileRanges, allocate);
validateVectoredReadResult(fileRanges, DATASET, 0);
returnBuffersToPoolPostRead(fileRanges, pool);
}
}
}

/**
* Same ranges are special case of overlapping.
*/
@Test
public void testSameRanges() throws Exception {
verifyExceptionalVectoredRead(
getSampleSameRanges(),
IllegalArgumentException.class);
if (!isSupported(VECTOR_IO_OVERLAPPING_RANGES)) {
verifyExceptionalVectoredRead(
getSampleSameRanges(),
IllegalArgumentException.class);
} else {
try (FSDataInputStream in = openVectorFile()) {
List<FileRange> fileRanges = getSampleSameRanges();
in.readVectored(fileRanges, allocate);
validateVectoredReadResult(fileRanges, DATASET, 0);
returnBuffersToPoolPostRead(fileRanges, pool);
}
}
}

/**
Expand Down Expand Up @@ -329,10 +348,9 @@ public void testSomeRandomNonOverlappingRanges() throws Exception {
public void testConsecutiveRanges() throws Exception {
List<FileRange> fileRanges = new ArrayList<>();
final int offset = 500;
final int length = 100;
final int length = 2011;
range(fileRanges, offset, length);
range(fileRanges, 600, 200);
range(fileRanges, 800, 100);
range(fileRanges, offset + length, length);
try (FSDataInputStream in = openVectorFile()) {
in.readVectored(fileRanges, allocate);
validateVectoredReadResult(fileRanges, DATASET, 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,4 +261,6 @@ public interface ContractOptions {
* Does vector read check file length on open rather than in the read call?
*/
String VECTOR_IO_EARLY_EOF_CHECK = "vector-io-early-eof-check";

String VECTOR_IO_OVERLAPPING_RANGES = "vector-io-overlapping-ranges";
}
Original file line number Diff line number Diff line change
Expand Up @@ -142,4 +142,9 @@
<value>true</value>
</property>

<property>
<name>fs.contract.vector-io-overlapping-ranges</name>
<value>true</value>
</property>

</configuration>