hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
@@ -656,6 +656,10 @@ public class AbfsConfiguration{
DefaultValue = DEFAULT_FS_AZURE_LOWEST_REQUEST_PRIORITY_VALUE)
private int prefetchRequestPriorityValue;

@StringConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_READ_POLICY,
DefaultValue = DEFAULT_FS_AZURE_READ_POLICY)
private String abfsReadPolicy;

private String clientProvidedEncryptionKey;
private String clientProvidedEncryptionKeySHA;

@@ -1381,6 +1385,14 @@ public String getPrefetchRequestPriorityValue() {
return Integer.toString(prefetchRequestPriorityValue);
}

/**
* Get the ABFS read policy set by the user.
* @return the ABFS read policy.
*/
public String getAbfsReadPolicy() {
return abfsReadPolicy;
}

/**
* Enum config to allow user to pick format of x-ms-client-request-id header
* @return tracingContextFormat config if valid, else default ALL_ID_FORMAT
@@ -2079,6 +2091,15 @@ public void setIsChecksumValidationEnabled(boolean isChecksumValidationEnabled)
this.isChecksumValidationEnabled = isChecksumValidationEnabled;
}

/**
* Sets the ABFS read policy for testing purposes.
* @param readPolicy the read policy to set.
*/
@VisibleForTesting
public void setAbfsReadPolicy(String readPolicy) {
abfsReadPolicy = readPolicy;
}

public boolean isFullBlobChecksumValidationEnabled() {
return isFullBlobChecksumValidationEnabled;
}
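A hedged test-style sketch of how the new getter and @VisibleForTesting setter could be exercised end to end. Obtaining AbfsConfiguration through getAbfsStore() is an assumption borrowed from existing ABFS test patterns, not something this diff shows:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;

public class ReadPolicySetterSketch {
  public static void main(String[] args) throws Exception {
    // Assumes fs.defaultFS points at an abfs:// account with valid credentials.
    Configuration conf = new Configuration();
    AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.get(conf);
    AbfsConfiguration abfsConfig = fs.getAbfsStore().getAbfsConfiguration();

    // Unset, the policy falls back to DEFAULT_FS_AZURE_READ_POLICY ("adaptive").
    System.out.println(abfsConfig.getAbfsReadPolicy());

    // The @VisibleForTesting setter added above overrides it in place.
    abfsConfig.setAbfsReadPolicy("random");
    System.out.println(abfsConfig.getAbfsReadPolicy()); // prints "random"
  }
}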
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
@@ -77,7 +77,6 @@
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.TrileanConversionException;
import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
import org.apache.hadoop.fs.azurebfs.services.ListResponseData;
import org.apache.hadoop.fs.azurebfs.enums.Trilean;
import org.apache.hadoop.fs.azurebfs.extensions.EncryptionContextProvider;
import org.apache.hadoop.fs.azurebfs.extensions.ExtensionHelper;
@@ -90,13 +89,15 @@
import org.apache.hadoop.fs.azurebfs.security.ContextProviderEncryptionAdapter;
import org.apache.hadoop.fs.azurebfs.security.NoContextEncryptionAdapter;
import org.apache.hadoop.fs.azurebfs.services.AbfsAclHelper;
import org.apache.hadoop.fs.azurebfs.services.AbfsAdaptiveInputStream;
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsClientContext;
import org.apache.hadoop.fs.azurebfs.services.AbfsClientContextBuilder;
import org.apache.hadoop.fs.azurebfs.services.AbfsClientHandler;
import org.apache.hadoop.fs.azurebfs.services.AbfsClientRenameResult;
import org.apache.hadoop.fs.azurebfs.services.AbfsCounters;
import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
import org.apache.hadoop.fs.azurebfs.services.AbfsInputPolicy;
import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream;
import org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamContext;
import org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamStatisticsImpl;
@@ -107,10 +108,13 @@
import org.apache.hadoop.fs.azurebfs.services.AbfsPerfInfo;
import org.apache.hadoop.fs.azurebfs.services.AbfsPerfTracker;
import org.apache.hadoop.fs.azurebfs.services.AbfsPermission;
import org.apache.hadoop.fs.azurebfs.services.AbfsPrefetchInputStream;
import org.apache.hadoop.fs.azurebfs.services.AbfsRandomInputStream;
import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
import org.apache.hadoop.fs.azurebfs.services.AuthType;
import org.apache.hadoop.fs.azurebfs.services.ExponentialRetryPolicy;
import org.apache.hadoop.fs.azurebfs.services.ListingSupport;
import org.apache.hadoop.fs.azurebfs.services.ListResponseData;
import org.apache.hadoop.fs.azurebfs.services.SharedKeyCredentials;
import org.apache.hadoop.fs.azurebfs.services.StaticRetryPolicy;
import org.apache.hadoop.fs.azurebfs.services.TailLatencyRequestTimeoutRetryPolicy;
@@ -946,12 +950,30 @@ public AbfsInputStream openFileForRead(Path path,

perfInfo.registerSuccess(true);

// Add statistics for InputStream
return new AbfsInputStream(getClient(), statistics, relativePath,
contentLength, populateAbfsInputStreamContext(
parameters.map(OpenFileParameters::getOptions),
contextEncryptionAdapter),
eTag, tracingContext);
AbfsInputPolicy inputPolicy = AbfsInputPolicy.getPolicy(getAbfsConfiguration().getAbfsReadPolicy());
switch (inputPolicy) {
case SEQUENTIAL:
return new AbfsPrefetchInputStream(getClient(), statistics, relativePath,
contentLength, populateAbfsInputStreamContext(
parameters.map(OpenFileParameters::getOptions),
contextEncryptionAdapter),
eTag, tracingContext);

case RANDOM:
return new AbfsRandomInputStream(getClient(), statistics, relativePath,
contentLength, populateAbfsInputStreamContext(
parameters.map(OpenFileParameters::getOptions),
contextEncryptionAdapter),
eTag, tracingContext);

case ADAPTIVE:
default:
return new AbfsAdaptiveInputStream(getClient(), statistics, relativePath,
contentLength, populateAbfsInputStreamContext(
parameters.map(OpenFileParameters::getOptions),
contextEncryptionAdapter),
eTag, tracingContext);
}
}
}

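For context, a caller can also pass a per-open read-policy hint through the standard Hadoop openFile() builder; that is stable public API, though nothing in this diff shows the hint feeding the switch above, which keys off the store-wide fs.azure.read.policy setting. A minimal sketch, assuming an abfs:// default filesystem and an illustrative path:

import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class OpenFileHintSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.set("fs.azure.read.policy", "sequential"); // store-wide policy used by the switch above
    FileSystem fs = FileSystem.get(conf);

    // Builder-based open with a per-open read-policy hint.
    try (FSDataInputStream in = fs.openFile(new Path("/logs/app.log"))
        .opt(FS_OPTION_OPENFILE_READ_POLICY, "whole-file")
        .build()
        .get()) {
      byte[] buf = new byte[8192];
      int n = in.read(buf); // a sequential scan benefits from prefetching
      System.out.println("read " + n + " bytes");
    }
  }
}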
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
@@ -21,6 +21,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Options;
Contributor: Instead of importing the entire Options class, we can just import the OpenFileOptions class and directly mention OpenFileOptions in the comments below:
import org.apache.hadoop.fs.Options.OpenFileOptions;

Contributor (Author): Taken.

import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.DOT;

@@ -215,6 +216,12 @@ public final class ConfigurationKeys {
public static final String FS_AZURE_READ_AHEAD_QUEUE_DEPTH = "fs.azure.readaheadqueue.depth";
public static final String FS_AZURE_ALWAYS_READ_BUFFER_SIZE = "fs.azure.read.alwaysReadBufferSize";
public static final String FS_AZURE_READ_AHEAD_BLOCK_SIZE = "fs.azure.read.readahead.blocksize";
/**
* Provides a hint for the read workload pattern.
* Possible Values Exposed in {@link Options.OpenFileOptions}
Contributor: Should it be OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICIES? OpenFileOptions has other values as well. If so, we can do
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICIES;
and mention FS_OPTION_OPENFILE_READ_POLICIES only.

Contributor (Author): Tried, but that variable is not resolvable here.

Contributor: You can do something like this:
import org.apache.hadoop.fs.Options.OpenFileOptions;
* Possible Values Exposed in {@link OpenFileOptions#FS_OPTION_OPENFILE_READ_POLICIES}
*/
public static final String FS_AZURE_READ_POLICY = "fs.azure.read.policy";

/** Provides a config control to enable or disable ABFS Flush operations -
* HFlush and HSync. Default is true. **/
public static final String FS_AZURE_ENABLE_FLUSH = "fs.azure.enable.flush";
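A minimal usage sketch of the new key from client code; the path and values are illustrative, and the recognized values are the ones mapped by the read-policy enum added in this PR (sequential, random, parquet, orc, columnar, whole-file, adaptive):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class AzureReadPolicyExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.set("fs.azure.read.policy", "random"); // hint: random-access workload
    FileSystem fs = FileSystem.get(conf);       // assumes an abfs:// default filesystem

    try (FSDataInputStream in = fs.open(new Path("/warehouse/part-00000.parquet"))) {
      in.seek(4 * 1024 * 1024);           // jumping around the file: random pattern
      byte[] footer = new byte[64 * 1024];
      in.readFully(footer);
    }
  }
}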
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
@@ -22,6 +22,7 @@
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;

import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;

/**
@@ -93,7 +94,7 @@

/** Default buffer sizes and optimization flags. */
public static final int DEFAULT_WRITE_BUFFER_SIZE = 8 * ONE_MB; // 8 MB
public static final int APPENDBLOB_MAX_WRITE_BUFFER_SIZE = 4 * ONE_MB; // 4 MB

[ASF Cloudbees Jenkins ci-hadoop / Apache Yetus check failure on FileSystemConfigurations.java line 97: javadoc: warning: no comment]
public static final boolean DEFAULT_AZURE_ENABLE_SMALL_WRITE_OPTIMIZATION = false;
public static final int DEFAULT_READ_BUFFER_SIZE = 4 * ONE_MB; // 4 MB
public static final boolean DEFAULT_READ_SMALL_FILES_COMPLETELY = false;
@@ -108,6 +109,7 @@
public static final long MAX_AZURE_BLOCK_SIZE = 256 * 1024 * 1024L; // changing default abfs blocksize to 256MB
public static final String AZURE_BLOCK_LOCATION_HOST_DEFAULT = "localhost";
public static final int DEFAULT_AZURE_LIST_MAX_RESULTS = 5000;
public static final String DEFAULT_FS_AZURE_READ_POLICY = FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE;

public static final String SERVER_SIDE_ENCRYPTION_ALGORITHM = "AES256";

@@ -416,7 +418,7 @@

public static final boolean DEFAULT_FS_AZURE_ENABLE_CREATE_BLOB_IDEMPOTENCY = true;

public static final boolean DEFAULT_FS_AZURE_ENABLE_PREFETCH_REQUEST_PRIORITY = true;
public static final boolean DEFAULT_FS_AZURE_ENABLE_PREFETCH_REQUEST_PRIORITY = false;
Contributor: I don't think we need to disable it. We only add the request header if the read type is set to prefetch.

Contributor (Author): Thanks for catching. Will revert.

// The default traffic request priority is 3 (from service side)
// The lowest priority a request can get is 7
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ReadType.java
@@ -48,6 +48,10 @@ public enum ReadType {
* Only triggered when small file read optimization kicks in.
*/
SMALLFILE_READ("SR"),
/**
* Reads from Random Input Stream with read ahead up to readAheadRange.
*/
RANDOM_READ("RR"),
/**
* None of the above read types were applicable.
*/
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAdaptiveInputStream.java
@@ -0,0 +1,109 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs.azurebfs.services;

import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.azurebfs.constants.ReadType;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;

import static java.lang.Math.max;

/**
* Input stream implementation optimized for adaptive read patterns.
* This is the default implementation, used when the user does not specify an input policy.
* It switches between sequential and random read optimizations based on the detected read pattern.
* It also keeps footer read and small file optimizations enabled.
*/
public class AbfsAdaptiveInputStream extends AbfsInputStream {

public AbfsAdaptiveInputStream(

[ASF Cloudbees Jenkins ci-hadoop / Apache Yetus check failure on AbfsAdaptiveInputStream.java line 37: javadoc: warning: no comment]
final AbfsClient client,
final FileSystem.Statistics statistics,
final String path,
final long contentLength,
final AbfsInputStreamContext abfsInputStreamContext,
final String eTag,
TracingContext tracingContext) {
super(client, statistics, path, contentLength,
abfsInputStreamContext, eTag, tracingContext);
}

/**
* {@inheritDoc}
*/
@Override
protected int readOneBlock(final byte[] b, final int off, final int len) throws IOException {
if (len == 0) {
return 0;
}
if (!validate(b, off, len)) {
return -1;
}
//If buffer is empty, then fill the buffer.
Contributor: Space missing between // and "If". The same thing occurs at multiple other places as well; please correct it everywhere.

Contributor: The same issue is present in AbfsRandomInputStream and AbfsPrefetchInputStream as well.

Contributor (Author): Taken.
if (bCursor == limit) {
//If EOF, then return -1
Contributor: Same as above.

Contributor (Author): Taken.
if (fCursor >= contentLength) {
return -1;
}

long bytesRead = 0;
//reset buffer to initial state - i.e., throw away existing data
Contributor: Same as above.

Contributor (Author): Taken.
bCursor = 0;
limit = 0;
if (buffer == null) {
LOG.debug("created new buffer size {}", bufferSize);
buffer = new byte[bufferSize];
}

// Reset Read Type back to normal and set again based on code flow.
tracingContext.setReadType(ReadType.NORMAL_READ);
if (alwaysReadBufferSize) {
bytesRead = readInternal(fCursor, buffer, 0, bufferSize, false);
} else {
// Enable readAhead when reading sequentially
if (-1 == fCursorAfterLastRead || fCursorAfterLastRead == fCursor || b.length >= bufferSize) {
LOG.debug("Sequential read with read ahead size of {}", bufferSize);
bytesRead = readInternal(fCursor, buffer, 0, bufferSize, false);
} else {
/*
* Disable queuing prefetches when random read pattern detected.
* Instead, read ahead only for readAheadRange above what is asked by caller.
*/
tracingContext.setReadType(ReadType.RANDOM_READ);
int lengthWithReadAhead = Math.min(b.length + readAheadRange, bufferSize);
LOG.debug("Random read with read ahead size of {}", lengthWithReadAhead);
bytesRead = readInternal(fCursor, buffer, 0, lengthWithReadAhead, true);
}
}
if (firstRead) {
firstRead = false;
}
if (bytesRead == -1) {
return -1;
}

limit += bytesRead;
fCursor += bytesRead;
fCursorAfterLastRead = fCursor;
}
return copyToUserBuffer(b, off, len);
}
}
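To make the branch above concrete, here is a small self-contained illustration (hypothetical values, mirroring the condition in readOneBlock) of how the adaptive stream classifies a read as sequential or random:

public class ReadPatternCheck {
  // Mirrors: -1 == fCursorAfterLastRead || fCursorAfterLastRead == fCursor
  //          || b.length >= bufferSize
  static boolean isSequential(long fCursorAfterLastRead, long fCursor,
      int requestedLen, int bufferSize) {
    return fCursorAfterLastRead == -1       // first read on the stream
        || fCursorAfterLastRead == fCursor  // contiguous with the previous read
        || requestedLen >= bufferSize;      // caller wants a full buffer anyway
  }

  public static void main(String[] args) {
    int bufferSize = 4 * 1024 * 1024;
    // Contiguous read starting exactly where the previous one ended: sequential,
    // so a full buffer is read with read-ahead prefetching.
    System.out.println(isSequential(4_194_304L, 4_194_304L, 8_192, bufferSize)); // true
    // Backward seek before reading: random, so prefetch queuing is skipped and only
    // len + readAheadRange bytes (capped at bufferSize) are fetched.
    System.out.println(isSequential(4_194_304L, 1_048_576L, 8_192, bufferSize)); // false
  }
}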
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputPolicy.java
@@ -0,0 +1,78 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs.azurebfs.services;

import java.util.Locale;

import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_COLUMNAR;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_ORC;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_PARQUET;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_RANDOM;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE;

/**
* Enum for ABFS Input Policies.
* Each policy maps to a particular implementation of {@link AbfsInputStream}.
*/
public enum AbfsInputPolicy {
Contributor: Should this be ReadPolicy or InputStreamPolicy instead of InputPolicy?

Contributor (Author): Changed to ReadPolicy. Anything is fine IMO.

SEQUENTIAL(FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL),
RANDOM(FS_OPTION_OPENFILE_READ_POLICY_RANDOM),
ADAPTIVE(FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE);

private final String policy;
Contributor: Same as above.

Contributor (Author): Taken.

AbfsInputPolicy(String policy) {
this.policy = policy;
}

@Override
public String toString() {
return policy;
}

/**
* Get the enum constant from the string name.
* @param name policy name as configured by the user
* @return the corresponding AbfsInputPolicy to be used
*/
public static AbfsInputPolicy getPolicy(String name) {
String trimmed = name.trim().toLowerCase(Locale.ENGLISH);
switch (trimmed) {
// all these options currently map to random IO.
case FS_OPTION_OPENFILE_READ_POLICY_RANDOM:
case FS_OPTION_OPENFILE_READ_POLICY_COLUMNAR:
case FS_OPTION_OPENFILE_READ_POLICY_ORC:
case FS_OPTION_OPENFILE_READ_POLICY_PARQUET:
return RANDOM;

// handle the sequential formats.
case FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL:
case FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE:
return SEQUENTIAL;

// Everything else including ABFS Default Policy maps to Adaptive
case FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE:
default:
return ADAPTIVE;
}
}
}
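A short illustration of the mapping; the argument strings are the standard Hadoop openfile read-policy names, and printing an enum constant goes through the toString() override above:

import org.apache.hadoop.fs.azurebfs.services.AbfsInputPolicy;

public class PolicyMappingSketch {
  public static void main(String[] args) {
    System.out.println(AbfsInputPolicy.getPolicy("parquet"));    // random: columnar format
    System.out.println(AbfsInputPolicy.getPolicy("whole-file")); // sequential
    System.out.println(AbfsInputPolicy.getPolicy("Adaptive"));   // adaptive: matching is case-insensitive
    System.out.println(AbfsInputPolicy.getPolicy("unknown"));    // adaptive: the catch-all default
  }
}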