Skip to content

Commit ba5e5f8

Browse files
Akka.Persistence HealthChecks (#7842)
* Akka.Persistence: add health check support to `AsyncWriteJournal` #7840
* added messaging protocol to support plugin health check
* Added tests for basic Akka.Persistence health checks; this is mostly a sanity test. I don't want to get sucked into testing the `CircuitBreaker` necessarily either
* added structured output to health check results
* fix compilation errors
* added failure specs
* implemented `SnapshotStore` health checks
* renamed test class
* SnapshotStoreHealthCheckSpecs
* API approvals
1 parent 1ec6f9e commit ba5e5f8

File tree

11 files changed

+609
-31
lines changed

11 files changed

+609
-31
lines changed

src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistence.DotNet.verified.txt

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,18 @@ namespace Akka.Persistence
113113
public override int GetHashCode() { }
114114
public override string ToString() { }
115115
}
116+
public sealed class CheckJournalHealth : Akka.Actor.INoSerializationVerificationNeeded, Akka.Persistence.IJournalMessage, Akka.Persistence.IJournalRequest, Akka.Persistence.IPersistenceMessage
117+
{
118+
public CheckJournalHealth(System.Threading.CancellationToken cancellationToken) { }
119+
public System.Threading.CancellationToken CancellationToken { get; }
120+
public override string ToString() { }
121+
}
122+
public sealed class CheckSnapshotStoreHealth : Akka.Actor.INoSerializationVerificationNeeded, Akka.Persistence.IPersistenceMessage, Akka.Persistence.ISnapshotMessage, Akka.Persistence.ISnapshotRequest
123+
{
124+
public CheckSnapshotStoreHealth(System.Threading.CancellationToken cancellationToken) { }
125+
public System.Threading.CancellationToken CancellationToken { get; }
126+
public override string ToString() { }
127+
}
116128
public sealed class DeleteMessagesFailure : Akka.Actor.INoSerializationVerificationNeeded, System.IEquatable<Akka.Persistence.DeleteMessagesFailure>
117129
{
118130
public DeleteMessagesFailure(System.Exception cause, long toSequenceNr) { }
@@ -318,6 +330,12 @@ namespace Akka.Persistence
318330
{
319331
Akka.Persistence.IStashOverflowStrategy Create(Akka.Configuration.Config config);
320332
}
333+
public sealed class JournalHealthCheckResponse : Akka.Actor.INoSerializationVerificationNeeded, Akka.Persistence.IJournalMessage, Akka.Persistence.IJournalResponse, Akka.Persistence.IPersistenceMessage
334+
{
335+
public JournalHealthCheckResponse(Akka.Persistence.PersistenceHealthCheckResult result) { }
336+
public Akka.Persistence.PersistenceHealthCheckResult Result { get; }
337+
public override string ToString() { }
338+
}
321339
public sealed class LoadSnapshot : Akka.Actor.INoSerializationVerificationNeeded, Akka.Persistence.IPersistenceMessage, Akka.Persistence.ISnapshotMessage, Akka.Persistence.ISnapshotRequest, System.IEquatable<Akka.Persistence.LoadSnapshot>
322340
{
323341
public LoadSnapshot(string persistenceId, Akka.Persistence.SnapshotSelectionCriteria criteria, long toSequenceNr) { }
@@ -377,12 +395,37 @@ namespace Akka.Persistence
377395
public Akka.Persistence.IStashOverflowStrategy DefaultInternalStashOverflowStrategy { get; }
378396
public Akka.Persistence.PersistenceSettings Settings { get; }
379397
public Akka.Persistence.Journal.EventAdapters AdaptersFor(string journalPluginId) { }
398+
public System.Threading.Tasks.Task<Akka.Persistence.PersistenceHealthCheckResult> CheckJournalHealthAsync(string journalPluginId, System.Threading.CancellationToken cancellationToken = null) { }
399+
public System.Threading.Tasks.Task<Akka.Persistence.PersistenceHealthCheckResult> CheckSnapshotStoreHealthAsync(string snapshotStorePluginId, System.Threading.CancellationToken cancellationToken = null) { }
380400
[Akka.Annotations.InternalStableApiAttribute()]
381401
public Akka.Actor.IActorRef JournalFor(string journalPluginId) { }
382402
public string PersistenceId(Akka.Actor.IActorRef actor) { }
383403
[Akka.Annotations.InternalStableApiAttribute()]
384404
public Akka.Actor.IActorRef SnapshotStoreFor(string snapshotPluginId) { }
385405
}
406+
[System.Runtime.CompilerServices.IsReadOnlyAttribute()]
407+
[System.Runtime.CompilerServices.NullableAttribute(0)]
408+
public struct PersistenceHealthCheckResult : System.IEquatable<Akka.Persistence.PersistenceHealthCheckResult>
409+
{
410+
public PersistenceHealthCheckResult(Akka.Persistence.PersistenceHealthStatus Status, string Description = null, System.Exception Exception = null, [System.Runtime.CompilerServices.NullableAttribute(new byte[] {
411+
2,
412+
0,
413+
0})] System.Collections.Generic.IReadOnlyDictionary<string, object> Data = null) { }
414+
[System.Runtime.CompilerServices.NullableAttribute(new byte[] {
415+
2,
416+
0,
417+
0})]
418+
public System.Collections.Generic.IReadOnlyDictionary<string, object> Data { get; set; }
419+
public string Description { get; set; }
420+
public System.Exception Exception { get; set; }
421+
public Akka.Persistence.PersistenceHealthStatus Status { get; set; }
422+
}
423+
public enum PersistenceHealthStatus
424+
{
425+
Healthy = 0,
426+
Degraded = 1,
427+
Unhealthy = 2,
428+
}
386429
public sealed class PersistenceSettings : Akka.Actor.Settings
387430
{
388431
public PersistenceSettings(Akka.Actor.ActorSystem system, Akka.Configuration.Config config) { }
@@ -627,6 +670,12 @@ namespace Akka.Persistence
627670
public override int GetHashCode() { }
628671
public override string ToString() { }
629672
}
673+
public sealed class SnapshotStoreHealthCheckResponse : Akka.Actor.INoSerializationVerificationNeeded, Akka.Persistence.IPersistenceMessage, Akka.Persistence.ISnapshotMessage, Akka.Persistence.ISnapshotResponse
674+
{
675+
public SnapshotStoreHealthCheckResponse(Akka.Persistence.PersistenceHealthCheckResult result) { }
676+
public Akka.Persistence.PersistenceHealthCheckResult Result { get; }
677+
public override string ToString() { }
678+
}
630679
public sealed class StashingHandlerInvocation : Akka.Persistence.IPendingHandlerInvocation
631680
{
632681
public StashingHandlerInvocation(object evt, System.Action<object> handler) { }
@@ -857,6 +906,7 @@ namespace Akka.Persistence.Journal
857906
{
858907
protected readonly bool CanPublish;
859908
protected AsyncWriteJournal() { }
909+
public virtual System.Threading.Tasks.Task<Akka.Persistence.PersistenceHealthCheckResult> CheckHealthAsync(System.Threading.CancellationToken cancellationToken = null) { }
860910
protected abstract System.Threading.Tasks.Task DeleteMessagesToAsync(string persistenceId, long toSequenceNr, System.Threading.CancellationToken cancellationToken);
861911
public abstract System.Threading.Tasks.Task<long> ReadHighestSequenceNrAsync(string persistenceId, long fromSequenceNr, System.Threading.CancellationToken cancellationToken);
862912
protected virtual bool Receive(object message) { }
@@ -1215,6 +1265,7 @@ namespace Akka.Persistence.Snapshot
12151265
public abstract class SnapshotStore : Akka.Actor.ActorBase
12161266
{
12171267
protected SnapshotStore() { }
1268+
public virtual System.Threading.Tasks.Task<Akka.Persistence.PersistenceHealthCheckResult> CheckHealthAsync(System.Threading.CancellationToken cancellationToken = null) { }
12181269
protected abstract System.Threading.Tasks.Task DeleteAsync(Akka.Persistence.SnapshotMetadata metadata, System.Threading.CancellationToken cancellationToken);
12191270
protected abstract System.Threading.Tasks.Task DeleteAsync(string persistenceId, Akka.Persistence.SnapshotSelectionCriteria criteria, System.Threading.CancellationToken cancellationToken);
12201271
protected abstract System.Threading.Tasks.Task<Akka.Persistence.SelectedSnapshot> LoadAsync(string persistenceId, Akka.Persistence.SnapshotSelectionCriteria criteria, System.Threading.CancellationToken cancellationToken);

src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistence.Net.verified.txt

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,18 @@ namespace Akka.Persistence
113113
public override int GetHashCode() { }
114114
public override string ToString() { }
115115
}
116+
public sealed class CheckJournalHealth : Akka.Actor.INoSerializationVerificationNeeded, Akka.Persistence.IJournalMessage, Akka.Persistence.IJournalRequest, Akka.Persistence.IPersistenceMessage
117+
{
118+
public CheckJournalHealth(System.Threading.CancellationToken cancellationToken) { }
119+
public System.Threading.CancellationToken CancellationToken { get; }
120+
public override string ToString() { }
121+
}
122+
public sealed class CheckSnapshotStoreHealth : Akka.Actor.INoSerializationVerificationNeeded, Akka.Persistence.IPersistenceMessage, Akka.Persistence.ISnapshotMessage, Akka.Persistence.ISnapshotRequest
123+
{
124+
public CheckSnapshotStoreHealth(System.Threading.CancellationToken cancellationToken) { }
125+
public System.Threading.CancellationToken CancellationToken { get; }
126+
public override string ToString() { }
127+
}
116128
public sealed class DeleteMessagesFailure : Akka.Actor.INoSerializationVerificationNeeded, System.IEquatable<Akka.Persistence.DeleteMessagesFailure>
117129
{
118130
public DeleteMessagesFailure(System.Exception cause, long toSequenceNr) { }
@@ -318,6 +330,12 @@ namespace Akka.Persistence
318330
{
319331
Akka.Persistence.IStashOverflowStrategy Create(Akka.Configuration.Config config);
320332
}
333+
public sealed class JournalHealthCheckResponse : Akka.Actor.INoSerializationVerificationNeeded, Akka.Persistence.IJournalMessage, Akka.Persistence.IJournalResponse, Akka.Persistence.IPersistenceMessage
334+
{
335+
public JournalHealthCheckResponse(Akka.Persistence.PersistenceHealthCheckResult result) { }
336+
public Akka.Persistence.PersistenceHealthCheckResult Result { get; }
337+
public override string ToString() { }
338+
}
321339
public sealed class LoadSnapshot : Akka.Actor.INoSerializationVerificationNeeded, Akka.Persistence.IPersistenceMessage, Akka.Persistence.ISnapshotMessage, Akka.Persistence.ISnapshotRequest, System.IEquatable<Akka.Persistence.LoadSnapshot>
322340
{
323341
public LoadSnapshot(string persistenceId, Akka.Persistence.SnapshotSelectionCriteria criteria, long toSequenceNr) { }
@@ -377,12 +395,36 @@ namespace Akka.Persistence
377395
public Akka.Persistence.IStashOverflowStrategy DefaultInternalStashOverflowStrategy { get; }
378396
public Akka.Persistence.PersistenceSettings Settings { get; }
379397
public Akka.Persistence.Journal.EventAdapters AdaptersFor(string journalPluginId) { }
398+
public System.Threading.Tasks.Task<Akka.Persistence.PersistenceHealthCheckResult> CheckJournalHealthAsync(string journalPluginId, System.Threading.CancellationToken cancellationToken = null) { }
399+
public System.Threading.Tasks.Task<Akka.Persistence.PersistenceHealthCheckResult> CheckSnapshotStoreHealthAsync(string snapshotStorePluginId, System.Threading.CancellationToken cancellationToken = null) { }
380400
[Akka.Annotations.InternalStableApiAttribute()]
381401
public Akka.Actor.IActorRef JournalFor(string journalPluginId) { }
382402
public string PersistenceId(Akka.Actor.IActorRef actor) { }
383403
[Akka.Annotations.InternalStableApiAttribute()]
384404
public Akka.Actor.IActorRef SnapshotStoreFor(string snapshotPluginId) { }
385405
}
406+
[System.Runtime.CompilerServices.NullableAttribute(0)]
407+
public struct PersistenceHealthCheckResult : System.IEquatable<Akka.Persistence.PersistenceHealthCheckResult>
408+
{
409+
public PersistenceHealthCheckResult(Akka.Persistence.PersistenceHealthStatus Status, string Description = null, System.Exception Exception = null, [System.Runtime.CompilerServices.NullableAttribute(new byte[] {
410+
2,
411+
0,
412+
0})] System.Collections.Generic.IReadOnlyDictionary<string, object> Data = null) { }
413+
[System.Runtime.CompilerServices.NullableAttribute(new byte[] {
414+
2,
415+
0,
416+
0})]
417+
public System.Collections.Generic.IReadOnlyDictionary<string, object> Data { get; set; }
418+
public string Description { get; set; }
419+
public System.Exception Exception { get; set; }
420+
public Akka.Persistence.PersistenceHealthStatus Status { get; set; }
421+
}
422+
public enum PersistenceHealthStatus
423+
{
424+
Healthy = 0,
425+
Degraded = 1,
426+
Unhealthy = 2,
427+
}
386428
public sealed class PersistenceSettings : Akka.Actor.Settings
387429
{
388430
public PersistenceSettings(Akka.Actor.ActorSystem system, Akka.Configuration.Config config) { }
@@ -627,6 +669,12 @@ namespace Akka.Persistence
627669
public override int GetHashCode() { }
628670
public override string ToString() { }
629671
}
672+
public sealed class SnapshotStoreHealthCheckResponse : Akka.Actor.INoSerializationVerificationNeeded, Akka.Persistence.IPersistenceMessage, Akka.Persistence.ISnapshotMessage, Akka.Persistence.ISnapshotResponse
673+
{
674+
public SnapshotStoreHealthCheckResponse(Akka.Persistence.PersistenceHealthCheckResult result) { }
675+
public Akka.Persistence.PersistenceHealthCheckResult Result { get; }
676+
public override string ToString() { }
677+
}
630678
public sealed class StashingHandlerInvocation : Akka.Persistence.IPendingHandlerInvocation
631679
{
632680
public StashingHandlerInvocation(object evt, System.Action<object> handler) { }
@@ -857,6 +905,7 @@ namespace Akka.Persistence.Journal
857905
{
858906
protected readonly bool CanPublish;
859907
protected AsyncWriteJournal() { }
908+
public virtual System.Threading.Tasks.Task<Akka.Persistence.PersistenceHealthCheckResult> CheckHealthAsync(System.Threading.CancellationToken cancellationToken = null) { }
860909
protected abstract System.Threading.Tasks.Task DeleteMessagesToAsync(string persistenceId, long toSequenceNr, System.Threading.CancellationToken cancellationToken);
861910
public abstract System.Threading.Tasks.Task<long> ReadHighestSequenceNrAsync(string persistenceId, long fromSequenceNr, System.Threading.CancellationToken cancellationToken);
862911
protected virtual bool Receive(object message) { }
@@ -1213,6 +1262,7 @@ namespace Akka.Persistence.Snapshot
12131262
public abstract class SnapshotStore : Akka.Actor.ActorBase
12141263
{
12151264
protected SnapshotStore() { }
1265+
public virtual System.Threading.Tasks.Task<Akka.Persistence.PersistenceHealthCheckResult> CheckHealthAsync(System.Threading.CancellationToken cancellationToken = null) { }
12161266
protected abstract System.Threading.Tasks.Task DeleteAsync(Akka.Persistence.SnapshotMetadata metadata, System.Threading.CancellationToken cancellationToken);
12171267
protected abstract System.Threading.Tasks.Task DeleteAsync(string persistenceId, Akka.Persistence.SnapshotSelectionCriteria criteria, System.Threading.CancellationToken cancellationToken);
12181268
protected abstract System.Threading.Tasks.Task<Akka.Persistence.SelectedSnapshot> LoadAsync(string persistenceId, Akka.Persistence.SnapshotSelectionCriteria criteria, System.Threading.CancellationToken cancellationToken);
Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
// -----------------------------------------------------------------------
2+
// <copyright file="PersistenceHealthCheckSpec.cs" company="Akka.NET Project">
3+
// Copyright (C) 2009-2022 Lightbend Inc. <http://www.lightbend.com>
4+
// Copyright (C) 2013-2025 .NET Foundation <https://github.com/akkadotnet/akka.net>
5+
// </copyright>
6+
// -----------------------------------------------------------------------
7+
8+
using System;
9+
using System.Collections.Generic;
10+
using System.Collections.Immutable;
11+
using System.Threading;
12+
using System.Threading.Tasks;
13+
using Akka.Configuration;
14+
using Akka.Persistence.Journal;
15+
using Akka.TestKit;
16+
using Akka.TestKit.Configs;
17+
using Xunit;
18+
using Xunit.Abstractions;
19+
20+
namespace Akka.Persistence.Tests;
21+
22+
/// <summary>
/// Specs for the journal health check exposed through <c>Extension.CheckJournalHealthAsync</c>.
/// Covers the healthy default case plus the <c>Degraded</c> results reported while a journal's
/// circuit breaker is open or half-open.
/// </summary>
public class JournalHealthCheckSpec : PersistenceSpec
23+
{
24+
/// <summary>
/// Builds the configuration used by every test in this spec.
/// </summary>
/// <remarks>
/// Declares two journal plugins, <c>failing-open</c> and <c>failing-half-open</c>, both backed by
/// <see cref="FailingJournal"/> with <c>max-failures = 1</c> and <c>call-timeout = 1s</c>. They differ
/// only in <c>reset-timeout</c> (10s vs. 1s). Message serialization is switched off, and the result is
/// <c>TestConfigs.TestSchedulerConfig</c> with the persistence configuration as fallback.
/// </remarks>
/// <returns>The combined <see cref="Config"/> passed to the base spec constructor.</returns>
private static Config HealthCheckConfig()
25+
{
26+
const string extraConfig = """
27+
28+
akka.persistence.journal.failing-open {
29+
class = "Akka.Persistence.Tests.FailingJournal, Akka.Persistence.Tests"
30+
circuit-breaker {
31+
max-failures = 1
32+
call-timeout = 1s
33+
reset-timeout = 10s
34+
}
35+
}
36+
akka.persistence.journal.failing-half-open {
37+
class = "Akka.Persistence.Tests.FailingJournal, Akka.Persistence.Tests"
38+
circuit-breaker {
39+
max-failures = 1
40+
call-timeout = 1s
41+
reset-timeout = 1s
42+
}
43+
}
44+
# Disable message serialization for circuit breaker tests to avoid serialization issues
45+
akka.actor.serialize-messages = off
46+
47+
""";
48+
// TestSchedulerConfig takes precedence; the tests below cast Sys.Scheduler to TestScheduler,
// which presumably relies on this config installing it - TODO confirm.
// NOTE(review): Configuration(...) is not defined in this file; presumably inherited from PersistenceSpec.
return TestConfigs.TestSchedulerConfig
49+
.WithFallback(Configuration("PersistenceHealthCheckSpec", extraConfig: extraConfig));
50+
}
51+
52+
/// <summary>
/// Creates the spec, passing <see cref="HealthCheckConfig"/> and the xUnit output helper to the base class.
/// </summary>
/// <param name="output">xUnit test output helper forwarded to the base spec.</param>
public JournalHealthCheckSpec(ITestOutputHelper output) : base(HealthCheckConfig(), output)
53+
{
54+
}
55+
56+
/// <summary>
/// Checks that the journal health check reports <c>Healthy</c> with a non-null description, both when
/// no plugin id is given (<c>null</c>) and when the in-memory journal is named explicitly.
/// </summary>
/// <param name="pluginId">Journal plugin id to check; <c>null</c> selects the default plugin.</param>
[Theory]
57+
[InlineData(null)] // default plugin
58+
[InlineData("akka.persistence.journal.inmem")]
59+
public async Task JournalHealthCheck_should_default_to_Healthy(string? pluginId)
60+
{
61+
// Bound the health check call by the spec's remaining/default timeout.
using var cts = new CancellationTokenSource(RemainingOrDefault);
62+
var pluginHealth = await Extension.CheckJournalHealthAsync(pluginId, cts.Token);
63+
64+
Assert.Equal(PersistenceHealthStatus.Healthy, pluginHealth.Status);
65+
Assert.NotNull(pluginHealth.Description);
66+
}
67+
68+
/// <summary>
/// Sends one write to the <c>failing-open</c> journal, advances the test scheduler by 2 seconds, and
/// expects the health check to report <c>Degraded</c> with a description containing
/// "Circuit breaker is open".
/// </summary>
[Fact]
69+
public async Task JournalHealthCheck_should_return_Degraded_when_CircuitBreaker_is_Open()
70+
{
71+
// Get the journal actor reference
72+
var journal = Extension.JournalFor("akka.persistence.journal.failing-open");
73+
74+
// Trigger a failure to open the circuit breaker
75+
var writeMsg = new WriteMessages(new[] { new AtomicWrite(new Persistent("test", 1, "test-pid")) }.ToImmutableList(),
76+
TestActor, 1);
77+
journal.Tell(writeMsg, TestActor);
78+
79+
// Advance time to let the write fail and circuit breaker open
80+
var testScheduler = (TestScheduler)Sys.Scheduler;
81+
// 2s is past the configured 1s call-timeout but well short of the 10s reset-timeout.
testScheduler.Advance(TimeSpan.FromSeconds(2));
82+
83+
using var cts = new CancellationTokenSource(RemainingOrDefault);
84+
var pluginHealth = await Extension.CheckJournalHealthAsync("akka.persistence.journal.failing-open", cts.Token);
85+
86+
Assert.Equal(PersistenceHealthStatus.Degraded, pluginHealth.Status);
87+
Assert.Contains("Circuit breaker is open", pluginHealth.Description);
88+
}
89+
90+
/// <summary>
/// Sends one write to the <c>failing-half-open</c> journal, advances the test scheduler in two
/// 1-second steps with short real-time delays in between, and expects the health check to report
/// <c>Degraded</c> with a description containing "Circuit breaker is half-open".
/// </summary>
[Fact]
91+
public async Task JournalHealthCheck_should_return_Degraded_when_CircuitBreaker_is_HalfOpen()
92+
{
93+
// Get the journal actor reference
94+
var journal = Extension.JournalFor("akka.persistence.journal.failing-half-open");
95+
96+
// Trigger a failure to open the circuit breaker
97+
var writeMsg = new WriteMessages(new[] { new AtomicWrite(new Persistent("test", 1, "test-pid")) }.ToImmutableList(),
98+
TestActor, 1);
99+
journal.Tell(writeMsg, TestActor);
100+
101+
var testScheduler = (TestScheduler)Sys.Scheduler;
102+
103+
// Advance time past call-timeout to let the write fail and circuit breaker open
104+
testScheduler.Advance(TimeSpan.FromSeconds(1));
105+
106+
// Give the async operations time to complete
107+
// NOTE(review): this is a wall-clock wait, not virtual time; the test outcome may be timing-sensitive.
await Task.Delay(100);
108+
109+
// Advance time past reset-timeout to transition to half-open
110+
testScheduler.Advance(TimeSpan.FromSeconds(1));
111+
112+
// Give the transition time to complete
113+
// NOTE(review): second wall-clock wait; same timing caveat as above.
await Task.Delay(100);
114+
115+
using var cts = new CancellationTokenSource(RemainingOrDefault);
116+
var pluginHealth = await Extension.CheckJournalHealthAsync("akka.persistence.journal.failing-half-open", cts.Token);
117+
118+
Assert.Equal(PersistenceHealthStatus.Degraded, pluginHealth.Status);
119+
Assert.Contains("Circuit breaker is half-open", pluginHealth.Description);
120+
}
121+
}
122+
123+
/// <summary>
124+
/// Test journal that always fails writes to trigger circuit breaker
125+
/// </summary>
/// <remarks>
/// Derives from <see cref="MemoryJournal"/> and overrides only <c>WriteMessagesAsync</c>.
/// The override throws <see cref="InvalidOperationException"/> synchronously instead of returning
/// a faulted task. The class is referenced by its assembly-qualified name in the
/// <c>failing-open</c> and <c>failing-half-open</c> journal plugin configuration of this test file.
/// </remarks>
126+
public class FailingJournal : MemoryJournal
127+
{
128+
// Both parameters are ignored; every call fails with the same simulated error.
protected override Task<IImmutableList<Exception>> WriteMessagesAsync(IEnumerable<AtomicWrite> messages, CancellationToken cancellationToken)
129+
{
130+
throw new InvalidOperationException("Simulated journal write failure");
131+
}
132+
}

0 commit comments

Comments
 (0)