Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
0b9a887
PPAF : Separate retry timeouts for Reads and Query
Praveen-Msft Nov 10, 2025
c1bc018
Merge branch 'master' into fix-split-timeout-by-operation-type
Praveen-Msft Nov 10, 2025
f9ec5f6
Code changes to add integration tests. Fixing cross regional retry.
kundadebdatta Nov 11, 2025
7c4cd25
Adjust read timeouts, update test to do single region failover.
Praveen-Msft Nov 11, 2025
d3c4ac7
Merge branch 'master' into fix-split-timeout-by-operation-type
Praveen-Msft Nov 11, 2025
3437f7a
Add diagnostics to the exception message for pipeline debugging purpose
Praveen-Msft Nov 11, 2025
85f4c5c
Merge branch 'fix-split-timeout-by-operation-type' of https://github.…
Praveen-Msft Nov 11, 2025
f3cb0a1
Add logging for debugging the contacted regions update issue.
Praveen-Msft Nov 12, 2025
fdb7595
Code changes to fix set to string conversion.
kundadebdatta Nov 12, 2025
27fedc8
Code changes to address review comments
kundadebdatta Nov 12, 2025
891ff37
Code changes to fix failing tests
kundadebdatta Nov 12, 2025
87f3e93
Adding more logs.
kundadebdatta Nov 12, 2025
300368a
Update the last two timeout to 6s for reads
Praveen-Msft Nov 12, 2025
98abacd
Code changes to skip parallelization for query test.
kundadebdatta Nov 12, 2025
dc81b40
Code changes to remove assertions.
kundadebdatta Nov 12, 2025
8fb590c
Code changes to add tracking
kundadebdatta Nov 12, 2025
b4912ce
Code changes to ignore other PPCB tests.
kundadebdatta Nov 12, 2025
0f0f39e
Code changes to run similar tests sequentially.
kundadebdatta Nov 12, 2025
4597518
Code changes to correctly reset environment variable.
kundadebdatta Nov 12, 2025
c382975
Code changes to add SqlQuery as a check.
kundadebdatta Nov 12, 2025
7092b25
Update the PPAF policies for point reads and non point reads
Praveen-Msft Nov 13, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions Microsoft.Azure.Cosmos/src/HttpClient/HttpTimeoutPolicy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,16 @@ public static HttpTimeoutPolicy GetTimeoutPolicy(
// Data Plane Reads.
else if (documentServiceRequest.IsReadOnlyRequest)
{
return isPartitionLevelFailoverEnabled
? HttpTimeoutPolicyForPartitionFailover.InstanceShouldThrow503OnTimeout
: HttpTimeoutPolicyDefault.InstanceShouldThrow503OnTimeout;
if (isPartitionLevelFailoverEnabled)
{
return documentServiceRequest.OperationType == OperationType.Read
? HttpTimeoutPolicyForPartitionFailover.InstanceShouldThrow503OnTimeoutForPointReads
: HttpTimeoutPolicyForPartitionFailover.InstanceShouldThrow503OnTimeoutForNonPointReads;
}
else
{
return HttpTimeoutPolicyDefault.InstanceShouldThrow503OnTimeout;
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,37 +9,47 @@ namespace Microsoft.Azure.Cosmos

/// <summary>
/// HTTP timeout policy that exposes a fixed list of (request timeout, delay before next request)
/// pairs and never retries based on the HTTP response. In the revised lines of this diff there are
/// two separate timeout lists: one for point reads and one for non-point reads.
/// </summary>
internal sealed class HttpTimeoutPolicyForPartitionFailover : HttpTimeoutPolicy
{
public static readonly HttpTimeoutPolicy Instance = new HttpTimeoutPolicyForPartitionFailover(false);
public static readonly HttpTimeoutPolicy InstanceShouldThrow503OnTimeout = new HttpTimeoutPolicyForPartitionFailover(true);
public bool shouldThrow503OnTimeout;
// Shared instances, one per timeout list; the constructor flag selects which list is enumerated.
public static readonly HttpTimeoutPolicy InstanceShouldThrow503OnTimeoutForNonPointReads = new HttpTimeoutPolicyForPartitionFailover(isPointRead: false);
public static readonly HttpTimeoutPolicy InstanceShouldThrow503OnTimeoutForPointReads = new HttpTimeoutPolicyForPartitionFailover(isPointRead: true);
private readonly bool isPointRead;
// NOTE(review): the reported policy name is taken from HttpTimeoutPolicyDefault, not from this class.
// Confirm whether this is intentional before relying on TimeoutPolicyName to tell the policies apart.
private static readonly string Name = nameof(HttpTimeoutPolicyDefault);

private HttpTimeoutPolicyForPartitionFailover(bool shouldThrow503OnTimeout)
private HttpTimeoutPolicyForPartitionFailover(bool isPointRead)
{
this.shouldThrow503OnTimeout = shouldThrow503OnTimeout;
this.isPointRead = isPointRead;
}

private readonly IReadOnlyList<(TimeSpan requestTimeout, TimeSpan delayForNextRequest)> TimeoutsAndDelays = new List<(TimeSpan requestTimeout, TimeSpan delayForNextRequest)>()
// Timeouts and delays are based on the following rationale:
// For point reads: 3 attempts with timeouts of 1s, 6s, and 6s respectively.
// For non-point reads: 3 attempts with timeouts of 6s, 6s, and 10s respectively.
// Every entry uses TimeSpan.Zero as the delay before the next request.
private readonly IReadOnlyList<(TimeSpan requestTimeout, TimeSpan delayForNextRequest)> TimeoutsAndDelaysForPointReads = new List<(TimeSpan requestTimeout, TimeSpan delayForNextRequest)>()
{
(TimeSpan.FromSeconds(.5), TimeSpan.Zero),
(TimeSpan.FromSeconds(.5), TimeSpan.Zero),
(TimeSpan.FromSeconds(1), TimeSpan.Zero),
(TimeSpan.FromSeconds(6), TimeSpan.Zero),
(TimeSpan.FromSeconds(6), TimeSpan.Zero),
};

private readonly IReadOnlyList<(TimeSpan requestTimeout, TimeSpan delayForNextRequest)> TimeoutsAndDelaysForNonPointReads = new List<(TimeSpan requestTimeout, TimeSpan delayForNextRequest)>()
{
(TimeSpan.FromSeconds(6), TimeSpan.Zero),
(TimeSpan.FromSeconds(6), TimeSpan.Zero),
(TimeSpan.FromSeconds(10), TimeSpan.Zero),
};

public override string TimeoutPolicyName => HttpTimeoutPolicyForPartitionFailover.Name;

public override int TotalRetryCount => this.TimeoutsAndDelays.Count;
// Number of entries in the timeout list selected by isPointRead.
public override int TotalRetryCount => this.isPointRead ? this.TimeoutsAndDelaysForPointReads.Count : this.TimeoutsAndDelaysForNonPointReads.Count;

public override IEnumerator<(TimeSpan requestTimeout, TimeSpan delayForNextRequest)> GetTimeoutEnumerator()
{
return this.TimeoutsAndDelays.GetEnumerator();
// Enumerates the timeout list selected by isPointRead, in declaration order.
return this.isPointRead ? this.TimeoutsAndDelaysForPointReads.GetEnumerator() : this.TimeoutsAndDelaysForNonPointReads.GetEnumerator();
}

// This policy never requests a retry based on the HTTP response; it always returns false.
public override bool ShouldRetryBasedOnResponse(HttpMethod requestHttpMethod, HttpResponseMessage responseMessage)
{
return false;
}

public override bool ShouldThrow503OnTimeout => this.shouldThrow503OnTimeout;
// In the revised line this is unconditionally true for every instance.
public override bool ShouldThrow503OnTimeout => true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
{
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Net;
Expand All @@ -12,7 +13,7 @@
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Diagnostics;
using Microsoft.Azure.Cosmos.FaultInjection;
using Microsoft.Azure.Cosmos.FaultInjection;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Newtonsoft.Json.Linq;
using static Microsoft.Azure.Cosmos.Routing.GlobalPartitionEndpointManagerCore;
Expand Down Expand Up @@ -80,7 +81,8 @@ public void TestCleanup()
finally
{
//Do not delete the resources (except MM Write test object), georeplication is slow and we want to reuse the resources
this.client?.Dispose();
this.client?.Dispose();
Environment.SetEnvironmentVariable(ConfigurationManager.StalePartitionUnavailabilityRefreshIntervalInSeconds, null);
}
}

Expand Down Expand Up @@ -467,7 +469,7 @@ await this.container.DeleteItemAsync<CosmosIntegrationTestObject>(
}

[TestMethod]
[TestCategory("MultiRegion")]
[TestCategory("MultiRegion")]
[DataRow(ConnectionMode.Direct, "15", "10", DisplayName = "Direct Mode - Scenario when the total iteration count is 15 and circuit breaker consecutive failure threshold is set to 10.")]
[DataRow(ConnectionMode.Direct, "25", "20", DisplayName = "Direct Mode - Scenario when the total iteration count is 25 and circuit breaker consecutive failure threshold is set to 20.")]
[DataRow(ConnectionMode.Direct, "35", "30", DisplayName = "Direct Mode - Scenario when the total iteration count is 35 and circuit breaker consecutive failure threshold is set to 30.")]
Expand Down Expand Up @@ -602,7 +604,7 @@ public async Task ReadItemAsync_WithCircuitBreakerEnabledAndSingleMasterAccountA
}

[TestMethod]
[TestCategory("MultiRegion")]
[TestCategory("MultiRegion")]
[DataRow(ConnectionMode.Direct, DisplayName ="Direct Mode")]
[DataRow(ConnectionMode.Gateway, DisplayName = "Gateway Mode")]
[Owner("nalutripician")]
Expand Down Expand Up @@ -721,15 +723,14 @@ public async Task ReadItemAsync_WithCircuitBreakerEnabledAndTimeoutCounterOverwr
}
finally
{
Environment.SetEnvironmentVariable(ConfigurationManager.PartitionLevelCircuitBreakerEnabled, null);
Environment.SetEnvironmentVariable(ConfigurationManager.CircuitBreakerConsecutiveFailureCountForReads, null);

Environment.SetEnvironmentVariable(ConfigurationManager.PartitionLevelCircuitBreakerEnabled, null);
Environment.SetEnvironmentVariable(ConfigurationManager.CircuitBreakerTimeoutCounterResetWindowInMinutes, null);
await this.TryDeleteItems(itemsList);
}
}

[TestMethod]
[TestCategory("MultiRegion")]
[TestCategory("MultiRegion")]
[Owner("dkunda")]
[Timeout(70000)]
public async Task ReadItemAsync_WithCircuitBreakerEnabledAndSingleMasterAccountAndServiceUnavailableReceivedFromTwoRegions_ShouldApplyPartitionLevelOverrideToThridRegion()
Expand Down Expand Up @@ -893,7 +894,7 @@ public async Task ReadItemAsync_WithCircuitBreakerEnabledAndSingleMasterAccountA
}

