Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
0bea383
using locally attached references for JasperFx, Weasel, and Marten
jeremydmiller Sep 8, 2025
c7c3476
Tests on MessagingSubscriptionsAddedHandler
jeremydmiller Sep 9, 2025
ab89e7a
using new JasperFx helper for kebab casing
jeremydmiller Sep 9, 2025
ee030a5
silly quick stop setting for critter watch testing
jeremydmiller Sep 12, 2025
22d1c02
eliminating an unused method on IDeadLetterAdminService
jeremydmiller Sep 12, 2025
3316757
Streamlined the signature of the IDeadLetterAdminService service
jeremydmiller Sep 12, 2025
4474ee1
fixes on message storage while coding against CritterWatch
jeremydmiller Sep 15, 2025
d8ee284
Adjustments to the test harness to bring in SignalR into the CritterW…
jeremydmiller Sep 15, 2025
794dc79
Added the new [EnlistInCurrentConnectionSaga] attribute and middleware
jeremydmiller Sep 15, 2025
02f218f
Introducing MessageStoreCollection
jeremydmiller Sep 16, 2025
7e986a0
More validation on adding ancillary sql server or postgresql message …
jeremydmiller Sep 18, 2025
7bee218
Made the StorageCommand aware of the MessageStoreCollection and added…
jeremydmiller Sep 18, 2025
e28990d
Review of usages WolverineRuntime.Storage
jeremydmiller Sep 18, 2025
0d7578b
Removed an API in IMessageInbox that was not used
jeremydmiller Sep 19, 2025
c0fcb3d
Renamed an API method on IMessageInbox that wasn't clear
jeremydmiller Sep 19, 2025
f871414
Pruned IMessageStore a bit more
jeremydmiller Sep 19, 2025
542e41b
Whipped up the Delegating message inbox & outbox for later
jeremydmiller Sep 19, 2025
9defcfe
Little preparation for more support for ancillary message stores
jeremydmiller Sep 19, 2025
491f30b
Simplifying IDeadLetters before doing more work
jeremydmiller Sep 19, 2025
748af3f
Ton of work toward consolidating the DLQ admin service endpoints
jeremydmiller Sep 19, 2025
045a964
All existing tests passing on the DLQ admin services
jeremydmiller Sep 21, 2025
68ec644
Converted the tests on the HTTP endpoints for DLQ admin
jeremydmiller Sep 21, 2025
2dc074c
Full support for ancillary stores with EF Core or Marten. Closes GH-1…
jeremydmiller Sep 21, 2025
9282539
Detached from JasperFx code, imported PascalToKebabCase() again
jeremydmiller Sep 22, 2025
24f41ad
Tweak for unit tests that use too much mocking:(
jeremydmiller Sep 22, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ public async Task start_up_with_no_persistence()
});

#endregion

host.Services.GetRequiredService<IMessageStore>().ShouldBeOfType<NullMessageStore>();

host.GetRuntime().Storage.ShouldBeOfType<NullMessageStore>();
}
}
36 changes: 21 additions & 15 deletions src/Http/Wolverine.Http.Tests/dead_letter_endpoints.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using JasperFx.Core.Reflection;
using Shouldly;
using Wolverine.Persistence.Durability.DeadLetterManagement;
using Wolverine.Tracking;
using WolverineWebApi;

Expand Down Expand Up @@ -32,18 +33,20 @@ await Host.TrackActivity().DoNotAssertOnExceptionsDetected().ExecuteAndWaitAsync
});

// Expect
var deadletters = result.ReadAsJson<DeadLetterEnvelopesFoundResponse>();
var deadletters = (result.ReadAsJson<IReadOnlyList<DeadLetterEnvelopeResults>>()).Single();


deadletters
.ShouldNotBeNull().Messages.Count.ShouldBe(1);
deadletters.Messages[0].ExceptionType.ShouldBe(typeof(AlwaysDeadLetterException).FullNameInCode());
deadletters.Messages[0].ExceptionMessage.ShouldBe(exceptionMessage);
deadletters.Messages[0].Body.ShouldNotBeNull();
deadletters.Messages[0].Id.ShouldNotBe(default);
deadletters.Messages[0].MessageType.ShouldNotBeNull();
deadletters.Messages[0].ReceivedAt.ShouldNotBeNull();
deadletters.Messages[0].Source.ShouldNotBeNull();
deadletters.Messages[0].SentAt.ShouldNotBe(default);
deadletters.Messages[0].Replayable.ShouldBeFalse();
.ShouldNotBeNull().Envelopes.Count.ShouldBe(1);
deadletters.Envelopes[0].ExceptionType.ShouldBe(typeof(AlwaysDeadLetterException).FullNameInCode());
deadletters.Envelopes[0].ExceptionMessage.ShouldBe(exceptionMessage);
deadletters.Envelopes[0].Message.ShouldNotBeNull();
deadletters.Envelopes[0].Id.ShouldNotBe(default);
deadletters.Envelopes[0].MessageType.ShouldNotBeNull();
deadletters.Envelopes[0].ReceivedAt.ShouldNotBeNull();
deadletters.Envelopes[0].Source.ShouldNotBeNull();
deadletters.Envelopes[0].SentAt.ShouldNotBe(default);
deadletters.Envelopes[0].Replayable.ShouldBeFalse();
}

