Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Seq.Input.RabbitMQ [![CI](https://github.com/datalust/seq-input-rabbitmq/actions/workflows/ci.yml/badge.svg)](https://github.com/datalust/seq-input-rabbitmq/actions/workflows/ci.yml)

A Seq custom input that pulls events from RabbitMQ. **Requires Seq 5.1+.**
A Seq custom input that pulls events from RabbitMQ. **Requires Seq 2025.2+.**

### Getting started

Expand Down
6 changes: 3 additions & 3 deletions example/Demo/Demo.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Serilog" Version="2.8.0" />
<PackageReference Include="Serilog.Sinks.RabbitMQ" Version="2.0.2" />
<PackageReference Include="Serilog.Formatting.Compact" Version="1.0.0" />
<PackageReference Include="Serilog" Version="4.3.0" />
<PackageReference Include="Serilog.Sinks.RabbitMQ" Version="8.0.0" />
<PackageReference Include="Serilog.Formatting.Compact" Version="3.0.0" />
</ItemGroup>

</Project>
44 changes: 16 additions & 28 deletions example/Demo/Program.cs
Original file line number Diff line number Diff line change
@@ -1,34 +1,22 @@
using System;
using System.Threading;
using System.Threading;
using Serilog;
using Serilog.Formatting.Compact;
using Serilog.Sinks.RabbitMQ.Sinks.RabbitMQ;

namespace Demo
{
public class Program
Log.Logger = new LoggerConfiguration()
.Enrich.WithProperty("Application", "Demo")
.WriteTo.RabbitMQ((client, sink) =>
{
public static void Main()
{
var rmq = new RabbitMQConfiguration
{
Hostname = "localhost",
Username = "guest",
Password = "guest",
Exchange = "",
RouteKey = "logs"
};

Log.Logger = new LoggerConfiguration()
.Enrich.WithProperty("Application", "Demo")
.WriteTo.RabbitMQ(rmq, new CompactJsonFormatter())
.CreateLogger();
client.Hostnames.Add("localhost");
client.Username = "guest";
client.Password = "guest";
client.Exchange = "";
client.RoutingKey = "logs";
sink.TextFormatter = new CompactJsonFormatter();
})
.CreateLogger();

while (true)
{
Log.Information("Yo, RabbitMQ!");
Thread.Sleep(1000);
}
}
}
while (true)
{
Log.Information("Yo, RabbitMQ!");
Thread.Sleep(1000);
}
2 changes: 2 additions & 0 deletions seq-input-rabbitmq.sln.DotSettings
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
<wpf:ResourceDictionary xml:space="preserve" xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml" xmlns:s="clr-namespace:System;assembly=mscorlib" xmlns:ss="urn:shemas-jetbrains-com:settings-storage-xaml" xmlns:wpf="http://schemas.microsoft.com/winfx/2006/xaml/presentation">
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/Abbreviations/=MQV/@EntryIndexedValue">MQV</s:String></wpf:ResourceDictionary>
243 changes: 125 additions & 118 deletions src/Seq.Input.RabbitMQ/RabbitMQInput.cs
Original file line number Diff line number Diff line change
@@ -1,134 +1,141 @@
using System;
using System.IO;
using System.Text;
using System.Threading.Tasks;
using Seq.Apps;
// ReSharper disable MemberCanBePrivate.Global, UnusedType.Global, UnusedAutoPropertyAccessor.Global

namespace Seq.Input.RabbitMQ
namespace Seq.Input.RabbitMQ;

[SeqApp("RabbitMQ Input",
Description = "Pulls JSON-formatted events from a RabbitMQ queue. For details of the " +
"supported JSON schema, see " +
"https://github.com/serilog/serilog-formatting-compact/#format-details.")]
public sealed class RabbitMQInput : SeqApp, IPublishJson, IDisposable
{
[SeqApp("RabbitMQ Input",
Description = "Pulls JSON-formatted events from a RabbitMQ queue. For details of the " +
"supported JSON schema, see " +
"https://github.com/serilog/serilog-formatting-compact/#format-details.")]
public class RabbitMQInput : SeqApp, IPublishJson, IDisposable
RabbitMQListener _listener;

[SeqAppSetting(
DisplayName = "RabbitMQ host",
IsOptional = true,
HelpText = "The hostname on which RabbitMQ is running. The default is `localhost`.")]
public string RabbitMQHost { get; set; } = "localhost";

[SeqAppSetting(
DisplayName = "RabbitMQ Virtual Host",
IsOptional = true,
HelpText = "The virtual host in RabbitMQ. The default is `/`.")]
public string RabbitMQVHost { get; set; } = "/";

[SeqAppSetting(
DisplayName = "RabbitMQ port",
IsOptional = true,
HelpText = "The port on which the RabbitMQ server is listening. The default is `5672`.")]
public int RabbitMQPort { get; set; } = 5672;

[SeqAppSetting(
DisplayName = "RabbitMQ user",
IsOptional = true,
HelpText = "The username provided when connecting to RabbitMQ. The default is `guest`.")]
public string RabbitMQUser { get; set; } = "guest";

[SeqAppSetting(
DisplayName = "RabbitMQ password",
IsOptional = true,
InputType = SettingInputType.Password,
HelpText = "The password provided when connecting to RabbitMQ. The default is `guest`.")]
public string RabbitMQPassword { get; set; } = "guest";

[SeqAppSetting(
DisplayName = "RabbitMQ queue",
IsOptional = true,
HelpText = "The RabbitMQ queue name to receive events from. The default is `Logs`.")]
public string RabbitMQQueue { get; set; } = "logs";

[SeqAppSetting(
DisplayName = "Require SSL",
IsOptional = true,
HelpText = "Whether or not the connection is with SSL. The default is false.")]
public bool IsSsl { get; set; }

[SeqAppSetting(
DisplayName = "Durable",
IsOptional = true,
HelpText = "Whether or not the queue is durable. The default is false.")]
public bool IsQueueDurable { get; set; }

[SeqAppSetting(
DisplayName = "Exclusive",
IsOptional = true,
HelpText = "Whether or not the queue is exclusive. The default is false.")]
public bool IsQueueExclusive { get; set; }

[SeqAppSetting(
DisplayName = "Auto-delete",
IsOptional = true,
HelpText = "Whether or not the queue subscription is durable. The default is false.")]
public bool IsQueueAutoDelete { get; set; }

[SeqAppSetting(
DisplayName = "Auto-ACK",
IsOptional = true,
HelpText = "Whether or not messages should be auto-acknowledged. The default is true.")]
public bool IsReceiveAutoAck { get; set; } = true;

[SeqAppSetting(
DisplayName = "Dead Letter Exchange",
IsOptional = true,
HelpText = "The name of the dead letter exchange associated with this queue. If specified, the exchange will be used when declaring the queue, otherwise no dead lettering will be configured.")]
public string Dlx { get; set; }

public void Start(TextWriter inputWriter)
{
RabbitMQListener _listener;

[SeqAppSetting(
DisplayName = "RabbitMQ host",
IsOptional = true,
HelpText = "The hostname on which RabbitMQ is running. The default is `localhost`.")]
public string RabbitMQHost { get; set; } = "localhost";

[SeqAppSetting(
DisplayName = "RabbitMQ Virtual Host",
IsOptional = true,
HelpText = "The virtual host in RabbitMQ. The default is `/`.")]
public string RabbitMQVHost { get; set; } = "/";

[SeqAppSetting(
DisplayName = "RabbitMQ port",
IsOptional = true,
HelpText = "The port on which the RabbitMQ server is listening. The default is `5672`.")]
public int RabbitMQPort { get; set; } = 5672;

[SeqAppSetting(
DisplayName = "RabbitMQ user",
IsOptional = true,
HelpText = "The username provided when connecting to RabbitMQ. The default is `guest`.")]
public string RabbitMQUser { get; set; } = "guest";

[SeqAppSetting(
DisplayName = "RabbitMQ password",
IsOptional = true,
InputType = SettingInputType.Password,
HelpText = "The password provided when connecting to RabbitMQ. The default is `guest`.")]
public string RabbitMQPassword { get; set; } = "guest";

[SeqAppSetting(
DisplayName = "RabbitMQ queue",
IsOptional = true,
HelpText = "The RabbitMQ queue name to receive events from. The default is `Logs`.")]
public string RabbitMQQueue { get; set; } = "logs";

[SeqAppSetting(
DisplayName = "Require SSL",
IsOptional = true,
HelpText = "Whether or not the connection is with SSL. The default is false.")]
public bool IsSsl { get; set; }

[SeqAppSetting(
DisplayName = "Durable",
IsOptional = true,
HelpText = "Whether or not the queue is durable. The default is false.")]
public bool IsQueueDurable { get; set; }

[SeqAppSetting(
DisplayName = "Exclusive",
IsOptional = true,
HelpText = "Whether or not the queue is exclusive. The default is false.")]
public bool IsQueueExclusive { get; set; }

[SeqAppSetting(
DisplayName = "Auto-delete",
IsOptional = true,
HelpText = "Whether or not the queue subscription is durable. The default is false.")]
public bool IsQueueAutoDelete { get; set; }

[SeqAppSetting(
DisplayName = "Auto-ACK",
IsOptional = true,
HelpText = "Whether or not messages should be auto-acknowledged. The default is true.")]
public bool IsReceiveAutoAck { get; set; } = true;

[SeqAppSetting(
DisplayName = "Dead Letter Exchange",
IsOptional = true,
HelpText = "The name of the dead letter exchange associated with this queue. If specified, the exchange will be used when declaring the queue, otherwise no dead lettering will be configured.")]
public string Dlx { get; set; }

public void Start(TextWriter inputWriter)
var sync = new object();
Task ReceiveAsync(ReadOnlyMemory<byte> body)
{
var sync = new object();
void Receive(ReadOnlyMemory<byte> body)
try
{
try
{
lock (sync)
{
var clef = Encoding.UTF8.GetString(body.ToArray());
inputWriter.WriteLine(clef);
}
}
catch (Exception ex)
lock (sync)
{
Log.Error(ex, "A received message could not be decoded");
var clef = Encoding.UTF8.GetString(body.ToArray());
inputWriter.WriteLine(clef);
}
}

_listener = new RabbitMQListener(
Receive,
RabbitMQHost,
RabbitMQVHost,
RabbitMQPort,
RabbitMQUser,
RabbitMQPassword,
RabbitMQQueue,
IsSsl,
IsQueueDurable,
IsQueueAutoDelete,
IsQueueExclusive,
IsReceiveAutoAck,
Dlx);
catch (Exception ex)
{
Log.Error(ex, "A received message could not be decoded");
}

return Task.CompletedTask;
}

public void Stop()
{
_listener.Close();
}
// Not a deadlock risk on .NET 8, but ideally we'll introduce `IPublishJsonAsync` and provide
// async start/stop/dispose variants.
_listener = RabbitMQListener.CreateAsync(
ReceiveAsync,
RabbitMQHost,
RabbitMQVHost,
RabbitMQPort,
RabbitMQUser,
RabbitMQPassword,
RabbitMQQueue,
IsSsl,
IsQueueDurable,
IsQueueAutoDelete,
IsQueueExclusive,
IsReceiveAutoAck,
Dlx).Result;
}

public void Dispose()
{
_listener?.Dispose();
}
public void Stop()
{
// Not a deadlock risk on .NET 8, but ideally we'll introduce `IPublishJsonAsync` and provide
// async start/stop/dispose variants.
_listener.CloseAsync().Wait();
}

public void Dispose()
{
_listener?.Dispose();
}
}
}
Loading