[TestMethod]
[TestCategory("MultiRegion")]
[TestCategory("MultiRegion")]
[Owner("dkunda")]
[Timeout(70000)]
public async Task ReadItemAsync_WithNoPreferredRegionsAndCircuitBreakerEnabledAndSingleMasterAccountAndServiceUnavailableReceived_ShouldApplyPartitionLevelOverride()
Expand Down Expand Up @@ -1011,7 +1012,7 @@ public async Task ReadItemAsync_WithNoPreferredRegionsAndCircuitBreakerEnabledAn

[TestMethod]
[Owner("dkunda")]
[TestCategory("MultiRegion")]
[TestCategory("MultiRegion")]
[Timeout(70000)]
public async Task ReadItemAsync_WithCircuitBreakerDisabledAndSingleMasterAccountAndServiceUnavailableReceived_ShouldNotApplyPartitionLevelOverride()
{
Expand Down Expand Up @@ -1095,14 +1096,12 @@ public async Task ReadItemAsync_WithCircuitBreakerDisabledAndSingleMasterAccount
finally
{
Environment.SetEnvironmentVariable(ConfigurationManager.PartitionLevelCircuitBreakerEnabled, null);
Environment.SetEnvironmentVariable(ConfigurationManager.CircuitBreakerConsecutiveFailureCountForReads, null);

await this.TryDeleteItems(itemsList);
}
}

