Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ namespace Microsoft.Azure.Cosmos
internal class CrossRegionHedgingAvailabilityStrategy : AvailabilityStrategyInternal
{
private const string HedgeContext = "Hedge Context";
private const string ResponseRegion = "Response Region";
private const string HedgeConfig = "Hedge Config";

/// <summary>
/// Latency threshold which activates the first region hedging
Expand All @@ -44,6 +44,8 @@ internal class CrossRegionHedgingAvailabilityStrategy : AvailabilityStrategyInte
/// </summary>
public bool EnableMultiWriteRegionHedge { get; private set; }

private readonly string HedgeConfigText;

/// <summary>
/// Constructor for hedging availability strategy
/// </summary>
Expand All @@ -68,6 +70,8 @@ public CrossRegionHedgingAvailabilityStrategy(
this.Threshold = threshold;
this.ThresholdStep = thresholdStep ?? TimeSpan.FromMilliseconds(-1);
this.EnableMultiWriteRegionHedge = enableMultiWriteRegionHedge;

this.HedgeConfigText = $"t:{this.Threshold.TotalMilliseconds}ms, s:{this.ThresholdStep.TotalMilliseconds}ms, w:{this.EnableMultiWriteRegionHedge}";
}

/// <inheritdoc/>
Expand Down Expand Up @@ -134,13 +138,12 @@ internal override async Task<ResponseMessage> ExecuteAvailabilityStrategyAsync(
: await StreamExtension.AsClonableStreamAsync(request.Content)))
{
IReadOnlyCollection<string> hedgeRegions = client.DocumentClient.GlobalEndpointManager
.GetApplicableRegions(
request.RequestOptions?.ExcludeRegions,
OperationTypeExtensions.IsReadOperation(request.OperationType));
.GetApplicableRegions(
request.RequestOptions?.ExcludeRegions,
OperationTypeExtensions.IsReadOperation(request.OperationType));

List<Task> requestTasks = new List<Task>(hedgeRegions.Count + 1);

Task<HedgingResponse> primaryRequest = null;
HedgingResponse hedgeResponse = null;

//Send out hedged requests
Expand All @@ -153,33 +156,17 @@ internal override async Task<ResponseMessage> ExecuteAvailabilityStrategyAsync(
CancellationToken timerToken = timerTokenSource.Token;
using (Task hedgeTimer = Task.Delay(awaitTime, timerToken))
{
if (requestNumber == 0)
{
primaryRequest = this.RequestSenderAndResultCheckAsync(
sender,
request,
hedgeRegions.ElementAt(requestNumber),
cancellationToken,
cancellationTokenSource,
trace);

requestTasks.Add(primaryRequest);
}
else
{
Task<HedgingResponse> requestTask = this.CloneAndSendAsync(
sender: sender,
request: request,
clonedBody: clonedBody,
hedgeRegions: hedgeRegions,
requestNumber: requestNumber,
trace: trace,
cancellationToken: cancellationToken,
cancellationTokenSource: cancellationTokenSource);

requestTasks.Add(requestTask);
}

Task<HedgingResponse> requestTask = this.CloneAndSendAsync(
sender: sender,
request: request,
clonedBody: clonedBody,
hedgeRegions: hedgeRegions,
requestNumber: requestNumber,
trace: trace,
cancellationToken: cancellationToken,
cancellationTokenSource: cancellationTokenSource);

requestTasks.Add(requestTask);
requestTasks.Add(hedgeTimer);

Task completedTask = await Task.WhenAny(requestTasks);
Expand All @@ -202,13 +189,14 @@ internal override async Task<ResponseMessage> ExecuteAvailabilityStrategyAsync(
if (hedgeResponse.IsNonTransient)
{
cancellationTokenSource.Cancel();

((CosmosTraceDiagnostics)hedgeResponse.ResponseMessage.Diagnostics).Value.AddOrUpdateDatum(
HedgeConfig,
this.HedgeConfigText);
//Take is not inclusive, so we need to add 1 to the request number which starts at 0
((CosmosTraceDiagnostics)hedgeResponse.ResponseMessage.Diagnostics).Value.AddOrUpdateDatum(
HedgeContext,
hedgeRegions.Take(requestNumber + 1));
((CosmosTraceDiagnostics)hedgeResponse.ResponseMessage.Diagnostics).Value.AddOrUpdateDatum(
ResponseRegion,
hedgeResponse.ResponseRegion);
return hedgeResponse.ResponseMessage;
}
}
Expand All @@ -231,12 +219,12 @@ internal override async Task<ResponseMessage> ExecuteAvailabilityStrategyAsync(
if (hedgeResponse.IsNonTransient || requestTasks.Count == 0)
{
cancellationTokenSource.Cancel();
((CosmosTraceDiagnostics)hedgeResponse.ResponseMessage.Diagnostics).Value.AddOrUpdateDatum(
HedgeConfig,
this.HedgeConfigText);
((CosmosTraceDiagnostics)hedgeResponse.ResponseMessage.Diagnostics).Value.AddOrUpdateDatum(
HedgeContext,
hedgeRegions);
((CosmosTraceDiagnostics)hedgeResponse.ResponseMessage.Diagnostics).Value.AddOrUpdateDatum(
ResponseRegion,
hedgeResponse.ResponseRegion);
return hedgeResponse.ResponseMessage;
}
}
Expand Down Expand Up @@ -270,15 +258,17 @@ private async Task<HedgingResponse> CloneAndSendAsync(
{
clonedRequest.RequestOptions ??= new RequestOptions();

List<string> excludeRegions = new List<string>(hedgeRegions);
string region = excludeRegions[requestNumber];
excludeRegions.RemoveAt(requestNumber);
clonedRequest.RequestOptions.ExcludeRegions = excludeRegions;
//we do not want to exclude any regions for the primary request
if (requestNumber > 0)
{
List<string> excludeRegions = new List<string>(hedgeRegions);
excludeRegions.RemoveAt(requestNumber);
clonedRequest.RequestOptions.ExcludeRegions = excludeRegions;
}

return await this.RequestSenderAndResultCheckAsync(
sender,
clonedRequest,
region,
cancellationToken,
cancellationTokenSource,
trace);
Expand All @@ -288,7 +278,6 @@ private async Task<HedgingResponse> CloneAndSendAsync(
private async Task<HedgingResponse> RequestSenderAndResultCheckAsync(
Func<RequestMessage, CancellationToken, Task<ResponseMessage>> sender,
RequestMessage request,
string hedgedRegion,
CancellationToken cancellationToken,
CancellationTokenSource cancellationTokenSource,
ITrace trace)
Expand All @@ -303,12 +292,12 @@ private async Task<HedgingResponse> RequestSenderAndResultCheckAsync(
cancellationTokenSource.Cancel();
}

return new HedgingResponse(true, response, hedgedRegion);
return new HedgingResponse(true, response);
}

return new HedgingResponse(false, response, hedgedRegion);
return new HedgingResponse(false, response);
}
catch (OperationCanceledException oce ) when (cancellationTokenSource.IsCancellationRequested)
catch (OperationCanceledException oce) when (cancellationTokenSource.IsCancellationRequested)
{
throw new CosmosOperationCanceledException(oce, trace);
}
Expand Down Expand Up @@ -348,13 +337,11 @@ private sealed class HedgingResponse
{
public readonly bool IsNonTransient;
public readonly ResponseMessage ResponseMessage;
public readonly string ResponseRegion;

public HedgingResponse(bool isNonTransient, ResponseMessage responseMessage, string responseRegion)
public HedgingResponse(bool isNonTransient, ResponseMessage responseMessage)
{
this.IsNonTransient = isNonTransient;
this.ResponseMessage = responseMessage;
this.ResponseRegion = responseRegion;
}
}
}
Expand Down
Loading
Loading