Skip to content

Commit 68e6c1c

Browse files
committed
Multi-Stage Tracked Sessions Closes GH-1809
using latest Marten & JasperFx.Events Added working code for the 2 stage testing w/ Marten!!!! Added necessary registrations to the TrackedSessionConfiguration
1 parent df4518c commit 68e6c1c

23 files changed

+1343
-62
lines changed
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
namespace MartenTests.TestHelpers;
2+
3+
public record AppendLetters(Guid Id, string[] Events);
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
namespace MartenTests.TestHelpers;
2+
3+
public record AppendLetters2(Guid Id, string[] Events);
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
using Wolverine;
2+
using Wolverine.Marten;
3+
4+
namespace MartenTests.TestHelpers;
5+
6+
public static class AppendLetters2Handler
7+
{
8+
[MartenStore(typeof(ILetterStore))]
9+
public static (Events, OutgoingMessages) Handle(
10+
AppendLetters2 command,
11+
12+
[WriteAggregate(Required = false)]
13+
LetterCounts aggregate)
14+
{
15+
switch (command.Events.Length)
16+
{
17+
case 0:
18+
return ([], []);
19+
20+
case 1:
21+
return (new Events(command.Events[0].ToLetterEvents()), []);
22+
23+
default:
24+
return (new Events(command.Events[0].ToLetterEvents()), [new AppendLetters2(command.Id, command.Events.Skip(1).ToArray())]);
25+
}
26+
}
27+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
using Wolverine;
2+
using Wolverine.Marten;
3+
4+
namespace MartenTests.TestHelpers;
5+
6+
public static class AppendLettersHandler
7+
{
8+
public static (Events, OutgoingMessages) Handle(
9+
AppendLetters command,
10+
11+
[WriteAggregate(Required = false)]
12+
LetterCounts aggregate)
13+
{
14+
switch (command.Events.Length)
15+
{
16+
case 0:
17+
return ([], []);
18+
19+
case 1:
20+
return (new Events(command.Events[0].ToLetterEvents()), []);
21+
22+
default:
23+
return (new Events(command.Events[0].ToLetterEvents()), [new AppendLetters(command.Id, command.Events.Skip(1).ToArray())]);
24+
}
25+
}
26+
}
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
namespace MartenTests.TestHelpers;
2+
3+
public record EEvent;
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
using Marten;
2+
3+
namespace MartenTests.TestHelpers;
4+
5+
public interface ILetterStore : IDocumentStore
6+
{
7+
8+
}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
using JasperFx;
2+
3+
namespace MartenTests.TestHelpers;
4+
5+
public class LetterCounts: IRevisioned
6+
{
7+
public Guid Id { get; set; }
8+
public int ACount { get; set; }
9+
public int BCount { get; set; }
10+
public int CCount { get; set; }
11+
public int DCount { get; set; }
12+
public int ECount { get; set; }
13+
public int Version { get; set; }
14+
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
using JasperFx.Events;
2+
using Marten.Events.Aggregation;
3+
using MartenTests.AggregateHandlerWorkflow;
4+
5+
namespace MartenTests.TestHelpers;
6+
7+
public class LetterCountsProjection: SingleStreamProjection<LetterCounts, Guid>
8+
{
9+
public override LetterCounts Evolve(LetterCounts snapshot, Guid id, IEvent e)
10+
{
11+
12+
switch (e.Data)
13+
{
14+
case AEvent _:
15+
snapshot ??= new() { Id = id };
16+
snapshot.ACount++;
17+
break;
18+
19+
case BEvent _:
20+
snapshot ??= new() { Id = id };
21+
snapshot.BCount++;
22+
break;
23+
24+
case CEvent _:
25+
snapshot ??= new() { Id = id };
26+
snapshot.CCount++;
27+
break;
28+
29+
case DEvent _:
30+
snapshot ??= new() { Id = id };
31+
snapshot.DCount++;
32+
break;
33+
34+
case EEvent _:
35+
snapshot ??= new() { Id = id };
36+
snapshot.ECount++;
37+
break;
38+
}
39+
40+
return snapshot;
41+
}
42+
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
using MartenTests.AggregateHandlerWorkflow;
2+
3+
namespace MartenTests.TestHelpers;
4+
5+
/// <summary>
6+
/// Basically an ObjectMother for the A/B/C/D/Event types
7+
/// </summary>
8+
public static class LetterEvents
9+
{
10+
public static IEnumerable<object> ToLetterEvents(this string text)
11+
{
12+
foreach (var character in text.ToLowerInvariant())
13+
{
14+
switch (character)
15+
{
16+
case 'a':
17+
yield return new AEvent();
18+
break;
19+
20+
case 'b':
21+
yield return new BEvent();
22+
break;
23+
24+
case 'c':
25+
yield return new CEvent();
26+
break;
27+
28+
case 'd':
29+
yield return new DEvent();
30+
break;
31+
32+
case 'e':
33+
yield return new EEvent();
34+
break;
35+
}
36+
}
37+
}
38+
}
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
using IntegrationTests;
2+
using JasperFx.Core;
3+
using JasperFx.Events.Daemon;
4+
using JasperFx.Events.Projections;
5+
using Marten;
6+
using Marten.Events;
7+
using Microsoft.Extensions.Hosting;
8+
using Shouldly;
9+
using Wolverine;
10+
using Wolverine.Marten;
11+
using Wolverine.Tracking;
12+
13+
namespace MartenTests.TestHelpers;
14+
15+
public class catch_up_and_then_do_nothing : IAsyncLifetime
16+
{
17+
private IHost _host;
18+
19+
public async Task InitializeAsync()
20+
{
21+
22+
_host = await Host.CreateDefaultBuilder()
23+
.UseWolverine(opts =>
24+
{
25+
opts.Services.AddMarten(m =>
26+
{
27+
m.DisableNpgsqlLogging = true;
28+
29+
m.Connection(Servers.PostgresConnectionString);
30+
m.DatabaseSchemaName = "letters3";
31+
32+
m.Projections.Add<LetterCountsProjection>(ProjectionLifecycle.Async);
33+
}).AddAsyncDaemon(DaemonMode.Solo).IntegrateWithWolverine();
34+
35+
opts.Services.AddMartenStore<ILetterStore>(m =>
36+
{
37+
m.DisableNpgsqlLogging = true;
38+
39+
m.Connection(Servers.PostgresConnectionString);
40+
m.DatabaseSchemaName = "letters4";
41+
42+
m.Projections.Add<LetterCountsProjection>(ProjectionLifecycle.Async);
43+
}).IntegrateWithWolverine();
44+
45+
opts.Durability.Mode = DurabilityMode.Solo;
46+
}).StartAsync();
47+
}
48+
49+
public async Task DisposeAsync()
50+
{
51+
await _host.StopAsync();
52+
}
53+
54+
[Fact]
55+
public async Task with_main_store()
56+
{
57+
await _host.ResetAllMartenDataAsync();
58+
59+
var id = Guid.NewGuid();
60+
61+
// Setting up some other aggregates first
62+
using var session = _host.DocumentStore().LightweightSession();
63+
session.Events.StartStream<LetterCounts>("AABBCCDDEE".ToLetterEvents());
64+
session.Events.StartStream<LetterCounts>("AABBCCDDEE".ToLetterEvents());
65+
session.Events.StartStream<LetterCounts>("AABBCCDDEE".ToLetterEvents());
66+
await session.SaveChangesAsync();
67+
await _host.WaitForNonStaleProjectionDataAsync(5.Seconds());
68+
69+
(await session.Query<LetterCounts>().CountAsync()).ShouldBe(3);
70+
71+
72+
var tracked = await _host.TrackActivity()
73+
.ResetAllMartenDataFirst()
74+
.PauseThenCatchUpOnMartenDaemonActivity(CatchUpMode.AndDoNothing)
75+
.InvokeMessageAndWaitAsync(new AppendLetters(id, ["AAAACCCCBDEEE", "ABCDECCC", "BBBA", "DDDAE"]));
76+
77+
// Proving that previous data was wiped out
78+
79+
var all = await session.Query<LetterCounts>().ToListAsync();
80+
var counts = all.Single();
81+
counts.Id.ShouldBe(id);
82+
83+
counts.ACount.ShouldBe(7);
84+
counts.BCount.ShouldBe(5);
85+
counts.CCount.ShouldBe(8);
86+
}
87+
88+
[Fact]
89+
public async Task with_ancillary_store()
90+
{
91+
await _host.ResetAllMartenDataAsync<ILetterStore>();
92+
93+
var id = Guid.NewGuid();
94+
95+
// Setting up some other aggregates first
96+
using var session = _host.DocumentStore<ILetterStore>().LightweightSession();
97+
session.Events.StartStream<LetterCounts>("AABBCCDDEE".ToLetterEvents());
98+
session.Events.StartStream<LetterCounts>("AABBCCDDEE".ToLetterEvents());
99+
session.Events.StartStream<LetterCounts>("AABBCCDDEE".ToLetterEvents());
100+
await session.SaveChangesAsync();
101+
await _host.WaitForNonStaleProjectionDataAsync<ILetterStore>(5.Seconds());
102+
103+
(await session.Query<LetterCounts>().CountAsync()).ShouldBe(3);
104+
105+
106+
var tracked = await _host.TrackActivity()
107+
.ResetAllMartenDataFirst<ILetterStore>()
108+
.PauseThenCatchUpOnMartenDaemonActivity<ILetterStore>(CatchUpMode.AndDoNothing)
109+
.InvokeMessageAndWaitAsync(new AppendLetters2(id, ["AAAACCCCBDEEE", "ABCDECCC", "BBBA", "DDDAE"]));
110+
111+
// Proving that previous data was wiped out
112+
113+
var all = await session.Query<LetterCounts>().ToListAsync();
114+
var counts = all.Single();
115+
counts.Id.ShouldBe(id);
116+
117+
counts.ACount.ShouldBe(7);
118+
counts.BCount.ShouldBe(5);
119+
counts.CCount.ShouldBe(8);
120+
}
121+
}

0 commit comments

Comments
 (0)