Skip to content

Commit da1afbf

Browse files
jsquireCopilot
andauthored
[Functions] Legacy Event Hubs Checkpoint Fix (#53342)
* [Functions] Legacy Event Hubs Checkpoint Fix The focus of these changes is to fix an issue where a legacy ownership record without checkpoint data was interpreted as a valid checkpoint and used to initialize a partition when no current generation checkpoint was present. * Update sdk/eventhub/Azure.Messaging.EventHubs.Shared/tests/BlobCheckpointStore/BlobsCheckpointStoreInternalTests.cs Co-authored-by: Copilot <[email protected]> --------- Co-authored-by: Copilot <[email protected]>
1 parent 80c1ec3 commit da1afbf

File tree

4 files changed

+47
-13
lines changed

4 files changed

+47
-13
lines changed

sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/BlobCheckpointStore/BlobCheckpointStoreInternal.cs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -603,10 +603,6 @@ private async Task<EventProcessorCheckpoint> CreateLegacyCheckpoint(string fully
603603
{
604604
startingPosition ??= EventPosition.FromOffset(offset, false);
605605
}
606-
else if (sequenceNumber.HasValue && sequenceNumber.Value != long.MinValue)
607-
{
608-
startingPosition = EventPosition.FromSequenceNumber(sequenceNumber.Value, false);
609-
}
610606
else
611607
{
612608
// Skip checkpoints without an offset without logging an error.

sdk/eventhub/Azure.Messaging.EventHubs.Shared/tests/BlobCheckpointStore/BlobsCheckpointStoreInternalTests.cs

Lines changed: 45 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -589,13 +589,13 @@ public async Task GetCheckpointUsesOffsetAsTheStartingPositionWhenPresentInLegac
589589
containerClient.AddBlobClient($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/0", client =>
590590
{
591591
client.Content = Encoding.UTF8.GetBytes("{" +
592-
"\"PartitionId\":\"0\"," +
593-
"\"Owner\":\"681d365b-de1b-4288-9733-76294e17daf0\"," +
594-
"\"Token\":\"2d0c4276-827d-4ca4-a345-729caeca3b82\"," +
595-
"\"Epoch\":386," +
596-
"\"Offset\":\"13\"," +
597-
"\"SequenceNumber\":960180" +
598-
"}");
592+
"\"PartitionId\":\"0\"," +
593+
"\"Owner\":\"681d365b-de1b-4288-9733-76294e17daf0\"," +
594+
"\"Token\":\"2d0c4276-827d-4ca4-a345-729caeca3b82\"," +
595+
"\"Epoch\":386," +
596+
"\"Offset\":\"13\"," +
597+
"\"SequenceNumber\":960180" +
598+
"}");
599599
});
600600

601601
var target = new BlobCheckpointStoreInternal(containerClient, initializeWithLegacyCheckpoints: true);
@@ -606,6 +606,44 @@ public async Task GetCheckpointUsesOffsetAsTheStartingPositionWhenPresentInLegac
606606
Assert.That(checkpoint.PartitionId, Is.EqualTo("0"));
607607
}
608608

609+
/// <summary>
610+
/// Verifies basic functionality of GetCheckpointAsync and ensures the starting position is set correctly.
611+
/// </summary>
612+
///
613+
[Test]
614+
[TestCase(null)]
615+
[TestCase("")]
616+
public async Task GetCheckpointLegacyCheckpointWithoutOffset(string offsetValue)
617+
{
618+
var blobList = new List<BlobItem>
619+
{
620+
BlobsModelFactory.BlobItem($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/0",
621+
false,
622+
BlobsModelFactory.BlobItemProperties(true, lastModified: DateTime.UtcNow, eTag: new ETag(MatchingEtag)),
623+
"snapshot")
624+
};
625+
626+
var offsetJsonValue = offsetValue is null ? "null" : $"\"{offsetValue}\"";
627+
var containerClient = new MockBlobContainerClient() { Blobs = blobList };
628+
629+
containerClient.AddBlobClient($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/0", client =>
630+
{
631+
client.Content = Encoding.UTF8.GetBytes("{" +
632+
"\"PartitionId\":\"0\"," +
633+
"\"Owner\":\"681d365b-de1b-4288-9733-76294e17daf0\"," +
634+
"\"Token\":\"2d0c4276-827d-4ca4-a345-729caeca3b82\"," +
635+
"\"Epoch\":386," +
636+
$"\"Offset\":{offsetJsonValue}," +
637+
"\"SequenceNumber\":960180" +
638+
"}");
639+
});
640+
641+
var target = new BlobCheckpointStoreInternal(containerClient, initializeWithLegacyCheckpoints: true);
642+
var checkpoint = await target.GetCheckpointAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, "0", CancellationToken.None);
643+
644+
Assert.That(checkpoint, Is.Null, "A checkpoint should have not been returned.");
645+
}
646+
609647
/// <summary>
610648
/// Verifies basic functionality of GetCheckpointAsync and ensures the appropriate events are emitted on failure.
611649
/// </summary>

sdk/eventhub/Azure.Messaging.EventHubs.sln

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,6 @@ Global
6767
EndGlobalSection
6868
GlobalSection(NestedProjects) = preSolution
6969
{327A0C67-D9B6-4831-A53F-BFE059934787} = {4C209B69-98C3-4B12-B777-111EFF0E6F66}
70-
{AF73A238-61D1-4D1C-807F-57C19F193DC6} = {4C209B69-98C3-4B12-B777-111EFF0E6F66}
71-
{5CFD80DB-98F3-4E59-9792-1C421786D3D0} = {4C209B69-98C3-4B12-B777-111EFF0E6F66}
7270
EndGlobalSection
7371
GlobalSection(ExtensibilityGlobals) = postSolution
7472
SolutionGuid = {A9B7844C-B066-4B3D-A0E7-D5008C5EC232}

sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88

99
### Bugs Fixed
1010

11+
- Fixed an issue where a legacy ownership record without checkpoint data was interpreted as a valid checkpoint and used to initialize a partition when no current generation checkpoint was present.
12+
1113
### Other Changes
1214

1315
## 6.5.2 (2025-06-16)

0 commit comments

Comments
 (0)