diff --git a/Directory.Build.props b/Directory.Build.props
index ab807b268c..9786540064 100644
--- a/Directory.Build.props
+++ b/Directory.Build.props
@@ -3,7 +3,7 @@
     3.47.0
     3.48.0
     preview.0
-    3.37.9
+    3.37.10
     1.0.0
     beta.0
     2.0.4
diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/ClientTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/ClientTests.cs
index 0600c7be85..21521a8cd0 100644
--- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/ClientTests.cs
+++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/ClientTests.cs
@@ -5,6 +5,7 @@ namespace Microsoft.Azure.Cosmos.SDK.EmulatorTests
 {
     using System;
+    using System.Collections.Concurrent;
     using System.Collections.Generic;
     using System.Diagnostics;
     using System.IO;
@@ -19,6 +20,7 @@ namespace Microsoft.Azure.Cosmos.SDK.EmulatorTests
     using System.Threading;
     using System.Threading.Tasks;
     using global::Azure;
+    using Microsoft.Azure.Cosmos.FaultInjection;
     using Microsoft.Azure.Cosmos.Query.Core;
     using Microsoft.Azure.Cosmos.Services.Management.Tests.LinqProviderTests;
     using Microsoft.Azure.Cosmos.Telemetry;
@@ -936,7 +938,136 @@ public async Task MultiRegionAccountTest()
             AccountProperties properties = await cosmosClient.ReadAccountAsync();
             Assert.IsNotNull(properties);
         }
-
+
+        [TestMethod]
+        [Owner("amudumba")]
+        public async Task CreateItemDuringTimeoutTest()
+        {
+            // Prepare.
+            // Enable aggressive timeout detection, which lets the connection health checker mark a
+            // channel/connection as "unhealthy" after a set of consecutive timeouts.
+            Environment.SetEnvironmentVariable("AZURE_COSMOS_AGGRESSIVE_TIMEOUT_DETECTION_ENABLED", "True");
+            Environment.SetEnvironmentVariable("AZURE_COSMOS_TIMEOUT_DETECTION_TIME_LIMIT_IN_SECONDS", "1");
+
+            // Enable a fault injection rule to simulate a timeout scenario.
+            string timeoutRuleId = "timeoutRule-" + Guid.NewGuid().ToString();
+            FaultInjectionRule timeoutRule = new FaultInjectionRuleBuilder(
+                id: timeoutRuleId,
+                condition:
+                    new FaultInjectionConditionBuilder()
+                        .WithOperationType(FaultInjectionOperationType.CreateItem)
+                        .Build(),
+                result:
+                    FaultInjectionResultBuilder.GetResultBuilder(FaultInjectionServerErrorType.SendDelay)
+                        .WithDelay(TimeSpan.FromSeconds(100))
+                        .Build())
+                .Build();
+
+            List<FaultInjectionRule> rules = new List<FaultInjectionRule> { timeoutRule };
+            FaultInjector faultInjector = new FaultInjector(rules);
+
+            CosmosClientOptions cosmosClientOptions = new CosmosClientOptions()
+            {
+                ConsistencyLevel = Cosmos.ConsistencyLevel.Session,
+                FaultInjector = faultInjector,
+                RequestTimeout = TimeSpan.FromSeconds(2)
+            };
+
+            Cosmos.Database db = null;
+            try
+            {
+                CosmosClient cosmosClient = TestCommon.CreateCosmosClient(clientOptions: cosmosClientOptions);
+
+                db = await cosmosClient.CreateDatabaseIfNotExistsAsync("TimeoutFaultTest");
+                Container container = await db.CreateContainerIfNotExistsAsync("TimeoutFaultContainer", "/pk");
+
+                // Act.
+                // Simulate an aggressive timeout scenario by performing 3 writes that all time out due to the fault injection rule.
+                for (int i = 0; i < 3; i++)
+                {
+                    try
+                    {
+                        ToDoActivity testItem = ToDoActivity.CreateRandomToDoActivity();
+                        await container.CreateItemAsync(testItem);
+                    }
+                    catch (CosmosException exx)
+                    {
+                        Assert.AreEqual(HttpStatusCode.RequestTimeout, exx.StatusCode);
+                    }
+                }
+
+                // Assert that the old channel is now marked unhealthy by the timeouts and that a new healthy channel is available for subsequent requests.
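+                // Note: the reflection walk below relies on internal RNTBD field names
+                // ("storeClientFactory", "transportClient", "channelDictionary", "channels",
+                // "singlePartition", "openChannels", "channel"); these are implementation details
+                // of the Direct transport stack and may change in future SDK versions.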
+
+                // Get all the channels that live under TransportClient -> ChannelDictionary -> Channels.
+                IStoreClientFactory factory = (IStoreClientFactory)cosmosClient.DocumentClient.GetType()
+                    .GetField("storeClientFactory", BindingFlags.NonPublic | BindingFlags.Instance)
+                    .GetValue(cosmosClient.DocumentClient);
+                StoreClientFactory storeClientFactory = (StoreClientFactory)factory;
+
+                TransportClient client = (TransportClient)storeClientFactory.GetType()
+                    .GetField("transportClient", BindingFlags.NonPublic | BindingFlags.Instance)
+                    .GetValue(storeClientFactory);
+                Documents.Rntbd.TransportClient transportClient = (Documents.Rntbd.TransportClient)client;
+
+                Documents.Rntbd.ChannelDictionary channelDict = (Documents.Rntbd.ChannelDictionary)transportClient.GetType()
+                    .GetField("channelDictionary", BindingFlags.NonPublic | BindingFlags.Instance)
+                    .GetValue(transportClient);
+                ConcurrentDictionary<Documents.Rntbd.ServerKey, Documents.Rntbd.IChannel> allChannels = (ConcurrentDictionary<Documents.Rntbd.ServerKey, Documents.Rntbd.IChannel>)channelDict.GetType()
+                    .GetField("channels", BindingFlags.NonPublic | BindingFlags.Instance)
+                    .GetValue(channelDict);
+
+                // Assert that the old channel is the one now marked unhealthy by the timeouts.
+                // Get the channel via channelDict -> LoadBalancingChannel -> LoadBalancingPartition -> LbChannelState -> IChannel.
+                Documents.Rntbd.LoadBalancingChannel loadBalancingUnhealthyChannel = (Documents.Rntbd.LoadBalancingChannel)allChannels[allChannels.Keys.ElementAt(1)];
+                Documents.Rntbd.LoadBalancingPartition loadBalancingPartitionUnHealthy = (Documents.Rntbd.LoadBalancingPartition)loadBalancingUnhealthyChannel.GetType()
+                    .GetField("singlePartition", BindingFlags.NonPublic | BindingFlags.Instance)
+                    .GetValue(loadBalancingUnhealthyChannel);
+
+                Assert.IsNotNull(loadBalancingPartitionUnHealthy);
+
+                List<Documents.Rntbd.LbChannelState> openChannelsUnhealthy = (List<Documents.Rntbd.LbChannelState>)loadBalancingPartitionUnHealthy.GetType()
+                    .GetField("openChannels", BindingFlags.NonPublic | BindingFlags.Instance)
+                    .GetValue(loadBalancingPartitionUnHealthy);
+                Assert.AreEqual(1, openChannelsUnhealthy.Count);
+
+                foreach (Documents.Rntbd.LbChannelState channelState in openChannelsUnhealthy)
+                {
+                    Documents.Rntbd.IChannel channel = (Documents.Rntbd.IChannel)channelState.GetType()
+                        .GetField("channel", BindingFlags.NonPublic | BindingFlags.Instance)
+                        .GetValue(channelState);
+                    Assert.IsFalse(channel.Healthy);
+                }
+
+                // Assert that the new channel is healthy. Picking the first channel from the allChannels dictionary as the new channel.
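+                // At this point the channel dictionary is expected to hold two entries: the channel that the
+                // consecutive timeouts marked unhealthy above, and a newly opened healthy channel for
+                // subsequent requests to use.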
+                Documents.Rntbd.LoadBalancingChannel loadBalancingChannel = (Documents.Rntbd.LoadBalancingChannel)allChannels[allChannels.Keys.First()];
+                Documents.Rntbd.LoadBalancingPartition loadBalancingPartition = (Documents.Rntbd.LoadBalancingPartition)loadBalancingChannel.GetType()
+                    .GetField("singlePartition", BindingFlags.NonPublic | BindingFlags.Instance)
+                    .GetValue(loadBalancingChannel);
+
+                Assert.IsNotNull(loadBalancingPartition);
+
+                List<Documents.Rntbd.LbChannelState> openChannels = (List<Documents.Rntbd.LbChannelState>)loadBalancingPartition.GetType()
+                    .GetField("openChannels", BindingFlags.NonPublic | BindingFlags.Instance)
+                    .GetValue(loadBalancingPartition);
+                Assert.AreEqual(1, openChannels.Count);
+
+                foreach (Documents.Rntbd.LbChannelState channelState in openChannels)
+                {
+                    Documents.Rntbd.IChannel channel = (Documents.Rntbd.IChannel)channelState.GetType()
+                        .GetField("channel", BindingFlags.NonPublic | BindingFlags.Instance)
+                        .GetValue(channelState);
+                    Assert.IsTrue(channel.Healthy);
+                }
+            }
+            finally
+            {
+                Environment.SetEnvironmentVariable("AZURE_COSMOS_AGGRESSIVE_TIMEOUT_DETECTION_ENABLED", null);
+                Environment.SetEnvironmentVariable("AZURE_COSMOS_TIMEOUT_DETECTION_TIME_LIMIT_IN_SECONDS", null);
+                if (db != null) await db.DeleteAsync();
+            }
+        }
+
         public static IReadOnlyList<string> GetActiveConnections()
         {
             string testPid = Process.GetCurrentProcess().Id.ToString();