Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/wordlist.txt
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ rethrow
rethrows
retryable
reusability
runnable
runtime
saas
sdk
Expand Down
90 changes: 90 additions & 0 deletions docs/pipelines/resilience-pipeline-registry.md
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,96 @@ Both `AddReloadToken(...)` and `OnPipelineDisposed(...)` are used to implement t

Resource disposal occurs when the registry is disposed of or when the pipeline undergoes changes due to [dynamic reloads](#dynamic-reloads). Upon disposal, all callbacks registered through the `OnPipelineDisposed` method are invoked. However, actual resource disposal is deferred until the pipeline completes all outgoing executions. It's vital to note that dispose callbacks are associated only with a specific instance of the pipeline.

### Disposal of encapsulated rate limiters

If you are using custom rate limiters and want to dispose them on pipeline reload or when a registry is disposed, then you should use the `OnPipelineDisposed` callback.

Consider the following runnable example. It creates a registry with a concurrency strategy and a chained rate limiter strategy (which contains multiple rate limiters):

<!-- snippet: registry-ratelimiter-dispose -->
```cs
public static class Program
{
public static void Main()
{
using var registryAdapter = new PipelineRegistryAdapter();
registryAdapter.GetOrCreateResiliencePipeline("Pipeline foo", 1, 10, 100, 1000);
registryAdapter.GetOrCreateResiliencePipeline("Pipeline bar", 2, 20, 200, 2000);
}
}

public sealed class PipelineRegistryAdapter : IDisposable
{
private readonly ResiliencePipelineRegistry<string> _resiliencePipelineRegistry = new();
private bool _disposed;

public void Dispose()
{
if (!_disposed)
{
_resiliencePipelineRegistry.Dispose();
_disposed = true;
}
}

private static PartitionedRateLimiter<ResilienceContext> CreateConcurrencyLimiter(string partitionKey, int permitLimit) =>
PartitionedRateLimiter.Create<ResilienceContext, string>(context =>
RateLimitPartition.GetConcurrencyLimiter(
partitionKey: partitionKey,
factory: partitionKey => new ConcurrencyLimiterOptions { PermitLimit = permitLimit, QueueLimit = 0 }));

private static PartitionedRateLimiter<ResilienceContext> CreateFixedWindowLimiter(string partitionKey, int permitLimit, TimeSpan window) =>
PartitionedRateLimiter.Create<ResilienceContext, string>(context =>
RateLimitPartition.GetFixedWindowLimiter(
partitionKey: partitionKey,
factory: partitionKey => new FixedWindowRateLimiterOptions { PermitLimit = permitLimit, QueueLimit = 0, Window = window }));

public ResiliencePipeline GetOrCreateResiliencePipeline(string partitionKey, int maximumConcurrentThreads, int sendLimitPerSecond, int sendLimitPerHour, int sendLimitPerDay)
{
return _resiliencePipelineRegistry.GetOrAddPipeline(partitionKey, (builder, context) =>
{
PartitionedRateLimiter<ResilienceContext>? threadLimiter = null;
PartitionedRateLimiter<ResilienceContext>? requestLimiter = null;

// outer strategy: limit threads
builder.AddRateLimiter(new RateLimiterStrategyOptions
{
RateLimiter = args =>
{
threadLimiter = CreateConcurrencyLimiter(partitionKey, maximumConcurrentThreads);
return threadLimiter.AcquireAsync(args.Context, permitCount: 1, args.Context.CancellationToken);
}
});

// inner strategy: limit requests (by second, hour, day)
builder.AddRateLimiter(new RateLimiterStrategyOptions
{
RateLimiter = args =>
{
PartitionedRateLimiter<ResilienceContext>[] limiters = [
CreateFixedWindowLimiter(partitionKey, sendLimitPerSecond, TimeSpan.FromSeconds(1)),
CreateFixedWindowLimiter(partitionKey, sendLimitPerHour, TimeSpan.FromHours(1)),
CreateFixedWindowLimiter(partitionKey, sendLimitPerDay, TimeSpan.FromDays(1)),
];
requestLimiter = PartitionedRateLimiter.CreateChained(limiters);
return requestLimiter.AcquireAsync(args.Context, permitCount: 1, args.Context.CancellationToken);
}
});

// unlike other strategies, rate limiters disposed manually
context.OnPipelineDisposed(() =>
{
threadLimiter?.Dispose();
requestLimiter?.Dispose();
});
});
}
}
```
<!-- endSnippet -->

Notice how the rate limiters are disposed manually in the `OnPipelineDisposed` callback.

## Complex registry keys

Though the pipeline registry supports complex keys, we suggest you use them when defining pipelines with the [Dependency Injection](../advanced/dependency-injection.md) (DI) containers. For further information, see the [section on complex pipeline keys](../advanced/dependency-injection.md#complex-pipeline-keys).
82 changes: 82 additions & 0 deletions src/Snippets/Docs/ResiliencePipelineRegistry.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
using System.Net.Http;
using System.Threading.RateLimiting;
using Polly.RateLimiting;
using Polly.Registry;
using Polly.Retry;

Expand Down Expand Up @@ -171,5 +173,85 @@ private static void RegisterCancellationSource(CancellationTokenSource cancellat
{
// Register the source
}

#region registry-ratelimiter-dispose
public static class Program
{
public static void Main()
{
using var registryAdapter = new PipelineRegistryAdapter();
registryAdapter.GetOrCreateResiliencePipeline("Pipeline foo", 1, 10, 100, 1000);
registryAdapter.GetOrCreateResiliencePipeline("Pipeline bar", 2, 20, 200, 2000);
}
}

public sealed class PipelineRegistryAdapter : IDisposable
{
private readonly ResiliencePipelineRegistry<string> _resiliencePipelineRegistry = new();
private bool _disposed;

public void Dispose()
{
if (!_disposed)
{
_resiliencePipelineRegistry.Dispose();
_disposed = true;
}
}

private static PartitionedRateLimiter<ResilienceContext> CreateConcurrencyLimiter(string partitionKey, int permitLimit) =>
PartitionedRateLimiter.Create<ResilienceContext, string>(context =>
RateLimitPartition.GetConcurrencyLimiter(
partitionKey: partitionKey,
factory: partitionKey => new ConcurrencyLimiterOptions { PermitLimit = permitLimit, QueueLimit = 0 }));

private static PartitionedRateLimiter<ResilienceContext> CreateFixedWindowLimiter(string partitionKey, int permitLimit, TimeSpan window) =>
PartitionedRateLimiter.Create<ResilienceContext, string>(context =>
RateLimitPartition.GetFixedWindowLimiter(
partitionKey: partitionKey,
factory: partitionKey => new FixedWindowRateLimiterOptions { PermitLimit = permitLimit, QueueLimit = 0, Window = window }));

public ResiliencePipeline GetOrCreateResiliencePipeline(string partitionKey, int maximumConcurrentThreads, int sendLimitPerSecond, int sendLimitPerHour, int sendLimitPerDay)
{
return _resiliencePipelineRegistry.GetOrAddPipeline(partitionKey, (builder, context) =>
{
PartitionedRateLimiter<ResilienceContext>? threadLimiter = null;
PartitionedRateLimiter<ResilienceContext>? requestLimiter = null;

// outer strategy: limit threads
builder.AddRateLimiter(new RateLimiterStrategyOptions
{
RateLimiter = args =>
{
threadLimiter = CreateConcurrencyLimiter(partitionKey, maximumConcurrentThreads);
return threadLimiter.AcquireAsync(args.Context, permitCount: 1, args.Context.CancellationToken);
}
});

// inner strategy: limit requests (by second, hour, day)
builder.AddRateLimiter(new RateLimiterStrategyOptions
{
RateLimiter = args =>
{
PartitionedRateLimiter<ResilienceContext>[] limiters = [
CreateFixedWindowLimiter(partitionKey, sendLimitPerSecond, TimeSpan.FromSeconds(1)),
CreateFixedWindowLimiter(partitionKey, sendLimitPerHour, TimeSpan.FromHours(1)),
CreateFixedWindowLimiter(partitionKey, sendLimitPerDay, TimeSpan.FromDays(1)),
];
requestLimiter = PartitionedRateLimiter.CreateChained(limiters);
return requestLimiter.AcquireAsync(args.Context, permitCount: 1, args.Context.CancellationToken);
}
});

// unlike other strategies, rate limiters disposed manually
context.OnPipelineDisposed(() =>
{
threadLimiter?.Dispose();
requestLimiter?.Dispose();
});
});
}
}
#endregion
}