[Fact]
Expand All @@ -70,12 +73,14 @@ await Host.TrackActivity().DoNotAssertOnExceptionsDetected().ExecuteAndWaitAsync
});

// When & Expect
var deadletters = result.ReadAsJson<DeadLetterEnvelopesFoundResponse>();
var all = result.ReadAsJson<IReadOnlyList<DeadLetterEnvelopeResults>>();
var id = all[0].Envelopes.Single().Id;

await Scenario(x =>
{
x.Post.Json(new DeadLetterEnvelopeIdsRequest
{
Ids = [deadletters.Messages.Single().Id]
Ids = [id]
}).ToUrl("/dead-letters/replay");
});
}
Expand Down Expand Up @@ -104,13 +109,14 @@ await Host.TrackActivity().DoNotAssertOnExceptionsDetected().ExecuteAndWaitAsync
});

// When & Expect
var deadletters = result.ReadAsJson<DeadLetterEnvelopesFoundResponse>();
var deadletters = result.ReadAsJson<IReadOnlyList<DeadLetterEnvelopeResults>>();
var id = deadletters[0].Envelopes[0].Id;

await Scenario(x =>
{
x.Delete.Json(new DeadLetterEnvelopeIdsRequest
{
Ids = [deadletters.Messages.Single().Id]
Ids = [id]
}).ToUrl("/dead-letters");
});
}
Expand Down
104 changes: 35 additions & 69 deletions src/Http/Wolverine.Http/DeadLettersEndpointExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,99 +2,65 @@
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Mvc;
using Microsoft.AspNetCore.Routing;
using Microsoft.Extensions.Options;
using Wolverine.Persistence.Durability;
using Wolverine.Persistence;
using Wolverine.Persistence.Durability.DeadLetterManagement;
using Wolverine.Runtime.Handlers;
using Wolverine.Runtime;

namespace Wolverine.Http;

public class DeadLetterEnvelopeGetRequest
{
/// <summary>
/// Number of records to return per page.
/// </summary>
public uint Limit { get; set; } = 100;
/// <summary>
/// Fetch records starting after the record with this ID.
/// </summary>
public Guid? StartId { get; set; }
public string? MessageType { get; set; }
public string? ExceptionType { get; set; }
public string? ExceptionMessage { get; set; }
public DateTimeOffset? From { get; set; }
public DateTimeOffset? Until { get; set; }
public string? TenantId { get; set; }
}

public class DeadLetterEnvelopeIdsRequest
{
public Guid[] Ids { get; set; }
public string? TenantId { get; set; }
}

public record DeadLetterEnvelopesFoundResponse(IReadOnlyList<DeadLetterEnvelopeResponse> Messages, Guid? NextId);

public record DeadLetterEnvelopeResponse(
Guid Id,
DateTimeOffset? ExecutionTime,
object? Body,
string MessageType,
string ReceivedAt,
string Source,
string ExceptionType,
string ExceptionMessage,
DateTimeOffset SentAt,
bool Replayable);

