Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions extensions/RabbitMQ/RabbitMQPipeline.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ public sealed class RabbitMQPipeline : IQueue
private readonly IModel _channel;
private readonly AsyncEventingBasicConsumer _consumer;
private string _queueName = string.Empty;
private readonly int _messageTTLMsecs;

/// <summary>
/// Create a new RabbitMQ queue instance
Expand All @@ -40,6 +41,7 @@ public RabbitMQPipeline(RabbitMqConfig config, ILogger<RabbitMQPipeline>? log =
DispatchConsumersAsync = true
};

this._messageTTLMsecs = config.MessageTTLSecs * 1000;
this._connection = factory.CreateConnection();
this._channel = this._connection.CreateModel();
this._channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
Expand Down Expand Up @@ -87,15 +89,20 @@ public Task EnqueueAsync(string message, CancellationToken cancellationToken = d
throw new InvalidOperationException("The client must be connected to a queue first");
}

this._log.LogDebug("Sending message...");
var properties = this._channel.CreateBasicProperties();
properties.Persistent = true;
properties.MessageId = Guid.NewGuid().ToString("N");
properties.Expiration = $"{this._messageTTLMsecs}";

this._log.LogDebug("Sending message: {0} (TTL: {1} secs)...", properties.MessageId, this._messageTTLMsecs / 1000);

this._channel.BasicPublish(
routingKey: this._queueName,
body: Encoding.UTF8.GetBytes(message),
exchange: string.Empty,
basicProperties: null);
basicProperties: properties);

this._log.LogDebug("Message sent");
this._log.LogDebug("Message sent: {0} (TTL: {1} secs)", properties.MessageId, this._messageTTLMsecs / 1000);

return Task.CompletedTask;
}
Expand All @@ -107,7 +114,7 @@ public void OnDequeue(Func<string, Task<bool>> processMessageAction)
{
try
{
this._log.LogDebug("Message '{0}' received, expires at {1}", args.BasicProperties.MessageId, args.BasicProperties.Expiration);
this._log.LogDebug("Message '{0}' received, expires after {1}ms", args.BasicProperties.MessageId, args.BasicProperties.Expiration);

byte[] body = args.Body.ToArray();
string message = Encoding.UTF8.GetString(body);
Expand Down
26 changes: 26 additions & 0 deletions extensions/RabbitMQ/RabbitMqConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,35 @@ namespace Microsoft.KernelMemory;

public class RabbitMqConfig
{
/// <summary>
/// RabbitMQ hostname, e.g. "127.0.0.1"
/// </summary>
public string Host { get; set; } = "";

/// <summary>
/// TCP port for the connection, e.g. 5672
/// </summary>
public int Port { get; set; } = 0;

/// <summary>
/// Authentication username
/// </summary>
public string Username { get; set; } = "";

/// <summary>
/// Authentication password
/// </summary>
public string Password { get; set; } = "";

/// <summary>
/// RabbitMQ virtual host name, e.g. "/"
/// See https://www.rabbitmq.com/docs/vhosts
/// </summary>
public string VirtualHost { get; set; } = "/";

/// <summary>
/// How long to retry messages delivery, ie how long to retry, in seconds.
/// Default: 3600 second, 1 hour.
/// </summary>
public int MessageTTLSecs { get; set; } = 3600;
}
3 changes: 2 additions & 1 deletion service/Service/appsettings.json
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,8 @@
"Port": "5672",
"Username": "user",
"Password": "",
"VirtualHost": "/"
"VirtualHost": "/",
"MessageTTLSecs": 3600
},
"Redis": {
// Redis connection string, e.g. "localhost:6379,password=..."
Expand Down