Skip to content

Commit 67eda99

Browse files
committed
review comments
1 parent 3cf7987 commit 67eda99

4 files changed

Lines changed: 64 additions & 22 deletions

File tree

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsRequestCallback.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public void onHeadRequest() {
4747

4848
@Override
4949
public void onBlockPrefetch(long start, long end) {
50-
statistics.bytesPrefetched(end - start);
50+
statistics.bytesPrefetched(end - start + 1);
5151
}
5252

5353
@Override

hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractAnalyticsStreamVectoredRead.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,11 @@
3838
import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
3939
import static org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult;
4040
import static org.apache.hadoop.fs.s3a.Constants.ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX;
41+
42+
import static org.apache.hadoop.fs.s3a.S3ATestConstants.AAL_CACHE_TIMEOUT;
43+
import static org.apache.hadoop.fs.s3a.S3ATestConstants.AAL_READ_BUFFER_SIZE;
44+
import static org.apache.hadoop.fs.s3a.S3ATestConstants.AAL_REQUEST_COALESCE_TOLERANCE;
45+
import static org.apache.hadoop.fs.s3a.S3ATestConstants.AAL_SMALL_OBJECT_PREFETCH_ENABLED;
4146
import static org.apache.hadoop.fs.s3a.S3ATestUtils.enableAnalyticsAccelerator;
4247
import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipForAnyEncryptionExceptSSES3;
4348
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
@@ -73,20 +78,20 @@ protected Configuration createConfiguration() {
7378
Configuration conf = super.createConfiguration();
7479
// Set the coalesce tolerance to 1KB, default is 1MB.
7580
conf.setInt(ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX +
76-
"." + "physicalio.request.coalesce.tolerance", S_16K);
81+
"." + AAL_REQUEST_COALESCE_TOLERANCE, S_16K);
7782

7883
// Set the minimum block size to 32KB. AAL uses a default block size of 128KB, which means the minimum size a S3
7984
// request will be is 128KB. Since the file being read is 128KB, we need to use this here to demonstrate that
8085
// separate GET requests are made for ranges that are not coalesced.
8186
conf.setInt(ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX +
82-
"." + "physicalio.readbuffersize", S_32K);
87+
"." + AAL_READ_BUFFER_SIZE, S_32K);
8388

8489
// Disable small object prefetched, otherwise anything less than 8MB is fetched in a single GET.
8590
conf.set(ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX +
86-
"." + "physicalio.small.objects.prefetching.enabled", "false");
91+
"." + AAL_SMALL_OBJECT_PREFETCH_ENABLED, "false");
8792

8893
conf.setInt(ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX +
89-
"." + "physicalio.cache.timeout", 5000);
94+
"." + AAL_CACHE_TIMEOUT, 5000);
9095

9196
enableAnalyticsAccelerator(conf);
9297
// If encryption is set, some AAL tests will fail.

hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java

Lines changed: 34 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,13 @@
5252
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
5353
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_HTTP_GET_REQUEST;
5454
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_HTTP_HEAD_REQUEST;
55-
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.*;
55+
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.ANALYTICS_STREAM_FACTORY_CLOSED;
56+
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_ANALYTICS_OPENED;
57+
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BYTES;
58+
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_CACHE_HIT;
59+
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_OPERATIONS;
60+
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_PARQUET_FOOTER_PARSING_FAILED;
61+
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_PREFETCHED_BYTES;
5662
import static org.apache.hadoop.io.Sizes.S_1K;
5763
import static org.apache.hadoop.io.Sizes.S_1M;
5864
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
@@ -95,6 +101,12 @@ public void testConnectorFrameWorkIntegration() throws Throwable {
95101

96102
S3AFileSystem fs =
97103
(S3AFileSystem) FileSystem.get(externalTestFile.toUri(), getConfiguration());
104+
105+
long fileLength = fs.getFileStatus(externalTestFile).getLen();
106+
107+
// Head request for the file length.
108+
verifyStatisticCounterValue(fs.getIOStatistics(), AUDIT_REQUEST_EXECUTION, 1);
109+
98110
byte[] buffer = new byte[500];
99111
IOStatistics ioStats;
100112

@@ -123,9 +135,9 @@ public void testConnectorFrameWorkIntegration() throws Throwable {
123135

124136
verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);
125137

126-
// Total file size is: 21511173, and read starts from pos 5. Since policy is WHOLE_FILE, the whole file starts
127-
// getting prefetched as soon as the stream to it is opened. So prefetched bytes is 21511173 - 5 = 21511168
128-
verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCHED_BYTES, 21511168);
138+
// Since policy is WHOLE_FILE, the whole file starts getting prefetched as soon as the stream to it is opened.
139+
// So prefetched bytes is fileLen - 5
140+
verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCHED_BYTES, fileLength - 5);
129141

