Skip to content
32 changes: 32 additions & 0 deletions src/core/Akka.Persistence.Tests/PersistenceHealthCheckSpec.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// -----------------------------------------------------------------------
// <copyright file="PersistenceHealthCheckSpec.cs" company="Akka.NET Project">
// Copyright (C) 2009-2022 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2025 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
// -----------------------------------------------------------------------

using System.Threading;
using System.Threading.Tasks;
using Xunit;
using Xunit.Abstractions;

namespace Akka.Persistence.Tests;

public class PersistenceHealthCheckSpec : PersistenceSpec
{
public PersistenceHealthCheckSpec(ITestOutputHelper output) : base(Configuration("PersistenceHealthCheckSpec"), output)
{
}

[Theory]
[InlineData(null)] // default plugin
[InlineData("akka.persistence.journal.inmem")]
public async Task JournalHealthCheck_should_default_to_Healthy(string? pluginId)
{
using var cts = new CancellationTokenSource(RemainingOrDefault);
var pluginHealth = await Extension.CheckJournalHealthAsync(pluginId, cts.Token);

Assert.Equal(PersistenceHealthStatus.Healthy, pluginHealth.Status);
Assert.Empty(pluginHealth.Message);
}
}
80 changes: 58 additions & 22 deletions src/core/Akka.Persistence/Journal/AsyncWriteJournal.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Akka.Actor;
Expand All @@ -25,6 +24,12 @@ public abstract class AsyncWriteJournal : WriteJournalBase, IAsyncRecovery
{
protected readonly bool CanPublish;
private readonly CircuitBreaker _breaker;

/// <summary>
/// Set to <c>true</c> when a fatal error has occurred, i.e., the Akka.Persistence configuration is illegal
/// </summary>
private bool _hasFatalError;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we have fatal errors, such as configuration errors, in the CTOR we should try to capture those.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be worth it to have an enum to distinguish the types of known errors?

Thinking with a devops hat, a fatal error due to config vs a fatal error due to a condition that forces a journal shutdown/restart (Is that a thing or am I misremembering? 😇) would be useful to track... but maybe I'm forgetting things here (probably?).

As an additional thought... IDK if it's a requirement for all plugins buuuut a simple check like getting the max ordering ID would be nice.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could just capture the Exception itself in a nullable field - that's probably easiest.

There's only a handful of "fatal" exceptions - we treat the rest of the exceptions as "eventually recoverable"

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As an additional thought... IDK if it's a requirement for all plugins buuuut a simple check like getting the max ordering ID would be nice.

that might be a stretch just given some of the limitations across plugins


private readonly ReplayFilterMode _replayFilterMode;
private readonly bool _isReplayFilterEnabled;
private readonly int _replayFilterWindowSize;
Expand All @@ -49,6 +54,7 @@ protected AsyncWriteJournal()
var extension = Persistence.Instance.Apply(Context.System);
if (extension == null)
{
_hasFatalError = true;
throw new ArgumentException("Couldn't initialize AsyncWriteJournal instance, because associated Persistence extension has not been used in current actor system context.");
}

Expand Down Expand Up @@ -76,6 +82,7 @@ protected AsyncWriteJournal()
_replayFilterMode = ReplayFilterMode.Warn;
break;
default:
_hasFatalError = true;
throw new ConfigurationException($"Invalid replay-filter.mode [{replayFilterMode}], supported values [off, repair-by-discard-old, fail, warn]");
}
_isReplayFilterEnabled = _replayFilterMode != ReplayFilterMode.Disabled;
Expand All @@ -86,6 +93,23 @@ protected AsyncWriteJournal()
_resequencer = Context.ActorOf(Props.Create(() => new Resequencer()), "resequencer");
}