[TestMethod]
[Owner("dkunda")]
[Owner("dkunda")]
[TestCategory("MultiRegion")]
[Timeout(70000)]
public async Task CreateItemAsync_WithCircuitBreakerEnabledAndSingleMasterAccountAndServiceUnavailableReceived_ShouldNotApplyPartitionLevelOverride()
Expand Down Expand Up @@ -1182,7 +1181,7 @@ public async Task CreateItemAsync_WithCircuitBreakerEnabledAndSingleMasterAccoun

[TestMethod]
[Owner("dkunda")]
[TestCategory("MultiMaster")]
[TestCategory("MultiMaster")]
[DataRow(ConnectionMode.Direct, "15", "10", DisplayName = "Direct Mode - Scenario whtn the total iteration count is 15 and circuit breaker consecutive failure threshold is set to 10.")]
[DataRow(ConnectionMode.Direct, "25", "20", DisplayName = "Direct Mode - Scenario whtn the total iteration count is 25 and circuit breaker consecutive failure threshold is set to 20.")]
[DataRow(ConnectionMode.Direct, "35", "30", DisplayName = "Direct Mode - Scenario whtn the total iteration count is 35 and circuit breaker consecutive failure threshold is set to 30.")]
Expand Down Expand Up @@ -2259,7 +2258,132 @@ public async Task ClinetOverrides0msRequestTimeoutValueForPPAF()
Assert.IsNotNull(strat);
Assert.AreNotEqual(0, strat.Threshold);
}