130142
fs.close();
131143
verifyStatisticCounterValue(fs.getIOStatistics(), ANALYTICS_STREAM_FACTORY_CLOSED, 1);
@@ -134,7 +146,7 @@ public void testConnectorFrameWorkIntegration() throws Throwable {
134146
// in which case, AAL will start prefetching till EoF on file open in 8MB chunks. The file read here
135147
// s3://noaa-cors-pds/raw/2023/017/ohfh/OHFH017d.23_.gz, has a size of ~21MB, resulting in 3 GETS:
136148
// [0-8388607, 8388608-16777215, 16777216-21511173].
137-
verifyStatisticCounterValue(fs.getIOStatistics(), AUDIT_REQUEST_EXECUTION, 4);
149+
verifyStatisticCounterValue(fs.getIOStatistics(), AUDIT_REQUEST_EXECUTION, 5);
138150
}
139151

140152
@Test
@@ -146,13 +158,15 @@ public void testSequentialPrefetching() throws IOException {
146158
// While this works well when running on EC2, for local testing, it can take more than 1s to download large chunks
147159
// of data. Set this value to higher for testing to prevent early cache evictions.
148160
conf.setInt(ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX +
149-
"." + "physicalio.cache.timeout", 10000);
161+
"." + AAL_CACHE_TIMEOUT, 10000);
150162

151163
S3AFileSystem fs =
152164
(S3AFileSystem) FileSystem.get(externalTestFile.toUri(), getConfiguration());
153165
byte[] buffer = new byte[10 * ONE_MB];
154166
IOStatistics ioStats;
155167

168+
long fileLength = fs.getFileStatus(externalTestFile).getLen();
169+
156170
// Here we read through the 21MB external test file, but do not pass in the WHOLE_FILE policy. Instead, we rely
157171
// on AAL detecting a sequential pattern being read, and then prefetching bytes in a geometrical progression.
158172
// AAL's sequential prefetching starts prefetching in increments 4MB, 8MB, 16MB etc. depending on how many
@@ -181,20 +195,19 @@ public void testSequentialPrefetching() throws IOException {
181195
verifyStatisticCounterValue(ioStats, STREAM_READ_CACHE_HIT, 2);
182196
// A total of 10MB is prefetched - 3MB and then 7MB.
183197
verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCHED_BYTES, 10 * ONE_MB);
184-
198+
long bytesRemainingForPrefetch = fileLength - (inputStream.getPos() + 10 * ONE_MB);
185199
inputStream.readFully(buffer, 0, 10 * ONE_MB);
186200

187-
// Though the next GP should prefetch 16MB, since the file is ~23MB, only the bytes till EoF are prefetched:
188-
// 6291456 remaining bytes.
189-
verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCHED_BYTES, 10 * ONE_MB + 6291456);
201+
202+
// Though the next GP should prefetch 16MB, since the file is ~23MB, only the bytes till EoF are prefetched.
203+
verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCHED_BYTES, 10 * ONE_MB + bytesRemainingForPrefetch);
190204
inputStream.readFully(buffer, 0, 3 * ONE_MB);
191205
verifyStatisticCounterValue(ioStats, STREAM_READ_CACHE_HIT, 3);
192206
}
193207

194208
// verify all AAL stats are passed to the FS.
195209
verifyStatisticCounterValue(fs.getIOStatistics(), STREAM_READ_CACHE_HIT, 3);
196210
verifyStatisticCounterValue(fs.getIOStatistics(), STREAM_READ_PARQUET_FOOTER_PARSING_FAILED, 0);
197-
verifyStatisticCounterValue(fs.getIOStatistics(), STREAM_READ_PREFETCHED_BYTES, 10 * ONE_MB + 6291456);
198211
}
199212

