Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,14 @@ public enum AzureServiceErrorCode {
INVALID_SOURCE_OR_DESTINATION_RESOURCE_TYPE("InvalidSourceOrDestinationResourceType", HttpURLConnection.HTTP_CONFLICT, null),
RENAME_DESTINATION_PARENT_PATH_NOT_FOUND("RenameDestinationParentPathNotFound", HttpURLConnection.HTTP_NOT_FOUND, null),
INVALID_RENAME_SOURCE_PATH("InvalidRenameSourcePath", HttpURLConnection.HTTP_CONFLICT, null),
INGRESS_OVER_ACCOUNT_LIMIT(null, HttpURLConnection.HTTP_UNAVAILABLE, "Ingress is over the account limit."),
EGRESS_OVER_ACCOUNT_LIMIT(null, HttpURLConnection.HTTP_UNAVAILABLE, "Egress is over the account limit."),
INGRESS_OVER_ACCOUNT_LIMIT("ServerBusy", HttpURLConnection.HTTP_UNAVAILABLE,
"Ingress is over the account limit."),
EGRESS_OVER_ACCOUNT_LIMIT("ServerBusy", HttpURLConnection.HTTP_UNAVAILABLE,
"Egress is over the account limit."),
TPS_OVER_ACCOUNT_LIMIT("ServerBusy", HttpURLConnection.HTTP_UNAVAILABLE,
"Operations per second is over the account limit."),
OTHER_SERVER_THROTTLING("ServerBusy", HttpURLConnection.HTTP_UNAVAILABLE,
"The server is currently unable to receive requests. Please retry your request."),
INVALID_QUERY_PARAMETER_VALUE("InvalidQueryParameterValue", HttpURLConnection.HTTP_BAD_REQUEST, null),
AUTHORIZATION_PERMISSION_MISS_MATCH("AuthorizationPermissionMismatch", HttpURLConnection.HTTP_FORBIDDEN, null),
ACCOUNT_REQUIRES_HTTPS("AccountRequiresHttps", HttpURLConnection.HTTP_BAD_REQUEST, null),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ private AbfsClient(final URL baseUrl,
DelegatingSSLSocketFactory.initializeDefaultFactory(this.abfsConfiguration.getPreferredSSLFactoryOption());
sslProviderName = DelegatingSSLSocketFactory.getDefaultFactory().getProviderName();
} catch (IOException e) {
// Suppress exception. Failure to init DelegatingSSLSocketFactory would have only performance impact.
// Suppress exception, failure to init DelegatingSSLSocketFactory would have only performance impact.
LOG.trace("NonCritFailure: DelegatingSSLSocketFactory Init failed : "
+ "{}", e.getMessage());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@
import org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding;

import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_CONTINUE;
import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.CONNECTION_TIMEOUT_ABBREVIATION;
import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.EGRESS_LIMIT_BREACH_ABBREVIATION;
import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.INGRESS_LIMIT_BREACH_ABBREVIATION;
import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.OPERATION_LIMIT_BREACH_ABBREVIATION;

/**
* The AbfsRestOperation for Rest AbfsClient.
Expand Down Expand Up @@ -283,7 +285,7 @@ String getClientLatency() {
private boolean executeHttpOperation(final int retryCount,
TracingContext tracingContext) throws AzureBlobFileSystemException {
AbfsHttpOperation httpOperation;
boolean wasIOExceptionThrown = false;
boolean wasExceptionThrown = false;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe add a comment on the usage of this,

Copy link
Contributor Author

@anujmodi2021 anujmodi2021 Apr 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added comment


try {
// initialize the HTTP request and open the connection
Expand Down Expand Up @@ -321,7 +323,27 @@ private boolean executeHttpOperation(final int retryCount,
} else if (httpOperation.getStatusCode() == HttpURLConnection.HTTP_UNAVAILABLE) {
incrementCounter(AbfsStatistic.SERVER_UNAVAILABLE, 1);
}

// If no exception occurred till here it means http operation was successfully complete and
// a response from server has been received which might be failure or success.
// If any kind of exception has occurred it will be caught below.
// If request failed determine failure reason and retry policy here.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit : failed to

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Taken

// else simply return with success after saving the result.
LOG.debug("HttpRequest: {}: {}", operationType, httpOperation);

int status = httpOperation.getStatusCode();
failureReason = RetryReason.getAbbreviation(null, status, httpOperation.getStorageErrorMessage());
retryPolicy = client.getRetryPolicy(failureReason);

if (retryPolicy.shouldRetry(retryCount, httpOperation.getStatusCode())) {
return false;
}

// If the request has succeeded or failed with non-retrial error, save the operation and return.
result = httpOperation;

} catch (UnknownHostException ex) {
wasExceptionThrown = true;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we rename this? because in case of other exceptions, we update the metric below in finally block.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed it to wasKnownExceptionThrown.

So that if any other exception is thrown we will still update metrics

String hostname = null;
hostname = httpOperation.getHost();
failureReason = RetryReason.getAbbreviation(ex, null, null);
Expand All @@ -333,57 +355,44 @@ private boolean executeHttpOperation(final int retryCount,
}
return false;
} catch (IOException ex) {
wasExceptionThrown = true;
if (LOG.isDebugEnabled()) {
LOG.debug("HttpRequestFailure: {}, {}", httpOperation, ex);
}

failureReason = RetryReason.getAbbreviation(ex, -1, "");
retryPolicy = client.getRetryPolicy(failureReason);
wasIOExceptionThrown = true;
if (!retryPolicy.shouldRetry(retryCount, -1)) {
throw new InvalidAbfsRestOperationException(ex, retryCount);
}

return false;
} finally {
int status = httpOperation.getStatusCode();
/*
A status less than 300 (2xx range) or greater than or equal
to 500 (5xx range) should contribute to throttling metrics being updated.
Less than 200 or greater than or equal to 500 show failed operations. 2xx
range contributes to successful operations. 3xx range is for redirects
and 4xx range is for user errors. These should not be a part of
throttling backoff computation.
Updating Client Side Throttling Metrics for relevant response status codes.
1. Status code in 2xx range: Successful Operations should contribute
2. Status code in 3xx range: Redirection Operations should not contribute
3. Status code in 4xx range: User Errors should not contribute
4. Status code is 503: Throttling Error should contribute as following:
a. 503, Ingress Over Account Limit: Should Contribute
b. 503, Egress Over Account Limit: Should Contribute
c. 503, TPS Over Account Limit: Should Contribute
d. 503, Other Server Throttling: Should not contribute
5. Status code in 5xx range other than 503: Should not contribute
6. IOException and UnknownHostExceptions: Should not contribute
*/
boolean updateMetricsResponseCode = (status < HttpURLConnection.HTTP_MULT_CHOICE
|| status >= HttpURLConnection.HTTP_INTERNAL_ERROR);

/*
Connection Timeout failures should not contribute to throttling
In case the current request fails with Connection Timeout we will have
ioExceptionThrown true and failure reason as CT
In case the current request failed with 5xx, failure reason will be
updated after finally block but wasIOExceptionThrown will be false;
*/
boolean isCTFailure = CONNECTION_TIMEOUT_ABBREVIATION.equals(failureReason) && wasIOExceptionThrown;

if (updateMetricsResponseCode && !isCTFailure) {
int statusCode = httpOperation.getStatusCode();
boolean shouldUpdateCSTMetrics = (statusCode < HttpURLConnection.HTTP_MULT_CHOICE // Case 1
|| INGRESS_LIMIT_BREACH_ABBREVIATION.equals(failureReason) // Case 4.a
|| EGRESS_LIMIT_BREACH_ABBREVIATION.equals(failureReason) // Case 4.b
|| OPERATION_LIMIT_BREACH_ABBREVIATION.equals(failureReason)) // Case 4.c
&& !wasExceptionThrown; // Case 6
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please encapsulate the chain of conditions to a method #shouldUpdateCSTMetrics()

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 on this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Taken


if (shouldUpdateCSTMetrics) {
intercept.updateMetrics(operationType, httpOperation);
}
}

LOG.debug("HttpRequest: {}: {}", operationType, httpOperation);

int status = httpOperation.getStatusCode();
failureReason = RetryReason.getAbbreviation(null, status, httpOperation.getStorageErrorMessage());
retryPolicy = client.getRetryPolicy(failureReason);

if (retryPolicy.shouldRetry(retryCount, httpOperation.getStatusCode())) {
return false;
}

result = httpOperation;

return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@ private RetryReasonConstants() {
public static final String CONNECTION_TIMEOUT_JDK_MESSAGE = "connect timed out";
public static final String READ_TIMEOUT_JDK_MESSAGE = "Read timed out";
public static final String CONNECTION_RESET_MESSAGE = "Connection reset";
public static final String OPERATION_BREACH_MESSAGE = "Operations per second is over the account limit.";
public static final String CONNECTION_RESET_ABBREVIATION = "CR";
public static final String CONNECTION_TIMEOUT_ABBREVIATION = "CT";
public static final String READ_TIMEOUT_ABBREVIATION = "RT";
public static final String INGRESS_LIMIT_BREACH_ABBREVIATION = "ING";
public static final String EGRESS_LIMIT_BREACH_ABBREVIATION = "EGR";
public static final String OPERATION_LIMIT_BREACH_ABBREVIATION = "OPR";
public static final String OTHER_SERVER_THROTTLING_ABBREVIATION = "OTH";
public static final String UNKNOWN_HOST_EXCEPTION_ABBREVIATION = "UH";
public static final String IO_EXCEPTION_ABBREVIATION = "IOE";
public static final String SOCKET_EXCEPTION_ABBREVIATION = "SE";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_STATUS_CATEGORY_QUOTIENT;
import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.EGRESS_OVER_ACCOUNT_LIMIT;
import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.INGRESS_OVER_ACCOUNT_LIMIT;
import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.OTHER_SERVER_THROTTLING;
import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.TPS_OVER_ACCOUNT_LIMIT;
import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.EGRESS_LIMIT_BREACH_ABBREVIATION;
import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.INGRESS_LIMIT_BREACH_ABBREVIATION;
import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.OPERATION_BREACH_MESSAGE;
import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.OPERATION_LIMIT_BREACH_ABBREVIATION;
import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.OTHER_SERVER_THROTTLING_ABBREVIATION;

/**
* Category that can capture server-response errors for 5XX status-code.
Expand Down Expand Up @@ -56,10 +58,14 @@ String getAbbreviation(final Integer statusCode,
splitedServerErrorMessage)) {
return EGRESS_LIMIT_BREACH_ABBREVIATION;
}
if (OPERATION_BREACH_MESSAGE.equalsIgnoreCase(
if (TPS_OVER_ACCOUNT_LIMIT.getErrorMessage().equalsIgnoreCase(
splitedServerErrorMessage)) {
return OPERATION_LIMIT_BREACH_ABBREVIATION;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please rename OPERATION_LIMIT_BREACH_ABBREVIATION to TPS_LIMIT_BREACH_ABBREVIATION, this would maintain consistency with the naming convention with other constants.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Taken

}
if (OTHER_SERVER_THROTTLING.getErrorMessage().equalsIgnoreCase(
splitedServerErrorMessage)) {
return OTHER_SERVER_THROTTLING_ABBREVIATION;
}
return HTTP_UNAVAILABLE + "";
}
return statusCode + "";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_POSITION;
import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_ABFS_ACCOUNT_NAME;
import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.TEST_CONFIGURATION_FILE_NAME;
import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.EGRESS_OVER_ACCOUNT_LIMIT;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
Expand Down Expand Up @@ -233,6 +234,11 @@ private AbfsRestOperation getRestOperation() throws Exception {
// mocked the response code and the response message to check different
// behaviour based on response code.
Mockito.doReturn(responseCode).when(abfsHttpOperation).getConnResponseCode();
if (responseCode == HTTP_UNAVAILABLE) {
Mockito.doReturn(EGRESS_OVER_ACCOUNT_LIMIT.getErrorMessage())
.when(abfsHttpOperation)
.getStorageErrorMessage();
}
Mockito.doReturn(responseMessage)
.when(abfsHttpOperation)
.getConnResponseMessage();
Expand Down
Loading