Skip to content

Commit 5a116ac

Browse files
committed
Improvements to the tracked sessions for failure acks and scheduled messages. Closes GH-1697. Closes GH-1694. Closes GH-1689. Closes GH-1688
Ability to replay scheduled messages WIP: handling scheduled messages in tracked sessions Adding some awareness of Failure acks to the tracked session testing support Adding more context into Failure Ack messages to troubleshoot problems w/ request/reply scenarios. Closes GH-1689
1 parent 0e55ec6 commit 5a116ac

17 files changed

+1034
-510
lines changed

docs/guide/testing.md

Lines changed: 572 additions & 475 deletions
Large diffs are not rendered by default.

src/Http/Wolverine.Http.Tests/IntegrationContext.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ public async Task<IScenarioResult> Scenario(Action<Scenario> configure)
185185
// for message tracking to both record outgoing messages and to ensure
186186
// that any cascaded work spawned by the initial command is completed
187187
// before passing control back to the calling test
188-
protected async Task<(ITrackedSession, IScenarioResult)> TrackedHttpCall(Action<Scenario> configuration)
188+
protected async Task<(ITrackedSession, IScenarioResult)> TrackedHttpCall(Action<Scenario> configuration, int timeoutInMilliseconds = 5000)
189189
{
190190
IScenarioResult result = null!;
191191

@@ -196,7 +196,7 @@ public async Task<IScenarioResult> Scenario(Action<Scenario> configure)
196196
// The inner part here is actually making an HTTP request
197197
// to the system under test with Alba
198198
result = await Host.Scenario(configuration);
199-
});
199+
}, timeoutInMilliseconds);
200200

201201
return (tracked, result);
202202
}

src/Http/Wolverine.Http.Tests/using_efcore.cs

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using IntegrationTests;
2+
using JasperFx.Resources;
23
using Microsoft.EntityFrameworkCore;
34
using Microsoft.Extensions.DependencyInjection;
45
using Npgsql;
@@ -15,6 +16,8 @@ public class using_efcore : IntegrationContext
1516
public using_efcore(AppFixture fixture) : base(fixture)
1617
{
1718
}
19+
20+
1821

1922
[Fact]
2023
public async Task using_db_context_without_outbox()
@@ -61,6 +64,8 @@ public async Task using_db_context_with_outbox()
6164

6265
private async Task cleanItems()
6366
{
67+
await Host.ResetResourceState();
68+
6469
var table = new Table("items");
6570
table.AddColumn<Guid>("Id").AsPrimaryKey();
6671
table.AddColumn<string>("Name");
@@ -75,4 +80,73 @@ private async Task cleanItems()
7580
await conn.CloseAsync();
7681
}
7782
}
83+
84+
85+
[Fact]
86+
public async Task using_db_context_with_outbox_schedule()
87+
{
88+
await cleanItems();
89+
90+
var command = new CreateItemCommand { Name = "Jerick McKinnon" };
91+
92+
var (tracked, _) = await TrackedHttpCall(x =>
93+
{
94+
x.Post.Json(command).ToUrl("/ef/schedule");
95+
x.StatusCodeShouldBe(204);
96+
}, 60000);
97+
98+
using var nested = Host.Services.CreateScope();
99+
var context = nested.ServiceProvider.GetRequiredService<ItemsDbContext>();
100+
101+
var item = await context.Items.Where(x => x.Name == command.Name).FirstOrDefaultAsync();
102+
item.ShouldBeNull();
103+
104+
var records = tracked.AllRecordsInOrder().ToArray();
105+
106+
var scheduledMessage = tracked.Scheduled.SingleEnvelope<ItemCreated>();
107+
scheduledMessage.ScheduleDelay.ShouldNotBeNull();
108+
scheduledMessage.Message.ShouldBeOfType<ItemCreated>();
109+
}
110+
111+
[Fact]
112+
public async Task using_db_context_with_outbox_schedule2()
113+
{
114+
await cleanItems();
115+
116+
var command = new CreateItemCommand { Name = "Jerick McKinnon" };
117+
118+
var (tracked, _) = await TrackedHttpCall(x =>
119+
{
120+
x.Post.Json(command).ToUrl("/ef/schedule2");
121+
x.StatusCodeShouldBe(204);
122+
});
123+
124+
using var nested = Host.Services.CreateScope();
125+
var context = nested.ServiceProvider.GetRequiredService<ItemsDbContext>();
126+
127+
var item = await context.Items.Where(x => x.Name == command.Name).FirstOrDefaultAsync();
128+
item.ShouldBeNull();
129+
130+
var scheduledMessage = tracked.Scheduled.SingleEnvelope<ItemCreated>();
131+
scheduledMessage.ScheduleDelay.ShouldNotBeNull();
132+
scheduledMessage.Message.ShouldBeOfType<ItemCreated>();
133+
}
134+
135+
[Fact]
136+
public async Task using_db_context_with_outbox_schedule_nodb()
137+
{
138+
await cleanItems();
139+
140+
var command = new CreateItemCommand { Name = "Jerick McKinnon" };
141+
142+
var (tracked, _) = await TrackedHttpCall(x =>
143+
{
144+
x.Post.Json(command).ToUrl("/ef/schedule_nodb");
145+
x.StatusCodeShouldBe(204);
146+
});
147+
148+
var scheduledMessage = tracked.Scheduled.SingleEnvelope<ItemCreated>();
149+
scheduledMessage.ScheduleDelay.ShouldNotBeNull();
150+
scheduledMessage.Message.ShouldBeOfType<ItemCreated>();
151+
}
78152
}

