@@ -43,7 +43,9 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.impl.FutureIOSupport;

+import static org.apache.hadoop.fs.contract.ContractTestUtils.assertDatasetEquals;
import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult;

@RunWith(Parameterized.class)
public abstract class AbstractContractVectoredReadTest extends AbstractFSContractTestBase {
@@ -53,8 +55,6 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContractTestBase {
public static final int DATASET_LEN = 64 * 1024;
private static final byte[] DATASET = ContractTestUtils.dataset(DATASET_LEN, 'a', 32);
protected static final String VECTORED_READ_FILE_NAME = "vectored_file.txt";
-private static final String VECTORED_READ_FILE_1MB_NAME = "vectored_file_1M.txt";
-private static final byte[] DATASET_MB = ContractTestUtils.dataset(1024 * 1024, 'a', 256);

private final IntFunction<ByteBuffer> allocate;

@@ -77,8 +77,6 @@ public void setup() throws Exception {
Path path = path(VECTORED_READ_FILE_NAME);
FileSystem fs = getFileSystem();
createFile(fs, path, true, DATASET);
-Path bigFile = path(VECTORED_READ_FILE_1MB_NAME);
-createFile(fs, bigFile, true, DATASET_MB);
}

@Test
@@ -99,7 +97,7 @@ public void testVectoredReadMultipleRanges() throws Exception {
CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(completableFutures);
combinedFuture.get();

-validateVectoredReadResult(fileRanges);
+validateVectoredReadResult(fileRanges, DATASET);
}
}

@@ -132,7 +130,7 @@ public void testDisjointRanges() throws Exception {
fileRanges.add(new FileRangeImpl(16 * 1024 + 101, 100));
try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
in.readVectored(fileRanges, allocate);
-validateVectoredReadResult(fileRanges);
+validateVectoredReadResult(fileRanges, DATASET);
}
}

@@ -149,7 +147,7 @@ public void testAllRangesMergedIntoOne() throws Exception {
fileRanges.add(new FileRangeImpl(8*1024 - 101, 100));
try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
in.readVectored(fileRanges, allocate);
-validateVectoredReadResult(fileRanges);
+validateVectoredReadResult(fileRanges, DATASET);
}
}

@@ -168,7 +166,7 @@ public void testSomeRangesMergedSomeUnmerged() throws Exception {
fileRanges.add(new FileRangeImpl(40*1024, 1024));
try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
in.readVectored(fileRanges, allocate);
-validateVectoredReadResult(fileRanges);
+validateVectoredReadResult(fileRanges, DATASET);
}
}

@@ -184,24 +182,7 @@ public void testSameRanges() throws Exception {
.build();
try (FSDataInputStream in = builder.get()) {
in.readVectored(fileRanges, allocate);
-validateVectoredReadResult(fileRanges);
-}
-}
-
-@Test
-public void testVectoredRead1MBFile() throws Exception {
-FileSystem fs = getFileSystem();
-List<FileRange> fileRanges = new ArrayList<>();
-fileRanges.add(new FileRangeImpl(1293, 25837));
-CompletableFuture<FSDataInputStream> builder =
-fs.openFile(path(VECTORED_READ_FILE_1MB_NAME))
-.build();
-try (FSDataInputStream in = builder.get()) {
-in.readVectored(fileRanges, allocate);
-ByteBuffer vecRes = FutureIOSupport.awaitFuture(fileRanges.get(0).getData());
-FileRange resRange = fileRanges.get(0);
-assertDatasetEquals((int) resRange.getOffset(), "vecRead",
-vecRes, resRange.getLength(), DATASET_MB);
+validateVectoredReadResult(fileRanges, DATASET);
}
}

@@ -215,7 +196,7 @@ public void testOverlappingRanges() throws Exception {
fileRanges.add(new FileRangeImpl(10, 980));
try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
in.readVectored(fileRanges, allocate);
-validateVectoredReadResult(fileRanges);
+validateVectoredReadResult(fileRanges, DATASET);
}
}

@@ -272,7 +253,7 @@ public void testNormalReadAfterVectoredRead() throws Exception {
Assertions.assertThat(in.getPos())
.describedAs("Vectored read shouldn't change file pointer.")
.isEqualTo(200);
-validateVectoredReadResult(fileRanges);
+validateVectoredReadResult(fileRanges, DATASET);
}
}

@@ -290,7 +271,7 @@ public void testVectoredReadAfterNormalRead() throws Exception {
.describedAs("Vectored read shouldn't change file pointer.")
.isEqualTo(200);
in.readVectored(fileRanges, allocate);
-validateVectoredReadResult(fileRanges);
+validateVectoredReadResult(fileRanges, DATASET);
}
}

@@ -302,8 +283,8 @@ public void testMultipleVectoredReads() throws Exception {
try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
in.readVectored(fileRanges1, allocate);
in.readVectored(fileRanges2, allocate);
-validateVectoredReadResult(fileRanges2);
-validateVectoredReadResult(fileRanges1);
+validateVectoredReadResult(fileRanges2, DATASET);
+validateVectoredReadResult(fileRanges1, DATASET);
}
}

@@ -314,27 +295,6 @@ protected List<FileRange> createSomeOverlappingRanges() {
return fileRanges;
}

-protected void validateVectoredReadResult(List<FileRange> fileRanges)
-throws ExecutionException, InterruptedException {
-CompletableFuture<?>[] completableFutures = new CompletableFuture<?>[fileRanges.size()];
-int i = 0;
-for (FileRange res : fileRanges) {
-completableFutures[i++] = res.getData();
-}
-CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(completableFutures);
-combinedFuture.get();
-
-for (FileRange res : fileRanges) {
-CompletableFuture<ByteBuffer> data = res.getData();
-try {
-ByteBuffer buffer = FutureIOSupport.awaitFuture(data);
-assertDatasetEquals((int) res.getOffset(), "vecRead", buffer, res.getLength(), DATASET);
-} catch (Exception ex) {
-LOG.error("Exception while running vectored read ", ex);
-Assert.fail("Exception while running vectored read " + ex);
-}
-}
-}

protected void testExceptionalVectoredRead(FileSystem fs,
List<FileRange> fileRanges,
@@ -351,26 +311,4 @@ protected void testExceptionalVectoredRead(FileSystem fs,
.describedAs(s)
.isTrue();
}

-/**
-* Assert that the data read matches the dataset at the given offset.
-* This helps verify that the seek process is moving the read pointer
-* to the correct location in the file.
-* @param readOffset the offset in the file where the read began.
-* @param operation operation name for the assertion.
-* @param data data read in.
-* @param length length of data to check.
-* @param originalData
-*/
-private void assertDatasetEquals(
-final int readOffset, final String operation,
-final ByteBuffer data,
-int length, byte[] originalData) {
-for (int i = 0; i < length; i++) {
-int o = readOffset + i;
-assertEquals(operation + " with read offset " + readOffset
- + ": data[" + i + "] != DATASET[" + o + "]",
-originalData[o], data.get());
-}
-}
}
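
The change repeated through this test class is mechanical: validateVectoredReadResult now takes the expected dataset as a second argument, which lets the helper move into ContractTestUtils instead of living as a private method here. A minimal sketch of the resulting call-site pattern, assuming the static imports added at the top of the file (the range offsets are illustrative, not from the patch):

    List<FileRange> fileRanges = new ArrayList<>();
    fileRanges.add(new FileRangeImpl(0, 100));
    fileRanges.add(new FileRangeImpl(8 * 1024, 100));
    try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
      // Each range gets a freshly allocated buffer; the parameterized
      // allocate function may be ByteBuffer::allocate or ByteBuffer::allocateDirect.
      in.readVectored(fileRanges, allocate);
      // The expected bytes now travel with the call.
      validateVectoredReadResult(fileRanges, DATASET);
    }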
@@ -21,6 +21,7 @@
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
@@ -29,6 +30,8 @@
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.functional.FutureIO;
+
import org.junit.Assert;
import org.junit.AssumptionViolatedException;
import org.slf4j.Logger;
@@ -39,6 +42,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -49,6 +53,9 @@
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;

import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
@@ -68,6 +75,11 @@ public class ContractTestUtils extends Assert {
public static final String IO_CHUNK_MODULUS_SIZE = "io.chunk.modulus.size";
public static final int DEFAULT_IO_CHUNK_MODULUS_SIZE = 128;

+/**
+* Timeout in seconds for vectored read operations in tests: {@value}.
+*/
+public static final int VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS = 5 * 60;
+
/**
* Assert that a property in the property set matches the expected value.
* @param props property set
@@ -1095,6 +1107,59 @@ public static void validateFileContent(byte[] concat, byte[][] bytes) {
mismatch);
}

+/**
+* Utility to validate vectored read results.
+* @param fileRanges input ranges.
+* @param originalData original data.
+* @throws IOException any ioe.
+*/
+public static void validateVectoredReadResult(List<FileRange> fileRanges,
+byte[] originalData)
+throws IOException, TimeoutException {
+CompletableFuture<?>[] completableFutures = new CompletableFuture<?>[fileRanges.size()];
+int i = 0;
+for (FileRange res : fileRanges) {
+completableFutures[i++] = res.getData();
+}
+CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(completableFutures);
+FutureIO.awaitFuture(combinedFuture,
+VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS,
+TimeUnit.SECONDS);
+
+for (FileRange res : fileRanges) {
+CompletableFuture<ByteBuffer> data = res.getData();
+ByteBuffer buffer = FutureIO.awaitFuture(data,
+VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS,
+TimeUnit.SECONDS);
+assertDatasetEquals((int) res.getOffset(), "vecRead",
+buffer, res.getLength(), originalData);
+}
+}
+
+
+/**
+* Assert that the data read matches the dataset at the given offset.
+* This helps verify that the seek process is moving the read pointer
+* to the correct location in the file.
+* @param readOffset the offset in the file where the read began.
+* @param operation operation name for the assertion.
+* @param data data read in.
+* @param length length of data to check.
+* @param originalData original data.
+*/
+public static void assertDatasetEquals(
+final int readOffset,
+final String operation,
+final ByteBuffer data,
+int length, byte[] originalData) {
+for (int i = 0; i < length; i++) {
+int o = readOffset + i;
+assertEquals(operation + " with read offset " + readOffset
+ + ": data[" + i + "] != DATASET[" + o + "]",
+originalData[o], data.get());
+}
+}
+
/**
* Receives test data from the given input file and checks the size of the
* data as well as the pattern inside the received data.
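Two behavioural points about the utility added above. First, FutureIO.awaitFuture bounds each wait with the five-minute timeout and, unlike a bare CompletableFuture.get(), extracts and rethrows the underlying IOException rather than a wrapped ExecutionException, so a hung or failed vectored read fails the test with a useful cause. Second, assertDatasetEquals consumes its buffer with relative get() calls, so each range's buffer should be validated exactly once. A hedged sketch of the same await pattern in caller code (process() is a placeholder, not a Hadoop API):

    CompletableFuture<ByteBuffer> data = range.getData();
    ByteBuffer buffer = FutureIO.awaitFuture(data,
        VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS, TimeUnit.SECONDS);
    // The buffer's position advances as bytes are read out of it.
    while (buffer.hasRemaining()) {
      process(buffer.get()); // placeholder consumer
    }
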
@@ -186,6 +186,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
* @param ctx operation context
* @param s3Attributes object attributes
* @param client S3 client to use
+* @param streamStatistics stream io stats.
* @param unboundedThreadPool thread pool to use.
*/
public S3AInputStream(S3AReadOpContext ctx,
@@ -153,7 +153,6 @@ public AuditSpan getAuditSpan() {
}

/**
-<<<<<<< HEAD
* Set builder value.
* @param value new value
* @return the builder
@@ -220,8 +220,7 @@ public interface GetContentSummaryCallbacks {

/***
* List all entries under a path.
-*
-* @param path
+* @param path path.
* @param recursive if the subdirectories need to be traversed recursively
* @return an iterator over the listing.
* @throws IOException failure
@@ -19,8 +19,13 @@
package org.apache.hadoop.fs.s3a.scale;

import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.IntFunction;

import com.amazonaws.event.ProgressEvent;
import com.amazonaws.event.ProgressEventType;
@@ -35,7 +40,10 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileRange;
+import org.apache.hadoop.fs.FileRangeImpl;
import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.s3a.Constants;
@@ -47,6 +55,7 @@
import org.apache.hadoop.util.Progressable;

import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult;
import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
import static org.apache.hadoop.fs.s3a.Statistic.STREAM_WRITE_BLOCK_UPLOADS_BYTES_PENDING;
@@ -446,6 +455,30 @@ public void test_040_PositionedReadHugeFile() throws Throwable {
toHuman(timer.nanosPerOperation(ops)));
}

+@Test
+public void test_045_vectoredIOHugeFile() throws Throwable {
+assumeHugeFileExists();
+List<FileRange> rangeList = new ArrayList<>();
+rangeList.add(new FileRangeImpl(5856368, 1167716));
+rangeList.add(new FileRangeImpl(3520861, 1167700));
+rangeList.add(new FileRangeImpl(8191913, 1167775));
+rangeList.add(new FileRangeImpl(1520861, 1167700));
+rangeList.add(new FileRangeImpl(2520861, 116770));
+rangeList.add(new FileRangeImpl(9191913, 116770));
+rangeList.add(new FileRangeImpl(2820861, 156770));
+IntFunction<ByteBuffer> allocate = ByteBuffer::allocate;
+FileSystem fs = getFileSystem();
+CompletableFuture<FSDataInputStream> builder =
+fs.openFile(hugefile).build();
+try (FSDataInputStream in = builder.get()) {
+in.readVectored(rangeList, allocate);
+byte[] readFullRes = new byte[(int)filesize];
+in.readFully(0, readFullRes);
+// Comparing vectored read results with read fully.
+validateVectoredReadResult(rangeList, readFullRes);
+}
+}
+
/**
* Read in the entire file using read() calls.
* @throws Throwable failure
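Rather than regenerating a reference dataset, test_045_vectoredIOHugeFile reads the whole object back once with readFully and validates every vectored range against that copy. The same cross-check works on any FileSystem; a self-contained sketch under assumed names (a local file at /tmp/vectored-check.bin that already exists, plus the usual org.apache.hadoop.fs imports and the static validateVectoredReadResult import):

    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.getLocal(conf);
    Path file = new Path("/tmp/vectored-check.bin"); // assumed to exist
    List<FileRange> ranges = new ArrayList<>();
    ranges.add(new FileRangeImpl(1024, 4096)); // illustrative range
    try (FSDataInputStream in = fs.open(file)) {
      byte[] whole = new byte[(int) fs.getFileStatus(file).getLen()];
      in.readFully(0, whole); // reference copy of the file
      in.readVectored(ranges, ByteBuffer::allocate);
      // Every range must match the corresponding slice of the full read.
      validateVectoredReadResult(ranges, whole);
    }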