/// <summary>
/// Health check for the journal.
/// </summary>
/// <param name="cancellationToken">Cancellation token for the health check invocation.</param>
/// <returns>A <see cref="PersistenceHealthCheckResult"/> with a health status and optional error message.</returns>
public virtual Task<PersistenceHealthCheckResult> CheckHealthAsync(CancellationToken cancellationToken = default)
{
if(_breaker.IsHalfOpen)
return Task.FromResult(new PersistenceHealthCheckResult(PersistenceHealthStatus.Degraded,
$"Circuit breaker is half-open, some operations may be failing intermittently with error: {_breaker.LastCaughtException?.Message ?? "N/A"}"));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Two thoughts looking at this;

First, IMO It would be nice if we had some way to track 'last success' (And maybe even have auto-polling as a config option to track)

Second is a question, could we have a more 'structured' output instead of just a formatted string? It at least makes it easier to handle such events from a parsing standpoint (esp if we are able to add time or possibly other things, need to keep looking at this to know what's up)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Second is a question, could we have a more 'structured' output instead of just a formatted string? It at least makes it easier to handle such events from a parsing standpoint (esp if we are able to add time or possibly other things, need to keep looking at this to know what's up)

As long as it can fit into a HealthCheckResult:

public HealthCheckResult(HealthStatus status, string? description = null, Exception? exception = null, IReadOnlyDictionary<string, object>? data = null)
    {
        Status = status;
        Description = description;
        Exception = exception;
        Data = data ?? _emptyReadOnlyDictionary;
    }

Then that should be fine.

if(_breaker.IsOpen)
return Task.FromResult(new PersistenceHealthCheckResult(PersistenceHealthStatus.Degraded,
$"Circuit breaker is open, some operations may be failing intermittently with error: with error: {_breaker.LastCaughtException?.Message ?? "N/A"}"));
return Task.FromResult(_hasFatalError ? new PersistenceHealthCheckResult(PersistenceHealthStatus.Unhealthy, "Fatal error has occurred. The ActorSystem must be restarted.")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick: Formatting here makes this a PITA to read, probably best to split the ternary into more lines.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's how I originally had it (the way you suggested) until Rider nitpicked me. Never doubt your vibe I guess 🤷

: new PersistenceHealthCheckResult(PersistenceHealthStatus.Healthy));
}

/// <inheritdoc/>
public abstract Task ReplayMessagesAsync(IActorContext context, string persistenceId, long fromSequenceNr, long toSequenceNr, long max, Action<IPersistentRepresentation> recoveryCallback);

Expand Down Expand Up @@ -162,25 +186,25 @@ protected AsyncWriteJournal()
///
/// This call is protected with a circuit-breaker.
/// </summary>
/// <param name="messages">TBD</param>
/// <param name="messages">The set of messages to write.</param>
/// <param name="cancellationToken"><see cref="CancellationToken"/> used to signal cancelled snapshot operation</param>
protected abstract Task<IImmutableList<Exception>> WriteMessagesAsync(IEnumerable<AtomicWrite> messages, CancellationToken cancellationToken);

/// <summary>
/// Asynchronously deletes all persistent messages up to inclusive <paramref name="toSequenceNr"/>
/// bound.
/// </summary>
/// <param name="persistenceId">TBD</param>
/// <param name="toSequenceNr">TBD</param>
/// <param name="persistenceId">The id of the entity.</param>
/// <param name="toSequenceNr">The inclusive upper-bound of sequence numbers to delete.</param>
/// <param name="cancellationToken"><see cref="CancellationToken"/> used to signal cancelled snapshot operation</param>
protected abstract Task DeleteMessagesToAsync(string persistenceId, long toSequenceNr, CancellationToken cancellationToken);