src/Http/WolverineWebApi/EfCoreEndpoints.cs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
using JasperFx.Core;
12
using Wolverine;
23
using Wolverine.Http;
34

@@ -18,4 +19,22 @@ public async Task PublishItem(CreateItemCommand command, ItemsDbContext db, IMes
1819
db.Items.Add(item);
1920
await bus.PublishAsync(new ItemCreated { Id = item.Id });
2021
}
22+
23+
[WolverinePost("/ef/schedule")]
24+
public async Task ScheduleItem(CreateItemCommand command, ItemsDbContext db, IMessageBus bus)
25+
{
26+
await bus.PublishAsync(new ItemCreated { Id = Guid.NewGuid() }, new(){ ScheduleDelay = 5.Days()});
27+
}
28+
29+
[WolverinePost("/ef/schedule2"), EmptyResponse]
30+
public static object ScheduleItem2(CreateItemCommand command)
31+
{
32+
return new ItemCreated { Id = Guid.NewGuid() }.DelayedFor(5.Days());
33+
}
34+
35+
[WolverinePost("/ef/schedule_nodb")]
36+
public async Task ScheduleItem_NoDb(CreateItemCommand command, IMessageBus bus)
37+
{
38+
await bus.PublishAsync(new ItemCreated { Id = Guid.NewGuid() }, new() { ScheduleDelay = 5.Days() });
39+
}
2140
}

src/Testing/CoreTests/Acceptance/remote_invocation.cs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -73,11 +73,6 @@ public async Task DisposeAsync()
7373
await _sender.StopAsync();
7474
}
7575