200213
@Test
@@ -213,6 +226,8 @@ public void testMalformedParquetFooter() throws IOException {
213226
Path sourcePath = new Path(file.toURI().getPath());
214227
getFileSystem().copyFromLocalFile(false, true, sourcePath, dest);
215228

229+
long fileLength = getFileSystem().getFileStatus(dest).getLen();
230+
216231
byte[] buffer = new byte[500];
217232
IOStatistics ioStats;
218233
int bytesRead;
@@ -232,9 +247,9 @@ public void testMalformedParquetFooter() throws IOException {
232247
verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);
233248
verifyStatisticCounterValue(ioStats, ACTION_HTTP_GET_REQUEST, 1);
234249
verifyStatisticCounterValue(ioStats, ACTION_HTTP_HEAD_REQUEST, 0);
235-
// This file has a content length of 450. Since it's a parquet file, AAL will prefetch the footer bytes (last 32KB),
250+
// This file has a content length of 451. Since it's a parquet file, AAL will prefetch the footer bytes (last 32KB),
236251
// as soon as the file is opened, but because the file is < 32KB, the whole file is prefetched.
237-
verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCHED_BYTES, 450);
252+
verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCHED_BYTES, fileLength);
238253

239254
// The footer is only prefetched once, but parsing is attempted on each stream open.
240255
verifyStatisticCounterValue(ioStats, STREAM_READ_PARQUET_FOOTER_PARSING_FAILED, 1);
@@ -367,8 +382,10 @@ public void testSequentialStreamsNoDuplicateGets() throws Throwable {
367382
describe("Sequential streams reading same object should not duplicate GETs");
368383

369384
Path dest = path("sequential-test.txt");
370-
byte[] data = dataset(S_1M, 256, 255);
371-
writeDataset(getFileSystem(), dest, data, S_1M, 1024, true);
385+
int fileLen = S_1M;
386+
387+
byte[] data = dataset(fileLen, 256, 255);
388+
writeDataset(getFileSystem(), dest, data, fileLen, 1024, true);
372389

373390
byte[] buffer = new byte[ONE_MB];
374391
try (FSDataInputStream stream1 = getFileSystem().open(dest);
@@ -386,14 +403,14 @@ public void testSequentialStreamsNoDuplicateGets() throws Throwable {
386403

387404
// Since it's a small file (ALL will prefetch the whole file for size < 8MB), the whole file is prefetched
388405
// on the first read.
389-
verifyStatisticCounterValue(stats1, STREAM_READ_PREFETCHED_BYTES, 1048575);
406+
verifyStatisticCounterValue(stats1, STREAM_READ_PREFETCHED_BYTES, fileLen);
390407

391408
// The second stream will not prefetch any bytes, as they have already been prefetched by stream 1.
392409
verifyStatisticCounterValue(stats2, STREAM_READ_PREFETCHED_BYTES, 0);
393410
}
394411

395412
// verify value is passed up to the FS
396-
verifyStatisticCounterValue(getFileSystem().getIOStatistics(), STREAM_READ_PREFETCHED_BYTES, 1048575);
413+
verifyStatisticCounterValue(getFileSystem().getIOStatistics(), STREAM_READ_PREFETCHED_BYTES, fileLen);
397414

398415
// We did 3 reads, all of them were served from the small object cache. In this case, the whole object was
399416
// downloaded as soon as the stream to it was opened.

hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -284,4 +284,24 @@ public interface S3ATestConstants {
284284
* Default policy on root tests: {@value}.
285285
*/
286286
boolean DEFAULT_ROOT_TESTS_ENABLED = true;
287+
288+
/**
289+
* Ranges within this distance of each other will be coalesced.
290+
*/
291+
String AAL_REQUEST_COALESCE_TOLERANCE = "physicalio.request.coalesce.tolerance";
292+
293+
/**
294+
* The minimum size of a block in AAL.
295+
*/
296+
String AAL_READ_BUFFER_SIZE = "physicalio.readbuffersize";
297+
298+
/**
299+
* Objects smaller than this will be downloaded completely.
300+
*/
301+
String AAL_SMALL_OBJECT_PREFETCH_ENABLED = "physicalio.small.objects.prefetching.enabled";
302+
303+
/**
304+
* Objects in AAL's cache will expire after this duration.
305+
*/
306+
String AAL_CACHE_TIMEOUT = "physicalio.cache.timeout";
287307
}

0 commit comments

Comments
 (0)