Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 37 additions & 68 deletions src/Polly.Core/CircuitBreaker/CircuitBreakerManualControl.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,53 +30,30 @@ public CircuitBreakerManualControl()

internal IDisposable Initialize(Func<ResilienceContext, Task> onIsolate, Func<ResilienceContext, Task> onReset)
{
bool isolated;
lock (_lock)
{
_onIsolate.Add(onIsolate);
_onReset.Add(onReset);
isolated = _isolated;
}

if (_isolated)
{
var context = ResilienceContextPool.Shared.Get().Initialize<VoidResult>(isSynchronous: true);

// if the control indicates that circuit breaker should be isolated, we isolate it right away
IsolateAsync(context).GetAwaiter().GetResult();
}

return new RegistrationDisposable(() =>
{
lock (_lock)
{
_onIsolate.Remove(onIsolate);
_onReset.Remove(onReset);
}
});
// if the control indicates that circuit breaker should be isolated, we isolate it right away
if (isolated)
{
var context = ResilienceContextPool.Shared.Get().Initialize<VoidResult>(isSynchronous: true);
onIsolate(context).GetAwaiter().GetResult();
}

return new RegistrationDisposable(this, onIsolate, onReset);
}

/// <summary>
/// Isolates (opens) the circuit manually, and holds it in this state until a call to <see cref="CloseAsync(CancellationToken)"/> is made.
/// </summary>
/// <param name="context">The resilience context.</param>
/// <returns>The instance of <see cref="Task"/> that represents the asynchronous execution.</returns>
/// <exception cref="ArgumentNullException">Thrown when <paramref name="context"/> is <see langword="null"/>.</exception>
/// <exception cref="ObjectDisposedException">Thrown when calling this method after this object is disposed.</exception>
internal async Task IsolateAsync(ResilienceContext context)
private void Remove(Func<ResilienceContext, Task> onIsolate, Func<ResilienceContext, Task> onReset)
{
Guard.NotNull(context);

_isolated = true;

Func<ResilienceContext, Task>[] callbacks;

lock (_lock)
{
callbacks = _onIsolate.ToArray();
}

foreach (var action in callbacks)
{
await action(context).ConfigureAwait(context.ContinueOnCapturedContext);
_onIsolate.Remove(onIsolate);
_onReset.Remove(onReset);
}
}

Expand All @@ -88,11 +65,21 @@ internal async Task IsolateAsync(ResilienceContext context)
/// <exception cref="ObjectDisposedException">Thrown when calling this method after this object is disposed.</exception>
public async Task IsolateAsync(CancellationToken cancellationToken = default)
{
Func<ResilienceContext, Task>[] callbacks;
lock (_lock)
{
callbacks = _onIsolate.ToArray();
_isolated = true;
}

var context = ResilienceContextPool.Shared.Get(cancellationToken).Initialize<VoidResult>(isSynchronous: false);

try
{
await IsolateAsync(context).ConfigureAwait(false);
foreach (var action in callbacks)
{
await action(context).ConfigureAwait(context.ContinueOnCapturedContext);
}
}
finally
{
Expand All @@ -103,57 +90,39 @@ public async Task IsolateAsync(CancellationToken cancellationToken = default)
/// <summary>
/// Closes the circuit, and resets any statistics controlling automated circuit-breaking.
/// </summary>
/// <param name="context">The resilience context.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>The instance of <see cref="Task"/> that represents the asynchronous execution.</returns>
/// <exception cref="ArgumentNullException">Thrown when <paramref name="context"/> is <see langword="null"/>.</exception>
/// <exception cref="ObjectDisposedException">Thrown when calling this method after this object is disposed.</exception>
internal async Task CloseAsync(ResilienceContext context)
public async Task CloseAsync(CancellationToken cancellationToken = default)
{
Guard.NotNull(context);

_isolated = false;

context.Initialize<VoidResult>(isSynchronous: false);

Func<ResilienceContext, Task>[] callbacks;

lock (_lock)
{
callbacks = _onReset.ToArray();
_isolated = false;
}

foreach (var action in callbacks)
{
await action(context).ConfigureAwait(context.ContinueOnCapturedContext);
}
}

/// <summary>
/// Closes the circuit, and resets any statistics controlling automated circuit-breaking.
/// </summary>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>The instance of <see cref="Task"/> that represents the asynchronous execution.</returns>
/// <exception cref="ObjectDisposedException">Thrown when calling this method after this object is disposed.</exception>
public async Task CloseAsync(CancellationToken cancellationToken = default)
{
var context = ResilienceContextPool.Shared.Get(cancellationToken);
var context = ResilienceContextPool.Shared.Get(cancellationToken).Initialize<VoidResult>(isSynchronous: false);

try
{
await CloseAsync(context).ConfigureAwait(false);
foreach (var action in callbacks)
{
await action(context).ConfigureAwait(context.ContinueOnCapturedContext);
}
}
finally
{
ResilienceContextPool.Shared.Return(context);
}
}

private sealed class RegistrationDisposable : IDisposable
private sealed class RegistrationDisposable(CircuitBreakerManualControl owner, Func<ResilienceContext, Task> onIsolate, Func<ResilienceContext, Task> onReset) : IDisposable
{
private readonly Action _disposeAction;

public RegistrationDisposable(Action disposeAction) => _disposeAction = disposeAction;
private readonly CircuitBreakerManualControl _owner = owner;
private readonly Func<ResilienceContext, Task> _onIsolate = onIsolate;
private readonly Func<ResilienceContext, Task> _onReset = onReset;

public void Dispose() => _disposeAction();
public void Dispose() => _owner.Remove(_onIsolate, _onReset);
}
}
16 changes: 13 additions & 3 deletions src/Polly.Core/CircuitBreaker/CircuitBreakerResilienceStrategy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ public CircuitBreakerResilienceStrategy(

stateProvider?.Initialize(() => _controller.CircuitState);
_manualControlRegistration = manualControl?.Initialize(
async c => await _controller.IsolateCircuitAsync(c).ConfigureAwait(c.ContinueOnCapturedContext),
async c => await _controller.CloseCircuitAsync(c).ConfigureAwait(c.ContinueOnCapturedContext));
_controller.IsolateCircuitAsync,
_controller.CloseCircuitAsync);
}

public void Dispose()
Expand All @@ -34,7 +34,17 @@ protected internal override async ValueTask<Outcome<T>> ExecuteCore<TState>(Func
return outcome;
}

outcome = await StrategyHelper.ExecuteCallbackSafeAsync(callback, context, state).ConfigureAwait(context.ContinueOnCapturedContext);
try
{
context.CancellationToken.ThrowIfCancellationRequested();
outcome = await callback(context, state).ConfigureAwait(context.ContinueOnCapturedContext);
}
#pragma warning disable CA1031
catch (Exception ex)
{
outcome = new(ex);
}
#pragma warning restore CA1031

var args = new CircuitBreakerPredicateArguments<T>(context, outcome);
if (await _handler(args).ConfigureAwait(context.ContinueOnCapturedContext))
Expand Down
Loading
Loading