Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,7 @@ end of first and start of next range is more than this value.

Maximum number of bytes which can be read in one go after merging the ranges.
Two ranges won't be merged if the combined data to be read is more than this value.
Essentially setting this to 0 will disable the merging of ranges.

## Consistency

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,6 @@

package org.apache.hadoop.fs.contract;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.FileRangeImpl;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.impl.FutureIOSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
Expand All @@ -42,6 +33,15 @@
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.FileRangeImpl;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.impl.FutureIOSupport;

import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;

Expand All @@ -52,7 +52,7 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac

public static final int DATASET_LEN = 64 * 1024;
private static final byte[] DATASET = ContractTestUtils.dataset(DATASET_LEN, 'a', 32);
private static final String VECTORED_READ_FILE_NAME = "vectored_file.txt";
protected static final String VECTORED_READ_FILE_NAME = "vectored_file.txt";
private static final String VECTORED_READ_FILE_1MB_NAME = "vectored_file_1M.txt";
private static final byte[] DATASET_MB = ContractTestUtils.dataset(1024 * 1024, 'a', 256);

Expand Down Expand Up @@ -172,6 +172,7 @@ public void testSomeRangesMergedSomeUnmerged() throws Exception {
}
}

@Test
public void testSameRanges() throws Exception {
FileSystem fs = getFileSystem();
List<FileRange> fileRanges = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,30 @@ public void testSortAndMergeMoreCases() throws Exception {
VectoredReadUtils.isOrderedDisjoint(outputList, 1, 800));

}

/**
 * With a maximum size of 0, the number of ranges after sort-and-merge
 * must equal the number of input ranges for every minimum seek tried here.
 */
@Test
public void testMaxSizeZeroDisablesMering() throws Exception {
  final List<FileRange> ranges = Arrays.asList(
      new FileRangeImpl(3000, 110),
      new FileRangeImpl(3000, 100),
      new FileRangeImpl(2100, 100));
  // chunk size 1 and max size 0 are fixed; only the minimum seek varies.
  final int[] minimumSeeks = {1, 0, 100};
  for (int minimumSeek : minimumSeeks) {
    assertEqualRangeCountsAfterMerging(ranges, 1, minimumSeek, 0);
  }
}

/**
 * Run {@code VectoredReadUtils.sortAndMergeRanges} over the input ranges and
 * assert that the result holds exactly as many ranges as the input list.
 * @param inputRanges ranges to sort and merge.
 * @param chunkSize chunk size passed to the merge.
 * @param minimumSeek minimum seek passed to the merge.
 * @param maxSize maximum size passed to the merge.
 */
private void assertEqualRangeCountsAfterMerging(List<FileRange> inputRanges,
    int chunkSize,
    int minimumSeek,
    int maxSize) {
  final List<CombinedFileRange> merged = VectoredReadUtils.sortAndMergeRanges(
      inputRanges, chunkSize, minimumSeek, maxSize);
  // The input size is read after the merge call, as before.
  Assertions.assertThat(merged)
      .describedAs("Mismatch in number of ranges post merging")
      .hasSize(inputRanges.size());
}

/**
* Single type combining {@link PositionedReadable} and
* {@link ByteBufferPositionedReadable}; it declares no members of its own.
*/
interface Stream extends PositionedReadable, ByteBufferPositionedReadable {
// nothing
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,4 +86,16 @@ public static <T> void assertFutureFailedExceptionally(CompletableFuture<T> futu
"completed exceptionally")
.isTrue();
}

