Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
085d7d5
Added ability to accept the AllowOptimisticDirectExecution flag from …
akotalwar Oct 13, 2023
9eda804
Added comment and removed extra spacing
akotalwar Oct 13, 2023
c054c51
Added test coverage
akotalwar Oct 13, 2023
076f345
Added exception handling logic
akotalwar Oct 15, 2023
5f29b91
Resolved comments
akotalwar Oct 20, 2023
2191de4
Added null check for key parameter
akotalwar Oct 23, 2023
61c3893
Removed changes to common test infra
akotalwar Oct 23, 2023
5d24481
Removed all changes from QueryPartitionProviderTestInstance
akotalwar Oct 24, 2023
cc01ca5
Remove changes pt2
akotalwar Oct 24, 2023
a46862f
Removed the dictionary in QueryPartitionProvider and added a bool ins…
akotalwar Oct 24, 2023
6a6e495
Updated GetClientDisableOptimisticDirectExecution()
akotalwar Oct 24, 2023
bdda380
Fixed comments
akotalwar Oct 26, 2023
5f9a2b6
Revert QueryIterator.cs
akotalwar Oct 26, 2023
5e0f682
Undoing changes to settings.json
akotalwar Oct 26, 2023
26dfda8
Undoing changes to QueryIterator.cs
akotalwar Oct 26, 2023
986c3a3
Updated error message
akotalwar Oct 26, 2023
a953803
Made functions static
akotalwar Oct 27, 2023
8d711ff
Cast to bool instead of recasting in GetClientDisableOptimisticDirect…
akotalwar Oct 27, 2023
35d6618
Fix merge conflicts
akotalwar Oct 27, 2023
459a4b9
Merge branch 'master' into users/akotalwar/ODEGatewayConfig
akotalwar Oct 27, 2023
f8cc146
Added ignore flag
akotalwar Dec 7, 2023
0ff09d7
Merge branch 'master' into users/akotalwar/ODEGatewayConfig
akotalwar Dec 7, 2023
ab3f46e
Fixed merge conflicts
akotalwar Dec 7, 2023
a591dff
Updated GetPartitionedQueryExecutionInfoAndPartitionProvider()
akotalwar Dec 7, 2023
4bafe39
Updated return type in OffsetLimitPageSize()
akotalwar Dec 11, 2023
78a1eb7
Merge branch 'master' into users/akotalwar/ODEGatewayConfig
akotalwar Dec 12, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ namespace Microsoft.Azure.Cosmos.Query.Core.ExecutionContext
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using global::Azure;
using Microsoft.Azure.Cosmos;
using Microsoft.Azure.Cosmos.CosmosElements;
using Microsoft.Azure.Cosmos.Pagination;
Expand All @@ -33,6 +32,7 @@ namespace Microsoft.Azure.Cosmos.Query.Core.ExecutionContext
internal static class CosmosQueryExecutionContextFactory
{
private const string InternalPartitionKeyDefinitionProperty = "x-ms-query-partitionkey-definition";
private const string AllowOptimisticDirectExecution = "allowOptimisticDirectExecution";
private const string OptimisticDirectExecution = "OptimisticDirectExecution";
private const string Passthrough = "Passthrough";
private const string Specialized = "Specialized";
Expand All @@ -41,6 +41,7 @@ internal static class CosmosQueryExecutionContextFactory
public static IQueryPipelineStage Create(
DocumentContainer documentContainer,
CosmosQueryContext cosmosQueryContext,
CosmosClientContext clientContext,
InputParameters inputParameters,
ITrace trace)
{
Expand Down Expand Up @@ -68,6 +69,7 @@ public static IQueryPipelineStage Create(
valueFactory: (trace, innerCancellationToken) => CosmosQueryExecutionContextFactory.TryCreateCoreContextAsync(
documentContainer,
cosmosQueryContext,
clientContext,
inputParameters,
trace,
innerCancellationToken));
Expand All @@ -83,6 +85,7 @@ public static IQueryPipelineStage Create(
private static async Task<TryCatch<IQueryPipelineStage>> TryCreateCoreContextAsync(
DocumentContainer documentContainer,
CosmosQueryContext cosmosQueryContext,
CosmosClientContext clientContext,
InputParameters inputParameters,
ITrace trace,
CancellationToken cancellationToken)
Expand Down Expand Up @@ -142,6 +145,7 @@ private static async Task<TryCatch<IQueryPipelineStage>> TryCreateCoreContextAsy
inputParameters,
queryPlanFromContinuationToken,
cosmosQueryContext,
clientContext,
containerQueryProperties,
trace);

Expand Down Expand Up @@ -242,6 +246,7 @@ private static async Task<TryCatch<IQueryPipelineStage>> TryCreateCoreContextAsy
partitionedQueryExecutionInfo,
containerQueryProperties,
cosmosQueryContext,
clientContext,
inputParameters,
createQueryPipelineTrace,
cancellationToken);
Expand All @@ -253,6 +258,7 @@ private static async Task<TryCatch<IQueryPipelineStage>> TryCreateFromPartitione
PartitionedQueryExecutionInfo partitionedQueryExecutionInfo,
ContainerQueryProperties containerQueryProperties,
CosmosQueryContext cosmosQueryContext,
CosmosClientContext clientContext,
InputParameters inputParameters,
ITrace trace,
CancellationToken cancellationToken)
Expand Down Expand Up @@ -289,9 +295,10 @@ private static async Task<TryCatch<IQueryPipelineStage>> TryCreateFromPartitione
TryCatch<IQueryPipelineStage> tryCreatePipelineStage;

