Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,12 @@ public final class AbfsHttpConstants {
// The HTTP 100 Continue informational status response code indicates that everything so far
// is OK and that the client should continue with the request or ignore it if it is already finished.
public static final String HUNDRED_CONTINUE = "100-continue";
/**
* HTTP status code indicating that the server has received too many requests and that the
* client may retry the operation, as described in the Microsoft Azure documentation:
* <a href="https://learn.microsoft.com/en-us/azure/active-directory/managed-identities-azure-resources/how-to-use-vm-token#error-handling">How to use managed identities on a VM to acquire a token: error handling</a>.
*/
public static final int HTTP_TOO_MANY_REQUESTS = 429;

public static final char CHAR_FORWARD_SLASH = '/';
public static final char CHAR_EXCLAMATION_POINT = '!';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public final class FileSystemConfigurations {
public static final int DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_MAX_ATTEMPTS = 5;
public static final int DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_MIN_BACKOFF_INTERVAL = 0;
public static final int DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_MAX_BACKOFF_INTERVAL = SIXTY_SECONDS;
public static final int DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_DELTA_BACKOFF = 2;
public static final int DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_DELTA_BACKOFF = 2_000;

public static final int ONE_KB = 1024;
public static final int ONE_MB = ONE_KB * ONE_KB;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.Hashtable;
import java.util.Map;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.util.Preconditions;

import com.fasterxml.jackson.core.JsonFactory;
Expand Down Expand Up @@ -73,6 +74,11 @@ public static void init(AbfsConfiguration abfsConfiguration) {
tokenFetchRetryPolicy = abfsConfiguration.getOauthTokenFetchRetryPolicy();
}

/**
 * Replaces the static {@code tokenFetchRetryPolicy} that
 * {@code init(AbfsConfiguration)} otherwise populates from
 * {@code abfsConfiguration.getOauthTokenFetchRetryPolicy()}.
 * Intended for tests only; the value is stored as-is, with no null check.
 *
 * @param retryPolicy the retry policy to install.
 */
@VisibleForTesting
public static void setTokenFetchRetryPolicy(ExponentialRetryPolicy retryPolicy) {
tokenFetchRetryPolicy = retryPolicy;
}

/**
* gets Azure Active Directory token using the user ID and password of
* a service principal (that is, Web App in Azure Active Directory).
Expand Down Expand Up @@ -255,7 +261,19 @@ public String getRequestId() {
return this.requestId;
}

protected HttpException(
/**
Constructs an instance of HttpException with detailed information about an HTTP error response.
This exception is designed to encapsulate details of an HTTP error response, providing context about the error
encountered during an HTTP operation. It includes the HTTP error code, the associated request ID, an error message,
the URL that triggered the error, the content type of the response, and the response body.
@param httpErrorCode The HTTP error code indicating the nature of the encountered error.
@param requestId The unique identifier associated with the corresponding HTTP request.
@param message A descriptive error message providing additional information about the encountered error.
@param url The URL that resulted in the HTTP error response.
@param contentType The content type of the HTTP response.
@param body The body of the HTTP response, containing more details about the error.
*/
public HttpException(
final int httpErrorCode,
final String requestId,
final String message,
Expand Down Expand Up @@ -383,7 +401,20 @@ private static boolean isRecoverableFailure(IOException e) {
|| e instanceof FileNotFoundException);
}

private static AzureADToken getTokenSingleCall(String authEndpoint,
/**
Retrieves an Azure OAuth token for authentication through a single API call.
This method facilitates the acquisition of an OAuth token from Azure Active Directory
to enable secure authentication for various services. It supports both Managed Service Identity (MSI)
tokens and non-MSI tokens based on the provided parameters.
@param authEndpoint The URL endpoint for OAuth token retrieval.
@param payload The payload to be included in the token request. This typically contains grant type and
any required parameters for token acquisition.
@param headers A Hashtable containing additional HTTP headers to be included in the token request.
@param httpMethod The HTTP method to be used for the token request (e.g., GET, POST).
@param isMsi A boolean flag indicating whether to request a Managed Service Identity (MSI) token or not.
@return An AzureADToken object containing the acquired OAuth token and associated metadata.
*/
public static AzureADToken getTokenSingleCall(String authEndpoint,
String payload, Hashtable<String, String> headers, String httpMethod,
boolean isMsi)
throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@

import java.net.HttpURLConnection;

import org.apache.hadoop.classification.VisibleForTesting;

import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_CONTINUE;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_TOO_MANY_REQUESTS;

/**
* Abstract Class for Retry policy to be used by {@link AbfsClient}
Expand Down Expand Up @@ -57,6 +60,8 @@ public boolean shouldRetry(final int retryCount, final int statusCode) {
return retryCount < maxRetryCount
&& (statusCode < HTTP_CONTINUE
|| statusCode == HttpURLConnection.HTTP_CLIENT_TIMEOUT
|| statusCode == HttpURLConnection.HTTP_GONE
|| statusCode == HTTP_TOO_MANY_REQUESTS
|| (statusCode >= HttpURLConnection.HTTP_INTERNAL_ERROR
&& statusCode != HttpURLConnection.HTTP_NOT_IMPLEMENTED
&& statusCode != HttpURLConnection.HTTP_VERSION));
Expand Down Expand Up @@ -84,7 +89,8 @@ public String getAbbreviation() {
* Returns maximum number of retries allowed in this retry policy
* @return max retry count
*/
protected int getMaxRetryCount() {
@VisibleForTesting
public int getMaxRetryCount() {
// Exposed for tests; this is the same limit shouldRetry() compares retryCount against.
return maxRetryCount;
}

Expand Down
2 changes: 1 addition & 1 deletion hadoop-tools/hadoop-azure/src/site/markdown/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ interval. Default value is 60000 (sixty seconds). Set the interval in milli
seconds.
* `fs.azure.oauth.token.fetch.retry.delta.backoff`: Back-off interval between
retries. Multiples of this timespan are used for subsequent retry attempts
. The default value is 2.
. The default value is 2000 (two seconds). Set the interval in milliseconds.

### <a name="shared-key-auth"></a> Default: Shared Key

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,21 @@

import java.io.IOException;
import java.util.Date;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.assertj.core.api.Assertions;
import org.junit.Test;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider;
import org.apache.hadoop.fs.azurebfs.oauth2.AzureADAuthenticator;
import org.apache.hadoop.fs.azurebfs.oauth2.AzureADToken;
import org.apache.hadoop.fs.azurebfs.oauth2.MsiTokenProvider;
import org.apache.hadoop.fs.azurebfs.services.ExponentialRetryPolicy;

import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_TOO_MANY_REQUESTS;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_MAX_ATTEMPTS;
import static org.junit.Assume.assumeThat;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.not;
Expand Down Expand Up @@ -90,4 +97,66 @@ private String getTrimmedPasswordString(AbfsConfiguration conf, String key,
return value.trim();
}

/**
 * Verifies that the token-fetch retry policy treats HTTP 429 (Too Many
 * Requests) as retriable.
 * <p>
 * A stub {@link MsiTokenProvider} simulates the retry loop: for every retry
 * count below the policy's maximum, {@code shouldRetry} must return true for
 * status 429. Once the retry count reaches the maximum, {@code shouldRetry}
 * must return false. No network call is made; the stub returns a fake token
 * after the simulated attempts.
 */
@Test
public void testShouldRetryFor429() throws Exception {
  ExponentialRetryPolicy retryPolicy = new ExponentialRetryPolicy(
      DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_MAX_ATTEMPTS);
  // NOTE(review): this replaces static state on AzureADAuthenticator and is
  // not restored afterwards -- confirm no other test relies on the previous policy.
  AzureADAuthenticator.setTokenFetchRetryPolicy(retryPolicy);
  AtomicInteger attemptCounter = new AtomicInteger(0);

  // Local class that simulates the MsiTokenProvider retry loop without any I/O.
  class TestMsiTokenProvider extends MsiTokenProvider {
    TestMsiTokenProvider(String endpoint, String tenant, String clientId, String authority) {
      super(endpoint, tenant, clientId, authority);
    }

    @Override
    public AzureADToken getToken() throws IOException {
      final int maxAttempts = retryPolicy.getMaxRetryCount();
      // retryCount is the number of attempts already made (0-based), which is
      // what shouldRetry expects; every value below the maximum must allow a retry.
      for (int retryCount = 0; retryCount < maxAttempts; retryCount++) {
        attemptCounter.incrementAndGet();
        Assertions.assertThat(
                retryPolicy.shouldRetry(retryCount, HTTP_TOO_MANY_REQUESTS))
            .describedAs("Attempt %d: shouldRetry must be true for 429",
                retryCount + 1)
            .isTrue();
      }

      // All simulated attempts were retriable; hand back a valid fake token.
      AzureADToken token = new AzureADToken();
      token.setAccessToken("fake-token");
      token.setExpiry(new Date(System.currentTimeMillis() + TimeUnit.HOURS.toMillis(1)));
      return token;
    }
  }

  AccessTokenProvider tokenProvider = new TestMsiTokenProvider(
      "https://fake-endpoint", "tenant", "clientId", "authority"
  );
  // Trigger token acquisition.
  AzureADToken token = tokenProvider.getToken();
  assertEquals("fake-token", token.getAccessToken());

  // If 429 did not qualify for retry, shouldRetry would have returned false and the
  // first in-loop assertion would have failed. Reaching the maximum number of
  // attempts verifies that every attempt was considered retriable.
  Assertions.assertThat(attemptCounter.get())
      .describedAs("Number of retries should be equal to "
          + "max attempts for token fetch.")
      .isEqualTo(DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_MAX_ATTEMPTS);

  // Boundary: once the retry count has reached the maximum, no further retry is allowed.
  Assertions.assertThat(
          retryPolicy.shouldRetry(retryPolicy.getMaxRetryCount(), HTTP_TOO_MANY_REQUESTS))
      .describedAs("shouldRetry must be false for 429 once max retries are exhausted")
      .isFalse();
}
}