Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public interface StreamCapabilities {
* Support for vectored IO api.
* See {@code PositionedReadable#readVectored(List, IntFunction)}.
*/
String VECTOREDIO = "readvectored";
String VECTOREDIO = "in:readvectored";

/**
* Stream abort() capability implemented by {@link Abortable#abort()}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1206,6 +1206,24 @@ private Constants() {
*/
public static final int DEFAULT_AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE = 1253376; //1M

/**
* Maximum number of range reads a single input stream can have
* active (downloading, or queued) to the central FileSystem
* instance's pool of queued operations.
* This stops a single stream overloading the shared thread pool.
* {@value}
* <p>
* Default is {@link #DEFAULT_AWS_S3_VECTOR_ACTIVE_RANGE_READS}
*/
public static final String AWS_S3_VECTOR_ACTIVE_RANGE_READS =
"fs.s3a.vectored.active.ranged.reads";

/**
* Limit of queued range data download operations during vectored
* read. Value: {@value}
*/
public static final int DEFAULT_AWS_S3_VECTOR_ACTIVE_RANGE_READS = 4;

/**
* Prefix of auth classes in AWS SDK V1.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,12 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
/** Vectored IO context. */
private VectoredIOContext vectoredIOContext;

/**
* Maximum number of active range read operation a single
* input stream can have.
*/
private int vectoredActiveRangeReads;

private long readAhead;
private ChangeDetectionPolicy changeDetectionPolicy;
private final AtomicBoolean closed = new AtomicBoolean(false);
Expand Down Expand Up @@ -628,6 +634,8 @@ public void initialize(URI name, Configuration originalConf)
longBytesOption(conf, ASYNC_DRAIN_THRESHOLD,
DEFAULT_ASYNC_DRAIN_THRESHOLD, 0),
inputPolicy);
vectoredActiveRangeReads = intOption(conf,
AWS_S3_VECTOR_ACTIVE_RANGE_READS, DEFAULT_AWS_S3_VECTOR_ACTIVE_RANGE_READS, 1);
vectoredIOContext = populateVectoredIOContext(conf);
} catch (AmazonClientException e) {
// amazon client exception: stop all services then throw the translation
Expand Down Expand Up @@ -1561,7 +1569,11 @@ private FSDataInputStream executeOpen(
createObjectAttributes(path, fileStatus),
createInputStreamCallbacks(auditSpan),
inputStreamStats,
unboundedThreadPool));
new SemaphoredDelegatingExecutor(
boundedThreadPool,
vectoredActiveRangeReads,
true,
inputStreamStats)));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.IntFunction;

Expand Down Expand Up @@ -139,7 +139,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
/**
* Thread pool used for vectored IO operation.
*/
private final ThreadPoolExecutor unboundedThreadPool;
private final ExecutorService boundedThreadPool;
private final String bucket;
private final String key;
private final String pathStr;
Expand Down Expand Up @@ -196,13 +196,13 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
* @param s3Attributes object attributes
* @param client S3 client to use
* @param streamStatistics stream io stats.
* @param unboundedThreadPool thread pool to use.
* @param boundedThreadPool thread pool to use.
*/
public S3AInputStream(S3AReadOpContext ctx,
S3ObjectAttributes s3Attributes,
InputStreamCallbacks client,
S3AInputStreamStatistics streamStatistics,
ThreadPoolExecutor unboundedThreadPool) {
ExecutorService boundedThreadPool) {
Preconditions.checkArgument(isNotEmpty(s3Attributes.getBucket()),
"No Bucket");
Preconditions.checkArgument(isNotEmpty(s3Attributes.getKey()), "No Key");
Expand All @@ -224,7 +224,7 @@ public S3AInputStream(S3AReadOpContext ctx,
setInputPolicy(ctx.getInputPolicy());
setReadahead(ctx.getReadahead());
this.asyncDrainThreshold = ctx.getAsyncDrainThreshold();
this.unboundedThreadPool = unboundedThreadPool;
this.boundedThreadPool = boundedThreadPool;
this.vectoredIOContext = context.getVectoredIOContext();
this.threadIOStatistics = requireNonNull(ctx.getIOStatisticsAggregator());
}
Expand Down Expand Up @@ -882,7 +882,7 @@ public void readVectored(List<? extends FileRange> ranges,
streamStatistics.readVectoredOperationStarted(sortedRanges.size(), sortedRanges.size());
for (FileRange range: sortedRanges) {
ByteBuffer buffer = allocate.apply(range.getLength());
unboundedThreadPool.submit(() -> readSingleRange(range, buffer));
boundedThreadPool.submit(() -> readSingleRange(range, buffer));
}
} else {
LOG.debug("Trying to merge the ranges as they are not disjoint");
Expand All @@ -893,7 +893,7 @@ public void readVectored(List<? extends FileRange> ranges,
LOG.debug("Number of original ranges size {} , Number of combined ranges {} ",
ranges.size(), combinedFileRanges.size());
for (CombinedFileRange combinedFileRange: combinedFileRanges) {
unboundedThreadPool.submit(
boundedThreadPool.submit(
() -> readCombinedRangeAndUpdateChildren(combinedFileRange, allocate));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,22 @@ on the client requirements.
</description>
</property>
<property>
<name>fs.s3a.vectored.read.max.merged.size</name>
<value>1M</value>
<description>
What is the largest merged read size in bytes such
that we group ranges together during vectored read.
Setting this value to 0 will disable merging of ranges.
</description>
<name>fs.s3a.vectored.read.max.merged.size</name>
<value>1M</value>
<description>
What is the largest merged read size in bytes such
that we group ranges together during vectored read.
Setting this value to 0 will disable merging of ranges.
</description>
<property>
<name>fs.s3a.vectored.active.ranged.reads</name>
<value>4</value>
<description>
Maximum number of range reads a single input stream can have
active (downloading, or queued) to the central FileSystem
instance's pool of queued operations.
This stops a single stream overloading the shared thread pool.
</description>
</property>
```

Expand Down