diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/CrossPartition/OrderBy/BufferedOrderByResults.cs b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/CrossPartition/OrderBy/BufferedOrderByResults.cs new file mode 100644 index 0000000000..18512ddc59 --- /dev/null +++ b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/CrossPartition/OrderBy/BufferedOrderByResults.cs @@ -0,0 +1,32 @@ +// ------------------------------------------------------------ +// Copyright (c) Microsoft Corporation. All rights reserved. +// ------------------------------------------------------------ + +namespace Microsoft.Azure.Cosmos.Query.Core.Pipeline.CrossPartition.OrderBy +{ + using System; + using System.Collections.Generic; + + internal sealed class BufferedOrderByResults + { + public IEnumerator Enumerator { get; } + + public int Count { get; } + + public double TotalRequestCharge { get; } + + public QueryPageParameters QueryPageParameters { get; } + + public BufferedOrderByResults( + IEnumerator enumerator, + int itemCount, + double totalRequestCharge, + QueryPageParameters queryPageParameters) + { + this.Enumerator = enumerator ?? throw new ArgumentNullException(nameof(enumerator)); + this.Count = itemCount; + this.TotalRequestCharge = totalRequestCharge; + this.QueryPageParameters = queryPageParameters ?? throw new ArgumentNullException(nameof(queryPageParameters)); + } + } +} diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/CrossPartition/OrderBy/OrderByCrossPartitionEnumerator.cs b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/CrossPartition/OrderBy/OrderByCrossPartitionEnumerator.cs index 968b1e1c84..4a919ffb7f 100644 --- a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/CrossPartition/OrderBy/OrderByCrossPartitionEnumerator.cs +++ b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/CrossPartition/OrderBy/OrderByCrossPartitionEnumerator.cs @@ -2,7 +2,7 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // ------------------------------------------------------------ -namespace Microsoft.Azure.Cosmos.Pagination +namespace Microsoft.Azure.Cosmos.Query.Core.Pipeline.CrossPartition.OrderBy { using System; using System.Collections; @@ -10,9 +10,9 @@ namespace Microsoft.Azure.Cosmos.Pagination using System.Threading; using System.Threading.Tasks; using Microsoft.Azure.Cosmos.CosmosElements; + using Microsoft.Azure.Cosmos.Pagination; using Microsoft.Azure.Cosmos.Query.Core.Collections; using Microsoft.Azure.Cosmos.Query.Core.Monads; - using Microsoft.Azure.Cosmos.Query.Core.Pipeline.CrossPartition.OrderBy; using Microsoft.Azure.Cosmos.Tracing; internal sealed class OrderByCrossPartitionEnumerator : IEnumerator @@ -25,21 +25,21 @@ internal sealed class OrderByCrossPartitionEnumerator : IEnumerator this.Current; - public OrderByCrossPartitionEnumerator(PriorityQueue> queue) + private OrderByCrossPartitionEnumerator(PriorityQueue> queue) { this.queue = queue ?? throw new ArgumentNullException(nameof(queue)); } - public static async Task<(IEnumerator orderbyQueryResultEnumerator, double totalRequestCharge)> CreateAsync( - IEnumerable enumerators, + public static async Task CreateAsync( + ITracingAsyncEnumerator> enumerator, IComparer comparer, int levelSize, ITrace trace, CancellationToken cancellationToken) { - if (enumerators == null) + if (enumerator == null) { - throw new ArgumentNullException(nameof(enumerators)); + throw new ArgumentNullException(nameof(enumerator)); } if (comparer == null) @@ -47,47 +47,64 @@ public OrderByCrossPartitionEnumerator(PriorityQueue> queue = new PriorityQueue>(enumeratorComparer); - foreach (ITracingAsyncEnumerator> enumerator in enumerators) + while (await enumerator.MoveNextAsync(trace, cancellationToken)) { - while (await enumerator.MoveNextAsync(trace, cancellationToken)) + TryCatch currentPage = enumerator.Current; + if (currentPage.Failed) { - TryCatch currentPage = enumerator.Current; - if (currentPage.Failed) - { - throw currentPage.Exception; - } + throw currentPage.Exception; + } - totalRequestCharge += currentPage.Result.RequestCharge; - IReadOnlyList page = currentPage.Result.Page.Documents; + if (queryPageParameters == null) + { + queryPageParameters = new QueryPageParameters( + activityId: currentPage.Result.ActivityId, + cosmosQueryExecutionInfo: currentPage.Result.Page.CosmosQueryExecutionInfo, + distributionPlanSpec: currentPage.Result.Page.DistributionPlanSpec, + additionalHeaders: currentPage.Result.AdditionalHeaders); + } - if (page.Count > 0) - { - PageEnumerator pageEnumerator = new PageEnumerator(page); - pageEnumerator.MoveNext(); + totalRequestCharge += currentPage.Result.RequestCharge; + IReadOnlyList page = currentPage.Result.Page.Documents; + bufferedItemCount += page.Count; + + if (page.Count > 0) + { + PageEnumerator pageEnumerator = new PageEnumerator(page); + pageEnumerator.MoveNext(); - queue.Enqueue(pageEnumerator); + queue.Enqueue(pageEnumerator); - if (queue.Count >= levelSize) - { - OrderByCrossPartitionEnumerator newEnumerator = new OrderByCrossPartitionEnumerator(queue); - newEnumerator.MoveNext(); + if (queue.Count >= levelSize) + { + OrderByCrossPartitionEnumerator newEnumerator = new OrderByCrossPartitionEnumerator(queue); + newEnumerator.MoveNext(); - queue = new PriorityQueue>(enumeratorComparer); - queue.Enqueue(newEnumerator); - } + queue = new PriorityQueue>(enumeratorComparer); + queue.Enqueue(newEnumerator); } } } if (queue.Count == 0) { - return (EmptyEnumerator.Instance, totalRequestCharge); + return new BufferedOrderByResults( + EmptyEnumerator.Instance, + itemCount: 0, + totalRequestCharge, + queryPageParameters); } - return (new OrderByCrossPartitionEnumerator(queue), totalRequestCharge); + return new BufferedOrderByResults( + new OrderByCrossPartitionEnumerator(queue), + bufferedItemCount, + totalRequestCharge, + queryPageParameters); } public bool MoveNext() diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/CrossPartition/OrderBy/OrderByCrossPartitionQueryPipelineStage.cs b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/CrossPartition/OrderBy/OrderByCrossPartitionQueryPipelineStage.cs index c5f0883089..89bed7195c 100644 --- a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/CrossPartition/OrderBy/OrderByCrossPartitionQueryPipelineStage.cs +++ b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/CrossPartition/OrderBy/OrderByCrossPartitionQueryPipelineStage.cs @@ -21,7 +21,7 @@ namespace Microsoft.Azure.Cosmos.Query.Core.Pipeline.CrossPartition.OrderBy using Microsoft.Azure.Cosmos.Tracing; using ResourceId = Documents.ResourceId; - internal sealed class OrderByCrossPartitionQueryPipelineStage : IQueryPipelineStage + internal static class OrderByCrossPartitionQueryPipelineStage { /// /// Order by queries are rewritten to allow us to inject a filter. @@ -55,8 +55,6 @@ private sealed class InitializationParameters public int MaxConcurrency { get; } - public bool NonStreamingOrderBy { get; } - public InitializationParameters( IDocumentContainer documentContainer, SqlQuerySpec sqlQuerySpec, @@ -64,8 +62,7 @@ public InitializationParameters( PartitionKey? partitionKey, IReadOnlyList orderByColumns, QueryPaginationOptions queryPaginationOptions, - int maxConcurrency, - bool nonStreamingOrderBy) + int maxConcurrency) { this.DocumentContainer = documentContainer ?? throw new ArgumentNullException(nameof(documentContainer)); this.SqlQuerySpec = sqlQuerySpec ?? throw new ArgumentNullException(nameof(sqlQuerySpec)); @@ -74,107 +71,14 @@ public InitializationParameters( this.OrderByColumns = orderByColumns ?? throw new ArgumentNullException(nameof(orderByColumns)); this.QueryPaginationOptions = queryPaginationOptions ?? throw new ArgumentNullException(nameof(queryPaginationOptions)); this.MaxConcurrency = maxConcurrency; - this.NonStreamingOrderBy = nonStreamingOrderBy; - } - } - - private sealed class QueryPageParameters - { - public string ActivityId { get; } - - public Lazy CosmosQueryExecutionInfo { get; } - - public DistributionPlanSpec DistributionPlanSpec { get; } - - public IReadOnlyDictionary AdditionalHeaders { get; } - - public QueryPageParameters( - string activityId, - Lazy cosmosQueryExecutionInfo, - DistributionPlanSpec distributionPlanSpec, - IReadOnlyDictionary additionalHeaders) - { - this.ActivityId = activityId ?? throw new ArgumentNullException(nameof(activityId)); - this.CosmosQueryExecutionInfo = cosmosQueryExecutionInfo; - this.DistributionPlanSpec = distributionPlanSpec; - this.AdditionalHeaders = additionalHeaders; } } private enum ExecutionState { Uninitialized, - Initialized - } - - private readonly InitializationParameters initializationParameters; - - private ExecutionState state; - - private Queue bufferedPages; - - private TryCatch inner; - - public TryCatch Current => this.GetCurrentPage(); - - private OrderByCrossPartitionQueryPipelineStage(InitializationParameters initializationParameters) - { - this.initializationParameters = initializationParameters ?? throw new ArgumentNullException(nameof(initializationParameters)); - this.state = ExecutionState.Uninitialized; - this.bufferedPages = new Queue(); - } - - private TryCatch GetCurrentPage() - { - if (this.state == ExecutionState.Uninitialized) - { - throw new InvalidOperationException("MoveNextAsync must be called before accessing the Current property."); - } - - if (this.bufferedPages.Count != 0) - { - return TryCatch.FromResult(this.bufferedPages.Peek()); - } - - return this.inner.Try(pipelineStage => pipelineStage.Current); - } - - public async ValueTask MoveNextAsync(ITrace trace, CancellationToken cancellationToken) - { - if (this.state == ExecutionState.Uninitialized) - { - // Note: when we set the state to initialized here, we no longer allowing a retry for these failures - // To allow retries, we must not set the state to initialized until construction of the inner pipeline succeeds - (this.inner, this.bufferedPages) = await MoveNextAsync_InitializeAsync(this.initializationParameters, trace, cancellationToken); - this.state = ExecutionState.Initialized; - - if (this.bufferedPages.Count > 0) - { - return true; - } - } - - if (this.bufferedPages.Count > 0) - { - this.bufferedPages.Dequeue(); - if (this.bufferedPages.Count > 0) - { - return true; - } - } - - TryCatch hasNext = await this.inner.TryAsync(pipelineStage => pipelineStage.MoveNextAsync(trace, cancellationToken)); - return hasNext.Failed || hasNext.Result; - } - - public ValueTask DisposeAsync() - { - if (this.state == ExecutionState.Initialized && this.inner.Succeeded) - { - return this.inner.Result.DisposeAsync(); - } - - return default; + Initialized, + Done } public static TryCatch MonadicCreate( @@ -218,7 +122,7 @@ public static TryCatch MonadicCreate( throw new ArgumentException($"{nameof(orderByColumns)} must not be empty."); } - if (continuationToken != null) + if (continuationToken != null || !nonStreamingOrderBy) { return StreamingOrderByCrossPartitionQueryPipelineStage.MonadicCreate( documentContainer, @@ -231,152 +135,18 @@ public static TryCatch MonadicCreate( continuationToken); } - InitializationParameters init = new InitializationParameters( + SqlQuerySpec rewrittenQueryForOrderBy = new SqlQuerySpec( + sqlQuerySpec.QueryText.Replace(oldValue: FormatPlaceHolder, newValue: TrueFilter), + sqlQuerySpec.Parameters); + + return TryCatch.FromResult(NonStreamingOrderByPipelineStage.Create( documentContainer, - sqlQuerySpec, + rewrittenQueryForOrderBy, targetRanges, partitionKey, orderByColumns, queryPaginationOptions, - maxConcurrency, - nonStreamingOrderBy); - - return TryCatch.FromResult(new OrderByCrossPartitionQueryPipelineStage(init)); - } - - private static async ValueTask<(TryCatch, Queue)> MoveNextAsync_InitializeAsync( - InitializationParameters parameters, - ITrace trace, - CancellationToken cancellationToken) - { - SqlQuerySpec rewrittenQueryForOrderBy = new SqlQuerySpec( - parameters.SqlQuerySpec.QueryText.Replace(oldValue: FormatPlaceHolder, newValue: TrueFilter), - parameters.SqlQuerySpec.Parameters); - - List uninitializedEnumerators = parameters.TargetRanges - .Select(range => OrderByQueryPartitionRangePageAsyncEnumerator.Create( - parameters.DocumentContainer, - rewrittenQueryForOrderBy, - new FeedRangeState(range, state: default), - parameters.PartitionKey, - parameters.QueryPaginationOptions, - TrueFilter, - PrefetchPolicy.PrefetchSinglePage)) - .ToList(); - - Queue<(OrderByQueryPartitionRangePageAsyncEnumerator enumerator, OrderByContinuationToken token)> uninitializedEnumeratorsAndTokens = new Queue<(OrderByQueryPartitionRangePageAsyncEnumerator enumerator, OrderByContinuationToken token)>( - uninitializedEnumerators - .Select(x => (x, (OrderByContinuationToken)null))); - - await ParallelPrefetch.PrefetchInParallelAsync(uninitializedEnumerators, parameters.MaxConcurrency, trace, cancellationToken); - - IReadOnlyList sortOrders = parameters.OrderByColumns.Select(column => column.SortOrder).ToList(); - PriorityQueue initializedEnumerators = new PriorityQueue(new OrderByEnumeratorComparer(sortOrders)); - Queue<(OrderByQueryPartitionRangePageAsyncEnumerator enumerator, OrderByContinuationToken token)> enumeratorsAndTokens = new Queue<(OrderByQueryPartitionRangePageAsyncEnumerator enumerator, OrderByContinuationToken token)>(); - - Queue bufferedPages = new Queue(); - QueryPageParameters queryPageParameters = null; - while (uninitializedEnumeratorsAndTokens.Count != 0) - { - (OrderByQueryPartitionRangePageAsyncEnumerator enumerator, OrderByContinuationToken token) = uninitializedEnumeratorsAndTokens.Dequeue(); - if (await enumerator.MoveNextAsync(trace, cancellationToken)) - { - if (enumerator.Current.Failed) - { - if (IsSplitException(enumerator.Current.Exception)) - { - await MoveNextAsync_InitializeAsync_HandleSplitAsync( - parameters.DocumentContainer, - uninitializedEnumeratorsAndTokens, - enumerator, - token, - trace, - cancellationToken); - - continue; - } - else - { - // early return - return (TryCatch.FromException(enumerator.Current.Exception), bufferedPages); - } - } - - QueryPage page = enumerator.Current.Result.Page; - if (queryPageParameters == null) - { - // It is difficult to merge the headers because the type is not strong enough to support merging. - // Moreover, the existing code also does not merge the headers. - // Instead they grab the headers at random from some pages and send them onwards. - queryPageParameters = new QueryPageParameters( - activityId: page.ActivityId, - cosmosQueryExecutionInfo: page.CosmosQueryExecutionInfo, - distributionPlanSpec: page.DistributionPlanSpec, - additionalHeaders: page.AdditionalHeaders); - } - - if (enumerator.Current.Result.Enumerator.MoveNext()) - { - // the page is non-empty then we need to enqueue the enumerator in the PriorityQueue - initializedEnumerators.Enqueue(enumerator); - } - else - { - enumeratorsAndTokens.Enqueue((enumerator, token)); - } - - // Ensure proper reporting of query charges - bufferedPages.Enqueue(new QueryPage( - documents: EmptyPage, - requestCharge: page.RequestCharge, - activityId: page.ActivityId, - cosmosQueryExecutionInfo: page.CosmosQueryExecutionInfo, - distributionPlanSpec: page.DistributionPlanSpec, - disallowContinuationTokenMessage: page.DisallowContinuationTokenMessage, - additionalHeaders: page.AdditionalHeaders, - state: InitializingQueryState, - streaming: page.Streaming)); - } - } - - IQueryPipelineStage pipelineStage; - if (parameters.NonStreamingOrderBy) - { - Queue orderbyEnumerators = new Queue(); - foreach ((OrderByQueryPartitionRangePageAsyncEnumerator enumerator, OrderByContinuationToken _) in enumeratorsAndTokens) - { - OrderByQueryPartitionRangePageAsyncEnumerator bufferedEnumerator = enumerator.CloneAsFullyBufferedEnumerator(); - orderbyEnumerators.Enqueue(bufferedEnumerator); - } - - foreach (OrderByQueryPartitionRangePageAsyncEnumerator initializedEnumerator in initializedEnumerators) - { - OrderByQueryPartitionRangePageAsyncEnumerator bufferedEnumerator = initializedEnumerator.CloneAsFullyBufferedEnumerator(); - orderbyEnumerators.Enqueue(bufferedEnumerator); - } - - await ParallelPrefetch.PrefetchInParallelAsync(orderbyEnumerators, parameters.MaxConcurrency, trace, cancellationToken); - - pipelineStage = await NonStreamingOrderByPipelineStage.CreateAsync( - parameters.QueryPaginationOptions, - sortOrders, - orderbyEnumerators, - queryPageParameters, - trace, - cancellationToken); - } - else - { - pipelineStage = StreamingOrderByCrossPartitionQueryPipelineStage.Create( - parameters.DocumentContainer, - sortOrders, - initializedEnumerators, - enumeratorsAndTokens, - parameters.QueryPaginationOptions, - parameters.MaxConcurrency); - } - - return (TryCatch.FromResult(pipelineStage), bufferedPages); + maxConcurrency)); } private static async ValueTask MoveNextAsync_InitializeAsync_HandleSplitAsync( @@ -461,6 +231,58 @@ private static bool IsSplitException(Exception exception) return exception.IsPartitionSplitException(); } + private static OrderByContinuationToken CreateOrderByContinuationToken( + ParallelContinuationToken parallelToken, + OrderByQueryResult orderByQueryResult, + int skipCount, + string filter) + { + OrderByContinuationToken token; + // If order by items have c* types then it cannot be converted to resume values + if (ContainsSupportedResumeTypes(orderByQueryResult.OrderByItems)) + { + List resumeValues = new List(orderByQueryResult.OrderByItems.Count); + foreach (OrderByItem orderByItem in orderByQueryResult.OrderByItems) + { + resumeValues.Add(SqlQueryResumeValue.FromOrderByValue(orderByItem.Item)); + } + + token = new OrderByContinuationToken( + parallelToken, + orderByItems: null, + resumeValues, + orderByQueryResult.Rid, + skipCount: skipCount, + filter: filter); + } + else + { + token = new OrderByContinuationToken( + parallelToken, + orderByQueryResult.OrderByItems, + resumeValues: null, + orderByQueryResult.Rid, + skipCount: skipCount, + filter: filter); + } + + return token; + } + + // Helper method to check that resume values are of type that is supported by SqlQueryResumeValue + private static bool ContainsSupportedResumeTypes(IReadOnlyList orderByItems) + { + foreach (OrderByItem orderByItem in orderByItems) + { + if (!orderByItem.Item.Accept(SupportedResumeTypeVisitor.Singleton)) + { + return false; + } + } + + return true; + } + /// /// This class is responsible for draining cross partition queries that have order by conditions. /// The way order by queries work is that they are doing a k-way merge of sorted lists from each partition with an added condition. @@ -1530,58 +1352,6 @@ private static (string leftFilter, string targetFilter, string rightFilter) GetF return (left.ToString(), target.ToString(), right.ToString()); } - private static OrderByContinuationToken CreateOrderByContinuationToken( - ParallelContinuationToken parallelToken, - OrderByQueryResult orderByQueryResult, - int skipCount, - string filter) - { - OrderByContinuationToken token; - // If order by items have c* types then it cannot be converted to resume values - if (ContainsSupportedResumeTypes(orderByQueryResult.OrderByItems)) - { - List resumeValues = new List(orderByQueryResult.OrderByItems.Count); - foreach (OrderByItem orderByItem in orderByQueryResult.OrderByItems) - { - resumeValues.Add(SqlQueryResumeValue.FromOrderByValue(orderByItem.Item)); - } - - token = new OrderByContinuationToken( - parallelToken, - orderByItems: null, - resumeValues, - orderByQueryResult.Rid, - skipCount: skipCount, - filter: filter); - } - else - { - token = new OrderByContinuationToken( - parallelToken, - orderByQueryResult.OrderByItems, - resumeValues: null, - orderByQueryResult.Rid, - skipCount: skipCount, - filter: filter); - } - - return token; - } - - // Helper method to check that resume values are of type that is supported by SqlQueryResumeValue - private static bool ContainsSupportedResumeTypes(IReadOnlyList orderByItems) - { - foreach (OrderByItem orderByItem in orderByItems) - { - if (!orderByItem.Item.Accept(SupportedResumeTypeVisitor.Singleton)) - { - return false; - } - } - - return true; - } - private static async Task monadicQueryByPage)>> FilterNextAsync( OrderByQueryPartitionRangePageAsyncEnumerator enumerator, IReadOnlyList sortOrders, @@ -1830,59 +1600,59 @@ public ComparisionWithUndefinedFilters( public string GreaterThan { get; } public string GreaterThanOrEqualTo { get; } } + } - private sealed class SupportedResumeTypeVisitor : ICosmosElementVisitor - { - public static readonly SupportedResumeTypeVisitor Singleton = new SupportedResumeTypeVisitor(); + private sealed class SupportedResumeTypeVisitor : ICosmosElementVisitor + { + public static readonly SupportedResumeTypeVisitor Singleton = new SupportedResumeTypeVisitor(); - private SupportedResumeTypeVisitor() - { - } + private SupportedResumeTypeVisitor() + { + } - public bool Visit(CosmosArray cosmosArray) - { - return true; - } + public bool Visit(CosmosArray cosmosArray) + { + return true; + } - public bool Visit(CosmosBinary cosmosBinary) - { - return false; - } + public bool Visit(CosmosBinary cosmosBinary) + { + return false; + } - public bool Visit(CosmosBoolean cosmosBoolean) - { - return true; - } + public bool Visit(CosmosBoolean cosmosBoolean) + { + return true; + } - public bool Visit(CosmosGuid cosmosGuid) - { - return false; - } + public bool Visit(CosmosGuid cosmosGuid) + { + return false; + } - public bool Visit(CosmosNull cosmosNull) - { - return true; - } + public bool Visit(CosmosNull cosmosNull) + { + return true; + } - public bool Visit(CosmosNumber cosmosNumber) - { - return cosmosNumber.Accept(SqlQueryResumeValue.SupportedResumeNumberTypeVisitor.Singleton); - } + public bool Visit(CosmosNumber cosmosNumber) + { + return cosmosNumber.Accept(SqlQueryResumeValue.SupportedResumeNumberTypeVisitor.Singleton); + } - public bool Visit(CosmosObject cosmosObject) - { - return true; - } + public bool Visit(CosmosObject cosmosObject) + { + return true; + } - public bool Visit(CosmosString cosmosString) - { - return true; - } + public bool Visit(CosmosString cosmosString) + { + return true; + } - public bool Visit(CosmosUndefined cosmosUndefined) - { - return true; - } + public bool Visit(CosmosUndefined cosmosUndefined) + { + return true; } } @@ -1898,122 +1668,247 @@ private sealed class NonStreamingOrderByPipelineStage : IQueryPipelineStage private readonly int pageSize; - private readonly double totalRequestCharge; - - private readonly string activityId; - - private readonly Lazy cosmosQueryExecutionInfo; - - private readonly DistributionPlanSpec distributionPlanSpec; + private readonly InitializationParameters parameters; - private readonly IReadOnlyDictionary additionalHeaders; + private ExecutionState executionState; - private readonly IEnumerator enumerator; - - private int totalBufferedResultCount; - - private bool firstPage; + private BufferedOrderByResults bufferedResults; public TryCatch Current { get; private set; } - private NonStreamingOrderByPipelineStage( - int pageSize, - double totalRequestCharge, - string activityId, - Lazy cosmosQueryExecutionInfo, - DistributionPlanSpec distributionPlanSpec, - IReadOnlyDictionary additionalHeaders, - IEnumerator enumerator, - int totalBufferedResultCount) + private NonStreamingOrderByPipelineStage(InitializationParameters parameters, int pageSize) { + this.parameters = parameters ?? throw new ArgumentNullException(nameof(parameters)); this.pageSize = pageSize; - this.totalRequestCharge = totalRequestCharge; - this.activityId = activityId ?? throw new ArgumentNullException(nameof(activityId)); - this.cosmosQueryExecutionInfo = cosmosQueryExecutionInfo; - this.distributionPlanSpec = distributionPlanSpec; - this.additionalHeaders = additionalHeaders; - this.firstPage = true; - this.enumerator = enumerator ?? throw new ArgumentNullException(nameof(enumerator)); - this.totalBufferedResultCount = totalBufferedResultCount; + this.executionState = ExecutionState.Uninitialized; } public ValueTask DisposeAsync() { - this.enumerator.Dispose(); + this.bufferedResults.Enumerator.Dispose(); return default; } - public ValueTask MoveNextAsync(ITrace trace, CancellationToken cancellationToken) + public async ValueTask MoveNextAsync(ITrace trace, CancellationToken cancellationToken) { + if (this.executionState == ExecutionState.Done) + { + return false; + } + cancellationToken.ThrowIfCancellationRequested(); - List documents = this.totalBufferedResultCount >= this.pageSize ? new List(this.pageSize) : new List(); - for (int count = 0; count < this.pageSize && this.enumerator.MoveNext(); ++count) + bool firstPage = false; + if (this.executionState == ExecutionState.Uninitialized) { - documents.Add(this.enumerator.Current.Payload); + firstPage = true; + this.bufferedResults = await this.MoveNextAsync_InitializeAsync(trace, cancellationToken); + this.executionState = ExecutionState.Initialized; } - this.totalBufferedResultCount -= documents.Count; + List documents = new List(this.pageSize); + for (int count = 0; count < this.pageSize && this.bufferedResults.Enumerator.MoveNext(); ++count) + { + documents.Add(this.bufferedResults.Enumerator.Current.Payload); + } - if (this.firstPage || documents.Count > 0) + if (firstPage || documents.Count > 0) { - double requestCharge = this.firstPage ? this.totalRequestCharge : 0; + double requestCharge = firstPage ? this.bufferedResults.TotalRequestCharge : 0; QueryPage queryPage = new QueryPage( documents: documents, requestCharge: requestCharge, - activityId: this.activityId, - cosmosQueryExecutionInfo: this.cosmosQueryExecutionInfo, - distributionPlanSpec: this.distributionPlanSpec, + activityId: this.bufferedResults.QueryPageParameters.ActivityId, + cosmosQueryExecutionInfo: this.bufferedResults.QueryPageParameters.CosmosQueryExecutionInfo, + distributionPlanSpec: this.bufferedResults.QueryPageParameters.DistributionPlanSpec, disallowContinuationTokenMessage: DisallowContinuationTokenMessage, - additionalHeaders: this.additionalHeaders, + additionalHeaders: this.bufferedResults.QueryPageParameters.AdditionalHeaders, state: documents.Count > 0 ? NonStreamingOrderByInProgress : null, streaming: false); - this.firstPage = false; this.Current = TryCatch.FromResult(queryPage); - return new ValueTask(true); + return true; } else { - return new ValueTask(false); + this.executionState = ExecutionState.Done; + return false; } } - public static async Task CreateAsync( + private async Task MoveNextAsync_InitializeAsync(ITrace trace, CancellationToken cancellationToken) + { + ITracingAsyncEnumerator> enumerator = await OrderByCrossPartitionRangePageEnumerator.CreateAsync( + this.parameters.DocumentContainer, + this.parameters.SqlQuerySpec, + this.parameters.TargetRanges, + this.parameters.PartitionKey, + this.parameters.QueryPaginationOptions, + this.parameters.MaxConcurrency, + trace, + cancellationToken); + + IReadOnlyList sortOrders = this.parameters.OrderByColumns.Select(column => column.SortOrder).ToList(); + + OrderByQueryResultComparer comparer = new OrderByQueryResultComparer(sortOrders); + BufferedOrderByResults bufferedResults = await OrderByCrossPartitionEnumerator.CreateAsync( + enumerator, + comparer, + FlatHeapSizeLimit, + trace, + cancellationToken); + + return bufferedResults; + } + + public static IQueryPipelineStage Create( + IDocumentContainer documentContainer, + SqlQuerySpec sqlQuerySpec, + IReadOnlyList targetRanges, + Cosmos.PartitionKey? partitionKey, + IReadOnlyList orderByColumns, QueryPaginationOptions queryPaginationOptions, - IReadOnlyList sortOrders, - IEnumerable enumerators, - QueryPageParameters queryPageParameters, - ITrace trace, - CancellationToken cancellationToken) + int maxConcurrency) { int pageSize = queryPaginationOptions.PageSizeLimit.GetValueOrDefault(MaximumPageSize) > 0 ? Math.Min(MaximumPageSize, queryPaginationOptions.PageSizeLimit.Value) : MaximumPageSize; - int totalBufferedResultCount = 0; - foreach (OrderByQueryPartitionRangePageAsyncEnumerator enumerator in enumerators) + InitializationParameters parameters = new InitializationParameters( + documentContainer, + sqlQuerySpec, + targetRanges, + partitionKey, + orderByColumns, + queryPaginationOptions, + maxConcurrency); + + return new NonStreamingOrderByPipelineStage( + parameters, + pageSize); + } + } + + private sealed class OrderByCrossPartitionRangePageEnumerator : ITracingAsyncEnumerator> + { + private readonly IDocumentContainer documentContainer; + + private readonly Queue<(OrderByQueryPartitionRangePageAsyncEnumerator enumerator, OrderByContinuationToken token)> enumeratorsAndTokens; + + public TryCatch Current { get; private set; } + + private OrderByCrossPartitionRangePageEnumerator( + IDocumentContainer documentContainer, + Queue<(OrderByQueryPartitionRangePageAsyncEnumerator enumerator, OrderByContinuationToken token)> enumeratorsAndTokens) + { + this.documentContainer = documentContainer ?? throw new ArgumentNullException(nameof(documentContainer)); + this.enumeratorsAndTokens = enumeratorsAndTokens ?? throw new ArgumentNullException(nameof(enumeratorsAndTokens)); + } + + public static async Task>> CreateAsync( + IDocumentContainer documentContainer, + SqlQuerySpec sqlQuerySpec, + IReadOnlyList targetRanges, + Cosmos.PartitionKey? partitionKey, + QueryPaginationOptions queryPaginationOptions, + int maxConcurrency, + ITrace trace, + CancellationToken cancellationToken) + { + Queue<(OrderByQueryPartitionRangePageAsyncEnumerator enumerator, OrderByContinuationToken token)> enumeratorsAndTokens = + new Queue<(OrderByQueryPartitionRangePageAsyncEnumerator enumerator, OrderByContinuationToken token)>(targetRanges.Count); + foreach (FeedRangeEpk range in targetRanges) { - totalBufferedResultCount += enumerator.BufferedResultCount; + OrderByQueryPartitionRangePageAsyncEnumerator enumerator = OrderByQueryPartitionRangePageAsyncEnumerator.Create( + documentContainer, + sqlQuerySpec, + new FeedRangeState(range, state: null), + partitionKey, + queryPaginationOptions, + filter: null, + PrefetchPolicy.PrefetchAll); + + enumeratorsAndTokens.Enqueue(new (enumerator, null)); } - OrderByQueryResultComparer comparer = new OrderByQueryResultComparer(sortOrders); - (IEnumerator orderbyQueryResultEnumerator, double totalRequestCharge) = await OrderByCrossPartitionEnumerator.CreateAsync( - enumerators, - comparer, - FlatHeapSizeLimit, + await ParallelPrefetch.PrefetchInParallelAsync( + enumeratorsAndTokens.Select(x => x.enumerator), + maxConcurrency, trace, cancellationToken); - return new NonStreamingOrderByPipelineStage( - pageSize, - totalRequestCharge, - queryPageParameters.ActivityId, - queryPageParameters.CosmosQueryExecutionInfo, - queryPageParameters.DistributionPlanSpec, - queryPageParameters.AdditionalHeaders, - orderbyQueryResultEnumerator, - totalBufferedResultCount); + return new OrderByCrossPartitionRangePageEnumerator(documentContainer, enumeratorsAndTokens); + } + + public async ValueTask DisposeAsync() + { + foreach ((OrderByQueryPartitionRangePageAsyncEnumerator enumerator, OrderByContinuationToken _) in this.enumeratorsAndTokens) + { + try + { + await enumerator.DisposeAsync(); + } + catch + { + } + } + } + + public async ValueTask MoveNextAsync(ITrace trace, CancellationToken cancellationToken) + { + while (this.enumeratorsAndTokens.Count > 0) + { + cancellationToken.ThrowIfCancellationRequested(); + (OrderByQueryPartitionRangePageAsyncEnumerator enumerator, OrderByContinuationToken token) = this.enumeratorsAndTokens.Dequeue(); + if (await enumerator.MoveNextAsync(trace, cancellationToken)) + { + if (enumerator.Current.Succeeded) + { + OrderByContinuationToken continuationToken; + if (enumerator.Current.Result.Page.Documents.Count > 0) + { + // Use the token for the next page, since we fully drained the page. + continuationToken = enumerator.FeedRangeState.State?.Value != null ? + CreateOrderByContinuationToken( + new ParallelContinuationToken( + token: ((CosmosString)enumerator.FeedRangeState.State.Value).Value, + range: ((FeedRangeEpk)enumerator.FeedRangeState.FeedRange).Range), + new OrderByQueryResult(enumerator.Current.Result.Page.Documents[enumerator.Current.Result.Page.Documents.Count - 1]), + skipCount: 0, + filter: enumerator.Filter) : + null; + } + else + { + // Empty page, so we cannot create a new resume value: just use the old one. + continuationToken = token; + } + + this.Current = enumerator.Current; + this.enumeratorsAndTokens.Enqueue((enumerator, continuationToken)); + return true; + } + else + { + if (IsSplitException(enumerator.Current.Exception)) + { + await MoveNextAsync_InitializeAsync_HandleSplitAsync( + this.documentContainer, + this.enumeratorsAndTokens, + enumerator, + token, + trace, + cancellationToken); + } + else + { + throw enumerator.Current.Exception; + } + } + } + } + + return false; } } } diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/CrossPartition/OrderBy/QueryPageParameters.cs b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/CrossPartition/OrderBy/QueryPageParameters.cs new file mode 100644 index 0000000000..86a0e51b2f --- /dev/null +++ b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/CrossPartition/OrderBy/QueryPageParameters.cs @@ -0,0 +1,34 @@ +// ------------------------------------------------------------ +// Copyright (c) Microsoft Corporation. All rights reserved. +// ------------------------------------------------------------ + +namespace Microsoft.Azure.Cosmos.Query.Core.Pipeline.CrossPartition.OrderBy +{ + using System; + using System.Collections.Generic; + using Microsoft.Azure.Cosmos.Query.Core; + using Microsoft.Azure.Cosmos.Query.Core.QueryClient; + + internal sealed class QueryPageParameters + { + public string ActivityId { get; } + + public Lazy CosmosQueryExecutionInfo { get; } + + public DistributionPlanSpec DistributionPlanSpec { get; } + + public IReadOnlyDictionary AdditionalHeaders { get; } + + public QueryPageParameters( + string activityId, + Lazy cosmosQueryExecutionInfo, + DistributionPlanSpec distributionPlanSpec, + IReadOnlyDictionary additionalHeaders) + { + this.ActivityId = activityId ?? throw new ArgumentNullException(nameof(activityId)); + this.CosmosQueryExecutionInfo = cosmosQueryExecutionInfo; + this.DistributionPlanSpec = distributionPlanSpec; + this.AdditionalHeaders = additionalHeaders; + } + } +} \ No newline at end of file diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/NonStreamingOrderByQueryTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/NonStreamingOrderByQueryTests.cs index 60d01dc69a..b9486a5ab9 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/NonStreamingOrderByQueryTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/NonStreamingOrderByQueryTests.cs @@ -13,23 +13,23 @@ namespace Microsoft.Azure.Cosmos.Tests.Query.Pipeline using Microsoft.Azure.Cosmos.Query.Core.Pipeline; using Microsoft.Azure.Cosmos.Query.Core; using Microsoft.Azure.Cosmos.ReadFeed.Pagination; - using Microsoft.Azure.Cosmos.Tests.Pagination; - using Microsoft.Azure.Cosmos.Tracing; - using Microsoft.Azure.Cosmos; - using Microsoft.VisualStudio.TestTools.UnitTesting; - using System.Collections.Generic; - using System.Diagnostics; - using System.Threading.Tasks; - using System.Threading; - using System; + using Microsoft.Azure.Cosmos.Tests.Pagination; + using Microsoft.Azure.Cosmos.Tracing; + using Microsoft.Azure.Cosmos; + using Microsoft.VisualStudio.TestTools.UnitTesting; + using System.Collections.Generic; + using System.Diagnostics; + using System.Threading.Tasks; + using System.Threading; + using System; using System.Linq; using Microsoft.Azure.Cosmos.CosmosElements.Numbers; - [TestClass] - public class NonStreamingOrderByQueryTests - { - private const int MaxConcurrency = 10; - + [TestClass] + public class NonStreamingOrderByQueryTests + { + private const int MaxConcurrency = 10; + private const int DocumentCount = 420; private const int LeafPageCount = 100; @@ -52,8 +52,8 @@ public class NonStreamingOrderByQueryTests private const string Index = "index"; - private const string IndexString = "indexString"; - + private const string IndexString = "indexString"; + private static readonly int[] PageSizes = new [] { 1, 10, 100, DocumentCount }; [TestMethod] @@ -61,14 +61,14 @@ public async Task InMemoryContainerParityTests() { IDocumentContainer documentContainer = await CreateDocumentContainerAsync(DocumentCount); - IReadOnlyList idColumnAsc = new List - { - new OrderByColumn("c.id", SortOrder.Ascending) + IReadOnlyList idColumnAsc = new List + { + new OrderByColumn("c.id", SortOrder.Ascending) }; - IReadOnlyList idColumnDesc = new List - { - new OrderByColumn("c.id", SortOrder.Descending) + IReadOnlyList idColumnDesc = new List + { + new OrderByColumn("c.id", SortOrder.Descending) }; IReadOnlyList testCases = new List @@ -111,11 +111,39 @@ FROM c await RunParityTests( documentContainer, - new NonStreamingDocumentContainer(documentContainer), - await documentContainer.GetFeedRangesAsync(NoOpTrace.Singleton, default), + new NonStreamingDocumentContainer(documentContainer, allowSplits: false), testCases); } + [TestMethod] + public async Task SplittingContainerParityTests() + { + IReadOnlyList idColumnAsc = new List + { + new OrderByColumn("c.id", SortOrder.Ascending) + }; + + IReadOnlyList testCases = new List + { + MakeTest( + queryText: @" + SELECT c._rid AS _rid, [{""item"": c.id}] AS orderByItems, c AS payload + FROM c + WHERE {documentdb-formattableorderbyquery-filter} + ORDER BY c.id", + orderByColumns: idColumnAsc, + pageSizes: new int[] { 10 }, + validate: result => Validate.IndexIsInOrder(result, propertyName: "id", DocumentCount, reversed: false)), + }; + + IDocumentContainer documentContainer = await CreateDocumentContainerAsync(DocumentCount); + await RunParityTests( + documentContainer, + new NonStreamingDocumentContainer(documentContainer, allowSplits: true), + testCases, + new TestOptions(validateCharges: false, maxConcurrency: 1)); + } + [TestMethod] public async Task ShufflingContainerParityTests() { @@ -134,9 +162,9 @@ static bool IndexIsInOrder(IReadOnlyList result, bool reversed) FROM c WHERE {documentdb-formattableorderbyquery-filter} ORDER BY c.index", - orderByColumns: new List - { - new OrderByColumn($"c.{Index}", SortOrder.Ascending) + orderByColumns: new List + { + new OrderByColumn($"c.{Index}", SortOrder.Ascending) }, validate: result => IndexIsInOrder(result, reversed: false)), MakeParityTest( @@ -147,9 +175,9 @@ FROM c FROM c WHERE {documentdb-formattableorderbyquery-filter} ORDER BY c.index DESC", - orderByColumns: new List - { - new OrderByColumn($"c.{Index}", SortOrder.Descending) + orderByColumns: new List + { + new OrderByColumn($"c.{Index}", SortOrder.Descending) }, validate: result => IndexIsInOrder(result, reversed: true)), MakeParityTest( @@ -160,10 +188,10 @@ FROM c FROM c WHERE {documentdb-formattableorderbyquery-filter} ORDER BY c.index, c.indexString", - orderByColumns: new List + orderByColumns: new List { - new OrderByColumn($"c.{Index}", SortOrder.Ascending), - new OrderByColumn($"c.{IndexString}", SortOrder.Ascending) + new OrderByColumn($"c.{Index}", SortOrder.Ascending), + new OrderByColumn($"c.{IndexString}", SortOrder.Ascending) }, validate: result => IndexIsInOrder(result, reversed: false)), MakeParityTest( @@ -174,10 +202,10 @@ FROM c FROM c WHERE {documentdb-formattableorderbyquery-filter} ORDER BY c.index DESC, c.indexString DESC", - orderByColumns: new List + orderByColumns: new List { - new OrderByColumn($"c.{Index}", SortOrder.Descending), - new OrderByColumn($"c.{IndexString}", SortOrder.Descending) + new OrderByColumn($"c.{Index}", SortOrder.Descending), + new OrderByColumn($"c.{IndexString}", SortOrder.Descending) }, validate: result => IndexIsInOrder(result, reversed: true)), MakeParityTest( @@ -188,10 +216,10 @@ FROM c FROM c WHERE {documentdb-formattableorderbyquery-filter} ORDER BY c.indexString, c.index", - orderByColumns: new List + orderByColumns: new List { - new OrderByColumn($"c.{IndexString}", SortOrder.Ascending), - new OrderByColumn($"c.{Index}", SortOrder.Ascending), + new OrderByColumn($"c.{IndexString}", SortOrder.Ascending), + new OrderByColumn($"c.{Index}", SortOrder.Ascending), }, validate: result => IndexIsInOrder(result, reversed: false)), MakeParityTest( @@ -202,10 +230,10 @@ FROM c FROM c WHERE {documentdb-formattableorderbyquery-filter} ORDER BY c.indexString DESC, c.index DESC", - orderByColumns: new List + orderByColumns: new List { - new OrderByColumn($"c.{IndexString}", SortOrder.Descending), - new OrderByColumn($"c.{Index}", SortOrder.Descending), + new OrderByColumn($"c.{IndexString}", SortOrder.Descending), + new OrderByColumn($"c.{Index}", SortOrder.Descending), }, validate: result => IndexIsInOrder(result, reversed: true)), }; @@ -213,31 +241,40 @@ FROM c await RunParityTests(testCases); } - private static async Task RunParityTests( + private static Task RunParityTests( IDocumentContainer documentContainer, IDocumentContainer nonStreamingDocumentContainer, - IReadOnlyList ranges, IReadOnlyList testCases) + { + return RunParityTests(documentContainer, nonStreamingDocumentContainer, testCases, TestOptions.Default); + } + + private static async Task RunParityTests( + IDocumentContainer documentContainer, + IDocumentContainer nonStreamingDocumentContainer, + IReadOnlyList testCases, + TestOptions testOptions) { foreach (TestCase testCase in testCases) { foreach (int pageSize in testCase.PageSizes) { - IReadOnlyList nonStreamingResult = await CreateAndRunPipelineStage( - documentContainer: nonStreamingDocumentContainer, - ranges: ranges, + (IReadOnlyList streamingResult, double streamingCharge) = await CreateAndRunPipelineStage( + documentContainer: documentContainer, + ranges: await documentContainer.GetFeedRangesAsync(NoOpTrace.Singleton, cancellationToken: default), queryText: testCase.QueryText, orderByColumns: testCase.OrderByColumns, pageSize: pageSize, - nonStreamingOrderBy: true); + nonStreamingOrderBy: false); - IReadOnlyList streamingResult = await CreateAndRunPipelineStage( - documentContainer: documentContainer, - ranges: ranges, + (IReadOnlyList nonStreamingResult, double nonStreamingCharge) = await CreateAndRunPipelineStage( + documentContainer: nonStreamingDocumentContainer, + ranges: await nonStreamingDocumentContainer.GetFeedRangesAsync(NoOpTrace.Singleton, cancellationToken: default), queryText: testCase.QueryText, orderByColumns: testCase.OrderByColumns, pageSize: pageSize, - nonStreamingOrderBy: false); + nonStreamingOrderBy: true, + maxConcurrency: testOptions.MaxConcurrency); if (!streamingResult.SequenceEqual(nonStreamingResult)) { @@ -248,17 +285,42 @@ private static async Task RunParityTests( { Assert.Fail($"Could not validate result for query:\n{testCase.QueryText}\npageSize: {pageSize}"); } + + if (testOptions.ValidateCharges && (Math.Abs(streamingCharge - nonStreamingCharge) > 0.0001)) + { + Assert.Fail($"Request charge mismatch for query:\n{testCase.QueryText}\npageSize: {pageSize}" + + $"\nStreaming request charge: {streamingCharge} NonStreaming request charge: {nonStreamingCharge}"); + } } } } - private static async Task> CreateAndRunPipelineStage( + private static Task<(IReadOnlyList, double)> CreateAndRunPipelineStage( IDocumentContainer documentContainer, IReadOnlyList ranges, string queryText, IReadOnlyList orderByColumns, int pageSize, bool nonStreamingOrderBy) + { + return CreateAndRunPipelineStage( + documentContainer, + ranges, + queryText, + orderByColumns, + pageSize, + nonStreamingOrderBy, + MaxConcurrency); + } + + private static async Task<(IReadOnlyList, double)> CreateAndRunPipelineStage( + IDocumentContainer documentContainer, + IReadOnlyList ranges, + string queryText, + IReadOnlyList orderByColumns, + int pageSize, + bool nonStreamingOrderBy, + int maxConcurrency) { TryCatch pipelineStage = OrderByCrossPartitionQueryPipelineStage.MonadicCreate( documentContainer: documentContainer, @@ -267,12 +329,13 @@ private static async Task> CreateAndRunPipelineStag partitionKey: null, orderByColumns: orderByColumns, queryPaginationOptions: new QueryPaginationOptions(pageSizeHint: pageSize), - maxConcurrency: MaxConcurrency, + maxConcurrency: maxConcurrency, nonStreamingOrderBy: nonStreamingOrderBy, continuationToken: null); Assert.IsTrue(pipelineStage.Succeeded); + double totalRequestCharge = 0; IQueryPipelineStage stage = pipelineStage.Result; List documents = new List(); while (await stage.MoveNextAsync(NoOpTrace.Singleton, default)) @@ -281,9 +344,10 @@ private static async Task> CreateAndRunPipelineStag Assert.IsTrue(stage.Current.Result.Documents.Count <= pageSize); DebugTraceHelpers.TracePipelineStagePage(stage.Current.Result); documents.AddRange(stage.Current.Result.Documents); + totalRequestCharge += stage.Current.Result.RequestCharge; } - return documents; + return (documents, totalRequestCharge); } private static async Task RunParityTests(IReadOnlyList testCases) @@ -300,9 +364,9 @@ private static async Task RunParityTests(IReadOnlyList testCases new FeedRangeEpk(new Documents.Routing.Range("EE", "FF", true, false)), }; - IDocumentContainer nonStreamingDocumentContainer = MockDocumentContainer.Create(ranges, testCase.FeedMode, testCase.DocumentCreationMode); + MockDocumentContainer nonStreamingDocumentContainer = MockDocumentContainer.Create(ranges, testCase.FeedMode, testCase.DocumentCreationMode); - IDocumentContainer streamingDocumentContainer = MockDocumentContainer.Create( + MockDocumentContainer streamingDocumentContainer = MockDocumentContainer.Create( ranges, testCase.FeedMode & PartitionedFeedMode.StreamingReversed, testCase.DocumentCreationMode); @@ -310,7 +374,7 @@ private static async Task RunParityTests(IReadOnlyList testCases foreach (int pageSize in testCase.PageSizes) { DebugTraceHelpers.TraceNonStreamingPipelineStarting(); - IReadOnlyList nonStreamingResult = await CreateAndRunPipelineStage( + (IReadOnlyList nonStreamingResult, double nonStreamingCharge) = await CreateAndRunPipelineStage( documentContainer: nonStreamingDocumentContainer, ranges: ranges, queryText: testCase.QueryText, @@ -319,7 +383,7 @@ private static async Task RunParityTests(IReadOnlyList testCases nonStreamingOrderBy: true); DebugTraceHelpers.TraceStreamingPipelineStarting(); - IReadOnlyList streamingResult = await CreateAndRunPipelineStage( + (IReadOnlyList streamingResult, double streamingCharge) = await CreateAndRunPipelineStage( documentContainer: streamingDocumentContainer, ranges: ranges, queryText: testCase.QueryText, @@ -331,6 +395,18 @@ private static async Task RunParityTests(IReadOnlyList testCases { Assert.Fail($"Results mismatch for query:\n{testCase.QueryText}\npageSize: {pageSize}"); } + + if (Math.Abs(streamingCharge - nonStreamingCharge) > 0.0001) + { + Assert.Fail($"Request charge mismatch for query:\n{testCase.QueryText}\npageSize: {pageSize}" + + $"\nStreaming request charge: {streamingCharge} NonStreaming request charge: {nonStreamingCharge}"); + } + + if (Math.Abs(nonStreamingCharge - nonStreamingDocumentContainer.TotalRequestCharge) > 0.0001) + { + Assert.Fail($"Request charge mismatch for query:\n{testCase.QueryText}\npageSize: {pageSize}" + + $"\nExpected: {nonStreamingDocumentContainer.TotalRequestCharge} Actual NonStreaming request charge: {nonStreamingCharge}"); + } } } } @@ -438,11 +514,16 @@ public static bool IndexIsInOrder(IReadOnlyList documents, string private sealed class NonStreamingDocumentContainer : IDocumentContainer { + private readonly Random random = new Random(); + private readonly IDocumentContainer inner; - public NonStreamingDocumentContainer(IDocumentContainer inner) + private readonly bool allowSplits; + + public NonStreamingDocumentContainer(IDocumentContainer inner, bool allowSplits) { this.inner = inner ?? throw new ArgumentNullException(nameof(inner)); + this.allowSplits = allowSplits; } public Task ChangeFeedAsync( @@ -529,6 +610,8 @@ public async Task> MonadicQueryAsync( ITrace trace, CancellationToken cancellationToken) { + await this.SplitMergeAsync(); + TryCatch queryPage = await this.inner.MonadicQueryAsync(sqlQuerySpec, feedRangeState, queryPaginationOptions, trace, cancellationToken); if (queryPage.Failed) @@ -618,12 +701,42 @@ public Task SplitAsync(FeedRangeInternal feedRange, CancellationToken cancellati { return this.inner.SplitAsync(feedRange, cancellationToken); } + + private async Task SplitMergeAsync() + { + if (!this.allowSplits) + { + return; + } + + if (this.random.Next() % 2 == 0) + { + // Split + await this.inner.RefreshProviderAsync(NoOpTrace.Singleton, cancellationToken: default); + List ranges = await this.inner.GetFeedRangesAsync( + trace: NoOpTrace.Singleton, + cancellationToken: default); + FeedRangeInternal randomRangeToSplit = ranges[this.random.Next(0, ranges.Count)]; + await this.inner.SplitAsync(randomRangeToSplit, cancellationToken: default); + + DebugTraceHelpers.TraceSplit(randomRangeToSplit); + } + } } private static class DebugTraceHelpers { private const bool Enabled = false; + [Conditional("DEBUG")] + public static void TraceSplit(FeedRangeInternal feedRange) + { + if (Enabled) + { + System.Diagnostics.Trace.WriteLine($"Split range: {feedRange.ToJsonString()}"); + } + } + [Conditional("DEBUG")] public static void TraceNonStreamingPipelineStarting() { @@ -693,7 +806,9 @@ private class MockDocumentContainer : IDocumentContainer private readonly bool streaming; - public static IDocumentContainer Create(IReadOnlyList feedRanges, PartitionedFeedMode feedMode, DocumentCreationMode documentCreationMode) + public double TotalRequestCharge { get; } + + public static MockDocumentContainer Create(IReadOnlyList feedRanges, PartitionedFeedMode feedMode, DocumentCreationMode documentCreationMode) { IReadOnlyDictionary>> pages = CreatePartitionedFeed( feedRanges, @@ -701,13 +816,18 @@ public static IDocumentContainer Create(IReadOnlyList feedRanges, PageSize, feedMode, (index) => CreateDocument(index, documentCreationMode)); - return new MockDocumentContainer(pages, !feedMode.HasFlag(PartitionedFeedMode.NonStreaming)); + double totalRequestCharge = feedRanges.Count * LeafPageCount * QueryCharge; + return new MockDocumentContainer(pages, !feedMode.HasFlag(PartitionedFeedMode.NonStreaming), totalRequestCharge); } - private MockDocumentContainer(IReadOnlyDictionary>> pages, bool streaming) + private MockDocumentContainer( + IReadOnlyDictionary>> pages, + bool streaming, + double totalRequestCharge) { this.pages = pages ?? throw new ArgumentNullException(nameof(pages)); - this.streaming = streaming; + this.streaming = streaming; + this.TotalRequestCharge = totalRequestCharge; } public Task ChangeFeedAsync(FeedRangeState feedRangeState, ChangeFeedPaginationOptions changeFeedPaginationOptions, ITrace trace, CancellationToken cancellationToken) @@ -1006,5 +1126,22 @@ private static async Task CreateDocumentContainerAsync(int d return documentContainer; } + + private sealed class TestOptions + { + public static readonly TestOptions Default = new TestOptions( + validateCharges: true, + maxConcurrency: NonStreamingOrderByQueryTests.MaxConcurrency); + + public bool ValidateCharges { get; } + + public int MaxConcurrency { get; } + + public TestOptions(bool validateCharges, int maxConcurrency) + { + this.ValidateCharges = validateCharges; + this.MaxConcurrency = maxConcurrency; + } + } } } \ No newline at end of file diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/OrderByCrossPartitionQueryPipelineStageTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/OrderByCrossPartitionQueryPipelineStageTests.cs index af77a51996..0f2d94819b 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/OrderByCrossPartitionQueryPipelineStageTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/OrderByCrossPartitionQueryPipelineStageTests.cs @@ -5,7 +5,7 @@ namespace Microsoft.Azure.Cosmos.Tests.Query.Pipeline { using System; - using System.Collections.Generic; + using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; using Microsoft.Azure.Cosmos.CosmosElements; @@ -77,7 +77,7 @@ public void MonadicCreate_NullContinuationToken() new OrderByColumn("_ts", SortOrder.Ascending) }, queryPaginationOptions: new QueryPaginationOptions(pageSizeHint: 10), - maxConcurrency: 10, + maxConcurrency: 10, nonStreamingOrderBy: false, continuationToken: null); Assert.IsTrue(monadicCreate.Succeeded); @@ -369,7 +369,7 @@ public void MonadicCreate_OrderByWithResumeValues() new OrderByColumn("item2", SortOrder.Ascending) }, queryPaginationOptions: new QueryPaginationOptions(pageSizeHint: 10), - maxConcurrency: 10, + maxConcurrency: 10, nonStreamingOrderBy: false, continuationToken: CosmosArray.Create( new List() @@ -425,7 +425,7 @@ public async Task TestFormattedFiltersForTargetPartitionWithContinuationTokenAsy new OrderByColumn("c._ts", SortOrder.Ascending) }, queryPaginationOptions: new QueryPaginationOptions(pageSizeHint: 1), - maxConcurrency: 0, + maxConcurrency: 0, nonStreamingOrderBy: false, continuationToken: CosmosElement.Parse(continuationToken)); Assert.IsTrue(monadicCreate.Succeeded); @@ -440,7 +440,9 @@ public async Task TestFormattedFiltersForTargetPartitionWithContinuationTokenAsy } [TestMethod] - public async Task TestDrainFully_StartFromBeginingAsync_NoDocuments() + [DataRow(false, DisplayName = "NonStreaming: false")] + [DataRow(true, DisplayName = "NonStreaming: true")] + public async Task TestDrainFully_StartFromBeginingAsync_NoDocuments(bool nonStreamingOrderBy) { int numItems = 0; IDocumentContainer documentContainer = await CreateDocumentContainerAsync(numItems); @@ -461,8 +463,8 @@ FROM c new OrderByColumn("c._ts", SortOrder.Ascending) }, queryPaginationOptions: new QueryPaginationOptions(pageSizeHint: 10), - maxConcurrency: 10, - nonStreamingOrderBy: false, + maxConcurrency: 10, + nonStreamingOrderBy: nonStreamingOrderBy, continuationToken: null); Assert.IsTrue(monadicCreate.Succeeded); IQueryPipelineStage queryPipelineStage = monadicCreate.Result; @@ -479,9 +481,8 @@ FROM c QueryPage queryPage = tryGetQueryPage.Result; documents.AddRange(queryPage.Documents); - if (queryPage.RequestCharge > 0) + if (!nonStreamingOrderBy) { - // some empty pages may be emitted Assert.AreEqual(42, queryPage.RequestCharge); } } @@ -511,12 +512,12 @@ FROM c new OrderByColumn("c._ts", SortOrder.Ascending) }, queryPaginationOptions: new QueryPaginationOptions(pageSizeHint: 10), - maxConcurrency: 10, + maxConcurrency: 10, nonStreamingOrderBy: false, continuationToken: null); Assert.IsTrue(monadicCreate.Succeeded); IQueryPipelineStage queryPipelineStage = monadicCreate.Result; - + int countAdditionalHeadersReceived = 0; while (await queryPipelineStage.MoveNextAsync(NoOpTrace.Singleton, cancellationToken: default)) { @@ -526,32 +527,36 @@ FROM c Assert.Fail(tryGetQueryPage.Exception.ToString()); } - QueryPage queryPage = tryGetQueryPage.Result; - if (queryPage.AdditionalHeaders.Count > 0) - { - ++countAdditionalHeadersReceived; + QueryPage queryPage = tryGetQueryPage.Result; + if (queryPage.AdditionalHeaders.Count > 0) + { + ++countAdditionalHeadersReceived; } - } - - int countFeedRanges = (await documentContainer.GetFeedRangesAsync( - trace: NoOpTrace.Singleton, - cancellationToken: default)) - .Count; + } + + int countFeedRanges = (await documentContainer.GetFeedRangesAsync( + trace: NoOpTrace.Singleton, + cancellationToken: default)) + .Count; Assert.IsTrue(countAdditionalHeadersReceived >= countFeedRanges); - } - + } + [TestMethod] - [DataRow(false, false, false, DisplayName = "Use State: false, Allow Splits: false, Allow Merges: false")] - [DataRow(false, false, true, DisplayName = "Use State: false, Allow Splits: false, Allow Merges: true")] - [DataRow(false, true, false, DisplayName = "Use State: false, Allow Splits: true, Allow Merges: false")] - [DataRow(false, true, true, DisplayName = "Use State: false, Allow Splits: true, Allow Merges: true")] - [DataRow(true, false, false, DisplayName = "Use State: true, Allow Splits: false, Allow Merges: false")] - [DataRow(true, false, true, DisplayName = "Use State: true, Allow Splits: false, Allow Merges: true")] - [DataRow(true, true, false, DisplayName = "Use State: true, Allow Splits: true, Allow Merges: false")] - [DataRow(true, true, true, DisplayName = "Use State: true, Allow Splits: true, Allow Merges: true")] - public async Task TestDrainWithStateSplitsAndMergeAsync(bool useState, bool allowSplits, bool allowMerges) + [DataRow(false, false, false, false, DisplayName = "NonStreaming: false, Use State: false, Allow Splits: false, Allow Merges: false")] + [DataRow(false, false, false, true, DisplayName = "NonStreaming: false, Use State: false, Allow Splits: false, Allow Merges: true")] + [DataRow(false, false, true, false, DisplayName = "NonStreaming: false, Use State: false, Allow Splits: true, Allow Merges: false")] + [DataRow(false, false, true, true, DisplayName = "NonStreaming: false, Use State: false, Allow Splits: true, Allow Merges: true")] + [DataRow(false, true, false, false, DisplayName = "NonStreaming: false, Use State: true, Allow Splits: false, Allow Merges: false")] + [DataRow(false, true, false, true, DisplayName = "NonStreaming: false, Use State: true, Allow Splits: false, Allow Merges: true")] + [DataRow(false, true, true, false, DisplayName = "NonStreaming: false, Use State: true, Allow Splits: true, Allow Merges: false")] + [DataRow(false, true, true, true, DisplayName = "NonStreaming: false, Use State: true, Allow Splits: true, Allow Merges: true")] + [DataRow(true, false, false, false, DisplayName = "NonStreaming: true, Use State: false, Allow Splits: false, Allow Merges: false")] + [DataRow(true, false, false, true, DisplayName = "NonStreaming: true, Use State: false, Allow Splits: false, Allow Merges: true")] + [DataRow(true, false, true, false, DisplayName = "NonStreaming: true, Use State: false, Allow Splits: true, Allow Merges: false")] + [DataRow(true, false, true, true, DisplayName = "NonStreaming: true, Use State: false, Allow Splits: true, Allow Merges: true")] + public async Task TestDrainWithStateSplitsAndMergeAsync(bool nonStreamingOrderBy, bool useState, bool allowSplits, bool allowMerges) { - static async Task CreatePipelineStateAsync(IDocumentContainer documentContainer, CosmosElement continuationToken) + static async Task CreatePipelineStateAsync(IDocumentContainer documentContainer, CosmosElement continuationToken, bool nonStreamingOrderBy) { TryCatch monadicQueryPipelineStage = OrderByCrossPartitionQueryPipelineStage.MonadicCreate( documentContainer: documentContainer, @@ -569,8 +574,8 @@ FROM c new OrderByColumn("c.pk", SortOrder.Ascending) }, queryPaginationOptions: new QueryPaginationOptions(pageSizeHint: 10), - maxConcurrency: 10, - nonStreamingOrderBy: false, + maxConcurrency: 10, + nonStreamingOrderBy: nonStreamingOrderBy, continuationToken: continuationToken); monadicQueryPipelineStage.ThrowIfFailed(); IQueryPipelineStage queryPipelineStage = monadicQueryPipelineStage.Result; @@ -578,9 +583,10 @@ FROM c return queryPipelineStage; } + bool verbose = false; int numItems = 1000; IDocumentContainer inMemoryCollection = await CreateDocumentContainerAsync(numItems); - IQueryPipelineStage queryPipelineStage = await CreatePipelineStateAsync(inMemoryCollection, continuationToken: null); + IQueryPipelineStage queryPipelineStage = await CreatePipelineStateAsync(inMemoryCollection, continuationToken: null, nonStreamingOrderBy); List documents = new List(); Random random = new Random(); while (await queryPipelineStage.MoveNextAsync(NoOpTrace.Singleton, cancellationToken: default)) @@ -615,7 +621,7 @@ FROM c break; } - queryPipelineStage = await CreatePipelineStateAsync(inMemoryCollection, queryState.Value); + queryPipelineStage = await CreatePipelineStateAsync(inMemoryCollection, queryState.Value, nonStreamingOrderBy); } if (random.Next() % 2 == 0) @@ -629,6 +635,11 @@ FROM c cancellationToken: default); FeedRangeInternal randomRangeToSplit = ranges[random.Next(0, ranges.Count)]; await inMemoryCollection.SplitAsync(randomRangeToSplit, cancellationToken: default); + + if (verbose) + { + System.Diagnostics.Trace.WriteLine($"Split range: {randomRangeToSplit.ToJsonString()}"); + } } if (allowMerges && (random.Next() % 2 == 0)) @@ -645,6 +656,12 @@ FROM c int adjacentIndex = indexToMerge == (ranges.Count - 1) ? indexToMerge - 1 : indexToMerge + 1; await inMemoryCollection.MergeAsync(ranges[indexToMerge], ranges[adjacentIndex], cancellationToken: default); } + + if (verbose) + { + string mergedRanges = string.Join(", ", ranges.Select(range => range.ToJsonString())); + System.Diagnostics.Trace.WriteLine($"Merged ranges: {mergedRanges}"); + } } } }