diff --git a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/ConcurrencyLimiter.cs b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/ConcurrencyLimiter.cs index 4ef7a3b721e4de..74ef7ec9e890b9 100644 --- a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/ConcurrencyLimiter.cs +++ b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/ConcurrencyLimiter.cs @@ -95,10 +95,27 @@ protected override ValueTask WaitAsyncCore(int permitCount, Canc return new ValueTask(lease); } - // Don't queue if queue limit reached - if (_queueCount + permitCount > _options.QueueLimit) + // Avoid integer overflow by using subtraction instead of addition + Debug.Assert(_options.QueueLimit >= _queueCount); + if (_options.QueueLimit - _queueCount < permitCount) { - return new ValueTask(QueueLimitLease); + if (_options.QueueProcessingOrder == QueueProcessingOrder.NewestFirst && permitCount <= _options.QueueLimit) + { + // remove oldest items from queue until there is space for the newest request + do + { + RequestRegistration oldestRequest = _queue.DequeueHead(); + _queueCount -= oldestRequest.Count; + Debug.Assert(_queueCount >= 0); + oldestRequest.Tcs.TrySetResult(FailedLease); + } + while (_options.QueueLimit - _queueCount < permitCount); + } + else + { + // Don't queue if queue limit reached and QueueProcessingOrder is OldestFirst + return new ValueTask(QueueLimitLease); + } } TaskCompletionSource tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); diff --git a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/TokenBucketRateLimiter.cs b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/TokenBucketRateLimiter.cs index bb1ec82f3fff01..6593d895a6810e 100644 --- a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/TokenBucketRateLimiter.cs +++ b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/TokenBucketRateLimiter.cs @@ -101,10 +101,27 @@ protected override ValueTask WaitAsyncCore(int tokenCount, Cance return new ValueTask(lease); } - // Don't queue if queue limit reached - if (_queueCount + tokenCount > _options.QueueLimit) + // Avoid integer overflow by using subtraction instead of addition + Debug.Assert(_options.QueueLimit >= _queueCount); + if (_options.QueueLimit - _queueCount < tokenCount) { - return new ValueTask(CreateFailedTokenLease(tokenCount)); + if (_options.QueueProcessingOrder == QueueProcessingOrder.NewestFirst && tokenCount <= _options.QueueLimit) + { + // remove oldest items from queue until there is space for the newest acquisition request + do + { + RequestRegistration oldestRequest = _queue.DequeueHead(); + _queueCount -= oldestRequest.Count; + Debug.Assert(_queueCount >= 0); + oldestRequest.Tcs.TrySetResult(FailedLease); + } + while (_options.QueueLimit - _queueCount < tokenCount); + } + else + { + // Don't queue if queue limit reached and QueueProcessingOrder is OldestFirst + return new ValueTask(CreateFailedTokenLease(tokenCount)); + } } TaskCompletionSource tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); diff --git a/src/libraries/System.Threading.RateLimiting/tests/BaseRateLimiterTests.cs b/src/libraries/System.Threading.RateLimiting/tests/BaseRateLimiterTests.cs index 9d98a5101a33bc..a96dc0ba86e632 100644 --- a/src/libraries/System.Threading.RateLimiting/tests/BaseRateLimiterTests.cs +++ b/src/libraries/System.Threading.RateLimiting/tests/BaseRateLimiterTests.cs @@ -24,11 +24,23 @@ public abstract class BaseRateLimiterTests public abstract Task CanAcquireResourceAsync_QueuesAndGrabsNewest(); [Fact] - public abstract Task FailsWhenQueuingMoreThanLimit(); + public abstract Task FailsWhenQueuingMoreThanLimit_OldestFirst(); + + [Fact] + public abstract Task DropsOldestWhenQueuingMoreThanLimit_NewestFirst(); + + [Fact] + public abstract Task DropsMultipleOldestWhenQueuingMoreThanLimit_NewestFirst(); + + [Fact] + public abstract Task DropsRequestedLeaseIfPermitCountGreaterThanQueueLimitAndNoAvailability_NewestFirst(); [Fact] public abstract Task QueueAvailableAfterQueueLimitHitAndResources_BecomeAvailable(); + [Fact] + public abstract Task LargeAcquiresAndQueuesDoNotIntegerOverflow(); + [Fact] public abstract void ThrowsWhenAcquiringMoreThanLimit(); diff --git a/src/libraries/System.Threading.RateLimiting/tests/ConcurrencyLimiterTests.cs b/src/libraries/System.Threading.RateLimiting/tests/ConcurrencyLimiterTests.cs index 22658e07a5c9ca..da041ac938aa27 100644 --- a/src/libraries/System.Threading.RateLimiting/tests/ConcurrencyLimiterTests.cs +++ b/src/libraries/System.Threading.RateLimiting/tests/ConcurrencyLimiterTests.cs @@ -94,9 +94,9 @@ public override async Task CanAcquireResourceAsync_QueuesAndGrabsNewest() } [Fact] - public override async Task FailsWhenQueuingMoreThanLimit() + public override async Task FailsWhenQueuingMoreThanLimit_OldestFirst() { - var limiter = new ConcurrencyLimiter(new ConcurrencyLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1)); + var limiter = new ConcurrencyLimiter(new ConcurrencyLimiterOptions(1, QueueProcessingOrder.OldestFirst, 1)); using var lease = limiter.Acquire(1); var wait = limiter.WaitAsync(1); @@ -105,11 +105,96 @@ public override async Task FailsWhenQueuingMoreThanLimit() } [Fact] - public override async Task QueueAvailableAfterQueueLimitHitAndResources_BecomeAvailable() + public override async Task DropsOldestWhenQueuingMoreThanLimit_NewestFirst() { var limiter = new ConcurrencyLimiter(new ConcurrencyLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1)); var lease = limiter.Acquire(1); var wait = limiter.WaitAsync(1); + Assert.False(wait.IsCompleted); + + var wait2 = limiter.WaitAsync(1); + var lease1 = await wait; + Assert.False(lease1.IsAcquired); + Assert.False(wait2.IsCompleted); + + lease.Dispose(); + + lease = await wait2; + Assert.True(lease.IsAcquired); + } + + [Fact] + public override async Task DropsMultipleOldestWhenQueuingMoreThanLimit_NewestFirst() + { + var limiter = new ConcurrencyLimiter(new ConcurrencyLimiterOptions(2, QueueProcessingOrder.NewestFirst, 2)); + var lease = limiter.Acquire(2); + Assert.True(lease.IsAcquired); + var wait = limiter.WaitAsync(1); + Assert.False(wait.IsCompleted); + + var wait2 = limiter.WaitAsync(1); + Assert.False(wait2.IsCompleted); + + var wait3 = limiter.WaitAsync(2); + var lease1 = await wait; + var lease2 = await wait2; + Assert.False(lease1.IsAcquired); + Assert.False(lease2.IsAcquired); + Assert.False(wait3.IsCompleted); + + lease.Dispose(); + + lease = await wait3; + Assert.True(lease.IsAcquired); + } + + [Fact] + public override async Task DropsRequestedLeaseIfPermitCountGreaterThanQueueLimitAndNoAvailability_NewestFirst() + { + var limiter = new ConcurrencyLimiter(new ConcurrencyLimiterOptions(2, QueueProcessingOrder.NewestFirst, 1)); + var lease = limiter.Acquire(2); + Assert.True(lease.IsAcquired); + + // Fill queue + var wait = limiter.WaitAsync(1); + Assert.False(wait.IsCompleted); + + var lease1 = await limiter.WaitAsync(2); + Assert.False(lease1.IsAcquired); + + lease.Dispose(); + var lease2 = await wait; + Assert.True(lease2.IsAcquired); + } + + [Fact] + public override async Task LargeAcquiresAndQueuesDoNotIntegerOverflow() + { + var limiter = new ConcurrencyLimiter(new ConcurrencyLimiterOptions(int.MaxValue, QueueProcessingOrder.NewestFirst, int.MaxValue)); + var lease = limiter.Acquire(int.MaxValue); + Assert.True(lease.IsAcquired); + + // Fill queue + var wait = limiter.WaitAsync(3); + Assert.False(wait.IsCompleted); + + var wait2 = limiter.WaitAsync(int.MaxValue); + Assert.False(wait2.IsCompleted); + + var lease1 = await wait; + Assert.False(lease1.IsAcquired); + + lease.Dispose(); + var lease2 = await wait2; + Assert.True(lease2.IsAcquired); + } + + [Fact] + public override async Task QueueAvailableAfterQueueLimitHitAndResources_BecomeAvailable() + { + var limiter = new ConcurrencyLimiter(new ConcurrencyLimiterOptions(1, QueueProcessingOrder.OldestFirst, 1)); + var lease = limiter.Acquire(1); + var wait = limiter.WaitAsync(1); var failedLease = await limiter.WaitAsync(1); Assert.False(failedLease.IsAcquired); diff --git a/src/libraries/System.Threading.RateLimiting/tests/TokenBucketRateLimiterTests.cs b/src/libraries/System.Threading.RateLimiting/tests/TokenBucketRateLimiterTests.cs index edf05bfe15cce9..594bb79be22ebf 100644 --- a/src/libraries/System.Threading.RateLimiting/tests/TokenBucketRateLimiterTests.cs +++ b/src/libraries/System.Threading.RateLimiting/tests/TokenBucketRateLimiterTests.cs @@ -111,9 +111,9 @@ public override async Task CanAcquireResourceAsync_QueuesAndGrabsNewest() } [Fact] - public override async Task FailsWhenQueuingMoreThanLimit() + public override async Task FailsWhenQueuingMoreThanLimit_OldestFirst() { - var limiter = new TokenBucketRateLimiter(new TokenBucketRateLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1, + var limiter = new TokenBucketRateLimiter(new TokenBucketRateLimiterOptions(1, QueueProcessingOrder.OldestFirst, 1, TimeSpan.Zero, 1, autoReplenishment: false)); using var lease = limiter.Acquire(1); var wait = limiter.WaitAsync(1); @@ -125,9 +125,77 @@ public override async Task FailsWhenQueuingMoreThanLimit() } [Fact] - public override async Task QueueAvailableAfterQueueLimitHitAndResources_BecomeAvailable() + public override async Task DropsOldestWhenQueuingMoreThanLimit_NewestFirst() { var limiter = new TokenBucketRateLimiter(new TokenBucketRateLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1, + TimeSpan.Zero, 1, autoReplenishment: false)); + var lease = limiter.Acquire(1); + var wait = limiter.WaitAsync(1); + Assert.False(wait.IsCompleted); + + var wait2 = limiter.WaitAsync(1); + var lease1 = await wait; + Assert.False(lease1.IsAcquired); + Assert.False(wait2.IsCompleted); + + limiter.TryReplenish(); + + lease = await wait2; + Assert.True(lease.IsAcquired); + } + + [Fact] + public override async Task DropsMultipleOldestWhenQueuingMoreThanLimit_NewestFirst() + { + var limiter = new TokenBucketRateLimiter(new TokenBucketRateLimiterOptions(2, QueueProcessingOrder.NewestFirst, 2, + TimeSpan.Zero, 1, autoReplenishment: false)); + var lease = limiter.Acquire(2); + Assert.True(lease.IsAcquired); + var wait = limiter.WaitAsync(1); + Assert.False(wait.IsCompleted); + + var wait2 = limiter.WaitAsync(1); + Assert.False(wait2.IsCompleted); + + var wait3 = limiter.WaitAsync(2); + var lease1 = await wait; + var lease2 = await wait2; + Assert.False(lease1.IsAcquired); + Assert.False(lease2.IsAcquired); + Assert.False(wait3.IsCompleted); + + limiter.TryReplenish(); + limiter.TryReplenish(); + + lease = await wait3; + Assert.True(lease.IsAcquired); + } + + [Fact] + public override async Task DropsRequestedLeaseIfPermitCountGreaterThanQueueLimitAndNoAvailability_NewestFirst() + { + var limiter = new TokenBucketRateLimiter(new TokenBucketRateLimiterOptions(2, QueueProcessingOrder.NewestFirst, 1, + TimeSpan.Zero, 1, autoReplenishment: false)); + var lease = limiter.Acquire(2); + Assert.True(lease.IsAcquired); + + // Fill queue + var wait = limiter.WaitAsync(1); + Assert.False(wait.IsCompleted); + + var lease1 = await limiter.WaitAsync(2); + Assert.False(lease1.IsAcquired); + + limiter.TryReplenish(); + + lease = await wait; + Assert.True(lease.IsAcquired); + } + + [Fact] + public override async Task QueueAvailableAfterQueueLimitHitAndResources_BecomeAvailable() + { + var limiter = new TokenBucketRateLimiter(new TokenBucketRateLimiterOptions(1, QueueProcessingOrder.OldestFirst, 1, TimeSpan.Zero, 1, autoReplenishment: false)); var lease = limiter.Acquire(1); var wait = limiter.WaitAsync(1); @@ -147,6 +215,29 @@ public override async Task QueueAvailableAfterQueueLimitHitAndResources_BecomeAv Assert.True(lease.IsAcquired); } + [Fact] + public override async Task LargeAcquiresAndQueuesDoNotIntegerOverflow() + { + var limiter = new TokenBucketRateLimiter(new TokenBucketRateLimiterOptions(int.MaxValue, QueueProcessingOrder.NewestFirst, int.MaxValue, + TimeSpan.Zero, int.MaxValue, autoReplenishment: false)); + var lease = limiter.Acquire(int.MaxValue); + Assert.True(lease.IsAcquired); + + // Fill queue + var wait = limiter.WaitAsync(3); + Assert.False(wait.IsCompleted); + + var wait2 = limiter.WaitAsync(int.MaxValue); + Assert.False(wait2.IsCompleted); + + var lease1 = await wait; + Assert.False(lease1.IsAcquired); + + limiter.TryReplenish(); + var lease2 = await wait2; + Assert.True(lease2.IsAcquired); + } + [Fact] public override void ThrowsWhenAcquiringMoreThanLimit() {