-
Notifications
You must be signed in to change notification settings - Fork 9.2k
HADOOP-19767: [ABFS] Introduce Abfs Input Policy for detecting read patterns #8153
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 8 commits
bb27ec4
16253aa
68f905c
bb42e3c
94d2336
46b3e18
3560cc5
f3b6b57
c53a82f
2ed6c25
7eb974a
81763f8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -21,6 +21,7 @@ | |
| import org.apache.hadoop.classification.InterfaceAudience; | ||
| import org.apache.hadoop.classification.InterfaceStability; | ||
| import org.apache.hadoop.fs.FileSystem; | ||
| import org.apache.hadoop.fs.Options; | ||
|
||
|
|
||
| import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.DOT; | ||
|
|
||
|
|
@@ -215,6 +216,12 @@ public final class ConfigurationKeys { | |
| public static final String FS_AZURE_READ_AHEAD_QUEUE_DEPTH = "fs.azure.readaheadqueue.depth"; | ||
| public static final String FS_AZURE_ALWAYS_READ_BUFFER_SIZE = "fs.azure.read.alwaysReadBufferSize"; | ||
| public static final String FS_AZURE_READ_AHEAD_BLOCK_SIZE = "fs.azure.read.readahead.blocksize"; | ||
| /** | ||
| * Provides hint for the read workload pattern. | ||
| * Possible values are exposed in {@link Options.OpenFileOptions}. | ||
|
||
| */ | ||
| public static final String FS_AZURE_READ_POLICY = "fs.azure.read.policy"; | ||
|
|
||
| /** Provides a config control to enable or disable ABFS Flush operations - | ||
| * HFlush and HSync. Default is true. **/ | ||
| public static final String FS_AZURE_ENABLE_FLUSH = "fs.azure.enable.flush"; | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -22,6 +22,7 @@ | |
| import org.apache.hadoop.classification.InterfaceStability; | ||
| import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory; | ||
|
|
||
| import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE; | ||
| import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING; | ||
|
|
||
| /** | ||
|
|
@@ -108,6 +109,7 @@ public final class FileSystemConfigurations { | |
| public static final long MAX_AZURE_BLOCK_SIZE = 256 * 1024 * 1024L; // changing default abfs blocksize to 256MB | ||
| public static final String AZURE_BLOCK_LOCATION_HOST_DEFAULT = "localhost"; | ||
| public static final int DEFAULT_AZURE_LIST_MAX_RESULTS = 5000; | ||
| public static final String DEFAULT_AZURE_READ_POLICY = FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE; | ||
|
|
||
| public static final String SERVER_SIDE_ENCRYPTION_ALGORITHM = "AES256"; | ||
|
|
||
|
|
@@ -416,7 +418,7 @@ public final class FileSystemConfigurations { | |
|
|
||
| public static final boolean DEFAULT_FS_AZURE_ENABLE_CREATE_BLOB_IDEMPOTENCY = true; | ||
|
|
||
| public static final boolean DEFAULT_FS_AZURE_ENABLE_PREFETCH_REQUEST_PRIORITY = true; | ||
| public static final boolean DEFAULT_FS_AZURE_ENABLE_PREFETCH_REQUEST_PRIORITY = false; | ||
|
||
|
|
||
| // The default traffic request priority is 3 (from service side) | ||
| // The lowest priority a request can get is 7 | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,117 @@ | ||
| /** | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.hadoop.fs.azurebfs.services; | ||
|
|
||
| import java.io.IOException; | ||
|
|
||
| import org.apache.hadoop.fs.FileSystem; | ||
| import org.apache.hadoop.fs.azurebfs.constants.ReadType; | ||
| import org.apache.hadoop.fs.azurebfs.utils.TracingContext; | ||
|
|
||
| /** | ||
| * Input stream implementation optimized for adaptive read patterns. | ||
| * This is the default implementation used for cases where user does not specify any input policy. | ||
| * It switches between sequential and random read optimizations based on the detected read pattern. | ||
| * It also keeps footer read and small file optimizations enabled. | ||
| */ | ||
| public class AbfsAdaptiveInputStream extends AbfsInputStream { | ||
|
|
||
| /** | ||
| * Constructs AbfsAdaptiveInputStream | ||
| * @param client AbfsClient to be used for read operations | ||
| * @param statistics to recordinput stream statistics | ||
|
||
| * @param path file path | ||
| * @param contentLength file content length | ||
| * @param abfsInputStreamContext input stream context | ||
| * @param eTag file eTag | ||
| * @param tracingContext tracing context to trace the read operations | ||
| */ | ||
| public AbfsAdaptiveInputStream( | ||
anujmodi2021 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| final AbfsClient client, | ||
| final FileSystem.Statistics statistics, | ||
| final String path, | ||
| final long contentLength, | ||
| final AbfsInputStreamContext abfsInputStreamContext, | ||
| final String eTag, | ||
| TracingContext tracingContext) { | ||
| super(client, statistics, path, contentLength, | ||
| abfsInputStreamContext, eTag, tracingContext); | ||
| } | ||
|
|
||
| /** | ||
| * {@inheritDoc} | ||
| */ | ||
| @Override | ||
| protected int readOneBlock(final byte[] b, final int off, final int len) throws IOException { | ||
| if (len == 0) { | ||
| return 0; | ||
| } | ||
| if (!validate(b, off, len)) { | ||
| return -1; | ||
| } | ||
| //If buffer is empty, then fill the buffer. | ||
|
||
| if (getBCursor() == getLimit()) { | ||
| //If EOF, then return -1 | ||
|
||
| if (getFCursor() >= getContentLength()) { | ||
| return -1; | ||
| } | ||
|
|
||
| long bytesRead = 0; | ||
| //reset buffer to initial state - i.e., throw away existing data | ||
|
||
| setBCursor(0); | ||
| setLimit(0); | ||
| if (getBuffer() == null) { | ||
| LOG.debug("created new buffer size {}", getBufferSize()); | ||
| setBuffer(new byte[getBufferSize()]); | ||
| } | ||
|
|
||
| // Reset Read Type back to normal and set again based on code flow. | ||
| getTracingContext().setReadType(ReadType.NORMAL_READ); | ||
| if (shouldAlwaysReadBufferSize()) { | ||
| bytesRead = readInternal(getFCursor(), getBuffer(), 0, getBufferSize(), false); | ||
| } else { | ||
| // Enable readAhead when reading sequentially | ||
| if (-1 == getFCursorAfterLastRead() || getFCursorAfterLastRead() == getFCursor() || b.length >= getBufferSize()) { | ||
| LOG.debug("Sequential read with read ahead size of {}", getBufferSize()); | ||
| bytesRead = readInternal(getFCursor(), getBuffer(), 0, getBufferSize(), false); | ||
| } else { | ||
| /* | ||
| * Disable queuing prefetches when random read pattern detected. | ||
| * Instead, read ahead only for readAheadRange above what is asked by caller. | ||
| */ | ||
| getTracingContext().setReadType(ReadType.RANDOM_READ); | ||
| int lengthWithReadAhead = Math.min(b.length + getReadAheadRange(), getBufferSize()); | ||
| LOG.debug("Random read with read ahead size of {}", lengthWithReadAhead); | ||
| bytesRead = readInternal(getFCursor(), getBuffer(), 0, lengthWithReadAhead, true); | ||
| } | ||
| } | ||
| if (isFirstRead()) { | ||
| setFirstRead(false); | ||
| } | ||
| if (bytesRead == -1) { | ||
| return -1; | ||
| } | ||
|
|
||
| setLimit(getLimit() + (int) bytesRead); | ||
| setFCursor(getFCursor() + bytesRead); | ||
| setFCursorAfterLastRead(getFCursor()); | ||
| } | ||
| return copyToUserBuffer(b, off, len); | ||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method already contains a lot of lines. Instead of defining this branching logic here, it would be better to extract it into a new method, so that if a new read pattern is introduced tomorrow we only need to update that method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Taken