/// <summary>
/// Plugin API: Allows plugin implementers to use f.PipeTo(Self)
/// and handle additional messages for implementing advanced features
/// </summary>
/// <param name="message">TBD</param>
/// <returns>TBD</returns>
/// <param name="message">The message to receive</param>
/// <returns><c>true</c> if the message was handled, <c>false</c> otherwise.</returns>
protected virtual bool ReceivePluginInternal(object message)
{
return false;
Expand All @@ -205,6 +229,14 @@ protected bool ReceiveWriteJournal(object message)
case DeleteMessagesTo deleteMessagesTo:
HandleDeleteMessagesTo(deleteMessagesTo);
return true;
case CheckHealth checkHealth:
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Messaging handler for the health check inside the journal.

var sender = Sender;
CheckHealthAsync(checkHealth.CancellationToken)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actual handling of the health check invocation

// PipeTo implementation no longer requires a closure, but better safe than sorry
.PipeTo(sender,
success: result => new HealthCheckResponse(result),
failure: ex => new HealthCheckResponse(new PersistenceHealthCheckResult(PersistenceHealthStatus.Unhealthy, ex.Message)));
return true;
default:
return false;
}
Expand Down Expand Up @@ -256,16 +288,6 @@ private void HandleReplayMessages(ReplayMessages message)