Documents.PartitionKeyRange targetRange = await TryGetTargetRangeOptimisticDirectExecutionAsync(
inputParameters,
partitionedQueryExecutionInfo,
cosmosQueryContext,
inputParameters,
partitionedQueryExecutionInfo,
cosmosQueryContext,
clientContext,
containerQueryProperties,
trace);

Expand Down Expand Up @@ -758,18 +765,30 @@ private static Documents.PartitionKeyDefinition GetPartitionKeyDefinition(InputP
InputParameters inputParameters,
PartitionedQueryExecutionInfo partitionedQueryExecutionInfo,
CosmosQueryContext cosmosQueryContext,
CosmosClientContext clientContext,
ContainerQueryProperties containerQueryProperties,
ITrace trace)
{
if (!inputParameters.EnableOptimisticDirectExecution)
AccountProperties properties = await clientContext.Client.ReadAccountAsync();
bool allowOdeGatewayFlag = properties.QueryEngineConfiguration.ContainsKey(AllowOptimisticDirectExecution) && Convert.ToBoolean(properties.QueryEngineConfiguration[AllowOptimisticDirectExecution]);

// Use the Ode code path only if both AllowOdeGatewayFlag and EnableOptimisticDirectExecution are true
if (allowOdeGatewayFlag)
{
if (inputParameters.InitialUserContinuationToken != null
&& OptimisticDirectExecutionContinuationToken.IsOptimisticDirectExecutionContinuationToken(inputParameters.InitialUserContinuationToken))
if (!inputParameters.EnableOptimisticDirectExecution)
{
throw new MalformedContinuationTokenException($"The continuation token supplied requires the Optimistic Direct Execution flag to be enabled in QueryRequestOptions for the query execution to resume. " +
$"{inputParameters.InitialUserContinuationToken}");
}
if (inputParameters.InitialUserContinuationToken != null
&& OptimisticDirectExecutionContinuationToken.IsOptimisticDirectExecutionContinuationToken(inputParameters.InitialUserContinuationToken))
{
throw new MalformedContinuationTokenException($"The continuation token supplied requires the Optimistic Direct Execution flag to be enabled in QueryRequestOptions for the query execution to resume. " +
$"{inputParameters.InitialUserContinuationToken}");
}

return null;
}
}
else
{
return null;
}