/**
* Assert that two values of the same type are equal.
* @param actual actual value.
* @param expected expected value.
* @param message description of what is being compared; it is appended to
* "Mismatch in " to form the assertion description.
* @param <T> type of the values being compared.
*/
public static <T> void assertEqual(T actual, T expected, String message) {
Assertions.assertThat(actual)
.describedAs("Mismatch in %s", message)
.isEqualTo(expected);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1066,4 +1066,31 @@ private Constants() {
* Require that all S3 access is made through Access Points.
*/
public static final String AWS_S3_ACCESSPOINT_REQUIRED = "fs.s3a.accesspoint.required";

/**
 * What is the smallest reasonable seek in bytes such
 * that we group ranges together during vectored read operation.
 * Value : {@value}.
 */
public static final String AWS_S3_VECTOR_READS_MIN_SEEK_SIZE =
    "fs.s3a.vectored.read.min.seek.size";

/**
 * What is the largest merged read size in bytes such
 * that we group ranges together during vectored read.
 * Setting this value to 0 will disable merging of ranges.
 * Value : {@value}.
 */
public static final String AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE =
    "fs.s3a.vectored.read.max.merged.size";

/**
 * Default minimum seek in bytes during vectored reads : {@value}.
 * 4K is 4 * 1024 = 4096 bytes.
 */
public static final int DEFAULT_AWS_S3_VECTOR_READS_MIN_SEEK_SIZE = 4096; // 4K

/**
 * Default maximum read size in bytes during vectored reads : {@value}.
 * 1M is 1024 * 1024 = 1048576 bytes.
 */
public static final int DEFAULT_AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE = 1048576; //1M
}
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,9 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
/** Storage Statistics Bonded to the instrumentation. */
private S3AStorageStatistics storageStatistics;

/** Vectored IO context. */
private VectoredIOContext vectoredIOContext;

private long readAhead;
private S3AInputPolicy inputPolicy;
private ChangeDetectionPolicy changeDetectionPolicy;
Expand Down Expand Up @@ -551,6 +554,7 @@ public void initialize(URI name, Configuration originalConf)
pageSize = intOption(getConf(), BULK_DELETE_PAGE_SIZE,
BULK_DELETE_PAGE_SIZE_DEFAULT, 0);
listing = new Listing(listingOperationCallbacks, createStoreContext());
vectoredIOContext = populateVectoredIOContext(conf);
} catch (AmazonClientException e) {
// amazon client exception: stop all services then throw the translation
cleanupWithLogger(LOG, span);
Expand All @@ -564,6 +568,23 @@ public void initialize(URI name, Configuration originalConf)
}
}

/**
 * Populates the configurations related to vectored IO operation
 * in the context which has to be passed down to input streams.
 * <p>
 * The context stores both settings as ints, so each configured value is
 * clamped to {@link Integer#MAX_VALUE} before narrowing. A plain
 * {@code (int)} cast would silently wrap larger values; for example
 * 4G (4294967296) would become 0.
 * @param conf configuration object.
 * @return VectoredIOContext.
 */
private VectoredIOContext populateVectoredIOContext(Configuration conf) {
  // NOTE(review): the trailing 0 is presumably the minimum accepted value;
  // confirm against longBytesOption.
  final long minSeekBytes = longBytesOption(conf, AWS_S3_VECTOR_READS_MIN_SEEK_SIZE,
      DEFAULT_AWS_S3_VECTOR_READS_MIN_SEEK_SIZE, 0);
  final long maxMergedReadBytes = longBytesOption(conf,
      AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE,
      DEFAULT_AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE, 0);
  // Clamp before narrowing so oversized values saturate instead of wrapping.
  final int minSeekVectored = (int) Math.min(minSeekBytes, Integer.MAX_VALUE);
  final int maxReadSizeVectored = (int) Math.min(maxMergedReadBytes, Integer.MAX_VALUE);
  return new VectoredIOContext()
      .setMinSeekForVectoredReads(minSeekVectored)
      .setMaxReadSizeForVectoredReads(maxReadSizeVectored)
      .build();
}

/**
* Set the client side encryption gauge to 0 or 1, indicating if CSE is
* enabled through the gauge or not.
Expand Down Expand Up @@ -1526,7 +1547,8 @@ protected S3AReadOpContext createReadContext(
seekPolicy,
changePolicy,
readAheadRange,
auditSpan);
auditSpan,
vectoredIOContext);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,9 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
private S3AInputPolicy inputPolicy;
private long readahead = Constants.DEFAULT_READAHEAD_RANGE;

/** Vectored IO context. */
private final VectoredIOContext vectoredIOContext;

/**
* This is the actual position within the object, used by
* lazy seek to decide whether to seek on the next read or not.
Expand Down Expand Up @@ -197,6 +200,7 @@ public S3AInputStream(S3AReadOpContext ctx,
setInputPolicy(ctx.getInputPolicy());
setReadahead(ctx.getReadahead());
this.unboundedThreadPool = unboundedThreadPool;
this.vectoredIOContext = context.getVectoredIOContext();
}

/**
Expand Down Expand Up @@ -764,6 +768,7 @@ public String toString() {
sb.append(" remainingInCurrentRequest=")
.append(remainingInCurrentRequest());
sb.append(" ").append(changeTracker);
sb.append(" ").append(vectoredIOContext);
sb.append('\n').append(s);
sb.append('}');
return sb.toString();
Expand Down Expand Up @@ -810,6 +815,22 @@ public void readFully(long position, byte[] buffer, int offset, int length)
}
}

/**
* {@inheritDoc}.
* This stream returns the minimum seek held by its vectored IO context.
*/
@Override
public int minSeekForVectorReads() {
return vectoredIOContext.getMinSeekForVectorReads();
}

