-
Notifications
You must be signed in to change notification settings - Fork 9.2k
HADOOP-17377: ABFS: MsiTokenProvider doesn't retry HTTP 429/410 from the Instance Metadata Service #5273
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
HADOOP-17377: ABFS: MsiTokenProvider doesn't retry HTTP 429/410 from the Instance Metadata Service #5273
Changes from 7 commits
3f6386a
56c6f7d
2c25b39
88e51b6
3be0b00
e04011c
6283410
cf73a70
e6740a3
0688809
115e88a
fe15f44
01ec633
4faa92d
24c0483
99e66fa
8b5e883
1472eb8
7ba573f
3f80d00
462a3b6
78329de
240965d
b0563a1
225541e
c7d4671
588ffb9
fd26066
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -321,8 +321,23 @@ | |
| <dependency> | ||
| <groupId>org.mockito</groupId> | ||
| <artifactId>mockito-core</artifactId> | ||
| <version>4.11.0</version> | ||
| <scope>test</scope> | ||
| </dependency> | ||
|
|
||
| <dependency> | ||
|
||
| <groupId>org.mockito</groupId> | ||
| <artifactId>mockito-inline</artifactId> | ||
| <version>4.11.0</version> | ||
|
||
| <scope>test</scope> | ||
| <exclusions> | ||
| <exclusion> | ||
| <groupId>org.mockito</groupId> | ||
| <artifactId>mockito-core</artifactId> | ||
| </exclusion> | ||
| </exclusions> | ||
| </dependency> | ||
|
|
||
| <dependency> | ||
| <groupId>org.apache.hadoop</groupId> | ||
| <artifactId>hadoop-minikdc</artifactId> | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -58,6 +58,13 @@ public class ExponentialRetryPolicy { | |
| */ | ||
| private static final double MAX_RANDOM_RATIO = 1.2; | ||
|
|
||
| /** | ||
| * HTTP status code 429 (too many requests) qualifies for retry based on | ||
anmolanmol1234 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| * https://learn.microsoft.com/en-us/azure/active-directory/ | ||
| * managed-identities-azure-resources/how-to-use-vm-token#error-handling | ||
| */ | ||
| private static final int HTTP_TOO_MANY_REQUESTS = 429; | ||
|
||
|
|
||
| /** | ||
| * Holds the random number generator used to calculate randomized backoff intervals | ||
| */ | ||
|
|
@@ -123,6 +130,9 @@ public ExponentialRetryPolicy(final int retryCount, final int minBackoff, final | |
| * and the current strategy. The valid http status code lies in the range of 1xx-5xx. | ||
| * But an invalid status code might be set due to network or timeout kind of issues. | ||
| * Such invalid status code also qualify for retry. | ||
| * HTTP status code 410 qualifies for retry based on | ||
| * https://docs.microsoft.com/en-in/azure/virtual-machines/linux/ | ||
| * instance-metadata-service?tabs=windows#errors-and-debugging | ||
| * | ||
| * @param retryCount The current retry attempt count. | ||
| * @param statusCode The status code of the response, or -1 for socket error. | ||
|
|
@@ -132,6 +142,8 @@ public boolean shouldRetry(final int retryCount, final int statusCode) { | |
| return retryCount < this.retryCount | ||
| && (statusCode < HTTP_CONTINUE | ||
| || statusCode == HttpURLConnection.HTTP_CLIENT_TIMEOUT | ||
| || statusCode == HttpURLConnection.HTTP_GONE | ||
| || statusCode == HTTP_TOO_MANY_REQUESTS | ||
anujmodi2021 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| || (statusCode >= HttpURLConnection.HTTP_INTERNAL_ERROR | ||
| && statusCode != HttpURLConnection.HTTP_NOT_IMPLEMENTED | ||
| && statusCode != HttpURLConnection.HTTP_VERSION)); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,15 +19,22 @@ | |
| package org.apache.hadoop.fs.azurebfs; | ||
|
|
||
| import java.io.IOException; | ||
| import java.lang.reflect.Field; | ||
| import java.net.HttpURLConnection; | ||
| import java.util.Date; | ||
|
|
||
| import org.junit.Test; | ||
| import org.mockito.MockedStatic; | ||
| import org.mockito.Mockito; | ||
|
|
||
| import org.apache.commons.lang3.StringUtils; | ||
| import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider; | ||
| import org.apache.hadoop.fs.azurebfs.oauth2.AzureADAuthenticator; | ||
| import org.apache.hadoop.fs.azurebfs.oauth2.AzureADToken; | ||
| import org.apache.hadoop.fs.azurebfs.oauth2.MsiTokenProvider; | ||
| import org.apache.hadoop.fs.azurebfs.services.ExponentialRetryPolicy; | ||
|
|
||
| import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_MAX_ATTEMPTS; | ||
| import static org.apache.hadoop.test.LambdaTestUtils.intercept; | ||
| import static org.junit.Assume.assumeThat; | ||
| import static org.hamcrest.CoreMatchers.is; | ||
| import static org.hamcrest.CoreMatchers.not; | ||
|
|
@@ -40,13 +47,16 @@ | |
| import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_OAUTH_MSI_AUTHORITY; | ||
| import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_OAUTH_MSI_ENDPOINT; | ||
| import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_OAUTH_MSI_TENANT; | ||
| import static org.mockito.Mockito.times; | ||
|
|
||
| /** | ||
| * Test MsiTokenProvider. | ||
| */ | ||
| public final class ITestAbfsMsiTokenProvider | ||
| extends AbstractAbfsIntegrationTest { | ||
|
|
||
| private static final int HTTP_TOO_MANY_REQUESTS = 429; | ||
|
||
|
|
||
| public ITestAbfsMsiTokenProvider() throws Exception { | ||
| super(); | ||
| } | ||
|
|
@@ -90,4 +100,109 @@ private String getTrimmedPasswordString(AbfsConfiguration conf, String key, | |
| return value.trim(); | ||
| } | ||
|
|
||
| /** | ||
| * Test to verify that token fetch is retried for throttling errors (too many requests 429). | ||
| * @throws Exception | ||
steveloughran marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| */ | ||
| @Test | ||
| public void testRetryForThrottling() throws Exception { | ||
| AbfsConfiguration conf = getConfiguration(); | ||
|
|
||
| // Exception to be thrown with throttling error code 429. | ||
| AzureADAuthenticator.HttpException httpException | ||
| = new AzureADAuthenticator.HttpException(HTTP_TOO_MANY_REQUESTS, | ||
| "abc", "abc", "abc", "abc", "abc"); | ||
|
|
||
| String tenantGuid = "abcd"; | ||
| String clientId = "abcd"; | ||
| String authEndpoint = getTrimmedPasswordString(conf, | ||
steveloughran marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| FS_AZURE_ACCOUNT_OAUTH_MSI_ENDPOINT, | ||
| DEFAULT_FS_AZURE_ACCOUNT_OAUTH_MSI_ENDPOINT); | ||
| String authority = getTrimmedPasswordString(conf, | ||
| FS_AZURE_ACCOUNT_OAUTH_MSI_AUTHORITY, | ||
| DEFAULT_FS_AZURE_ACCOUNT_OAUTH_MSI_AUTHORITY); | ||
|
|
||
| // Mock the getTokenSingleCall to throw exception so the retry logic comes into play. | ||
| try (MockedStatic<AzureADAuthenticator> adAuthenticator = Mockito.mockStatic( | ||
| AzureADAuthenticator.class, Mockito.CALLS_REAL_METHODS)) { | ||
| adAuthenticator.when( | ||
| () -> AzureADAuthenticator.getTokenSingleCall(Mockito.anyString(), | ||
| Mockito.anyString(), Mockito.any(), Mockito.anyString(), | ||
| Mockito.anyBoolean())).thenThrow(httpException); | ||
|
|
||
| // Mock the tokenFetchRetryPolicy to verify retries. | ||
| ExponentialRetryPolicy exponentialRetryPolicy = Mockito.spy( | ||
| conf.getOauthTokenFetchRetryPolicy()); | ||
| Field tokenFetchRetryPolicy = AzureADAuthenticator.class.getDeclaredField( | ||
|
||
| "tokenFetchRetryPolicy"); | ||
| tokenFetchRetryPolicy.setAccessible(true); | ||
| tokenFetchRetryPolicy.set(ExponentialRetryPolicy.class, | ||
| exponentialRetryPolicy); | ||
|
|
||
| AccessTokenProvider tokenProvider = new MsiTokenProvider(authEndpoint, | ||
| tenantGuid, clientId, authority); | ||
| AzureADToken token = null; | ||
| intercept(AzureADAuthenticator.HttpException.class, | ||
| tokenProvider::getToken); | ||
|
|
||
| // If the status code doesn't qualify for retry shouldRetry returns false and the loop ends. | ||
| // It being called multiple times verifies that the retry was done for the throttling status code 429. | ||
| Mockito.verify(exponentialRetryPolicy, | ||
|
||
| times(DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_MAX_ATTEMPTS + 1)) | ||
| .shouldRetry(Mockito.anyInt(), Mockito.anyInt()); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Test to verify that token fetch is not retried for resource not found errors. | ||
| * @throws Exception | ||
| */ | ||
| @Test | ||
| public void testNoRetryForResourceNotFound() throws Exception { | ||
| AbfsConfiguration conf = getConfiguration(); | ||
|
|
||
| // Exception to be thrown with resource not found error code 404. | ||
| AzureADAuthenticator.HttpException httpException | ||
| = new AzureADAuthenticator.HttpException(HttpURLConnection.HTTP_NOT_FOUND, | ||
| "abc", "abc", "abc", "abc", "abc"); | ||
|
|
||
| String tenantGuid = "abcd"; | ||
| String clientId = "abcd"; | ||
| String authEndpoint = getTrimmedPasswordString(conf, | ||
| FS_AZURE_ACCOUNT_OAUTH_MSI_ENDPOINT, | ||
| DEFAULT_FS_AZURE_ACCOUNT_OAUTH_MSI_ENDPOINT); | ||
| String authority = getTrimmedPasswordString(conf, | ||
| FS_AZURE_ACCOUNT_OAUTH_MSI_AUTHORITY, | ||
| DEFAULT_FS_AZURE_ACCOUNT_OAUTH_MSI_AUTHORITY); | ||
|
|
||
| // Mock the getTokenSingleCall to throw exception. | ||
| try (MockedStatic<AzureADAuthenticator> adAuthenticator = Mockito.mockStatic( | ||
| AzureADAuthenticator.class, Mockito.CALLS_REAL_METHODS)) { | ||
| adAuthenticator.when( | ||
| () -> AzureADAuthenticator.getTokenSingleCall(Mockito.anyString(), | ||
| Mockito.anyString(), Mockito.any(), Mockito.anyString(), | ||
| Mockito.anyBoolean())).thenThrow(httpException); | ||
|
|
||
| // Mock the tokenFetchRetryPolicy to verify no retries. | ||
| ExponentialRetryPolicy exponentialRetryPolicy = Mockito.spy( | ||
| conf.getOauthTokenFetchRetryPolicy()); | ||
| Field tokenFetchRetryPolicy = AzureADAuthenticator.class.getDeclaredField( | ||
| "tokenFetchRetryPolicy"); | ||
| tokenFetchRetryPolicy.setAccessible(true); | ||
| tokenFetchRetryPolicy.set(ExponentialRetryPolicy.class, | ||
| exponentialRetryPolicy); | ||
|
|
||
| AccessTokenProvider tokenProvider = new MsiTokenProvider(authEndpoint, | ||
| tenantGuid, clientId, authority); | ||
| AzureADToken token = null; | ||
| intercept(AzureADAuthenticator.HttpException.class, | ||
| tokenProvider::getToken); | ||
|
|
||
| // If the status code doesn't qualify for retry, shouldRetry returns false and the loop ends. | ||
| // It being called only once verifies that the retry doesn't come into play. | ||
| Mockito.verify(exponentialRetryPolicy, | ||
| times(1)) | ||
| .shouldRetry(Mockito.anyInt(), Mockito.anyInt()); | ||
| } | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, hadoop-project defines the version, and does so through properties. Please revert this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Again, cut this now; the version defined in hadoop-project is the one you now expect.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
taken