public static class DeadLettersEndpointExtensions
{
/// <summary>
/// Add endpoints to manage the Wolverine database-backed deal letter queue for this
/// application.
/// Add endpoints to manage the Wolverine database-backed deal letter queue for this
/// application.
/// </summary>
/// <param name="groupUrlPrefix">Optionally override the group Url prefix for these endpoints. The default is "/dead-letters"</param>
public static RouteGroupBuilder MapDeadLettersEndpoints(this IEndpointRouteBuilder endpoints, string? groupUrlPrefix = "/dead-letters")
/// <param name="groupUrlPrefix">
/// Optionally override the group Url prefix for these endpoints. The default is
/// "/dead-letters"
/// </param>
public static RouteGroupBuilder MapDeadLettersEndpoints(this IEndpointRouteBuilder endpoints,
string? groupUrlPrefix = "/dead-letters")
{
if (groupUrlPrefix.IsEmpty())
{
throw new ArgumentNullException(nameof(groupUrlPrefix), "Cannot be empty or null");
}

var deadlettersGroup = endpoints.MapGroup(groupUrlPrefix);

deadlettersGroup.MapPost("/", async (DeadLetterEnvelopeGetRequest request, IMessageStore messageStore, HandlerGraph handlerGraph, IOptions<WolverineOptions> opts) =>
deadlettersGroup.MapPost("/",
async (DeadLetterEnvelopeGetRequest request, MessageStoreCollection stores,
CancellationToken cancellation) =>
{
return await stores.FetchDeadLetterEnvelopesAsync(request, cancellation);
});

deadlettersGroup.MapPost("/replay", (DeadLetterEnvelopeIdsRequest request, IWolverineRuntime runtime) =>
{
var deadLetters = messageStore.DeadLetters;
var queryParameters = new DeadLetterEnvelopeQueryParameters
if (request.TenantId.IsEmpty())
{
Limit = request.Limit,
StartId = request.StartId,
MessageType = request.MessageType,
ExceptionType = request.ExceptionType,
ExceptionMessage = request.ExceptionMessage,
From = request.From,
Until = request.Until
};
var deadLetterEnvelopesFound = await deadLetters.QueryDeadLetterEnvelopesAsync(queryParameters, request.TenantId);
return new DeadLetterEnvelopesFoundResponse(
[.. deadLetterEnvelopesFound.DeadLetterEnvelopes.Select(x => new DeadLetterEnvelopeResponse(
x.Id,
x.ExecutionTime,
handlerGraph.TryFindMessageType(x.MessageType, out var messageType) ? opts.Value.DetermineSerializer(x.Envelope).ReadFromData(messageType, x.Envelope) : null,
x.MessageType,
x.ReceivedAt,
x.Source,
x.ExceptionType,
x.ExceptionMessage,
x.SentAt,
x.Replayable))
],
deadLetterEnvelopesFound.NextId);
return runtime.Stores.ReplayDeadLettersAsync(request.Ids);
}

return runtime.Stores.ReplayDeadLettersAsync(request.TenantId, request.Ids);
});

deadlettersGroup.MapPost("/replay", (DeadLetterEnvelopeIdsRequest request, IMessageStore messageStore) =>
messageStore.DeadLetters.MarkDeadLetterEnvelopesAsReplayableAsync(request.Ids, request.TenantId));

deadlettersGroup.MapDelete("/", ([FromBody]DeadLetterEnvelopeIdsRequest request, IMessageStore messageStore) =>
messageStore.DeadLetters.DeleteDeadLetterEnvelopesAsync(request.Ids, request.TenantId));
deadlettersGroup.MapDelete("/", ([FromBody] DeadLetterEnvelopeIdsRequest request, IWolverineRuntime runtime) =>
{
if (request.TenantId.IsEmpty())
{
return runtime.Stores.DiscardDeadLettersAsync(request.Ids);
}

return runtime.Stores.DiscardDeadLettersAsync(request.TenantId, request.Ids);
});

return deadlettersGroup;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
using Wolverine.Marten.Publishing;
using Wolverine.Persistence.Durability;
using Wolverine.Runtime;
using Wolverine.Runtime.Agents;
using Wolverine.Tracking;

namespace MartenTests.AncillaryStores;
Expand All @@ -23,7 +24,7 @@ public class ancillary_stores_use_different_databases : IAsyncLifetime

private string playersConnectionString;
private string thingsConnectionString;
private DurabilityAgentFamily theFamily;
private IAgentFamily theStores;

public async Task InitializeAsync()
{
Expand Down Expand Up @@ -64,8 +65,8 @@ public async Task InitializeAsync()

opts.Services.AddResourceSetupOnStartup();
}).StartAsync();
theFamily = new DurabilityAgentFamily(theHost.GetRuntime());

theStores = theHost.GetRuntime().Stores;
}

