Skip to content

Conversation

@megakid
Copy link
Contributor

@megakid megakid commented Sep 11, 2025

Redis Streams Transport for Wolverine

This PR introduces a first-class Redis Streams transport for Wolverine, providing reliable message publishing and consumption backed by Redis Streams and consumer groups. The implementation follows Wolverine's established transport patterns and includes support for both inline and buffered processing modes.

What's Included

Core Features

  • Inline and Buffered competing consumer modes — Inline processes one message at a time (BatchSize=1) for simplicity and ordered processing, while buffered mode uses batching and pipelined sends for better throughput
  • Configurable listener start position — New consumer groups can start from the beginning of the stream (0-0) or only consume new messages ($, which is the default)
  • Reliable message redelivery — Automatic recovery of stalled messages using XAUTOCLAIM to reclaim idle entries from the Pending Entries List
  • Request/reply support — Per-node reply stream endpoints are automatically provisioned for request/reply patterns
  • Auto-provisioning — Streams and consumer groups are created automatically, with optional auto-purge for test/dev environments

How It Works

The transport uses Redis Streams' consumer group functionality for reliable delivery:

  1. Message consumption: New messages are read using XREADGROUP with a configured consumer group and consumer name
  2. Acknowledgment: Successfully processed messages are acknowledged with XACK
  3. Recovery: A periodic XAUTOCLAIM operation reclaims idle messages after a configurable timeout, ensuring stalled messages get reprocessed
  4. Throughput optimization: Buffered mode uses pipelined XADD operations for improved performance

Configuration Examples

Setting up the transport:

opts.UseRedisTransport("localhost:6379")
    .AutoProvision(); // will create any streams/consumer groups that don't exist - as well as creating a dedicated reply stream for this process.

Publishing to a stream inline (ordered):

opts.PublishMessage<Bleb>()
    .ToRedisStream(streamKey: "streamName", databaseId: 1)
    .Inline(); // Must override the default of BufferedInMemory to ensure ordered publishing.

Listening with buffered processing, with autoclaiming of stalled messages enabled - if consumerGroup doesn't already exist, create one for new messages only:

    opts.ListenToRedisStream("streamName", "consumerGroupName", StartFrom.NewMessages) // '$' start point - Similar to Kafka AutoOffsetReset.Latest
        .EnableAutoClaim(period: TimeSpan.FromSeconds(30), minIdle: TimeSpan.FromMinutes(5)) // Automatically claim pending messages that have been idle for more than 5 minutes, checking every 30 seconds (results in out of order messages)
        .BufferedInMemory() // this is default, but just to be explicit
        .BatchSize(20); // default is 10

Listening with inline processing, without autoclaiming of stalled messages enabled - if consumerGroup doesn't already exist, create one that will process all messages currently on the stream:

    opts.ListenToRedisStream("streamName", "consumerGroupName", StartFrom.Beginning) // '0-0' start point - Similar to Kafka AutoOffsetReset.Earliest
        .DisableAutoClaim() // disable auto-claiming of pending messages - to avoid out of order messages - this is default but just to be explicit
        .Inline(); // inline sets batch size to 1 and no parallelism to avoid out of order messages - if you REALLY want to claim more than 1 message at a time, you can still override the batch size after this call.

Implementation Details

The transport follows Wolverine's standard patterns:

  • Dedicated endpoint base class and envelope mapper for clean separation of concerns
  • ISenderProtocol-based sender with batched operations
  • Standard IBrokerQueue interface for diagnostics and resource management
  • Non-secret URIs (redis://stream/{databaseId}/{key}?consumerGroup={consumerGroupName}) - thinking here is that in future, when support is added, you can create a listener without a consumer group and it will act as a "topic-style" listener (where messages during process downtime are missed - but smart enough that any redis disconnects whilst the process is up will not miss any messages - e.g. checkpointed in-memory)

Testing

Comprehensive test coverage includes:

  • Transport compliance tests for both inline and buffered modes
  • Integration tests for start position behavior and auto-claim functionality
  • Reply stream system endpoint validation
  • Local Redis testing with auto-provisioning support

Current Limitations

A few features are planned for future releases:

  • Explicit QoS switching: Currently defaults to at-least-once semantics (when AutoClaiming is enabled); at-most-once mode isn't exposed yet

  • Non-persistent subscriptions: Topic-like subscriptions using XREAD without consumer groups

  • Reply stream cleanup: Currently we litter Redis with a reply stream per Redis transport instance that are never reused or cleaned up - automatic cleanup of reply streams is essential, unsure how yet.

  • Stream expiry: We may want to consider adding an XADD followed by a stream-level EXPIRE which can be a TTL style duration. If we make these two calls as an atomic operation, within a lua script call or a redis transaction, then the streams will be automatically deleted when the last added message reaches a certain age. Note this is regardless of if all consumer groups have consumed the message so fair warning must be given when using this approach, and it isn't retention as such given if new message is always added to the stream within the TTL then the stream will grow infinitely - mix with stream truncation approach below?

  • Stream truncation: Redis supports automatic trimming of streams to a max length (unfortunately stream TTLs are not directly supported), we should allow publishers to configure the autotrim option when XADDing to a stream. I think this method of retention will need to be mixed with the stream expiry -

  • Multiple Redis connections: Need to confirm how this should work, if we can name each of the "UseRedisTransport" calls, perhaps the endpoint URI can contain the redis transport name and route to the respective redis transport

Compatibility

This change is fully backwards compatible with existing transports and follows established Wolverine configuration patterns. Documentation and examples are included to help with adoption.

I ask for all comments and criticisms on design decisions and implementation detail. I will be upfront and say that LLMs did help extensively with the code, taking slices of inspiration from other existing Wolverine transports and is still a little rough but I wanted to put this up to gather comments / feedback before cementing the design / polishing the code.

@megakid megakid changed the title Initial Redis Streams Transport commit Redis Streams Transport Sep 11, 2025
@jeremydmiller
Copy link
Member

@megakid Let's see how the day goes, but I'll try to add some basic docs in markdown. After that's in, would you mind reviewing/editing that? No huge hurry.

I'll review this today, but I'm inclined to get it out sooner rather than later and just roll w./ the issues that might come in.

And this is awesome!

@jeremydmiller
Copy link
Member

@megakid Oh, crap. Are you okay with this going into Wolverine 5.0 in a couple weeks, or hoping for it sooner so that it needs to be pushed into the 4.0 branch?

@megakid
Copy link
Contributor Author

megakid commented Sep 14, 2025

That's fine for me. Happy to wait for 5.0

@jeremydmiller
Copy link
Member

@megakid Okay, thank you for being patient! Hopefully around Oct. 1st. With this and a few other new things.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants