@@ -203,6 +203,10 @@ public class AbfsConfiguration{
DefaultValue = DEFAULT_HTTP_READ_TIMEOUT)
private int httpReadTimeout;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_EXPECT_100CONTINUE_WAIT_TIMEOUT,
DefaultValue = DEFAULT_EXPECT_100CONTINUE_WAIT_TIMEOUT)
private int expect100ContinueWaitTimeout;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_OAUTH_TOKEN_FETCH_RETRY_COUNT,
MinValue = 0,
DefaultValue = DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_MAX_ATTEMPTS)
@@ -1033,6 +1037,10 @@ public int getHttpReadTimeout() {
return this.httpReadTimeout;
}

public int getExpect100ContinueWaitTimeout() {
return this.expect100ContinueWaitTimeout;
}

public long getAzureBlockSize() {
return this.azureBlockSize;
}
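
A rough sketch of how these annotated fields pick up their values, assuming the reflection-based binding `AbfsConfiguration` performs at construction time (range validation omitted; the class and method names here are made up for illustration):

```java
import java.lang.reflect.Field;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.IntegerConfigurationValidatorAnnotation;

final class ConfigBindingSketch {
  // Fill each @IntegerConfigurationValidatorAnnotation field from the raw
  // Configuration; the annotation's DefaultValue applies when the key is unset.
  static void bindIntFields(Object target, Configuration rawConfig)
      throws IllegalAccessException {
    for (Field field : target.getClass().getDeclaredFields()) {
      if (field.isAnnotationPresent(IntegerConfigurationValidatorAnnotation.class)) {
        IntegerConfigurationValidatorAnnotation ann =
            field.getAnnotation(IntegerConfigurationValidatorAnnotation.class);
        field.setAccessible(true);
        field.setInt(target,
            rawConfig.getInt(ann.ConfigurationKey(), ann.DefaultValue()));
      }
    }
  }
}
```

Under a scheme like this, `expect100ContinueWaitTimeout` resolves to whatever `fs.azure.http.expect.100continue.wait.timeout` is set to, falling back to the 3,000 ms default defined below.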
@@ -97,6 +97,13 @@ public final class ConfigurationKeys {
*/
public static final String AZURE_HTTP_READ_TIMEOUT = "fs.azure.http.read.timeout";

/**
* Config to set the HTTP Expect 100-continue wait timeout for REST operations.
* Value: {@value}.
*/
public static final String AZURE_EXPECT_100CONTINUE_WAIT_TIMEOUT
= "fs.azure.http.expect.100continue.wait.timeout";

// Retry strategy for getToken calls
public static final String AZURE_OAUTH_TOKEN_FETCH_RETRY_COUNT = "fs.azure.oauth.token.fetch.retry.max.retries";
public static final String AZURE_OAUTH_TOKEN_FETCH_RETRY_MIN_BACKOFF = "fs.azure.oauth.token.fetch.retry.min.backoff.interval";
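
For illustration, a minimal sketch of overriding the new timeout when creating a filesystem; the 10-second value and the account URI are placeholders:

```java
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class Expect100TimeoutExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Raise the 100-continue wait from the 3s default to 10s (milliseconds).
    conf.setInt("fs.azure.http.expect.100continue.wait.timeout", 10_000);
    try (FileSystem fs = FileSystem.newInstance(
        new URI("abfs://container@account.dfs.core.windows.net/"), conf)) {
      // Writes through this filesystem now wait up to 10s for "100 Continue".
    }
  }
}
```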
@@ -58,6 +58,12 @@ public final class FileSystemConfigurations {
*/
public static final int DEFAULT_HTTP_READ_TIMEOUT = 30_000; // 30 secs

/**
* Default timeout to wait for the server's 100-continue response, used when the Expect 100-continue header is enabled.
* Value: {@value}.
*/
public static final int DEFAULT_EXPECT_100CONTINUE_WAIT_TIMEOUT = 3_000; // 3s

// Retry parameter defaults.
public static final int DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_MAX_ATTEMPTS = 5;
public static final int DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_MIN_BACKOFF_INTERVAL = 0;
@@ -93,8 +93,14 @@ static boolean usable() {
final HttpClientBuilder builder = HttpClients.custom();
builder.setConnectionManager(connMgr)
.setRequestExecutor(
// For requests sent with Expect: 100-continue, the timeout for waiting
// on the server's 100-continue response is the Expect 100-continue wait
// timeout. For other requests, the read timeout is set using
// SocketTimeout.
new AbfsManagedHttpRequestExecutor(
Contributor: As we were discussing last time, if we keep it at the read timeout, the 100-continue timeout would become 30 seconds; this should be a separate config for the 100-continue timeout.

Contributor: +1

Contributor (author): Sure, will create a new config for this timeout and use it when 100-continue is enabled.

-abfsConfiguration.getHttpReadTimeout()))
+abfsConfiguration.isExpectHeaderEnabled()
+    ? abfsConfiguration.getExpect100ContinueWaitTimeout()
+    : abfsConfiguration.getHttpReadTimeout()))
.disableContentCompression()
.disableRedirectHandling()
.disableAutomaticRetries()
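
A rough sketch of where that value lands, assuming `AbfsManagedHttpRequestExecutor` passes it through to Apache HttpClient's `HttpRequestExecutor`, whose `waitForContinue` parameter bounds how long the executor blocks waiting for an interim `HTTP/1.1 100 Continue` status before transmitting the request body anyway:

```java
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
import org.apache.http.protocol.HttpRequestExecutor;

final class WaitForContinueSketch {
  // Mirrors the ternary above: a short 3s wait for "100 Continue" when the
  // Expect header is in use, otherwise the 30s read timeout.
  static HttpRequestExecutor newExecutor(AbfsConfiguration conf) {
    int waitForContinueMs = conf.isExpectHeaderEnabled()
        ? conf.getExpect100ContinueWaitTimeout()   // 3_000 ms default
        : conf.getHttpReadTimeout();               // 30_000 ms default
    return new HttpRequestExecutor(waitForContinueMs);
  }
}
```

If the server sends no interim response within the wait, the executor proceeds with the upload rather than failing, so a short timeout here only caps the added latency.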
@@ -257,6 +257,9 @@ private AbfsClient(final URL baseUrl,
== HttpOperationType.APACHE_HTTP_CLIENT) {
keepAliveCache = new KeepAliveCache(abfsConfiguration);

// Warm up the connection pool during client initialization to avoid
// latency on the first request. Since every filesystem instance creates
// both a DFS and a Blob client, warmup is done only for the default client.
abfsApacheHttpClient = new AbfsApacheHttpClient(
DelegatingSSLSocketFactory.getDefaultFactory(),
abfsConfiguration, keepAliveCache, baseUrl,
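
For intuition, a hypothetical warmup sketch (not the actual `AbfsApacheHttpClient` API): pre-establish a few TLS connections so the first real request skips the TCP and TLS handshakes.

```java
import java.io.IOException;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
import javax.net.ssl.SSLSocket;
import javax.net.ssl.SSLSocketFactory;

final class PoolWarmupSketch {
  static List<Socket> warmUp(String host, int count) {
    List<Socket> pool = new ArrayList<>();
    SSLSocketFactory factory = (SSLSocketFactory) SSLSocketFactory.getDefault();
    for (int i = 0; i < count; i++) {
      try {
        SSLSocket s = (SSLSocket) factory.createSocket(host, 443);
        s.startHandshake(); // pay the TCP + TLS cost now, not on the first request
        pool.add(s);
      } catch (IOException e) {
        break; // best-effort: a warmup failure should not fail client init
      }
    }
    return pool;
  }
}
```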
@@ -68,6 +68,9 @@ public AbfsClientHandler(final URL baseUrl,
final SASTokenProvider sasTokenProvider,
final EncryptionContextProvider encryptionContextProvider,
final AbfsClientContext abfsClientContext) throws IOException {
// This will initialize the default and ingress service types.
// This is needed before crating the clients so that we can do cache warmup
// only for the default client.

Contributor: nit: typo, "crating" should be "creating".

initServiceType(abfsConfiguration);

Contributor: Why this change?

Contributor (author): This initializes the default and ingress service types. It is needed before creating the clients so that cache warmup is done only for the default client.

this.dfsAbfsClient = createDfsClient(baseUrl, sharedKeyCredentials,
abfsConfiguration, null, sasTokenProvider, encryptionContextProvider,
@@ -361,7 +361,7 @@ private int cacheExtraConnection(final HttpRoute route,
} catch (RejectedExecutionException e) {
LOG.debug("Task rejected for connection creation: {}",
e.getMessage());
-return 0;
+return -1;
}
}
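
Returning -1 rather than 0 gives callers an unambiguous signal: 0 can legitimately mean "no extra connection was scheduled", while -1 now means the executor refused the task. A hypothetical caller check (illustrative fragment, not from this diff):

```java
int scheduled = cacheExtraConnection(route, connectionsNeeded);
if (scheduled < 0) {
  // Executor rejected the warmup task (saturated or shutting down):
  // give up on warming this route instead of retrying.
  return;
}
```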

hadoop-tools/hadoop-azure/src/site/markdown/index.md (2 additions & 2 deletions)
@@ -890,8 +890,8 @@ ABFS Driver can use the following networking libraries:
The networking library can be configured using the configuration `fs.azure.networking.library`
while initializing the filesystem.
Following are the supported values:
-- `JDK_HTTP_URL_CONNECTION` : Use JDK networking library [Default]
-- `APACHE_HTTP_CLIENT` : Use Apache HttpClient
+- `JDK_HTTP_URL_CONNECTION` : Use JDK networking library
+- `APACHE_HTTP_CLIENT` : Use Apache HttpClient [Default]
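
For example, a minimal sketch of pinning the JDK client explicitly now that `APACHE_HTTP_CLIENT` is the default (the account URI is a placeholder):

```java
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

Configuration conf = new Configuration();
conf.set("fs.azure.networking.library", "JDK_HTTP_URL_CONNECTION");
FileSystem fs = FileSystem.newInstance(
    new URI("abfs://container@account.dfs.core.windows.net/"), conf);
```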

#### <a href="ahc_networking_conf"></a>ApacheHttpClient networking layer configuration Options:

@@ -33,6 +33,7 @@
import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsDriverException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;
import org.apache.hadoop.util.functional.Tuples;
import org.apache.http.HttpHost;
@@ -45,8 +46,10 @@
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
import org.apache.http.impl.conn.DefaultHttpClientConnectionOperator;

import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APACHE_IMPL;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COLON;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.JDK_FALLBACK;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.KEEP_ALIVE_CACHE_CLOSED;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_METRIC_FORMAT;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_NETWORKING_LIBRARY;
@@ -74,7 +77,9 @@ public void testKacIsClosed() throws Throwable {
configuration.unset(FS_AZURE_METRIC_FORMAT);
try (AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(
configuration)) {
-KeepAliveCache kac = fs.getAbfsStore().getClientHandler().getIngressClient()
+KeepAliveCache kac = fs.getAbfsStore()
+    .getClientHandler()
+    .getIngressClient()
.getKeepAliveCache();
kac.close();
AbfsDriverException ex = intercept(AbfsDriverException.class,
@@ -149,10 +154,33 @@ public void testApacheClientFallbackDuringConnectionWarmup()
Assertions.assertThat(AbfsApacheHttpClient.usable())
.describedAs("Apache HttpClient should not be usable")
Contributor: Can you make one HTTP call and validate that the JDK client is now being used? The user agent can be used for validation.

Contributor (author): Sure, will do that.

.isFalse();
// Make a REST API call to verify that the client falls back to the JDK client.
AzureBlobFileSystem fs = getFileSystem();
verifyClientRequestId(fs, JDK_FALLBACK);
AbfsApacheHttpClient.setUsable();
verifyClientRequestId(fs, APACHE_IMPL);
}
}

/**
* Verify that the client request id contains the expected client.
* @param fs AzureBlobFileSystem instance
* @param expectedClient Expected client in the client request id.
* @throws AzureBlobFileSystemException if any failure occurs during the operation.
*/
private void verifyClientRequestId(AzureBlobFileSystem fs,
String expectedClient)
throws AzureBlobFileSystemException {
AbfsRestOperation op = fs.getAbfsStore()
.getClient()
.getFilesystemProperties(getTestTracingContext(fs, true));
String[] clientRequestIdList = op.getResult()
.getClientRequestId().split(COLON);
Assertions.assertThat(clientRequestIdList[clientRequestIdList.length - 1])
.describedAs("Http Client in use should be %s", expectedClient)
.isEqualTo(expectedClient);
}

private Map.Entry<HttpRoute, AbfsManagedApacheHttpConnection> getTestConnection()
throws IOException {
HttpHost host = new HttpHost(getFileSystem().getUri().getHost(),