Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -371,12 +371,23 @@ static ByteBuffer checkBytes(ByteBuffer sumsBytes,
IntBuffer sums = sumsBytes.asIntBuffer();
sums.position(offset / FSInputChecker.CHECKSUM_SIZE);
ByteBuffer current = data.duplicate();
int numChunks = data.remaining() / bytesPerSum;
int numFullChunks = data.remaining() / bytesPerSum;
boolean partialChunk = (data.remaining() % bytesPerSum != 0);
int totalChunks = numFullChunks;
if (partialChunk) {
totalChunks++;
}
CRC32 crc = new CRC32();
// check each chunk to ensure they match
for(int c = 0; c < numChunks; ++c) {
for(int c = 0; c < totalChunks; ++c) {
// set the buffer position and the limit
current.limit((c + 1) * bytesPerSum);
if (c == numFullChunks) {
int lastIncompleteChunk = data.remaining() % bytesPerSum;
current.limit((c * bytesPerSum) + lastIncompleteChunk);
} else {
current.limit((c + 1) * bytesPerSum);
}

current.position(c * bytesPerSum);
// compute the crc
crc.reset();
Expand All @@ -396,11 +407,33 @@ static ByteBuffer checkBytes(ByteBuffer sumsBytes,
return data;
}

/**
* Validates range parameters.
* In case of CheckSum FS, we already have calculated
* fileLength so failing fast here.
* @param ranges requested ranges.
* @param fileLen length of file.
* @throws EOFException end of file exception.
*/
private void validateRangeRequest(List<? extends FileRange> ranges, long fileLen) throws EOFException {
for (FileRange range : ranges) {
VectoredReadUtils.validateRangeRequest(range);
if (range.getOffset() + range.getLength() > fileLen) {
LOG.warn("Requested range [{}, {}) is beyond EOF for path {}",
range.getOffset(), range.getLength(), file);
throw new EOFException("Requested range [" + range.getOffset() + ", "
+ range.getLength() + ") is beyond EOF for path " + file);
}
}
}

@Override
public void readVectored(List<? extends FileRange> ranges,
IntFunction<ByteBuffer> allocate) throws IOException {
long length = fs.getFileStatus(file).getLen();
validateRangeRequest(ranges, length);

// If the stream doesn't have checksums, just delegate.
VectoredReadUtils.validateVectoredReadRanges(ranges);
if (sums == null) {
datas.readVectored(ranges, allocate);
return;
Expand All @@ -410,15 +443,18 @@ public void readVectored(List<? extends FileRange> ranges,
List<CombinedFileRange> dataRanges =
VectoredReadUtils.mergeSortedRanges(Arrays.asList(sortRanges(ranges)), bytesPerSum,
minSeek, maxReadSizeForVectorReads());
// While merging the ranges above, they are rounded up based on the value of bytesPerSum
// which leads to some ranges crossing the EOF thus they need to be fixed else it will
// cause EOFException during actual reads.
for (CombinedFileRange range : dataRanges) {
if (range.getOffset() + range.getLength() > length) {
range.setLength((int) (length - range.getOffset()));
}
}
List<CombinedFileRange> checksumRanges = findChecksumRanges(dataRanges,
bytesPerSum, minSeek, maxSize);
sums.readVectored(checksumRanges, allocate);
datas.readVectored(dataRanges, allocate);
// Data read is correct. I have verified content of dataRanges.
// There is some bug below here as test (testVectoredReadMultipleRanges)
// is failing, should be
// somewhere while slicing the merged data into smaller user ranges.
// Spend some time figuring out but it is a complex code.
for(CombinedFileRange checksumRange: checksumRanges) {
for(FileRange dataRange: checksumRange.getUnderlying()) {
// when we have both the ranges, validate the checksum
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,11 @@ public void testConsecutiveRanges() throws Exception {
}
}

/**
* Test to validate EOF ranges. Default implementation fails with EOFException
* while reading the ranges. Some implementation like s3, checksum fs fail fast
* as they already have the file length calculated.
*/
@Test
public void testEOFRanges() throws Exception {
FileSystem fs = getFileSystem();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.hadoop.fs.contract.localfs;

import java.io.EOFException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
Expand All @@ -30,6 +31,7 @@
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.AbstractContractVectoredReadTest;
Expand All @@ -52,9 +54,31 @@ protected AbstractFSContract createContract(Configuration conf) {

@Test
public void testChecksumValidationDuringVectoredRead() throws Exception {
Path testPath = path("big_range_checksum");
Path testPath = path("big_range_checksum_file");
List<FileRange> someRandomRanges = new ArrayList<>();
someRandomRanges.add(FileRange.createFileRange(10, 1024));
someRandomRanges.add(FileRange.createFileRange(1025, 1024));
validateCheckReadException(testPath, DATASET_LEN, someRandomRanges);
}


/**
* Test for file size less than checksum chunk size.
* {@code ChecksumFileSystem#bytesPerChecksum}.
*/
@Test
public void testChecksumValidationDuringVectoredReadSmallFile() throws Exception {
Path testPath = path("big_range_checksum_file");
final int length = 471;
List<FileRange> smallFileRanges = new ArrayList<>();
smallFileRanges.add(FileRange.createFileRange(10, 50));
smallFileRanges.add(FileRange.createFileRange(100, 20));
validateCheckReadException(testPath, length, smallFileRanges);
}

private void validateCheckReadException(Path testPath, int length, List<FileRange> ranges) throws Exception {
LocalFileSystem localFs = (LocalFileSystem) getFileSystem();
final byte[] datasetCorrect = ContractTestUtils.dataset(DATASET_LEN, 'a', 32);
final byte[] datasetCorrect = ContractTestUtils.dataset(length, 'a', 32);
try (FSDataOutputStream out = localFs.create(testPath, true)){
out.write(datasetCorrect);
}
Expand All @@ -63,24 +87,55 @@ public void testChecksumValidationDuringVectoredRead() throws Exception {
.describedAs("Checksum file should be present")
.isTrue();
CompletableFuture<FSDataInputStream> fis = localFs.openFile(testPath).build();
List<FileRange> someRandomRanges = new ArrayList<>();
someRandomRanges.add(FileRange.createFileRange(10, 1024));
someRandomRanges.add(FileRange.createFileRange(1025, 1024));
try (FSDataInputStream in = fis.get()){
in.readVectored(someRandomRanges, getAllocate());
validateVectoredReadResult(someRandomRanges, datasetCorrect);
in.readVectored(ranges, getAllocate());
validateVectoredReadResult(ranges, datasetCorrect);
}
final byte[] datasetCorrupted = ContractTestUtils.dataset(DATASET_LEN, 'a', 64);
final byte[] datasetCorrupted = ContractTestUtils.dataset(length, 'a', 64);
try (FSDataOutputStream out = localFs.getRaw().create(testPath, true)){
out.write(datasetCorrupted);
}
CompletableFuture<FSDataInputStream> fisN = localFs.openFile(testPath).build();
try (FSDataInputStream in = fisN.get()){
in.readVectored(someRandomRanges, getAllocate());
in.readVectored(ranges, getAllocate());
// Expect checksum exception when data is updated directly through
// raw local fs instance.
intercept(ChecksumException.class,
() -> validateVectoredReadResult(someRandomRanges, datasetCorrupted));
() -> validateVectoredReadResult(ranges, datasetCorrupted));
}
}
@Test
public void tesChecksumVectoredReadBoundaries() throws Exception {
Path testPath = path("boundary_range_checksum_file");
final int length = 1071;
LocalFileSystem localFs = (LocalFileSystem) getFileSystem();
final byte[] datasetCorrect = ContractTestUtils.dataset(length, 'a', 32);
try (FSDataOutputStream out = localFs.create(testPath, true)){
out.write(datasetCorrect);
}
Path checksumPath = localFs.getChecksumFile(testPath);
Assertions.assertThat(localFs.exists(checksumPath))
.describedAs("Checksum file should be present")
.isTrue();
CompletableFuture<FSDataInputStream> fis = localFs.openFile(testPath).build();
List<FileRange> smallRange = new ArrayList<>();
smallRange.add(FileRange.createFileRange(1000, 71));
try ( FSDataInputStream in = fis.get()) {
in.readVectored(smallRange, getAllocate());
validateVectoredReadResult(smallRange, datasetCorrect);
}
}


/**
* Overriding in checksum fs as vectored read api fails fast
* in case of EOF requested range.
*/
@Override
public void testEOFRanges() throws Exception {
FileSystem fs = getFileSystem();
List<FileRange> fileRanges = new ArrayList<>();
fileRanges.add(FileRange.createFileRange(DATASET_LEN, 100));
verifyExceptionalVectoredRead(fs, fileRanges, EOFException.class);
}
}