@@ -23,18 +23,19 @@
*/
@AllArgsConstructor
public enum ReadMode {
- SYNC(true, false),
- ASYNC(true, false),
- SMALL_OBJECT_PREFETCH(true, false),
- SEQUENTIAL_FILE_PREFETCH(true, false),
- DICTIONARY_PREFETCH(false, false),
- COLUMN_PREFETCH(false, true),
- REMAINING_COLUMN_PREFETCH(false, false),
- PREFETCH_TAIL(false, false),
- READ_VECTORED(false, true);
+ SYNC(true, false, false),
+ ASYNC(true, false, true),
+ SMALL_OBJECT_PREFETCH(true, false, true),
+ SEQUENTIAL_FILE_PREFETCH(true, false, true),
+ DICTIONARY_PREFETCH(false, false, true),
+ COLUMN_PREFETCH(false, true, true),
+ REMAINING_COLUMN_PREFETCH(false, false, true),
+ PREFETCH_TAIL(false, false, true),
+ READ_VECTORED(false, true, false);

private final boolean allowRequestExtension;
private final boolean coalesceRequests;
+ private final boolean isPrefetch;

/**
* Should requests be extended for this read mode?
@@ -60,4 +61,13 @@ public boolean allowRequestExtension() {
public boolean coalesceRequests() {
return coalesceRequests;
}
+
+ /**
+ * Is the read mode a prefetch?
+ *
+ * @return true if read originates from a prefetch operation.
+ */
+ public boolean isPrefetch() {
+ return isPrefetch;
+ }
}
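
For illustration, a minimal fragment (not part of this diff) showing the new accessor; the expected values follow directly from the constants above:

// Illustrative fragment only; flag values are read off the enum constants above.
ReadMode mode = ReadMode.COLUMN_PREFETCH;              // declared as (false, true, true)
boolean fromPrefetch = mode.isPrefetch();              // true: column prefetches are prefetch-originated
boolean syncFromPrefetch = ReadMode.SYNC.isPrefetch(); // false: SYNC is a caller-issued read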
@@ -31,4 +31,24 @@ public void onGetRequest() {
public void onHeadRequest() {
LOG.trace("HEAD request made");
}
+
+ @Override
+ public void onBlockPrefetch(long start, long end) {
+ LOG.trace("Block prefetch made");
+ }
+
+ @Override
+ public void footerParsingFailed() {
+ LOG.trace("Footer parsing failed");
+ }
+
+ @Override
+ public void onReadVectored(int numIncomingRanges, int numCombinedRanges) {
+ LOG.trace("Read vectored made");
+ }
+
+ @Override
+ public void onCacheHit() {
+ LOG.trace("Data was present in cache");
+ }
}
@@ -19,8 +19,32 @@
* GET and HEAD requests.
*/
public interface RequestCallback {

+ /** Called when a GET request is made. */
void onGetRequest();

+ /** Called when a HEAD request is made. */
void onHeadRequest();
+
+ /**
+ * Called when a block prefetch is made.
+ *
+ * @param start start of the prefetched block
+ * @param end end of the prefetched block
+ */
+ void onBlockPrefetch(long start, long end);
+
+ /** Called when footer parsing fails. */
+ void footerParsingFailed();
+
+ /**
+ * Called when a vectored read is made.
+ *
+ * @param numIncomingRanges number of ranges in the read request
+ * @param numCombinedRanges number of ranges after range coalescing
+ */
+ void onReadVectored(int numIncomingRanges, int numCombinedRanges);
+
+ /** Called when the requested read range is already present in the block store. */
+ void onCacheHit();
}
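
As a sketch of how an upstream integration might consume this interface, here is a minimal counting implementation. The class name and counters are invented for illustration and are not part of this change:

import java.util.concurrent.atomic.AtomicLong;

// Hypothetical implementation; the class name and counters are invented.
// An upstream integration such as S3A could aggregate IoStats this way.
public class CountingRequestCallback implements RequestCallback {
  private final AtomicLong getRequests = new AtomicLong();
  private final AtomicLong headRequests = new AtomicLong();
  private final AtomicLong prefetchedBytes = new AtomicLong();
  private final AtomicLong footerParseFailures = new AtomicLong();
  private final AtomicLong cacheHits = new AtomicLong();

  @Override
  public void onGetRequest() {
    getRequests.incrementAndGet();
  }

  @Override
  public void onHeadRequest() {
    headRequests.incrementAndGet();
  }

  @Override
  public void onBlockPrefetch(long start, long end) {
    // Both bounds are inclusive (BlockManager passes endPos = pos + len - 1),
    // so the prefetched byte count is end - start + 1.
    prefetchedBytes.addAndGet(end - start + 1);
  }

  @Override
  public void footerParsingFailed() {
    footerParseFailures.incrementAndGet();
  }

  @Override
  public void onReadVectored(int numIncomingRanges, int numCombinedRanges) {
    // A real integration might record the coalescing ratio here
    // (numIncomingRanges before vs numCombinedRanges after coalescing).
  }

  @Override
  public void onCacheHit() {
    cacheHits.incrementAndGet();
  }

  public long getPrefetchedBytes() {
    return prefetchedBytes.get();
  }
}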
@@ -15,8 +15,7 @@
*/
package software.amazon.s3.analyticsaccelerator.request;

- import static org.junit.jupiter.api.Assertions.assertEquals;
- import static org.junit.jupiter.api.Assertions.assertThrows;
+ import static org.junit.jupiter.api.Assertions.*;

import java.util.stream.Stream;
import org.junit.jupiter.api.Test;
@@ -50,6 +49,23 @@ void testSize() {
assertEquals(100, new Range(0, 99).getLength());
}
+
+ @Test
+ void testContains() {
[Review — Contributor]: I don't really get the point of this test
[Reply — Author]: had to add it to ensure test coverage passes
+ Range range = new Range(0, 100);
+ assertTrue(range.contains(50));
+ assertFalse(range.contains(120));
+ }
+
+ @Test
+ void testCompareTo() {
+ Range range1 = new Range(0, 100);
+ Range range2 = new Range(100, 200);
+ Range range3 = new Range(200, 300);
+ assertTrue(range1.compareTo(range2) < 0);
+ assertTrue(range2.compareTo(range3) < 0);
+ assertTrue(range3.compareTo(range1) > 0);
+ }

static Stream<Arguments> validStringRanges() {
return Stream.of(
Arguments.of(1, 5, "1-5"),
@@ -153,7 +153,8 @@ LogicalIO createLogicalIO(S3URI s3URI, OpenStreamInformation openStreamInformati
createPhysicalIO(s3URI, openStreamInformation),
telemetry,
configuration.getLogicalIOConfiguration(),
- parquetColumnPrefetchStore);
+ parquetColumnPrefetchStore,
+ openStreamInformation.getRequestCallback());

case SEQUENTIAL:
return new SequentialLogicalIOImpl(
@@ -20,6 +20,7 @@
import software.amazon.s3.analyticsaccelerator.common.telemetry.Telemetry;
import software.amazon.s3.analyticsaccelerator.io.logical.LogicalIOConfiguration;
import software.amazon.s3.analyticsaccelerator.io.physical.PhysicalIO;
+ import software.amazon.s3.analyticsaccelerator.util.RequestCallback;
import software.amazon.s3.analyticsaccelerator.util.S3URI;

/**
@@ -38,19 +39,26 @@ public class ParquetLogicalIOImpl extends DefaultLogicalIOImpl {
* @param telemetry an instance of {@link Telemetry} to use
* @param logicalIOConfiguration configuration for this logical IO implementation
* @param parquetColumnPrefetchStore object where Parquet usage information is aggregated
+ * @param requestCallback callback for tracking IoStats to upstream integrations such as S3A
*/
public ParquetLogicalIOImpl(
@NonNull S3URI s3Uri,
@NonNull PhysicalIO physicalIO,
@NonNull Telemetry telemetry,
@NonNull LogicalIOConfiguration logicalIOConfiguration,
- @NonNull ParquetColumnPrefetchStore parquetColumnPrefetchStore) {
+ @NonNull ParquetColumnPrefetchStore parquetColumnPrefetchStore,
+ @NonNull RequestCallback requestCallback) {
super(s3Uri, physicalIO, telemetry);

// Initialise prefetcher and start prefetching
this.parquetPrefetcher =
new ParquetPrefetcher(
- s3Uri, physicalIO, telemetry, logicalIOConfiguration, parquetColumnPrefetchStore);
+ s3Uri,
+ physicalIO,
+ telemetry,
+ logicalIOConfiguration,
+ parquetColumnPrefetchStore,
+ requestCallback);
this.parquetPrefetcher.prefetchFooterAndBuildMetadata();
}

@@ -30,6 +30,7 @@
import software.amazon.s3.analyticsaccelerator.io.physical.plan.IOPlanExecution;
import software.amazon.s3.analyticsaccelerator.io.physical.plan.IOPlanState;
import software.amazon.s3.analyticsaccelerator.util.PrefetchMode;
+ import software.amazon.s3.analyticsaccelerator.util.RequestCallback;
import software.amazon.s3.analyticsaccelerator.util.S3URI;
import software.amazon.s3.analyticsaccelerator.util.StreamAttributes;

@@ -71,19 +72,21 @@ public class ParquetPrefetcher {
* @param telemetry an instance of {@link Telemetry} to use
* @param logicalIOConfiguration the LogicalIO's configuration
* @param parquetColumnPrefetchStore a common place for Parquet usage information
+ * @param requestCallback callback for tracking IoStats to upstream integrations such as S3A
*/
public ParquetPrefetcher(
S3URI s3Uri,
PhysicalIO physicalIO,
Telemetry telemetry,
LogicalIOConfiguration logicalIOConfiguration,
- ParquetColumnPrefetchStore parquetColumnPrefetchStore) {
+ ParquetColumnPrefetchStore parquetColumnPrefetchStore,
+ RequestCallback requestCallback) {
this(
s3Uri,
logicalIOConfiguration,
parquetColumnPrefetchStore,
telemetry,
- new ParquetMetadataParsingTask(s3Uri, parquetColumnPrefetchStore),
+ new ParquetMetadataParsingTask(s3Uri, parquetColumnPrefetchStore, requestCallback),
new ParquetPrefetchTailTask(s3Uri, telemetry, logicalIOConfiguration, physicalIO),
new ParquetReadTailTask(s3Uri, telemetry, logicalIOConfiguration, physicalIO),
new ParquetPrefetchRemainingColumnTask(
@@ -26,6 +26,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.s3.analyticsaccelerator.io.logical.impl.ParquetColumnPrefetchStore;
+ import software.amazon.s3.analyticsaccelerator.util.RequestCallback;
import software.amazon.s3.analyticsaccelerator.util.S3URI;

/**
@@ -37,6 +38,7 @@ public class ParquetMetadataParsingTask {
private final S3URI s3URI;
private final ParquetParser parquetParser;
private final ParquetColumnPrefetchStore parquetColumnPrefetchStore;
+ private final RequestCallback requestCallback;

private static final Logger LOG = LoggerFactory.getLogger(ParquetMetadataParsingTask.class);

@@ -45,10 +47,13 @@ public class ParquetMetadataParsingTask {
*
* @param s3URI the S3Uri of the object
* @param parquetColumnPrefetchStore object containing Parquet usage information
+ * @param requestCallback callback for tracking IoStats to upstream integrations such as S3A
*/
public ParquetMetadataParsingTask(
- S3URI s3URI, ParquetColumnPrefetchStore parquetColumnPrefetchStore) {
- this(s3URI, parquetColumnPrefetchStore, new ParquetParser());
+ S3URI s3URI,
+ ParquetColumnPrefetchStore parquetColumnPrefetchStore,
+ RequestCallback requestCallback) {
+ this(s3URI, parquetColumnPrefetchStore, new ParquetParser(), requestCallback);
}

/**
@@ -58,14 +63,17 @@ public ParquetMetadataParsingTask(
* @param s3URI the S3Uri of the object
* @param parquetColumnPrefetchStore object containing Parquet usage information
* @param parquetParser parser for getting the file metadata
+ * @param requestCallback callback for tracking IoStats to upstream integrations such as S3A
*/
ParquetMetadataParsingTask(
@NonNull S3URI s3URI,
@NonNull ParquetColumnPrefetchStore parquetColumnPrefetchStore,
- @NonNull ParquetParser parquetParser) {
+ @NonNull ParquetParser parquetParser,
+ @NonNull RequestCallback requestCallback) {
this.s3URI = s3URI;
this.parquetParser = parquetParser;
this.parquetColumnPrefetchStore = parquetColumnPrefetchStore;
+ this.requestCallback = requestCallback;
}

/**
@@ -83,6 +91,7 @@ public ColumnMappers storeColumnMappers(FileTail fileTail) {
parquetColumnPrefetchStore.putColumnMappers(this.s3URI, columnMappers);
return columnMappers;
} catch (Exception e) {
+ requestCallback.footerParsingFailed();
LOG.debug(
"Unable to parse parquet footer for {}, parquet prefetch optimisations will be disabled for this key.",
this.s3URI.getKey(),
@@ -110,7 +110,7 @@ public int read(long pos) throws IOException {
}

/**
- * Reads data into the provided buffer
+ * Reads data into the provided buffer.
*
* @param buf buffer to read data into
* @param off start position in buffer at which data is written
@@ -120,6 +120,21 @@ public int read(long pos) throws IOException {
* @throws IOException if an I/O error occurs
*/
public int read(byte[] buf, int off, int len, long pos) throws IOException {
+ return read(buf, off, len, pos, ReadMode.SYNC);
+ }
+
+ /**
+ * Reads data into the provided buffer, attributing the read to the given read mode.
+ *
+ * @param buf buffer to read data into
+ * @param off start position in buffer at which data is written
+ * @param len length of data to be read
+ * @param pos the position to begin reading from
+ * @param readMode mode to define the read type
+ * @return the total number of bytes read into the buffer
+ * @throws IOException if an I/O error occurs
+ */
+ public int read(byte[] buf, int off, int len, long pos, ReadMode readMode) throws IOException {
Preconditions.checkArgument(0 <= pos, "`pos` must not be negative");
Preconditions.checkArgument(pos < contentLength(), "`pos` must be less than content length");
Preconditions.checkArgument(0 <= off, "`off` must not be negative");
@@ -128,7 +143,7 @@ public int read(byte[] buf, int off, int len, long pos) throws IOException {

try {
lock.readLock().lock();
- blockManager.makeRangeAvailable(pos, len, ReadMode.SYNC);
+ blockManager.makeRangeAvailable(pos, len, readMode);

long nextPosition = pos;
int numBytesRead = 0;
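A short usage sketch of the two overloads above; `blob` is a stand-in name, since the enclosing class is not visible in this hunk:

// Hypothetical caller; `blob` stands in for the object exposing read(...),
// which is not named in this hunk.
byte[] buffer = new byte[1024];

// Caller-issued read: the original overload now delegates with ReadMode.SYNC,
// so a fully cached range is reported upstream via onCacheHit.
int bytesRead = blob.read(buffer, 0, buffer.length, 0);

// Prefetch path: the same read attributed to a column prefetch, which the
// block manager reports via onBlockPrefetch instead.
int prefetched = blob.read(buffer, 0, buffer.length, 0, ReadMode.COLUMN_PREFETCH);
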
@@ -39,6 +39,7 @@
import software.amazon.s3.analyticsaccelerator.util.BlockKey;
import software.amazon.s3.analyticsaccelerator.util.ObjectKey;
import software.amazon.s3.analyticsaccelerator.util.OpenStreamInformation;
+ import software.amazon.s3.analyticsaccelerator.util.RequestCallback;
import software.amazon.s3.analyticsaccelerator.util.StreamAttributes;

/** Implements a Block Manager responsible for planning and scheduling reads on a key. */
@@ -59,6 +60,7 @@ public class BlockManager implements Closeable {
private final SequentialReadProgression sequentialReadProgression;
private final RangeOptimiser rangeOptimiser;
private final OpenStreamInformation openStreamInformation;
+ private final RequestCallback requestCallback;
private final int maxGeneration;

private static final String OPERATION_MAKE_RANGE_AVAILABLE = "block.manager.make.range.available";
@@ -95,6 +97,7 @@ public BlockManager(
this.indexCache = indexCache;
this.blockStore = new BlockStore(indexCache, aggregatingMetrics, configuration);
this.openStreamInformation = openStreamInformation;
+ this.requestCallback = openStreamInformation.getRequestCallback();
this.streamReader =
new StreamReader(
objectClient,
@@ -153,7 +156,16 @@ public synchronized void makeRangeAvailable(long pos, long len, ReadMode readMod
long endPos = pos + len - 1;

// Range is available, return
- if (isRangeAvailable(pos, endPos)) return;
+ if (isRangeAvailable(pos, endPos)) {
+ if (readMode == ReadMode.SYNC) {
+ openStreamInformation.getRequestCallback().onCacheHit();
+ }
+ return;
+ }
+
+ if (readMode.isPrefetch()) {
+ requestCallback.onBlockPrefetch(pos, endPos);
+ }

long generation = getGeneration(pos, readMode);

@@ -171,6 +183,9 @@
if (generation > 0) {
maxReadLength =
Math.max(maxReadLength, sequentialReadProgression.getSizeForGeneration(generation));
+
+ // Record any range extension due to sequential prefetching
+ requestCallback.onBlockPrefetch(endPos + 1, truncatePos(pos + maxReadLength - 1));
[Review — Contributor]: can you explain this a bit more for me?
[Reply — Author]: these are the number of extra bytes AAL requested that the user did not ask for. The user request was only up to endPos; AAL then extended the request due to sequential prefetching, so we report the extra bytes as prefetched.
}
// Truncate end position to the object length
long effectiveEnd = truncatePos(pos + maxReadLength - 1);
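To make the review thread above concrete, a worked example with assumed numbers (the generation size is illustrative, not taken from the code):

// Assumed values for illustration only.
long pos = 8L * 1024 * 1024;  // user read starts at 8 MiB
long len = 1024 * 1024;       // user asked for 1 MiB
long endPos = pos + len - 1;  // last byte the user actually requested

// Sequential prefetching kicked in (generation > 0) and, say,
// sequentialReadProgression.getSizeForGeneration(generation) returns 4 MiB:
long maxReadLength = 4L * 1024 * 1024;

// AAL reads up to pos + maxReadLength - 1. Everything past endPos was never
// requested by the user, so it is reported as prefetched bytes:
//   requestCallback.onBlockPrefetch(endPos + 1, truncatePos(pos + maxReadLength - 1));
// i.e. the 3 MiB from 9 MiB up to (12 MiB - 1), assuming truncatePos does not
// clip the range at the object length.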