diff --git a/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.Recording.cs b/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.Recording.cs index 8a1967940..9d93de863 100644 --- a/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.Recording.cs +++ b/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.Recording.cs @@ -265,6 +265,8 @@ await DeleteAutoDeleteExchangeAsync(binding.Source, } + internal int RecordedBindingsCount => _recordedBindings.Count; + internal async ValueTask RecordBindingAsync(RecordedBinding binding, bool recordedEntitiesSemaphoreHeld) { @@ -419,6 +421,15 @@ private void DeleteAutoDeleteQueue(string queue) if (!AnyConsumersOnQueue(queue)) { _recordedQueues.Remove(queue); + // remove bindings targeting this queue; also cascade to auto-delete exchanges + foreach (RecordedBinding binding in _recordedBindings.ToArray()) + { + if (binding.Destination == queue) + { + DoDeleteRecordedBinding(binding); + DoDeleteAutoDeleteExchange(binding.Source); + } + } } } } diff --git a/projects/Test/Integration/ConnectionRecovery/TestRecoveryWithDeletedEntities.cs b/projects/Test/Integration/ConnectionRecovery/TestRecoveryWithDeletedEntities.cs index 7d8737828..6fca5b6ac 100644 --- a/projects/Test/Integration/ConnectionRecovery/TestRecoveryWithDeletedEntities.cs +++ b/projects/Test/Integration/ConnectionRecovery/TestRecoveryWithDeletedEntities.cs @@ -31,7 +31,9 @@ using System.Threading.Tasks; using RabbitMQ.Client; +using RabbitMQ.Client.Events; using RabbitMQ.Client.Exceptions; +using RabbitMQ.Client.Framing; using Xunit; using Xunit.Abstractions; @@ -150,5 +152,72 @@ public async Task TestThatDeletedQueuesDontReappearOnRecovery() AssertShutdownError(e.ShutdownReason, 404); } } + + [Fact] + public async Task TestAutoDeleteQueueBindingsRemovedWhenConsumerCancelled() + { + // See rabbitmq/rabbitmq-dotnet-client#1905. + // + // When the last consumer on an auto-delete queue is cancelled, the queue + // and its bindings must be removed from recorded topology so that recovery + // does not try to restore them. + string exchangeName = GenerateExchangeName(); + await _channel.ExchangeDeclareAsync(exchangeName, ExchangeType.Direct, autoDelete: true); + + var queueDeclareOk = await _channel.QueueDeclareAsync("", false, false, autoDelete: true); + string queueName = queueDeclareOk.QueueName; + await _channel.QueueBindAsync(queueName, exchangeName, routingKey: "key"); + + var autorecoveringConn = (AutorecoveringConnection)_conn; + Assert.Equal(1, autorecoveringConn.RecordedExchangesCount); + Assert.Equal(1, autorecoveringConn.RecordedQueuesCount); + Assert.Equal(1, autorecoveringConn.RecordedBindingsCount); + + var consumer = new AsyncEventingBasicConsumer(_channel); + string consumerTag = await _channel.BasicConsumeAsync(queueName, true, consumer); + await _channel.BasicCancelAsync(consumerTag); + + Assert.Equal(0, autorecoveringConn.RecordedExchangesCount); + Assert.Equal(0, autorecoveringConn.RecordedQueuesCount); + Assert.Equal(0, autorecoveringConn.RecordedBindingsCount); + + await CloseAndWaitForRecoveryAsync(); + Assert.True(_channel.IsOpen); + } + + [Fact] + public async Task TestAutoDeleteQueueBindingsRemovedWhenChannelClosed() + { + // See rabbitmq/rabbitmq-dotnet-client#1905. + // + // Same as above but uses channel closure as the trigger. + string exchangeName = GenerateExchangeName(); + var autorecoveringConn = (AutorecoveringConnection)_conn; + + IChannel ch = await _conn.CreateChannelAsync(_createChannelOptions); + await using (ch.ConfigureAwait(false)) + { + await ch.ExchangeDeclareAsync(exchangeName, ExchangeType.Direct, autoDelete: true); + + var queueDeclareOk = await ch.QueueDeclareAsync("", false, false, autoDelete: true); + string queueName = queueDeclareOk.QueueName; + await ch.QueueBindAsync(queueName, exchangeName, routingKey: "key"); + + Assert.Equal(1, autorecoveringConn.RecordedExchangesCount); + Assert.Equal(1, autorecoveringConn.RecordedQueuesCount); + Assert.Equal(1, autorecoveringConn.RecordedBindingsCount); + + var consumer = new AsyncEventingBasicConsumer(ch); + await ch.BasicConsumeAsync(queueName, true, consumer); + await ch.CloseAsync(); + } + + Assert.Equal(0, autorecoveringConn.RecordedExchangesCount); + Assert.Equal(0, autorecoveringConn.RecordedQueuesCount); + Assert.Equal(0, autorecoveringConn.RecordedBindingsCount); + + await CloseAndWaitForRecoveryAsync(); + Assert.True(_channel.IsOpen); + } } }