Skip to content

fix subscribers list#246

Merged
Gsantomaggio merged 10 commits intomasterfrom
fix/subscribes
Sep 25, 2025
Merged

fix subscribers list#246
Gsantomaggio merged 10 commits intomasterfrom
fix/subscribes

Conversation

@Gsantomaggio
Copy link
Copy Markdown
Member

@Gsantomaggio Gsantomaggio commented Sep 23, 2025

Fixes: #241

This PR refactors the consumer/subscriber system to use numeric IDs instead of string names for internal tracking, addressing issues with subscriber list management. The changes enhance performance and eliminate bugs associated with string-based subscriber identification.

  • Converts subscriber tracking from string-based names to integer-based IDs for better performance and reliability
  • Updates MessageContext and EventContext to include stream information directly instead of requiring lookups
  • Adds validation for max_subscribers_by_connection limits and introduces new exception types

Breaking changes

  1. Subscriber returns not the subscriber_id, but instead thereference

The bug was here [Tag 0.3.1]:

rstream/rstream/consumer.py

Lines 197 to 203 in 654a9ef

# We can have multiple subscribers sharing same connection, so their ids must be distinct
# subscription_id = len([s for s in self._subscribers.values() if s.client is client]) + 1
subscription_id = await client.get_available_id()
reference = subscriber_name or f"{stream}_subscriber_{subscription_id}"
decoder = decoder or (lambda x: x)
subscriber = self._subscribers[reference] = _Subscriber(

Given two references with the same name the subscriber = self._subscribers[reference] is not consistent.

With this PR the _subscribers is [int, _Subscriber]

self._subscribers: dict[int, _Subscriber] = {}

Where the int is the subscriber id that must be unique for connection by protocol.

  1. remove the get_stream function message_context.consumer.get_stream(message_context.subscriber_name)

The get_stream is not needed anymore since the stream is now passed on the message_context and also event_context

  1. subscribe_name is now optional
class MessageContext:
    consumer: Consumer
    stream: str
    subscriber_id: int
    subscriber_name: Optional[str]
    offset: int
    timestamp: int

Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
@Gsantomaggio Gsantomaggio mentioned this pull request Sep 23, 2025
Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
@Gsantomaggio Gsantomaggio added this to the 0.4.0 milestone Sep 25, 2025
@Gsantomaggio Gsantomaggio self-assigned this Sep 25, 2025
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR refactors the consumer/subscriber system to use numeric IDs instead of string names for internal tracking, addressing issues with subscriber list management. The changes improve performance and eliminate bugs related to string-based subscriber identification.

  • Converts subscriber tracking from string-based names to integer-based IDs for better performance and reliability
  • Updates MessageContext and EventContext to include stream information directly instead of requiring lookups
  • Adds validation for max_subscribers_by_connection limits and introduces new exception types

Reviewed Changes

Copilot reviewed 23 out of 23 changed files in this pull request and generated 2 comments.

Show a summary per file
File Description
rstream/consumer.py Major refactor of subscriber tracking from string to integer IDs, updated MessageContext structure
rstream/superstream_consumer.py Updated to use integer subscriber IDs instead of string references
rstream/client.py Changed frame handling to use integer subscriber IDs, renamed get_available_id to inc_available_id
rstream/exceptions.py Added new exception classes for max consumers/publishers per connection limits
rstream/constants.py Added MAX_ITEM_ALLOWED constant for connection limits
tests/util.py Updated test utilities to use new MessageContext structure and adjusted timing
tests/test_consumer_validate_id.py New comprehensive test file for subscriber ID validation and stream routing
tests/test_consumer.py Updated existing tests to use integer subscriber IDs and new MessageContext structure
docs/examples/ Updated all example files to use new MessageContext.stream property instead of method calls
Comments suppressed due to low confidence (1)

rstream/consumer.py:350

  • The handler removal is using subscriber.reference (which can be None) instead of str(subscriber_id) to match how handlers were added in the subscribe method.
        await subscriber.client.stop_queue_listener_task(subscriber_id=subscriber_id)
        subscriber.client.remove_handler(
            schema.Deliver,
            name=subscriber.reference,
        )
        subscriber.client.remove_handler(
            schema.MetadataUpdate,
            name=subscriber.reference,
        )

Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.

Comment thread rstream/constants.py Outdated
Comment thread tests/test_consumer.py Outdated
Gsantomaggio and others added 2 commits September 25, 2025 08:31
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
@Gsantomaggio Gsantomaggio marked this pull request as ready for review September 25, 2025 06:54
@Gsantomaggio Gsantomaggio merged commit 9906f8d into master Sep 25, 2025
1 check passed
@Gsantomaggio Gsantomaggio deleted the fix/subscribes branch September 25, 2025 16:16
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

subscriber_name in the super stream consumer fails to return the right consumer

2 participants