public async Task DisposeAsync()
Expand Down Expand Up @@ -106,10 +107,10 @@ public async Task create_transaction_for_separate_store_on_different_database()
[Fact]
public async Task have_durability_agents_for_other_databases()
{
var uris = await theFamily.AllKnownAgentsAsync();
uris.ShouldBe([
new Uri("wolverinedb://postgresql/localhost/postgres/wolverine"),
var uris = await theStores.AllKnownAgentsAsync();
uris.OrderBy(x => x.ToString()).ShouldBe([
new Uri("wolverinedb://postgresql/localhost/players/wolverine"),
new Uri("wolverinedb://postgresql/localhost/postgres/wolverine"),
new Uri("wolverinedb://postgresql/localhost/things/wolverine"),
]);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
using Wolverine.Postgresql;
using Wolverine.RDBMS;
using Wolverine.RDBMS.MultiTenancy;
using Wolverine.Runtime.Agents;
using Wolverine.Tracking;
using Xunit.Abstractions;

Expand All @@ -30,7 +31,7 @@ public class bootstrapping_ancillary_marten_stores_with_wolverine : IAsyncLifeti
private string tenant1ConnectionString;
private string tenant2ConnectionString;
private string tenant3ConnectionString;
private DurabilityAgentFamily theFamily;
private IAgentFamily theFamily;
private IHost theHost;

public bootstrapping_ancillary_marten_stores_with_wolverine(ITestOutputHelper output)
Expand Down Expand Up @@ -101,7 +102,7 @@ public async Task InitializeAsync()

#endregion

theFamily = new DurabilityAgentFamily(theHost.GetRuntime());
theFamily = theHost.GetRuntime().Stores;
}

public async Task DisposeAsync()
Expand Down Expand Up @@ -137,17 +138,18 @@ private async Task dropSchemaOnDatabase(string connectionString, string schemaNa
public void registers_the_single_tenant_ancillary_store()
{
theHost.DocumentStore<IPlayerStore>().ShouldNotBeNull();
var ancillaries = theHost.Services.GetServices<IAncillaryMessageStore>();
ancillaries.OfType<PostgresqlMessageStore<IPlayerStore>>().Any().ShouldBeTrue();
var ancillaries = theHost.Services.GetServices<AncillaryMessageStore>();
ancillaries.Select(x => x.MarkerType == typeof(IPlayerStore)).Any().ShouldBeTrue();
}

[Fact]
public void registers_the_multiple_tenant_ancillary_store()
{
theHost.DocumentStore<IThingStore>().ShouldNotBeNull();
var ancillaries = theHost.Services.GetServices<IAncillaryMessageStore>();
ancillaries.OfType<MultiTenantedMessageStore<IThingStore>>().Any()
.ShouldBeTrue();
var ancillaries = theHost.Services.GetServices<AncillaryMessageStore>();
ancillaries.Single(x => x.MarkerType == typeof(IThingStore))
.Inner
.ShouldBeOfType<MultiTenantedMessageStore>();
}

[Fact]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using Wolverine;
using Wolverine.ErrorHandling;
using Wolverine.Marten;
using Wolverine.Persistence.Durability;
using Wolverine.Persistence.Durability.DeadLetterManagement;
using Wolverine.Runtime.Handlers;
using Wolverine.Tracking;
Expand Down Expand Up @@ -59,20 +60,20 @@ public async Task can_replay_dead_letter_event()
{
count++;
var messages =
await runtime.Storage.DeadLetters.QueryDeadLetterEnvelopesAsync(
new DeadLetterEnvelopeQueryParameters());
await runtime.Storage.DeadLetters.QueryAsync(
new DeadLetterEnvelopeQuery(), CancellationToken.None);

if (hasReplayed && !messages.DeadLetterEnvelopes.Any())
if (hasReplayed && !messages.Envelopes.Any())
{
break; // we're done!
}

if (messages.DeadLetterEnvelopes.Any(x => !x.Replayable))
if (messages.Envelopes.Any(x => !x.Replayable))
{
ErrorCausingEventHandler.ShouldThrow = false;

await runtime.Storage.DeadLetters.MarkDeadLetterEnvelopesAsReplayableAsync(messages
.DeadLetterEnvelopes.Select(x => x.Id).ToArray());
.Envelopes.Select(x => x.Id).ToArray());

hasReplayed = true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public MultiTenancyContext(MultiTenancyFixture fixture)
{
Fixture = fixture;
Runtime = fixture.Host.GetRuntime();
Stores = Runtime.Storage.ShouldBeOfType<MultiTenantedMessageStore>();
Stores = Runtime.Stores.MultiTenanted.Single();
}

public MultiTenantedMessageStore Stores { get; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using Npgsql;
using Shouldly;
using Weasel.Postgresql;
using Wolverine.Persistence.Durability;
using Wolverine.RDBMS;
using Wolverine.Transports;

Expand Down Expand Up @@ -30,6 +31,19 @@ public void should_have_the_specified_master_database_as_master()
.Database.ShouldBe("postgres");
}

[Fact]
public async Task store_roles()
{
Stores.Main.Role.ShouldBe(MessageStoreRole.Main);
foreach (var activeDatabase in Stores.ActiveDatabases())
{
if (activeDatabase != Stores.Main)
{
activeDatabase.Role.ShouldBe(MessageStoreRole.Tenant);
}
}
}

[Fact]
public void knows_about_tenant_databases()
{
Expand Down Expand Up @@ -104,9 +118,9 @@ public void only_the_master_database_is_the_master()
{
foreach (var database in Stores.ActiveDatabases().OfType<IMessageDatabase>().Where(x => x.Name != StorageConstants.Main))
{
database.IsMain.ShouldBeFalse();
database.Role.ShouldBe(MessageStoreRole.Tenant);
}

Stores.Main.As<IMessageDatabase>().IsMain.ShouldBeTrue();
Stores.Main.As<IMessageDatabase>().Role.ShouldBe(MessageStoreRole.Main);
}
}
Loading
Loading