async Task ExecuteHighestSequenceNr()
{
void CompleteHighSeqNo(long highSeqNo)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved this below the return

{
replyTo.Tell(new RecoverySuccess(highSeqNo));

if (CanPublish)
{
eventStream.Publish(message);
}
}

try
{
var highSequenceNr = await _breaker.WithCircuitBreaker((message, readHighestSequenceNrFrom, awj: this), (state, ct) =>
Expand Down Expand Up @@ -306,6 +328,18 @@ await ReplayMessagesAsync(context, message.PersistenceId, message.FromSequenceNr
{
replyTo.Tell(new ReplayMessagesFailure(TryUnwrapException(ex)));
}

return;

void CompleteHighSeqNo(long highSeqNo)
{
replyTo.Tell(new RecoverySuccess(highSeqNo));

if (CanPublish)
{
eventStream.Publish(message);
}
}
}

// instead of ContinueWith
Expand All @@ -315,10 +349,12 @@ await ReplayMessagesAsync(context, message.PersistenceId, message.FromSequenceNr
}

/// <summary>
/// TBD
/// INTERNAL API.
///
/// used to flatten aggregate exceptions.
/// </summary>
/// <param name="e">TBD</param>
/// <returns>TBD</returns>
/// <param name="e">The input exception.</param>
/// <returns>A possibly flattened exception.</returns>
protected static Exception TryUnwrapException(Exception e)
{
if (e is not AggregateException aggregateException) return e;
Expand Down Expand Up @@ -371,7 +407,7 @@ private async Task ExecuteBatch(WriteMessages message, int atomicWriteCount, IAc
}
}

private void ProcessResults(IImmutableList<Exception> results, int atomicWriteCount, WriteMessages writeMessage, IActorRef resequencer,
private static void ProcessResults(IImmutableList<Exception> results, int atomicWriteCount, WriteMessages writeMessage, IActorRef resequencer,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Soooo this is safe and I get why it may have happened, I would however suggest double-checking this against Persistence SQL (or another plugin of choice that uses AsyncJournal with some extra logic behind it) just in case the jumps bt methodtables cause an issue.

(To be clear I'm probably overthinking this BUUUUUT It would still be good to know the difference just in case 😇🤷‍♂️)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method was always private so it can't have side-effects in other plugins.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh wow I'm dumb I meant to say the benchmarks.

Main side effect I'm thinking of is code locality.

But, again, probably overthinking it.

long resequencerCounter, IActorRef writeJournal)
{
// there should be no circumstances under which `writeResult` can be `null`
Expand All @@ -385,11 +421,11 @@ private void ProcessResults(IImmutableList<Exception> results, int atomicWriteCo
: new WriteMessageRejected(x, exception, writeMessage.ActorInstanceId), results, resequencerCounter, writeMessage, resequencer, writeJournal);
}

private void Resequence(Func<IPersistentRepresentation, Exception, object> mapper,
private static void Resequence(Func<IPersistentRepresentation, Exception, object> mapper,
IImmutableList<Exception> results, long resequencerCounter, WriteMessages msg, IActorRef resequencer, IActorRef writeJournal)
{
var i = 0;
var enumerator = results?.GetEnumerator();
using var enumerator = results?.GetEnumerator();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed a memory leak here.

foreach (var resequencable in msg.Messages)
{
if (resequencable is AtomicWrite aw)
Expand Down
37 changes: 37 additions & 0 deletions src/core/Akka.Persistence/JournalProtocol.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

using System;
using System.Collections.Generic;
using System.Threading;
using Akka.Actor;
using Akka.Event;

Expand Down Expand Up @@ -180,6 +181,42 @@ public override int GetHashCode()
public override string ToString() => $"DeleteMessagesTo<pid: {PersistenceId}, seqNr: {ToSequenceNr}, persistentActor: {PersistentActor}>";
}

/// <summary>
/// Invokes a health check on the journal plugin.
/// </summary>
public sealed class CheckHealth : IJournalRequest
{
public CheckHealth(CancellationToken cancellationToken)
{
CancellationToken = cancellationToken;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Health check messaging protocol - these messages are all tagged with INoSerializationVerificationNeeded so it's safe to pass the CancellationToken around via them.

}

public CancellationToken CancellationToken { get; }

public override string ToString()
{
return "CheckHealth";
}
}

/// <summary>
/// Health check response from the journal.
/// </summary>
public sealed class HealthCheckResponse : IJournalResponse
{
public HealthCheckResponse(PersistenceHealthCheckResult result)
{
Result = result;
}

public PersistenceHealthCheckResult Result { get; }

public override string ToString()
{
return $"HealthCheckResponse<Status={Result.Status}, Message={Result.Message}>";
}
}

/// <summary>
/// Request to write messages.
/// </summary>
Expand Down
18 changes: 18 additions & 0 deletions src/core/Akka.Persistence/Persistence.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Annotations;
using Akka.Configuration;
Expand Down Expand Up @@ -252,6 +253,23 @@ public IActorRef JournalFor(string journalPluginId)
return PluginHolderFor(configPath, JournalFallbackConfigPath).Ref;
}

/// <summary>
/// Shortcut for invoking journal health checks.
/// </summary>
/// <param name="journalPluginId">The HOCON id of the Akka.Persistence plugin./</param>
/// <param name="cancellationToken">An optional cancellation token.</param>
/// <returns>A <see cref="PersistenceHealthCheckResult"/> with health status and possibly a descriptive message.</returns>
public async Task<PersistenceHealthCheckResult> CheckJournalHealthAsync(string journalPluginId,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Convenience method for invoking the health check - if the journalPluginId is not found this method will err.

CancellationToken cancellationToken = default)
{
using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
timeoutCts.CancelAfter(Settings.AskTimeout);

var pluginRef = JournalFor(journalPluginId);
var r = await pluginRef.Ask<HealthCheckResponse>(new CheckHealth(timeoutCts.Token), timeoutCts.Token);
return r.Result;
}

/// <summary>
/// Returns a snapshot store plugin actor identified by <paramref name="snapshotPluginId"/>.
/// When empty, looks in `akka.persistence.snapshot-store.plugin` to find configuration entry path.
Expand Down
31 changes: 31 additions & 0 deletions src/core/Akka.Persistence/PersistenceHealthStatus.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// -----------------------------------------------------------------------
// <copyright file="PersistentHealthStatus.cs" company="Akka.NET Project">
// Copyright (C) 2009-2022 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2025 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
// -----------------------------------------------------------------------

namespace Akka.Persistence;

/// <summary>
/// Used by SnapshotStore and Journal to indicate the health status of the underlying storage.
/// </summary>
public enum PersistenceHealthStatus
{
/// <summary>
/// Akka.Persistence is working as expected.
/// </summary>
Healthy = 0,

/// <summary>
/// Akka.Persistence is experiencing some issues that should be recoverable.
/// </summary>
Degraded = 1,

/// <summary>
/// Akka.Persistence has experienced a fatal error.
/// </summary>
Unhealthy = 2,
}

public readonly record struct PersistenceHealthCheckResult(PersistenceHealthStatus Status, string Message = "");
Loading
Loading