76-
/*
77-
78-
* Pass CancellationToken through to ReplyListener
79-
*/
80-
8176
[Fact]
8277
public async Task request_reply_with_no_reply()
8378
{

src/Testing/CoreTests/Acceptance/using_ISendMyself_as_cascading_message.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ public async Task using_DelayedMessage_as_cascading()
3232
var timeout = session.FindSingleTrackedMessageOfType<DelayedResponse>();
3333
timeout.Id.ShouldBe(id);
3434

35+
var records = session.AllRecordsInOrder().ToArray();
36+
3537
var envelope = session.FindEnvelopesWithMessageType<DelayedResponse>()
3638
.Distinct()
3739
.Single().Envelope;

src/Testing/CoreTests/Configuration/configuring_deliver_within_rules.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,10 @@ public async Task configure_remote_subscriber()
2525

2626
var message = new Message1();
2727

28-
var session = await host.SendMessageAndWaitAsync(message);
28+
var session = await host.TrackActivity().SendMessageAndWaitAsync(message);
29+
30+
var records = session.AllRecordsInOrder().ToArray();
31+
2932
session.Sent.SingleEnvelope<Message1>()
3033
.DeliverWithin.ShouldBe(3.Seconds());
3134
}
Lines changed: 164 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,164 @@
1+
using System.Diagnostics;
2+
using IntegrationTests;
3+
using JasperFx.Core;
4+
using Microsoft.Extensions.Hosting;
5+
using Shouldly;
6+
using Wolverine;
7+
using Wolverine.Postgresql;
8+
using Wolverine.Runtime.RemoteInvocation;
9+
using Wolverine.Tracking;
10+
using Wolverine.Transports.Tcp;
11+
using Wolverine.Util;
12+
using Xunit;
13+
14+
namespace SlowTests;
15+
16+
public class tracked_session_mechanics
17+
{
18+
[Fact]
19+
public async Task failure_acks_show_up_in_tracked_session()
20+
{
21+
var port1 = PortFinder.GetAvailablePort();
22+
var port2 = PortFinder.GetAvailablePort();
23+
24+
using var sender = await Host.CreateDefaultBuilder()
25+
.UseWolverine(opts =>
26+
{
27+
opts.Discovery.DisableConventionalDiscovery();
28+
opts.PublishAllMessages().ToPort(port2);
29+
opts.ListenAtPort(port1);
30+
}).StartAsync();
31+
32+
using var receiver = await Host.CreateDefaultBuilder()
33+
.UseWolverine(opts =>
34+
{
35+
opts.ListenAtPort(port2);
36+
}).StartAsync();
37+
38+
await Should.ThrowAsync<WolverineRequestReplyException>(async () =>
39+
{
40+
var (tracked, response) = await sender.TrackActivity().IncludeExternalTransports().AlsoTrack(receiver)
41+
.InvokeAndWaitAsync<ResponseForRequest>(new RequestResponse(false, "Something"));
42+
});
43+
44+
45+
}
46+
47+
[Fact]
48+
public async Task deal_with_in_memory_scheduled_message()
49+
{
50+
using var host = await Host.CreateDefaultBuilder()
51+
.UseWolverine(opts =>
52+
{
53+
54+
}).StartAsync();
55+
56+
// Should finish cleanly
57+
var tracked = await host.SendMessageAndWaitAsync(new TriggerScheduledMessage("Chiefs"));
58+
59+
var records = tracked.AllRecordsInOrder().ToArray();
60+
61+
tracked.Sent.SingleMessage<ScheduledMessage>()
62+
.Text.ShouldBe("Chiefs");
63+
64+
tracked.Scheduled.SingleMessage<ScheduledMessage>()
65+
.Text.ShouldBe("Chiefs");
66+
}
67+
68+
[Fact]
69+
public async Task deal_with_locally_scheduled_execution()
70+
{
71+
#region sample_dealing_with_locally_scheduled_messages
72+
73+
// In this case we're just executing everything in memory
74+
using var host = await Host.CreateDefaultBuilder()
75+
.UseWolverine(opts =>
76+
{
77+
opts.PersistMessagesWithPostgresql(Servers.PostgresConnectionString, "wolverine");
78+
opts.Policies.UseDurableInboxOnAllListeners();
79+
}).StartAsync();
80+
81+
// Should finish cleanly
82+
var tracked = await host.SendMessageAndWaitAsync(new TriggerScheduledMessage("Chiefs"));
83+
84+
// Here's how you can query against the messages that were detected to be scheduled
85+
tracked.Scheduled.SingleMessage<ScheduledMessage>()
86+
.Text.ShouldBe("Chiefs");
87+
88+
// This API will try to immediately play any scheduled messages immediately
89+
var replayed = await tracked.PlayScheduledMessagesAsync(10.Seconds());
90+
replayed.Executed.SingleMessage<ScheduledMessage>().Text.ShouldBe("Chiefs");
91+
92+
#endregion
93+
}
94+
95+
[Fact]
96+
public async Task handle_scheduled_delivery_to_external_transport()
97+
{
98+
#region sample_handling_scheduled_delivery_to_external_transport
99+
100+
var port1 = PortFinder.GetAvailablePort();
101+
var port2 = PortFinder.GetAvailablePort();
102+
103+
using var sender = await Host.CreateDefaultBuilder()
104+
.UseWolverine(opts =>
105+
{
106+
opts.PublishMessage<ScheduledMessage>().ToPort(port2);
107+
opts.ListenAtPort(port1);
108+
}).StartAsync();
109+
110+
using var receiver = await Host.CreateDefaultBuilder()
111+
.UseWolverine(opts =>
112+
{
113+
opts.ListenAtPort(port2);
114+
}).StartAsync();
115+
116+
// Should finish cleanly
117+
var tracked = await sender
118+
.TrackActivity()
119+
.IncludeExternalTransports()
120+
.AlsoTrack(receiver)
121+
.InvokeMessageAndWaitAsync(new TriggerScheduledMessage("Broncos"));
122+
123+
tracked.Scheduled.SingleMessage<ScheduledMessage>()
124+
.Text.ShouldBe("Broncos");
125+
126+
var replayed = await tracked.PlayScheduledMessagesAsync(10.Seconds());
127+
replayed.Executed.SingleMessage<ScheduledMessage>().Text.ShouldBe("Broncos");
128+
129+
#endregion
130+
}
131+
}
132+
133+
public record TriggerScheduledMessage(string Text);
134+
135+
public record ScheduledMessage(string Text);
136+
137+
public record TriggerResponse(bool WillReturn, string Text);
138+
139+
public record RequestResponse(bool WillReturn, string Text);
140+
141+
public record ResponseForRequest(string Text);
142+
143+
public static class RequestResponseHandler
144+
{
145+
public static async Task HandleAsync(TriggerResponse message, IMessageBus bus)
146+
{
147+
var final = await bus.InvokeAsync<ResponseForRequest>(new RequestResponse(message.WillReturn, message.Text));
148+
final.ShouldNotBeNull();
149+
}
150+
151+
public static ResponseForRequest? Handle(RequestResponse msg) => msg.WillReturn ? new(msg.Text) : null;
152+
153+
#region sample_handlers_for_trigger_scheduled_message
154+
155+
public static DeliveryMessage<ScheduledMessage> Handle(TriggerScheduledMessage message)
156+
{
157+
// This causes a message to be scheduled for delivery in 5 minutes from now
158+
return new ScheduledMessage(message.Text).DelayedFor(5.Minutes());
159+
}
160+
161+
public static void Handle(ScheduledMessage message) => Debug.WriteLine("Got scheduled message");
162+
163+
#endregion
164+
}

