Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions Directory.Build.props
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
<Project xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<PropertyGroup>
<ClientOfficialVersion>3.46.0</ClientOfficialVersion>
<ClientOfficialVersion>3.46.1</ClientOfficialVersion>
<ClientPreviewVersion>3.47.0</ClientPreviewVersion>
<ClientPreviewSuffixVersion>preview.0</ClientPreviewSuffixVersion>
<DirectVersion>3.37.1</DirectVersion>
<ClientPreviewSuffixVersion>preview.1</ClientPreviewSuffixVersion>
<DirectVersion>3.37.5</DirectVersion>
<FaultInjectionVersion>1.0.0</FaultInjectionVersion>
<FaultInjectionSuffixVersion>beta.0</FaultInjectionSuffixVersion>
<EncryptionOfficialVersion>2.0.4</EncryptionOfficialVersion>
<EncryptionPreviewVersion>2.1.0</EncryptionPreviewVersion>
<EncryptionPreviewSuffixVersion>preview4</EncryptionPreviewSuffixVersion>
Expand Down
1,655 changes: 1,655 additions & 0 deletions Microsoft.Azure.Cosmos/contracts/API_3.46.1.txt

Large diffs are not rendered by default.

1,752 changes: 1,752 additions & 0 deletions Microsoft.Azure.Cosmos/contracts/API_3.47.0-preview.1.txt

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Microsoft.Azure.Cosmos/src/ClientRetryPolicy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ public void OnBeforeSendRequest(DocumentServiceRequest request)
this.canUseMultipleWriteLocations = this.globalEndpointManager.CanUseMultipleWriteLocations(request);
this.documentServiceRequest = request;
this.isMultiMasterWriteRequest = !this.isReadRequest
&& (this.globalEndpointManager?.CanSupportMultipleWriteLocations(request) ?? false);
&& (this.globalEndpointManager?.CanSupportMultipleWriteLocations(request.ResourceType, request.OperationType) ?? false);

// clear previous location-based routing directive
request.RequestContext.ClearRouteToLocation();
Expand Down
4 changes: 0 additions & 4 deletions Microsoft.Azure.Cosmos/src/DocumentClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,6 @@ internal partial class DocumentClient : IDisposable, IAuthorizationTokenProvider

private readonly bool IsLocalQuorumConsistency = false;
private readonly bool isReplicaAddressValidationEnabled;
private readonly AvailabilityStrategy availabilityStrategy;

