diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java index 50bbf45505cec..387706f5a6fca 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java @@ -421,6 +421,36 @@ public final class StreamStatisticNames { public static final String STREAM_READ_PREFETCH_OPERATIONS = "stream_read_prefetch_operations"; + /** + * Count of prefetch blocks used. + */ + public static final String STREAM_READ_PREFETCH_BLOCKS_USED + = "stream_read_prefetch_blocks_used"; + /** + * Count of prefetch bytes used. + */ + public static final String STREAM_READ_PREFETCH_BYTES_USED + = "stream_read_prefetch_bytes_used"; + + /** + * Count of prefetch blocks discarded unused: {@value}. + */ + public static final String STREAM_READ_PREFETCH_BLOCKS_DISCARDED + = "stream_read_prefetch_blocks_discarded"; + + /** + * Count of prefetch bytes discarded from unused blocks: {@value}. + * May or may not include bytes from blocks which were partially accessed. + */ + public static final String STREAM_READ_PREFETCH_BYTES_DISCARDED + = "stream_read_prefetch_bytes_discarded"; + + /** + * Count of prefetch blocks evicted, used or unused: {@value}. + */ + public static final String STREAM_READ_PREFETCH_BLOCKS_EVICTED + = "stream_read_prefetch_blocks_evicted"; + /** * Total number of block in disk cache. */ diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java index 21501d28f4238..da6251714a2fa 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java @@ -31,7 +31,6 @@ import java.util.Hashtable; import java.util.List; import java.util.ArrayList; -import java.util.Collections; import java.util.EnumSet; import java.util.Map; import java.util.Optional; @@ -46,6 +45,7 @@ import javax.annotation.Nullable; import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys; import org.apache.hadoop.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -291,7 +291,7 @@ protected CompletableFuture openFileWithOptions( LOG.debug("AzureBlobFileSystem.openFileWithOptions path: {}", path); AbstractFSBuilderImpl.rejectUnknownMandatoryKeys( parameters.getMandatoryKeys(), - Collections.emptySet(), + ConfigurationKeys.KNOWN_OPENFILE_KEYS, "for " + path); return LambdaUtils.eval( new CompletableFuture<>(), () -> @@ -1628,6 +1628,7 @@ public boolean hasPathCapability(final Path path, final String capability) case CommonPathCapabilities.FS_PERMISSIONS: case CommonPathCapabilities.FS_APPEND: case CommonPathCapabilities.ETAGS_AVAILABLE: + case ConfigurationKeys.FS_AZURE_CAPABILITY_PREFETCH_SAFE: // safe from buffer sharing: return true; case CommonPathCapabilities.ETAGS_PRESERVED_IN_RENAME: diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java index 
a59f76b6d0fe0..89c164b42d004 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java @@ -18,10 +18,17 @@ package org.apache.hadoop.fs.azurebfs.constants; +import java.util.Collections; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.FileSystem; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_STANDARD_OPTIONS; + /** * Responsible to keep all the Azure Blob File System configurations keys in Hadoop configuration file. */ @@ -259,5 +266,27 @@ public static String accountProperty(String property, String account) { * @see FileSystem#openFile(org.apache.hadoop.fs.Path) */ public static final String FS_AZURE_BUFFERED_PREAD_DISABLE = "fs.azure.buffered.pread.disable"; + + /** + * Has the ReadBufferManager fix of HADOOP-18521 been applied? + * This can be queried on {@code hasCapability()} and + * on the filesystem {@code hasPathCapability()} probes. + */ + public static final String FS_AZURE_CAPABILITY_PREFETCH_SAFE = "fs.azure.capability.prefetch.safe"; + + /** + * Known keys for openFile(), including the standard ones. + */ + public static final Set KNOWN_OPENFILE_KEYS; + + static { + Set collect = Stream.of( + FS_AZURE_CAPABILITY_PREFETCH_SAFE, + FS_AZURE_BUFFERED_PREAD_DISABLE) + .collect(Collectors.toSet()); + collect.addAll(FS_OPTION_OPENFILE_STANDARD_OPTIONS); + KNOWN_OPENFILE_KEYS = Collections.unmodifiableSet(collect); + } + private ConfigurationKeys() {} } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java index 8f12484a55c9d..18c94ff0ce4cb 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java @@ -23,9 +23,14 @@ import java.io.IOException; import java.net.HttpURLConnection; import java.util.UUID; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.fs.PathIOException; +import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys; +import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore; import org.apache.hadoop.util.Preconditions; import org.slf4j.Logger; @@ -42,7 +47,6 @@ import org.apache.hadoop.fs.azurebfs.utils.CachedSASToken; import org.apache.hadoop.fs.azurebfs.utils.Listener; import org.apache.hadoop.fs.azurebfs.utils.TracingContext; -import org.apache.hadoop.fs.statistics.IOStatistics; import org.apache.hadoop.fs.statistics.IOStatisticsSource; import static java.lang.Math.max; @@ -50,13 +54,17 @@ import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_KB; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.STREAM_ID_LEN; +import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString; +import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.retrieveIOStatistics; +import static 
org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.emptyStatisticsStore; +import static org.apache.hadoop.util.Preconditions.checkState; import static org.apache.hadoop.util.StringUtils.toLowerCase; /** * The AbfsInputStream for AbfsClient. */ public class AbfsInputStream extends FSInputStream implements CanUnbuffer, - StreamCapabilities, IOStatisticsSource { + StreamCapabilities, IOStatisticsSource, ReadBufferStreamOperations { private static final Logger LOG = LoggerFactory.getLogger(AbfsInputStream.class); // Footer size is set to qualify for both ORC and parquet files public static final int FOOTER_SIZE = 16 * ONE_KB; @@ -95,7 +103,11 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer, private int bCursor = 0; // cursor of read within buffer - offset of next byte to be returned from buffer private int limit = 0; // offset of next byte to be read into buffer from service (i.e., upper marker+1 // of valid bytes in buffer) - private boolean closed = false; + + /** + * Closed flag. + */ + private final AtomicBoolean closed = new AtomicBoolean(false); private TracingContext tracingContext; // Optimisations modify the pointer fields. @@ -113,7 +125,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer, private Listener listener; private final AbfsInputStreamContext context; - private IOStatistics ioStatistics; + private IOStatisticsStore ioStatistics; /** * This is the actual position within the object, used by * lazy seek to decide whether to seek on the next read or not. @@ -155,9 +167,11 @@ public AbfsInputStream( // Propagate the config values to ReadBufferManager so that the first instance // to initialize can set the readAheadBlockSize ReadBufferManager.setReadBufferManagerConfigs(readAheadBlockSize); - if (streamStatistics != null) { - ioStatistics = streamStatistics.getIOStatistics(); - } + // if no statistics was passed (happens in some tests) a stub iostatistics + // store is used instead. + ioStatistics = streamStatistics != null + ? streamStatistics.getIOStatistics() + : emptyStatisticsStore(); } public String getPath() { @@ -176,11 +190,7 @@ public int read(long position, byte[] buffer, int offset, int length) // kind of random reads on a shared file input stream will greatly get // benefited by such implementation. // Strict close check at the begin of the API only not for the entire flow. 
- synchronized (this) { - if (closed) { - throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED); - } - } + checkNotClosed(); LOG.debug("pread requested offset = {} len = {} bufferedPreadDisabled = {}", offset, length, bufferedPreadDisabled); if (!bufferedPreadDisabled) { @@ -216,6 +226,7 @@ public int read() throws IOException { @Override public synchronized int read(final byte[] b, final int off, final int len) throws IOException { + checkNotClosed(); // check if buffer is null before logging the length if (b != null) { LOG.debug("read requested b.length = {} offset = {} len = {}", b.length, @@ -428,7 +439,7 @@ private void restorePointerState() { private boolean validate(final byte[] b, final int off, final int len) throws IOException { - if (closed) { + if (isClosed()) { throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED); } @@ -449,6 +460,12 @@ private boolean validate(final byte[] b, final int off, final int len) private int copyToUserBuffer(byte[] b, int off, int len){ //If there is anything in the buffer, then return lesser of (requested bytes) and (bytes in buffer) //(bytes returned may be less than requested) + + // unbuffer() or close() may free the buffer during an HTTP call. + // this triggers an NPE in arraycopy(); the check here is simply to + // fail more meaningfully + checkState(buffer != null, + "read buffer is null in stream %s", this); int bytesRemaining = limit - bCursor; int bytesToRead = min(len, bytesRemaining); System.arraycopy(buffer, bCursor, b, off, bytesToRead); @@ -467,6 +484,7 @@ private int copyToUserBuffer(byte[] b, int off, int len){ private int readInternal(final long position, final byte[] b, final int offset, final int length, final boolean bypassReadAhead) throws IOException { + checkNotClosed(); if (readAheadEnabled && !bypassReadAhead) { // try reading from read-ahead if (offset != 0) { @@ -515,7 +533,13 @@ private int readInternal(final long position, final byte[] b, final int offset, } } - int readRemote(long position, byte[] b, int offset, int length, TracingContext tracingContext) throws IOException { + @Override + public int readRemote(long position, + byte[] b, + int offset, + int length, + TracingContext tracingContext) throws IOException { + checkNotClosed(); if (position < 0) { throw new IllegalArgumentException("attempting to read from negative offset"); } @@ -555,7 +579,7 @@ int readRemote(long position, byte[] b, int offset, int length, TracingContext t throw new FileNotFoundException(ere.getMessage()); } } - throw new IOException(ex); + throw ex; } long bytesRead = op.getResult().getBytesReceived(); if (streamStatistics != null) { @@ -566,6 +590,13 @@ int readRemote(long position, byte[] b, int offset, int length, TracingContext t } LOG.debug("HTTP request read bytes = {}", bytesRead); bytesFromRemoteRead += bytesRead; + + // now, the stream may have been closed during the read. + // this can be traumatic for direct read() as the buffer will + // now be null. + // For readahead it is less than ideal.
+ checkNotClosed(); + return (int) bytesRead; } @@ -587,7 +618,7 @@ private void incrementReadOps() { @Override public synchronized void seek(long n) throws IOException { LOG.debug("requested seek to position {}", n); - if (closed) { + if (isClosed()) { throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED); } if (n < 0) { @@ -608,7 +639,7 @@ public synchronized void seek(long n) throws IOException { @Override public synchronized long skip(long n) throws IOException { - if (closed) { + if (isClosed()) { throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED); } long currentPos = getPos(); @@ -641,7 +672,7 @@ public synchronized long skip(long n) throws IOException { */ @Override public synchronized int available() throws IOException { - if (closed) { + if (isClosed()) { throw new IOException( FSExceptionMessages.STREAM_IS_CLOSED); } @@ -659,7 +690,7 @@ public synchronized int available() throws IOException { * @throws IOException if the stream is closed */ public long length() throws IOException { - if (closed) { + if (isClosed()) { throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED); } return contentLength; @@ -671,7 +702,7 @@ public long length() throws IOException { */ @Override public synchronized long getPos() throws IOException { - if (closed) { + if (isClosed()) { throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED); } return nextReadPos < 0 ? 0 : nextReadPos; @@ -692,11 +723,19 @@ public boolean seekToNewSource(long l) throws IOException { } @Override - public synchronized void close() throws IOException { + public void close() throws IOException { LOG.debug("Closing {}", this); - closed = true; - buffer = null; // de-reference the buffer so it can be GC'ed sooner - ReadBufferManager.getBufferManager().purgeBuffersForStream(this); + if (closed.compareAndSet(false, true)) { + buffer = null; // de-reference the buffer so it can be GC'ed sooner + // Tell the ReadBufferManager to clean up. 
+ ReadBufferManager.getBufferManager().purgeBuffersForStream(this); + } + } + + @Override + @InterfaceAudience.Private + public boolean isClosed() { + return closed.get(); } /** @@ -738,7 +777,15 @@ public synchronized void unbuffer() { @Override public boolean hasCapability(String capability) { - return StreamCapabilities.UNBUFFER.equals(toLowerCase(capability)); + switch (toLowerCase(capability)) { + case StreamCapabilities.UNBUFFER: + case ConfigurationKeys.FS_AZURE_CAPABILITY_PREFETCH_SAFE: // safe from buffer sharing + return true; + case ConfigurationKeys.FS_AZURE_BUFFERED_PREAD_DISABLE: + return bufferedPreadDisabled; + default: + return false; + } } byte[] getBuffer() { @@ -760,6 +807,7 @@ protected void setCachedSasToken(final CachedSASToken cachedSasToken) { this.cachedSasToken = cachedSasToken; } + @Override @VisibleForTesting public String getStreamID() { return inputStreamId; @@ -817,7 +865,7 @@ public boolean shouldAlwaysReadBufferSize() { } @Override - public IOStatistics getIOStatistics() { + public IOStatisticsStore getIOStatistics() { return ioStatistics; } @@ -827,13 +875,19 @@ public IOStatistics getIOStatistics() { */ @Override public String toString() { - final StringBuilder sb = new StringBuilder(super.toString()); - if (streamStatistics != null) { - sb.append("AbfsInputStream@(").append(this.hashCode()).append("){"); - sb.append(streamStatistics.toString()); - sb.append("}"); - } - return sb.toString(); + return "AbfsInputStream{(" + this.hashCode() + ") " + + "path='" + path + '\'' + + ", id=" + inputStreamId + + ", closed=" + closed.get() + + ", contentLength=" + contentLength + + ", nextReadPos=" + nextReadPos + + ", readAheadQueueDepth=" + readAheadQueueDepth + + ", readAheadEnabled=" + readAheadEnabled + + ", bufferedPreadDisabled=" + bufferedPreadDisabled + + ", firstRead=" + firstRead + + ", fCursor=" + fCursor + + ", " + ioStatisticsToPrettyString(retrieveIOStatistics(streamStatistics)) + + "}"; } @VisibleForTesting @@ -855,4 +909,14 @@ long getFCursorAfterLastRead() { long getLimit() { return this.limit; } + + /** + * Verify that the input stream is open. + * @throws IOException if the connection is closed. + */ + private void checkNotClosed() throws IOException { + if (isClosed()) { + throw new PathIOException(getPath(), FSExceptionMessages.STREAM_IS_CLOSED); + } + } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatistics.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatistics.java index 00663467fe233..024b0dbf748fd 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatistics.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatistics.java @@ -19,8 +19,8 @@ package org.apache.hadoop.fs.azurebfs.services; import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.fs.statistics.IOStatistics; import org.apache.hadoop.fs.statistics.IOStatisticsSource; +import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore; /** * Interface for statistics for the AbfsInputStream. @@ -99,7 +99,7 @@ public interface AbfsInputStreamStatistics extends IOStatisticsSource { * Get the IOStatisticsStore instance from AbfsInputStreamStatistics. * @return instance of IOStatisticsStore which extends IOStatistics. 
*/ - IOStatistics getIOStatistics(); + IOStatisticsStore getIOStatistics(); /** * Makes the string of all the AbfsInputStream statistics. diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatisticsImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatisticsImpl.java index f03ea1913e259..243fc163a46b4 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatisticsImpl.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatisticsImpl.java @@ -20,13 +20,20 @@ import java.util.concurrent.atomic.AtomicLong; -import org.apache.hadoop.fs.statistics.IOStatistics; import org.apache.hadoop.fs.statistics.StreamStatisticNames; import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore; import org.apache.hadoop.classification.VisibleForTesting; +import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString; import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_HTTP_GET_REQUEST; import static org.apache.hadoop.fs.statistics.StoreStatisticNames.SUFFIX_MEAN; +import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_ACTIVE_PREFETCH_OPERATIONS; +import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_PREFETCH_BLOCKS_DISCARDED; +import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_PREFETCH_BLOCKS_EVICTED; +import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_PREFETCH_BLOCKS_USED; +import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_PREFETCH_BYTES_DISCARDED; +import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_PREFETCH_BYTES_USED; +import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_PREFETCH_OPERATIONS; import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.iostatisticsStore; /** @@ -48,9 +55,15 @@ public class AbfsInputStreamStatisticsImpl StreamStatisticNames.BYTES_READ_BUFFER, StreamStatisticNames.REMOTE_READ_OP, StreamStatisticNames.READ_AHEAD_BYTES_READ, - StreamStatisticNames.REMOTE_BYTES_READ - ) - .withDurationTracking(ACTION_HTTP_GET_REQUEST) + StreamStatisticNames.REMOTE_BYTES_READ, + STREAM_READ_PREFETCH_BLOCKS_USED, + STREAM_READ_PREFETCH_BYTES_USED, + STREAM_READ_PREFETCH_BLOCKS_DISCARDED, + STREAM_READ_PREFETCH_BYTES_DISCARDED, + STREAM_READ_PREFETCH_BLOCKS_EVICTED) + .withGauges(STREAM_READ_ACTIVE_PREFETCH_OPERATIONS) + .withDurationTracking(ACTION_HTTP_GET_REQUEST, + STREAM_READ_PREFETCH_OPERATIONS) .build(); /* Reference to the atomic counter for frequently updated counters to avoid @@ -183,7 +196,7 @@ public void remoteReadOperation() { * @return IOStatisticsStore instance which extends IOStatistics. 
*/ @Override - public IOStatistics getIOStatistics() { + public IOStatisticsStore getIOStatistics() { return ioStatisticsStore; } @@ -270,7 +283,7 @@ public double getActionHttpGetRequest() { public String toString() { final StringBuilder sb = new StringBuilder( "StreamStatistics{"); - sb.append(ioStatisticsStore.toString()); + sb.append(ioStatisticsToPrettyString(ioStatisticsStore)); sb.append('}'); return sb.toString(); } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBuffer.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBuffer.java index 9ce926d841c84..ab0e7b6ef3443 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBuffer.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBuffer.java @@ -23,12 +23,23 @@ import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus; import org.apache.hadoop.fs.azurebfs.utils.TracingContext; +import org.apache.hadoop.fs.statistics.DurationTracker; +import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore; -import static org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus.READ_FAILED; +import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_ACTIVE_PREFETCH_OPERATIONS; +import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_PREFETCH_BLOCKS_DISCARDED; +import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_PREFETCH_BLOCKS_EVICTED; +import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_PREFETCH_BLOCKS_USED; +import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_PREFETCH_BYTES_DISCARDED; +import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_PREFETCH_BYTES_USED; +import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_PREFETCH_OPERATIONS; class ReadBuffer { - private AbfsInputStream stream; + /** + * Stream operations. + */ + private ReadBufferStreamOperations stream; private long offset; // offset within the file for the buffer private int length; // actual length, set after the buffer is filles private int requestedLength; // requested length of the read @@ -47,11 +58,11 @@ class ReadBuffer { private IOException errException = null; - public AbfsInputStream getStream() { + public ReadBufferStreamOperations getStream() { return stream; } - public void setStream(AbfsInputStream stream) { + public void setStream(ReadBufferStreamOperations stream) { this.stream = stream; } @@ -103,6 +114,13 @@ public void setBufferindex(int bufferindex) { this.bufferindex = bufferindex; } + /** + * Does the buffer index refer to a valid buffer. + * @return true if the buffer index is to an entry in the array. + */ + public boolean hasIndexedBuffer() { + return getBufferindex() >= 0; + } public IOException getErrException() { return errException; } @@ -115,11 +133,13 @@ public ReadBufferStatus getStatus() { return status; } + /** + * Update the read status. 
+ * @param status the new status + */ public void setStatus(ReadBufferStatus status) { this.status = status; - if (status == READ_FAILED) { - bufferindex = -1; - } } public CountDownLatch getLatch() { return latch; } @@ -162,4 +182,119 @@ public void setAnyByteConsumed(boolean isAnyByteConsumed) { this.isAnyByteConsumed = isAnyByteConsumed; } + @Override + public String toString() { + return super.toString() + + "{ status=" + status + + ", offset=" + offset + + ", length=" + length + + ", requestedLength=" + requestedLength + + ", bufferindex=" + bufferindex + + ", timeStamp=" + timeStamp + + ", isFirstByteConsumed=" + isFirstByteConsumed + + ", isLastByteConsumed=" + isLastByteConsumed + + ", isAnyByteConsumed=" + isAnyByteConsumed + + ", errException=" + errException + + ", stream=" + (stream != null ? stream.getStreamID() : "none") + + ", stream closed=" + isStreamClosed() + + ", latch=" + latch + + '}'; + } + + /** + * Is the stream closed? + * @return stream closed status. + */ + public boolean isStreamClosed() { + return stream != null && stream.isClosed(); + } + + /** + * IOStatistics of stream. + * @return the stream's IOStatisticsStore. + */ + public IOStatisticsStore getStreamIOStatistics() { + return stream.getIOStatistics(); + } + + /** + * Start using the buffer. + * Sets the byte consumption flags as appropriate, then + * updates the stream statistics with the use of this buffer. + * @param offset offset in buffer where copy began + * @param bytesCopied bytes copied. + */ + void dataConsumedByStream(int offset, int bytesCopied) { + boolean isFirstUse = !isAnyByteConsumed; + setAnyByteConsumed(true); + if (offset == 0) { + setFirstByteConsumed(true); + } + if (offset + bytesCopied == getLength()) { + setLastByteConsumed(true); + } + IOStatisticsStore iostats = getStreamIOStatistics(); + if (isFirstUse) { + // first use of this block: update the count of blocks used + iostats.incrementCounter(STREAM_READ_PREFETCH_BLOCKS_USED, 1); + } + // every use, update the count of bytes read + iostats.incrementCounter(STREAM_READ_PREFETCH_BYTES_USED, bytesCopied); + } + + /** + * The (completed) buffer was evicted; update stream statistics + * as appropriate. + */ + void evicted() { + IOStatisticsStore iostats = getStreamIOStatistics(); + iostats.incrementCounter(STREAM_READ_PREFETCH_BLOCKS_EVICTED, 1); + if (getBufferindex() >= 0 && !isAnyByteConsumed()) { + // nothing was read, so consider it discarded. + iostats.incrementCounter(STREAM_READ_PREFETCH_BLOCKS_DISCARDED, 1); + iostats.incrementCounter(STREAM_READ_PREFETCH_BYTES_DISCARDED, getLength()); + } + } + + /** + * The (completed) buffer was discarded; no data was read. + */ + void discarded() { + if (getBufferindex() >= 0) { + IOStatisticsStore iostats = getStreamIOStatistics(); + iostats.incrementCounter(STREAM_READ_PREFETCH_BLOCKS_DISCARDED, 1); + iostats.incrementCounter(STREAM_READ_PREFETCH_BYTES_DISCARDED, getLength()); + } + } + + /** + * Release the buffer: update fields as appropriate. + */ + void releaseBuffer() { + setBuffer(null); + setBufferindex(-1); + } + + /** + * Prefetch started - update stream statistics. + */ + void prefetchStarted() { + getStreamIOStatistics().incrementGauge(STREAM_READ_ACTIVE_PREFETCH_OPERATIONS, 1); + } + + /** + * Prefetch finished - update stream statistics. + */ + void prefetchFinished() { + getStreamIOStatistics().incrementGauge(STREAM_READ_ACTIVE_PREFETCH_OPERATIONS, -1); + } + + /** + * Get a duration tracker for the prefetch. + * @return a duration tracker.
+ */ + DurationTracker trackPrefetchOperation() { + return getStreamIOStatistics().trackDuration(STREAM_READ_PREFETCH_OPERATIONS); + } + } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java index ac84f0b27cf12..6efd45c34d424 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java @@ -30,11 +30,14 @@ import java.util.Queue; import java.util.Stack; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantLock; import org.apache.hadoop.fs.azurebfs.utils.TracingContext; import org.apache.hadoop.classification.VisibleForTesting; +import static org.apache.hadoop.util.Preconditions.checkState; + /** * The Read Buffer Manager for Rest AbfsClient. */ @@ -43,7 +46,7 @@ final class ReadBufferManager { private static final int ONE_KB = 1024; private static final int ONE_MB = ONE_KB * ONE_KB; - private static final int NUM_BUFFERS = 16; + static final int NUM_BUFFERS = 16; private static final int NUM_THREADS = 8; private static final int DEFAULT_THRESHOLD_AGE_MILLISECONDS = 3000; // have to see if 3 seconds is a good threshold @@ -51,6 +54,12 @@ final class ReadBufferManager { private static int thresholdAgeMilliseconds = DEFAULT_THRESHOLD_AGE_MILLISECONDS; private Thread[] threads = new Thread[NUM_THREADS]; private byte[][] buffers; // array of byte[] buffers, to hold the data that is read + + /** + * map of {@link #buffers} to {@link ReadBuffer} using it. + */ + private ReadBuffer[] bufferOwners; + private Stack freeList = new Stack<>(); // indices in buffers[] array that are available private Queue readAheadQueue = new LinkedList<>(); // queue of requests that are not picked up by any worker thread yet @@ -59,7 +68,22 @@ final class ReadBufferManager { private static ReadBufferManager bufferManager; // singleton, initialized in static initialization block private static final ReentrantLock LOCK = new ReentrantLock(); - static ReadBufferManager getBufferManager() { + /** + * How many completed blocks were discarded when a stream was closed? + */ + private final AtomicLong completedBlocksDiscarded = new AtomicLong(); + + /** + * How many queued blocks were discarded when a stream was closed? + */ + private final AtomicLong queuedBlocksDiscarded = new AtomicLong(); + + /** + * How many in progress blocks were discarded when a stream was closed? + */ + private final AtomicLong inProgressBlocksDiscarded = new AtomicLong(); + + public static ReadBufferManager getBufferManager() { if (bufferManager == null) { LOCK.lock(); try { @@ -85,6 +109,7 @@ static void setReadBufferManagerConfigs(int readAheadBlockSize) { private void init() { buffers = new byte[NUM_BUFFERS][]; + bufferOwners = new ReadBuffer[NUM_BUFFERS]; for (int i = 0; i < NUM_BUFFERS; i++) { buffers[i] = new byte[blockSize]; // same buffers are reused. The byte array never goes back to GC freeList.add(i); @@ -113,12 +138,13 @@ private ReadBufferManager() { /** * {@link AbfsInputStream} calls this method to queue read-aheads. 
- * - * @param stream The {@link AbfsInputStream} for which to do the read-ahead + * @param stream The {@link AbfsInputStream} for which to do the read-ahead * @param requestedOffset The offset in the file which shoukd be read * @param requestedLength The length to read + * @return the queued read */ - void queueReadAhead(final AbfsInputStream stream, final long requestedOffset, final int requestedLength, + @VisibleForTesting + ReadBuffer queueReadAhead(final ReadBufferStreamOperations stream, final long requestedOffset, final int requestedLength, TracingContext tracingContext) { if (LOGGER.isTraceEnabled()) { LOGGER.trace("Start Queueing readAhead for {} offset {} length {}", @@ -127,10 +153,10 @@ void queueReadAhead(final AbfsInputStream stream, final long requestedOffset, fi ReadBuffer buffer; synchronized (this) { if (isAlreadyQueued(stream, requestedOffset)) { - return; // already queued, do not queue again + return null; } if (freeList.isEmpty() && !tryEvict()) { - return; // no buffers available, cannot queue anything + return null; } buffer = new ReadBuffer(); @@ -142,10 +168,8 @@ void queueReadAhead(final AbfsInputStream stream, final long requestedOffset, fi buffer.setLatch(new CountDownLatch(1)); buffer.setTracingContext(tracingContext); - Integer bufferIndex = freeList.pop(); // will return a value, since we have checked size > 0 already - - buffer.setBuffer(buffers[bufferIndex]); - buffer.setBufferindex(bufferIndex); + int bufferIndex = freeList.pop(); // will return a value, since we have checked size > 0 already + takeOwnershipOfBufferAtIndex(buffer, bufferIndex); readAheadQueue.add(buffer); notifyAll(); if (LOGGER.isTraceEnabled()) { @@ -153,6 +177,7 @@ void queueReadAhead(final AbfsInputStream stream, final long requestedOffset, fi stream.getPath(), requestedOffset, buffer.getBufferindex()); } } + return buffer; } @@ -225,8 +250,8 @@ private void waitForProcess(final AbfsInputStream stream, final long position) { Thread.currentThread().interrupt(); } if (LOGGER.isTraceEnabled()) { - LOGGER.trace("latch done for file {} buffer idx {} length {}", - stream.getPath(), readBuf.getBufferindex(), readBuf.getLength()); + LOGGER.trace("latch done for file {} buffer {}", + stream.getPath(), readBuf); } } } @@ -239,15 +264,22 @@ private void waitForProcess(final AbfsInputStream stream, final long position) { */ private synchronized boolean tryEvict() { ReadBuffer nodeToEvict = null; - if (completedReadList.size() <= 0) { + if (completedReadList.isEmpty()) { return false; // there are no evict-able buffers } long currentTimeInMs = currentTimeMillis(); - // first, try buffers where all bytes have been consumed (approximated as first and last bytes consumed) + // first, look for easy targets for (ReadBuffer buf : completedReadList) { + if (buf.isStreamClosed() && buf.hasIndexedBuffer()) { + // the stream was closed since this was added to the completed + // list (it would not have a buffer if the stream was closed before) + nodeToEvict = buf; + break; + } if (buf.isFirstByteConsumed() && buf.isLastByteConsumed()) { + // buffers where all bytes have been consumed (approximated as first and last bytes consumed) nodeToEvict = buf; break; } @@ -278,12 +310,13 @@ private synchronized boolean tryEvict() { long earliestBirthday = Long.MAX_VALUE; ArrayList oldFailedBuffers = new ArrayList<>(); for (ReadBuffer buf : completedReadList) { - if ((buf.getBufferindex() != -1) + if (buf.hasIndexedBuffer() && (buf.getTimeStamp() < earliestBirthday)) { nodeToEvict = buf; earliestBirthday = 
buf.getTimeStamp(); - } else if ((buf.getBufferindex() == -1) - && (currentTimeInMs - buf.getTimeStamp()) > thresholdAgeMilliseconds) { + } else if (!buf.hasIndexedBuffer() + && ((currentTimeInMs - buf.getTimeStamp()) > thresholdAgeMilliseconds) + || buf.isStreamClosed()) { oldFailedBuffers.add(buf); } } @@ -302,33 +335,33 @@ private synchronized boolean tryEvict() { } private boolean evict(final ReadBuffer buf) { - // As failed ReadBuffers (bufferIndx = -1) are saved in completedReadList, - // avoid adding it to freeList. - if (buf.getBufferindex() != -1) { - freeList.push(buf.getBufferindex()); - } - - completedReadList.remove(buf); buf.setTracingContext(null); if (LOGGER.isTraceEnabled()) { - LOGGER.trace("Evicting buffer idx {}; was used for file {} offset {} length {}", - buf.getBufferindex(), buf.getStream().getPath(), buf.getOffset(), buf.getLength()); + LOGGER.trace("Evicting buffer {}", buf); + } + completedReadList.remove(buf); + // As failed ReadBuffers (bufferIndx = -1) are saved in completedReadList, + // avoid adding it to freeList. + if (buf.hasIndexedBuffer()) { + placeBufferOnFreeList("eviction", buf); } + // tell the buffer it was evicted so should update its statistics + buf.evicted(); return true; } - private boolean isAlreadyQueued(final AbfsInputStream stream, final long requestedOffset) { + private boolean isAlreadyQueued(final ReadBufferStreamOperations stream, final long requestedOffset) { // returns true if any part of the buffer is already queued return (isInList(readAheadQueue, stream, requestedOffset) || isInList(inProgressList, stream, requestedOffset) || isInList(completedReadList, stream, requestedOffset)); } - private boolean isInList(final Collection list, final AbfsInputStream stream, final long requestedOffset) { + private boolean isInList(final Collection list, final ReadBufferStreamOperations stream, final long requestedOffset) { return (getFromList(list, stream, requestedOffset) != null); } - private ReadBuffer getFromList(final Collection list, final AbfsInputStream stream, final long requestedOffset) { + private ReadBuffer getFromList(final Collection list, final ReadBufferStreamOperations stream, final long requestedOffset) { for (ReadBuffer buffer : list) { if (buffer.getStream() == stream) { if (buffer.getStatus() == ReadBufferStatus.AVAILABLE @@ -350,7 +383,7 @@ private ReadBuffer getFromList(final Collection list, final AbfsInpu * @param requestedOffset * @return */ - private ReadBuffer getBufferFromCompletedQueue(final AbfsInputStream stream, final long requestedOffset) { + private ReadBuffer getBufferFromCompletedQueue(final ReadBufferStreamOperations stream, final long requestedOffset) { for (ReadBuffer buffer : completedReadList) { // Buffer is returned if the requestedOffset is at or above buffer's // offset but less than buffer's length or the actual requestedLength @@ -365,16 +398,136 @@ private ReadBuffer getBufferFromCompletedQueue(final AbfsInputStream stream, fin return null; } - private void clearFromReadAheadQueue(final AbfsInputStream stream, final long requestedOffset) { + private void clearFromReadAheadQueue(final ReadBufferStreamOperations stream, final long requestedOffset) { ReadBuffer buffer = getFromList(readAheadQueue, stream, requestedOffset); if (buffer != null) { readAheadQueue.remove(buffer); notifyAll(); // lock is held in calling method - freeList.push(buffer.getBufferindex()); + placeBufferOnFreeList("clear from readahead", buffer); } } - private int getBlockFromCompletedQueue(final AbfsInputStream stream, final 
long position, final int length, + /** + * Add a buffer to the free list. + * @param reason reason the buffer is being freed (used in logs) + * @param readBuffer read buffer which owns the buffer to free + */ + private void placeBufferOnFreeList(final String reason, final ReadBuffer readBuffer) { + int index = readBuffer.getBufferindex(); + LOGGER.debug("Returning buffer index {} to free list for '{}'; owner {}", + index, reason, readBuffer); + checkState(readBuffer.hasIndexedBuffer(), + "ReadBuffer buffer release for %s has no allocated buffer to free: %s", + reason, readBuffer); + checkState(!freeList.contains(index), + "Duplicate buffer %d added to free buffer list for '%s' by %s", + index, reason, readBuffer); + verifyReadBufferOwnsBufferAtIndex(readBuffer, index); + // declare it as unowned + bufferOwners[index] = null; + // set the buffer to null, because it is now free + // for other operations. + readBuffer.releaseBuffer(); + // once it is not owned/referenced, make available to others + freeList.push(index); + } + + /** + * Verify that a given ReadBuffer owns the buffer at the given index. + * @param readBuffer read buffer expected to be the owner + * @param index index of buffer in buffer array + */ + void verifyReadBufferOwnsBufferAtIndex(final ReadBuffer readBuffer, final int index) { + checkState(readBuffer == bufferOwner(index), + "ReadBufferManager buffer %s is not owned by %s", index, readBuffer); + } + + /** + * Verify a read buffer owns its indexed buffer. + * @param readBuffer read buffer to validate. + * @throws IllegalStateException if ownership is not satisfied. + */ + void verifyIsOwnerOfBuffer(final ReadBuffer readBuffer) { + checkState(readBuffer.hasIndexedBuffer(), "no buffer owned by %s", readBuffer); + verifyReadBufferOwnsBufferAtIndex(readBuffer, readBuffer.getBufferindex()); + } + + /** + * Take ownership of a buffer. + * This updates the {@link #bufferOwners} array. + * @param readBuffer read operation taking ownership + * @param index index of buffer in buffer array + */ + private void takeOwnershipOfBufferAtIndex(final ReadBuffer readBuffer, int index) { + checkState(null == bufferOwners[index], + "Buffer %d requested by %s already owned by %s", + index, readBuffer, bufferOwners[index]); + readBuffer.setBuffer(buffers[index]); + readBuffer.setBufferindex(index); + bufferOwners[index] = readBuffer; + } + + /** + * Verify a buffer is not in use anywhere. + * @param index buffer index. + * @throws IllegalStateException if the state is invalid. + */ + private void verifyByteBufferNotInUse(final int index) { + verifyByteBufferNotInCollection("completedReadList", index, completedReadList); + verifyByteBufferNotInCollection("inProgressList", index, inProgressList); + verifyByteBufferNotInCollection("readAheadQueue", index, readAheadQueue); + } + + /** + * Verify that a buffer is not referenced in the supplied collection. + * @param name collection name for exceptions. + * @param index buffer index. + * @param collection collection to validate + * @throws IllegalStateException if the state is invalid. + */ + private void verifyByteBufferNotInCollection(String name, int index, + Collection collection) { + checkState(collection.stream().noneMatch(rb -> rb.getBufferindex() == index), + "Buffer index %d found in buffer collection %s", index, name); + } + + /** + * Verify that a read buffer is not referenced in the supplied collection. + * @param name collection name for exceptions. + * @param rb read buffer + * @param collection collection to validate + * @throws IllegalStateException if the state is invalid.
+ */ + private void verifyReadBufferNotInCollection(String name, + ReadBuffer rb, + Collection collection) { + checkState(!collection.contains(rb), + "Collection %s contains buffer %s", name, rb); + } + + /** + * Validate all invariants of the read manager state. + * @throws IllegalStateException if the state is invalid. + */ + @VisibleForTesting + public synchronized void validateReadManagerState() { + // all buffers in free list are not in any of the other lists + freeList.forEach(this::verifyByteBufferNotInUse); + + // there is no in progress buffer in the other queues + inProgressList.forEach(rb -> { + verifyReadBufferNotInCollection("completedReadList", rb, + completedReadList); + verifyReadBufferNotInCollection("readAheadQueue", rb, readAheadQueue); + }); + // nothing completed is in the readahead queue + completedReadList.forEach(rb -> { + verifyReadBufferNotInCollection("readAheadQueue", rb, readAheadQueue); + }); + + } + + private int getBlockFromCompletedQueue(final ReadBufferStreamOperations stream, final long position, final int length, final byte[] buffer) throws IOException { ReadBuffer buf = getBufferFromCompletedQueue(stream, position); @@ -401,13 +554,7 @@ private int getBlockFromCompletedQueue(final AbfsInputStream stream, final long int availableLengthInBuffer = buf.getLength() - cursor; int lengthToCopy = Math.min(length, availableLengthInBuffer); System.arraycopy(buf.getBuffer(), cursor, buffer, 0, lengthToCopy); - if (cursor == 0) { - buf.setFirstByteConsumed(true); - } - if (cursor + lengthToCopy == buf.getLength()) { - buf.setLastByteConsumed(true); - } - buf.setAnyByteConsumed(true); + buf.dataConsumedByStream(cursor, lengthToCopy); return lengthToCopy; } @@ -435,6 +582,9 @@ ReadBuffer getNextBlockToRead() throws InterruptedException { if (buffer == null) { return null; // should never happen } + // verify buffer index is owned. + verifyIsOwnerOfBuffer(buffer); + buffer.setStatus(ReadBufferStatus.READING_IN_PROGRESS); inProgressList.add(buffer); } @@ -442,6 +592,9 @@ ReadBuffer getNextBlockToRead() throws InterruptedException { LOGGER.trace("ReadBufferWorker picked file {} for offset {}", buffer.getStream().getPath(), buffer.getOffset()); } + validateReadManagerState(); + // update stream gauge. + buffer.prefetchStarted(); return buffer; } @@ -454,31 +607,104 @@ ReadBuffer getNextBlockToRead() throws InterruptedException { */ void doneReading(final ReadBuffer buffer, final ReadBufferStatus result, final int bytesActuallyRead) { if (LOGGER.isTraceEnabled()) { - LOGGER.trace("ReadBufferWorker completed read file {} for offset {} outcome {} bytes {}", - buffer.getStream().getPath(), buffer.getOffset(), result, bytesActuallyRead); - } - synchronized (this) { - // If this buffer has already been purged during - // close of InputStream then we don't update the lists. - if (inProgressList.contains(buffer)) { - inProgressList.remove(buffer); - if (result == ReadBufferStatus.AVAILABLE && bytesActuallyRead > 0) { - buffer.setStatus(ReadBufferStatus.AVAILABLE); - buffer.setLength(bytesActuallyRead); - } else { - freeList.push(buffer.getBufferindex()); - // buffer will be deleted as per the eviction policy. + LOGGER.trace("ReadBufferWorker completed file {} for offset {} bytes {}; {}", + buffer.getStream().getPath(), buffer.getOffset(), bytesActuallyRead, buffer); + } + // decrement counter. 
+ buffer.prefetchFinished(); + + try { + synchronized (this) { + // remove from the list + if (!inProgressList.remove(buffer)) { + // this is a sign of inconsistent state, so a major problem, such as + // double invocation of this method with the same buffer. + String message = + String.format("Read completed from an operation not declared as in progress %s", + buffer); + LOGGER.warn(message); + if (buffer.hasIndexedBuffer()) { + // release the buffer (which may raise an exception) + placeBufferOnFreeList("read not in progress", buffer); + } + // report the failure + throw new IllegalStateException(message); } - // completed list also contains FAILED read buffers - // for sending exception message to clients. + + // does the buffer own the array it has just written to? + // this is a significant issue. + verifyIsOwnerOfBuffer(buffer); + // should the read buffer be added to the completed list? + boolean addCompleted; + // flag to indicate buffer should be freed + boolean shouldFreeBuffer = false; + // and the reason (for logging) + String freeBufferReason = ""; + buffer.setStatus(result); buffer.setTimeStamp(currentTimeMillis()); - completedReadList.add(buffer); + // did the read return any data? + if (result == ReadBufferStatus.AVAILABLE) { + if (bytesActuallyRead > 0) { + + // successful read of data; + addCompleted = true; + + // update buffer state. + buffer.setLength(bytesActuallyRead); + } else { + // there was no data; the buffer can be returned to the free list. + shouldFreeBuffer = true; + freeBufferReason = "no data"; + // don't + addCompleted = false; + } + } else { + // read failed or there was no data; the buffer can be returned to the free list. + shouldFreeBuffer = true; + freeBufferReason = "failed read"; + // completed list also contains FAILED read buffers + // for sending exception message to clients. + // NOTE: checks for closed state may update this. + addCompleted = true; + } + + // now check for closed streams, which are discarded + // and the buffers freed immediately, irrespective of + // outcome + if (buffer.isStreamClosed()) { + // stream was closed during the read. + // even if there is data, it should be discarded + LOGGER.trace("Discarding prefetch on closed stream {}", buffer); + inProgressBlocksDiscarded.incrementAndGet(); + // don't add + addCompleted = false; + // and the buffer recycled + shouldFreeBuffer = true; + freeBufferReason = "stream closed"; + } + if (shouldFreeBuffer) { + buffer.discarded(); + // buffer should be returned to the free list. + placeBufferOnFreeList(freeBufferReason, buffer); + } + if (addCompleted) { + // add to the completed list. 
+ LOGGER.trace("Adding buffer to completed list {}", buffer); + completedReadList.add(buffer); + } } + } catch (IllegalStateException e) { + // update this for tests to validate + buffer.setStatus(ReadBufferStatus.READ_FAILED); + buffer.setErrException(new IOException(e)); + throw e; + } finally { + //outside the synchronized, since anyone receiving a wake-up from the latch must see safe-published results + LOGGER.trace("releasing latch {}", buffer.getLatch()); + buffer.getLatch().countDown(); // wake up waiting threads (if any) } - //outside the synchronized, since anyone receiving a wake-up from the latch must see safe-published results - buffer.getLatch().countDown(); // wake up waiting threads (if any) } /** @@ -530,48 +756,127 @@ public synchronized List getInProgressCopiedList() { } @VisibleForTesting - void callTryEvict() { - tryEvict(); + boolean callTryEvict() { + return tryEvict(); } - /** * Purging the buffers associated with an {@link AbfsInputStream} * from {@link ReadBufferManager} when stream is closed. + * Before HADOOP-18521 this would purge in progress reads, which + * would return the active buffer to the free pool while it was + * still in use. * @param stream input stream. */ - public synchronized void purgeBuffersForStream(AbfsInputStream stream) { - LOGGER.debug("Purging stale buffers for AbfsInputStream {} ", stream); + public synchronized void purgeBuffersForStream(ReadBufferStreamOperations stream) { + LOGGER.debug("Purging stale buffers for AbfsInputStream {}/{}", + stream.getStreamID(), stream.getPath()); + + // remove from the queue + int before = readAheadQueue.size(); readAheadQueue.removeIf(readBuffer -> readBuffer.getStream() == stream); - purgeList(stream, completedReadList); + int readaheadPurged = readAheadQueue.size() - before; + queuedBlocksDiscarded.addAndGet(readaheadPurged); + + // all completed entries + int completedPurged = purgeCompletedReads(stream); + completedBlocksDiscarded.addAndGet(completedPurged); + + // print a summary + LOGGER.debug("Purging outcome readahead={}, completed={} for {}", + readaheadPurged, completedPurged, stream); + validateReadManagerState(); } /** * Method to remove buffers associated with a {@link AbfsInputStream} - * when its close method is called. + * from the completed read list. + * Any allocated buffers will be returned to the pool. + * The buffers will have {@link ReadBuffer#evicted()} called to let + * them update their statistics. * NOTE: This method is not threadsafe and must be called inside a * synchronised block. See caller. * @param stream associated input stream. - * @param list list of buffers like {@link this#completedReadList} - * or {@link this#inProgressList}. */ - private void purgeList(AbfsInputStream stream, LinkedList list) { - for (Iterator it = list.iterator(); it.hasNext();) { + private int purgeCompletedReads(ReadBufferStreamOperations stream) { + int purged = 0; + for (Iterator it = completedReadList.iterator(); it.hasNext();) { ReadBuffer readBuffer = it.next(); if (readBuffer.getStream() == stream) { + purged++; it.remove(); // As failed ReadBuffers (bufferIndex = -1) are already pushed to free // list in doneReading method, we will skip adding those here again. 
- if (readBuffer.getBufferindex() != -1) { - freeList.push(readBuffer.getBufferindex()); + if (readBuffer.hasIndexedBuffer()) { + placeBufferOnFreeList("purge completed reads", readBuffer); } + // tell the buffer it was evicted so should update its statistics + readBuffer.evicted(); } } + return purged; + } + + /** + * Get the count of completed blocks discarded. + * @return the number of completed blocks discarded. + */ + @VisibleForTesting + public long getCompletedBlocksDiscarded() { + return completedBlocksDiscarded.get(); + } + + /** + * Get the count of completed blocks discarded. + * @return the number of queued block reads discarded. + */ + @VisibleForTesting + public long getQueuedBlocksDiscarded() { + return queuedBlocksDiscarded.get(); + } + + /** + * Get the count of in progress read blocks discarded. + * @return the number of blocks being read discarded. + */ + @VisibleForTesting + public long getInProgressBlocksDiscarded() { + return inProgressBlocksDiscarded.get(); + } + + @VisibleForTesting + public void resetBlocksDiscardedCounters() { + completedBlocksDiscarded.set(0); + queuedBlocksDiscarded.set(0); + inProgressBlocksDiscarded.set(0); + } + + /** + * Look up the owner of a buffer. + * @param index index of the buffer. + * @return the buffer owner, null if the buffer is free. + */ + ReadBuffer bufferOwner(int index) { + return bufferOwners[index]; + } + + @Override + public String toString() { + return "ReadBufferManager{" + + "readAheadQueue=" + readAheadQueue.size() + + ", inProgressList=" + inProgressList.size() + + ", completedReadList=" + completedReadList.size() + + ", completedBlocksDiscarded=" + completedBlocksDiscarded + + ", queuedBlocksDiscarded=" + queuedBlocksDiscarded + + ", inProgressBlocksDiscarded=" + inProgressBlocksDiscarded + + '}'; } /** * Test method that can clean up the current state of readAhead buffers and * the lists. Will also trigger a fresh init. + * Note: will cause problems with the static shared class if multiple tests + * are running in the same VM. */ @VisibleForTesting void testResetReadBufferManager() { @@ -593,6 +898,7 @@ void testResetReadBufferManager() { freeList.clear(); for (int i = 0; i < NUM_BUFFERS; i++) { buffers[i] = null; + bufferOwners[i] = null; } buffers = null; resetBufferManager(); diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferStreamOperations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferStreamOperations.java new file mode 100644 index 0000000000000..ffee632c00984 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferStreamOperations.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.azurebfs.services; + +import java.io.IOException; + +import org.apache.hadoop.fs.azurebfs.utils.TracingContext; +import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore; + +/** + * Interface which is required for read buffer stream + * calls. + * Extracted from {@code AbfsInputStream} to make testing + * easier and to isolate what operations the read buffer + * makes of the streams using it. + */ +interface ReadBufferStreamOperations { + + /** + * Read a block from the store. + * @param position position in file + * @param b destination buffer. + * @param offset offset in buffer + * @param length length of read + * @param tracingContext trace context + * @return count of bytes read. + * @throws IOException failure. + */ + int readRemote(long position, + byte[] b, + int offset, + int length, + TracingContext tracingContext) throws IOException; + + /** + * Is the stream closed? + * This must be thread safe as prefetch operations in + * different threads probe this before closure. + * @return true if the stream has been closed. + */ + boolean isClosed(); + + String getStreamID(); + + IOStatisticsStore getIOStatistics(); + + /** + * Get the stream path as a string. + * @return path string. + */ + String getPath(); + + +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java index a30f06261ef6f..7a0651baf1418 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java @@ -21,14 +21,26 @@ import java.io.IOException; import java.util.concurrent.CountDownLatch; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.fs.PathIOException; import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus; +import org.apache.hadoop.fs.statistics.DurationTracker; + +import static org.apache.hadoop.util.Preconditions.checkState; class ReadBufferWorker implements Runnable { + private static final Logger LOGGER = + LoggerFactory.getLogger(ReadBufferWorker.class); + protected static final CountDownLatch UNLEASH_WORKERS = new CountDownLatch(1); private int id; + private boolean doneReadingInvoked; + ReadBufferWorker(final int id) { this.id = id; } @@ -61,27 +73,93 @@ public void run() { return; } if (buffer != null) { - try { - // do the actual read, from the file. - int bytesRead = buffer.getStream().readRemote( - buffer.getOffset(), - buffer.getBuffer(), - 0, - // If AbfsInputStream was created with bigger buffer size than - // read-ahead buffer size, make sure a valid length is passed - // for remote read - Math.min(buffer.getRequestedLength(), buffer.getBuffer().length), - buffer.getTracingContext()); - - bufferManager.doneReading(buffer, ReadBufferStatus.AVAILABLE, bytesRead); // post result back to ReadBufferManager - } catch (IOException ex) { - buffer.setErrException(ex); - bufferManager.doneReading(buffer, ReadBufferStatus.READ_FAILED, 0); - } catch (Exception ex) { - buffer.setErrException(new PathIOException(buffer.getStream().getPath(), ex)); - bufferManager.doneReading(buffer, ReadBufferStatus.READ_FAILED, 0); - } + fetchBuffer(bufferManager, buffer); } } } + + /** + * Fetch the data for the buffer. + * @param bufferManager buffer manager. 
+ * @param buffer the buffer to read in. + */ + @VisibleForTesting + void fetchBuffer(final ReadBufferManager bufferManager, final ReadBuffer buffer) { + LOGGER.trace("Reading {}", buffer); + doneReadingInvoked = false; + // Stop network call if stream is closed + if (postFailureWhenStreamClosed(bufferManager, buffer)) { + // stream closed before the HTTP request was initiated. + // Immediately move to the next request in the queue. + return; + } + checkState(buffer.hasIndexedBuffer(), + "ReadBuffer buffer has no allocated buffer to read into: %s", + buffer); + // input stream is updated with count/duration of prefetching + DurationTracker tracker = buffer.trackPrefetchOperation(); + try { + // do the actual read, from the file. + int bytesRead = buffer.getStream().readRemote( + buffer.getOffset(), + buffer.getBuffer(), + 0, + // If AbfsInputStream was created with bigger buffer size than + // read-ahead buffer size, make sure a valid length is passed + // for remote read + Math.min(buffer.getRequestedLength(), buffer.getBuffer().length), + buffer.getTracingContext()); + LOGGER.trace("Read {} bytes", bytesRead); + + // Update failure to completed list if stream is closed + if (!postFailureWhenStreamClosed(bufferManager, buffer)) { + // post result back to ReadBufferManager + doneReading(bufferManager, buffer, ReadBufferStatus.AVAILABLE, bytesRead); + } + tracker.close(); + } catch (Exception ex) { + tracker.failed(); + IOException ioe = ex instanceof IOException + ? (IOException) ex + : new PathIOException(buffer.getStream().getPath(), ex); + buffer.setErrException(ioe); + LOGGER.debug("prefetch failure for {}", buffer, ex); + + // if doneReading hasn't already been called for this iteration, call it. + // the check ensures that any exception raised in doneReading() doesn't trigger + // a second attempt. + if (!doneReadingInvoked) { + doneReading(bufferManager, buffer, ReadBufferStatus.READ_FAILED, 0); + } + } + } + + /** + * Check for the owning stream being closed; if so it + * reports it to the buffer manager and then returns "true". 
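+ * The stream state is probed both before the remote read is issued and again after it completes, + * so a buffer fetched for a stream which was closed mid-read is never posted back as available.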
+ * @param bufferManager buffer manager + * @param buffer buffer to review + * @return true if the stream is closed + */ + private boolean postFailureWhenStreamClosed( + ReadBufferManager bufferManager, + ReadBuffer buffer) { + + // When stream is closed report failure to be picked by eviction + if (buffer.isStreamClosed()) { + LOGGER.debug("Stream closed; failing read"); + // Fail read + doneReading(bufferManager, buffer, ReadBufferStatus.READ_FAILED, 0); + return true; + } + return false; + } + + private void doneReading(final ReadBufferManager bufferManager, + final ReadBuffer buffer, + final ReadBufferStatus result, + final int bytesActuallyRead) { + doneReadingInvoked = true; + bufferManager.doneReading(buffer, result, bytesActuallyRead); + } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsReadWriteAndSeek.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsReadWriteAndSeek.java index beada775ae87b..a52ad4dcaf137 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsReadWriteAndSeek.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsReadWriteAndSeek.java @@ -21,6 +21,7 @@ import java.util.Arrays; import java.util.Random; +import org.assertj.core.api.Assertions; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -35,6 +36,8 @@ import org.apache.hadoop.fs.statistics.IOStatisticsSource; import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_LEVEL_INFO; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BUFFERED_PREAD_DISABLE; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_CAPABILITY_PREFETCH_SAFE; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.APPENDBLOB_MAX_WRITE_BUFFER_SIZE; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_READ_BUFFER_SIZE; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MAX_BUFFER_SIZE; @@ -89,6 +92,10 @@ private void testReadWriteAndSeek(int bufferSize) throws Exception { new Random().nextBytes(b); Path testPath = path(TEST_PATH); + Assertions.assertThat(fs.hasPathCapability(testPath, FS_AZURE_CAPABILITY_PREFETCH_SAFE)) + .describedAs("path capability %s on filesystem %s", FS_AZURE_CAPABILITY_PREFETCH_SAFE, fs) + .isTrue(); + FSDataOutputStream stream = fs.create(testPath); try { stream.write(b); @@ -100,13 +107,28 @@ private void testReadWriteAndSeek(int bufferSize) throws Exception { final byte[] readBuffer = new byte[2 * bufferSize]; int result; IOStatisticsSource statisticsSource = null; - try (FSDataInputStream inputStream = fs.open(testPath)) { + try (FSDataInputStream inputStream = fs.open(testPath); + FSDataInputStream stream2 = fs.openFile(testPath) + .must(FS_AZURE_CAPABILITY_PREFETCH_SAFE, "") + .must(FS_AZURE_BUFFERED_PREAD_DISABLE, true) + .build().get()) { statisticsSource = inputStream; ((AbfsInputStream) inputStream.getWrappedStream()).registerListener( new TracingHeaderValidator(abfsConfiguration.getClientCorrelationId(), fs.getFileSystemId(), FSOperationType.READ, true, 0, ((AbfsInputStream) inputStream.getWrappedStream()) .getStreamID())); + // check stream 2 capabilities + Assertions.assertThat(stream2.hasCapability(FS_AZURE_CAPABILITY_PREFETCH_SAFE)) + .describedAs("Stream capability %s on stream %s", + FS_AZURE_CAPABILITY_PREFETCH_SAFE, stream2) + 
.isTrue(); + Assertions.assertThat(stream2.hasCapability(FS_AZURE_BUFFERED_PREAD_DISABLE)) + .describedAs("Stream capability %s on stream %s", + FS_AZURE_BUFFERED_PREAD_DISABLE, stream2) + .isTrue(); + // trigger read at byte 0 on second input stream + stream2.read(); inputStream.seek(bufferSize); result = inputStream.read(readBuffer, bufferSize, bufferSize); assertNotEquals(-1, result); @@ -118,11 +140,16 @@ private void testReadWriteAndSeek(int bufferSize) throws Exception { inputStream.seek(0); result = inputStream.read(readBuffer, 0, bufferSize); + assertNotEquals("data read in final read()", -1, result); + assertArrayEquals(readBuffer, b); + inputStream.close(); + + // now look at stream2, including all prefetching + stream2.readFully(readBuffer, 0, bufferSize); + // and a bit of positioned read + stream2.readFully(bufferSize + 1, readBuffer, 0, bufferSize-1); } logIOStatisticsAtLevel(LOG, IOSTATISTICS_LOGGING_LEVEL_INFO, statisticsSource); - - assertNotEquals("data read in final read()", -1, result); - assertArrayEquals(readBuffer, b); } @Test diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStream.java index 66f072501dc4d..ab7f94581d44c 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStream.java @@ -21,25 +21,40 @@ import java.io.IOException; import java.lang.reflect.Field; import java.util.Map; +import java.util.Optional; import java.util.Random; +import java.util.concurrent.ExecutionException; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FutureDataInputStreamBuilder; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.azurebfs.AbfsConfiguration; import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest; import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem; import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore; import org.apache.hadoop.fs.azurebfs.utils.TracingContext; +import org.apache.hadoop.fs.impl.OpenFileParameters; + +import org.assertj.core.api.Assertions; import org.junit.Test; +import org.mockito.Mockito; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_READ_AHEAD_QUEUE_DEPTH; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB; +import static org.apache.hadoop.test.LambdaTestUtils.intercept; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; public class ITestAbfsInputStream extends AbstractAbfsIntegrationTest { @@ -255,4 +270,120 @@ private void verifyAfterSeek(AbfsInputStream abfsInputStream, long seekPos) thro assertEquals(0, abfsInputStream.getLimit()); assertEquals(0, abfsInputStream.getBCursor()); } + + private void writeBufferToNewFile(Path testFile, byte[] buffer) 
throws IOException { + AzureBlobFileSystem fs = getFileSystem(); + fs.create(testFile); + FSDataOutputStream out = fs.append(testFile); + out.write(buffer); + out.close(); + } + + private void verifyOpenWithProvidedStatus(Path path, FileStatus fileStatus, + byte[] buf, AbfsRestOperationType source) + throws IOException, ExecutionException, InterruptedException { + byte[] readBuf = new byte[buf.length]; + AzureBlobFileSystem fs = getFileSystem(); + FutureDataInputStreamBuilder builder = fs.openFile(path); + builder.withFileStatus(fileStatus); + FSDataInputStream in = builder.build().get(); + assertEquals(String.format( + "Open with fileStatus [from %s result]: Incorrect number of bytes read", + source), buf.length, in.read(readBuf)); + assertArrayEquals(String + .format("Open with fileStatus [from %s result]: Incorrect read data", + source), readBuf, buf); + } + + private void checkGetPathStatusCalls(Path testFile, FileStatus fileStatus, + AzureBlobFileSystemStore abfsStore, AbfsClient mockClient, + AbfsRestOperationType source, TracingContext tracingContext) + throws IOException { + + // verify GetPathStatus not invoked when FileStatus is provided + abfsStore.openFileForRead(testFile, Optional + .ofNullable(new OpenFileParameters().withStatus(fileStatus)), null, tracingContext); + verify(mockClient, times(0).description((String.format( + "FileStatus [from %s result] provided, GetFileStatus should not be invoked", + source)))).getPathStatus(anyString(), anyBoolean(), any(TracingContext.class)); + + // verify GetPathStatus invoked when FileStatus not provided + abfsStore.openFileForRead(testFile, + Optional.empty(), null, + tracingContext); + verify(mockClient, times(1).description( + "GetPathStatus should be invoked when FileStatus not provided")) + .getPathStatus(anyString(), anyBoolean(), any(TracingContext.class)); + + Mockito.reset(mockClient); //clears invocation count for next test case + } + + @Test + public void testOpenFileWithOptions() throws Exception { + AzureBlobFileSystem fs = getFileSystem(); + String testFolder = "/testFolder"; + Path smallTestFile = new Path(testFolder + "/testFile0"); + Path largeTestFile = new Path(testFolder + "/testFile1"); + fs.mkdirs(new Path(testFolder)); + int readBufferSize = getConfiguration().getReadBufferSize(); + byte[] smallBuffer = new byte[5]; + byte[] largeBuffer = new byte[readBufferSize + 5]; + new Random().nextBytes(smallBuffer); + new Random().nextBytes(largeBuffer); + writeBufferToNewFile(smallTestFile, smallBuffer); + writeBufferToNewFile(largeTestFile, largeBuffer); + + FileStatus[] getFileStatusResults = {fs.getFileStatus(smallTestFile), + fs.getFileStatus(largeTestFile)}; + FileStatus[] listStatusResults = fs.listStatus(new Path(testFolder)); + + // open with fileStatus from GetPathStatus + verifyOpenWithProvidedStatus(smallTestFile, getFileStatusResults[0], + smallBuffer, AbfsRestOperationType.GetPathStatus); + verifyOpenWithProvidedStatus(largeTestFile, getFileStatusResults[1], + largeBuffer, AbfsRestOperationType.GetPathStatus); + + // open with fileStatus from ListStatus + verifyOpenWithProvidedStatus(smallTestFile, listStatusResults[0], smallBuffer, + AbfsRestOperationType.ListPaths); + verifyOpenWithProvidedStatus(largeTestFile, listStatusResults[1], largeBuffer, + AbfsRestOperationType.ListPaths); + + // verify number of GetPathStatus invocations + AzureBlobFileSystemStore abfsStore = getAbfsStore(fs); + AbfsClient mockClient = spy(getAbfsClient(abfsStore)); + setAbfsClient(abfsStore, mockClient); + TracingContext 
tracingContext = getTestTracingContext(fs, false); + checkGetPathStatusCalls(smallTestFile, getFileStatusResults[0], + abfsStore, mockClient, AbfsRestOperationType.GetPathStatus, tracingContext); + checkGetPathStatusCalls(largeTestFile, getFileStatusResults[1], + abfsStore, mockClient, AbfsRestOperationType.GetPathStatus, tracingContext); + checkGetPathStatusCalls(smallTestFile, listStatusResults[0], + abfsStore, mockClient, AbfsRestOperationType.ListPaths, tracingContext); + checkGetPathStatusCalls(largeTestFile, listStatusResults[1], + abfsStore, mockClient, AbfsRestOperationType.ListPaths, tracingContext); + + // Verify with incorrect filestatus + getFileStatusResults[0].setPath(new Path("wrongPath")); + intercept(ExecutionException.class, + () -> verifyOpenWithProvidedStatus(smallTestFile, + getFileStatusResults[0], smallBuffer, + AbfsRestOperationType.GetPathStatus)); + } + + @Test + public void testDefaultReadaheadQueueDepth() throws Exception { + Configuration config = getRawConfiguration(); + config.unset(FS_AZURE_READ_AHEAD_QUEUE_DEPTH); + AzureBlobFileSystem fs = getFileSystem(config); + Path testFile = path("/testFile"); + fs.create(testFile).close(); + FSDataInputStream in = fs.open(testFile); + Assertions.assertThat( + ((AbfsInputStream) in.getWrappedStream()).getReadAheadQueueDepth()) + .describedAs("readahead queue depth should be set to default value 2") + .isEqualTo(2); + in.close(); + } + } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java index eca670fba9059..4cfc0928cb171 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java @@ -34,20 +34,38 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest; import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem; +import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore; import org.apache.hadoop.io.IOUtils; import org.assertj.core.api.Assertions; import org.junit.Test; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_READ_BUFFER_SIZE; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_READAHEAD; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_READ_AHEAD_BLOCK_SIZE; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_READ_AHEAD_QUEUE_DEPTH; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MIN_BUFFER_SIZE; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB; +import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticGauge; +import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_ACTIVE_PREFETCH_OPERATIONS; +import static org.apache.hadoop.test.LambdaTestUtils.eventually; public class ITestReadBufferManager extends AbstractAbfsIntegrationTest { - public ITestReadBufferManager() throws Exception { + /** + * Time before the JUnit test times out for eventually() clauses + * to fail. This copes with slow network connections and debugging + * sessions, yet still allows for tests to fail with meaningful + * messages. 
+ */ + public static final int TIMEOUT_OFFSET = 5 * 60_000; + + /** + * Interval between eventually probes. + */ + public static final int PROBE_INTERVAL_MILLIS = 1_000; + + public ITestReadBufferManager() throws Exception { } @Test @@ -93,55 +111,100 @@ private void assertListEmpty(String listName, List list) { @Test public void testPurgeBufferManagerForSequentialStream() throws Exception { - describe("Testing purging of buffers in ReadBufferManager for " - + "sequential input streams"); - AzureBlobFileSystem fs = getABFSWithReadAheadConfig(); - final String fileName = methodName.getMethodName(); - byte[] fileContent = getRandomBytesArray(ONE_MB); - Path testFilePath = createFileWithContent(fs, fileName, fileContent); - - AbfsInputStream iStream1 = null; + describe("Testing purging of buffers in ReadBufferManager for " + + "sequential input streams"); + AzureBlobFileSystem fs = getABFSWithReadAheadConfig(); + final String fileName = methodName.getMethodName(); + byte[] fileContent = getRandomBytesArray(ONE_MB); + Path testFilePath = createFileWithContent(fs, fileName, fileContent); + + AbfsInputStream iStream1 = null; // stream1 will be closed right away. - try { - iStream1 = (AbfsInputStream) fs.open(testFilePath).getWrappedStream(); - // Just reading one byte will trigger all read ahead calls. - iStream1.read(); - } finally { - IOUtils.closeStream(iStream1); - } - ReadBufferManager bufferManager = ReadBufferManager.getBufferManager(); - AbfsInputStream iStream2 = null; - try { - iStream2 = (AbfsInputStream) fs.open(testFilePath).getWrappedStream(); - iStream2.read(); - // After closing stream1, no queued buffers of stream1 should be present - // assertions can't be made about the state of the other lists as it is - // too prone to race conditions. - assertListDoesnotContainBuffersForIstream(bufferManager.getReadAheadQueueCopy(), iStream1); - } finally { - // closing the stream later. - IOUtils.closeStream(iStream2); - } - // After closing stream2, no queued buffers of stream2 should be present. - assertListDoesnotContainBuffersForIstream(bufferManager.getReadAheadQueueCopy(), iStream2); + AbfsInputStream iStream2 = null; + try { + iStream1 = (AbfsInputStream) fs.open(testFilePath).getWrappedStream(); + // Just reading one byte will trigger all read ahead calls. + iStream1.read(); + assertThatStatisticGauge(iStream1.getIOStatistics(), STREAM_READ_ACTIVE_PREFETCH_OPERATIONS) + .isGreaterThan(0); + } finally { + IOUtils.closeStream(iStream1); + } + ReadBufferManager bufferManager = ReadBufferManager.getBufferManager(); + + try { + iStream2 = (AbfsInputStream) fs.open(testFilePath).getWrappedStream(); + iStream2.read(); + // After closing stream1, none of the buffers associated with stream1 should be present. + AbfsInputStream s1 = iStream1; + eventually(getTestTimeoutMillis() - TIMEOUT_OFFSET, PROBE_INTERVAL_MILLIS, () -> + assertListDoesnotContainBuffersForIstream("InProgressList", + bufferManager.getInProgressCopiedList(), s1)); + assertListDoesnotContainBuffersForIstream("CompletedList", + bufferManager.getCompletedReadListCopy(), iStream1); + assertListDoesnotContainBuffersForIstream("ReadAheadQueue", + bufferManager.getReadAheadQueueCopy(), iStream1); + } finally { + // closing the stream later. + IOUtils.closeStream(iStream2); + } + // After closing stream2, none of the buffers associated with stream2 should be present.
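+ // in-progress prefetches drain asynchronously, so the buffer manager lists are polled with eventually() rather than asserted on immediately.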
+ IOStatisticsStore stream2IOStatistics = iStream2.getIOStatistics(); + AbfsInputStream s2 = iStream2; + eventually(getTestTimeoutMillis() - TIMEOUT_OFFSET, PROBE_INTERVAL_MILLIS, () -> + assertListDoesnotContainBuffersForIstream("InProgressList", + bufferManager.getInProgressCopiedList(), s2)); + // no in progress reads in the stats + assertThatStatisticGauge(stream2IOStatistics, STREAM_READ_ACTIVE_PREFETCH_OPERATIONS) + .isEqualTo(0); + + assertListDoesnotContainBuffersForIstream("CompletedList", + bufferManager.getCompletedReadListCopy(), iStream2); + assertListDoesnotContainBuffersForIstream("ReadAheadQueue", + bufferManager.getReadAheadQueueCopy(), iStream2); + + // After closing both the streams, all lists should be empty. + eventually(getTestTimeoutMillis() - TIMEOUT_OFFSET, PROBE_INTERVAL_MILLIS, () -> + assertListEmpty("InProgressList", bufferManager.getInProgressCopiedList())); + assertListEmpty("CompletedList", bufferManager.getCompletedReadListCopy()); + assertListEmpty("ReadAheadQueue", bufferManager.getReadAheadQueueCopy()); + } - // After closing both the streams, read queue should be empty. - assertListEmpty("ReadAheadQueue", bufferManager.getReadAheadQueueCopy()); + private void assertListDoesnotContainBuffersForIstream(String name, + List list, AbfsInputStream inputStream) { + Assertions.assertThat(list) + .describedAs("list %s contains entries for closed stream %s", + name, inputStream) + .filteredOn(b -> b.getStream() == inputStream) + .isEmpty(); } + private void assertListContainBuffersForIstream(List list, + AbfsInputStream inputStream) { + Assertions.assertThat(list) + .describedAs("buffer expected to contain closed stream %s", inputStream ) + .filteredOn(b -> b.getStream() == inputStream) + .isNotEmpty(); + } - private void assertListDoesnotContainBuffersForIstream(List list, - AbfsInputStream inputStream) { - for (ReadBuffer buffer : list) { - Assertions.assertThat(buffer.getStream()) - .describedAs("Buffers associated with closed input streams shouldn't be present") - .isNotEqualTo(inputStream); - } + /** + * Does a list contain a read buffer for stream? + * @param list list to scan + * @param inputStream stream to look for + * @return true if there is at least one reference in the list. 
+ */ + boolean listContainsStreamRead(List list, + AbfsInputStream inputStream) { + return list.stream() + .filter(b -> b.getStream() == inputStream) + .count() > 0; } + private AzureBlobFileSystem getABFSWithReadAheadConfig() throws Exception { Configuration conf = getRawConfiguration(); + conf.setBoolean(FS_AZURE_ENABLE_READAHEAD, true); conf.setLong(FS_AZURE_READ_AHEAD_QUEUE_DEPTH, 8); conf.setInt(AZURE_READ_BUFFER_SIZE, MIN_BUFFER_SIZE); conf.setInt(FS_AZURE_READ_AHEAD_BLOCK_SIZE, MIN_BUFFER_SIZE); diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java index 0395c4183b9b7..f3e6475db2a35 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java @@ -28,14 +28,18 @@ import org.junit.Assert; import org.junit.Test; import org.mockito.Mockito; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSExceptionMessages; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FutureDataInputStreamBuilder; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathIOException; import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest; import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem; import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore; @@ -45,10 +49,14 @@ import org.apache.hadoop.fs.azurebfs.utils.TestCachedSASToken; import org.apache.hadoop.fs.azurebfs.utils.TracingContext; import org.apache.hadoop.fs.impl.OpenFileParameters; +import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore; +import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter; +import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_PREFETCH_BLOCKS_DISCARDED; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; @@ -62,7 +70,13 @@ import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_READ_AHEAD_QUEUE_DEPTH; /** - * Unit test AbfsInputStream. + * Unit test AbfsInputStream, especially the ReadAheadManager. + * Note 1: even though this uses mocking to simulate the client, + * it is still an integration test suite and will fail if offline. + * + * Note 2: maven runs different test methods in parallel, so + * making assertions about the state of the shared object is + * fairly dangerous. 
*/ public class TestAbfsInputStream extends AbstractAbfsIntegrationTest { @@ -82,6 +96,8 @@ public class TestAbfsInputStream extends REDUCED_READ_BUFFER_AGE_THRESHOLD * 10; // 30 sec private static final int ALWAYS_READ_BUFFER_SIZE_TEST_FILE_SIZE = 16 * ONE_MB; + private static final Logger LOG = LoggerFactory.getLogger(TestAbfsInputStream.class); + @Override public void teardown() throws Exception { super.teardown(); @@ -118,7 +134,11 @@ private AbfsInputStream getAbfsInputStream(AbfsClient mockAbfsClient, null, FORWARD_SLASH + fileName, THREE_KB, - inputStreamContext.withReadBufferSize(ONE_KB).withReadAheadQueueDepth(10).withReadAheadBlockSize(ONE_KB), + inputStreamContext + .withStreamStatistics(new AbfsInputStreamStatisticsImpl()) + .withReadBufferSize(ONE_KB) + .withReadAheadQueueDepth(10) + .withReadAheadBlockSize(ONE_KB), "eTag", getTestTracingContext(null, false)); @@ -144,6 +164,7 @@ public AbfsInputStream getAbfsInputStream(AbfsClient abfsClient, FORWARD_SLASH + fileName, fileSize, inputStreamContext.withReadBufferSize(readBufferSize) + .withStreamStatistics(new AbfsInputStreamStatisticsImpl()) .withReadAheadQueueDepth(readAheadQueueDepth) .withShouldReadBufferSizeAlways(alwaysReadBufferSize) .withReadAheadBlockSize(readAheadBlockSize), @@ -166,13 +187,14 @@ private void queueReadAheads(AbfsInputStream inputStream) { ReadBufferManager.getBufferManager() .queueReadAhead(inputStream, TWO_KB, TWO_KB, inputStream.getTracingContext()); + } private void verifyReadCallCount(AbfsClient client, int count) throws AzureBlobFileSystemException, InterruptedException { // ReadAhead threads are triggered asynchronously. // Wait a second before verifying the number of total calls. - Thread.sleep(1000); + waitForPrefetchCompletion(); verify(client, times(count)).read(any(String.class), any(Long.class), any(byte[].class), any(Integer.class), any(Integer.class), any(String.class), any(String.class), any(TracingContext.class)); @@ -341,7 +363,7 @@ public void testFailedReadAhead() throws Exception { // In this test, a read should trigger 3 client.read() calls as file is 3 KB // and readahead buffer size set in AbfsInputStream is 1 KB // There should only be a total of 3 client.read() in this test. - intercept(IOException.class, + intercept(TimeoutException.class, () -> inputStream.read(new byte[ONE_KB])); // Only the 3 readAhead threads should have triggered client.read @@ -384,7 +406,6 @@ public void testFailedReadAheadEviction() throws Exception { } /** - * * The test expects AbfsInputStream to initiate a remote read request for * the request offset and length when previous read ahead on the offset had failed. * Also checks that the ReadBuffers are evicted as per the ReadBufferManager @@ -414,7 +435,7 @@ public void testOlderReadAheadFailure() throws Exception { AbfsInputStream inputStream = getAbfsInputStream(client, "testOlderReadAheadFailure.txt"); // First read request that fails as the readahead triggered from this request failed. 
- intercept(IOException.class, + intercept(TimeoutException.class, "Internal Server error", () -> inputStream.read(new byte[ONE_KB])); // Only the 3 readAhead threads should have triggered client.read @@ -593,7 +614,7 @@ public void testReadAheadManagerForFailedReadAhead() throws Exception { // AbfsInputStream Read would have waited for the read-ahead for the requested offset // as we are testing from ReadAheadManager directly, sleep for a sec to // get the read ahead threads to complete - Thread.sleep(1000); + waitForPrefetchCompletion(); // if readAhead failed for specific offset, getBlock should // throw exception from the ReadBuffer that failed within last thresholdAgeMilliseconds sec @@ -700,7 +721,7 @@ public void testReadAheadManagerForSuccessfulReadAhead() throws Exception { // AbfsInputStream Read would have waited for the read-ahead for the requested offset // as we are testing from ReadAheadManager directly, sleep for a sec to // get the read ahead threads to complete - Thread.sleep(1000); + waitForPrefetchCompletion(); // Only the 3 readAhead threads should have triggered client.read verifyReadCallCount(client, 3); @@ -725,6 +746,14 @@ public void testReadAheadManagerForSuccessfulReadAhead() throws Exception { checkEvictedStatus(inputStream, 0, true); } + /** + * Wait for prefetches to complete. + * @throws InterruptedException interrupted. + */ + private static void waitForPrefetchCompletion() throws InterruptedException { + Thread.sleep(1000); + } + /** * Test readahead with different config settings for request request size and * readAhead block size @@ -918,4 +947,67 @@ private void resetReadBufferManager(int bufferSize, int threshold) { // by successive tests can lead to OOM based on the dev VM/machine capacity. System.gc(); } -} \ No newline at end of file + + /** + * The first readahead closes the stream. + */ + @Test + public void testStreamCloseInFirstReadAhead() throws Exception { + describe("close a stream during prefetch, verify outcome is good"); + + AbfsClient client = getMockAbfsClient(); + AbfsRestOperation successOp = getMockRestOp(); + + AbfsInputStream inputStream = getAbfsInputStream(client, getMethodName()); + ReadBufferManager bufferManager = ReadBufferManager.getBufferManager(); + + final long initialInProgressBlocksDiscarded = bufferManager.getInProgressBlocksDiscarded(); + + // on first read, the op succeeds but the stream is closed, which + // means that the request should be considered a failure + doAnswer(invocation -> { + LOG.info("in read call with {}", inputStream); + inputStream.close(); + return successOp; + }).doReturn(successOp) + .when(client) + .read(any(String.class), any(Long.class), any(byte[].class), + any(Integer.class), any(Integer.class), any(String.class), + any(String.class), any(TracingContext.class)); + + // kick these off before the read() to avoid various race conditions. + queueReadAheads(inputStream); + + // AbfsInputStream Read would have waited for the read-ahead for the requested offset + // as we are testing from ReadAheadManager directly, sleep for a sec to + // get the read ahead threads to complete + waitForPrefetchCompletion(); + + // this triggers prefetching, which closes the stream while the read + // is queued. which causes the prefetch to not return. + // which triggers a blocking read, which will then fail. 
+ intercept(PathIOException.class, FSExceptionMessages.STREAM_IS_CLOSED, () -> { + // should fail + int bytesRead = inputStream.read(new byte[ONE_KB]); + // diagnostics info if failure wasn't raised + return "read " + bytesRead + " bytes from " + inputStream; + }); + + Assertions.assertThat(bufferManager.getCompletedReadListCopy()) + .filteredOn(rb -> rb.getStream() == inputStream) + .describedAs("list of completed reads") + .isEmpty(); + IOStatisticsStore ios = inputStream.getIOStatistics(); + assertThatStatisticCounter(ios, STREAM_READ_PREFETCH_BLOCKS_DISCARDED) + .describedAs("blocks discarded by %s", inputStream) + .isGreaterThan(0); + + // at least one of the blocks was discarded in progress. + // this is guaranteed because the mockito callback must have been invoked + // by the prefetcher + Assertions.assertThat(bufferManager.getInProgressBlocksDiscarded()) + .describedAs("in progress blocks discarded") + .isGreaterThan(initialInProgressBlocksDiscarded); + } + +} diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestReadBufferManager.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestReadBufferManager.java new file mode 100644 index 0000000000000..c9052383628e2 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestReadBufferManager.java @@ -0,0 +1,576 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.azurebfs.services; + +import java.io.EOFException; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import org.assertj.core.api.AbstractLongAssert; +import org.assertj.core.api.Assertions; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus; +import org.apache.hadoop.fs.azurebfs.utils.TracingContext; +import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore; +import org.apache.hadoop.test.AbstractHadoopTestBase; +import org.apache.hadoop.util.functional.FunctionRaisingIOE; + +import static org.apache.hadoop.fs.azurebfs.services.ReadBufferManager.NUM_BUFFERS; +import static org.apache.hadoop.fs.azurebfs.services.ReadBufferManager.getBufferManager; +import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter; +import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_ACTIVE_PREFETCH_OPERATIONS; +import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_PREFETCH_BLOCKS_DISCARDED; +import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_PREFETCH_BLOCKS_EVICTED; +import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_PREFETCH_BLOCKS_USED; +import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_PREFETCH_BYTES_DISCARDED; +import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_PREFETCH_BYTES_USED; +import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_PREFETCH_OPERATIONS; +import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.iostatisticsStore; +import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains; +import static org.apache.hadoop.test.LambdaTestUtils.intercept; + +/** + * Unit tests for ReadBufferManager; uses stub ReadBufferStreamOperations + * to simulate failures, count statistics etc. + */ +public class TestReadBufferManager extends AbstractHadoopTestBase { + + private static final Logger LOG = LoggerFactory.getLogger(TestReadBufferManager.class); + + private ReadBufferManager bufferManager; + + @Before + public void setup() throws Exception { + bufferManager = getBufferManager(); + } + + /** + * doneReading of a not in-progress ReadBuffer is an error. + */ + @Test + public void testBufferNotInProgressList() throws Throwable { + ReadBuffer buffer = readBuffer(new StubReadBufferOperations()); + intercept(IllegalStateException.class, () -> + bufferManager.doneReading(buffer, ReadBufferStatus.READ_FAILED, 0)); + } + + /** + * Create a read buffer. + * @param operations callbacks to use + * @return a buffer bonded to the operations, and with a countdown latch. + */ + private static ReadBuffer readBuffer(final ReadBufferStreamOperations operations) { + final ReadBuffer buffer = new ReadBuffer(); + buffer.setStream(operations); + buffer.setLatch(new CountDownLatch(1)); + return buffer; + } + + /** + * zero byte results are not considered successes, but not failures either. 
+ */ + @Test + public void testZeroByteRead() throws Throwable { + final StubReadBufferOperations operations = new StubReadBufferOperations(); + operations.setBytesToRead(0); + + ReadBuffer buffer = prefetch(operations, 0); + + // buffer has its indexed buffer released and fields updated. + Assertions.assertThat(buffer) + .matches(b -> b.getStatus() == ReadBufferStatus.AVAILABLE, "buffer is available") + .matches(b -> !b.hasIndexedBuffer(), "no indexed buffer") + .matches(b -> b.getLength() == 0, "buffer is empty"); + + // even though it succeeded, it didn't return any data, so + // not placed on the completed list. + assertNotInCompletedList(buffer); + } + + /** + * Returning a negative number is the same as a zero byte response. + */ + @Test + public void testNegativeByteRead() throws Throwable { + final StubReadBufferOperations operations = new StubReadBufferOperations(); + operations.setBytesToRead(-1); + ReadBuffer buffer = prefetch(operations, 0); + + // buffer has its indexed buffer released and fields updated. + Assertions.assertThat(buffer) + .matches(b -> b.getStatus() == ReadBufferStatus.AVAILABLE, "buffer is available") + .matches(b -> !b.hasIndexedBuffer(), "no indexed buffer") + .matches(b -> b.getLength() == 0, "buffer is empty"); + + // even though it succeeded, it didn't return any data, so + // not placed on the completed list. + assertNotInCompletedList(buffer); + } + + /** + * Queue a prefetch against the passed in stream operations, then block + * awaiting its actual execution. + * @param operations operations to use + * @param offset offset of the read. + * @return the completed buffer. + * @throws InterruptedException await was interrupted + */ + private ReadBuffer prefetch(final ReadBufferStreamOperations operations, final int offset) + throws InterruptedException { + return awaitDone(bufferManager.queueReadAhead(operations, offset, 1, null)); + } + + /** + * read raises an IOE; must be caught, saved without wrapping to the ReadBuffer, + * which is declared {@code READ_FAILED}. + * The buffer is freed but it is still placed in the completed list. + * Evictions don't find it (no buffer to return) but will silently + * evict() it when it is out of date. + */ + @Test + public void testIOEInRead() throws Throwable { + + final StubReadBufferOperations operations = new StubReadBufferOperations(); + final EOFException ioe = new EOFException("eof"); + operations.exceptionToRaise = ioe; + ReadBuffer buffer = prefetch(operations, 0); + + // buffer has its indexed buffer released and fields updated. 
+ Assertions.assertThat(buffer) + .matches(b -> b.getStatus() == ReadBufferStatus.READ_FAILED, "buffer is failed") + .matches(b -> !b.hasIndexedBuffer(), "no indexed buffer") + .extracting(ReadBuffer::getErrException) + .isEqualTo(ioe); + + // it is on the completed list for followup reads + assertInCompletedList(buffer); + + // evict doesn't find it, as there is no useful buffer + Assertions.assertThat(bufferManager.callTryEvict()) + .describedAs("evict() should not find failed buffer") + .isFalse(); + assertInCompletedList(buffer); + + // set the timestamp to be some time ago + buffer.setTimeStamp(1000); + + // evict still doesn't find it + Assertions.assertThat(bufferManager.callTryEvict()) + .describedAs("evict() should not find failed buffer") + .isFalse(); + + // but now it was evicted due to its age + assertNotInCompletedList(buffer); + operations.assertCounter(STREAM_READ_PREFETCH_BLOCKS_EVICTED) + .isEqualTo(1); + } + + /** + * Stream is closed before the read; this should trigger failure before + * {@code readRemote()} is invoked. + * The stream state change does not invoke + * {@link ReadBufferManager#purgeBuffersForStream(ReadBufferStreamOperations)}; + * success relies on ReadBufferWorker. + */ + @Test + public void testClosedBeforeRead() throws Throwable { + + final StubReadBufferOperations operations = new StubReadBufferOperations(); + operations.setClosed(true); + // this exception won't get raised because the read was cancelled first + operations.exceptionToRaise = new EOFException("eof"); + + ReadBuffer buffer = prefetch(operations, 0); + + // buffer has its indexed buffer released and fields updated. + Assertions.assertThat(buffer) + .matches(b -> b.getStatus() == ReadBufferStatus.READ_FAILED, "buffer is failed") + .matches(b -> !b.hasIndexedBuffer(), "no indexed buffer") + .extracting(ReadBuffer::getErrException) + .isNull(); + + // as the stream was closed, the buffer is not placed on the completed list + assertNotInCompletedList(buffer); + } + + /** + * Stream is closed during the read; this should trigger failure. + * The stream state change does not invoke + * {@link ReadBufferManager#purgeBuffersForStream(ReadBufferStreamOperations)}; + * success relies on ReadBufferWorker. + */ + @Test + public void testClosedDuringRead() throws Throwable { + + // have a different handler for the read call. + final StubReadBufferOperations operations = + new StubReadBufferOperations((self) -> { + self.setClosed(true); + return 1; + }); + + ReadBuffer buffer = prefetch(operations, 0); + + // buffer has its indexed buffer released and fields updated. + Assertions.assertThat(buffer) + .matches(b -> b.getStatus() == ReadBufferStatus.READ_FAILED, "buffer is failed") + .matches(b -> !b.hasIndexedBuffer(), "no indexed buffer") + .extracting(ReadBuffer::getErrException) + .isNull(); + + // as the stream was closed, the buffer is not placed on the completed list + assertNotInCompletedList(buffer); + operations.assertCounter(STREAM_READ_PREFETCH_BLOCKS_DISCARDED) + .isEqualTo(1); + } + + /** + * When a stream is closed, all of its entries in the completed list are purged.
+ */ + @Test + public void testStreamCloseEvictsCompleted() throws Throwable { + final StubReadBufferOperations operations = new StubReadBufferOperations(); + operations.bytesToRead = 1; + // two successes + ReadBuffer buffer1 = prefetch(operations, 0); + ReadBuffer buffer2 = prefetch(operations, 1000); + + // final read is a failure + operations.exceptionToRaise = new FileNotFoundException("/fnfe"); + ReadBuffer failed = prefetch(operations, 2000); + + List buffers = Arrays.asList(buffer1, buffer2, failed); + + // all buffers must be in the completed list + buffers.forEach(TestReadBufferManager::assertInCompletedList); + + // xor for an exclusivity assertion. + // the read failed xor it succeeded with an indexed buffer. + Assertions.assertThat(buffers) + .allMatch(b -> + b.getStatus() == ReadBufferStatus.READ_FAILED + ^ b.hasIndexedBuffer(), + "either was failure or it has an indexed buffer but not both"); + + // now state that buffer 1 was read + buffer1.dataConsumedByStream(0, 1); + Assertions.assertThat(buffer1) + .matches(ReadBuffer::isAnyByteConsumed, "any byte consumed") + .matches(ReadBuffer::isFirstByteConsumed, "first byte consumed") + .matches(ReadBuffer::isLastByteConsumed, "last byte consumed"); + operations.assertCounter(STREAM_READ_PREFETCH_BLOCKS_USED) + .isEqualTo(1); + + operations.assertCounter(STREAM_READ_PREFETCH_BYTES_USED) + .isEqualTo(1); + + // now tell buffer manager of the stream closure + operations.setClosed(true); + bufferManager.purgeBuffersForStream(operations); + + // now the buffers are out the list + buffers.forEach(TestReadBufferManager::assertNotInCompletedList); + Assertions.assertThat(buffers) + .allMatch(b -> !b.hasIndexedBuffer(), "no indexed buffer"); + + // all blocks declared evicted + operations.assertCounter(STREAM_READ_PREFETCH_BLOCKS_EVICTED) + .isEqualTo(3); + // only one buffer was evicted unused; the error buffer + // doesn't get included, while the first buffer was + // used. + operations.assertCounter(STREAM_READ_PREFETCH_BLOCKS_DISCARDED) + .isEqualTo(1); + } + + /** + * Even if (somehow) the closed stream makes it to the completed list, + * if it succeeded it will be found and returned. + */ + @Test + public void testEvictFindsClosedStreams() throws Throwable { + final StubReadBufferOperations operations = new StubReadBufferOperations(); + operations.bytesToRead = 1; + // two successes + ReadBuffer buffer1 = prefetch(operations, 0); + + // final read is a failure + operations.exceptionToRaise = new FileNotFoundException("/fnfe"); + ReadBuffer failed = prefetch(operations, 2000); + + List buffers = Arrays.asList(buffer1, failed); + + // all buffers must be in the completed list + buffers.forEach(TestReadBufferManager::assertInCompletedList); + + // xor for an exclusivity assertion. + // the read failed xor it succeeded with an indexed buffer. 
+ Assertions.assertThat(buffers) + .allMatch(b -> + b.getStatus() == ReadBufferStatus.READ_FAILED + ^ b.hasIndexedBuffer(), + "either was failure or it has an indexed buffer but not both"); + + // now state that buffer 1 was read + buffer1.dataConsumedByStream(0, 1); + + // now close the stream without telling the buffer manager + operations.setClosed(true); + + // evict finds the buffer with data + Assertions.assertThat(bufferManager.callTryEvict()) + .describedAs("evict() should return closed buffer with data") + .isTrue(); + + // but now it was evicted due to its age + assertNotInCompletedList(buffer1); + assertInCompletedList(failed); + + // second still doesn't find the failed buffer, as it has no data + Assertions.assertThat(bufferManager.callTryEvict()) + .describedAs("evict() should not find failed buffer") + .isFalse(); + + // but it was quietly evicted due to its state + assertNotInCompletedList(failed); + + } + + /** + * HADOOP-18521: if we remove ownership of a buffer during the remote read, + * things blow up. + */ + @Test + public void testOwnershipChangedDuringRead() throws Throwable { + + final Object blockUntilReadStarted = new Object(); + // this stores the reference and is the object waited on to + // block read operation until buffer index is patched. + + final AtomicReference bufferRef = new AtomicReference<>(); + + // handler blocks for buffer reference update. + final StubReadBufferOperations operations = + new StubReadBufferOperations((self) -> { + synchronized (blockUntilReadStarted) { + blockUntilReadStarted.notify(); + } + synchronized (bufferRef) { + if (bufferRef.get() == null) { + try { + bufferRef.wait(); + } catch (InterruptedException e) { + throw new InterruptedIOException("interrupted"); + } + } + } + return 1; + }); + + // queue the operation + ReadBuffer buffer = bufferManager.queueReadAhead(operations, 0, 1, null); + + synchronized (blockUntilReadStarted) { + blockUntilReadStarted.wait(); + } + + // set the buffer index to a different value + final int index = buffer.getBufferindex(); + final int newIndex = (index + 1) % NUM_BUFFERS; + LOG.info("changing buffer index from {} to {}", index, newIndex); + buffer.setBufferindex(newIndex); + + // notify waiting buffer + synchronized (bufferRef) { + bufferRef.set(buffer); + bufferRef.notifyAll(); + } + + awaitDone(buffer); + + // buffer has its indexed buffer released and fields updated. + Assertions.assertThat(buffer) + .matches(b -> b.getStatus() == ReadBufferStatus.READ_FAILED, "buffer is failed") + .extracting(ReadBuffer::getErrException) + .isNotNull(); + + assertExceptionContains("not owned", buffer.getErrException()); + + // its on the completed list for followup reads + assertNotInCompletedList(buffer); + + bufferManager.testResetReadBufferManager(); + } + + /** + * Assert that a buffer is not in the completed list. + * @param buffer buffer to validate. + */ + private static void assertNotInCompletedList(final ReadBuffer buffer) { + Assertions.assertThat(getBufferManager().getCompletedReadListCopy()) + .describedAs("completed read list") + .doesNotContain(buffer); + } + + /** + * Assert that a buffer is in the completed list. + * @param buffer buffer to validate. + */ + private static void assertInCompletedList(final ReadBuffer buffer) { + Assertions.assertThat(getBufferManager().getCompletedReadListCopy()) + .describedAs("completed read list") + .contains(buffer); + } + + /** + * Awaits for a buffer to be read; times out eventually and then raises an + * assertion. 
+ * @param buffer buffer to block on. + * @return + * @throws InterruptedException interrupted. + */ + private static ReadBuffer awaitDone(final ReadBuffer buffer) throws InterruptedException { + LOG.info("awaiting latch of {}", buffer); + Assertions.assertThat(buffer.getLatch().await(1, TimeUnit.MINUTES)) + .describedAs("awaiting release of latch on %s", buffer) + .isTrue(); + LOG.info("Prefetch completed of {}", buffer); + return buffer; + } + + /** + * Stub implementation of {@link ReadBufferStreamOperations}. + * IOStats are collected and a lambda-function can be provided + * which is evaluated on a readRemote call. The default one is + * {@link #raiseOrReturn(StubReadBufferOperations)}. + */ + private static final class StubReadBufferOperations + implements ReadBufferStreamOperations { + + private final IOStatisticsStore iostats = iostatisticsStore() + .withCounters( + STREAM_READ_ACTIVE_PREFETCH_OPERATIONS, + STREAM_READ_PREFETCH_BLOCKS_DISCARDED, + STREAM_READ_PREFETCH_BLOCKS_USED, + STREAM_READ_PREFETCH_BYTES_DISCARDED, + STREAM_READ_PREFETCH_BLOCKS_EVICTED, + STREAM_READ_PREFETCH_BYTES_USED, + STREAM_READ_PREFETCH_OPERATIONS) + .build(); + + private boolean closed; + + private int bytesToRead; + + private IOException exceptionToRaise; + + /** + * Function which takes a ref to this object and + * returns the number of bytes read. + * Invoked in the {@link #readRemote(long, byte[], int, int, TracingContext)} + * call. + */ + private final FunctionRaisingIOE action; + + /** + * Simple constructor. + */ + private StubReadBufferOperations() { + // trivia, you can't use a this(this::raiseOrReturn) call here. + this.action = this::raiseOrReturn; + } + + /** + * constructor. + * @param action lambda expression to call in readRemote() + */ + private StubReadBufferOperations( + FunctionRaisingIOE action) { + this.action = action; + } + + @Override + public int readRemote( + final long position, + final byte[] b, + final int offset, + final int length, + final TracingContext tracingContext) throws IOException { + return action.apply(this); + } + + private int raiseOrReturn(StubReadBufferOperations self) throws IOException { + if (exceptionToRaise != null) { + throw exceptionToRaise; + } + return bytesToRead; + } + + @Override + public boolean isClosed() { + return closed; + } + + private void setClosed(final boolean closed) { + this.closed = closed; + } + + private int getBytesToRead() { + return bytesToRead; + } + + private void setBytesToRead(final int bytesToRead) { + this.bytesToRead = bytesToRead; + } + + @Override + public String getStreamID() { + return "streamid"; + } + + @Override + public IOStatisticsStore getIOStatistics() { + return iostats; + } + + @Override + public String getPath() { + return "/path"; + } + + /** + * start asserting about a counter. 
+ * @param key key + * @return assert builder + */ + AbstractLongAssert assertCounter(String key) { + return assertThatStatisticCounter(iostats, key); + } + } +} diff --git a/hadoop-tools/hadoop-azure/src/test/resources/log4j.properties b/hadoop-tools/hadoop-azure/src/test/resources/log4j.properties index 9f72d03653306..2b0e72b1a16ce 100644 --- a/hadoop-tools/hadoop-azure/src/test/resources/log4j.properties +++ b/hadoop-tools/hadoop-azure/src/test/resources/log4j.properties @@ -26,6 +26,8 @@ log4j.logger.org.apache.hadoop.fs.azure.AzureFileSystemThreadPoolExecutor=DEBUG log4j.logger.org.apache.hadoop.fs.azure.BlockBlobAppendStream=DEBUG log4j.logger.org.apache.hadoop.fs.azurebfs.contracts.services.TracingService=TRACE log4j.logger.org.apache.hadoop.fs.azurebfs.services.AbfsClient=DEBUG +log4j.logger.org.apache.hadoop.fs.azurebfs.services.ReadBufferManager=TRACE +log4j.logger.org.apache.hadoop.fs.azurebfs.services.ReadBufferWorker=TRACE # after here: turn off log messages from other parts of the system # which only clutter test reports.
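A minimal usage sketch, not part of the patch: it assumes a build containing these changes, and the class and method names (PrefetchSafeReadExample, readWithPrefetchSafety) are illustrative only. It combines the capability probe, the openFile() options and the IOStatistics logging exercised by the tests above.

import java.net.URI;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_LEVEL_INFO;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BUFFERED_PREAD_DISABLE;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_CAPABILITY_PREFETCH_SAFE;
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.logIOStatisticsAtLevel;

public final class PrefetchSafeReadExample {

  private static final Logger LOG =
      LoggerFactory.getLogger(PrefetchSafeReadExample.class);

  /**
   * Open a file, requiring the HADOOP-18521 prefetch fix, read a block
   * and log the stream statistics, including the new prefetch counters.
   */
  public static void readWithPrefetchSafety(URI uri, Configuration conf, Path path)
      throws Exception {
    FileSystem fs = FileSystem.get(uri, conf);
    // probe for the fix; returns false on releases without it
    if (!fs.hasPathCapability(path, FS_AZURE_CAPABILITY_PREFETCH_SAFE)) {
      LOG.warn("Filesystem {} does not declare the prefetch-safe capability", fs.getUri());
    }
    try (FSDataInputStream in = fs.openFile(path)
        .must(FS_AZURE_CAPABILITY_PREFETCH_SAFE, "")  // rejected on builds where the key is unknown
        .must(FS_AZURE_BUFFERED_PREAD_DISABLE, true)
        .build().get()) {
      byte[] buffer = new byte[8192];
      int bytesRead = in.read(buffer, 0, buffer.length);
      LOG.info("Read {} bytes from {}", bytesRead, path);
      // prefetch blocks/bytes used, discarded and evicted surface in the stream IOStatistics
      logIOStatisticsAtLevel(LOG, IOSTATISTICS_LOGGING_LEVEL_INFO, in);
    }
  }

  private PrefetchSafeReadExample() {
  }
}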