Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ install:
format:
which poetry
poetry --version
poetry run isort --skip venv .
poetry run isort .
poetry run black --exclude=venv .
poetry run flake8 --exclude=venv,local_tests,docs/examples --max-line-length=120 --ignore=E203,W503,E701,E704
poetry run mypy .
Expand Down
8 changes: 5 additions & 3 deletions docs/examples/basic_consumers/basic_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@ async def consume():
loop.add_signal_handler(signal.SIGINT, lambda: asyncio.create_task(consumer.close()))

async def on_message(msg: AMQPMessage, message_context: MessageContext):
stream = message_context.consumer.get_stream(message_context.subscriber_name)
offset = message_context.offset
print("Got message: {} from stream {}, offset {}".format(msg, stream, offset))
print(
"Got message: {} from stream {}, offset {}".format(
msg, message_context.stream, message_context.offset
)
)

await consumer.start()
await consumer.subscribe(stream=STREAM, callback=on_message, decoder=amqp_decoder)
Expand Down
8 changes: 5 additions & 3 deletions docs/examples/basic_consumers/basic_consumer_binary.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@ async def consume():
loop.add_signal_handler(signal.SIGINT, lambda: asyncio.create_task(consumer.close()))

async def on_message(msg: AMQPMessage, message_context: MessageContext):
stream = message_context.consumer.get_stream(message_context.subscriber_name)
offset = message_context.offset
print("Got message: {} from stream {}, offset {}".format(msg, stream, offset))
print(
"Got message: {} from stream {}, offset {}".format(
msg, message_context.stream, message_context.offset
)
)

await consumer.start()
await consumer.subscribe(stream=STREAM, callback=on_message)
Expand Down
11 changes: 7 additions & 4 deletions docs/examples/basic_consumers/basic_consumer_offset_next.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,13 @@ async def consume():
loop.add_signal_handler(signal.SIGINT, lambda: asyncio.create_task(consumer.close()))

async def on_message(msg: AMQPMessage, message_context: MessageContext):
stream = message_context.consumer.get_stream(message_context.subscriber_name)
offset = message_context.offset

print("Got message: {}".format(msg) + " from stream " + stream + " offset: " + str(offset))
print(
"Got message: {}".format(msg)
+ " from stream "
+ message_context.stream
+ " offset: "
+ str(message_context.offset)
)