Expand Down
6 changes: 3 additions & 3 deletions Microsoft.Azure.Cosmos/src/Query/v3Query/QueryIterator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -146,10 +146,10 @@ public static QueryIterator Create(
forcePassthrough: forcePassthrough,
enableOptimisticDirectExecution: queryRequestOptions.EnableOptimisticDirectExecution,
testInjections: queryRequestOptions.TestSettings);

return new QueryIterator(
cosmosQueryContext,
CosmosQueryExecutionContextFactory.Create(documentContainer, cosmosQueryContext, inputParameters, NoOpTrace.Singleton),
cosmosQueryContext,
CosmosQueryExecutionContextFactory.Create(documentContainer, cosmosQueryContext, clientContext, inputParameters, NoOpTrace.Singleton),
queryRequestOptions.CosmosSerializationFormatOptions,
queryRequestOptions,
clientContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ public sealed class OptimisticDirectExecutionQueryTests : QueryTestsBase
private const string PartitionKeyField = "key";
private const string NumberField = "numberField";
private const string NullField = "nullField";
private const string AllowOptimisticDirectExecution = "allowOptimisticDirectExecution";

private static class PageSizeOptions
{
Expand Down Expand Up @@ -538,6 +539,19 @@ await this.CreateIngestQueryDeleteAsync(
documents,
(container, documents) => RunFailingTests(container, invalidQueries),
"/" + PartitionKeyField);
}

[TestMethod]
public async Task TestAllowOdeFlagInCosmosClient()
{
string authKey = Utils.ConfigurationManager.AppSettings["MasterKey"];
string endpoint = Utils.ConfigurationManager.AppSettings["GatewayEndpoint"];

CosmosClient client = new CosmosClient($"AccountEndpoint={endpoint};AccountKey={authKey}");
AccountProperties properties = await client.ReadAccountAsync();

Assert.IsTrue(properties.QueryEngineConfigurationString.Contains(AllowOptimisticDirectExecution));
Assert.IsTrue(Convert.ToBoolean(properties.QueryEngineConfiguration[AllowOptimisticDirectExecution]));
}

private static async Task RunTests(IEnumerable<DirectExecutionTestCase> testCases, Container container)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.Routing;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Moq;
using Newtonsoft.Json;

[TestClass]
Expand Down Expand Up @@ -161,6 +162,7 @@ public async Task TestDefaultQueryRequestOptionsSettings()
public async Task TestPipelineForBackendDocumentsOnSinglePartitionAsync()
{
int numItems = 100;
int documentCountInSinglePartition = 0;
OptimisticDirectExecutionTestInput input = CreateInput(
description: @"Single Partition Key and Value Field",
query: "SELECT VALUE COUNT(1) FROM c",
Expand All @@ -170,8 +172,8 @@ public async Task TestPipelineForBackendDocumentsOnSinglePartitionAsync()

QueryRequestOptions queryRequestOptions = GetQueryRequestOptions(enableOptimisticDirectExecution: true);
DocumentContainer inMemoryCollection = await CreateDocumentContainerAsync(numItems, multiPartition: false);
IQueryPipelineStage queryPipelineStage = await GetOdePipelineAsync(input, inMemoryCollection, queryRequestOptions);
int documentCountInSinglePartition = 0;
CosmosClientContext clientContext = CreateCosmosClientContext(allowOptimisticDirectExecution: true);
IQueryPipelineStage queryPipelineStage = await GetOdePipelineAsync(input, inMemoryCollection, queryRequestOptions, clientContext);

while (await queryPipelineStage.MoveNextAsync(NoOpTrace.Singleton))
{
Expand Down Expand Up @@ -213,9 +215,12 @@ public async Task TestOdeTokenWithSpecializedPipeline()
DocumentContainer documentContainer = await CreateDocumentContainerAsync(numItems, multiPartition: false);
QueryRequestOptions queryRequestOptions = GetQueryRequestOptions(enableOptimisticDirectExecution: input.ExpectedOptimisticDirectExecution);
(CosmosQueryExecutionContextFactory.InputParameters inputParameters, CosmosQueryContextCore cosmosQueryContextCore) = CreateInputParamsAndQueryContext(input, queryRequestOptions);
CosmosClientContext clientContext = CreateCosmosClientContext(allowOptimisticDirectExecution: true);

IQueryPipelineStage queryPipelineStage = CosmosQueryExecutionContextFactory.Create(
documentContainer,
cosmosQueryContextCore,
clientContext,
inputParameters,
NoOpTrace.Singleton);

Expand Down Expand Up @@ -436,6 +441,41 @@ public async Task TestPipelineForDistributedQueryAsync()
Assert.AreEqual(1, result);
}

[TestMethod]
public async Task TestAllowOdeFlagLogic()
{
// GetPipelineAndDrainAsyc() contains asserts to confirm that the Ode pipeline only gets picked if allowOptimisticDirectExecution flag is true
int numItems = 100;
OptimisticDirectExecutionTestInput input = CreateInput(
description: @"Single Partition Key and Value Field",
query: "SELECT * FROM c",
expectedOptimisticDirectExecution: true,
partitionKeyPath: @"/pk",
partitionKeyValue: "a");

// Test with AllowODE = true
int result = await this.GetPipelineAndDrainAsync(
input,
numItems: numItems,
isMultiPartition: false,
expectedContinuationTokenCount: 10,
requiresDist: false,
allowOptimisticDirectExecution: true);

Assert.AreEqual(numItems, result);

// Test with AllowODE = false
result = await this.GetPipelineAndDrainAsync(
input,
numItems: numItems,
isMultiPartition: false,
expectedContinuationTokenCount: 10,
requiresDist: false,
allowOptimisticDirectExecution: false);

Assert.AreEqual(numItems, result);
}

// Creates a gone exception after the first MoveNexyAsync() call. This allows for the pipeline to return some documents before failing
private static async Task<bool> ExecuteGoneExceptionOnODEPipeline(bool isMultiPartition)
{
Expand Down Expand Up @@ -520,23 +560,30 @@ private static async Task<bool> TestHandlingOfFailedFallbackPipeline(bool isMult
inject429s: false,
injectEmptyPages: false,
shouldReturnFailure: mergeTest.ShouldReturnFailure));
CosmosClientContext clientContext = CreateCosmosClientContext(allowOptimisticDirectExecution: true);

IQueryPipelineStage queryPipelineStage = await GetOdePipelineAsync(input, inMemoryCollection, queryRequestOptions);
IQueryPipelineStage queryPipelineStage = await GetOdePipelineAsync(input, inMemoryCollection, queryRequestOptions, clientContext);

return (mergeTest, queryPipelineStage);
}

private async Task<int> GetPipelineAndDrainAsync(OptimisticDirectExecutionTestInput input, int numItems, bool isMultiPartition, int expectedContinuationTokenCount, bool requiresDist = false)
private async Task<int> GetPipelineAndDrainAsync(OptimisticDirectExecutionTestInput input, int numItems, bool isMultiPartition, int expectedContinuationTokenCount, bool requiresDist = false, bool allowOptimisticDirectExecution = true)
{
int continuationTokenCount = 0;
List<CosmosElement> documents = new List<CosmosElement>();
QueryRequestOptions queryRequestOptions = GetQueryRequestOptions(enableOptimisticDirectExecution: true);
DocumentContainer inMemoryCollection = await CreateDocumentContainerAsync(numItems, multiPartition: isMultiPartition, requiresDist: requiresDist);
IQueryPipelineStage queryPipelineStage = await GetOdePipelineAsync(input, inMemoryCollection, queryRequestOptions);
List<CosmosElement> documents = new List<CosmosElement>();
int continuationTokenCount = 0;
CosmosClientContext clientContext = CreateCosmosClientContext(allowOptimisticDirectExecution);
IQueryPipelineStage queryPipelineStage = await GetOdePipelineAsync(input, inMemoryCollection, queryRequestOptions, clientContext);

while (await queryPipelineStage.MoveNextAsync(NoOpTrace.Singleton))
{
if (!requiresDist)
if (!allowOptimisticDirectExecution)
{
Assert.AreNotEqual(TestInjections.PipelineType.OptimisticDirectExecution, queryRequestOptions.TestSettings.Stats.PipelineType.Value);
}

if (allowOptimisticDirectExecution && !requiresDist)
{
Assert.AreEqual(TestInjections.PipelineType.OptimisticDirectExecution, queryRequestOptions.TestSettings.Stats.PipelineType.Value);
}
Expand All @@ -560,7 +607,7 @@ private async Task<int> GetPipelineAndDrainAsync(OptimisticDirectExecutionTestIn
partitionKeyValue: input.PartitionKeyValue,
continuationToken: tryGetPage.Result.State.Value);

queryPipelineStage = await GetOdePipelineAsync(input, inMemoryCollection, queryRequestOptions);
queryPipelineStage = await GetOdePipelineAsync(input, inMemoryCollection, queryRequestOptions, clientContext);
}

continuationTokenCount++;
Expand All @@ -586,20 +633,38 @@ internal static PartitionedQueryExecutionInfo GetPartitionedQueryExecutionInfo(s
return tryGetQueryPlan.Result;
}

private static async Task<IQueryPipelineStage> GetOdePipelineAsync(OptimisticDirectExecutionTestInput input, DocumentContainer documentContainer, QueryRequestOptions queryRequestOptions)
private static async Task<IQueryPipelineStage> GetOdePipelineAsync(OptimisticDirectExecutionTestInput input, DocumentContainer documentContainer, QueryRequestOptions queryRequestOptions, CosmosClientContext clientContext)
{
(CosmosQueryExecutionContextFactory.InputParameters inputParameters, CosmosQueryContextCore cosmosQueryContextCore) = CreateInputParamsAndQueryContext(input, queryRequestOptions);

IQueryPipelineStage queryPipelineStage = CosmosQueryExecutionContextFactory.Create(
documentContainer,
cosmosQueryContextCore,
clientContext,
inputParameters,
NoOpTrace.Singleton);

Assert.IsNotNull(queryPipelineStage);
return queryPipelineStage;
}

private static CosmosClientContext CreateCosmosClientContext(bool allowOptimisticDirectExecution)
{
string queryEngineConfig = $"{{\"allowOptimisticDirectExecution\":{allowOptimisticDirectExecution.ToString().ToLower()}}}";

AccountProperties cosmosAccountSettings = new AccountProperties
{
QueryEngineConfigurationString = queryEngineConfig,
};

Mock<CosmosClient> mockClient = new Mock<CosmosClient>();
mockClient.Setup(c => c.ReadAccountAsync()).ReturnsAsync(cosmosAccountSettings);

Mock<CosmosClientContext> mockContext = new Mock<CosmosClientContext>();
mockContext.Setup(c => c.Client).Returns(mockClient.Object);

return mockContext.Object;
}

private static async Task<DocumentContainer> CreateDocumentContainerAsync(
int numItems,
bool multiPartition,
Expand Down Expand Up @@ -697,14 +762,13 @@ public override OptimisticDirectExecutionTestOutput ExecuteTest(OptimisticDirect
// gets DocumentContainer
IMonadicDocumentContainer monadicDocumentContainer = new InMemoryContainer(input.PartitionKeyDefinition);
DocumentContainer documentContainer = new DocumentContainer(monadicDocumentContainer);

QueryRequestOptions queryRequestOptions = GetQueryRequestOptions(enableOptimisticDirectExecution: true);

(CosmosQueryExecutionContextFactory.InputParameters inputParameters, CosmosQueryContextCore cosmosQueryContextCore) = CreateInputParamsAndQueryContext(input, queryRequestOptions);

CosmosClientContext clientContext = CreateCosmosClientContext(allowOptimisticDirectExecution: true);
IQueryPipelineStage queryPipelineStage = CosmosQueryExecutionContextFactory.Create(
documentContainer,
cosmosQueryContextCore,
clientContext,
inputParameters,
NoOpTrace.Singleton);

Expand Down