/// <summary>
/// Runs the same query seven times in Gateway mode with the partition level circuit breaker
/// enabled and the consecutive read failure threshold set to 1. After the first successful
/// iteration, a fault injection rule starts delaying query responses from region 1 by 70 seconds.
/// The test asserts that every query still returns 200 OK, that region 1 and region 2 are both
/// contacted while the failure counter is at or below the threshold, and that only a single
/// region is contacted afterwards.
/// </summary>
[TestMethod]
[TestCategory("MultiRegion")]
[Owner("pkolluri")]
[Timeout(70000)]
public async Task QueryItemAsync_WithCircuitBreakerEnabledMultiRegionAndServiceResponseDelay_ShouldFailOverToNextRegionAsync()
{
    // Arrange.
    Environment.SetEnvironmentVariable(ConfigurationManager.PartitionLevelCircuitBreakerEnabled, "True");
    Environment.SetEnvironmentVariable(ConfigurationManager.CircuitBreakerConsecutiveFailureCountForReads, "1");

    // Fault injection rule that delays gateway query responses from region 1 by 70 seconds.
    // It is created disabled and only enabled after the first successful iteration below.
    string serviceResponseDelayRuleId = "response-delay-rule-" + Guid.NewGuid().ToString();
    FaultInjectionRule serviceResponseDelayRuleFromRegion1 = new FaultInjectionRuleBuilder(
        id: serviceResponseDelayRuleId,
        condition:
            new FaultInjectionConditionBuilder()
                .WithOperationType(FaultInjectionOperationType.QueryItem)
                .WithConnectionType(FaultInjectionConnectionType.Gateway)
                .WithRegion(region1)
                .Build(),
        result:
            FaultInjectionResultBuilder.GetResultBuilder(FaultInjectionServerErrorType.ResponseDelay)
                .WithDelay(TimeSpan.FromSeconds(70))
                .Build())
        .Build();

    serviceResponseDelayRuleFromRegion1.Disable();

    List<FaultInjectionRule> rules = new List<FaultInjectionRule> { serviceResponseDelayRuleFromRegion1 };
    FaultInjector faultInjector = new FaultInjector(rules);

    List<string> preferredRegions = new List<string> { region1, region2, region3 };
    CosmosClientOptions cosmosClientOptions = new CosmosClientOptions()
    {
        ConsistencyLevel = ConsistencyLevel.Session,
        FaultInjector = faultInjector,
        ApplicationPreferredRegions = preferredRegions,
        ConnectionMode = ConnectionMode.Gateway,
    };

    List<CosmosIntegrationTestObject> itemsList = new()
    {
        new() { Id = "smTestId2", Pk = "smpk1" },
    };

    try
    {
        using CosmosClient cosmosClient = new(connectionString: this.connectionString, clientOptions: cosmosClientOptions);
        Database database = cosmosClient.GetDatabase(MultiRegionSetupHelpers.dbName);
        Container container = database.GetContainer(MultiRegionSetupHelpers.containerName);

        // Act and Assert.
        await this.TryCreateItems(itemsList);

        // Must ensure the data is replicated to all regions.
        await Task.Delay(3000);

        bool isRegion1Available = true;
        bool isRegion2Available = true;

        int thresholdCounter = 0;
        int totalIterations = 7;
        int ppcbDefaultThreshold = 1;
        int firstRegionServiceUnavailableAttempt = 1;

        for (int attemptCount = 1; attemptCount <= totalIterations; attemptCount++)
        {
            try
            {
                string sqlQueryText = $"SELECT * FROM c WHERE c.id = '{itemsList[0].Id}'";
                using FeedIterator<CosmosIntegrationTestObject> feedIterator = container.GetItemQueryIterator<CosmosIntegrationTestObject>(sqlQueryText, requestOptions: new QueryRequestOptions());

                while (feedIterator.HasMoreResults)
                {
                    FeedResponse<CosmosIntegrationTestObject> response = await feedIterator.ReadNextAsync();
                    Assert.AreEqual(System.Net.HttpStatusCode.OK, response.StatusCode);
                    IReadOnlyList<(string regionName, Uri uri)> contactedRegionMapping = response.Diagnostics.GetContactedRegions();
                    HashSet<string> contactedRegions = new(contactedRegionMapping.Select(r => r.regionName));

                    if (isRegion1Available && isRegion2Available)
                    {
                        Assert.IsTrue(contactedRegions.Count == 1, "Assert that, when no failure happened, the query request is being served from region 1.");
                        Assert.IsTrue(contactedRegions.Contains(region1));

                        // Start delaying responses from region 1 once the healthy baseline has been verified.
                        if (attemptCount == firstRegionServiceUnavailableAttempt)
                        {
                            isRegion1Available = false;
                            serviceResponseDelayRuleFromRegion1.Enable();
                        }
                    }
                    else if (isRegion2Available)
                    {
                        if (thresholdCounter <= ppcbDefaultThreshold)
                        {
                            Assert.IsTrue(contactedRegions.Count == 2, "Asserting that when the query request succeeds before the consecutive failure count reaches the threshold, the partition didn't fail over to the next region, and the request was retried.");
                            Assert.IsTrue(contactedRegions.Contains(region1) && contactedRegions.Contains(region2), "Asserting that both region 1 and region 2 were contacted.");
                            thresholdCounter++;
                        }
                        else
                        {
                            Assert.IsTrue(contactedRegions.Count == 1, "Asserting that when the consecutive failure count reaches the threshold, the partition was failed over to the next region, and the subsequent query request/s were successful on the next region");
                        }
                    }
                }
            }
            catch (CosmosException ce)
            {
                Assert.Fail("Query operation should succeed with successful failover to next region." + ce.Diagnostics.ToString());
            }
            // Let assertion failures raised inside the try block propagate untouched, so the test
            // report shows the original assertion message and stack rather than "Unhandled Exception".
            catch (Exception ex) when (!(ex is UnitTestAssertException))
            {
                Assert.Fail($"Unhandled Exception was thrown during Query operation call. Message: {ex.Message}");
            }
        }
    }
    finally
    {
        // Reset the process-wide settings so later tests are not affected, then clean up the test item.
        Environment.SetEnvironmentVariable(ConfigurationManager.PartitionLevelCircuitBreakerEnabled, null);
        Environment.SetEnvironmentVariable(ConfigurationManager.CircuitBreakerConsecutiveFailureCountForReads, null);

        await this.TryDeleteItems(itemsList);
    }
}

