Merged

20 commits
eb7b0a0
Code changes to retry on next preferred region for metadata reads on …
kundadebdatta Nov 9, 2023
ec125de
Code changes to add retry for PK Ranges call.
kundadebdatta Dec 6, 2023
483cc45
Code changes to mark endpoint unavailable for read when cosmos except…
kundadebdatta Dec 7, 2023
cc4657f
Code changes to fix unit tests. Added global endpoint manager in Pk R…
kundadebdatta Dec 8, 2023
dbee389
Code changes to fix unit tests.
kundadebdatta Dec 8, 2023
505ee41
Code changes to fix build break.
kundadebdatta Dec 8, 2023
77bc01d
Minor code clean-up.
kundadebdatta Dec 8, 2023
c26bbb9
Code changes to capture metadata location endpoint within on before s…
kundadebdatta Dec 19, 2023
416cb6e
Code changes to address review comments.
kundadebdatta Dec 19, 2023
ba31430
Code changes to fix build failure.
kundadebdatta Dec 19, 2023
dab70d0
Code changes to refactor metadata timeout policy.
kundadebdatta Dec 20, 2023
2e4cfc7
Code changes to add retry for request timeout. Fix emulator tests.
kundadebdatta Dec 20, 2023
697f9be
Code changes to add metadata retry policy unit tests.
kundadebdatta Dec 21, 2023
bcb2222
Code changes to add more tests.
kundadebdatta Dec 21, 2023
621bd64
Merge branch 'master' into users/kundadebdatta/4181_retry_metadata_re…
kundadebdatta Dec 21, 2023
0204173
Code changes to refactor metadata retry policy logic to increment loc…
kundadebdatta Dec 22, 2023
6724c77
Merge branch 'master' into users/kundadebdatta/4181_retry_metadata_re…
kundadebdatta Dec 22, 2023
b507ed2
Code changes to address review comments.
kundadebdatta Dec 22, 2023
2f427e3
Code changes to address review comments.
kundadebdatta Dec 29, 2023
a20af65
Code changes to add separate condition for pk range requests.
kundadebdatta Dec 29, 2023
14 changes: 14 additions & 0 deletions Microsoft.Azure.Cosmos/src/ClientRetryPolicy.cs
@@ -98,6 +98,20 @@ public async Task<ShouldRetryResult> ShouldRetryAsync(
}
}

// Any metadata request will throw a CosmosException from CosmosHttpClientCore if
// it receives a 503 Service Unavailable from the gateway. This check adds a retry
// mechanism for metadata requests in such cases.
if (exception is CosmosException cosmosException)
{
ShouldRetryResult shouldRetryResult = await this.ShouldRetryInternalAsync(
cosmosException.StatusCode,
cosmosException.Headers.SubStatusCode);
if (shouldRetryResult != null)
{
return shouldRetryResult;
}
}

return await this.throttlingRetry.ShouldRetryAsync(exception, cancellationToken);
}

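Read in context, ShouldRetryAsync now makes three passes: the pre-existing DocumentClientException handling above this hunk, the new CosmosException check, and finally the throttling retry. A minimal sketch of that ordering, using only the member names visible in this diff (the DocumentClientException branch is abbreviated because it lies outside the hunk):

```csharp
// Illustrative ordering only; the real method contains additional handling
// above the hunk shown here (e.g. for DocumentClientException-based failures).
public async Task<ShouldRetryResult> ShouldRetryAsync(
    Exception exception,
    CancellationToken cancellationToken)
{
    // 1. Existing DocumentClientException handling (outside this hunk).

    // 2. New: gateway metadata calls surface 503s as CosmosException via CosmosHttpClientCore,
    //    so map the (status code, sub-status code) pair onto the shared retry decision.
    if (exception is CosmosException cosmosException)
    {
        ShouldRetryResult shouldRetryResult = await this.ShouldRetryInternalAsync(
            cosmosException.StatusCode,
            cosmosException.Headers.SubStatusCode);
        if (shouldRetryResult != null)
        {
            return shouldRetryResult;
        }
    }

    // 3. Fall back to the throttling (429) retry policy.
    return await this.throttlingRetry.ShouldRetryAsync(exception, cancellationToken);
}
```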
4 changes: 2 additions & 2 deletions Microsoft.Azure.Cosmos/src/DocumentClient.cs
@@ -662,7 +662,7 @@ private async Task OpenPrivateAsync(CancellationToken cancellationToken)
tokenProvider: this,
retryPolicy: this.retryPolicy,
telemetryToServiceHelper: this.telemetryToServiceHelper);
this.partitionKeyRangeCache = new PartitionKeyRangeCache(this, this.GatewayStoreModel, this.collectionCache);
this.partitionKeyRangeCache = new PartitionKeyRangeCache(this, this.GatewayStoreModel, this.collectionCache, this.GlobalEndpointManager);

DefaultTrace.TraceWarning("{0} occurred while OpenAsync. Exception Message: {1}", ex.ToString(), ex.Message);
}
@@ -1033,7 +1033,7 @@ private async Task<bool> GetInitializationTaskAsync(IStoreClientFactory storeCli
tokenProvider: this,
retryPolicy: this.retryPolicy,
telemetryToServiceHelper: this.telemetryToServiceHelper);
this.partitionKeyRangeCache = new PartitionKeyRangeCache(this, this.GatewayStoreModel, this.collectionCache);
this.partitionKeyRangeCache = new PartitionKeyRangeCache(this, this.GatewayStoreModel, this.collectionCache, this.GlobalEndpointManager);
this.ResetSessionTokenRetryPolicy = new ResetSessionTokenRetryPolicyFactory(this.sessionContainer, this.collectionCache, this.retryPolicy);

gatewayStoreModel.SetCaches(this.partitionKeyRangeCache, this.collectionCache);
13 changes: 10 additions & 3 deletions Microsoft.Azure.Cosmos/src/HttpClient/HttpTimeoutPolicy.cs
@@ -29,8 +29,15 @@ public static HttpTimeoutPolicy GetTimeoutPolicy(
return HttpTimeoutPolicyControlPlaneRetriableHotPath.InstanceShouldThrow503OnTimeout;
}

//Partition Key Requests
if (documentServiceRequest.ResourceType == ResourceType.PartitionKeyRange)
//Get Partition Key Range Requests
if (documentServiceRequest.ResourceType == ResourceType.PartitionKeyRange
&& documentServiceRequest.OperationType == OperationType.ReadFeed)
{
return HttpTimeoutPolicyControlPlaneRetriableHotPath.InstanceShouldThrow503OnTimeout;
}

//Get Addresses Requests
if (documentServiceRequest.ResourceType == ResourceType.Address)
{
return HttpTimeoutPolicyControlPlaneRetriableHotPath.Instance;
}
@@ -44,7 +51,7 @@ public static HttpTimeoutPolicy GetTimeoutPolicy(
//Meta Data Read
if (HttpTimeoutPolicy.IsMetaData(documentServiceRequest) && documentServiceRequest.IsReadOnlyRequest)
{
return HttpTimeoutPolicyDefault.InstanceShouldThrow503OnTimeout;
return HttpTimeoutPolicyControlPlaneRetriableHotPath.InstanceShouldThrow503OnTimeout;
}

//Default behavior
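Taken together, the method now sends partition key range feed reads and other read-only metadata reads through the retriable control-plane policy that converts timeouts into 503s, while address resolution keeps the non-throwing variant. A condensed sketch of the selection order, using only the identifiers visible in this diff; the surrounding method has additional branches that are not shown, and the final fallback is assumed:

```csharp
// Condensed, illustrative view of GetTimeoutPolicy after this change.
public static HttpTimeoutPolicy GetTimeoutPolicySketch(DocumentServiceRequest documentServiceRequest)
{
    // Partition key range feed reads: retriable hot path, surface timeouts as 503
    // so the new MetadataRequestThrottleRetryPolicy can retry on the next region.
    if (documentServiceRequest.ResourceType == ResourceType.PartitionKeyRange
        && documentServiceRequest.OperationType == OperationType.ReadFeed)
    {
        return HttpTimeoutPolicyControlPlaneRetriableHotPath.InstanceShouldThrow503OnTimeout;
    }

    // Address resolution: retriable hot path, but timeouts are not converted to 503.
    if (documentServiceRequest.ResourceType == ResourceType.Address)
    {
        return HttpTimeoutPolicyControlPlaneRetriableHotPath.Instance;
    }

    // Other read-only metadata requests: previously HttpTimeoutPolicyDefault,
    // now the retriable control-plane policy that throws 503 on timeout.
    if (HttpTimeoutPolicy.IsMetaData(documentServiceRequest) && documentServiceRequest.IsReadOnlyRequest)
    {
        return HttpTimeoutPolicyControlPlaneRetriableHotPath.InstanceShouldThrow503OnTimeout;
    }

    // Assumed fallback for all remaining requests.
    return HttpTimeoutPolicyDefault.Instance;
}
```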
190 changes: 190 additions & 0 deletions Microsoft.Azure.Cosmos/src/MetadataRequestThrottleRetryPolicy.cs
@@ -0,0 +1,190 @@
//------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//------------------------------------------------------------

namespace Microsoft.Azure.Cosmos
{
using System;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Core.Trace;
using Microsoft.Azure.Cosmos.Routing;
using Microsoft.Azure.Documents;

/// <summary>
/// Metadata Request Throttle Retry Policy is a combination of the endpoint change retry and throttling retry policies.
/// </summary>
internal sealed class MetadataRequestThrottleRetryPolicy : IDocumentClientRetryPolicy
{
/// <summary>
/// A constant integer defining the default maximum retry wait time in seconds.
/// </summary>
private const int DefaultMaxWaitTimeInSeconds = 60;

/// <summary>
/// A constant integer defining the default maximum retry count on service unavailable.
/// </summary>
private const int DefaultMaxServiceUnavailableRetryCount = 1;

/// <summary>
/// An instance of <see cref="IGlobalEndpointManager"/>.
/// </summary>
private readonly IGlobalEndpointManager globalEndpointManager;

/// <summary>
/// Defines the throttling retry policy that is used as the underlying retry policy.
/// </summary>
private readonly IDocumentClientRetryPolicy throttlingRetryPolicy;

/// <summary>
/// An integer defining the maximum retry count on service unavailable.
/// </summary>
private readonly int maxServiceUnavailableRetryCount;

/// <summary>
/// An instance of <see cref="MetadataRetryContext"/> containing the location index and routing
/// preference used to resolve the endpoint that the next metadata request will be sent to.
/// </summary>
private MetadataRetryContext retryContext;

/// <summary>
/// An integer capturing the current retry count on service unavailable.
/// </summary>
private int serviceUnavailableRetryCount;

/// <summary>
/// The constructor to initialize an instance of <see cref="MetadataRequestThrottleRetryPolicy"/>.
/// </summary>
/// <param name="endpointManager">An instance of <see cref="GlobalEndpointManager"/></param>
/// <param name="maxRetryAttemptsOnThrottledRequests">An integer defining the maximum number
/// of attempts to retry when requests are throttled.</param>
/// <param name="maxRetryWaitTimeInSeconds">An integer defining the maximum wait time in seconds.</param>
public MetadataRequestThrottleRetryPolicy(
IGlobalEndpointManager endpointManager,
int maxRetryAttemptsOnThrottledRequests,
int maxRetryWaitTimeInSeconds = DefaultMaxWaitTimeInSeconds)
{
this.globalEndpointManager = endpointManager;
this.maxServiceUnavailableRetryCount = Math.Max(
MetadataRequestThrottleRetryPolicy.DefaultMaxServiceUnavailableRetryCount,
this.globalEndpointManager.PreferredLocationCount);

this.throttlingRetryPolicy = new ResourceThrottleRetryPolicy(
maxRetryAttemptsOnThrottledRequests,
maxRetryWaitTimeInSeconds);

this.retryContext = new MetadataRetryContext
{
RetryLocationIndex = 0,
RetryRequestOnPreferredLocations = true,
};
}

/// <summary>
/// Should the caller retry the operation.
/// </summary>
/// <param name="exception">Exception that occurred when the operation was tried</param>
/// <param name="cancellationToken">An instance of <see cref="CancellationToken"/>.</param>
/// <returns>True indicates caller should retry, False otherwise</returns>
public Task<ShouldRetryResult> ShouldRetryAsync(
Exception exception,
CancellationToken cancellationToken)
{
if (exception is CosmosException cosmosException
&& cosmosException.StatusCode == HttpStatusCode.ServiceUnavailable
&& cosmosException.Headers.SubStatusCode == SubStatusCodes.TransportGenerated503)
{
if (this.IncrementRetryIndexOnServiceUnavailableForMetadataRead())
{
return Task.FromResult(ShouldRetryResult.RetryAfter(TimeSpan.Zero));
}
}

return this.throttlingRetryPolicy.ShouldRetryAsync(exception, cancellationToken);
}

/// <summary>
/// Should the caller retry the operation.
/// </summary>
/// <param name="cosmosResponseMessage"><see cref="ResponseMessage"/> in return of the request</param>
/// <param name="cancellationToken">An instance of <see cref="CancellationToken"/>.</param>
/// <returns>True indicates caller should retry, False otherwise</returns>
public Task<ShouldRetryResult> ShouldRetryAsync(
ResponseMessage cosmosResponseMessage,
CancellationToken cancellationToken)
{
if (cosmosResponseMessage?.StatusCode == HttpStatusCode.ServiceUnavailable
&& cosmosResponseMessage?.Headers?.SubStatusCode == SubStatusCodes.TransportGenerated503)
{
if (this.IncrementRetryIndexOnServiceUnavailableForMetadataRead())
{
return Task.FromResult(ShouldRetryResult.RetryAfter(TimeSpan.Zero));
}
}

return this.throttlingRetryPolicy.ShouldRetryAsync(cosmosResponseMessage, cancellationToken);
}

/// <summary>
/// Method that is called before a request is sent to allow the retry policy implementation
/// to modify the state of the request.
/// </summary>
/// <param name="request">The request being sent to the service.</param>
public void OnBeforeSendRequest(DocumentServiceRequest request)
{
// Clear the previous location-based routing directive.
request.RequestContext.ClearRouteToLocation();
request.RequestContext.RouteToLocation(
this.retryContext.RetryLocationIndex,
this.retryContext.RetryRequestOnPreferredLocations);

Uri metadataLocationEndpoint = this.globalEndpointManager.ResolveServiceEndpoint(request);

DefaultTrace.TraceInformation("MetadataRequestThrottleRetryPolicy: Routing the metadata request to: {0} for operation type: {1} and resource type: {2}.", metadataLocationEndpoint, request.OperationType, request.ResourceType);
request.RequestContext.RouteToLocation(metadataLocationEndpoint);
}

/// <summary>
/// Increments the location index when a service unavailable exception occurs, so that any future read request is retried on the next preferred location.
/// </summary>
/// <returns>A boolean flag indicating if the operation was successful.</returns>
private bool IncrementRetryIndexOnServiceUnavailableForMetadataRead()
{
if (this.serviceUnavailableRetryCount++ >= this.maxServiceUnavailableRetryCount)
{
DefaultTrace.TraceWarning("MetadataRequestThrottleRetryPolicy: Retry count: {0} has exceeded the maximum permitted retry count on service unavailable: {1}.", this.serviceUnavailableRetryCount, this.maxServiceUnavailableRetryCount);
return false;
}

// Retry on the next preferred location.
// The retry count is used as a zero-based location index.
DefaultTrace.TraceWarning("MetadataRequestThrottleRetryPolicy: Incrementing the metadata retry location index to: {0}.", this.serviceUnavailableRetryCount);
this.retryContext = new MetadataRetryContext()
{
RetryLocationIndex = this.serviceUnavailableRetryCount,
RetryRequestOnPreferredLocations = true,
};

return true;
}

/// <summary>
/// A helper class containing the required attributes for
/// metadata retry context.
/// </summary>
internal sealed class MetadataRetryContext
{
/// <summary>
/// An integer defining the current retry location index.
/// </summary>
public int RetryLocationIndex { get; set; }

/// <summary>
/// A boolean flag indicating if the request should retry on
/// preferred locations.
/// </summary>
public bool RetryRequestOnPreferredLocations { get; set; }
}
}
}
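For orientation, the policy is consumed through the SDK's existing BackoffRetryUtility pattern: the utility asks the policy whether to retry after each failure, and the executor calls OnBeforeSendRequest so every attempt is routed to the location the policy selected. A trimmed sketch of that wiring, mirroring the PartitionKeyRangeCache change later in this diff (the executor body and response handling are elided):

```csharp
// Sketch of the consumer-side wiring; names follow the PartitionKeyRangeCache diff below.
RetryOptions retryOptions = new RetryOptions();
MetadataRequestThrottleRetryPolicy metadataRetryPolicy = new (
    endpointManager: this.endpointManager,
    maxRetryAttemptsOnThrottledRequests: retryOptions.MaxRetryAttemptsOnThrottledRequests,
    maxRetryWaitTimeInSeconds: retryOptions.MaxRetryWaitTimeInSeconds);

// On a transport-generated 503 the policy bumps its location index, and the next
// OnBeforeSendRequest call routes the request to the next preferred region;
// all other failures are delegated to the inner ResourceThrottleRetryPolicy.
using (DocumentServiceResponse response = await BackoffRetryUtility<DocumentServiceResponse>.ExecuteAsync(
    () => this.ExecutePartitionKeyRangeReadChangeFeedAsync(
        collectionRid, headers, trace, clientSideRequestStatistics, metadataRetryPolicy),
    retryPolicy: metadataRetryPolicy))
{
    // ... consume the partition key range change feed response ...
}
```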
62 changes: 24 additions & 38 deletions Microsoft.Azure.Cosmos/src/Routing/PartitionKeyRangeCache.cs
@@ -30,17 +30,20 @@ internal class PartitionKeyRangeCache : IRoutingMapProvider, ICollectionRoutingM
private readonly ICosmosAuthorizationTokenProvider authorizationTokenProvider;
private readonly IStoreModel storeModel;
private readonly CollectionCache collectionCache;
private readonly IGlobalEndpointManager endpointManager;

public PartitionKeyRangeCache(
ICosmosAuthorizationTokenProvider authorizationTokenProvider,
IStoreModel storeModel,
CollectionCache collectionCache)
CollectionCache collectionCache,
IGlobalEndpointManager endpointManager)
{
this.routingMapCache = new AsyncCacheNonBlocking<string, CollectionRoutingMap>(
keyEqualityComparer: StringComparer.Ordinal);
this.authorizationTokenProvider = authorizationTokenProvider;
this.storeModel = storeModel;
this.collectionCache = collectionCache;
this.endpointManager = endpointManager;
}

public virtual async Task<IReadOnlyList<PartitionKeyRange>> TryGetOverlappingRangesAsync(
@@ -121,10 +124,10 @@ public virtual async Task<CollectionRoutingMap> TryLookupAsync(
return await this.routingMapCache.GetAsync(
key: collectionRid,
singleValueInitFunc: (_) => this.GetRoutingMapForCollectionAsync(
collectionRid,
previousValue,
trace,
request?.RequestContext?.ClientRequestStatistics),
collectionRid: collectionRid,
previousRoutingMap: previousValue,
trace: trace,
clientSideRequestStatistics: request?.RequestContext?.ClientRequestStatistics),
forceRefresh: (currentValue) => PartitionKeyRangeCache.ShouldForceRefresh(previousValue, currentValue));
}
catch (DocumentClientException ex)
@@ -174,35 +177,6 @@ private static bool ShouldForceRefresh(
return previousValue.ChangeFeedNextIfNoneMatch == currentValue.ChangeFeedNextIfNoneMatch;
}

public async Task<PartitionKeyRange> TryGetRangeByPartitionKeyRangeIdAsync(string collectionRid,
string partitionKeyRangeId,
ITrace trace,
IClientSideRequestStatistics clientSideRequestStatistics)
{
try
{
CollectionRoutingMap routingMap = await this.routingMapCache.GetAsync(
key: collectionRid,
singleValueInitFunc: (_) => this.GetRoutingMapForCollectionAsync(
collectionRid: collectionRid,
previousRoutingMap: null,
trace: trace,
clientSideRequestStatistics: clientSideRequestStatistics),
forceRefresh: (_) => false);

return routingMap.TryGetRangeByPartitionKeyRangeId(partitionKeyRangeId);
}
catch (DocumentClientException ex)
{
if (ex.StatusCode == HttpStatusCode.NotFound)
{
return null;
}

throw;
}
}

private async Task<CollectionRoutingMap> GetRoutingMapForCollectionAsync(
string collectionRid,
CollectionRoutingMap previousRoutingMap,
Expand All @@ -213,6 +187,12 @@ private async Task<CollectionRoutingMap> GetRoutingMapForCollectionAsync(
string changeFeedNextIfNoneMatch = previousRoutingMap?.ChangeFeedNextIfNoneMatch;

HttpStatusCode lastStatusCode = HttpStatusCode.OK;

RetryOptions retryOptions = new RetryOptions();
MetadataRequestThrottleRetryPolicy metadataRetryPolicy = new (
endpointManager: this.endpointManager,
maxRetryAttemptsOnThrottledRequests: retryOptions.MaxRetryAttemptsOnThrottledRequests,
maxRetryWaitTimeInSeconds: retryOptions.MaxRetryWaitTimeInSeconds);
do
{
INameValueCollection headers = new RequestNameValueCollection();
@@ -224,10 +204,9 @@ private async Task<CollectionRoutingMap> GetRoutingMapForCollectionAsync(
headers.Set(HttpConstants.HttpHeaders.IfNoneMatch, changeFeedNextIfNoneMatch);
}

RetryOptions retryOptions = new RetryOptions();
using (DocumentServiceResponse response = await BackoffRetryUtility<DocumentServiceResponse>.ExecuteAsync(
() => this.ExecutePartitionKeyRangeReadChangeFeedAsync(collectionRid, headers, trace, clientSideRequestStatistics),
new ResourceThrottleRetryPolicy(retryOptions.MaxRetryAttemptsOnThrottledRequests, retryOptions.MaxRetryWaitTimeInSeconds)))
() => this.ExecutePartitionKeyRangeReadChangeFeedAsync(collectionRid, headers, trace, clientSideRequestStatistics, metadataRetryPolicy),
retryPolicy: metadataRetryPolicy))
{
lastStatusCode = response.StatusCode;
changeFeedNextIfNoneMatch = response.Headers[HttpConstants.HttpHeaders.ETag];
@@ -274,7 +253,8 @@ private async Task<CollectionRoutingMap> GetRoutingMapForCollectionAsync(
private async Task<DocumentServiceResponse> ExecutePartitionKeyRangeReadChangeFeedAsync(string collectionRid,
INameValueCollection headers,
ITrace trace,
IClientSideRequestStatistics clientSideRequestStatistics)
IClientSideRequestStatistics clientSideRequestStatistics,
IDocumentClientRetryPolicy retryPolicy)
{
using (ITrace childTrace = trace.StartChild("Read PartitionKeyRange Change Feed", TraceComponent.Transport, Tracing.TraceLevel.Info))
{
@@ -285,6 +265,7 @@ private async Task<DocumentServiceResponse> ExecutePartitionKeyRangeReadChangeFe
AuthorizationTokenType.PrimaryMasterKey,
headers))
{
retryPolicy.OnBeforeSendRequest(request);
string authorizationToken = null;
try
{
@@ -333,6 +314,11 @@ private async Task<DocumentServiceResponse> ExecutePartitionKeyRangeReadChangeFe
childTrace.AddDatum("Exception Message", ex.Message);
throw;
}
catch (CosmosException ce)
{
childTrace.AddDatum("Exception Message", ce.Message);
throw;
}
}
}
}