src/Wolverine/Runtime/MessageContext.cs

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,6 @@ public async Task FlushOutgoingMessagesAsync()
4848
return;
4949
}
5050

51-
if (!Outstanding.Any()) return;
52-
5351
await AssertAnyRequiredResponseWasGenerated();
5452

5553
if (!Outstanding.Any())
@@ -67,6 +65,13 @@ public async Task FlushOutgoingMessagesAsync()
6765
{
6866
Runtime.ScheduleLocalExecutionInMemory(envelope.ScheduledTime!.Value, envelope);
6967
}
68+
69+
// If NullMessageStore, then we're calling a different Send method that is marking the message
70+
if (Runtime.Storage is not NullMessageStore)
71+
{
72+
// See https://github.com/JasperFx/wolverine/issues/1697
73+
Runtime.MessageTracking.Sent(envelope);
74+
}
7075
}
7176
else if (ReferenceEquals(this, Transaction))
7277
{
@@ -99,8 +104,18 @@ public async Task AssertAnyRequiredResponseWasGenerated()
99104
{
100105
if (hasRequestedReply() && _channel is not InvocationCallback && isMissingRequestedReply())
101106
{
102-
await SendFailureAcknowledgementAsync(
103-
$"No response was created for expected response '{Envelope.ReplyRequested}'");
107+
var failureDescription = $"No response was created for expected response '{Envelope.ReplyRequested}'. ";
108+
if (_outstanding.Any())
109+
{
110+
failureDescription += "Actual cascading messages were " +
111+
_outstanding.Select(x => x.MessageType).Join(", ");
112+
}
113+
else
114+
{
115+
failureDescription += "No cascading messages were created by this handler";
116+
}
117+
118+
await SendFailureAcknowledgementAsync( failureDescription);
104119
}
105120
}
106121

src/Wolverine/Runtime/WolverineRuntime.Tracking.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ static WolverineRuntime()
5353
public void Sent(Envelope envelope)
5454
{
5555
_sentCounter.Add(1, envelope.ToMetricsHeaders());
56-
ActiveSession?.Record(MessageEventType.Sent, envelope, _serviceName, _uniqueNodeId);
56+
ActiveSession?.MaybeRecord(MessageEventType.Sent, envelope, _serviceName, _uniqueNodeId);
5757
_sent(Logger, envelope.CorrelationId!, envelope.GetMessageTypeName(), envelope.Id,
5858
envelope.Destination?.ToString() ?? string.Empty,
5959
null);

0 commit comments

Comments
 (0)