Skip to content
This repository was archived by the owner on Jun 11, 2026. It is now read-only.
4 changes: 2 additions & 2 deletions src/Microsoft.Azure.EventHubs/Amqp/AmqpPartitionReceiver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,10 @@ protected override void OnSetReceiveHandler(IPartitionReceiveHandler newReceiveH
{
lock (this.receivePumpLock)
{
if (this.receiveHandler != null)
if (newReceiveHandler != null && this.receiveHandler != null)
{
// Notify existing handler first (but don't wait).
this.receiveHandler.ProcessErrorAsync(new OperationCanceledException("New handler has registered for this receiver.")); // .Fork();
this.receiveHandler.ProcessErrorAsync(new OperationCanceledException("New handler has registered for this receiver."));
}

this.receiveHandler = newReceiveHandler;
Expand Down
12 changes: 11 additions & 1 deletion test/Microsoft.Azure.EventHubs.UnitTests/EventHubClientTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,13 @@ async Task PartitionReceiverSetReceiveHandler()

EventWaitHandle dataReceivedEvent = new EventWaitHandle(false, EventResetMode.ManualReset);
var handler = new TestPartitionReceiveHandler();
handler.ErrorReceived += (s, e) => Log($"TestPartitionReceiveHandler.ProcessError {e.GetType().Name}: {e.Message}");

// Not expecting any errors.
handler.ErrorReceived += (s, e) =>
{
throw new Exception($"TestPartitionReceiveHandler.ProcessError {e.GetType().Name}: {e.Message}");
};

handler.EventsReceived += (s, eventDatas) =>
{
int count = eventDatas != null ? eventDatas.Count() : 0;
Expand Down Expand Up @@ -564,6 +570,10 @@ async Task PartitionReceiverSetReceiveHandler()
}
finally
{
// Unregister handler.
partitionReceiver.SetReceiveHandler(null);

// Close clients.
await partitionSender.CloseAsync();
await partitionReceiver.CloseAsync();
}
Expand Down