Skip to content

Commit 5b8c60b

Browse files
danielwinklerjeremydmiller
authored andcommitted
fix: Handle nullability and deserialization of RavenDB Message Uris
1 parent 303da2a commit 5b8c60b

File tree

3 files changed

+11
-10
lines changed

3 files changed

+11
-10
lines changed

src/Persistence/Wolverine.RavenDb/Internals/Durability/RavenDbDurabilityAgent.Incoming.cs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,20 +15,21 @@ private async Task tryRecoverIncomingMessages()
1515
using var session = _store.OpenAsyncSession();
1616
var listeners = await session.Query<IncomingMessage>()
1717
.Where(x => x.OwnerId == 0)
18-
.Select(x => x.ReceivedAt)
18+
.Select(x => new { x.ReceivedAt })
1919
.Distinct()
2020
.ToListAsync();
2121

22-
foreach (var listener in listeners)
22+
foreach (var listener in listeners.Where(x => x.ReceivedAt != null))
2323
{
24-
var circuit = _runtime.Endpoints.FindListenerCircuit(listener);
24+
var receivedAt = listener.ReceivedAt!;
25+
var circuit = _runtime.Endpoints.FindListenerCircuit(receivedAt);
2526
if (circuit.Status != ListeningStatus.Accepting)
2627
{
2728
continue;
2829
}
2930

3031
// Harden around this!
31-
await recoverMessagesForListener(listener, circuit);
32+
await recoverMessagesForListener(receivedAt, circuit);
3233
}
3334
}
3435
catch (Exception e)
@@ -55,5 +56,5 @@ private async Task recoverMessagesForListener(Uri listener, IListenerCircuit cir
5556
_logger.LogError(e, "Error trying to recover messages from the inbox for listener {Uri}", listener);
5657
}
5758
}
58-
59+
5960
}

src/Persistence/Wolverine.RavenDb/Internals/Durability/RavenDbDurabilityAgent.Outgoing.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,13 @@ private async Task tryRecoverOutgoingMessagesAsync()
1313

1414
var senders = (await session.Query<OutgoingMessage>().Customize(x => x.WaitForNonStaleResults())
1515
.Where(x => x.OwnerId == 0).ToListAsync())
16-
.Select(x => x.Destination)
16+
.Select(x => new { x.Destination })
1717
.Distinct()
1818
.ToList();
1919

20-
foreach (var sender in senders)
20+
foreach (var sender in senders.Where(x => x.Destination != null))
2121
{
22-
await tryRecoverOutgoingMessagesToSenderAsync(sender);
22+
await tryRecoverOutgoingMessagesToSenderAsync(sender.Destination!);
2323
}
2424
}
2525
catch (Exception e)
@@ -34,7 +34,7 @@ private async Task tryRecoverOutgoingMessagesToSenderAsync(Uri sender)
3434
{
3535
var sendingAgent = _runtime.Endpoints.GetOrBuildSendingAgent(sender);
3636
if (sendingAgent.Latched) return;
37-
37+
3838
var outgoing = await _parent.Outbox.LoadOutgoingAsync(sendingAgent.Destination);
3939
var expiredMessages = outgoing.Where(x => x.IsExpired()).ToArray();
4040
var good = outgoing.Where(x => !x.IsExpired()).ToArray();

src/Persistence/Wolverine.RavenDb/Internals/OutgoingMessage.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ public OutgoingMessage(Envelope envelope)
2222

2323
public string Id { get; set; }
2424
public int OwnerId { get; set; }
25-
public Uri Destination { get; set; }
25+
public Uri? Destination { get; set; }
2626
public DateTimeOffset? DeliverBy { get; set; }
2727
public byte[] Body { get; set; } = [];
2828
public int Attempts { get; set; }

0 commit comments

Comments
 (0)