private async Task TryCreateItems(List<CosmosIntegrationTestObject> testItems)
{
foreach (CosmosIntegrationTestObject item in testItems)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -592,6 +592,53 @@ await TestScenarioAsync(
expectedNumberOfRetrys: 3);
}

/// <summary>
/// Verifies that a Document Query request with partition level failover enabled resolves to a
/// timeout policy whose attempts use request timeouts of 6s, 6s and 10s, in that order, and
/// that the policy reports the same number of attempts through TotalRetryCount.
/// </summary>
[TestMethod]
public void HttpTimeoutPolicyForParitionFailoverForQueries()
{
    HttpTimeoutPolicy httpTimeoutPolicyForQuery = HttpTimeoutPolicy.GetTimeoutPolicy(
        documentServiceRequest: CosmosHttpClientCoreTests.CreateDocumentServiceRequestByOperation(ResourceType.Document, OperationType.Query),
        isPartitionLevelFailoverEnabled: true,
        isThinClientEnabled: false);

    TimeSpan[] expectedRequestTimeouts =
    {
        TimeSpan.FromSeconds(6),
        TimeSpan.FromSeconds(6),
        TimeSpan.FromSeconds(10),
    };

    // Collect the whole schedule first so that a missing or extra attempt fails the test
    // instead of being silently skipped by the loop.
    List<TimeSpan> actualRequestTimeouts = new List<TimeSpan>();
    using (IEnumerator<(TimeSpan requestTimeout, TimeSpan delayForNextRequest)> availableRetries = httpTimeoutPolicyForQuery.GetTimeoutEnumerator())
    {
        while (availableRetries.MoveNext())
        {
            actualRequestTimeouts.Add(availableRetries.Current.requestTimeout);
        }
    }

    CollectionAssert.AreEqual(expectedRequestTimeouts, actualRequestTimeouts);
    Assert.AreEqual(expectedRequestTimeouts.Length, httpTimeoutPolicyForQuery.TotalRetryCount);
}

