From 940c9b629508e3e298c727f2e4da9625d11bf65a Mon Sep 17 00:00:00 2001
From: Steve Loughran
Date: Mon, 7 Nov 2022 14:20:39 +0000
Subject: [PATCH] HADOOP-18521. ABFS input stream prefetching

This is the roll-up of my work on prefetching, rebased onto trunk and
with all merge conflicts addressed.

HADOOP-18521. ABFS ReadBufferManager does not reuse in-progress buffers

Addresses the issue by not trying to cancel in-progress reads when a
stream is close()d; they are allowed to continue and their data is then
discarded. To enable that discarding, AbfsInputStreams export their
`closed` state, which is now an AtomicBoolean internally, so reader
threads can probe it.

The shared buffers now have owner tracking, which will reject
* attempts to acquire an owned buffer
* attempts to return a buffer which is not owned

Plus
* lots of other invariants added to validate the state
* useful toString() values

HADOOP-18521. ABFS ReadBufferManager does not reuse in-progress buffers

Adds a path/stream capability probe for the fixed abfs, which you can
demand in an openFile() call. That will block your code from ever
working on a version with the race condition.

HADOOP-18521. prune map of buffer to reader, as the array does it

HADOOP-18521. stats collection and use in itest

HADOOP-18521. isolating read buffer invocations on stream for testing
This should now be set up for unit tests to simulate the failure
conditions.

HADOOP-18521. cut a check for freeing a buffer while still in progress

HADOOP-18521. abfs ReadBufferManager and closed streams
* working on the tests
* during step-through debugging, identified where the abfs input stream
  needs to be hardened against unbuffer()/close() invoked during reads

HADOOP-18521. improve completed read eviction in close
Always call ReadBuffer.evict(), which adds stream stats on whether a
block was used before it was evicted. This helps assess the value of
prefetching.

HADOOP-18521. testing more of the failure conditions

HADOOP-18521. unit tests of ReadBufferManager logic
Now it's possible to have tests which yetus can run on the details of
fetching and error handling.

HADOOP-18521. ReadBufferManager evict() test/tweak
Closed read buffers with data are always found, even if somehow they
didn't get purged (is this possible? the synchronized blocks say
otherwise). Closed read buffers without data (failure buffers) are
silently discarded.

HADOOP-18521. review comments about newlines

HADOOP-18521. checkstyle fixes; mostly '+' symbols on generated
toString() methods.
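For downstream code, the capability added here makes the fix detectable
and enforceable. A minimal sketch of both probes follows; the capability
name is defined by this patch, while the path, filesystem setup and LOG
field are illustrative only (imports omitted):

    Path path = new Path("abfs://container@account.dfs.core.windows.net/data/file.parquet");
    FileSystem fs = path.getFileSystem(new Configuration());

    // non-fatal probe: is this client safe from the prefetch race?
    boolean safe = fs.hasPathCapability(path, "fs.azure.capability.prefetch.safe");

    // or demand it: the key is only in KNOWN_OPENFILE_KEYS on fixed
    // releases, so openFile() rejects it as an unknown mandatory key
    // everywhere the fix is absent.
    try (FSDataInputStream in = fs.openFile(path)
        .must("fs.azure.capability.prefetch.safe", "true")
        .build()
        .get()) {
      // reads as normal; afterwards the new prefetch statistics
      // (stream_read_prefetch_blocks_used etc.) are retrievable.
      LOG.info("{}", ioStatisticsToPrettyString(retrieveIOStatistics(in)));
    }

On releases without this patch, the mandatory key should be rejected
with an IllegalArgumentException before any read takes place.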
---
 .../fs/statistics/StreamStatisticNames.java   |  30 +
 .../fs/azurebfs/AzureBlobFileSystem.java      |   5 +-
 .../azurebfs/constants/ConfigurationKeys.java |  29 +
 .../fs/azurebfs/services/AbfsInputStream.java | 130 +++-
 .../services/AbfsInputStreamStatistics.java   |   4 +-
 .../AbfsInputStreamStatisticsImpl.java        |  25 +-
 .../fs/azurebfs/services/ReadBuffer.java      | 149 ++++-
 .../azurebfs/services/ReadBufferManager.java  | 452 +++++++++++---
 .../services/ReadBufferStreamOperations.java  |  70 +++
 .../azurebfs/services/ReadBufferWorker.java   | 118 +++-
 .../azurebfs/ITestAbfsReadWriteAndSeek.java   |  35 +-
 .../services/ITestAbfsInputStream.java        | 131 ++++
 .../services/ITestReadBufferManager.java      | 143 +++--
 .../services/TestAbfsInputStream.java         | 110 +++-
 .../services/TestReadBufferManager.java       | 576 ++++++++++++++++++
 .../src/test/resources/log4j.properties       |   2 +
 16 files changed, 1813 insertions(+), 196 deletions(-)
 create mode 100644 hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferStreamOperations.java
 create mode 100644 hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestReadBufferManager.java

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java index 50bbf45505cec..387706f5a6fca 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java @@ -421,6 +421,36 @@ public final class StreamStatisticNames { public static final String STREAM_READ_PREFETCH_OPERATIONS = "stream_read_prefetch_operations"; + /** + * Count of prefetch blocks used. + */ + public static final String STREAM_READ_PREFETCH_BLOCKS_USED + = "stream_read_prefetch_blocks_used"; + /** + * Count of prefetch bytes used. + */ + public static final String STREAM_READ_PREFETCH_BYTES_USED + = "stream_read_prefetch_bytes_used"; + + /** + * Count of prefetch blocks discarded unused: {@value}. + */ + public static final String STREAM_READ_PREFETCH_BLOCKS_DISCARDED + = "stream_read_prefetch_blocks_discarded"; + + /** + * Count of prefetch bytes discarded from unused blocks: {@value}. + * May or may not include bytes from blocks which were partially accessed. + */ + public static final String STREAM_READ_PREFETCH_BYTES_DISCARDED + = "stream_read_prefetch_bytes_discarded"; + + /** + * Count of prefetch blocks evicted, used or unused: {@value}. + */ + public static final String STREAM_READ_PREFETCH_BLOCKS_EVICTED + = "stream_read_prefetch_blocks_evicted"; + /** * Total number of block in disk cache.
*/ diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java index 21501d28f4238..da6251714a2fa 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java @@ -31,7 +31,6 @@ import java.util.Hashtable; import java.util.List; import java.util.ArrayList; -import java.util.Collections; import java.util.EnumSet; import java.util.Map; import java.util.Optional; @@ -46,6 +45,7 @@ import javax.annotation.Nullable; import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys; import org.apache.hadoop.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -291,7 +291,7 @@ protected CompletableFuture openFileWithOptions( LOG.debug("AzureBlobFileSystem.openFileWithOptions path: {}", path); AbstractFSBuilderImpl.rejectUnknownMandatoryKeys( parameters.getMandatoryKeys(), - Collections.emptySet(), + ConfigurationKeys.KNOWN_OPENFILE_KEYS, "for " + path); return LambdaUtils.eval( new CompletableFuture<>(), () -> @@ -1628,6 +1628,7 @@ public boolean hasPathCapability(final Path path, final String capability) case CommonPathCapabilities.FS_PERMISSIONS: case CommonPathCapabilities.FS_APPEND: case CommonPathCapabilities.ETAGS_AVAILABLE: + case ConfigurationKeys.FS_AZURE_CAPABILITY_PREFETCH_SAFE: // safe from buffer sharing: return true; case CommonPathCapabilities.ETAGS_PRESERVED_IN_RENAME: diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java index a59f76b6d0fe0..89c164b42d004 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java @@ -18,10 +18,17 @@ package org.apache.hadoop.fs.azurebfs.constants; +import java.util.Collections; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.FileSystem; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_STANDARD_OPTIONS; + /** * Responsible to keep all the Azure Blob File System configurations keys in Hadoop configuration file. */ @@ -259,5 +266,27 @@ public static String accountProperty(String property, String account) { * @see FileSystem#openFile(org.apache.hadoop.fs.Path) */ public static final String FS_AZURE_BUFFERED_PREAD_DISABLE = "fs.azure.buffered.pread.disable"; + + /** + * Has the ReadBufferManager fix of HADOOP-18521 been applied? + * This can be queried on {@code hasCapability()} and + * on the filesystem {@code hasPathCapability()} probes. + */ + public static final String FS_AZURE_CAPABILITY_PREFETCH_SAFE = "fs.azure.capability.prefetch.safe"; + + /** + * Known keys for openFile(), including the standard ones. 
+ */ + public static final Set<String> KNOWN_OPENFILE_KEYS; + + static { + Set<String> collect = Stream.of( + FS_AZURE_CAPABILITY_PREFETCH_SAFE, + FS_AZURE_BUFFERED_PREAD_DISABLE) + .collect(Collectors.toSet()); + collect.addAll(FS_OPTION_OPENFILE_STANDARD_OPTIONS); + KNOWN_OPENFILE_KEYS = Collections.unmodifiableSet(collect); + } + private ConfigurationKeys() {} } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java index 8f12484a55c9d..18c94ff0ce4cb 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java @@ -23,9 +23,14 @@ import java.io.IOException; import java.net.HttpURLConnection; import java.util.UUID; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.fs.PathIOException; +import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys; +import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore; import org.apache.hadoop.util.Preconditions; import org.slf4j.Logger; @@ -42,7 +47,6 @@ import org.apache.hadoop.fs.azurebfs.utils.CachedSASToken; import org.apache.hadoop.fs.azurebfs.utils.Listener; import org.apache.hadoop.fs.azurebfs.utils.TracingContext; -import org.apache.hadoop.fs.statistics.IOStatistics; import org.apache.hadoop.fs.statistics.IOStatisticsSource; import static java.lang.Math.max; @@ -50,13 +54,17 @@ import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_KB; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.STREAM_ID_LEN; +import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString; +import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.retrieveIOStatistics; +import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.emptyStatisticsStore; +import static org.apache.hadoop.util.Preconditions.checkState; import static org.apache.hadoop.util.StringUtils.toLowerCase; /** * The AbfsInputStream for AbfsClient. */ public class AbfsInputStream extends FSInputStream implements CanUnbuffer, - StreamCapabilities, IOStatisticsSource { + StreamCapabilities, IOStatisticsSource, ReadBufferStreamOperations { private static final Logger LOG = LoggerFactory.getLogger(AbfsInputStream.class); // Footer size is set to qualify for both ORC and parquet files public static final int FOOTER_SIZE = 16 * ONE_KB; @@ -95,7 +103,11 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer, private int bCursor = 0; // cursor of read within buffer - offset of next byte to be returned from buffer private int limit = 0; // offset of next byte to be read into buffer from service (i.e., upper marker+1 // of valid bytes in buffer) - private boolean closed = false; + + /** + * Closed flag. + */ + private final AtomicBoolean closed = new AtomicBoolean(false); private TracingContext tracingContext; // Optimisations modify the pointer fields.
@@ -113,7 +125,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer, private Listener listener; private final AbfsInputStreamContext context; - private IOStatistics ioStatistics; + private IOStatisticsStore ioStatistics; /** * This is the actual position within the object, used by * lazy seek to decide whether to seek on the next read or not. @@ -155,9 +167,11 @@ public AbfsInputStream( // Propagate the config values to ReadBufferManager so that the first instance // to initialize can set the readAheadBlockSize ReadBufferManager.setReadBufferManagerConfigs(readAheadBlockSize); - if (streamStatistics != null) { - ioStatistics = streamStatistics.getIOStatistics(); - } + // if no statistics was passed (happens in some tests) a stub iostatistics + // store is used instead. + ioStatistics = streamStatistics != null + ? streamStatistics.getIOStatistics() + : emptyStatisticsStore(); } public String getPath() { @@ -176,11 +190,7 @@ public int read(long position, byte[] buffer, int offset, int length) // kind of random reads on a shared file input stream will greatly get // benefited by such implementation. // Strict close check at the begin of the API only not for the entire flow. - synchronized (this) { - if (closed) { - throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED); - } - } + checkNotClosed(); LOG.debug("pread requested offset = {} len = {} bufferedPreadDisabled = {}", offset, length, bufferedPreadDisabled); if (!bufferedPreadDisabled) { @@ -216,6 +226,7 @@ public int read() throws IOException { @Override public synchronized int read(final byte[] b, final int off, final int len) throws IOException { + checkNotClosed(); // check if buffer is null before logging the length if (b != null) { LOG.debug("read requested b.length = {} offset = {} len = {}", b.length, @@ -428,7 +439,7 @@ private void restorePointerState() { private boolean validate(final byte[] b, final int off, final int len) throws IOException { - if (closed) { + if (isClosed()) { throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED); } @@ -449,6 +460,12 @@ private boolean validate(final byte[] b, final int off, final int len) private int copyToUserBuffer(byte[] b, int off, int len){ //If there is anything in the buffer, then return lesser of (requested bytes) and (bytes in buffer) //(bytes returned may be less than requested) + + // unbuffer() or close() may free the buffer during an HTTP call. 
+ // this triggers an NPE in arraycopy(); the check here is simply to + // fail more meaningfully + checkState(buffer != null, + "read buffer is null in stream %s", this); int bytesRemaining = limit - bCursor; int bytesToRead = min(len, bytesRemaining); System.arraycopy(buffer, bCursor, b, off, bytesToRead); @@ -467,6 +484,7 @@ private int copyToUserBuffer(byte[] b, int off, int len){ private int readInternal(final long position, final byte[] b, final int offset, final int length, final boolean bypassReadAhead) throws IOException { + checkNotClosed(); if (readAheadEnabled && !bypassReadAhead) { // try reading from read-ahead if (offset != 0) { @@ -515,7 +533,13 @@ private int readInternal(final long position, final byte[] b, final int offset, } } - int readRemote(long position, byte[] b, int offset, int length, TracingContext tracingContext) throws IOException { + @Override + public int readRemote(long position, + byte[] b, + int offset, + int length, + TracingContext tracingContext) throws IOException { + checkNotClosed(); if (position < 0) { throw new IllegalArgumentException("attempting to read from negative offset"); } @@ -555,7 +579,7 @@ int readRemote(long position, byte[] b, int offset, int length, TracingContext t throw new FileNotFoundException(ere.getMessage()); } } - throw new IOException(ex); + throw ex; } long bytesRead = op.getResult().getBytesReceived(); if (streamStatistics != null) { @@ -566,6 +590,13 @@ int readRemote(long position, byte[] b, int offset, int length, TracingContext t } LOG.debug("HTTP request read bytes = {}", bytesRead); bytesFromRemoteRead += bytesRead; + + // now, the stream may have been closed during the read. + // this can be traumatic for direct read() as the buffer will + // now be null. + // For readahead it is less than ideal. + checkNotClosed(); + return (int) bytesRead; } @@ -587,7 +618,7 @@ private void incrementReadOps() { @Override public synchronized void seek(long n) throws IOException { LOG.debug("requested seek to position {}", n); - if (closed) { + if (isClosed()) { throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED); } if (n < 0) { @@ -608,7 +639,7 @@ public synchronized void seek(long n) throws IOException { @Override public synchronized long skip(long n) throws IOException { - if (closed) { + if (isClosed()) { throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED); } long currentPos = getPos(); @@ -641,7 +672,7 @@ public synchronized long skip(long n) throws IOException { */ @Override public synchronized int available() throws IOException { - if (closed) { + if (isClosed()) { throw new IOException( FSExceptionMessages.STREAM_IS_CLOSED); } @@ -659,7 +690,7 @@ public synchronized int available() throws IOException { * @throws IOException if the stream is closed */ public long length() throws IOException { - if (closed) { + if (isClosed()) { throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED); } return contentLength; @@ -671,7 +702,7 @@ public long length() throws IOException { */ @Override public synchronized long getPos() throws IOException { - if (closed) { + if (isClosed()) { throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED); } return nextReadPos < 0 ?
0 : nextReadPos; @@ -692,11 +723,19 @@ public boolean seekToNewSource(long l) throws IOException { } @Override - public synchronized void close() throws IOException { + public void close() throws IOException { LOG.debug("Closing {}", this); - closed = true; - buffer = null; // de-reference the buffer so it can be GC'ed sooner - ReadBufferManager.getBufferManager().purgeBuffersForStream(this); + if (closed.compareAndSet(false, true)) { + buffer = null; // de-reference the buffer so it can be GC'ed sooner + // Tell the ReadBufferManager to clean up. + ReadBufferManager.getBufferManager().purgeBuffersForStream(this); + } + } + + @Override + @InterfaceAudience.Private + public boolean isClosed() { + return closed.get(); } /** @@ -738,7 +777,15 @@ public synchronized void unbuffer() { @Override public boolean hasCapability(String capability) { - return StreamCapabilities.UNBUFFER.equals(toLowerCase(capability)); + switch (toLowerCase(capability)) { + case StreamCapabilities.UNBUFFER: + case ConfigurationKeys.FS_AZURE_CAPABILITY_PREFETCH_SAFE: // safe from buffer sharing + return true; + case ConfigurationKeys.FS_AZURE_BUFFERED_PREAD_DISABLE: + return bufferedPreadDisabled; + default: + return false; + } } byte[] getBuffer() { @@ -760,6 +807,7 @@ protected void setCachedSasToken(final CachedSASToken cachedSasToken) { this.cachedSasToken = cachedSasToken; } + @Override @VisibleForTesting public String getStreamID() { return inputStreamId; @@ -817,7 +865,7 @@ public boolean shouldAlwaysReadBufferSize() { } @Override - public IOStatistics getIOStatistics() { + public IOStatisticsStore getIOStatistics() { return ioStatistics; } @@ -827,13 +875,19 @@ public IOStatistics getIOStatistics() { */ @Override public String toString() { - final StringBuilder sb = new StringBuilder(super.toString()); - if (streamStatistics != null) { - sb.append("AbfsInputStream@(").append(this.hashCode()).append("){"); - sb.append(streamStatistics.toString()); - sb.append("}"); - } - return sb.toString(); + return "AbfsInputStream{(" + this.hashCode() + ") " + + "path='" + path + '\'' + + ", id=" + inputStreamId + + ", closed=" + closed.get() + + ", contentLength=" + contentLength + + ", nextReadPos=" + nextReadPos + + ", readAheadQueueDepth=" + readAheadQueueDepth + + ", readAheadEnabled=" + readAheadEnabled + + ", bufferedPreadDisabled=" + bufferedPreadDisabled + + ", firstRead=" + firstRead + + ", fCursor=" + fCursor + + ", " + ioStatisticsToPrettyString(retrieveIOStatistics(streamStatistics)) + + "}"; } @VisibleForTesting @@ -855,4 +909,14 @@ long getFCursorAfterLastRead() { long getLimit() { return this.limit; } + + /** + * Verify that the input stream is open. + * @throws IOException if the connection is closed. 
+ */ + private void checkNotClosed() throws IOException { + if (isClosed()) { + throw new PathIOException(getPath(), FSExceptionMessages.STREAM_IS_CLOSED); + } + } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatistics.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatistics.java index 00663467fe233..024b0dbf748fd 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatistics.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatistics.java @@ -19,8 +19,8 @@ package org.apache.hadoop.fs.azurebfs.services; import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.fs.statistics.IOStatistics; import org.apache.hadoop.fs.statistics.IOStatisticsSource; +import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore; /** * Interface for statistics for the AbfsInputStream. @@ -99,7 +99,7 @@ public interface AbfsInputStreamStatistics extends IOStatisticsSource { * Get the IOStatisticsStore instance from AbfsInputStreamStatistics. * @return instance of IOStatisticsStore which extends IOStatistics. */ - IOStatistics getIOStatistics(); + IOStatisticsStore getIOStatistics(); /** * Makes the string of all the AbfsInputStream statistics. diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatisticsImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatisticsImpl.java index f03ea1913e259..243fc163a46b4 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatisticsImpl.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatisticsImpl.java @@ -20,13 +20,20 @@ import java.util.concurrent.atomic.AtomicLong; -import org.apache.hadoop.fs.statistics.IOStatistics; import org.apache.hadoop.fs.statistics.StreamStatisticNames; import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore; import org.apache.hadoop.classification.VisibleForTesting; +import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString; import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_HTTP_GET_REQUEST; import static org.apache.hadoop.fs.statistics.StoreStatisticNames.SUFFIX_MEAN; +import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_ACTIVE_PREFETCH_OPERATIONS; +import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_PREFETCH_BLOCKS_DISCARDED; +import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_PREFETCH_BLOCKS_EVICTED; +import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_PREFETCH_BLOCKS_USED; +import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_PREFETCH_BYTES_DISCARDED; +import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_PREFETCH_BYTES_USED; +import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_PREFETCH_OPERATIONS; import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.iostatisticsStore; /** @@ -48,9 +55,15 @@ public class AbfsInputStreamStatisticsImpl StreamStatisticNames.BYTES_READ_BUFFER, StreamStatisticNames.REMOTE_READ_OP, StreamStatisticNames.READ_AHEAD_BYTES_READ, - StreamStatisticNames.REMOTE_BYTES_READ - ) - 
.withDurationTracking(ACTION_HTTP_GET_REQUEST) + StreamStatisticNames.REMOTE_BYTES_READ, + STREAM_READ_PREFETCH_BLOCKS_USED, + STREAM_READ_PREFETCH_BYTES_USED, + STREAM_READ_PREFETCH_BLOCKS_DISCARDED, + STREAM_READ_PREFETCH_BYTES_DISCARDED, + STREAM_READ_PREFETCH_BLOCKS_EVICTED) + .withGauges(STREAM_READ_ACTIVE_PREFETCH_OPERATIONS) + .withDurationTracking(ACTION_HTTP_GET_REQUEST, + STREAM_READ_PREFETCH_OPERATIONS) .build(); /* Reference to the atomic counter for frequently updated counters to avoid @@ -183,7 +196,7 @@ public void remoteReadOperation() { * @return IOStatisticsStore instance which extends IOStatistics. */ @Override - public IOStatistics getIOStatistics() { + public IOStatisticsStore getIOStatistics() { return ioStatisticsStore; } @@ -270,7 +283,7 @@ public double getActionHttpGetRequest() { public String toString() { final StringBuilder sb = new StringBuilder( "StreamStatistics{"); - sb.append(ioStatisticsStore.toString()); + sb.append(ioStatisticsToPrettyString(ioStatisticsStore)); sb.append('}'); return sb.toString(); } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBuffer.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBuffer.java index 9ce926d841c84..ab0e7b6ef3443 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBuffer.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBuffer.java @@ -23,12 +23,23 @@ import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus; import org.apache.hadoop.fs.azurebfs.utils.TracingContext; +import org.apache.hadoop.fs.statistics.DurationTracker; +import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore; -import static org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus.READ_FAILED; +import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_ACTIVE_PREFETCH_OPERATIONS; +import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_PREFETCH_BLOCKS_DISCARDED; +import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_PREFETCH_BLOCKS_EVICTED; +import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_PREFETCH_BLOCKS_USED; +import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_PREFETCH_BYTES_DISCARDED; +import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_PREFETCH_BYTES_USED; +import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_PREFETCH_OPERATIONS; class ReadBuffer { - private AbfsInputStream stream; + /** + * Stream operations. + */ + private ReadBufferStreamOperations stream; private long offset; // offset within the file for the buffer private int length; // actual length, set after the buffer is filles private int requestedLength; // requested length of the read @@ -47,11 +58,11 @@ class ReadBuffer { private IOException errException = null; - public AbfsInputStream getStream() { + public ReadBufferStreamOperations getStream() { return stream; } - public void setStream(AbfsInputStream stream) { + public void setStream(ReadBufferStreamOperations stream) { this.stream = stream; } @@ -103,6 +114,13 @@ public void setBufferindex(int bufferindex) { this.bufferindex = bufferindex; } + /** + * Does the buffer index refer to a valid buffer. + * @return true if the buffer index is to an entry in the array. 
+ */ + public boolean hasIndexedBuffer() { + return getBufferindex() >= 0; + } public IOException getErrException() { return errException; } @@ -115,11 +133,13 @@ public ReadBufferStatus getStatus() { return status; } + /** + * Update the read status. + * A failed read no longer resets the buffer index; buffer release is handled by the ReadBufferManager. + * @param status status + */ public void setStatus(ReadBufferStatus status) { this.status = status; - if (status == READ_FAILED) { - bufferindex = -1; - } } public CountDownLatch getLatch() { @@ -162,4 +182,119 @@ public void setAnyByteConsumed(boolean isAnyByteConsumed) { this.isAnyByteConsumed = isAnyByteConsumed; } + @Override + public String toString() { + return super.toString() + + "{ status=" + status + + ", offset=" + offset + + ", length=" + length + + ", requestedLength=" + requestedLength + + ", bufferindex=" + bufferindex + + ", timeStamp=" + timeStamp + + ", isFirstByteConsumed=" + isFirstByteConsumed + + ", isLastByteConsumed=" + isLastByteConsumed + + ", isAnyByteConsumed=" + isAnyByteConsumed + + ", errException=" + errException + + ", stream=" + (stream != null ? stream.getStreamID() : "none") + + ", stream closed=" + isStreamClosed() + + ", latch=" + latch + + '}'; + } + + /** + * Is the stream closed. + * @return stream closed status. + */ + public boolean isStreamClosed() { + return stream != null && stream.isClosed(); + } + + /** + * IOStatistics of stream. + * @return the stream's IOStatisticsStore. + */ + public IOStatisticsStore getStreamIOStatistics() { + return stream.getIOStatistics(); + } + + /** + * Start using the buffer. + * Sets the byte consumption flags as appropriate, then + * updates the stream statistics with the use of this buffer. + * @param offset offset in buffer where copy began + * @param bytesCopied bytes copied. + */ + void dataConsumedByStream(int offset, int bytesCopied) { + boolean isFirstUse = !isAnyByteConsumed; + setAnyByteConsumed(true); + if (offset == 0) { + setFirstByteConsumed(true); + } + if (offset + bytesCopied == getLength()) { + setLastByteConsumed(true); + } + IOStatisticsStore iostats = getStreamIOStatistics(); + if (isFirstUse) { + // first use, update the use + iostats.incrementCounter(STREAM_READ_PREFETCH_BLOCKS_USED, 1); + } + // every use, update the count of bytes read + iostats.incrementCounter(STREAM_READ_PREFETCH_BYTES_USED, bytesCopied); + } + + /** + * The (completed) buffer was evicted; update stream statistics + * as appropriate. + */ + void evicted() { + IOStatisticsStore iostats = getStreamIOStatistics(); + iostats.incrementCounter(STREAM_READ_PREFETCH_BLOCKS_EVICTED, 1); + if (getBufferindex() >= 0 && !isAnyByteConsumed()) { + // nothing was read, so consider it discarded. + iostats.incrementCounter(STREAM_READ_PREFETCH_BLOCKS_DISCARDED, 1); + iostats.incrementCounter(STREAM_READ_PREFETCH_BYTES_DISCARDED, getLength()); + } + } + + /** + * The (completed) buffer was discarded; no data was read. + */ + void discarded() { + if (getBufferindex() >= 0) { + IOStatisticsStore iostats = getStreamIOStatistics(); + iostats.incrementCounter(STREAM_READ_PREFETCH_BLOCKS_DISCARDED, 1); + iostats.incrementCounter(STREAM_READ_PREFETCH_BYTES_DISCARDED, getLength()); + } + } + + /** + * Release the buffer: update fields as appropriate. + */ + void releaseBuffer() { + setBuffer(null); + setBufferindex(-1); + } + + /** + * Prefetch started - update stream statistics.
*/ + void prefetchStarted() { + getStreamIOStatistics().incrementGauge(STREAM_READ_ACTIVE_PREFETCH_OPERATIONS, 1); + } + + /** + * Prefetch finished - update stream statistics. + */ + void prefetchFinished() { + getStreamIOStatistics().incrementGauge(STREAM_READ_ACTIVE_PREFETCH_OPERATIONS, -1); + } + + /** + * Get a duration tracker for the prefetch. + * @return a duration tracker. + */ + DurationTracker trackPrefetchOperation() { + return getStreamIOStatistics().trackDuration(STREAM_READ_PREFETCH_OPERATIONS); + } + } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java index ac84f0b27cf12..6efd45c34d424 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java @@ -30,11 +30,14 @@ import java.util.Queue; import java.util.Stack; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantLock; import org.apache.hadoop.fs.azurebfs.utils.TracingContext; import org.apache.hadoop.classification.VisibleForTesting; +import static org.apache.hadoop.util.Preconditions.checkState; + /** * The Read Buffer Manager for Rest AbfsClient. */ @@ -43,7 +46,7 @@ final class ReadBufferManager { private static final int ONE_KB = 1024; private static final int ONE_MB = ONE_KB * ONE_KB; - private static final int NUM_BUFFERS = 16; + static final int NUM_BUFFERS = 16; private static final int NUM_THREADS = 8; private static final int DEFAULT_THRESHOLD_AGE_MILLISECONDS = 3000; // have to see if 3 seconds is a good threshold @@ -51,6 +54,12 @@ final class ReadBufferManager { private static int thresholdAgeMilliseconds = DEFAULT_THRESHOLD_AGE_MILLISECONDS; private Thread[] threads = new Thread[NUM_THREADS]; private byte[][] buffers; // array of byte[] buffers, to hold the data that is read + + /** + * map of {@link #buffers} to {@link ReadBuffer} using it. + */ + private ReadBuffer[] bufferOwners; + private Stack<Integer> freeList = new Stack<>(); // indices in buffers[] array that are available private Queue<ReadBuffer> readAheadQueue = new LinkedList<>(); // queue of requests that are not picked up by any worker thread yet @@ -59,7 +68,22 @@ final class ReadBufferManager { private static ReadBufferManager bufferManager; // singleton, initialized in static initialization block private static final ReentrantLock LOCK = new ReentrantLock(); - static ReadBufferManager getBufferManager() { + /** + * How many completed blocks were discarded when a stream was closed? + */ + private final AtomicLong completedBlocksDiscarded = new AtomicLong(); + + /** + * How many queued blocks were discarded when a stream was closed? + */ + private final AtomicLong queuedBlocksDiscarded = new AtomicLong(); + + /** + * How many in progress blocks were discarded when a stream was closed? + */ + private final AtomicLong inProgressBlocksDiscarded = new AtomicLong(); + + public static ReadBufferManager getBufferManager() { if (bufferManager == null) { LOCK.lock(); try { @@ -85,6 +109,7 @@ static void setReadBufferManagerConfigs(int readAheadBlockSize) { private void init() { buffers = new byte[NUM_BUFFERS][]; + bufferOwners = new ReadBuffer[NUM_BUFFERS]; for (int i = 0; i < NUM_BUFFERS; i++) { buffers[i] = new byte[blockSize]; // same buffers are reused.
The byte array never goes back to GC freeList.add(i); @@ -113,12 +138,13 @@ private ReadBufferManager() { /** * {@link AbfsInputStream} calls this method to queue read-aheads. - * - * @param stream The {@link AbfsInputStream} for which to do the read-ahead + * @param stream The {@link AbfsInputStream} for which to do the read-ahead * @param requestedOffset The offset in the file which shoukd be read * @param requestedLength The length to read + * @return the queued read */ - void queueReadAhead(final AbfsInputStream stream, final long requestedOffset, final int requestedLength, + @VisibleForTesting + ReadBuffer queueReadAhead(final ReadBufferStreamOperations stream, final long requestedOffset, final int requestedLength, TracingContext tracingContext) { if (LOGGER.isTraceEnabled()) { LOGGER.trace("Start Queueing readAhead for {} offset {} length {}", @@ -127,10 +153,10 @@ void queueReadAhead(final AbfsInputStream stream, final long requestedOffset, fi ReadBuffer buffer; synchronized (this) { if (isAlreadyQueued(stream, requestedOffset)) { - return; // already queued, do not queue again + return null; } if (freeList.isEmpty() && !tryEvict()) { - return; // no buffers available, cannot queue anything + return null; } buffer = new ReadBuffer(); @@ -142,10 +168,8 @@ void queueReadAhead(final AbfsInputStream stream, final long requestedOffset, fi buffer.setLatch(new CountDownLatch(1)); buffer.setTracingContext(tracingContext); - Integer bufferIndex = freeList.pop(); // will return a value, since we have checked size > 0 already - - buffer.setBuffer(buffers[bufferIndex]); - buffer.setBufferindex(bufferIndex); + int bufferIndex = freeList.pop(); // will return a value, since we have checked size > 0 already + takeOwnershipOfBufferAtIndex(buffer, bufferIndex); readAheadQueue.add(buffer); notifyAll(); if (LOGGER.isTraceEnabled()) { @@ -153,6 +177,7 @@ void queueReadAhead(final AbfsInputStream stream, final long requestedOffset, fi stream.getPath(), requestedOffset, buffer.getBufferindex()); } } + return buffer; } @@ -225,8 +250,8 @@ private void waitForProcess(final AbfsInputStream stream, final long position) { Thread.currentThread().interrupt(); } if (LOGGER.isTraceEnabled()) { - LOGGER.trace("latch done for file {} buffer idx {} length {}", - stream.getPath(), readBuf.getBufferindex(), readBuf.getLength()); + LOGGER.trace("latch done for file {} buffer {}", + stream.getPath(), readBuf); } } } @@ -239,15 +264,22 @@ private void waitForProcess(final AbfsInputStream stream, final long position) { */ private synchronized boolean tryEvict() { ReadBuffer nodeToEvict = null; - if (completedReadList.size() <= 0) { + if (completedReadList.isEmpty()) { return false; // there are no evict-able buffers } long currentTimeInMs = currentTimeMillis(); - // first, try buffers where all bytes have been consumed (approximated as first and last bytes consumed) + // first, look for easy targets for (ReadBuffer buf : completedReadList) { + if (buf.isStreamClosed() && buf.hasIndexedBuffer()) { + // the stream was closed since this was added to the completed + // list (it would not have a buffer if the stream was closed before) + nodeToEvict = buf; + break; + } if (buf.isFirstByteConsumed() && buf.isLastByteConsumed()) { + // buffers where all bytes have been consumed (approximated as first and last bytes consumed) nodeToEvict = buf; break; } @@ -278,12 +310,13 @@ private synchronized boolean tryEvict() { long earliestBirthday = Long.MAX_VALUE; ArrayList oldFailedBuffers = new ArrayList<>(); for (ReadBuffer buf : 
completedReadList) { - if ((buf.getBufferindex() != -1) + if (buf.hasIndexedBuffer() && (buf.getTimeStamp() < earliestBirthday)) { nodeToEvict = buf; earliestBirthday = buf.getTimeStamp(); - } else if ((buf.getBufferindex() == -1) - && (currentTimeInMs - buf.getTimeStamp()) > thresholdAgeMilliseconds) { + } else if (!buf.hasIndexedBuffer() + && (((currentTimeInMs - buf.getTimeStamp()) > thresholdAgeMilliseconds) + || buf.isStreamClosed())) { oldFailedBuffers.add(buf); } } @@ -302,33 +335,33 @@ } private boolean evict(final ReadBuffer buf) { - // As failed ReadBuffers (bufferIndx = -1) are saved in completedReadList, - // avoid adding it to freeList. - if (buf.getBufferindex() != -1) { - freeList.push(buf.getBufferindex()); - } - - completedReadList.remove(buf); buf.setTracingContext(null); if (LOGGER.isTraceEnabled()) { - LOGGER.trace("Evicting buffer idx {}; was used for file {} offset {} length {}", - buf.getBufferindex(), buf.getStream().getPath(), buf.getOffset(), buf.getLength()); + LOGGER.trace("Evicting buffer {}", buf); + } completedReadList.remove(buf); + // As failed ReadBuffers (bufferIndx = -1) are saved in completedReadList, + // avoid adding it to freeList. + if (buf.hasIndexedBuffer()) { + placeBufferOnFreeList("eviction", buf); } + // tell the buffer it was evicted so it can update its statistics + buf.evicted(); return true; } - private boolean isAlreadyQueued(final AbfsInputStream stream, final long requestedOffset) { + private boolean isAlreadyQueued(final ReadBufferStreamOperations stream, final long requestedOffset) { // returns true if any part of the buffer is already queued return (isInList(readAheadQueue, stream, requestedOffset) || isInList(inProgressList, stream, requestedOffset) || isInList(completedReadList, stream, requestedOffset)); } - private boolean isInList(final Collection<ReadBuffer> list, final AbfsInputStream stream, final long requestedOffset) { + private boolean isInList(final Collection<ReadBuffer> list, final ReadBufferStreamOperations stream, final long requestedOffset) { return (getFromList(list, stream, requestedOffset) != null); } - private ReadBuffer getFromList(final Collection<ReadBuffer> list, final AbfsInputStream stream, final long requestedOffset) { + private ReadBuffer getFromList(final Collection<ReadBuffer> list, final ReadBufferStreamOperations stream, final long requestedOffset) { for (ReadBuffer buffer : list) { if (buffer.getStream() == stream) { if (buffer.getStatus() == ReadBufferStatus.AVAILABLE @@ -350,7 +383,7 @@ private ReadBuffer getFromList(final Collection list, final AbfsInpu * @param requestedOffset * @return */ - private ReadBuffer getBufferFromCompletedQueue(final AbfsInputStream stream, final long requestedOffset) { + private ReadBuffer getBufferFromCompletedQueue(final ReadBufferStreamOperations stream, final long requestedOffset) { for (ReadBuffer buffer : completedReadList) { // Buffer is returned if the requestedOffset is at or above buffer's // offset but less than buffer's length or the actual requestedLength @@ -365,16 +398,136 @@ private ReadBuffer getBufferFromCompletedQueue(final AbfsInputStream stream, fin return null; } - private void clearFromReadAheadQueue(final AbfsInputStream stream, final long requestedOffset) { + private void clearFromReadAheadQueue(final ReadBufferStreamOperations stream, final long requestedOffset) { ReadBuffer buffer = getFromList(readAheadQueue, stream, requestedOffset); if (buffer != null) { readAheadQueue.remove(buffer); notifyAll(); // lock is held in calling method -
freeList.push(buffer.getBufferindex()); + placeBufferOnFreeList("clear from readahead", buffer); } } - private int getBlockFromCompletedQueue(final AbfsInputStream stream, final long position, final int length, + /** + * Add a buffer to the free list. + * @param reason reason for the release (for logging) + * @param readBuffer read buffer which owns the buffer to free + */ + private void placeBufferOnFreeList(final String reason, final ReadBuffer readBuffer) { + int index = readBuffer.getBufferindex(); + LOGGER.debug("Returning buffer index {} to free list for '{}'; owner {}", + index, reason, readBuffer); + checkState(readBuffer.hasIndexedBuffer(), + "ReadBuffer buffer release for %s has no allocated buffer to free: %s", + reason, readBuffer); + checkState(!freeList.contains(index), + "Duplicate buffer %d added to free buffer list for '%s' by %s", + index, reason, readBuffer); + verifyReadBufferOwnsBufferAtIndex(readBuffer, index); + // declare it as unowned + bufferOwners[index] = null; + // set the buffer to null, because it is now free + // for other operations. + readBuffer.releaseBuffer(); + // once it is not owned/referenced, make available to others + freeList.push(index); + } + + /** + * Verify that a given ReadBuffer owns the buffer at the given index. + * @param readBuffer read operation taking ownership + * @param index index of buffer in buffer array + */ + void verifyReadBufferOwnsBufferAtIndex(final ReadBuffer readBuffer, final int index) { + checkState(readBuffer == bufferOwner(index), + "ReadBufferManager buffer %s is not owned by %s", index, readBuffer); + } + + /** + * Verify a read buffer owns its indexed buffer. + * @param readBuffer read buffer to validate. + * @throws IllegalStateException if ownership not satisfied. + */ + void verifyIsOwnerOfBuffer(final ReadBuffer readBuffer) { + checkState(readBuffer.hasIndexedBuffer(), "no buffer owned by %s", readBuffer); + verifyReadBufferOwnsBufferAtIndex(readBuffer, readBuffer.getBufferindex()); + } + + /** + * Take ownership of a buffer. + * This updates the {@link #bufferOwners} array. + * @param readBuffer read operation taking ownership + * @param index index of buffer in buffer array + */ + private void takeOwnershipOfBufferAtIndex(final ReadBuffer readBuffer, int index) { + checkState(null == bufferOwners[index], + "Buffer %d requested by %s already owned by %s", + index, readBuffer, bufferOwners[index]); + readBuffer.setBuffer(buffers[index]); + readBuffer.setBufferindex(index); + bufferOwners[index] = readBuffer; + } + + /** + * Verify a buffer is not in use anywhere. + * @param index buffer index. + * @throws IllegalStateException if the state is invalid. + */ + private void verifyByteBufferNotInUse(final int index) { + verifyByteBufferNotInCollection("completedReadList", index, completedReadList); + verifyByteBufferNotInCollection("inProgressList", index, inProgressList); + verifyByteBufferNotInCollection("readAheadQueue", index, readAheadQueue); + } + + /** + * Verify that a buffer is not referenced in the supplied collection. + * @param name collection name for exceptions. + * @param index buffer index. + * @param collection collection to validate + * @throws IllegalStateException if the state is invalid. + */ + private void verifyByteBufferNotInCollection(String name, int index, + Collection<ReadBuffer> collection) { + checkState(collection.stream().noneMatch(rb -> rb.getBufferindex() == index), + "Buffer index %d found in buffer collection %s", index, name); + } + + /** + * Verify that a read buffer is not referenced in the supplied collection.
+ * @param name collection name for exceptions. + * @param rb read buffer + * @param collection collection to validate + * @throws IllegalStateException if the state is invalid. + */ + private void verifyReadBufferNotInCollection(String name, + ReadBuffer rb, + Collection collection) { + checkState(!collection.contains(rb), + "Collection %s contains buffer %s", name, rb); + } + + /** + * Validate all invariants of the read manager state. + * @throws IllegalStateException if the state is invalid. + */ + @VisibleForTesting + public synchronized void validateReadManagerState() { + // all buffers in free list are not in any of the other lists + freeList.forEach(this::verifyByteBufferNotInUse); + + // there is no in progress buffer in the other queues + inProgressList.forEach(rb -> { + verifyReadBufferNotInCollection("completedReadList", rb, + completedReadList); + verifyReadBufferNotInCollection("readAheadQueue", rb, readAheadQueue); + }); + // nothing completed is in the readahead queue + completedReadList.forEach(rb -> { + verifyReadBufferNotInCollection("readAheadQueue", rb, readAheadQueue); + }); + + } + + private int getBlockFromCompletedQueue(final ReadBufferStreamOperations stream, final long position, final int length, final byte[] buffer) throws IOException { ReadBuffer buf = getBufferFromCompletedQueue(stream, position); @@ -401,13 +554,7 @@ private int getBlockFromCompletedQueue(final AbfsInputStream stream, final long int availableLengthInBuffer = buf.getLength() - cursor; int lengthToCopy = Math.min(length, availableLengthInBuffer); System.arraycopy(buf.getBuffer(), cursor, buffer, 0, lengthToCopy); - if (cursor == 0) { - buf.setFirstByteConsumed(true); - } - if (cursor + lengthToCopy == buf.getLength()) { - buf.setLastByteConsumed(true); - } - buf.setAnyByteConsumed(true); + buf.dataConsumedByStream(cursor, lengthToCopy); return lengthToCopy; } @@ -435,6 +582,9 @@ ReadBuffer getNextBlockToRead() throws InterruptedException { if (buffer == null) { return null; // should never happen } + // verify buffer index is owned. + verifyIsOwnerOfBuffer(buffer); + buffer.setStatus(ReadBufferStatus.READING_IN_PROGRESS); inProgressList.add(buffer); } @@ -442,6 +592,9 @@ ReadBuffer getNextBlockToRead() throws InterruptedException { LOGGER.trace("ReadBufferWorker picked file {} for offset {}", buffer.getStream().getPath(), buffer.getOffset()); } + validateReadManagerState(); + // update stream gauge. + buffer.prefetchStarted(); return buffer; } @@ -454,31 +607,104 @@ ReadBuffer getNextBlockToRead() throws InterruptedException { */ void doneReading(final ReadBuffer buffer, final ReadBufferStatus result, final int bytesActuallyRead) { if (LOGGER.isTraceEnabled()) { - LOGGER.trace("ReadBufferWorker completed read file {} for offset {} outcome {} bytes {}", - buffer.getStream().getPath(), buffer.getOffset(), result, bytesActuallyRead); - } - synchronized (this) { - // If this buffer has already been purged during - // close of InputStream then we don't update the lists. - if (inProgressList.contains(buffer)) { - inProgressList.remove(buffer); - if (result == ReadBufferStatus.AVAILABLE && bytesActuallyRead > 0) { - buffer.setStatus(ReadBufferStatus.AVAILABLE); - buffer.setLength(bytesActuallyRead); - } else { - freeList.push(buffer.getBufferindex()); - // buffer will be deleted as per the eviction policy. + LOGGER.trace("ReadBufferWorker completed file {} for offset {} bytes {}; {}", + buffer.getStream().getPath(), buffer.getOffset(), bytesActuallyRead, buffer); + } + // decrement counter. 
+ buffer.prefetchFinished(); + + try { + synchronized (this) { + // remove from the list + if (!inProgressList.remove(buffer)) { + // this is a sign of inconsistent state, so a major problem, such as + // double invocation of this method with the same buffer. + String message = + String.format("Read completed from an operation not declared as in progress %s", + buffer); + LOGGER.warn(message); + if (buffer.hasIndexedBuffer()) { + // release the buffer (which may raise an exception) + placeBufferOnFreeList("read not in progress", buffer); + } + // report the failure + throw new IllegalStateException(message); } + + // does the buffer own the array it has just written to? + // this is a significant issue. + verifyIsOwnerOfBuffer(buffer); + // should the read buffer be added to the completed list? + boolean addCompleted; + // flag to indicate buffer should be freed + boolean shouldFreeBuffer = false; + // and the reason (for logging) + String freeBufferReason = ""; + buffer.setStatus(result); buffer.setTimeStamp(currentTimeMillis()); + // did the read return any data? + if (result == ReadBufferStatus.AVAILABLE) { + if (bytesActuallyRead > 0) { + + // successful read of data; + addCompleted = true; + + // update buffer state. + buffer.setLength(bytesActuallyRead); + } else { + // there was no data; the buffer can be returned to the free list. + shouldFreeBuffer = true; + freeBufferReason = "no data"; + // don't add to the completed list + addCompleted = false; + } + } else { + // read failed; the buffer can be returned to the free list. + shouldFreeBuffer = true; + freeBufferReason = "failed read"; + // completed list also contains FAILED read buffers + // for sending exception message to clients. + // NOTE: checks for closed state may update this. + addCompleted = true; + } + + // now check for closed streams, which are discarded + // and the buffers freed immediately, irrespective of + // outcome + if (buffer.isStreamClosed()) { + // stream was closed during the read. + // even if there is data, it should be discarded + LOGGER.trace("Discarding prefetch on closed stream {}", buffer); + inProgressBlocksDiscarded.incrementAndGet(); + // don't add + addCompleted = false; + // and the buffer recycled + shouldFreeBuffer = true; + freeBufferReason = "stream closed"; + } + if (shouldFreeBuffer) { + buffer.discarded(); + // buffer should be returned to the free list. + placeBufferOnFreeList(freeBufferReason, buffer); + } + if (addCompleted) { + // add to the completed list.
+ LOGGER.trace("Adding buffer to completed list {}", buffer); + completedReadList.add(buffer); + } } + } catch (IllegalStateException e) { + // update this for tests to validate + buffer.setStatus(ReadBufferStatus.READ_FAILED); + buffer.setErrException(new IOException(e)); + throw e; + } finally { + //outside the synchronized, since anyone receiving a wake-up from the latch must see safe-published results + LOGGER.trace("releasing latch {}", buffer.getLatch()); + buffer.getLatch().countDown(); // wake up waiting threads (if any) } - //outside the synchronized, since anyone receiving a wake-up from the latch must see safe-published results - buffer.getLatch().countDown(); // wake up waiting threads (if any) } /** @@ -530,48 +756,127 @@ public synchronized List getInProgressCopiedList() { } @VisibleForTesting - void callTryEvict() { - tryEvict(); + boolean callTryEvict() { + return tryEvict(); } - /** * Purging the buffers associated with an {@link AbfsInputStream} * from {@link ReadBufferManager} when stream is closed. + * Before HADOOP-18521 this would purge in progress reads, which + * would return the active buffer to the free pool while it was + * still in use. * @param stream input stream. */ - public synchronized void purgeBuffersForStream(AbfsInputStream stream) { - LOGGER.debug("Purging stale buffers for AbfsInputStream {} ", stream); + public synchronized void purgeBuffersForStream(ReadBufferStreamOperations stream) { + LOGGER.debug("Purging stale buffers for AbfsInputStream {}/{}", + stream.getStreamID(), stream.getPath()); + + // remove from the queue + int before = readAheadQueue.size(); readAheadQueue.removeIf(readBuffer -> readBuffer.getStream() == stream); - purgeList(stream, completedReadList); + int readaheadPurged = readAheadQueue.size() - before; + queuedBlocksDiscarded.addAndGet(readaheadPurged); + + // all completed entries + int completedPurged = purgeCompletedReads(stream); + completedBlocksDiscarded.addAndGet(completedPurged); + + // print a summary + LOGGER.debug("Purging outcome readahead={}, completed={} for {}", + readaheadPurged, completedPurged, stream); + validateReadManagerState(); } /** * Method to remove buffers associated with a {@link AbfsInputStream} - * when its close method is called. + * from the completed read list. + * Any allocated buffers will be returned to the pool. + * The buffers will have {@link ReadBuffer#evicted()} called to let + * them update their statistics. * NOTE: This method is not threadsafe and must be called inside a * synchronised block. See caller. * @param stream associated input stream. - * @param list list of buffers like {@link this#completedReadList} - * or {@link this#inProgressList}. */ - private void purgeList(AbfsInputStream stream, LinkedList list) { - for (Iterator it = list.iterator(); it.hasNext();) { + private int purgeCompletedReads(ReadBufferStreamOperations stream) { + int purged = 0; + for (Iterator it = completedReadList.iterator(); it.hasNext();) { ReadBuffer readBuffer = it.next(); if (readBuffer.getStream() == stream) { + purged++; it.remove(); // As failed ReadBuffers (bufferIndex = -1) are already pushed to free // list in doneReading method, we will skip adding those here again. 
- if (readBuffer.getBufferindex() != -1) { - freeList.push(readBuffer.getBufferindex()); + if (readBuffer.hasIndexedBuffer()) { + placeBufferOnFreeList("purge completed reads", readBuffer); } + // tell the buffer it was evicted so it can update its statistics + readBuffer.evicted(); } } + return purged; + } + + /** + * Get the count of completed blocks discarded. + * @return the number of completed blocks discarded. + */ + @VisibleForTesting + public long getCompletedBlocksDiscarded() { + return completedBlocksDiscarded.get(); + } + + /** + * Get the count of queued blocks discarded. + * @return the number of queued block reads discarded. + */ + @VisibleForTesting + public long getQueuedBlocksDiscarded() { + return queuedBlocksDiscarded.get(); + } + + /** + * Get the count of in progress read blocks discarded. + * @return the number of blocks being read discarded. + */ + @VisibleForTesting + public long getInProgressBlocksDiscarded() { + return inProgressBlocksDiscarded.get(); + } + + @VisibleForTesting + public void resetBlocksDiscardedCounters() { + completedBlocksDiscarded.set(0); + queuedBlocksDiscarded.set(0); + inProgressBlocksDiscarded.set(0); + } + + /** + * Look up the owner of a buffer. + * @param index index of the buffer. + * @return the buffer owner, null if the buffer is free. + */ + ReadBuffer bufferOwner(int index) { + return bufferOwners[index]; + } + + @Override + public String toString() { + return "ReadBufferManager{" + + "readAheadQueue=" + readAheadQueue.size() + + ", inProgressList=" + inProgressList.size() + + ", completedReadList=" + completedReadList.size() + + ", completedBlocksDiscarded=" + completedBlocksDiscarded + + ", queuedBlocksDiscarded=" + queuedBlocksDiscarded + + ", inProgressBlocksDiscarded=" + inProgressBlocksDiscarded + + '}'; } /** * Test method that can clean up the current state of readAhead buffers and * the lists. Will also trigger a fresh init. + * Note: will cause problems with the static shared class if multiple tests + * are running in the same VM. */ @VisibleForTesting void testResetReadBufferManager() { @@ -593,6 +898,7 @@ void testResetReadBufferManager() { freeList.clear(); for (int i = 0; i < NUM_BUFFERS; i++) { buffers[i] = null; + bufferOwners[i] = null; } buffers = null; resetBufferManager(); diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferStreamOperations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferStreamOperations.java new file mode 100644 index 0000000000000..ffee632c00984 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferStreamOperations.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.hadoop.fs.azurebfs.services; + +import java.io.IOException; + +import org.apache.hadoop.fs.azurebfs.utils.TracingContext; +import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore; + +/** + * Interface of the stream operations required by the + * read buffer code. + * Extracted from {@code AbfsInputStream} to make testing + * easier and to isolate which operations the read buffer + * code invokes on the streams using it. + */ +interface ReadBufferStreamOperations { + + /** + * Read a block from the store. + * @param position position in file + * @param b destination buffer. + * @param offset offset in buffer + * @param length length of read + * @param tracingContext trace context + * @return count of bytes read. + * @throws IOException failure. + */ + int readRemote(long position, + byte[] b, + int offset, + int length, + TracingContext tracingContext) throws IOException; + + /** + * Is the stream closed? + * This must be thread safe as prefetch operations in + * different threads probe this before closure. + * @return true if the stream has been closed. + */ + boolean isClosed(); + + /** + * Get the stream ID; used in logging. + * @return stream ID. + */ + String getStreamID(); + + /** + * Get the stream's IOStatistics store, which the prefetch + * operations update. + * @return the statistics store. + */ + IOStatisticsStore getIOStatistics(); + + /** + * Get the stream path as a string. + * @return path string. + */ + String getPath(); + +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java index a30f06261ef6f..7a0651baf1418 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java @@ -21,14 +21,26 @@ import java.io.IOException; import java.util.concurrent.CountDownLatch; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.fs.PathIOException; import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus; +import org.apache.hadoop.fs.statistics.DurationTracker; + +import static org.apache.hadoop.util.Preconditions.checkState; class ReadBufferWorker implements Runnable { + private static final Logger LOGGER = + LoggerFactory.getLogger(ReadBufferWorker.class); + protected static final CountDownLatch UNLEASH_WORKERS = new CountDownLatch(1); private int id; + private boolean doneReadingInvoked; + ReadBufferWorker(final int id) { this.id = id; } @@ -61,27 +73,93 @@ public void run() { return; } if (buffer != null) { - try { - // do the actual read, from the file. - int bytesRead = buffer.getStream().readRemote( - buffer.getOffset(), - buffer.getBuffer(), - 0, - // If AbfsInputStream was created with bigger buffer size than - // read-ahead buffer size, make sure a valid length is passed - // for remote read - Math.min(buffer.getRequestedLength(), buffer.getBuffer().length), - buffer.getTracingContext()); - - bufferManager.doneReading(buffer, ReadBufferStatus.AVAILABLE, bytesRead); // post result back to ReadBufferManager - } catch (IOException ex) { - buffer.setErrException(ex); - bufferManager.doneReading(buffer, ReadBufferStatus.READ_FAILED, 0); - } catch (Exception ex) { - buffer.setErrException(new PathIOException(buffer.getStream().getPath(), ex)); - bufferManager.doneReading(buffer, ReadBufferStatus.READ_FAILED, 0); - } + fetchBuffer(bufferManager, buffer); } } } + + /** + * Fetch the data for the buffer. + * @param bufferManager buffer manager.
+ * @param buffer the buffer to read in. + */ + @VisibleForTesting + void fetchBuffer(final ReadBufferManager bufferManager, final ReadBuffer buffer) { + LOGGER.trace("Reading {}", buffer); + doneReadingInvoked = false; + // Stop network call if stream is closed + if (postFailureWhenStreamClosed(bufferManager, buffer)) { + // stream closed before the HTTP request was initiated. + // Immediately move to the next request in the queue. + return; + } + checkState(buffer.hasIndexedBuffer(), + "ReadBuffer buffer has no allocated buffer to read into: %s", + buffer); + // input stream is updated with count/duration of prefetching + DurationTracker tracker = buffer.trackPrefetchOperation(); + try { + // do the actual read, from the file. + int bytesRead = buffer.getStream().readRemote( + buffer.getOffset(), + buffer.getBuffer(), + 0, + // If AbfsInputStream was created with bigger buffer size than + // read-ahead buffer size, make sure a valid length is passed + // for remote read + Math.min(buffer.getRequestedLength(), buffer.getBuffer().length), + buffer.getTracingContext()); + LOGGER.trace("Read {} bytes", bytesRead); + + // Update failure to completed list if stream is closed + if (!postFailureWhenStreamClosed(bufferManager, buffer)) { + // post result back to ReadBufferManager + doneReading(bufferManager, buffer, ReadBufferStatus.AVAILABLE, bytesRead); + } + tracker.close(); + } catch (Exception ex) { + tracker.failed(); + IOException ioe = ex instanceof IOException + ? (IOException) ex + : new PathIOException(buffer.getStream().getPath(), ex); + buffer.setErrException(ioe); + LOGGER.debug("prefetch failure for {}", buffer, ex); + + // if doneReading hasn't already been called for this iteration, call it. + // the check ensures that any exception raised in doneReading() doesn't trigger + // a second attempt. + if (!doneReadingInvoked) { + doneReading(bufferManager, buffer, ReadBufferStatus.READ_FAILED, 0); + } + } + } + + /** + * Check for the owning stream being closed; if so it + * reports it to the buffer manager and then returns "true". 
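+ * The failure is posted through doneReading() with status READ_FAILED and zero bytes read.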
+ * @param bufferManager buffer manager + * @param buffer buffer to review + * @return true if the stream is closed + */ + private boolean postFailureWhenStreamClosed( + ReadBufferManager bufferManager, + ReadBuffer buffer) { + + // When stream is closed report failure to be picked by eviction + if (buffer.isStreamClosed()) { + LOGGER.debug("Stream closed; failing read"); + // Fail read + doneReading(bufferManager, buffer, ReadBufferStatus.READ_FAILED, 0); + return true; + } + return false; + } + + private void doneReading(final ReadBufferManager bufferManager, + final ReadBuffer buffer, + final ReadBufferStatus result, + final int bytesActuallyRead) { + doneReadingInvoked = true; + bufferManager.doneReading(buffer, result, bytesActuallyRead); + } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsReadWriteAndSeek.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsReadWriteAndSeek.java index beada775ae87b..a52ad4dcaf137 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsReadWriteAndSeek.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsReadWriteAndSeek.java @@ -21,6 +21,7 @@ import java.util.Arrays; import java.util.Random; +import org.assertj.core.api.Assertions; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -35,6 +36,8 @@ import org.apache.hadoop.fs.statistics.IOStatisticsSource; import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_LEVEL_INFO; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BUFFERED_PREAD_DISABLE; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_CAPABILITY_PREFETCH_SAFE; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.APPENDBLOB_MAX_WRITE_BUFFER_SIZE; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_READ_BUFFER_SIZE; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MAX_BUFFER_SIZE; @@ -89,6 +92,10 @@ private void testReadWriteAndSeek(int bufferSize) throws Exception { new Random().nextBytes(b); Path testPath = path(TEST_PATH); + Assertions.assertThat(fs.hasPathCapability(testPath, FS_AZURE_CAPABILITY_PREFETCH_SAFE)) + .describedAs("path capability %s on filesystem %s", FS_AZURE_CAPABILITY_PREFETCH_SAFE, fs) + .isTrue(); + FSDataOutputStream stream = fs.create(testPath); try { stream.write(b); @@ -100,13 +107,28 @@ private void testReadWriteAndSeek(int bufferSize) throws Exception { final byte[] readBuffer = new byte[2 * bufferSize]; int result; IOStatisticsSource statisticsSource = null; - try (FSDataInputStream inputStream = fs.open(testPath)) { + try (FSDataInputStream inputStream = fs.open(testPath); + FSDataInputStream stream2 = fs.openFile(testPath) + .must(FS_AZURE_CAPABILITY_PREFETCH_SAFE, "") + .must(FS_AZURE_BUFFERED_PREAD_DISABLE, true) + .build().get()) { statisticsSource = inputStream; ((AbfsInputStream) inputStream.getWrappedStream()).registerListener( new TracingHeaderValidator(abfsConfiguration.getClientCorrelationId(), fs.getFileSystemId(), FSOperationType.READ, true, 0, ((AbfsInputStream) inputStream.getWrappedStream()) .getStreamID())); + // check stream 2 capabilities + Assertions.assertThat(stream2.hasCapability(FS_AZURE_CAPABILITY_PREFETCH_SAFE)) + .describedAs("Stream capability %s on stream %s", + FS_AZURE_CAPABILITY_PREFETCH_SAFE, stream2) + 
.isTrue(); + Assertions.assertThat(stream2.hasCapability(FS_AZURE_BUFFERED_PREAD_DISABLE)) + .describedAs("Stream capability %s on stream %s", + FS_AZURE_BUFFERED_PREAD_DISABLE, stream2) + .isTrue(); + // trigger read at byte 0 on second input stream + stream2.read(); inputStream.seek(bufferSize); result = inputStream.read(readBuffer, bufferSize, bufferSize); assertNotEquals(-1, result); @@ -118,11 +140,16 @@ private void testReadWriteAndSeek(int bufferSize) throws Exception { inputStream.seek(0); result = inputStream.read(readBuffer, 0, bufferSize); + assertNotEquals("data read in final read()", -1, result); + assertArrayEquals(readBuffer, b); + inputStream.close(); + + // now look at stream2, including all prefetching + stream2.readFully(readBuffer, 0, bufferSize); + // and a bit of positioned read + stream2.readFully(bufferSize + 1, readBuffer, 0, bufferSize-1); } logIOStatisticsAtLevel(LOG, IOSTATISTICS_LOGGING_LEVEL_INFO, statisticsSource); - - assertNotEquals("data read in final read()", -1, result); - assertArrayEquals(readBuffer, b); } @Test diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStream.java index 66f072501dc4d..ab7f94581d44c 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStream.java @@ -21,25 +21,40 @@ import java.io.IOException; import java.lang.reflect.Field; import java.util.Map; +import java.util.Optional; import java.util.Random; +import java.util.concurrent.ExecutionException; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FutureDataInputStreamBuilder; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.azurebfs.AbfsConfiguration; import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest; import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem; import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore; import org.apache.hadoop.fs.azurebfs.utils.TracingContext; +import org.apache.hadoop.fs.impl.OpenFileParameters; + +import org.assertj.core.api.Assertions; import org.junit.Test; +import org.mockito.Mockito; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_READ_AHEAD_QUEUE_DEPTH; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB; +import static org.apache.hadoop.test.LambdaTestUtils.intercept; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; public class ITestAbfsInputStream extends AbstractAbfsIntegrationTest { @@ -255,4 +270,120 @@ private void verifyAfterSeek(AbfsInputStream abfsInputStream, long seekPos) thro assertEquals(0, abfsInputStream.getLimit()); assertEquals(0, abfsInputStream.getBCursor()); } + + private void writeBufferToNewFile(Path testFile, byte[] buffer) 
throws IOException { + AzureBlobFileSystem fs = getFileSystem(); + fs.create(testFile); + FSDataOutputStream out = fs.append(testFile); + out.write(buffer); + out.close(); + } + + private void verifyOpenWithProvidedStatus(Path path, FileStatus fileStatus, + byte[] buf, AbfsRestOperationType source) + throws IOException, ExecutionException, InterruptedException { + byte[] readBuf = new byte[buf.length]; + AzureBlobFileSystem fs = getFileSystem(); + FutureDataInputStreamBuilder builder = fs.openFile(path); + builder.withFileStatus(fileStatus); + FSDataInputStream in = builder.build().get(); + assertEquals(String.format( + "Open with fileStatus [from %s result]: Incorrect number of bytes read", + source), buf.length, in.read(readBuf)); + assertArrayEquals(String + .format("Open with fileStatus [from %s result]: Incorrect read data", + source), readBuf, buf); + } + + private void checkGetPathStatusCalls(Path testFile, FileStatus fileStatus, + AzureBlobFileSystemStore abfsStore, AbfsClient mockClient, + AbfsRestOperationType source, TracingContext tracingContext) + throws IOException { + + // verify GetPathStatus not invoked when FileStatus is provided + abfsStore.openFileForRead(testFile, Optional + .ofNullable(new OpenFileParameters().withStatus(fileStatus)), null, tracingContext); + verify(mockClient, times(0).description((String.format( + "FileStatus [from %s result] provided, GetFileStatus should not be invoked", + source)))).getPathStatus(anyString(), anyBoolean(), any(TracingContext.class)); + + // verify GetPathStatus invoked when FileStatus not provided + abfsStore.openFileForRead(testFile, + Optional.empty(), null, + tracingContext); + verify(mockClient, times(1).description( + "GetPathStatus should be invoked when FileStatus not provided")) + .getPathStatus(anyString(), anyBoolean(), any(TracingContext.class)); + + Mockito.reset(mockClient); //clears invocation count for next test case + } + + @Test + public void testOpenFileWithOptions() throws Exception { + AzureBlobFileSystem fs = getFileSystem(); + String testFolder = "/testFolder"; + Path smallTestFile = new Path(testFolder + "/testFile0"); + Path largeTestFile = new Path(testFolder + "/testFile1"); + fs.mkdirs(new Path(testFolder)); + int readBufferSize = getConfiguration().getReadBufferSize(); + byte[] smallBuffer = new byte[5]; + byte[] largeBuffer = new byte[readBufferSize + 5]; + new Random().nextBytes(smallBuffer); + new Random().nextBytes(largeBuffer); + writeBufferToNewFile(smallTestFile, smallBuffer); + writeBufferToNewFile(largeTestFile, largeBuffer); + + FileStatus[] getFileStatusResults = {fs.getFileStatus(smallTestFile), + fs.getFileStatus(largeTestFile)}; + FileStatus[] listStatusResults = fs.listStatus(new Path(testFolder)); + + // open with fileStatus from GetPathStatus + verifyOpenWithProvidedStatus(smallTestFile, getFileStatusResults[0], + smallBuffer, AbfsRestOperationType.GetPathStatus); + verifyOpenWithProvidedStatus(largeTestFile, getFileStatusResults[1], + largeBuffer, AbfsRestOperationType.GetPathStatus); + + // open with fileStatus from ListStatus + verifyOpenWithProvidedStatus(smallTestFile, listStatusResults[0], smallBuffer, + AbfsRestOperationType.ListPaths); + verifyOpenWithProvidedStatus(largeTestFile, listStatusResults[1], largeBuffer, + AbfsRestOperationType.ListPaths); + + // verify number of GetPathStatus invocations + AzureBlobFileSystemStore abfsStore = getAbfsStore(fs); + AbfsClient mockClient = spy(getAbfsClient(abfsStore)); + setAbfsClient(abfsStore, mockClient); + TracingContext 
tracingContext = getTestTracingContext(fs, false); + checkGetPathStatusCalls(smallTestFile, getFileStatusResults[0], + abfsStore, mockClient, AbfsRestOperationType.GetPathStatus, tracingContext); + checkGetPathStatusCalls(largeTestFile, getFileStatusResults[1], + abfsStore, mockClient, AbfsRestOperationType.GetPathStatus, tracingContext); + checkGetPathStatusCalls(smallTestFile, listStatusResults[0], + abfsStore, mockClient, AbfsRestOperationType.ListPaths, tracingContext); + checkGetPathStatusCalls(largeTestFile, listStatusResults[1], + abfsStore, mockClient, AbfsRestOperationType.ListPaths, tracingContext); + + // Verify with incorrect filestatus + getFileStatusResults[0].setPath(new Path("wrongPath")); + intercept(ExecutionException.class, + () -> verifyOpenWithProvidedStatus(smallTestFile, + getFileStatusResults[0], smallBuffer, + AbfsRestOperationType.GetPathStatus)); + } + + @Test + public void testDefaultReadaheadQueueDepth() throws Exception { + Configuration config = getRawConfiguration(); + config.unset(FS_AZURE_READ_AHEAD_QUEUE_DEPTH); + AzureBlobFileSystem fs = getFileSystem(config); + Path testFile = path("/testFile"); + fs.create(testFile).close(); + FSDataInputStream in = fs.open(testFile); + Assertions.assertThat( + ((AbfsInputStream) in.getWrappedStream()).getReadAheadQueueDepth()) + .describedAs("readahead queue depth should be set to default value 2") + .isEqualTo(2); + in.close(); + } + } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java index eca670fba9059..4cfc0928cb171 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java @@ -34,20 +34,38 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest; import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem; +import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore; import org.apache.hadoop.io.IOUtils; import org.assertj.core.api.Assertions; import org.junit.Test; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_READ_BUFFER_SIZE; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_READAHEAD; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_READ_AHEAD_BLOCK_SIZE; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_READ_AHEAD_QUEUE_DEPTH; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MIN_BUFFER_SIZE; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB; +import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticGauge; +import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_ACTIVE_PREFETCH_OPERATIONS; +import static org.apache.hadoop.test.LambdaTestUtils.eventually; public class ITestReadBufferManager extends AbstractAbfsIntegrationTest { - public ITestReadBufferManager() throws Exception { + /** + * Time before the JUnit test times out for eventually() clauses + * to fail. This copes with slow network connections and debugging + * sessions, yet still allows for tests to fail with meaningful + * messages. 
+ */ + public static final int TIMEOUT_OFFSET = 5 * 60_000; + + /** + * Interval between eventually() probes. + */ + public static final int PROBE_INTERVAL_MILLIS = 1_000; + + public ITestReadBufferManager() throws Exception { } @Test @@ -93,55 +111,100 @@ private void assertListEmpty(String listName, List<ReadBuffer> list) { @Test public void testPurgeBufferManagerForSequentialStream() throws Exception { - describe("Testing purging of buffers in ReadBufferManager for " - + "sequential input streams"); - AzureBlobFileSystem fs = getABFSWithReadAheadConfig(); - final String fileName = methodName.getMethodName(); - byte[] fileContent = getRandomBytesArray(ONE_MB); - Path testFilePath = createFileWithContent(fs, fileName, fileContent); - - AbfsInputStream iStream1 = null; + describe("Testing purging of buffers in ReadBufferManager for " + + "sequential input streams"); + AzureBlobFileSystem fs = getABFSWithReadAheadConfig(); + final String fileName = methodName.getMethodName(); + byte[] fileContent = getRandomBytesArray(ONE_MB); + Path testFilePath = createFileWithContent(fs, fileName, fileContent); + + AbfsInputStream iStream1 = null; // stream1 will be closed right away. - try { - iStream1 = (AbfsInputStream) fs.open(testFilePath).getWrappedStream(); - // Just reading one byte will trigger all read ahead calls. - iStream1.read(); - } finally { - IOUtils.closeStream(iStream1); - } - ReadBufferManager bufferManager = ReadBufferManager.getBufferManager(); - AbfsInputStream iStream2 = null; - try { - iStream2 = (AbfsInputStream) fs.open(testFilePath).getWrappedStream(); - iStream2.read(); - // After closing stream1, no queued buffers of stream1 should be present - // assertions can't be made about the state of the other lists as it is - // too prone to race conditions. - assertListDoesnotContainBuffersForIstream(bufferManager.getReadAheadQueueCopy(), iStream1); - } finally { - // closing the stream later. - IOUtils.closeStream(iStream2); - } - // After closing stream2, no queued buffers of stream2 should be present. - assertListDoesnotContainBuffersForIstream(bufferManager.getReadAheadQueueCopy(), iStream2); + AbfsInputStream iStream2 = null; + try { + iStream1 = (AbfsInputStream) fs.open(testFilePath).getWrappedStream(); + // Just reading one byte will trigger all read ahead calls. + iStream1.read(); + assertThatStatisticGauge(iStream1.getIOStatistics(), STREAM_READ_ACTIVE_PREFETCH_OPERATIONS) + .isGreaterThan(0); + } finally { + IOUtils.closeStream(iStream1); + } + ReadBufferManager bufferManager = ReadBufferManager.getBufferManager(); + + try { + iStream2 = (AbfsInputStream) fs.open(testFilePath).getWrappedStream(); + iStream2.read(); + // After closing stream1, none of the buffers associated with stream1 should be present. + AbfsInputStream s1 = iStream1; + eventually(getTestTimeoutMillis() - TIMEOUT_OFFSET, PROBE_INTERVAL_MILLIS, () -> + assertListDoesnotContainBuffersForIstream("InProgressList", + bufferManager.getInProgressCopiedList(), s1)); + assertListDoesnotContainBuffersForIstream("CompletedList", + bufferManager.getCompletedReadListCopy(), iStream1); + assertListDoesnotContainBuffersForIstream("ReadAheadQueue", + bufferManager.getReadAheadQueueCopy(), iStream1); + } finally { + // closing the stream later. + IOUtils.closeStream(iStream2); + } + // After closing stream2, none of the buffers associated with stream2 should be present.
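+ // (the worker threads may still be draining an in-progress read, hence the eventually() probe below.)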
+ IOStatisticsStore stream2IOStatistics = iStream2.getIOStatistics(); + AbfsInputStream s2 = iStream2; + eventually(getTestTimeoutMillis() - TIMEOUT_OFFSET, PROBE_INTERVAL_MILLIS, () -> + assertListDoesnotContainBuffersForIstream("InProgressList", + bufferManager.getInProgressCopiedList(), s2)); + // no in progress reads in the stats + assertThatStatisticGauge(stream2IOStatistics, STREAM_READ_ACTIVE_PREFETCH_OPERATIONS) + .isEqualTo(0); + + assertListDoesnotContainBuffersForIstream("CompletedList", + bufferManager.getCompletedReadListCopy(), iStream2); + assertListDoesnotContainBuffersForIstream("ReadAheadQueue", + bufferManager.getReadAheadQueueCopy(), iStream2); + + // After closing both the streams, all lists should be empty. + eventually(getTestTimeoutMillis() - TIMEOUT_OFFSET, PROBE_INTERVAL_MILLIS, () -> + assertListEmpty("InProgressList", bufferManager.getInProgressCopiedList())); + assertListEmpty("CompletedList", bufferManager.getCompletedReadListCopy()); + assertListEmpty("ReadAheadQueue", bufferManager.getReadAheadQueueCopy()); + } - // After closing both the streams, read queue should be empty. - assertListEmpty("ReadAheadQueue", bufferManager.getReadAheadQueueCopy()); + private void assertListDoesnotContainBuffersForIstream(String name, + List<ReadBuffer> list, AbfsInputStream inputStream) { + Assertions.assertThat(list) + .describedAs("list %s contains entries for closed stream %s", + name, inputStream) + .filteredOn(b -> b.getStream() == inputStream) + .isEmpty(); } + private void assertListContainBuffersForIstream(List<ReadBuffer> list, + AbfsInputStream inputStream) { + Assertions.assertThat(list) + .describedAs("list expected to contain buffers for stream %s", inputStream) + .filteredOn(b -> b.getStream() == inputStream) + .isNotEmpty(); + } - private void assertListDoesnotContainBuffersForIstream(List<ReadBuffer> list, - AbfsInputStream inputStream) { - for (ReadBuffer buffer : list) { - Assertions.assertThat(buffer.getStream()) - .describedAs("Buffers associated with closed input streams shouldn't be present") - .isNotEqualTo(inputStream); - } + /** + * Does a list contain a read buffer for stream? + * @param list list to scan + * @param inputStream stream to look for + * @return true if there is at least one reference in the list.
+ */ + boolean listContainsStreamRead(List<ReadBuffer> list, + AbfsInputStream inputStream) { + return list.stream() + .anyMatch(b -> b.getStream() == inputStream); } + private AzureBlobFileSystem getABFSWithReadAheadConfig() throws Exception { Configuration conf = getRawConfiguration(); + conf.setBoolean(FS_AZURE_ENABLE_READAHEAD, true); conf.setLong(FS_AZURE_READ_AHEAD_QUEUE_DEPTH, 8); conf.setInt(AZURE_READ_BUFFER_SIZE, MIN_BUFFER_SIZE); conf.setInt(FS_AZURE_READ_AHEAD_BLOCK_SIZE, MIN_BUFFER_SIZE); diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java index 0395c4183b9b7..f3e6475db2a35 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java @@ -28,14 +28,18 @@ import org.junit.Assert; import org.junit.Test; import org.mockito.Mockito; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSExceptionMessages; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FutureDataInputStreamBuilder; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathIOException; import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest; import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem; import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore; @@ -45,10 +49,14 @@ import org.apache.hadoop.fs.azurebfs.utils.TestCachedSASToken; import org.apache.hadoop.fs.azurebfs.utils.TracingContext; import org.apache.hadoop.fs.impl.OpenFileParameters; +import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore; +import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter; +import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_PREFETCH_BLOCKS_DISCARDED; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; @@ -62,7 +70,13 @@ import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_READ_AHEAD_QUEUE_DEPTH; /** - * Unit test AbfsInputStream. + * Unit test AbfsInputStream, especially the ReadAheadManager. + * Note 1: even though this uses mocking to simulate the client, + * it is still an integration test suite and will fail if offline. + * + * Note 2: Maven runs different test methods in parallel, so + * making assertions about the state of the shared object is + * fairly dangerous.
*/ public class TestAbfsInputStream extends AbstractAbfsIntegrationTest { @@ -82,6 +96,8 @@ public class TestAbfsInputStream extends REDUCED_READ_BUFFER_AGE_THRESHOLD * 10; // 30 sec private static final int ALWAYS_READ_BUFFER_SIZE_TEST_FILE_SIZE = 16 * ONE_MB; + private static final Logger LOG = LoggerFactory.getLogger(TestAbfsInputStream.class); + @Override public void teardown() throws Exception { super.teardown(); @@ -118,7 +134,11 @@ private AbfsInputStream getAbfsInputStream(AbfsClient mockAbfsClient, null, FORWARD_SLASH + fileName, THREE_KB, - inputStreamContext.withReadBufferSize(ONE_KB).withReadAheadQueueDepth(10).withReadAheadBlockSize(ONE_KB), + inputStreamContext + .withStreamStatistics(new AbfsInputStreamStatisticsImpl()) + .withReadBufferSize(ONE_KB) + .withReadAheadQueueDepth(10) + .withReadAheadBlockSize(ONE_KB), "eTag", getTestTracingContext(null, false)); @@ -144,6 +164,7 @@ public AbfsInputStream getAbfsInputStream(AbfsClient abfsClient, FORWARD_SLASH + fileName, fileSize, inputStreamContext.withReadBufferSize(readBufferSize) + .withStreamStatistics(new AbfsInputStreamStatisticsImpl()) .withReadAheadQueueDepth(readAheadQueueDepth) .withShouldReadBufferSizeAlways(alwaysReadBufferSize) .withReadAheadBlockSize(readAheadBlockSize), @@ -166,13 +187,14 @@ private void queueReadAheads(AbfsInputStream inputStream) { ReadBufferManager.getBufferManager() .queueReadAhead(inputStream, TWO_KB, TWO_KB, inputStream.getTracingContext()); + } private void verifyReadCallCount(AbfsClient client, int count) throws AzureBlobFileSystemException, InterruptedException { // ReadAhead threads are triggered asynchronously. // Wait a second before verifying the number of total calls. - Thread.sleep(1000); + waitForPrefetchCompletion(); verify(client, times(count)).read(any(String.class), any(Long.class), any(byte[].class), any(Integer.class), any(Integer.class), any(String.class), any(String.class), any(TracingContext.class)); @@ -341,7 +363,7 @@ public void testFailedReadAhead() throws Exception { // In this test, a read should trigger 3 client.read() calls as file is 3 KB // and readahead buffer size set in AbfsInputStream is 1 KB // There should only be a total of 3 client.read() in this test. - intercept(IOException.class, + intercept(TimeoutException.class, () -> inputStream.read(new byte[ONE_KB])); // Only the 3 readAhead threads should have triggered client.read @@ -384,7 +406,6 @@ public void testFailedReadAheadEviction() throws Exception { } /** - * * The test expects AbfsInputStream to initiate a remote read request for * the request offset and length when previous read ahead on the offset had failed. * Also checks that the ReadBuffers are evicted as per the ReadBufferManager @@ -414,7 +435,7 @@ public void testOlderReadAheadFailure() throws Exception { AbfsInputStream inputStream = getAbfsInputStream(client, "testOlderReadAheadFailure.txt"); // First read request that fails as the readahead triggered from this request failed. 
- intercept(IOException.class, + intercept(TimeoutException.class, "Internal Server error", () -> inputStream.read(new byte[ONE_KB])); // Only the 3 readAhead threads should have triggered client.read @@ -593,7 +614,7 @@ public void testReadAheadManagerForFailedReadAhead() throws Exception { // AbfsInputStream Read would have waited for the read-ahead for the requested offset // as we are testing from ReadAheadManager directly, sleep for a sec to // get the read ahead threads to complete - Thread.sleep(1000); + waitForPrefetchCompletion(); // if readAhead failed for specific offset, getBlock should // throw exception from the ReadBuffer that failed within last thresholdAgeMilliseconds sec @@ -700,7 +721,7 @@ public void testReadAheadManagerForSuccessfulReadAhead() throws Exception { // AbfsInputStream Read would have waited for the read-ahead for the requested offset // as we are testing from ReadAheadManager directly, sleep for a sec to // get the read ahead threads to complete - Thread.sleep(1000); + waitForPrefetchCompletion(); // Only the 3 readAhead threads should have triggered client.read verifyReadCallCount(client, 3); @@ -725,6 +746,14 @@ public void testReadAheadManagerForSuccessfulReadAhead() throws Exception { checkEvictedStatus(inputStream, 0, true); } + /** + * Wait for prefetches to complete. + * @throws InterruptedException interrupted. + */ + private static void waitForPrefetchCompletion() throws InterruptedException { + Thread.sleep(1000); + } + /** * Test readahead with different config settings for request request size and * readAhead block size @@ -918,4 +947,67 @@ private void resetReadBufferManager(int bufferSize, int threshold) { // by successive tests can lead to OOM based on the dev VM/machine capacity. System.gc(); } -} \ No newline at end of file + + /** + * The first readahead closes the stream. + */ + @Test + public void testStreamCloseInFirstReadAhead() throws Exception { + describe("close a stream during prefetch, verify outcome is good"); + + AbfsClient client = getMockAbfsClient(); + AbfsRestOperation successOp = getMockRestOp(); + + AbfsInputStream inputStream = getAbfsInputStream(client, getMethodName()); + ReadBufferManager bufferManager = ReadBufferManager.getBufferManager(); + + final long initialInProgressBlocksDiscarded = bufferManager.getInProgressBlocksDiscarded(); + + // on first read, the op succeeds but the stream is closed, which + // means that the request should be considered a failure + doAnswer(invocation -> { + LOG.info("in read call with {}", inputStream); + inputStream.close(); + return successOp; + }).doReturn(successOp) + .when(client) + .read(any(String.class), any(Long.class), any(byte[].class), + any(Integer.class), any(Integer.class), any(String.class), + any(String.class), any(TracingContext.class)); + + // kick these off before the read() to avoid various race conditions. + queueReadAheads(inputStream); + + // AbfsInputStream Read would have waited for the read-ahead for the requested offset + // as we are testing from ReadAheadManager directly, sleep for a sec to + // get the read ahead threads to complete + waitForPrefetchCompletion(); + + // this triggers prefetching, which closes the stream while the read + // is queued, so the prefetch cannot return its data; + // the subsequent blocking read will then fail.
+ intercept(PathIOException.class, FSExceptionMessages.STREAM_IS_CLOSED, () -> { + // should fail + int bytesRead = inputStream.read(new byte[ONE_KB]); + // diagnostics info if failure wasn't raised + return "read " + bytesRead + " bytes from " + inputStream; + }); + + Assertions.assertThat(bufferManager.getCompletedReadListCopy()) + .filteredOn(rb -> rb.getStream() == inputStream) + .describedAs("list of completed reads") + .isEmpty(); + IOStatisticsStore ios = inputStream.getIOStatistics(); + assertThatStatisticCounter(ios, STREAM_READ_PREFETCH_BLOCKS_DISCARDED) + .describedAs("blocks discarded by %s", inputStream) + .isGreaterThan(0); + + // at least one of the blocks was discarded in progress. + // this is guaranteed because the mockito callback must have been invoked + // by the prefetcher + Assertions.assertThat(bufferManager.getInProgressBlocksDiscarded()) + .describedAs("in progress blocks discarded") + .isGreaterThan(initialInProgressBlocksDiscarded); + } + +} diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestReadBufferManager.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestReadBufferManager.java new file mode 100644 index 0000000000000..c9052383628e2 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestReadBufferManager.java @@ -0,0 +1,576 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.azurebfs.services; + +import java.io.EOFException; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import org.assertj.core.api.AbstractLongAssert; +import org.assertj.core.api.Assertions; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus; +import org.apache.hadoop.fs.azurebfs.utils.TracingContext; +import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore; +import org.apache.hadoop.test.AbstractHadoopTestBase; +import org.apache.hadoop.util.functional.FunctionRaisingIOE; + +import static org.apache.hadoop.fs.azurebfs.services.ReadBufferManager.NUM_BUFFERS; +import static org.apache.hadoop.fs.azurebfs.services.ReadBufferManager.getBufferManager; +import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter; +import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_ACTIVE_PREFETCH_OPERATIONS; +import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_PREFETCH_BLOCKS_DISCARDED; +import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_PREFETCH_BLOCKS_EVICTED; +import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_PREFETCH_BLOCKS_USED; +import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_PREFETCH_BYTES_DISCARDED; +import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_PREFETCH_BYTES_USED; +import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_PREFETCH_OPERATIONS; +import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.iostatisticsStore; +import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains; +import static org.apache.hadoop.test.LambdaTestUtils.intercept; + +/** + * Unit tests for ReadBufferManager; uses stub ReadBufferStreamOperations + * to simulate failures, count statistics etc. + */ +public class TestReadBufferManager extends AbstractHadoopTestBase { + + private static final Logger LOG = LoggerFactory.getLogger(TestReadBufferManager.class); + + private ReadBufferManager bufferManager; + + @Before + public void setup() throws Exception { + bufferManager = getBufferManager(); + } + + /** + * Calling doneReading() on a ReadBuffer which is not in progress is an error. + */ + @Test + public void testBufferNotInProgressList() throws Throwable { + ReadBuffer buffer = readBuffer(new StubReadBufferOperations()); + intercept(IllegalStateException.class, () -> + bufferManager.doneReading(buffer, ReadBufferStatus.READ_FAILED, 0)); + } + + /** + * Create a read buffer. + * @param operations callbacks to use + * @return a buffer bound to the operations, with a countdown latch. + */ + private static ReadBuffer readBuffer(final ReadBufferStreamOperations operations) { + final ReadBuffer buffer = new ReadBuffer(); + buffer.setStream(operations); + buffer.setLatch(new CountDownLatch(1)); + return buffer; + } + + /** + * A zero byte read result is not considered a success, but it is not a failure either.
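+ * The buffer is marked as available, freed, and never placed on the completed list.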
+ */ + @Test + public void testZeroByteRead() throws Throwable { + final StubReadBufferOperations operations = new StubReadBufferOperations(); + operations.setBytesToRead(0); + + ReadBuffer buffer = prefetch(operations, 0); + + // buffer has its indexed buffer released and fields updated. + Assertions.assertThat(buffer) + .matches(b -> b.getStatus() == ReadBufferStatus.AVAILABLE, "buffer is available") + .matches(b -> !b.hasIndexedBuffer(), "no indexed buffer") + .matches(b -> b.getLength() == 0, "buffer is empty"); + + // even though it succeeded, it didn't return any data, so + // not placed on the completed list. + assertNotInCompletedList(buffer); + } + + /** + * Returning a negative number is the same as a zero byte response. + */ + @Test + public void testNegativeByteRead() throws Throwable { + final StubReadBufferOperations operations = new StubReadBufferOperations(); + operations.setBytesToRead(-1); + ReadBuffer buffer = prefetch(operations, 0); + + // buffer has its indexed buffer released and fields updated. + Assertions.assertThat(buffer) + .matches(b -> b.getStatus() == ReadBufferStatus.AVAILABLE, "buffer is available") + .matches(b -> !b.hasIndexedBuffer(), "no indexed buffer") + .matches(b -> b.getLength() == 0, "buffer is empty"); + + // even though it succeeded, it didn't return any data, so + // not placed on the completed list. + assertNotInCompletedList(buffer); + } + + /** + * Queue a prefetch against the passed in stream operations, then block + * awaiting its actual execution. + * @param operations operations to use + * @param offset offset of the read. + * @return the completed buffer. + * @throws InterruptedException await was interrupted + */ + private ReadBuffer prefetch(final ReadBufferStreamOperations operations, final int offset) + throws InterruptedException { + return awaitDone(bufferManager.queueReadAhead(operations, offset, 1, null)); + } + + /** + * read raises an IOE; must be caught, saved without wrapping to the ReadBuffer, + * which is declared {@code READ_FAILED}. + * The buffer is freed but it is still placed in the completed list. + * Evictions don't find it (no buffer to return) but will silently + * evict() it when it is out of date. + */ + @Test + public void testIOEInRead() throws Throwable { + + final StubReadBufferOperations operations = new StubReadBufferOperations(); + final EOFException ioe = new EOFException("eof"); + operations.exceptionToRaise = ioe; + ReadBuffer buffer = prefetch(operations, 0); + + // buffer has its indexed buffer released and fields updated. 
+ Assertions.assertThat(buffer) + .matches(b -> b.getStatus() == ReadBufferStatus.READ_FAILED, "buffer is failed") + .matches(b -> !b.hasIndexedBuffer(), "no indexed buffer") + .extracting(ReadBuffer::getErrException) + .isEqualTo(ioe); + + // it is on the completed list for followup reads + assertInCompletedList(buffer); + + // evict doesn't find it, as there is no useful buffer + Assertions.assertThat(bufferManager.callTryEvict()) + .describedAs("evict() should not find failed buffer") + .isFalse(); + assertInCompletedList(buffer); + + // set the timestamp to be some time ago + buffer.setTimeStamp(1000); + + // evict still doesn't find it + Assertions.assertThat(bufferManager.callTryEvict()) + .describedAs("evict() should not find failed buffer") + .isFalse(); + + // but now it was evicted due to its age + assertNotInCompletedList(buffer); + operations.assertCounter(STREAM_READ_PREFETCH_BLOCKS_EVICTED) + .isEqualTo(1); + } + + /** + * Stream is closed before the read; this should trigger failure before + * {@code readRemote()} is invoked. + * The stream state change does not invoke + * {@link ReadBufferManager#purgeBuffersForStream(ReadBufferStreamOperations)}; + * success relies on ReadBufferWorker. + */ + @Test + public void testClosedBeforeRead() throws Throwable { + + final StubReadBufferOperations operations = new StubReadBufferOperations(); + operations.setClosed(true); + // this exception won't get raised because the read was cancelled first + operations.exceptionToRaise = new EOFException("eof"); + + ReadBuffer buffer = prefetch(operations, 0); + + // buffer has its indexed buffer released and fields updated. + Assertions.assertThat(buffer) + .matches(b -> b.getStatus() == ReadBufferStatus.READ_FAILED, "buffer is failed") + .matches(b -> !b.hasIndexedBuffer(), "no indexed buffer") + .extracting(ReadBuffer::getErrException) + .isNull(); + + // it is never placed on the completed list + assertNotInCompletedList(buffer); + } + + /** + * Stream is closed during the read; this should trigger failure. + * The stream state change does not invoke + * {@link ReadBufferManager#purgeBuffersForStream(ReadBufferStreamOperations)}; + * success relies on ReadBufferWorker. + */ + @Test + public void testClosedDuringRead() throws Throwable { + + // have a different handler for the read call. + final StubReadBufferOperations operations = + new StubReadBufferOperations((self) -> { + self.setClosed(true); + return 1; + }); + + ReadBuffer buffer = prefetch(operations, 0); + + // buffer has its indexed buffer released and fields updated. + Assertions.assertThat(buffer) + .matches(b -> b.getStatus() == ReadBufferStatus.READ_FAILED, "buffer is failed") + .matches(b -> !b.hasIndexedBuffer(), "no indexed buffer") + .extracting(ReadBuffer::getErrException) + .isNull(); + + // it is never placed on the completed list + assertNotInCompletedList(buffer); + operations.assertCounter(STREAM_READ_PREFETCH_BLOCKS_DISCARDED) + .isEqualTo(1); + } + + /** + * When a stream is closed, all its entries in the completed list are purged.
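+ * Buffers whose data was never read back are counted as discarded.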
+ */ + @Test + public void testStreamCloseEvictsCompleted() throws Throwable { + final StubReadBufferOperations operations = new StubReadBufferOperations(); + operations.bytesToRead = 1; + // two successes + ReadBuffer buffer1 = prefetch(operations, 0); + ReadBuffer buffer2 = prefetch(operations, 1000); + + // final read is a failure + operations.exceptionToRaise = new FileNotFoundException("/fnfe"); + ReadBuffer failed = prefetch(operations, 2000); + + List<ReadBuffer> buffers = Arrays.asList(buffer1, buffer2, failed); + + // all buffers must be in the completed list + buffers.forEach(TestReadBufferManager::assertInCompletedList); + + // xor for an exclusivity assertion. + // the read failed xor it succeeded with an indexed buffer. + Assertions.assertThat(buffers) + .allMatch(b -> + b.getStatus() == ReadBufferStatus.READ_FAILED + ^ b.hasIndexedBuffer(), + "either was failure or it has an indexed buffer but not both"); + + // now state that buffer 1 was read + buffer1.dataConsumedByStream(0, 1); + Assertions.assertThat(buffer1) + .matches(ReadBuffer::isAnyByteConsumed, "any byte consumed") + .matches(ReadBuffer::isFirstByteConsumed, "first byte consumed") + .matches(ReadBuffer::isLastByteConsumed, "last byte consumed"); + operations.assertCounter(STREAM_READ_PREFETCH_BLOCKS_USED) + .isEqualTo(1); + + operations.assertCounter(STREAM_READ_PREFETCH_BYTES_USED) + .isEqualTo(1); + + // now tell buffer manager of the stream closure + operations.setClosed(true); + bufferManager.purgeBuffersForStream(operations); + + // now the buffers are out of the list + buffers.forEach(TestReadBufferManager::assertNotInCompletedList); + Assertions.assertThat(buffers) + .allMatch(b -> !b.hasIndexedBuffer(), "no indexed buffer"); + + // all blocks declared evicted + operations.assertCounter(STREAM_READ_PREFETCH_BLOCKS_EVICTED) + .isEqualTo(3); + // only one buffer was evicted unused; the error buffer + // doesn't get included, while the first buffer was + // used. + operations.assertCounter(STREAM_READ_PREFETCH_BLOCKS_DISCARDED) + .isEqualTo(1); + } + + /** + * Even if (somehow) a closed stream's buffer makes it to the completed list, + * a successful buffer will still be found and evicted. + */ + @Test + public void testEvictFindsClosedStreams() throws Throwable { + final StubReadBufferOperations operations = new StubReadBufferOperations(); + operations.bytesToRead = 1; + // one success + ReadBuffer buffer1 = prefetch(operations, 0); + + // final read is a failure + operations.exceptionToRaise = new FileNotFoundException("/fnfe"); + ReadBuffer failed = prefetch(operations, 2000); + + List<ReadBuffer> buffers = Arrays.asList(buffer1, failed); + + // all buffers must be in the completed list + buffers.forEach(TestReadBufferManager::assertInCompletedList); + + // xor for an exclusivity assertion. + // the read failed xor it succeeded with an indexed buffer.
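+ // that is: exactly one of the two conditions holds for every buffer.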
+ Assertions.assertThat(buffers) + .allMatch(b -> + b.getStatus() == ReadBufferStatus.READ_FAILED + ^ b.hasIndexedBuffer(), + "either was failure or it has an indexed buffer but not both"); + + // now state that buffer 1 was read + buffer1.dataConsumedByStream(0, 1); + + // now close the stream without telling the buffer manager + operations.setClosed(true); + + // evict finds the buffer with data + Assertions.assertThat(bufferManager.callTryEvict()) + .describedAs("evict() should return closed buffer with data") + .isTrue(); + + // and it has now been removed from the completed list + assertNotInCompletedList(buffer1); + assertInCompletedList(failed); + + // a second evict still doesn't find the failed buffer, as it has no data + Assertions.assertThat(bufferManager.callTryEvict()) + .describedAs("evict() should not find failed buffer") + .isFalse(); + + // but it was quietly evicted due to its state + assertNotInCompletedList(failed); + + } + + /** + * HADOOP-18521: if we remove ownership of a buffer during the remote read, + * things blow up. + */ + @Test + public void testOwnershipChangedDuringRead() throws Throwable { + + final Object blockUntilReadStarted = new Object(); + // this stores the buffer reference and is the object waited on to + // block the read operation until the buffer index is patched. + + final AtomicReference<ReadBuffer> bufferRef = new AtomicReference<>(); + + // handler blocks for buffer reference update. + final StubReadBufferOperations operations = + new StubReadBufferOperations((self) -> { + synchronized (blockUntilReadStarted) { + blockUntilReadStarted.notify(); + } + synchronized (bufferRef) { + if (bufferRef.get() == null) { + try { + bufferRef.wait(); + } catch (InterruptedException e) { + throw new InterruptedIOException("interrupted"); + } + } + } + return 1; + }); + + // queue the operation + ReadBuffer buffer = bufferManager.queueReadAhead(operations, 0, 1, null); + + synchronized (blockUntilReadStarted) { + blockUntilReadStarted.wait(); + } + + // set the buffer index to a different value + final int index = buffer.getBufferindex(); + final int newIndex = (index + 1) % NUM_BUFFERS; + LOG.info("changing buffer index from {} to {}", index, newIndex); + buffer.setBufferindex(newIndex); + + // notify waiting buffer + synchronized (bufferRef) { + bufferRef.set(buffer); + bufferRef.notifyAll(); + } + + awaitDone(buffer); + + // buffer has its indexed buffer released and fields updated. + Assertions.assertThat(buffer) + .matches(b -> b.getStatus() == ReadBufferStatus.READ_FAILED, "buffer is failed") + .extracting(ReadBuffer::getErrException) + .isNotNull(); + + assertExceptionContains("not owned", buffer.getErrException()); + + // it is never placed on the completed list + assertNotInCompletedList(buffer); + + bufferManager.testResetReadBufferManager(); + } + + /** + * Assert that a buffer is not in the completed list. + * @param buffer buffer to validate. + */ + private static void assertNotInCompletedList(final ReadBuffer buffer) { + Assertions.assertThat(getBufferManager().getCompletedReadListCopy()) + .describedAs("completed read list") + .doesNotContain(buffer); + } + + /** + * Assert that a buffer is in the completed list. + * @param buffer buffer to validate. + */ + private static void assertInCompletedList(final ReadBuffer buffer) { + Assertions.assertThat(getBufferManager().getCompletedReadListCopy()) + .describedAs("completed read list") + .contains(buffer); + } + + /** + * Await a buffer being read; if the latch is not released in time, + * an assertion is raised.
+ * @param buffer buffer to block on. + * @return the buffer that was awaited. + * @throws InterruptedException interrupted. + */ + private static ReadBuffer awaitDone(final ReadBuffer buffer) throws InterruptedException { + LOG.info("awaiting latch of {}", buffer); + Assertions.assertThat(buffer.getLatch().await(1, TimeUnit.MINUTES)) + .describedAs("awaiting release of latch on %s", buffer) + .isTrue(); + LOG.info("Prefetch completed of {}", buffer); + return buffer; + } + + /** + * Stub implementation of {@link ReadBufferStreamOperations}. + * IOStats are collected and a lambda expression can be provided + * which is evaluated on a readRemote call. The default one is + * {@link #raiseOrReturn(StubReadBufferOperations)}. + */ + private static final class StubReadBufferOperations + implements ReadBufferStreamOperations { + + private final IOStatisticsStore iostats = iostatisticsStore() + .withCounters( + STREAM_READ_ACTIVE_PREFETCH_OPERATIONS, + STREAM_READ_PREFETCH_BLOCKS_DISCARDED, + STREAM_READ_PREFETCH_BLOCKS_USED, + STREAM_READ_PREFETCH_BYTES_DISCARDED, + STREAM_READ_PREFETCH_BLOCKS_EVICTED, + STREAM_READ_PREFETCH_BYTES_USED, + STREAM_READ_PREFETCH_OPERATIONS) + .build(); + + private boolean closed; + + private int bytesToRead; + + private IOException exceptionToRaise; + + /** + * Function which takes a ref to this object and + * returns the number of bytes read. + * Invoked in the {@link #readRemote(long, byte[], int, int, TracingContext)} + * call. + */ + private final FunctionRaisingIOE<StubReadBufferOperations, Integer> action; + + /** + * Simple constructor. + */ + private StubReadBufferOperations() { + // trivia: you can't use a this(this::raiseOrReturn) call here. + this.action = this::raiseOrReturn; + } + + /** + * Constructor. + * @param action lambda expression to call in readRemote() + */ + private StubReadBufferOperations( + FunctionRaisingIOE<StubReadBufferOperations, Integer> action) { + this.action = action; + } + + @Override + public int readRemote( + final long position, + final byte[] b, + final int offset, + final int length, + final TracingContext tracingContext) throws IOException { + return action.apply(this); + } + + private int raiseOrReturn(StubReadBufferOperations self) throws IOException { + if (exceptionToRaise != null) { + throw exceptionToRaise; + } + return bytesToRead; + } + + @Override + public boolean isClosed() { + return closed; + } + + private void setClosed(final boolean closed) { + this.closed = closed; + } + + private int getBytesToRead() { + return bytesToRead; + } + + private void setBytesToRead(final int bytesToRead) { + this.bytesToRead = bytesToRead; + } + + @Override + public String getStreamID() { + return "streamid"; + } + + @Override + public IOStatisticsStore getIOStatistics() { + return iostats; + } + + @Override + public String getPath() { + return "/path"; + } + + /** + * Start asserting about a counter.
+ * @param key counter key + * @return assert builder + */ + AbstractLongAssert<?> assertCounter(String key) { + return assertThatStatisticCounter(iostats, key); + } + } +} diff --git a/hadoop-tools/hadoop-azure/src/test/resources/log4j.properties b/hadoop-tools/hadoop-azure/src/test/resources/log4j.properties index 9f72d03653306..2b0e72b1a16ce 100644 --- a/hadoop-tools/hadoop-azure/src/test/resources/log4j.properties +++ b/hadoop-tools/hadoop-azure/src/test/resources/log4j.properties @@ -26,6 +26,8 @@ log4j.logger.org.apache.hadoop.fs.azure.AzureFileSystemThreadPoolExecutor=DEBUG log4j.logger.org.apache.hadoop.fs.azure.BlockBlobAppendStream=DEBUG log4j.logger.org.apache.hadoop.fs.azurebfs.contracts.services.TracingService=TRACE log4j.logger.org.apache.hadoop.fs.azurebfs.services.AbfsClient=DEBUG +log4j.logger.org.apache.hadoop.fs.azurebfs.services.ReadBufferManager=TRACE +log4j.logger.org.apache.hadoop.fs.azurebfs.services.ReadBufferWorker=TRACE # after here: turn off log messages from other parts of the system # which only clutter test reports.
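Usage sketch, not part of the patch itself: how client code can demand the prefetch-safe behaviour through openFile(), mirroring the must() calls in ITestAbfsReadWriteAndSeek above. The wrapper class and method names here are hypothetical; only the capability constant and the builder calls come from the patch.

import java.io.IOException;
import java.util.concurrent.ExecutionException;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_CAPABILITY_PREFETCH_SAFE;

public final class PrefetchSafeReader {

  private PrefetchSafeReader() {
  }

  /**
   * Open a file, requiring the prefetch-safe capability.
   * must() makes the option mandatory: if the stream cannot
   * satisfy it, the open fails rather than returning a stream
   * which may still have the prefetch race condition.
   */
  public static FSDataInputStream openPrefetchSafe(FileSystem fs, Path path)
      throws IOException {
    try {
      return fs.openFile(path)
          .must(FS_AZURE_CAPABILITY_PREFETCH_SAFE, "")
          .build()
          .get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IOException(e);
    } catch (ExecutionException e) {
      throw new IOException(e.getCause());
    }
  }
}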