ITestS3APrefetchingInputStream.java
@@ -18,8 +18,6 @@

package org.apache.hadoop.fs.s3a;

import java.net.URI;

import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,7 +34,6 @@
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.test.LambdaTestUtils;

import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_DEFAULT_SIZE;
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_SIZE_KEY;
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticMaximum;
@@ -49,7 +46,6 @@
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BLOCKS_IN_FILE_CACHE;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_OPENED;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_PREFETCH_OPERATIONS;
import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;

/**
* Test the prefetching input stream, validates that the underlying S3ACachingInputStream and
@@ -64,47 +60,39 @@ public ITestS3APrefetchingInputStream() {
private static final Logger LOG =
LoggerFactory.getLogger(ITestS3APrefetchingInputStream.class);

private static final int S_1K = 1024;
private static final int S_500 = 512;
private static final int S_1K = S_500 * 2;
private static final int S_1M = S_1K * S_1K;
// Path for file which should have length > block size so S3ACachingInputStream is used
private Path largeFile;
private FileSystem largeFileFS;
private int numBlocks;
private int blockSize;

// Size should be > block size so S3ACachingInputStream is used
private long largeFileSize;

// Size should be < block size so S3AInMemoryInputStream is used
private static final int SMALL_FILE_SIZE = S_1K * 16;
private static final int SMALL_FILE_SIZE = S_1K * 9;

private static final int TIMEOUT_MILLIS = 5000;
private static final int INTERVAL_MILLIS = 500;

private static final int BLOCK_SIZE = S_1K * 10;

@Override
public Configuration createConfiguration() {
Configuration conf = super.createConfiguration();
S3ATestUtils.removeBaseAndBucketOverrides(conf, PREFETCH_ENABLED_KEY);
S3ATestUtils.removeBaseAndBucketOverrides(conf, PREFETCH_BLOCK_SIZE_KEY);
conf.setBoolean(PREFETCH_ENABLED_KEY, true);
conf.setInt(PREFETCH_BLOCK_SIZE_KEY, BLOCK_SIZE);
Review comment (Contributor): because the size is now being set in this method, the filesystem automatically created/destroyed is already set up for the tests; we can replace openFS() entirely.

Reply (Contributor, author): done, makes them much simpler.

return conf;
}

@Override
public void teardown() throws Exception {
super.teardown();
cleanupWithLogger(LOG, largeFileFS);
largeFileFS = null;
}

private void openFS() throws Exception {
Configuration conf = getConfiguration();
String largeFileUri = S3ATestUtils.getCSVTestFile(conf);

largeFile = new Path(largeFileUri);
blockSize = conf.getInt(PREFETCH_BLOCK_SIZE_KEY, PREFETCH_BLOCK_DEFAULT_SIZE);
largeFileFS = new S3AFileSystem();
largeFileFS.initialize(new URI(largeFileUri), getConfiguration());
private void createLargeFile() throws Exception {
byte[] data = ContractTestUtils.dataset(S_1K * 72, 'x', 26);
Path largeFile = methodPath();
FileSystem largeFileFS = getFileSystem();
ContractTestUtils.writeDataset(getFileSystem(), largeFile, data, data.length, 16, true);
FileStatus fileStatus = largeFileFS.getFileStatus(largeFile);
largeFileSize = fileStatus.getLen();
numBlocks = calculateNumBlocks(largeFileSize, blockSize);
numBlocks = calculateNumBlocks(largeFileSize, BLOCK_SIZE);
}

private static int calculateNumBlocks(long largeFileSize, int blockSize) {
@@ -119,9 +107,9 @@ private static int calculateNumBlocks(long largeFileSize, int blockSize) {
public void testReadLargeFileFully() throws Throwable {
describe("read a large file fully, uses S3ACachingInputStream");
IOStatistics ioStats;
openFS();
createLargeFile();

try (FSDataInputStream in = largeFileFS.open(largeFile)) {
try (FSDataInputStream in = getFileSystem().open(methodPath())) {
ioStats = in.getIOStatistics();

byte[] buffer = new byte[S_1M * 10];
@@ -152,9 +140,9 @@ public void testReadLargeFileFullyLazySeek() throws Throwable {
describe("read a large file using readFully(position,buffer,offset,length),"
+ " uses S3ACachingInputStream");
IOStatistics ioStats;
openFS();
createLargeFile();

try (FSDataInputStream in = largeFileFS.open(largeFile)) {
try (FSDataInputStream in = getFileSystem().open(methodPath())) {
ioStats = in.getIOStatistics();

byte[] buffer = new byte[S_1M * 10];
@@ -183,25 +171,25 @@ public void testReadLargeFileFullyLazySeek() throws Throwable {
public void testRandomReadLargeFile() throws Throwable {
describe("random read on a large file, uses S3ACachingInputStream");
IOStatistics ioStats;
openFS();
createLargeFile();

try (FSDataInputStream in = largeFileFS.open(largeFile)) {
try (FSDataInputStream in = getFileSystem().open(methodPath())) {
ioStats = in.getIOStatistics();

byte[] buffer = new byte[blockSize];
byte[] buffer = new byte[BLOCK_SIZE];

// Don't read block 0 completely so it gets cached on read after seek
in.read(buffer, 0, blockSize - S_1K * 10);
in.read(buffer, 0, BLOCK_SIZE - S_500 * 10);

// Seek to block 2 and read all of it
in.seek(blockSize * 2);
in.read(buffer, 0, blockSize);
in.seek(BLOCK_SIZE * 2);
in.read(buffer, 0, BLOCK_SIZE);

// Seek to block 4 but don't read: noop.
in.seek(blockSize * 4);
in.seek(BLOCK_SIZE * 4);

// Backwards seek, will use cached block 0
in.seek(S_1K * 5);
in.seek(S_500 * 5);
in.read();

// Expected to get block 0 (partially read), 1 (prefetch), 2 (fully read), 3 (prefetch)
@@ -234,9 +222,9 @@ public void testRandomReadSmallFile() throws Throwable {

byte[] buffer = new byte[SMALL_FILE_SIZE];

in.read(buffer, 0, S_1K * 4);
in.seek(S_1K * 12);
in.read(buffer, 0, S_1K * 4);
in.read(buffer, 0, S_1K * 2);
in.seek(S_1K * 7);
in.read(buffer, 0, S_1K * 2);

verifyStatisticCounterValue(ioStats, ACTION_HTTP_GET_REQUEST, 1);
verifyStatisticCounterValue(ioStats, STREAM_READ_OPENED, 1);
@@ -261,9 +249,9 @@ public void testStatusProbesAfterClosingStream() throws Throwable {
FSDataInputStream in = getFileSystem().open(smallFile);

byte[] buffer = new byte[SMALL_FILE_SIZE];
in.read(buffer, 0, S_1K * 4);
in.seek(S_1K * 12);
in.read(buffer, 0, S_1K * 4);
in.read(buffer, 0, S_1K * 2);
in.seek(S_1K * 7);
in.read(buffer, 0, S_1K * 2);

long pos = in.getPos();
IOStatistics ioStats = in.getIOStatistics();
@@ -298,7 +286,6 @@ public void testStatusProbesAfterClosingStream() throws Throwable {
inputStreamStatistics, newInputStreamStatistics);

assertFalse("seekToNewSource() not supported with prefetch", in.seekToNewSource(10));

}

}
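
Note on the review exchange above: previously each large-file test opened the shared CSV test file through a second S3AFileSystem created in openFS() and cleaned up in teardown(). Because createConfiguration() now pins PREFETCH_BLOCK_SIZE_KEY, the filesystem the test base class creates for each test method is already configured for prefetching, so the tests can write their own dataset and read it back through getFileSystem(). The following is a minimal sketch of that pattern, not part of the patch; it mirrors the added code, and the body of calculateNumBlocks() is an assumption (the diff collapses it), taken here to be plain ceiling division.

  // Sketch only: the simplified setup the reviewer describes.
  private void createLargeFile() throws Exception {
    // 72 KiB of data, well above the 10 KiB BLOCK_SIZE, so S3ACachingInputStream is used.
    byte[] data = ContractTestUtils.dataset(S_1K * 72, 'x', 26);
    // Write through the base-class filesystem, which already honours PREFETCH_BLOCK_SIZE_KEY.
    ContractTestUtils.writeDataset(getFileSystem(), methodPath(), data, data.length, 16, true);
    largeFileSize = getFileSystem().getFileStatus(methodPath()).getLen();
    numBlocks = calculateNumBlocks(largeFileSize, BLOCK_SIZE);
  }

  // Assumed body of the collapsed helper: ceiling division of file length by block size.
  private static int calculateNumBlocks(long largeFileSize, int blockSize) {
    return (int) ((largeFileSize + blockSize - 1) / blockSize);
  }

With the tests provisioning their own data, no second filesystem has to be closed per test, which is why the cleanupWithLogger(LOG, largeFileFS) call and its import are removed above.
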
ITestS3APrefetchingLruEviction.java
@@ -20,7 +20,6 @@

import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
@@ -37,19 +36,17 @@

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.test.LambdaTestUtils;

import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_DEFAULT_SIZE;
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_SIZE_KEY;
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_MAX_BLOCKS_COUNT;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticGaugeValue;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BLOCKS_IN_FILE_CACHE;
import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;

/**
* Test the prefetching input stream with LRU cache eviction on S3ACachingInputStream.
@@ -63,9 +60,7 @@ public class ITestS3APrefetchingLruEviction extends AbstractS3ACostTest {
public static Collection<Object[]> params() {
return Arrays.asList(new Object[][]{
{"1"},
{"2"},
{"3"},
{"4"}
{"2"}
});
}

@@ -78,45 +73,32 @@ public ITestS3APrefetchingLruEviction(final String maxBlocks) {
LoggerFactory.getLogger(ITestS3APrefetchingLruEviction.class);

private static final int S_1K = 1024;
// Path for file which should have length > block size so S3ACachingInputStream is used
private Path largeFile;
private FileSystem largeFileFS;
private int blockSize;
private static final int S_500 = 512;
private static final int SMALL_FILE_SIZE = S_1K * 56;

private static final int TIMEOUT_MILLIS = 5000;
private static final int TIMEOUT_MILLIS = 3000;
private static final int INTERVAL_MILLIS = 500;
private static final int BLOCK_SIZE = S_1K * 10;

@Override
public Configuration createConfiguration() {
Configuration conf = super.createConfiguration();
S3ATestUtils.removeBaseAndBucketOverrides(conf, PREFETCH_ENABLED_KEY);
S3ATestUtils.removeBaseAndBucketOverrides(conf, PREFETCH_MAX_BLOCKS_COUNT);
S3ATestUtils.removeBaseAndBucketOverrides(conf, PREFETCH_BLOCK_SIZE_KEY);
conf.setBoolean(PREFETCH_ENABLED_KEY, true);
conf.setInt(PREFETCH_MAX_BLOCKS_COUNT, Integer.parseInt(maxBlocks));
conf.setInt(PREFETCH_BLOCK_SIZE_KEY, BLOCK_SIZE);
return conf;
}

@Override
public void teardown() throws Exception {
super.teardown();
cleanupWithLogger(LOG, largeFileFS);
largeFileFS = null;
}

private void openFS() throws Exception {
Configuration conf = getConfiguration();
String largeFileUri = S3ATestUtils.getCSVTestFile(conf);

largeFile = new Path(largeFileUri);
blockSize = conf.getInt(PREFETCH_BLOCK_SIZE_KEY, PREFETCH_BLOCK_DEFAULT_SIZE);
largeFileFS = new S3AFileSystem();
largeFileFS.initialize(new URI(largeFileUri), getConfiguration());
}

@Test
public void testSeeksWithLruEviction() throws Throwable {
IOStatistics ioStats;
openFS();
byte[] data = ContractTestUtils.dataset(SMALL_FILE_SIZE, 'x', 26);
// Path for file which should have length > block size so S3ACachingInputStream is used
Path smallFile = methodPath();
ContractTestUtils.writeDataset(getFileSystem(), smallFile, data, data.length, 16, true);

ExecutorService executorService = Executors.newFixedThreadPool(5,
new ThreadFactoryBuilder()
@@ -125,7 +107,7 @@ public void testSeeksWithLruEviction() throws Throwable {
.build());
CountDownLatch countDownLatch = new CountDownLatch(7);

try (FSDataInputStream in = largeFileFS.open(largeFile)) {
try (FSDataInputStream in = getFileSystem().open(methodPath())) {
ioStats = in.getIOStatistics();
// tests to add multiple blocks in the prefetch cache
// and let LRU eviction take place as more cache entries
@@ -135,43 +117,43 @@ public void testSeeksWithLruEviction() throws Throwable {
executorService.submit(() -> readFullyWithPositionedRead(countDownLatch,
in,
0,
blockSize - S_1K * 10));
BLOCK_SIZE - S_500 * 10));

// Seek to block 1 and don't read completely
executorService.submit(() -> readFullyWithPositionedRead(countDownLatch,
in,
blockSize,
2 * S_1K));
BLOCK_SIZE,
2 * S_500));

// Seek to block 2 and don't read completely
executorService.submit(() -> readFullyWithSeek(countDownLatch,
in,
blockSize * 2L,
2 * S_1K));
BLOCK_SIZE * 2L,
2 * S_500));

// Seek to block 3 and don't read completely
executorService.submit(() -> readFullyWithPositionedRead(countDownLatch,
in,
blockSize * 3L,
2 * S_1K));
BLOCK_SIZE * 3L,
2 * S_500));

// Seek to block 4 and don't read completely
executorService.submit(() -> readFullyWithSeek(countDownLatch,
in,
blockSize * 4L,
2 * S_1K));
BLOCK_SIZE * 4L,
2 * S_500));

// Seek to block 5 and don't read completely
executorService.submit(() -> readFullyWithPositionedRead(countDownLatch,
in,
blockSize * 5L,
2 * S_1K));
BLOCK_SIZE * 5L,
2 * S_500));

// backward seek, can't use block 0 as it is evicted
executorService.submit(() -> readFullyWithSeek(countDownLatch,
in,
S_1K * 5,
2 * S_1K));
S_500 * 5,
2 * S_500));

countDownLatch.await();

@@ -205,8 +187,7 @@
*/
private boolean readFullyWithPositionedRead(CountDownLatch countDownLatch, FSDataInputStream in,
long position, int len) {
byte[] buffer = new byte[blockSize];
// Don't read block 0 completely
byte[] buffer = new byte[BLOCK_SIZE];
try {
in.readFully(position, buffer, 0, len);
countDownLatch.countDown();
@@ -228,8 +209,7 @@ private boolean readFullyWithPositionedRead(CountDownLatch countDownLatch, FSDataInputStream in,
*/
private boolean readFullyWithSeek(CountDownLatch countDownLatch, FSDataInputStream in,
long position, int len) {
byte[] buffer = new byte[blockSize];
// Don't read block 0 completely
byte[] buffer = new byte[BLOCK_SIZE];
try {
in.seek(position);
in.readFully(buffer, 0, len);
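
The verification that follows countDownLatch.await() in testSeeksWithLruEviction() is collapsed in this diff. As a rough illustration only, a check of this kind is typically written with the fields and static imports the class already has, polling until the gauge of cached blocks settles; the expected value shown here (the configured maximum) is an assumption, not taken from the patch.

      // Sketch only: poll the stream's IOStatistics until LRU eviction has brought the
      // number of cached blocks down to the configured PREFETCH_MAX_BLOCKS_COUNT.
      LambdaTestUtils.eventually(TIMEOUT_MILLIS, INTERVAL_MILLIS, () -> {
        LOG.info("IO stats: {}", ioStats);
        verifyStatisticGaugeValue(ioStats, STREAM_READ_BLOCKS_IN_FILE_CACHE,
            Integer.parseInt(maxBlocks));  // assumed expected value
      });
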