/// <summary>
/// Verifies that a Document Read (point read) request with partition level failover enabled
/// resolves to a timeout policy whose attempts use request timeouts of 1s, 6s and 6s, in that
/// order, and that the policy reports the same number of attempts through TotalRetryCount.
/// </summary>
[TestMethod]
public void HttpTimeoutPolicyForParitionFailoverForReads()
{
    HttpTimeoutPolicy httpTimeoutPolicyForRead = HttpTimeoutPolicy.GetTimeoutPolicy(
        documentServiceRequest: CosmosHttpClientCoreTests.CreateDocumentServiceRequestByOperation(ResourceType.Document, OperationType.Read),
        isPartitionLevelFailoverEnabled: true,
        isThinClientEnabled: false);

    TimeSpan[] expectedRequestTimeouts =
    {
        TimeSpan.FromSeconds(1),
        TimeSpan.FromSeconds(6),
        TimeSpan.FromSeconds(6),
    };

    // Collect the whole schedule first so that a missing or extra attempt fails the test
    // instead of being silently skipped by the loop.
    List<TimeSpan> actualRequestTimeouts = new List<TimeSpan>();
    using (IEnumerator<(TimeSpan requestTimeout, TimeSpan delayForNextRequest)> availableRetries = httpTimeoutPolicyForRead.GetTimeoutEnumerator())
    {
        while (availableRetries.MoveNext())
        {
            actualRequestTimeouts.Add(availableRetries.Current.requestTimeout);
        }
    }

    CollectionAssert.AreEqual(expectedRequestTimeouts, actualRequestTimeouts);
    Assert.AreEqual(expectedRequestTimeouts.Length, httpTimeoutPolicyForRead.TotalRetryCount);
}

private static DocumentServiceRequest CreateDocumentServiceRequestByOperation(
ResourceType resourceType,
Expand Down
Loading