HADOOP-18805. S3A prefetch tests to work with small files #5851
Changes from 2 commits
ITestS3APrefetchingInputStream.java
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -36,7 +36,6 @@ | |
| import org.apache.hadoop.fs.statistics.IOStatistics; | ||
| import org.apache.hadoop.test.LambdaTestUtils; | ||
|
|
||
| import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_DEFAULT_SIZE; | ||
| import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_SIZE_KEY; | ||
| import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY; | ||
| import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticMaximum; | ||
|
|
@@ -64,26 +63,30 @@ public ITestS3APrefetchingInputStream() { | |
| private static final Logger LOG = | ||
| LoggerFactory.getLogger(ITestS3APrefetchingInputStream.class); | ||
|
|
||
| private static final int S_1K = 1024; | ||
| private static final int S_500 = 512; | ||
| private static final int S_1K = S_500 * 2; | ||
| private static final int S_1M = S_1K * S_1K; | ||
| // Path for file which should have length > block size so S3ACachingInputStream is used | ||
| private int numBlocks; | ||
| private Path largeFile; | ||
| private FileSystem largeFileFS; | ||
| private int numBlocks; | ||
| private int blockSize; | ||
|
|
||
| // Size should be > block size so S3ACachingInputStream is used | ||
| private long largeFileSize; | ||
|
|
||
| // Size should be < block size so S3AInMemoryInputStream is used | ||
| private static final int SMALL_FILE_SIZE = S_1K * 16; | ||
| private static final int SMALL_FILE_SIZE = S_1K * 9; | ||
|
|
||
| private static final int TIMEOUT_MILLIS = 5000; | ||
| private static final int INTERVAL_MILLIS = 500; | ||
|
|
||
| private static final int BLOCK_SIZE = S_1K * 10; | ||
|
|
||
| @Override | ||
| public Configuration createConfiguration() { | ||
| Configuration conf = super.createConfiguration(); | ||
| S3ATestUtils.removeBaseAndBucketOverrides(conf, PREFETCH_ENABLED_KEY); | ||
| S3ATestUtils.removeBaseAndBucketOverrides(conf, PREFETCH_BLOCK_SIZE_KEY); | ||
| conf.setBoolean(PREFETCH_ENABLED_KEY, true); | ||
| conf.setInt(PREFETCH_BLOCK_SIZE_KEY, BLOCK_SIZE); | ||
| return conf; | ||
| } | ||
|
|
||
|
|
@@ -94,17 +97,16 @@ public void teardown() throws Exception { | |
| largeFileFS = null; | ||
| } | ||
|
|
||
| private void openFS() throws Exception { | ||
| private void openFS(String fileName) throws Exception { | ||
|
||
| Configuration conf = getConfiguration(); | ||
| String largeFileUri = S3ATestUtils.getCSVTestFile(conf); | ||
|
|
||
| largeFile = new Path(largeFileUri); | ||
| blockSize = conf.getInt(PREFETCH_BLOCK_SIZE_KEY, PREFETCH_BLOCK_DEFAULT_SIZE); | ||
| byte[] data = ContractTestUtils.dataset(S_1K * 72, 'x', 26); | ||
| largeFile = path(fileName); | ||
| ContractTestUtils.writeDataset(getFileSystem(), largeFile, data, data.length, 16, true); | ||
| largeFileFS = new S3AFileSystem(); | ||
| largeFileFS.initialize(new URI(largeFileUri), getConfiguration()); | ||
| largeFileFS.initialize(new URI(largeFile.toString()), getConfiguration()); | ||
| FileStatus fileStatus = largeFileFS.getFileStatus(largeFile); | ||
| largeFileSize = fileStatus.getLen(); | ||
| numBlocks = calculateNumBlocks(largeFileSize, blockSize); | ||
| numBlocks = calculateNumBlocks(largeFileSize, BLOCK_SIZE); | ||
| } | ||
|
|
||
| private static int calculateNumBlocks(long largeFileSize, int blockSize) { | ||
|
|
@@ -119,7 +121,7 @@ private static int calculateNumBlocks(long largeFileSize, int blockSize) { | |
| public void testReadLargeFileFully() throws Throwable { | ||
| describe("read a large file fully, uses S3ACachingInputStream"); | ||
| IOStatistics ioStats; | ||
| openFS(); | ||
| openFS("testReadLargeFileFully"); | ||
|
|
||
| try (FSDataInputStream in = largeFileFS.open(largeFile)) { | ||
| ioStats = in.getIOStatistics(); | ||
|
|
@@ -152,7 +154,7 @@ public void testReadLargeFileFullyLazySeek() throws Throwable { | |
| describe("read a large file using readFully(position,buffer,offset,length)," | ||
| + " uses S3ACachingInputStream"); | ||
| IOStatistics ioStats; | ||
| openFS(); | ||
| openFS("testReadLargeFileFullyLazySeek"); | ||
|
|
||
| try (FSDataInputStream in = largeFileFS.open(largeFile)) { | ||
| ioStats = in.getIOStatistics(); | ||
|
|
@@ -183,25 +185,25 @@ public void testReadLargeFileFullyLazySeek() throws Throwable { | |
| public void testRandomReadLargeFile() throws Throwable { | ||
| describe("random read on a large file, uses S3ACachingInputStream"); | ||
| IOStatistics ioStats; | ||
| openFS(); | ||
| openFS("testRandomReadLargeFile"); | ||
|
|
||
| try (FSDataInputStream in = largeFileFS.open(largeFile)) { | ||
| ioStats = in.getIOStatistics(); | ||
|
|
||
| byte[] buffer = new byte[blockSize]; | ||
| byte[] buffer = new byte[BLOCK_SIZE]; | ||
|
|
||
| // Don't read block 0 completely so it gets cached on read after seek | ||
| in.read(buffer, 0, blockSize - S_1K * 10); | ||
| in.read(buffer, 0, BLOCK_SIZE - S_500 * 10); | ||
|
|
||
| // Seek to block 2 and read all of it | ||
| in.seek(blockSize * 2); | ||
| in.read(buffer, 0, blockSize); | ||
| in.seek(BLOCK_SIZE * 2); | ||
| in.read(buffer, 0, BLOCK_SIZE); | ||
|
|
||
| // Seek to block 4 but don't read: noop. | ||
| in.seek(blockSize * 4); | ||
| in.seek(BLOCK_SIZE * 4); | ||
|
|
||
| // Backwards seek, will use cached block 0 | ||
| in.seek(S_1K * 5); | ||
| in.seek(S_500 * 5); | ||
| in.read(); | ||
|
|
||
| // Expected to get block 0 (partially read), 1 (prefetch), 2 (fully read), 3 (prefetch) | ||
|
|
@@ -234,9 +236,9 @@ public void testRandomReadSmallFile() throws Throwable { | |
|
|
||
| byte[] buffer = new byte[SMALL_FILE_SIZE]; | ||
|
|
||
| in.read(buffer, 0, S_1K * 4); | ||
| in.seek(S_1K * 12); | ||
| in.read(buffer, 0, S_1K * 4); | ||
| in.read(buffer, 0, S_1K * 2); | ||
| in.seek(S_1K * 7); | ||
| in.read(buffer, 0, S_1K * 2); | ||
|
|
||
| verifyStatisticCounterValue(ioStats, ACTION_HTTP_GET_REQUEST, 1); | ||
| verifyStatisticCounterValue(ioStats, STREAM_READ_OPENED, 1); | ||
|
|
@@ -261,9 +263,9 @@ public void testStatusProbesAfterClosingStream() throws Throwable { | |
| FSDataInputStream in = getFileSystem().open(smallFile); | ||
|
|
||
| byte[] buffer = new byte[SMALL_FILE_SIZE]; | ||
| in.read(buffer, 0, S_1K * 4); | ||
| in.seek(S_1K * 12); | ||
| in.read(buffer, 0, S_1K * 4); | ||
| in.read(buffer, 0, S_1K * 2); | ||
| in.seek(S_1K * 7); | ||
| in.read(buffer, 0, S_1K * 2); | ||
|
|
||
| long pos = in.getPos(); | ||
| IOStatistics ioStats = in.getIOStatistics(); | ||
|
|
@@ -298,7 +300,6 @@ public void testStatusProbesAfterClosingStream() throws Throwable { | |
| inputStreamStatistics, newInputStreamStatistics); | ||
|
|
||
| assertFalse("seekToNewSource() not supported with prefetch", in.seekToNewSource(10)); | ||
|
|
||
| } | ||
|
|
||
| } | ||
ITestS3APrefetchingLruEviction.java
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -39,11 +39,11 @@ | |
| import org.apache.hadoop.fs.FSDataInputStream; | ||
| import org.apache.hadoop.fs.FileSystem; | ||
| import org.apache.hadoop.fs.Path; | ||
| import org.apache.hadoop.fs.contract.ContractTestUtils; | ||
| import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest; | ||
| import org.apache.hadoop.fs.statistics.IOStatistics; | ||
| import org.apache.hadoop.test.LambdaTestUtils; | ||
|
|
||
| import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_DEFAULT_SIZE; | ||
| import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_SIZE_KEY; | ||
| import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY; | ||
| import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_MAX_BLOCKS_COUNT; | ||
|
|
@@ -63,9 +63,7 @@ public class ITestS3APrefetchingLruEviction extends AbstractS3ACostTest { | |
| public static Collection<Object[]> params() { | ||
| return Arrays.asList(new Object[][]{ | ||
| {"1"}, | ||
| {"2"}, | ||
| {"3"}, | ||
| {"4"} | ||
| {"2"} | ||
| }); | ||
| } | ||
|
|
||
|
|
@@ -78,39 +76,43 @@ public ITestS3APrefetchingLruEviction(final String maxBlocks) { | |
| LoggerFactory.getLogger(ITestS3APrefetchingLruEviction.class); | ||
|
|
||
| private static final int S_1K = 1024; | ||
| private static final int S_500 = 512; | ||
| private static final int SMALL_FILE_SIZE = S_1K * 56; | ||
|
|
||
| // Path for file which should have length > block size so S3ACachingInputStream is used | ||
| private Path largeFile; | ||
| private FileSystem largeFileFS; | ||
| private int blockSize; | ||
| private Path smallFile; | ||
| private FileSystem smallFileFS; | ||
|
|
||
| private static final int TIMEOUT_MILLIS = 5000; | ||
| private static final int TIMEOUT_MILLIS = 3000; | ||
| private static final int INTERVAL_MILLIS = 500; | ||
| private static final int BLOCK_SIZE = S_1K * 10; | ||
|
|
||
| @Override | ||
| public Configuration createConfiguration() { | ||
| Configuration conf = super.createConfiguration(); | ||
| S3ATestUtils.removeBaseAndBucketOverrides(conf, PREFETCH_ENABLED_KEY); | ||
| S3ATestUtils.removeBaseAndBucketOverrides(conf, PREFETCH_MAX_BLOCKS_COUNT); | ||
| S3ATestUtils.removeBaseAndBucketOverrides(conf, PREFETCH_BLOCK_SIZE_KEY); | ||
| conf.setBoolean(PREFETCH_ENABLED_KEY, true); | ||
| conf.setInt(PREFETCH_MAX_BLOCKS_COUNT, Integer.parseInt(maxBlocks)); | ||
| conf.setInt(PREFETCH_BLOCK_SIZE_KEY, BLOCK_SIZE); | ||
| return conf; | ||
| } | ||
|
|
||
| @Override | ||
| public void teardown() throws Exception { | ||
| super.teardown(); | ||
| cleanupWithLogger(LOG, largeFileFS); | ||
| largeFileFS = null; | ||
| cleanupWithLogger(LOG, smallFileFS); | ||
| smallFileFS = null; | ||
| } | ||
|
|
||
| private void openFS() throws Exception { | ||
| Configuration conf = getConfiguration(); | ||
| String largeFileUri = S3ATestUtils.getCSVTestFile(conf); | ||
|
|
||
| largeFile = new Path(largeFileUri); | ||
| blockSize = conf.getInt(PREFETCH_BLOCK_SIZE_KEY, PREFETCH_BLOCK_DEFAULT_SIZE); | ||
| largeFileFS = new S3AFileSystem(); | ||
| largeFileFS.initialize(new URI(largeFileUri), getConfiguration()); | ||
| byte[] data = ContractTestUtils.dataset(SMALL_FILE_SIZE, 'x', 26); | ||
| smallFile = path("iTestS3APrefetchingLruEviction"); | ||
| ContractTestUtils.writeDataset(getFileSystem(), smallFile, data, data.length, 16, true); | ||
| smallFileFS = new S3AFileSystem(); | ||
|
||
| smallFileFS.initialize(new URI(smallFile.toString()), getConfiguration()); | ||
| } | ||
|
|
||
| @Test | ||
|
|
@@ -125,7 +127,7 @@ public void testSeeksWithLruEviction() throws Throwable { | |
| .build()); | ||
| CountDownLatch countDownLatch = new CountDownLatch(7); | ||
|
|
||
| try (FSDataInputStream in = largeFileFS.open(largeFile)) { | ||
| try (FSDataInputStream in = smallFileFS.open(smallFile)) { | ||
| ioStats = in.getIOStatistics(); | ||
| // tests to add multiple blocks in the prefetch cache | ||
| // and let LRU eviction take place as more cache entries | ||
|
|
@@ -135,43 +137,43 @@ public void testSeeksWithLruEviction() throws Throwable { | |
| executorService.submit(() -> readFullyWithPositionedRead(countDownLatch, | ||
| in, | ||
| 0, | ||
| blockSize - S_1K * 10)); | ||
| BLOCK_SIZE - S_500 * 10)); | ||
|
|
||
| // Seek to block 1 and don't read completely | ||
| executorService.submit(() -> readFullyWithPositionedRead(countDownLatch, | ||
| in, | ||
| blockSize, | ||
| 2 * S_1K)); | ||
| BLOCK_SIZE, | ||
| 2 * S_500)); | ||
|
|
||
| // Seek to block 2 and don't read completely | ||
| executorService.submit(() -> readFullyWithSeek(countDownLatch, | ||
| in, | ||
| blockSize * 2L, | ||
| 2 * S_1K)); | ||
| BLOCK_SIZE * 2L, | ||
| 2 * S_500)); | ||
|
|
||
| // Seek to block 3 and don't read completely | ||
| executorService.submit(() -> readFullyWithPositionedRead(countDownLatch, | ||
| in, | ||
| blockSize * 3L, | ||
| 2 * S_1K)); | ||
| BLOCK_SIZE * 3L, | ||
| 2 * S_500)); | ||
|
|
||
| // Seek to block 4 and don't read completely | ||
| executorService.submit(() -> readFullyWithSeek(countDownLatch, | ||
| in, | ||
| blockSize * 4L, | ||
| 2 * S_1K)); | ||
| BLOCK_SIZE * 4L, | ||
| 2 * S_500)); | ||
|
|
||
| // Seek to block 5 and don't read completely | ||
| executorService.submit(() -> readFullyWithPositionedRead(countDownLatch, | ||
| in, | ||
| blockSize * 5L, | ||
| 2 * S_1K)); | ||
| BLOCK_SIZE * 5L, | ||
| 2 * S_500)); | ||
|
|
||
| // backward seek, can't use block 0 as it is evicted | ||
| executorService.submit(() -> readFullyWithSeek(countDownLatch, | ||
| in, | ||
| S_1K * 5, | ||
| 2 * S_1K)); | ||
| S_500 * 5, | ||
| 2 * S_500)); | ||
|
|
||
| countDownLatch.await(); | ||
|
|
||
|
|
@@ -205,8 +207,7 @@ public void testSeeksWithLruEviction() throws Throwable { | |
| */ | ||
| private boolean readFullyWithPositionedRead(CountDownLatch countDownLatch, FSDataInputStream in, | ||
| long position, int len) { | ||
| byte[] buffer = new byte[blockSize]; | ||
| // Don't read block 0 completely | ||
| byte[] buffer = new byte[BLOCK_SIZE]; | ||
| try { | ||
| in.readFully(position, buffer, 0, len); | ||
| countDownLatch.countDown(); | ||
|
|
@@ -228,8 +229,7 @@ private boolean readFullyWithPositionedRead(CountDownLatch countDownLatch, FSDat | |
| */ | ||
| private boolean readFullyWithSeek(CountDownLatch countDownLatch, FSDataInputStream in, | ||
| long position, int len) { | ||
| byte[] buffer = new byte[blockSize]; | ||
| // Don't read block 0 completely | ||
| byte[] buffer = new byte[BLOCK_SIZE]; | ||
| try { | ||
| in.seek(position); | ||
| in.readFully(buffer, 0, len); | ||
|
|
||
Because the file size is now set in this method, the filesystem that the test framework automatically creates and destroys is already set up for the tests; we can replace openFS() entirely.
done, makes them much simpler
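For context, a minimal sketch of the simplification being discussed: since the test now writes its own file of a known size through the framework-managed filesystem, a separate openFS()/largeFileFS lifecycle is no longer needed. The createLargeFile helper name below is illustrative only (it is not taken from this PR), and the sketch assumes the S_1K constant, ContractTestUtils, path(), getFileSystem(), and describe() already present in the test class shown in the diff above.

```java
// Illustrative sketch (not the code in this PR): write a test file of a
// known size with the framework-managed filesystem, so the separately
// initialized largeFileFS from openFS() becomes unnecessary.
private Path createLargeFile(String testName) throws Exception {
  // 72 KiB of data, larger than the 10 KiB prefetch block size,
  // so the caching (prefetching) input stream is selected on open.
  byte[] data = ContractTestUtils.dataset(S_1K * 72, 'x', 26);
  Path file = path(testName);
  ContractTestUtils.writeDataset(getFileSystem(), file, data, data.length, 16, true);
  return file;
}

@Test
public void testReadLargeFileFully() throws Throwable {
  describe("read a large file fully, uses S3ACachingInputStream");
  Path file = createLargeFile("testReadLargeFileFully");
  try (FSDataInputStream in = getFileSystem().open(file)) {
    IOStatistics ioStats = in.getIOStatistics();
    // ... same read loop and prefetch-statistics assertions as the existing test ...
  }
}
```

With this shape, each test creates exactly the data it needs at the configured 10 KiB block size, which is what allows the suite to run against small files instead of the multi-gigabyte CSV landsat test file.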