Skip to content

Commit 9f03f87

Browse files
committed
HADOOP-18104: S3A: Add configs to configure minSeekForVectorReads and maxReadSizeForVectorReads (#3964)
Part of HADOOP-18103. Introducing fs.s3a.vectored.read.min.seek.size and fs.s3a.vectored.read.max.merged.size to configure min seek and max read during a vectored IO operation in S3A connector. These properties actually define how the ranges will be merged. To completely disable merging set fs.s3a.max.readsize.vectored.read to 0. Contributed By: Mukund Thakur
1 parent 5c348c4 commit 9f03f87

11 files changed

Lines changed: 297 additions & 16 deletions

File tree

hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -474,6 +474,7 @@ end of first and start of next range is more than this value.
474474

475475
Maximum number of bytes which can be read in one go after merging the ranges.
476476
Two ranges won't be merged if the combined data to be read is more than this value.
477+
Essentially setting this to 0 will disable the merging of ranges.
477478

478479
## Consistency
479480

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,6 @@
1818

1919
package org.apache.hadoop.fs.contract;
2020

21-
import org.apache.hadoop.fs.FSDataInputStream;
22-
import org.apache.hadoop.fs.FileRange;
23-
import org.apache.hadoop.fs.FileRangeImpl;
24-
import org.apache.hadoop.fs.FileSystem;
25-
import org.apache.hadoop.fs.Path;
26-
import org.apache.hadoop.fs.impl.FutureIOSupport;
27-
import org.slf4j.Logger;
28-
import org.slf4j.LoggerFactory;
29-
3021
import java.io.EOFException;
3122
import java.io.IOException;
3223
import java.nio.ByteBuffer;
@@ -42,6 +33,15 @@
4233
import org.junit.Test;
4334
import org.junit.runner.RunWith;
4435
import org.junit.runners.Parameterized;
36+
import org.slf4j.Logger;
37+
import org.slf4j.LoggerFactory;
38+
39+
import org.apache.hadoop.fs.FSDataInputStream;
40+
import org.apache.hadoop.fs.FileRange;
41+
import org.apache.hadoop.fs.FileRangeImpl;
42+
import org.apache.hadoop.fs.FileSystem;
43+
import org.apache.hadoop.fs.Path;
44+
import org.apache.hadoop.fs.impl.FutureIOSupport;
4545

4646
import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
4747

@@ -52,7 +52,7 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
5252

5353
public static final int DATASET_LEN = 64 * 1024;
5454
private static final byte[] DATASET = ContractTestUtils.dataset(DATASET_LEN, 'a', 32);
55-
private static final String VECTORED_READ_FILE_NAME = "vectored_file.txt";
55+
protected static final String VECTORED_READ_FILE_NAME = "vectored_file.txt";
5656
private static final String VECTORED_READ_FILE_1MB_NAME = "vectored_file_1M.txt";
5757
private static final byte[] DATASET_MB = ContractTestUtils.dataset(1024 * 1024, 'a', 256);
5858

@@ -172,6 +172,7 @@ public void testSomeRangesMergedSomeUnmerged() throws Exception {
172172
}
173173
}
174174

