Skip to content

Commit d57687a

Browse files
Dedupe follow-up: ProjectionCoordinatorBase (#4516) + HiloSequenceBase (#4527) (#4532)
* #4516: reduce ProjectionCoordinator to a ProjectionCoordinatorBase subclass

  #326 lifted the projection-coordinator execute loop + agent-start resilience + lifecycle (Start/Pause/Resume/Stop) into JasperFx.Events.Daemon.ProjectionCoordinatorBase. Reduce Marten's coordinator to a subclass:

  - Supply via the base ctor: BuildDistributor(store), the store's ResiliencePipeline, Events.TimeProvider, and the three settings (LeadershipPollingTime as a TimeSpan, AgentPauseTime, HealthCheckPollingTime).
  - Implement the seams: ResolveDaemon (keeps Marten's ImHashMap + double-checked-lock daemon cache) and ResolvedDaemons; keep the existing DaemonForMainDatabase / DaemonForDatabase / AllDaemonsAsync.
  - Delete the lifted-into-base members: executeAsync, start/stopAgentsIfNecessary, tryStartAgent, the Start/Pause/Resume/Stop bodies, pauseDistributor, and the DaemonShardName record.

  Behavior is unchanged (the base adopted Marten's resilient agent-start + single-cancellation lifecycle verbatim). DaemonTests 185/0; HotCold leadership-election tests (detect_high_water_mark, end_to_end_with_events_already_published) pass.

  Closes #4516.

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* #4527: derive HiloSequence from Weasel.Core.Sequences.HiloSequenceBase

  Completes #4527 (the serialization-enum subset landed in #4529). weasel#287 lifted the dialect-agnostic Hi-Lo pieces into Weasel.Core.Sequences: HiloSequenceBase (state + arithmetic), ISequence, IReadOnlyHiloSettings, HiloSettings, and HiloSequenceAdvanceToNextHiAttemptsExceededException.

  - Marten's HiloSequence now derives from HiloSequenceBase, keeping only the PostgreSQL I/O (mt_get_next_hi stored function in AdvanceToNextHi/Sync, the mt_hilo floor update in SetFloor). The client-side arithmetic (AdvanceValue/ShouldAdvanceHi/NextInt/NextLong/TrySetCurrentHi/hi-lo state) is inherited verbatim — id allocation is unchanged.
  - Delete Marten's ISequence / IReadOnlyHiloSettings / HiloSettings / HiloSequenceAdvanceToNextHiAttemptsExceededException copies and alias the old names via the shared dedupe file (shapes identical; the exception's base changed MartenException -> Exception but nothing catches it as MartenException).
  - AdvanceToNextHiSync is protected on the base; the one test that drove it directly now invokes it reflectively to keep exercising the sync I/O path.

  Hi-Lo + sequence-id tests pass (32/0).

  Closes #4527.

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent dfa23e8 commit d57687a

8 files changed

Lines changed: 60 additions & 370 deletions

File tree

src/CoreTests/Storage/Identification/closed_shape_id_strategies_tests.cs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -80,7 +80,7 @@ public async Task identity_key_assigns_alias_slash_sequence()
8080
{
8181
opts.Schema.For<IdentityKeyDoc>().IdStrategy(
8282
new Marten.Schema.Identity.Sequences.IdentityKeyGeneration(
83-
(DocumentMapping)null!, new Marten.Schema.Identity.Sequences.HiloSettings()));
83+
(DocumentMapping)null!, new HiloSettings()));
8484
});
8585

8686
var doc = new IdentityKeyDoc { Name = "v1" };

src/DocumentDbTests/Writing/Identity/Sequences/hilo_tests.cs

Lines changed: 6 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -2,6 +2,7 @@
22
using System.Collections;
33
using System.Collections.Generic;
44
using System.Linq;
5+
using System.Reflection;
56
using System.Threading.Tasks;
67
using JasperFx.Core;
78
using JasperFx.Core.Reflection;
@@ -22,7 +23,11 @@ public class AdvanceToNextHi : IEnumerable<object[]>
2223
{
2324
private static readonly Func<HiloSequence, Task> AsyncNext = sequence => sequence.AdvanceToNextHi();
2425
private static readonly Func<HiloSequence, Task> SyncNext = sequence => {
25-
sequence.AdvanceToNextHiSync();
26+
// AdvanceToNextHiSync is protected on the lifted Weasel.Core HiloSequenceBase (#4527),
27+
// so invoke it reflectively to keep exercising the synchronous hi-advance I/O path.
28+
typeof(HiloSequence)
29+
.GetMethod("AdvanceToNextHiSync", BindingFlags.Instance | BindingFlags.NonPublic)!
30+
.Invoke(sequence, null);
2631
return Task.CompletedTask;
2732
};
2833
public IEnumerator<object[]> GetEnumerator()
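(File path heading missing from this capture; the hunks below modify the file that declares the `ProjectionCoordinator` class, not hilo_tests.cs.)

Lines changed: 35 additions & 253 deletions
Original file line number | Diff line number | Diff line change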
@@ -1,10 +1,8 @@
11
using System;
22
using System.Collections.Generic;
33
using System.Linq;
4-
using System.Threading;
54
using System.Threading.Tasks;
65
using ImTools;
7-
using JasperFx;
86
using JasperFx.Core;
97
using JasperFx.Events.Daemon;
108
using JasperFx.Events.Projections;
@@ -23,35 +21,42 @@ public ProjectionCoordinator(T documentStore, ILogger<ProjectionCoordinator> log
2321
}
2422
}
2523

26-
public class ProjectionCoordinator: IProjectionCoordinator
24+
// 9.0 (#4516 dedupe): the leadership-election + agent-lifecycle loop now lives in
25+
// JasperFx.Events.Daemon.ProjectionCoordinatorBase. Marten supplies the distributor +
26+
// settings via the base ctor and implements the daemon-resolution seams, keeping its
27+
// own ImHashMap + double-checked-lock daemon cache.
28+
public class ProjectionCoordinator: ProjectionCoordinatorBase, IProjectionCoordinator
2729
{
2830
private readonly System.Threading.Lock _daemonLock = new();
29-
private readonly ILogger<ProjectionCoordinator> _logger;
30-
private readonly StoreOptions _options;
31-
32-
private readonly ResiliencePipeline _resilience;
33-
private readonly TimeProvider _timeProvider;
34-
private CancellationTokenSource? _cancellation;
31+
private readonly ILogger _logger;
3532

3633
private ImHashMap<string, IProjectionDaemon> _daemons = ImHashMap<string, IProjectionDaemon>.Empty;
37-
private Task? _runner;
3834

3935
public ProjectionCoordinator(IDocumentStore documentStore, ILogger<ProjectionCoordinator> logger)
36+
: this((DocumentStore)documentStore, logger)
4037
{
41-
var store = (DocumentStore)documentStore;
38+
}
4239

40+
private ProjectionCoordinator(DocumentStore store, ILogger<ProjectionCoordinator> logger)
41+
: base(
42+
BuildDistributor(store),
43+
logger,
44+
store.Options.ResiliencePipeline,
45+
store.Options.Events.TimeProvider,
46+
store.Options.Projections.LeadershipPollingTime.Milliseconds(),
47+
store.Options.Projections.AgentPauseTime,
48+
store.Options.Projections.HealthCheckPollingTime)
49+
{
4350
Mode = store.Options.Projections.AsyncMode;
44-
45-
Distributor = BuildDistributor(store);
46-
47-
_options = store.Options;
48-
_logger = logger;
49-
_resilience = store.Options.ResiliencePipeline;
50-
_timeProvider = _options.Events.TimeProvider;
5151
Store = store;
52+
_logger = logger;
5253
}
5354

54-
// 9.0 (#4349 dedupe): the Solo / SingleTenant / MultiTenanted distributors now live in
55+
public DaemonMode Mode { get; }
56+
57+
public DocumentStore Store { get; }
58+
59+
// 9.0 (#4349 dedupe): the Solo / SingleTenant / MultiTenanted distributors live in
5560
// JasperFx.Events. Marten wires them with closures over its own tenancy, shard, and lock
5661
// surfaces. ProjectionSet (Marten-side) remains the IProjectionSet implementation, and the
5762
// Postgres lock factory hands back Weasel's AdvisoryLock — which implements
@@ -108,110 +113,36 @@ private static Func<IProjectionDatabase, IAdvisoryLock> buildLockFactory(Documen
108113
});
109114
}
110115

111-
public DaemonMode Mode { get; }
112-
113-
public DocumentStore Store { get; }
116+
protected override IProjectionDaemon ResolveDaemon(IProjectionSet set)
117+
{
118+
return findDaemonForDatabase((MartenDatabase)set.Database);
119+
}
114120

115-
public IProjectionDistributor Distributor { get; }
121+
protected override IReadOnlyList<IProjectionDaemon> ResolvedDaemons()
122+
{
123+
return _daemons.Enumerate().Select(x => x.Value).ToList();
124+
}
116125

117-
public IProjectionDaemon DaemonForMainDatabase()
126+
public override IProjectionDaemon DaemonForMainDatabase()
118127
{
119128
var database = (MartenDatabase)Store.Tenancy.Default.Database;
120129

121130
return findDaemonForDatabase(database);
122131
}
123132

124-
public async ValueTask<IProjectionDaemon> DaemonForDatabase(string databaseIdentifier)
133+
public override async ValueTask<IProjectionDaemon> DaemonForDatabase(string databaseIdentifier)
125134
{
126135
var database =
127136
(MartenDatabase)await Store.Storage.FindOrCreateDatabase(databaseIdentifier).ConfigureAwait(false);
128137
return findDaemonForDatabase(database);
129138
}
130139

131-
public async ValueTask<IReadOnlyList<IProjectionDaemon>> AllDaemonsAsync()
140+
public override async ValueTask<IReadOnlyList<IProjectionDaemon>> AllDaemonsAsync()
132141
{
133142
var all = await Store.Storage.AllDatabases().ConfigureAwait(false);
134143
return all.OfType<MartenDatabase>().Select(findDaemonForDatabase).ToList();
135144
}
136145

137-
public Task StartAsync(CancellationToken cancellationToken)
138-
{
139-
_cancellation?.SafeDispose();
140-
141-
_cancellation = new CancellationTokenSource();
142-
_runner = Task.Run(() => executeAsync(_cancellation.Token), _cancellation.Token);
143-
144-
return Task.CompletedTask;
145-
}
146-
147-
public async Task PauseAsync()
148-
{
149-
_logger.LogInformation("Pausing ProjectionCoordinator");
150-
if (_cancellation != null)
151-
{
152-
await _cancellation.CancelAsync().ConfigureAwait(false);
153-
}
154-
155-
await pauseDistributor().ConfigureAwait(false);
156-
157-
foreach (var pair in _daemons.Enumerate())
158-
{
159-
try
160-
{
161-
await pair.Value.StopAllAsync().ConfigureAwait(false);
162-
}
163-
catch (Exception exception)
164-
{
165-
_logger.LogError(exception, "Error while trying to stop daemon agents in database {Name}", pair.Key);
166-
}
167-
}
168-
}
169-
170-
private async Task pauseDistributor()
171-
{
172-
if (_runner == null) return;
173-
174-
try
175-
{
176-
#pragma warning disable VSTHRD003
177-
await _runner.ConfigureAwait(false);
178-
#pragma warning restore VSTHRD003
179-
}
180-
catch (TaskCanceledException)
181-
{
182-
// Nothing, just from shutting down
183-
}
184-
catch (OperationCanceledException)
185-
{
186-
// Nothing, just from shutting down
187-
}
188-
catch (Exception e)
189-
{
190-
_logger.LogError(e, "Error while trying to stop the ProjectionCoordinator");
191-
}
192-
}
193-
194-
public Task ResumeAsync()
195-
{
196-
return StartAsync(default);
197-
}
198-
199-
public async Task StopAsync(CancellationToken cancellationToken)
200-
{
201-
await PauseAsync().ConfigureAwait(false);
202-
203-
foreach (var daemon in _daemons.Enumerate()) daemon.Value.SafeDispose();
204-
205-
try
206-
{
207-
await Distributor.ReleaseAllLocks().ConfigureAwait(false);
208-
}
209-
catch (Exception e)
210-
{
211-
_logger.LogError(e, "Error trying to release subscription agent locks");
212-
}
213-
}
214-
215146
private IProjectionDaemon findDaemonForDatabase(MartenDatabase database)
216147
{
217148
if (_daemons.TryFind(database.Identifier, out var daemon))
@@ -232,153 +163,4 @@ private IProjectionDaemon findDaemonForDatabase(MartenDatabase database)
232163

233164
return daemon;
234165
}
235-
236-
private async Task executeAsync(CancellationToken stoppingToken)
237-
{
238-
await Distributor.RandomWait(stoppingToken).ConfigureAwait(false);
239-
240-
while (!stoppingToken.IsCancellationRequested)
241-
{
242-
try
243-
{
244-
var sets = await Distributor
245-
.BuildDistributionAsync().ConfigureAwait(false);
246-
247-
foreach (var set in sets)
248-
{
249-
// Is it already running here?
250-
if (Distributor.HasLock(set))
251-
{
252-
var daemon = resolveDaemon(set);
253-
254-
// check if it's still running
255-
await startAgentsIfNecessaryAsync(set, daemon, stoppingToken).ConfigureAwait(false);
256-
continue;
257-
}
258-
259-
try
260-
{
261-
if (await Distributor.TryAttainLockAsync(set, stoppingToken).ConfigureAwait(false))
262-
{
263-
var daemon = resolveDaemon(set);
264-
265-
// check if it's still running
266-
await startAgentsIfNecessaryAsync(set, daemon, stoppingToken).ConfigureAwait(false);
267-
}
268-
else
269-
{
270-
// We don't hold the lock, so we might've lost it due to a postgres outage. We should make sure any agents are no longer running on this node.
271-
var daemon = resolveDaemon(set);
272-
273-
await stopAgentsIfNecessaryAsync(set, daemon).ConfigureAwait(false);
274-
}
275-
}
276-
catch (Exception e)
277-
{
278-
_logger.LogError(e, "Error trying to attain a lock for set {Name} and lock id {LockId}. Will retry later", set.Names.Select(x => x.Identity).Join(", "), set.LockId);
279-
await Task.Delay(_options.Projections.LeadershipPollingTime.Milliseconds(), stoppingToken)
280-
.ConfigureAwait(false);
281-
}
282-
}
283-
}
284-
catch (Exception e)
285-
{
286-
if (stoppingToken.IsCancellationRequested)
287-
{
288-
return;
289-
}
290-
291-
// Only really expect any errors if there are dynamic tenants in place
292-
_logger.LogError(e, "Error trying to resolve projection distributions");
293-
}
294-
295-
if (stoppingToken.IsCancellationRequested)
296-
{
297-
return;
298-
}
299-
300-
try
301-
{
302-
if (_daemons.Enumerate().Any(x => x.Value.HasAnyPaused()))
303-
{
304-
await Task.Delay(_options.Projections.AgentPauseTime, stoppingToken).ConfigureAwait(false);
305-
}
306-
else
307-
{
308-
await Task.Delay(_options.Projections.LeadershipPollingTime.Milliseconds(), stoppingToken)
309-
.ConfigureAwait(false);
310-
}
311-
}
312-
catch (TaskCanceledException)
313-
{
314-
// just get out of here, this signals a graceful shutdown attempt
315-
}
316-
catch (OperationCanceledException)
317-
{
318-
// Nothing, just from shutting down
319-
}
320-
}
321-
}
322-
323-
324-
private async Task startAgentsIfNecessaryAsync(IProjectionSet set,
325-
IProjectionDaemon daemon, CancellationToken stoppingToken)
326-
{
327-
foreach (var name in set.Names)
328-
{
329-
var agent = daemon.CurrentAgents().FirstOrDefault(x => x.Name.Equals(name));
330-
if (agent == null)
331-
{
332-
await tryStartAgent(stoppingToken, daemon, name, set).ConfigureAwait(false);
333-
}
334-
else if (agent is { Status: AgentStatus.Paused, PausedTime: not null } &&
335-
_timeProvider.GetUtcNow().Subtract(agent.PausedTime.Value) >
336-
_options.Projections.HealthCheckPollingTime)
337-
{
338-
await tryStartAgent(stoppingToken, daemon, name, set).ConfigureAwait(false);
339-
}
340-
}
341-
}
342-
343-
private async Task stopAgentsIfNecessaryAsync(IProjectionSet set, IProjectionDaemon daemon)
344-
{
345-
foreach (var shardName in set.Names)
346-
{
347-
var status = daemon.StatusFor(shardName.Identity);
348-
if (status == AgentStatus.Running)
349-
{
350-
await daemon.StopAgentAsync(shardName.Identity).ConfigureAwait(false);
351-
}
352-
353-
}
354-
}
355-
356-
private IProjectionDaemon resolveDaemon(IProjectionSet set)
357-
{
358-
return findDaemonForDatabase((MartenDatabase)set.Database);
359-
}
360-
361-
private async Task tryStartAgent(CancellationToken stoppingToken, IProjectionDaemon daemon, ShardName name,
362-
IProjectionSet set)
363-
{
364-
try
365-
{
366-
await _resilience.ExecuteAsync(
367-
static (x, t) => new ValueTask(x.Daemon.StartAgentAsync(x.Name.Identity, t)),
368-
new DaemonShardName(daemon, name), stoppingToken).ConfigureAwait(false);
369-
}
370-
catch (Exception e)
371-
{
372-
_logger.LogError(e, "Error trying to start subscription {Name} on database {Database}", name.Identity,
373-
set.Database.Identifier);
374-
if (daemon.StatusFor(name.Identity) == AgentStatus.Paused)
375-
{
376-
daemon.EjectPausedShard(name.Identity);
377-
}
378-
379-
await Distributor.ReleaseLockAsync(set).ConfigureAwait(false);
380-
}
381-
}
382-
383-
internal record DaemonShardName(IProjectionDaemon Daemon, ShardName Name);
384166
}

src/Marten/Exceptions/HiloSequenceAdvanceToNextHiAttemptsExceededException.cs

Lines changed: 0 additions & 18 deletions
This file was deleted.

0 commit comments

Comments
 (0)