diff --git a/README.md b/README.md index dab0623..37c5bff 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # Seq.Input.RabbitMQ [![CI](https://github.com/datalust/seq-input-rabbitmq/actions/workflows/ci.yml/badge.svg)](https://github.com/datalust/seq-input-rabbitmq/actions/workflows/ci.yml) -A Seq custom input that pulls events from RabbitMQ. **Requires Seq 5.1+.** +A Seq custom input that pulls events from RabbitMQ. **Requires Seq 2025.2+.** ### Getting started diff --git a/example/Demo/Demo.csproj b/example/Demo/Demo.csproj index 6f5ce49..90b3739 100644 --- a/example/Demo/Demo.csproj +++ b/example/Demo/Demo.csproj @@ -6,9 +6,9 @@ - - - + + + diff --git a/example/Demo/Program.cs b/example/Demo/Program.cs index 280d691..e2d9bb0 100644 --- a/example/Demo/Program.cs +++ b/example/Demo/Program.cs @@ -1,34 +1,22 @@ -using System; -using System.Threading; +using System.Threading; using Serilog; using Serilog.Formatting.Compact; -using Serilog.Sinks.RabbitMQ.Sinks.RabbitMQ; -namespace Demo -{ - public class Program +Log.Logger = new LoggerConfiguration() + .Enrich.WithProperty("Application", "Demo") + .WriteTo.RabbitMQ((client, sink) => { - public static void Main() - { - var rmq = new RabbitMQConfiguration - { - Hostname = "localhost", - Username = "guest", - Password = "guest", - Exchange = "", - RouteKey = "logs" - }; - - Log.Logger = new LoggerConfiguration() - .Enrich.WithProperty("Application", "Demo") - .WriteTo.RabbitMQ(rmq, new CompactJsonFormatter()) - .CreateLogger(); + client.Hostnames.Add("localhost"); + client.Username = "guest"; + client.Password = "guest"; + client.Exchange = ""; + client.RoutingKey = "logs"; + sink.TextFormatter = new CompactJsonFormatter(); + }) + .CreateLogger(); - while (true) - { - Log.Information("Yo, RabbitMQ!"); - Thread.Sleep(1000); - } - } - } +while (true) +{ + Log.Information("Yo, RabbitMQ!"); + Thread.Sleep(1000); } diff --git a/seq-input-rabbitmq.sln.DotSettings b/seq-input-rabbitmq.sln.DotSettings new file mode 100644 index 0000000..3809fdb --- /dev/null +++ b/seq-input-rabbitmq.sln.DotSettings @@ -0,0 +1,2 @@ + + MQV \ No newline at end of file diff --git a/src/Seq.Input.RabbitMQ/RabbitMQInput.cs b/src/Seq.Input.RabbitMQ/RabbitMQInput.cs index cd41e8f..14079f2 100644 --- a/src/Seq.Input.RabbitMQ/RabbitMQInput.cs +++ b/src/Seq.Input.RabbitMQ/RabbitMQInput.cs @@ -1,134 +1,141 @@ using System; using System.IO; using System.Text; +using System.Threading.Tasks; using Seq.Apps; +// ReSharper disable MemberCanBePrivate.Global, UnusedType.Global, UnusedAutoPropertyAccessor.Global -namespace Seq.Input.RabbitMQ +namespace Seq.Input.RabbitMQ; + +[SeqApp("RabbitMQ Input", + Description = "Pulls JSON-formatted events from a RabbitMQ queue. For details of the " + + "supported JSON schema, see " + + "https://github.com/serilog/serilog-formatting-compact/#format-details.")] +public sealed class RabbitMQInput : SeqApp, IPublishJson, IDisposable { - [SeqApp("RabbitMQ Input", - Description = "Pulls JSON-formatted events from a RabbitMQ queue. For details of the " + - "supported JSON schema, see " + - "https://github.com/serilog/serilog-formatting-compact/#format-details.")] - public class RabbitMQInput : SeqApp, IPublishJson, IDisposable + RabbitMQListener _listener; + + [SeqAppSetting( + DisplayName = "RabbitMQ host", + IsOptional = true, + HelpText = "The hostname on which RabbitMQ is running. The default is `localhost`.")] + public string RabbitMQHost { get; set; } = "localhost"; + + [SeqAppSetting( + DisplayName = "RabbitMQ Virtual Host", + IsOptional = true, + HelpText = "The virtual host in RabbitMQ. The default is `/`.")] + public string RabbitMQVHost { get; set; } = "/"; + + [SeqAppSetting( + DisplayName = "RabbitMQ port", + IsOptional = true, + HelpText = "The port on which the RabbitMQ server is listening. The default is `5672`.")] + public int RabbitMQPort { get; set; } = 5672; + + [SeqAppSetting( + DisplayName = "RabbitMQ user", + IsOptional = true, + HelpText = "The username provided when connecting to RabbitMQ. The default is `guest`.")] + public string RabbitMQUser { get; set; } = "guest"; + + [SeqAppSetting( + DisplayName = "RabbitMQ password", + IsOptional = true, + InputType = SettingInputType.Password, + HelpText = "The password provided when connecting to RabbitMQ. The default is `guest`.")] + public string RabbitMQPassword { get; set; } = "guest"; + + [SeqAppSetting( + DisplayName = "RabbitMQ queue", + IsOptional = true, + HelpText = "The RabbitMQ queue name to receive events from. The default is `Logs`.")] + public string RabbitMQQueue { get; set; } = "logs"; + + [SeqAppSetting( + DisplayName = "Require SSL", + IsOptional = true, + HelpText = "Whether or not the connection is with SSL. The default is false.")] + public bool IsSsl { get; set; } + + [SeqAppSetting( + DisplayName = "Durable", + IsOptional = true, + HelpText = "Whether or not the queue is durable. The default is false.")] + public bool IsQueueDurable { get; set; } + + [SeqAppSetting( + DisplayName = "Exclusive", + IsOptional = true, + HelpText = "Whether or not the queue is exclusive. The default is false.")] + public bool IsQueueExclusive { get; set; } + + [SeqAppSetting( + DisplayName = "Auto-delete", + IsOptional = true, + HelpText = "Whether or not the queue subscription is durable. The default is false.")] + public bool IsQueueAutoDelete { get; set; } + + [SeqAppSetting( + DisplayName = "Auto-ACK", + IsOptional = true, + HelpText = "Whether or not messages should be auto-acknowledged. The default is true.")] + public bool IsReceiveAutoAck { get; set; } = true; + + [SeqAppSetting( + DisplayName = "Dead Letter Exchange", + IsOptional = true, + HelpText = "The name of the dead letter exchange associated with this queue. If specified, the exchange will be used when declaring the queue, otherwise no dead lettering will be configured.")] + public string Dlx { get; set; } + + public void Start(TextWriter inputWriter) { - RabbitMQListener _listener; - - [SeqAppSetting( - DisplayName = "RabbitMQ host", - IsOptional = true, - HelpText = "The hostname on which RabbitMQ is running. The default is `localhost`.")] - public string RabbitMQHost { get; set; } = "localhost"; - - [SeqAppSetting( - DisplayName = "RabbitMQ Virtual Host", - IsOptional = true, - HelpText = "The virtual host in RabbitMQ. The default is `/`.")] - public string RabbitMQVHost { get; set; } = "/"; - - [SeqAppSetting( - DisplayName = "RabbitMQ port", - IsOptional = true, - HelpText = "The port on which the RabbitMQ server is listening. The default is `5672`.")] - public int RabbitMQPort { get; set; } = 5672; - - [SeqAppSetting( - DisplayName = "RabbitMQ user", - IsOptional = true, - HelpText = "The username provided when connecting to RabbitMQ. The default is `guest`.")] - public string RabbitMQUser { get; set; } = "guest"; - - [SeqAppSetting( - DisplayName = "RabbitMQ password", - IsOptional = true, - InputType = SettingInputType.Password, - HelpText = "The password provided when connecting to RabbitMQ. The default is `guest`.")] - public string RabbitMQPassword { get; set; } = "guest"; - - [SeqAppSetting( - DisplayName = "RabbitMQ queue", - IsOptional = true, - HelpText = "The RabbitMQ queue name to receive events from. The default is `Logs`.")] - public string RabbitMQQueue { get; set; } = "logs"; - - [SeqAppSetting( - DisplayName = "Require SSL", - IsOptional = true, - HelpText = "Whether or not the connection is with SSL. The default is false.")] - public bool IsSsl { get; set; } - - [SeqAppSetting( - DisplayName = "Durable", - IsOptional = true, - HelpText = "Whether or not the queue is durable. The default is false.")] - public bool IsQueueDurable { get; set; } - - [SeqAppSetting( - DisplayName = "Exclusive", - IsOptional = true, - HelpText = "Whether or not the queue is exclusive. The default is false.")] - public bool IsQueueExclusive { get; set; } - - [SeqAppSetting( - DisplayName = "Auto-delete", - IsOptional = true, - HelpText = "Whether or not the queue subscription is durable. The default is false.")] - public bool IsQueueAutoDelete { get; set; } - - [SeqAppSetting( - DisplayName = "Auto-ACK", - IsOptional = true, - HelpText = "Whether or not messages should be auto-acknowledged. The default is true.")] - public bool IsReceiveAutoAck { get; set; } = true; - - [SeqAppSetting( - DisplayName = "Dead Letter Exchange", - IsOptional = true, - HelpText = "The name of the dead letter exchange associated with this queue. If specified, the exchange will be used when declaring the queue, otherwise no dead lettering will be configured.")] - public string Dlx { get; set; } - - public void Start(TextWriter inputWriter) + var sync = new object(); + Task ReceiveAsync(ReadOnlyMemory body) { - var sync = new object(); - void Receive(ReadOnlyMemory body) + try { - try - { - lock (sync) - { - var clef = Encoding.UTF8.GetString(body.ToArray()); - inputWriter.WriteLine(clef); - } - } - catch (Exception ex) + lock (sync) { - Log.Error(ex, "A received message could not be decoded"); + var clef = Encoding.UTF8.GetString(body.ToArray()); + inputWriter.WriteLine(clef); } } - - _listener = new RabbitMQListener( - Receive, - RabbitMQHost, - RabbitMQVHost, - RabbitMQPort, - RabbitMQUser, - RabbitMQPassword, - RabbitMQQueue, - IsSsl, - IsQueueDurable, - IsQueueAutoDelete, - IsQueueExclusive, - IsReceiveAutoAck, - Dlx); + catch (Exception ex) + { + Log.Error(ex, "A received message could not be decoded"); + } + + return Task.CompletedTask; } - public void Stop() - { - _listener.Close(); - } + // Not a deadlock risk on .NET 8, but ideally we'll introduce `IPublishJsonAsync` and provide + // async start/stop/dispose variants. + _listener = RabbitMQListener.CreateAsync( + ReceiveAsync, + RabbitMQHost, + RabbitMQVHost, + RabbitMQPort, + RabbitMQUser, + RabbitMQPassword, + RabbitMQQueue, + IsSsl, + IsQueueDurable, + IsQueueAutoDelete, + IsQueueExclusive, + IsReceiveAutoAck, + Dlx).Result; + } - public void Dispose() - { - _listener?.Dispose(); - } + public void Stop() + { + // Not a deadlock risk on .NET 8, but ideally we'll introduce `IPublishJsonAsync` and provide + // async start/stop/dispose variants. + _listener.CloseAsync().Wait(); + } + + public void Dispose() + { + _listener?.Dispose(); } -} +} \ No newline at end of file diff --git a/src/Seq.Input.RabbitMQ/RabbitMQListener.cs b/src/Seq.Input.RabbitMQ/RabbitMQListener.cs index 1006e66..3a9274d 100644 --- a/src/Seq.Input.RabbitMQ/RabbitMQListener.cs +++ b/src/Seq.Input.RabbitMQ/RabbitMQListener.cs @@ -1,71 +1,71 @@ using System; using System.Collections.Generic; -using System.Net.Security; +using System.Threading.Tasks; using RabbitMQ.Client; using RabbitMQ.Client.Events; -namespace Seq.Input.RabbitMQ +namespace Seq.Input.RabbitMQ; + +class RabbitMQListener(IConnection connection, IChannel channel) : IDisposable { - class RabbitMQListener : IDisposable + public static async Task CreateAsync( + Func, Task> receiveAsync, + string rabbitMQHost, + string rabbitMQVHost, + int rabbitMQPort, + string rabbitMQUser, + string rabbitMQPassword, + string rabbitMQQueue, + bool isSsl, + bool isQueueDurable, + bool isQueueAutoDelete, + bool isQueueExclusive, + bool isReceiveAutoAck, + string dlx) { - readonly IConnection _connection; - readonly IModel _channel; - - public RabbitMQListener(Action> receive, - string rabbitMQHost, - string rabbitMQVHost, - int rabbitMQPort, - string rabbitMQUser, - string rabbitMQPassword, - string rabbitMQQueue, - bool isSsl, - bool isQueueDurable, - bool isQueueAutoDelete, - bool isQueueExclusive, - bool isReceiveAutoAck, - string dlx) + var factory = new ConnectionFactory { - var factory = new ConnectionFactory + HostName = rabbitMQHost, + VirtualHost = rabbitMQVHost, + Port = rabbitMQPort, + UserName = rabbitMQUser, + Password = rabbitMQPassword, + Ssl = { - HostName = rabbitMQHost, - VirtualHost = rabbitMQVHost, - Port = rabbitMQPort, - UserName = rabbitMQUser, - Password = rabbitMQPassword, - Ssl = - { - Enabled = isSsl - } - }; + Enabled = isSsl + } + }; - _connection = factory.CreateConnection(); - _channel = _connection.CreateModel(); + var connection = await factory.CreateConnectionAsync(); + var channel = await connection.CreateChannelAsync(); - var arguments = string.IsNullOrWhiteSpace(dlx) - ? null - : new Dictionary { {"x-dead-letter-exchange", dlx} }; + var arguments = string.IsNullOrWhiteSpace(dlx) + ? null + : new Dictionary { {"x-dead-letter-exchange", dlx} }; - _channel.QueueDeclare( - rabbitMQQueue, - durable: isQueueDurable, - exclusive: isQueueExclusive, - autoDelete: isQueueAutoDelete, - arguments: arguments); + await channel.QueueDeclareAsync( + rabbitMQQueue, + durable: isQueueDurable, + exclusive: isQueueExclusive, + autoDelete: isQueueAutoDelete, + arguments: arguments); - var consumer = new EventingBasicConsumer(_channel); - consumer.Received += (model, ea) => receive(ea.Body); - _channel.BasicConsume(rabbitMQQueue, autoAck: isReceiveAutoAck, consumer: consumer); - } + var consumer = new AsyncEventingBasicConsumer(channel); + consumer.ReceivedAsync += async (_, ea) => await receiveAsync(ea.Body); + await channel.BasicConsumeAsync(rabbitMQQueue, autoAck: isReceiveAutoAck, consumer: consumer); - public void Close() - { - _channel.Close(); - } + return new RabbitMQListener(connection, channel); + } - public void Dispose() - { - _channel?.Dispose(); - _connection?.Close(); - } + public async Task CloseAsync() + { + await channel.CloseAsync(); + await connection.CloseAsync(); + } + + public void Dispose() + { + channel?.Dispose(); + connection?.Dispose(); } -} +} \ No newline at end of file diff --git a/src/Seq.Input.RabbitMQ/Seq.Input.RabbitMQ.csproj b/src/Seq.Input.RabbitMQ/Seq.Input.RabbitMQ.csproj index 136eb18..62cdd64 100644 --- a/src/Seq.Input.RabbitMQ/Seq.Input.RabbitMQ.csproj +++ b/src/Seq.Input.RabbitMQ/Seq.Input.RabbitMQ.csproj @@ -1,6 +1,7 @@ - netstandard2.0 + net8.0 + latest Ingest events into Seq directly from RabbitMQ Datalust and Contributors seq-app @@ -13,8 +14,8 @@ - - + +