175+
@Test
175176
public void testSameRanges() throws Exception {
176177
FileSystem fs = getFileSystem();
177178
List<FileRange> fileRanges = new ArrayList<>();

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/TestVectoredReadUtils.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,30 @@ public void testSortAndMergeMoreCases() throws Exception {
207207
VectoredReadUtils.isOrderedDisjoint(outputList, 1, 800));
208208

209209
}
210+
211+
@Test
212+
public void testMaxSizeZeroDisablesMering() throws Exception {
213+
List<FileRange> randomRanges = Arrays.asList(
214+
new FileRangeImpl(3000, 110),
215+
new FileRangeImpl(3000, 100),
216+
new FileRangeImpl(2100, 100)
217+
);
218+
assertEqualRangeCountsAfterMerging(randomRanges, 1, 1, 0);
219+
assertEqualRangeCountsAfterMerging(randomRanges, 1, 0, 0);
220+
assertEqualRangeCountsAfterMerging(randomRanges, 1, 100, 0);
221+
}
222+
223+
private void assertEqualRangeCountsAfterMerging(List<FileRange> inputRanges,
224+
int chunkSize,
225+
int minimumSeek,
226+
int maxSize) {
227+
List<CombinedFileRange> combinedFileRanges = VectoredReadUtils
228+
.sortAndMergeRanges(inputRanges, chunkSize, minimumSeek, maxSize);
229+
Assertions.assertThat(combinedFileRanges)
230+
.describedAs("Mismatch in number of ranges post merging")
231+
.hasSize(inputRanges.size());
232+
}
233+
210234
interface Stream extends PositionedReadable, ByteBufferPositionedReadable {
211235
// nothing
212236
}

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MoreAsserts.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,4 +86,16 @@ public static <T> void assertFutureFailedExceptionally(CompletableFuture<T> futu
8686
"completed exceptionally")
8787
.isTrue();
8888
}
89+
90+
/**
91+
* Assert two same type of values.
92+
* @param actual actual value.
93+
* @param expected expected value.
94+
* @param message error message to print in case of mismatch.
95+
*/
96+
public static <T> void assertEqual(T actual, T expected, String message) {
97+
Assertions.assertThat(actual)
98+
.describedAs("Mismatch in %s", message)
99+
.isEqualTo(expected);
100+
}
89101
}

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1177,4 +1177,30 @@ private Constants() {
11771177
*/
11781178
public static final String FS_S3A_CREATE_HEADER = "fs.s3a.create.header";
11791179

1180+
/**
1181+
* What is the smallest reasonable seek in bytes such
1182+
* that we group ranges together during vectored read operation.
1183+
* Value : {@value}.
1184+
*/
1185+
public static final String AWS_S3_VECTOR_READS_MIN_SEEK_SIZE =
1186+
"fs.s3a.vectored.read.min.seek.size";
1187+
1188+
/**
1189+
* What is the largest merged read size in bytes such
1190+
* that we group ranges together during vectored read.
1191+
* Setting this value to 0 will disable merging of ranges.
1192+
* Value : {@value}.
1193+
*/
1194+
public static final String AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE =
1195+
"fs.s3a.vectored.read.max.merged.size";
1196+
1197+
/**
1198+
* Default minimum seek in bytes during vectored reads : {@value}.
1199+
*/
1200+
public static final int DEFAULT_AWS_S3_VECTOR_READS_MIN_SEEK_SIZE = 4896; // 4K
1201+
1202+
/**
1203+
* Default maximum read size in bytes during vectored reads : {@value}.
1204+
*/
1205+
public static final int DEFAULT_AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE = 1253376; //1M
11801206
}

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -313,6 +313,10 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
313313
* {@code openFile()}.
314314
*/
315315
private S3AInputPolicy inputPolicy;
316+
/** Vectored IO context. */
317+
private VectoredIOContext vectoredIOContext;
318+
319+
private long readAhead;
316320
private ChangeDetectionPolicy changeDetectionPolicy;
317321
private final AtomicBoolean closed = new AtomicBoolean(false);
318322
private volatile boolean isClosed = false;
@@ -584,6 +588,7 @@ public void initialize(URI name, Configuration originalConf)
584588
longBytesOption(conf, ASYNC_DRAIN_THRESHOLD,
585589
DEFAULT_ASYNC_DRAIN_THRESHOLD, 0),
586590
inputPolicy);
591+
vectoredIOContext = populateVectoredIOContext(conf);
587592
} catch (AmazonClientException e) {
588593
// amazon client exception: stop all services then throw the translation
589594
cleanupWithLogger(LOG, span);
@@ -597,6 +602,23 @@ public void initialize(URI name, Configuration originalConf)
597602
}
598603
}
599604

605+
/**
606+
* Populates the configurations related to vectored IO operation
607+
* in the context which has to passed down to input streams.
608+
* @param conf configuration object.
609+
* @return VectoredIOContext.
610+
*/
611+
private VectoredIOContext populateVectoredIOContext(Configuration conf) {
612+
final int minSeekVectored = (int) longBytesOption(conf, AWS_S3_VECTOR_READS_MIN_SEEK_SIZE,
613+
DEFAULT_AWS_S3_VECTOR_READS_MIN_SEEK_SIZE, 0);
614+
final int maxReadSizeVectored = (int) longBytesOption(conf, AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE,
615+
DEFAULT_AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE, 0);
616+
return new VectoredIOContext()
617+
.setMinSeekForVectoredReads(minSeekVectored)
618+
.setMaxReadSizeForVectoredReads(maxReadSizeVectored)
619+
.build();
620+
}
621+
600622
/**
601623
* Set the client side encryption gauge to 0 or 1, indicating if CSE is
602624
* enabled through the gauge or not.
@@ -1552,7 +1574,8 @@ protected S3AReadOpContext createReadContext(
15521574
invoker,
15531575
statistics,
15541576
statisticsContext,
1555-
fileStatus)
1577+
fileStatus,
1578+
vectoredIOContext)
15561579
.withAuditSpan(auditSpan);
15571580
openFileHelper.applyDefaultOptions(roc);
15581581
return roc.build();

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,9 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
145145
private S3AInputPolicy inputPolicy;
146146
private long readahead = Constants.DEFAULT_READAHEAD_RANGE;
147147

148+
/** Vectored IO context. */
149+
private final VectoredIOContext vectoredIOContext;
150+
148151
/**
149152
* This is the actual position within the object, used by
150153
* lazy seek to decide whether to seek on the next read or not.
@@ -212,6 +215,7 @@ public S3AInputStream(S3AReadOpContext ctx,
212215
setReadahead(ctx.getReadahead());
213216
this.asyncDrainThreshold = ctx.getAsyncDrainThreshold();
214217
this.unboundedThreadPool = unboundedThreadPool;
218+
this.vectoredIOContext = context.getVectoredIOContext();
215219
}
216220

217221
/**
@@ -859,6 +863,7 @@ public String toString() {
859863
sb.append(" remainingInCurrentRequest=")
860864
.append(remainingInCurrentRequest());
861865
sb.append(" ").append(changeTracker);
866+
sb.append(" ").append(vectoredIOContext);
862867
sb.append('\n').append(s);
863868
sb.append('}');
864869
return sb.toString();
@@ -905,6 +910,22 @@ public void readFully(long position, byte[] buffer, int offset, int length)
905910
}
906911
}
907912

913+
/**
914+
* {@inheritDoc}.
915+
*/
916+
@Override
917+
public int minSeekForVectorReads() {
918+
return vectoredIOContext.getMinSeekForVectorReads();
919+
}
920+
921+
/**
922+
* {@inheritDoc}.
923+
*/
924+
@Override
925+
public int maxReadSizeForVectorReads() {
926+
return vectoredIOContext.getMaxReadSizeForVectorReads();
927+
}
928+
908929
/**
909930
* {@inheritDoc}
910931
* Vectored read implementation for S3AInputStream.

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AReadOpContext.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,24 +64,32 @@ public class S3AReadOpContext extends S3AOpContext {
6464
*/
6565
private long asyncDrainThreshold;
6666