/**
* {@inheritDoc}.
* This stream returns the maximum read size held by its vectored IO context.
*/
@Override
public int maxReadSizeForVectorReads() {
return vectoredIOContext.getMaxReadSizeForVectorReads();
}

/**
* {@inheritDoc}
* Vectored read implementation for S3AInputStream.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@ public class S3AReadOpContext extends S3AOpContext {

private final AuditSpan auditSpan;

/**
* Vectored IO context for vectored read api
* in {@code S3AInputStream#readVectored(List, IntFunction)}.
*/
private final VectoredIOContext vectoredIOContext;

/**
* Instantiate.
* @param path path of read
Expand All @@ -69,17 +75,19 @@ public class S3AReadOpContext extends S3AOpContext {
* @param changeDetectionPolicy change detection policy.
* @param readahead readahead for GET operations/skip, etc.
* @param auditSpan active audit
* @param vectoredIOContext context for vectored read operation.
*/
public S3AReadOpContext(
final Path path,
Invoker invoker,
@Nullable FileSystem.Statistics stats,
S3AStatisticsContext instrumentation,
FileStatus dstFileStatus,
S3AInputPolicy inputPolicy,
ChangeDetectionPolicy changeDetectionPolicy,
final long readahead,
final AuditSpan auditSpan) {
final Path path,
Invoker invoker,
@Nullable FileSystem.Statistics stats,
S3AStatisticsContext instrumentation,
FileStatus dstFileStatus,
S3AInputPolicy inputPolicy,
ChangeDetectionPolicy changeDetectionPolicy,
final long readahead,
final AuditSpan auditSpan,
VectoredIOContext vectoredIOContext) {

super(invoker, stats, instrumentation,
dstFileStatus);
Expand All @@ -90,6 +98,7 @@ public S3AReadOpContext(
this.inputPolicy = checkNotNull(inputPolicy);
this.changeDetectionPolicy = checkNotNull(changeDetectionPolicy);
this.readahead = readahead;
this.vectoredIOContext = checkNotNull(vectoredIOContext);
}

/**
Expand Down Expand Up @@ -136,6 +145,14 @@ public AuditSpan getAuditSpan() {
return auditSpan;
}

/**
* Get the vectored IO context for this read op.
* @return vectored IO context.
*/
public VectoredIOContext getVectoredIOContext() {
return vectoredIOContext;
}

@Override
public String toString() {
final StringBuilder sb = new StringBuilder(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs.s3a;

import java.util.List;
import java.util.function.IntFunction;

/**
 * Holder for the settings which tune a vectored IO operation.
 * See {@link S3AInputStream#readVectored(List, IntFunction)}.
 */
public class VectoredIOContext {

  /**
   * What is the smallest reasonable seek that we should group
   * ranges together during vectored read operation.
   */
  private int minSeekForVectorReads;

  /**
   * What is the largest size that we should group ranges
   * together during vectored read operation.
   * Setting this value to 0 will disable merging of ranges.
   */
  private int maxReadSizeForVectorReads;

  /**
   * Create a context; both settings hold the int default of 0 until set.
   */
  public VectoredIOContext() {
  }

  /**
   * Set the minimum seek for vectored reads.
   * @param minSeek new minimum seek value.
   * @return this context, to allow call chaining.
   */
  public VectoredIOContext setMinSeekForVectoredReads(int minSeek) {
    minSeekForVectorReads = minSeek;
    return this;
  }

  /**
   * Set the maximum read size for vectored reads.
   * @param maxSize new maximum read size value.
   * @return this context, to allow call chaining.
   */
  public VectoredIOContext setMaxReadSizeForVectoredReads(int maxSize) {
    maxReadSizeForVectorReads = maxSize;
    return this;
  }

  /**
   * Finish a chain of setter calls.
   * @return this same instance; no copy is made.
   */
  public VectoredIOContext build() {
    return this;
  }

  /**
   * Get the minimum seek for vectored reads.
   * @return the value last set, or 0 if never set.
   */
  public int getMinSeekForVectorReads() {
    return minSeekForVectorReads;
  }

  /**
   * Get the maximum read size for vectored reads.
   * @return the value last set, or 0 if never set.
   */
  public int getMaxReadSizeForVectorReads() {
    return maxReadSizeForVectorReads;
  }

  @Override
  public String toString() {
    final StringBuilder sb = new StringBuilder("VectoredIOContext{");
    sb.append("minSeekForVectorReads=").append(minSeekForVectorReads);
    sb.append(", maxReadSizeForVectorReads=").append(maxReadSizeForVectorReads);
    sb.append('}');
    return sb.toString();
  }
}
Loading