Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,12 @@ public final class StreamStatisticNames {
public static final String STREAM_READ_PREFETCH_OPERATIONS
= "stream_read_prefetch_operations";

/**
* Total number of failed prefetching operations.
*/
public static final String STREAM_READ_FAILED_PREFETCH_OPERATIONS
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not needed. every duration type you build for a store automatically gets .failures stats entries (count, min, mean, max), use StoreStatisticNames.SUFFIX_FAILURES if you want to see usages

= "stream_read_failed_prefetch_operations";

/**
* Total number of block in disk cache.
*/
Expand All @@ -413,19 +419,19 @@ public final class StreamStatisticNames {

/**
* count/duration of reading a remote block.
* IO.
*
* Value: {@value}.
*/
public static final String STREAM_READ_REMOTE_BLOCK_READ
= "stream_read_block_read";

/**
* count/duration of acquiring a buffer and reading to it
* IO.
* count/duration of acquiring a buffer and reading to it.
*
* Value: {@value}.
*/
public static final String STREAM_READ_BLOCK_ACQUIRE_AND_READ
= "stream_read_block_read";
= "stream_read_block_acquire_read";

private StreamStatisticNames() {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,15 +244,15 @@ public synchronized void close() {
}
}

int currentPoolSize = this.pool.numCreated();
int currentPoolSize = pool.numCreated();

this.pool.close();
this.pool = null;
pool.close();
pool = null;

this.allocated.clear();
this.allocated = null;
allocated.clear();
allocated = null;

this.prefetchingStatistics.memoryFreed(currentPoolSize * bufferSize);
prefetchingStatistics.memoryFreed(currentPoolSize * bufferSize);
}

// For debugging purposes.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ private void read(BufferData data) throws IOException {

private void prefetch(BufferData data, Instant taskQueuedStartTime) throws IOException {
synchronized (data) {
this.prefetchingStatistics.executorAcquired(
prefetchingStatistics.executorAcquired(
Duration.between(taskQueuedStartTime, Instant.now()));
this.readBlock(
data,
Expand Down Expand Up @@ -328,7 +328,7 @@ private void readBlock(BufferData data, boolean isPrefetch, BufferData.State...
}

if (isPrefetch) {
this.prefetchingStatistics.prefetchOperationStarted();
prefetchingStatistics.prefetchOperationStarted();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

have this return a DurationTracker whose close() will update the statistic. on an an exception, call its failed() method to update the .failure keys intead

op = this.ops.prefetch(data.getBlockNumber());
} else {
op = this.ops.getRead(data.getBlockNumber());
Expand All @@ -341,14 +341,16 @@ private void readBlock(BufferData data, boolean isPrefetch, BufferData.State...
this.read(buffer, offset, size);
buffer.flip();
data.setReady(expectedState);

if(isPrefetch) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit, spacing

this.prefetchingStatistics.prefetchOperationCompleted();
prefetchingStatistics.prefetchOperationCompleted(true);
}
} catch (Exception e) {
String message = String.format("error during readBlock(%s)", data.getBlockNumber());
LOG.error(message, e);
this.numReadErrors.incrementAndGet();
if(isPrefetch) {
prefetchingStatistics.prefetchOperationCompleted(false);
}
data.setDone();
throw e;
} finally {
Expand Down Expand Up @@ -446,7 +448,7 @@ public void requestCaching(BufferData data) {

private void addToCacheAndRelease(BufferData data, Future<Void> blockFuture,
Instant taskQueuedStartTime) {
this.prefetchingStatistics.executorAcquired(
prefetchingStatistics.executorAcquired(
Duration.between(taskQueuedStartTime, Instant.now()));

if (this.closed) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public interface PrefetchingStatistics extends IOStatisticsSource {

void blockRemovedFromFileCache();

void prefetchOperationCompleted();
void prefetchOperationCompleted(boolean prefetchSucceeded);

void executorAcquired(Duration timeInQueue);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,6 @@
import java.util.concurrent.atomic.AtomicLong;

import static org.apache.hadoop.fs.s3a.Constants.STREAM_READ_GAUGE_INPUT_POLICY;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BLOCK_ACQUIRE_AND_READ;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_PREFETCH_OPERATIONS;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_REMOTE_BLOCK_READ;
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.demandStringifyIOStatistics;
import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.snapshotIOStatistics;
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_EXECUTOR_ACQUIRED;
Expand Down Expand Up @@ -807,6 +804,7 @@ private final class InputStreamStatistics
private final AtomicLong readFullyOperations;
private final AtomicLong seekOperations;
private final AtomicLong prefetchReadOperations;
private final AtomicLong failedPrefetchReadOperations;

/** Bytes read by the application and any when draining streams . */
private final AtomicLong totalBytesRead;
Expand Down Expand Up @@ -841,7 +839,8 @@ private InputStreamStatistics(
StreamStatisticNames.STREAM_READ_TOTAL_BYTES,
StreamStatisticNames.STREAM_READ_UNBUFFERED,
StreamStatisticNames.STREAM_READ_VERSION_MISMATCHES,
STREAM_READ_PREFETCH_OPERATIONS)
StreamStatisticNames.STREAM_READ_PREFETCH_OPERATIONS,
StreamStatisticNames.STREAM_READ_FAILED_PREFETCH_OPERATIONS)
.withGauges(STREAM_READ_GAUGE_INPUT_POLICY,
STREAM_READ_BLOCKS_IN_FILE_CACHE.getSymbol(),
STREAM_READ_ACTIVE_PREFETCH_OPERATIONS.getSymbol(),
Expand All @@ -852,8 +851,8 @@ private InputStreamStatistics(
StoreStatisticNames.ACTION_FILE_OPENED,
StreamStatisticNames.STREAM_READ_REMOTE_STREAM_ABORTED,
StreamStatisticNames.STREAM_READ_REMOTE_STREAM_DRAINED,
STREAM_READ_REMOTE_BLOCK_READ,
STREAM_READ_BLOCK_ACQUIRE_AND_READ)
StreamStatisticNames.STREAM_READ_REMOTE_BLOCK_READ,
StreamStatisticNames.STREAM_READ_BLOCK_ACQUIRE_AND_READ)
.build();
setIOStatistics(st);
aborted = st.getCounterReference(
Expand Down Expand Up @@ -891,7 +890,9 @@ private InputStreamStatistics(
totalBytesRead = st.getCounterReference(
StreamStatisticNames.STREAM_READ_TOTAL_BYTES);
prefetchReadOperations =
st.getCounterReference(STREAM_READ_PREFETCH_OPERATIONS);
st.getCounterReference(StreamStatisticNames.STREAM_READ_PREFETCH_OPERATIONS);
failedPrefetchReadOperations =
st.getCounterReference(StreamStatisticNames.STREAM_READ_FAILED_PREFETCH_OPERATIONS);
setIOStatistics(st);
// create initial snapshot of merged statistics
mergedStats = snapshotIOStatistics(st);
Expand Down Expand Up @@ -1331,8 +1332,12 @@ public void blockRemovedFromFileCache() {
}

@Override
public void prefetchOperationCompleted() {
public void prefetchOperationCompleted(boolean prefetchSucceeded) {
incAllGauges(STREAM_READ_ACTIVE_PREFETCH_OPERATIONS, -1);

if(!prefetchSucceeded) {
failedPrefetchReadOperations.incrementAndGet();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ public void prefetchOperationStarted() {
}

@Override
public void prefetchOperationCompleted() {
public void prefetchOperationCompleted(boolean prefetchSucceeded) {

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public void blockRemovedFromFileCache() {
}

@Override
public void prefetchOperationCompleted() {
public void prefetchOperationCompleted(boolean prefetchSucceeded) {

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_SIZE_KEY;
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticGaugeValue;
import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;

/**
Expand Down Expand Up @@ -104,31 +105,41 @@ private static int calculateNumBlocks(long largeFileSize, int blockSize) {
@Test
public void testReadLargeFileFully() throws Throwable {
describe("read a large file fully, uses S3CachingInputStream");
IOStatistics ioStats;
openFS();

try (FSDataInputStream in = largeFileFS.open(largeFile)) {
IOStatistics ioStats = in.getIOStatistics();
ioStats = in.getIOStatistics();

byte[] buffer = new byte[S_1M * 10];
long bytesRead = 0;

while (bytesRead < largeFileSize) {
in.readFully(buffer, 0, (int) Math.min(buffer.length, largeFileSize - bytesRead));
bytesRead += buffer.length;
// Blocks are fully read, no blocks should be cached
verifyStatisticGaugeValue(ioStats, StreamStatisticNames.STREAM_READ_BLOCKS_IN_FILE_CACHE,
0);
}

// Assert that first block is read synchronously, following blocks are prefetched
verifyStatisticCounterValue(ioStats, StreamStatisticNames.STREAM_READ_PREFETCH_OPERATIONS,
numBlocks - 1);
verifyStatisticCounterValue(ioStats, StoreStatisticNames.ACTION_HTTP_GET_REQUEST, numBlocks);
verifyStatisticCounterValue(ioStats, StreamStatisticNames.STREAM_READ_OPENED, numBlocks);
}
// Verify that once stream is closed, all memory is freed
verifyStatisticGaugeValue(ioStats, StreamStatisticNames.STREAM_READ_ACTIVE_MEMORY_IN_USE, 0);
}

@Test
public void testRandomReadLargeFile() throws Throwable {
describe("random read on a large file, uses S3CachingInputStream");
IOStatistics ioStats;
openFS();

try (FSDataInputStream in = largeFileFS.open(largeFile)) {
IOStatistics ioStats = in.getIOStatistics();
ioStats = in.getIOStatistics();

byte[] buffer = new byte[blockSize];

Expand All @@ -141,7 +152,13 @@ public void testRandomReadLargeFile() throws Throwable {

verifyStatisticCounterValue(ioStats, StoreStatisticNames.ACTION_HTTP_GET_REQUEST, 2);
verifyStatisticCounterValue(ioStats, StreamStatisticNames.STREAM_READ_OPENED, 2);
verifyStatisticCounterValue(ioStats, StreamStatisticNames.STREAM_READ_PREFETCH_OPERATIONS, 1);
// block 0 is cached when we seek to block 1, block 1 is cached as it is being prefetched
// when we seek out of block 0, see cancelPrefetches()
verifyStatisticGaugeValue(ioStats, StreamStatisticNames.STREAM_READ_BLOCKS_IN_FILE_CACHE, 2);
}
verifyStatisticGaugeValue(ioStats, StreamStatisticNames.STREAM_READ_BLOCKS_IN_FILE_CACHE, 0);
verifyStatisticGaugeValue(ioStats, StreamStatisticNames.STREAM_READ_ACTIVE_MEMORY_IN_USE, 0);
}

@Test
Expand All @@ -163,6 +180,9 @@ public void testRandomReadSmallFile() throws Throwable {

verifyStatisticCounterValue(ioStats, StoreStatisticNames.ACTION_HTTP_GET_REQUEST, 1);
verifyStatisticCounterValue(ioStats, StreamStatisticNames.STREAM_READ_OPENED, 1);
verifyStatisticCounterValue(ioStats, StreamStatisticNames.STREAM_READ_PREFETCH_OPERATIONS, 0);
// The buffer pool is not used
verifyStatisticGaugeValue(ioStats, StreamStatisticNames.STREAM_READ_ACTIVE_MEMORY_IN_USE, 0);
}
}

Expand Down