67+
/**
68+
* Vectored IO context for vectored read api
69+
* in {@code S3AInputStream#readVectored(List, IntFunction)}.
70+
*/
71+
private final VectoredIOContext vectoredIOContext;
72+
6773
/**
6874
* Instantiate.
6975
* @param path path of read
7076
* @param invoker invoker for normal retries.
7177
* @param stats Fileystem statistics (may be null)
7278
* @param instrumentation statistics context
7379
* @param dstFileStatus target file status
80+
* @param vectoredIOContext context for vectored read operation.
7481
*/
7582
public S3AReadOpContext(
7683
final Path path,
7784
Invoker invoker,
7885
@Nullable FileSystem.Statistics stats,
7986
S3AStatisticsContext instrumentation,
80-
FileStatus dstFileStatus) {
81-
87+
FileStatus dstFileStatus,
88+
VectoredIOContext vectoredIOContext) {
8289
super(invoker, stats, instrumentation,
8390
dstFileStatus);
8491
this.path = requireNonNull(path);
92+
this.vectoredIOContext = requireNonNull(vectoredIOContext, "vectoredIOContext");
8593
}
8694

8795
/**
@@ -145,6 +153,7 @@ public AuditSpan getAuditSpan() {
145153
}
146154

147155
/**
156+
<<<<<<< HEAD
148157
* Set builder value.
149158
* @param value new value
150159
* @return the builder
@@ -199,6 +208,14 @@ public long getAsyncDrainThreshold() {
199208
return asyncDrainThreshold;
200209
}
201210

211+
/**
212+
* Get Vectored IO context for this this read op.
213+
* @return vectored IO context.
214+
*/
215+
public VectoredIOContext getVectoredIOContext() {
216+
return vectoredIOContext;
217+
}
218+
202219
@Override
203220
public String toString() {
204221
final StringBuilder sb = new StringBuilder(
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.fs.s3a;
20+
21+
import java.util.List;
22+
import java.util.function.IntFunction;
23+
24+
/**
25+
* Context related to vectored IO operation.
26+
* See {@link S3AInputStream#readVectored(List, IntFunction)}.
27+
*/
28+
public class VectoredIOContext {
29+
30+
/**
31+
* What is the smallest reasonable seek that we should group
32+
* ranges together during vectored read operation.
33+
*/
34+
private int minSeekForVectorReads;
35+
36+
/**
37+
* What is the largest size that we should group ranges
38+
* together during vectored read operation.
39+
* Setting this value 0 will disable merging of ranges.
40+
*/
41+
private int maxReadSizeForVectorReads;
42+
43+
/**
44+
* Default no arg constructor.
45+
*/
46+
public VectoredIOContext() {
47+
}
48+
49+
public VectoredIOContext setMinSeekForVectoredReads(int minSeek) {
50+
this.minSeekForVectorReads = minSeek;
51+
return this;
52+
}
53+
54+
public VectoredIOContext setMaxReadSizeForVectoredReads(int maxSize) {
55+
this.maxReadSizeForVectorReads = maxSize;
56+
return this;
57+
}
58+
59+
public VectoredIOContext build() {
60+
return this;
61+
}
62+
63+
public int getMinSeekForVectorReads() {
64+
return minSeekForVectorReads;
65+
}
66+
67+
public int getMaxReadSizeForVectorReads() {
68+
return maxReadSizeForVectorReads;
69+
}
70+
71+
@Override
72+
public String toString() {
73+
return "VectoredIOContext{" +
74+
"minSeekForVectorReads=" + minSeekForVectorReads +
75+
", maxReadSizeForVectorReads=" + maxReadSizeForVectorReads +
76+
'}';
77+
}
78+
}

0 commit comments

Comments
 (0)