await consumer.start()
await consumer.subscribe(
Expand Down
8 changes: 5 additions & 3 deletions docs/examples/basic_consumers/basic_consumer_offset_offset.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@ async def consume():
loop.add_signal_handler(signal.SIGINT, lambda: asyncio.create_task(consumer.close()))

async def on_message(msg: AMQPMessage, message_context: MessageContext):
stream = message_context.consumer.get_stream(message_context.subscriber_name)
offset = message_context.offset
print("Got message: {} from stream {}, offset {}".format(msg, stream, offset))
print(
"Got message: {} from stream {}, offset {}".format(
msg, message_context.stream, message_context.offset
)
)

await consumer.start()
# Possible values of OffsetType are: FIRST (default), NEXT, LAST, TIMESTAMP and OFFSET
Expand Down
8 changes: 5 additions & 3 deletions docs/examples/filtering/consumer_filtering.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@ async def consume():
loop.add_signal_handler(signal.SIGINT, lambda: asyncio.create_task(consumer.close()))

async def on_message(msg: AMQPMessage, message_context: MessageContext):
stream = message_context.consumer.get_stream(message_context.subscriber_name)
offset = message_context.offset
print("Got message: {} from stream {}, offset {}".format(msg, stream, offset))
print(
"Got message: {} from stream {}, offset {}".format(
msg, message_context.stream, message_context.offset
)
)
print("Application property: " + str(msg.application_properties[b"region"]))
global cont
cont = cont + 1
Expand Down
8 changes: 5 additions & 3 deletions docs/examples/filtering/super_stream_consumer_filtering.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@ async def consume():
loop.add_signal_handler(signal.SIGINT, lambda: asyncio.create_task(consumer.close()))

async def on_message(msg: AMQPMessage, message_context: MessageContext):
stream = message_context.consumer.get_stream(message_context.subscriber_name)
offset = message_context.offset
print("Got message: {} from stream {}, offset {}".format(msg, stream, offset))
print(
"Got message: {} from stream {}, offset {}".format(
msg, message_context.stream, message_context.offset
)
)
print("Application property: " + str(msg.application_properties[b"region"]))
global cont
cont = cont + 1
Expand Down
17 changes: 11 additions & 6 deletions docs/examples/manual_server_offset_tracking/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,24 @@ async def on_message(msg: AMQPMessage, message_context: MessageContext):
global lock

consumer = message_context.consumer
stream = await message_context.consumer.stream(message_context.subscriber_name)
offset = message_context.offset

print("Got message: {} from stream {}, offset {}".format(msg, stream, offset))
print(
"Got message: {} from stream {}, offset {}".format(
msg, message_context.stream, message_context.offset
)
)

# store the offset every 1000 messages received
async with lock:
cont = cont + 1
# store the offset every 1000 messages received
if cont % 1000 == 0:
await consumer.store_offset(
stream=stream, offset=offset, subscriber_name=message_context.subscriber_name
)
if message_context.subscriber_name is not None:
await consumer.store_offset(
stream=message_context.stream,
offset=message_context.offset,
subscriber_name=message_context.subscriber_name,
)


async def consume():
Expand Down
8 changes: 5 additions & 3 deletions docs/examples/reliable_client/BestPracticesClient.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,9 +183,11 @@ async def on_message(msg: AMQPMessage, message_context: MessageContext):
messages_consumed = messages_consumed + 1
# some printf after some messages consumed in order to check that we are working...
if (messages_consumed % 100000) == 0:
stream = await message_context.consumer.stream(message_context.subscriber_name)
offset = message_context.offset
print("Received message: {} from stream: {} - message offset: {}".format(msg, stream, offset))
print(
"Received message: {} from stream: {} - message offset: {}".format(
msg, message_context.stream, message_context.offset
)
)


async def publish(rabbitmq_configuration: dict):
Expand Down
4 changes: 2 additions & 2 deletions docs/examples/reliable_client/appsettings.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
"Virtualhost": "/",
"LoadBalancer": true,
"SuperStream": true,
"MaxPublishersByConnection": 256,
"MaxSubscribersByConnection": 256,
"MaxPublishersByConnection": 250,
"MaxSubscribersByConnection": 250,
"Producers": 3,
"Consumers": 3,
"DelayDuringSendMs":0,
Expand Down
18 changes: 12 additions & 6 deletions docs/examples/single_active_consumer/single_active_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,28 @@ async def on_message(msg: AMQPMessage, message_context: MessageContext):
global lock

consumer = message_context.consumer
stream = await message_context.consumer.stream(message_context.subscriber_name)
offset = message_context.offset
# store the offset every message received
# you should not store the offset every message received in production
# it could be a performance issue
# this is just an example
await consumer.store_offset(stream=stream, offset=offset, subscriber_name=message_context.subscriber_name)
print("Got message: {} from stream {} offset {}".format(msg, stream, offset))
if message_context.subscriber_name is not None:
await consumer.store_offset(
stream=message_context.stream,
offset=message_context.offset,
subscriber_name=message_context.subscriber_name,
)
print(
"Got message: {} from stream {} offset {}".format(msg, message_context.stream, message_context.offset)
)


# We can decide a strategy to manage Offset specification in single active consumer based on is_active flag
# By default if not present the always the strategy OffsetType.NEXT will be set.
# This handle will be passed to subscribe.
async def consumer_update_handler_offset(is_active: bool, event_context: EventContext) -> OffsetSpecification:
stream = str(event_context.consumer.get_stream(event_context.subscriber_name))
print("stream is: " + stream + " subscriber_name" + event_context.subscriber_name)
if event_context.subscriber_name is not None:
print("stream is: " + event_context.stream + " subscriber_name" + event_context.subscriber_name)

if is_active:
# Put the logic of your use case here
return OffsetSpecification(OffsetType.OFFSET, 10)
Expand Down
8 changes: 5 additions & 3 deletions docs/examples/super_stream/super_stream_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@


async def on_message(msg: AMQPMessage, message_context: MessageContext):
stream = await message_context.consumer.stream(message_context.subscriber_name)
offset = message_context.offset
print("Received message: {} from stream: {} - message offset: {}".format(msg, stream, offset))
print(
"Received message: {} from stream: {} - message offset: {}".format(
msg, message_context.stream, message_context.offset
)
)


async def consume():
Expand Down
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,6 @@ ignore_missing_imports = true
[[tool.mypy.overrides]]
module = "local_tests.*"
ignore_errors = true

[tool.isort]
profile = "black"
32 changes: 16 additions & 16 deletions rstream/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ def __init__(
self._connection_closed_handler = connection_closed_handler
self._sasl_configuration_mechanism = sasl_configuration_mechanism

self._frames: dict[str, asyncio.Queue] = defaultdict(asyncio.Queue)
self._frames: dict[int, asyncio.Queue] = defaultdict(asyncio.Queue)
self._is_not_closed: bool = True
self._max_clients_by_connections = max_clients_by_connections

Expand Down Expand Up @@ -163,9 +163,9 @@ async def remove_stream(self, stream: str):
if stream in self._streams:
self._streams.remove(stream)

async def get_available_id(self) -> int:
async def inc_available_id(self) -> int:
for publishing_subscribing_id in range(0, self._max_clients_by_connections):
if self._available_client_ids[publishing_subscribing_id] is True:
if self._available_client_ids[publishing_subscribing_id]:
self._available_client_ids[publishing_subscribing_id] = False
self._current_id = publishing_subscribing_id
return publishing_subscribing_id
Expand Down Expand Up @@ -239,22 +239,22 @@ async def start(self) -> None:
self.add_handler(schema.Heartbeat, self._on_heartbeat)
self.add_handler(schema.Close, self._on_close)

async def run_queue_listener_task(self, subscriber_name: str, handler: HT[FT]) -> None:
task_name = f"run_delivery_handlers_{subscriber_name}"
async def run_queue_listener_task(self, subscriber_id: int, handler: HT[FT]) -> None:
task_name = f"run_delivery_handlers_{subscriber_id}"
if task_name not in self._tasks:
self.start_task(
task_name,
self._run_delivery_handlers(subscriber_name, handler),
self._run_delivery_handlers(subscriber_id, handler),
)

async def stop_queue_listener_task(self, subscriber_name: str) -> None:
await self.stop_task(name=f"run_delivery_handlers_{subscriber_name}")
while not self._frames[subscriber_name].empty():
self._frames[subscriber_name].get_nowait()
async def stop_queue_listener_task(self, subscriber_id: int) -> None:
await self.stop_task(name=f"run_delivery_handlers_{subscriber_id}")
while not self._frames[subscriber_id].empty():
self._frames[subscriber_id].get_nowait()

async def _run_delivery_handlers(self, subscriber_name: str, handler: HT[FT]):
async def _run_delivery_handlers(self, subscriber_id: int, handler: HT[FT]):
while self.is_connection_alive():
frame_entry = await self._frames[subscriber_name].get()
frame_entry = await self._frames[subscriber_id].get()
try:
if self.is_connection_alive():
maybe_coro = handler(frame_entry)
Expand Down Expand Up @@ -286,10 +286,10 @@ async def _listener(self) -> None:
fut.set_result(frame)
del self._waiters[_key]

for subscriber_name, handler in list(self._handlers.get(frame.__class__, {}).items()):
for subscriber_id, handler in list(self._handlers.get(frame.__class__, {}).items()):
try:
if frame.__class__ == schema.Deliver:
await self._frames[subscriber_name].put(frame)
await self._frames[int(subscriber_id)].put(frame)
else:
maybe_coro = handler(frame)
if maybe_coro is not None:
Expand Down Expand Up @@ -369,8 +369,8 @@ async def close(self) -> None:
await asyncio.sleep(0.2)
await self.stop_task("listener")

for subscriber_name in self._frames:
await self.stop_queue_listener_task(subscriber_name=subscriber_name)
for subscriber_id in self._frames:
await self.stop_queue_listener_task(subscriber_id=subscriber_id)

if self._conn is not None and connection_is_broken is False:
await self._conn.close()
Expand Down
5 changes: 5 additions & 0 deletions rstream/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,8 @@ class ConsumerOffsetSpecification:

SUBSCRIPTION_PROPERTY_FILTER_PREFIX = "filter."
SUBSCRIPTION_PROPERTY_MATCH_UNFILTERED = "match-unfiltered"

# max consumers or producers allowed per connection
# even the protocol supports 256 items, we limit it to 200
# sharing too many consumers/producers can lead to performance issues
MAX_ITEM_ALLOWED = 200
Loading