//Fault Injection
private readonly IChaosInterceptorFactory chaosInterceptorFactory;
Expand Down Expand Up @@ -441,7 +440,6 @@ internal DocumentClient(Uri serviceEndpoint,
/// <param name="cosmosClientId"></param>
/// <param name="remoteCertificateValidationCallback">This delegate responsible for validating the third party certificate. </param>
/// <param name="cosmosClientTelemetryOptions">This is distributed tracing flag</param>
/// <param name="availabilityStrategy">This is the availability strategy for the client</param>"
/// <param name="chaosInterceptorFactory">This is the chaos interceptor used for fault injection</param>
/// <remarks>
/// The service endpoint can be obtained from the Azure Management Portal.
Expand Down Expand Up @@ -471,7 +469,6 @@ internal DocumentClient(Uri serviceEndpoint,
string cosmosClientId = null,
RemoteCertificateValidationCallback remoteCertificateValidationCallback = null,
CosmosClientTelemetryOptions cosmosClientTelemetryOptions = null,
AvailabilityStrategy availabilityStrategy = null,
IChaosInterceptorFactory chaosInterceptorFactory = null)
{
if (sendingRequestEventArgs != null)
Expand All @@ -495,7 +492,6 @@ internal DocumentClient(Uri serviceEndpoint,
this.transportClientHandlerFactory = transportClientHandlerFactory;
this.IsLocalQuorumConsistency = isLocalQuorumConsistency;
this.initTaskCache = new AsyncCacheNonBlocking<string, bool>(cancellationToken: this.cancellationTokenSource.Token);
this.availabilityStrategy = availabilityStrategy;
this.chaosInterceptorFactory = chaosInterceptorFactory;
this.chaosInterceptor = chaosInterceptorFactory?.CreateInterceptor(this);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,9 @@ public override async Task<ResponseMessage> SendAsync(
&& response.Content != null
&& response.Content is not CloneableStream)
{
response.Content = await StreamExtension.AsClonableStreamAsync(response.Content, default);
response.Content = await StreamExtension.AsClonableStreamAsync(
mediaStream: response.Content,
allowUnsafeDataAccess: true);
}

return response;
Expand Down
25 changes: 25 additions & 0 deletions Microsoft.Azure.Cosmos/src/Regions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -439,5 +439,30 @@ public static class Regions
/// Name of the Azure South Central US 2 region in the Azure Cosmos DB service.
/// </summary>
public const string SouthCentralUS2 = "South Central US 2";

/// <summary>
/// Name of the Azure Israel Northwest region in the Azure Cosmos DB service.
/// </summary>
public const string IsraelNorthwest = "Israel Northwest";

/// <summary>
/// Name of the Azure Belgium Central region in the Azure Cosmos DB service.
/// </summary>
public const string BelgiumCentral = "Belgium Central";

/// <summary>
/// Name of the Azure Denmark East region in the Azure Cosmos DB service.
/// </summary>
public const string DenmarkEast = "Denmark East";

/// <summary>
/// Name of the Azure Southeast US 3 region in the Azure Cosmos DB service.
/// </summary>
public const string SoutheastUS3 = "Southeast US 3";

/// <summary>
/// Name of the Azure Southeast US 5 region in the Azure Cosmos DB service.
/// </summary>
public const string SoutheastUS5 = "Southeast US 5";
}
}
1 change: 0 additions & 1 deletion Microsoft.Azure.Cosmos/src/Resource/ClientContextCore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ internal static CosmosClientContext Create(
cosmosClientId: cosmosClient.Id,
remoteCertificateValidationCallback: ClientContextCore.SslCustomValidationCallBack(clientOptions.GetServerCertificateCustomValidationCallback()),
cosmosClientTelemetryOptions: clientOptions.CosmosClientTelemetryOptions,
availabilityStrategy: clientOptions.AvailabilityStrategy,
chaosInterceptorFactory: clientOptions.ChaosInterceptorFactory);

return ClientContextCore.Create(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -924,9 +924,14 @@ private async Task<ResponseMessage> ProcessItemStreamAsync(
string resourceUri = this.GetResourceUri(requestOptions, operationType, itemId);

// Convert Text to Binary Stream.
streamPayload = CosmosSerializationUtil.TrySerializeStreamToTargetFormat(
targetSerializationFormat: ContainerCore.GetTargetRequestSerializationFormat(),
inputStream: streamPayload == null ? null : await StreamExtension.AsClonableStreamAsync(streamPayload));
if (ConfigurationManager.IsBinaryEncodingEnabled())
{
streamPayload = CosmosSerializationUtil.TrySerializeStreamToTargetFormat(
targetSerializationFormat: ContainerCore.GetTargetRequestSerializationFormat(),
inputStream: streamPayload == null ? null : await StreamExtension.AsClonableStreamAsync(
mediaStream: streamPayload,
allowUnsafeDataAccess: true));
}

ResponseMessage responseMessage = await this.ClientContext.ProcessResourceOperationStreamAsync(
resourceUri: resourceUri,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,17 @@ internal static AvailabilityStrategy DisabledStrategy()
/// </summary>
/// <param name="threshold"> how long before SDK begins hedging</param>
/// <param name="thresholdStep">Period of time between first hedge and next hedging attempts</param>
/// <param name="enableMultiWriteRegionHedge">Whether hedging for write requests on accounts with multi-region writes are enabled
/// Note that this does come with the caveat that there will be more 409 / 412 errors thrown by the SDK.
/// This is expected and applications that adopt this feature should be prepared to handle these exceptions.
/// Application might not be able to be deterministic on Create vs Replace in the case of Upsert Operations</param>
/// <returns>something</returns>
public static AvailabilityStrategy CrossRegionHedgingStrategy(TimeSpan threshold,
TimeSpan? thresholdStep)
public static AvailabilityStrategy CrossRegionHedgingStrategy(
TimeSpan threshold,
TimeSpan? thresholdStep,
bool enableMultiWriteRegionHedge = false)
{
return new CrossRegionHedgingAvailabilityStrategy(threshold, thresholdStep);
return new CrossRegionHedgingAvailabilityStrategy(threshold, thresholdStep, enableMultiWriteRegionHedge);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,24 @@ internal class CrossRegionHedgingAvailabilityStrategy : AvailabilityStrategyInte
/// </summary>
public TimeSpan ThresholdStep { get; private set; }

/// <summary>
/// Whether hedging for write requests on accounts with multi-region writes is enabled.
/// Note that this does come with the caveat that there will be more 409 / 412 errors thrown by the SDK.
/// This is expected and applications that adopt this feature should be prepared to handle these exceptions.
/// Application might not be able to be deterministic on Create vs Replace in the case of Upsert Operations
/// </summary>
public bool EnableMultiWriteRegionHedge { get; private set; }

/// <summary>
/// Constructor for hedging availability strategy
/// </summary>
/// <param name="threshold"></param>
/// <param name="thresholdStep"></param>
/// <param name="enableMultiWriteRegionHedge"></param>
public CrossRegionHedgingAvailabilityStrategy(
TimeSpan threshold,
TimeSpan? thresholdStep)
TimeSpan? thresholdStep,
bool enableMultiWriteRegionHedge = false)
{
if (threshold <= TimeSpan.Zero)
{
Expand All @@ -57,6 +67,7 @@ public CrossRegionHedgingAvailabilityStrategy(

this.Threshold = threshold;
this.ThresholdStep = thresholdStep ?? TimeSpan.FromMilliseconds(-1);
this.EnableMultiWriteRegionHedge = enableMultiWriteRegionHedge;
}

/// <inheritdoc/>
Expand All @@ -70,18 +81,24 @@ internal override bool Enabled()
/// This availability strategy can only be used if the request is a read-only request on a document request.
/// </summary>
/// <param name="request"></param>
/// <param name="client"></param>
/// <returns>whether the request should be a hedging request.</returns>
internal bool ShouldHedge(RequestMessage request)
internal bool ShouldHedge(RequestMessage request, CosmosClient client)
{
//Only use availability strategy for document point operations
if (request.ResourceType != ResourceType.Document)
{
return false;
}

//check to see if it is a not a read-only request
//check to see if it is a not a read-only request/ if multimaster writes are enabled
if (!OperationTypeExtensions.IsReadOperation(request.OperationType))
{
if (this.EnableMultiWriteRegionHedge
&& client.DocumentClient.GlobalEndpointManager.CanSupportMultipleWriteLocations(request.ResourceType, request.OperationType))
{
return true;
}
return false;
}

Expand All @@ -102,7 +119,7 @@ internal override async Task<ResponseMessage> ExecuteAvailabilityStrategyAsync(
RequestMessage request,
CancellationToken cancellationToken)
{
if (!this.ShouldHedge(request)
if (!this.ShouldHedge(request, client)
|| client.DocumentClient.GlobalEndpointManager.ReadEndpoints.Count == 1)
{
return await sender(request, cancellationToken);
Expand All @@ -113,7 +130,7 @@ internal override async Task<ResponseMessage> ExecuteAvailabilityStrategyAsync(
using (CancellationTokenSource cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken))
{
using (CloneableStream clonedBody = (CloneableStream)(request.Content == null
? null//new CloneableStream(new MemoryStream())
? null
: await StreamExtension.AsClonableStreamAsync(request.Content)))
{
IReadOnlyCollection<string> hedgeRegions = client.DocumentClient.GlobalEndpointManager
Expand Down Expand Up @@ -143,7 +160,8 @@ internal override async Task<ResponseMessage> ExecuteAvailabilityStrategyAsync(
request,
hedgeRegions.ElementAt(requestNumber),
cancellationToken,
cancellationTokenSource);
cancellationTokenSource,
trace);

requestTasks.Add(primaryRequest);
}
Expand Down Expand Up @@ -262,7 +280,8 @@ private async Task<HedgingResponse> CloneAndSendAsync(
clonedRequest,
region,
cancellationToken,
cancellationTokenSource);
cancellationTokenSource,
trace);
}
}

Expand All @@ -271,7 +290,8 @@ private async Task<HedgingResponse> RequestSenderAndResultCheckAsync(
RequestMessage request,
string hedgedRegion,
CancellationToken cancellationToken,
CancellationTokenSource cancellationTokenSource)
CancellationTokenSource cancellationTokenSource,
ITrace trace)
{
try
{
Expand All @@ -288,9 +308,9 @@ private async Task<HedgingResponse> RequestSenderAndResultCheckAsync(

return new HedgingResponse(false, response, hedgedRegion);
}
catch (OperationCanceledException) when (cancellationTokenSource.IsCancellationRequested)
catch (OperationCanceledException oce ) when (cancellationTokenSource.IsCancellationRequested)
{
return new HedgingResponse(false, null, hedgedRegion);
throw new CosmosOperationCanceledException(oce, trace);
}
catch (Exception ex)
{
Expand Down
13 changes: 8 additions & 5 deletions Microsoft.Azure.Cosmos/src/Routing/GlobalEndpointManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -546,14 +546,17 @@ public virtual async Task RefreshLocationAsync(bool forceRefresh = false)
/// Determines whether the current configuration and state of the service allow for supporting multiple write locations.
/// This method returns True is the AvailableWriteLocations in LocationCache is more than 1. Otherwise, it returns False.
/// </summary>
/// <param name="request">The document service request for which the write location support is being evaluated.</param>
/// <returns>A boolean flag indicating if the available write locations are more than one.</returns>
public bool CanSupportMultipleWriteLocations(DocumentServiceRequest request)
/// <param name="resourceType"> resource type of the request</param>
/// <param name="operationType"> operation type of the request</param>
/// <returns>A boolean flag indicating if the available write locations are more than one.</returns>
public bool CanSupportMultipleWriteLocations(
ResourceType resourceType,
OperationType operationType)
{
return this.locationCache.CanUseMultipleWriteLocations()
&& this.locationCache.GetAvailableAccountLevelWriteLocations()?.Count > 1
&& (request.ResourceType == ResourceType.Document ||
(request.ResourceType == ResourceType.StoredProcedure && request.OperationType == OperationType.Execute));
&& (resourceType == ResourceType.Document ||
(resourceType == ResourceType.StoredProcedure && operationType == OperationType.Execute));
}

#pragma warning disable VSTHRD100 // Avoid async void methods
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,6 @@ internal interface IGlobalEndpointManager : IDisposable

ReadOnlyDictionary<string, Uri> GetAvailableReadEndpointsByLocation();

bool CanSupportMultipleWriteLocations(DocumentServiceRequest request);
bool CanSupportMultipleWriteLocations(ResourceType resourceType, OperationType operationType);
}
}
Loading
Loading