Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ install:
format:
which poetry
poetry --version
poetry run isort --skip venv .
poetry run isort .
poetry run black --exclude=venv .
poetry run flake8 --exclude=venv,local_tests,docs/examples --max-line-length=120 --ignore=E203,W503,E701,E704
poetry run mypy .
Expand Down
8 changes: 5 additions & 3 deletions docs/examples/basic_consumers/basic_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@ async def consume():
loop.add_signal_handler(signal.SIGINT, lambda: asyncio.create_task(consumer.close()))

async def on_message(msg: AMQPMessage, message_context: MessageContext):
stream = message_context.consumer.get_stream(message_context.subscriber_name)
offset = message_context.offset
print("Got message: {} from stream {}, offset {}".format(msg, stream, offset))
print(
"Got message: {} from stream {}, offset {}".format(
msg, message_context.stream, message_context.offset
)
)

await consumer.start()
await consumer.subscribe(stream=STREAM, callback=on_message, decoder=amqp_decoder)
Expand Down
8 changes: 5 additions & 3 deletions docs/examples/basic_consumers/basic_consumer_binary.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@ async def consume():
loop.add_signal_handler(signal.SIGINT, lambda: asyncio.create_task(consumer.close()))

async def on_message(msg: AMQPMessage, message_context: MessageContext):
stream = message_context.consumer.get_stream(message_context.subscriber_name)
offset = message_context.offset
print("Got message: {} from stream {}, offset {}".format(msg, stream, offset))
print(
"Got message: {} from stream {}, offset {}".format(
msg, message_context.stream, message_context.offset
)
)

await consumer.start()
await consumer.subscribe(stream=STREAM, callback=on_message)
Expand Down
11 changes: 7 additions & 4 deletions docs/examples/basic_consumers/basic_consumer_offset_next.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,13 @@ async def consume():
loop.add_signal_handler(signal.SIGINT, lambda: asyncio.create_task(consumer.close()))

async def on_message(msg: AMQPMessage, message_context: MessageContext):
stream = message_context.consumer.get_stream(message_context.subscriber_name)
offset = message_context.offset

print("Got message: {}".format(msg) + " from stream " + stream + " offset: " + str(offset))
print(
"Got message: {}".format(msg)
+ " from stream "
+ message_context.stream
+ " offset: "
+ str(message_context.offset)
)

await consumer.start()
await consumer.subscribe(
Expand Down
8 changes: 5 additions & 3 deletions docs/examples/basic_consumers/basic_consumer_offset_offset.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@ async def consume():
loop.add_signal_handler(signal.SIGINT, lambda: asyncio.create_task(consumer.close()))

async def on_message(msg: AMQPMessage, message_context: MessageContext):
stream = message_context.consumer.get_stream(message_context.subscriber_name)
offset = message_context.offset
print("Got message: {} from stream {}, offset {}".format(msg, stream, offset))
print(
"Got message: {} from stream {}, offset {}".format(
msg, message_context.stream, message_context.offset
)
)

await consumer.start()
# Possible values of OffsetType are: FIRST (default), NEXT, LAST, TIMESTAMP and OFFSET
Expand Down
8 changes: 5 additions & 3 deletions docs/examples/filtering/consumer_filtering.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@ async def consume():
loop.add_signal_handler(signal.SIGINT, lambda: asyncio.create_task(consumer.close()))

async def on_message(msg: AMQPMessage, message_context: MessageContext):
stream = message_context.consumer.get_stream(message_context.subscriber_name)
offset = message_context.offset
print("Got message: {} from stream {}, offset {}".format(msg, stream, offset))
print(
"Got message: {} from stream {}, offset {}".format(
msg, message_context.stream, message_context.offset
)
)
print("Application property: " + str(msg.application_properties[b"region"]))
global cont
cont = cont + 1
Expand Down
8 changes: 5 additions & 3 deletions docs/examples/filtering/super_stream_consumer_filtering.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@ async def consume():
loop.add_signal_handler(signal.SIGINT, lambda: asyncio.create_task(consumer.close()))

async def on_message(msg: AMQPMessage, message_context: MessageContext):
stream = message_context.consumer.get_stream(message_context.subscriber_name)
offset = message_context.offset
print("Got message: {} from stream {}, offset {}".format(msg, stream, offset))
print(
"Got message: {} from stream {}, offset {}".format(
msg, message_context.stream, message_context.offset
)
)
print("Application property: " + str(msg.application_properties[b"region"]))
global cont
cont = cont + 1
Expand Down
17 changes: 11 additions & 6 deletions docs/examples/manual_server_offset_tracking/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,24 @@ async def on_message(msg: AMQPMessage, message_context: MessageContext):
global lock

consumer = message_context.consumer
stream = await message_context.consumer.stream(message_context.subscriber_name)
offset = message_context.offset

print("Got message: {} from stream {}, offset {}".format(msg, stream, offset))
print(
"Got message: {} from stream {}, offset {}".format(
msg, message_context.stream, message_context.offset
)
)

# store the offset every 1000 messages received
async with lock:
cont = cont + 1
# store the offset every 1000 messages received
if cont % 1000 == 0:
await consumer.store_offset(
stream=stream, offset=offset, subscriber_name=message_context.subscriber_name
)
if message_context.subscriber_name is not None:
await consumer.store_offset(
stream=message_context.stream,
offset=message_context.offset,
subscriber_name=message_context.subscriber_name,
)


async def consume():
Expand Down
8 changes: 5 additions & 3 deletions docs/examples/reliable_client/BestPracticesClient.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,9 +183,11 @@ async def on_message(msg: AMQPMessage, message_context: MessageContext):
messages_consumed = messages_consumed + 1
# some printf after some messages consumed in order to check that we are working...
if (messages_consumed % 100000) == 0:
stream = await message_context.consumer.stream(message_context.subscriber_name)
offset = message_context.offset
print("Received message: {} from stream: {} - message offset: {}".format(msg, stream, offset))
print(
"Received message: {} from stream: {} - message offset: {}".format(
msg, message_context.stream, message_context.offset
)
)


async def publish(rabbitmq_configuration: dict):
Expand Down
4 changes: 2 additions & 2 deletions docs/examples/reliable_client/appsettings.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
"Virtualhost": "/",
"LoadBalancer": true,
"SuperStream": true,
"MaxPublishersByConnection": 256,
"MaxSubscribersByConnection": 256,
"MaxPublishersByConnection": 250,
"MaxSubscribersByConnection": 250,
"Producers": 3,
"Consumers": 3,
"DelayDuringSendMs":0,
Expand Down
18 changes: 12 additions & 6 deletions docs/examples/single_active_consumer/single_active_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,28 @@ async def on_message(msg: AMQPMessage, message_context: MessageContext):
global lock

consumer = message_context.consumer
stream = await message_context.consumer.stream(message_context.subscriber_name)
offset = message_context.offset
# store the offset every message received
# you should not store the offset every message received in production
# it could be a performance issue
# this is just an example
await consumer.store_offset(stream=stream, offset=offset, subscriber_name=message_context.subscriber_name)
print("Got message: {} from stream {} offset {}".format(msg, stream, offset))
if message_context.subscriber_name is not None:
await consumer.store_offset(
stream=message_context.stream,
offset=message_context.offset,
subscriber_name=message_context.subscriber_name,
)
print(
"Got message: {} from stream {} offset {}".format(msg, message_context.stream, message_context.offset)
)


# We can decide a strategy to manage Offset specification in single active consumer based on is_active flag
# By default if not present the always the strategy OffsetType.NEXT will be set.
# This handle will be passed to subscribe.
async def consumer_update_handler_offset(is_active: bool, event_context: EventContext) -> OffsetSpecification:
stream = str(event_context.consumer.get_stream(event_context.subscriber_name))
print("stream is: " + stream + " subscriber_name" + event_context.subscriber_name)
if event_context.subscriber_name is not None:
print("stream is: " + event_context.stream + " subscriber_name" + event_context.subscriber_name)

if is_active:
# Put the logic of your use case here
return OffsetSpecification(OffsetType.OFFSET, 10)
Expand Down
8 changes: 5 additions & 3 deletions docs/examples/super_stream/super_stream_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@


async def on_message(msg: AMQPMessage, message_context: MessageContext):
stream = await message_context.consumer.stream(message_context.subscriber_name)
offset = message_context.offset
print("Received message: {} from stream: {} - message offset: {}".format(msg, stream, offset))
print(
"Received message: {} from stream: {} - message offset: {}".format(
msg, message_context.stream, message_context.offset
)
)


async def consume():
Expand Down
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,6 @@ ignore_missing_imports = true
[[tool.mypy.overrides]]
module = "local_tests.*"
ignore_errors = true

[tool.isort]
profile = "black"
32 changes: 16 additions & 16 deletions rstream/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ def __init__(
self._connection_closed_handler = connection_closed_handler
self._sasl_configuration_mechanism = sasl_configuration_mechanism

self._frames: dict[str, asyncio.Queue] = defaultdict(asyncio.Queue)
self._frames: dict[int, asyncio.Queue] = defaultdict(asyncio.Queue)
self._is_not_closed: bool = True
self._max_clients_by_connections = max_clients_by_connections

Expand Down Expand Up @@ -163,9 +163,9 @@ async def remove_stream(self, stream: str):
if stream in self._streams:
self._streams.remove(stream)

async def get_available_id(self) -> int:
async def inc_available_id(self) -> int:
for publishing_subscribing_id in range(0, self._max_clients_by_connections):
if self._available_client_ids[publishing_subscribing_id] is True:
if self._available_client_ids[publishing_subscribing_id]:
self._available_client_ids[publishing_subscribing_id] = False
self._current_id = publishing_subscribing_id
return publishing_subscribing_id
Expand Down Expand Up @@ -239,22 +239,22 @@ async def start(self) -> None:
self.add_handler(schema.Heartbeat, self._on_heartbeat)
self.add_handler(schema.Close, self._on_close)

async def run_queue_listener_task(self, subscriber_name: str, handler: HT[FT]) -> None:
task_name = f"run_delivery_handlers_{subscriber_name}"
async def run_queue_listener_task(self, subscriber_id: int, handler: HT[FT]) -> None:
task_name = f"run_delivery_handlers_{subscriber_id}"
if task_name not in self._tasks:
self.start_task(
task_name,
self._run_delivery_handlers(subscriber_name, handler),
self._run_delivery_handlers(subscriber_id, handler),
)

async def stop_queue_listener_task(self, subscriber_name: str) -> None:
await self.stop_task(name=f"run_delivery_handlers_{subscriber_name}")
while not self._frames[subscriber_name].empty():
self._frames[subscriber_name].get_nowait()
async def stop_queue_listener_task(self, subscriber_id: int) -> None:
await self.stop_task(name=f"run_delivery_handlers_{subscriber_id}")
while not self._frames[subscriber_id].empty():
self._frames[subscriber_id].get_nowait()

async def _run_delivery_handlers(self, subscriber_name: str, handler: HT[FT]):
async def _run_delivery_handlers(self, subscriber_id: int, handler: HT[FT]):
while self.is_connection_alive():
frame_entry = await self._frames[subscriber_name].get()
frame_entry = await self._frames[subscriber_id].get()
try:
if self.is_connection_alive():
maybe_coro = handler(frame_entry)
Expand Down Expand Up @@ -286,10 +286,10 @@ async def _listener(self) -> None:
fut.set_result(frame)
del self._waiters[_key]

for subscriber_name, handler in list(self._handlers.get(frame.__class__, {}).items()):
for subscriber_id, handler in list(self._handlers.get(frame.__class__, {}).items()):
try:
if frame.__class__ == schema.Deliver:
await self._frames[subscriber_name].put(frame)
await self._frames[int(subscriber_id)].put(frame)
else:
maybe_coro = handler(frame)
if maybe_coro is not None:
Expand Down Expand Up @@ -369,8 +369,8 @@ async def close(self) -> None:
await asyncio.sleep(0.2)
await self.stop_task("listener")

for subscriber_name in self._frames:
await self.stop_queue_listener_task(subscriber_name=subscriber_name)
for subscriber_id in self._frames:
await self.stop_queue_listener_task(subscriber_id=subscriber_id)

if self._conn is not None and connection_is_broken is False:
await self._conn.close()
Expand Down
5 changes: 5 additions & 0 deletions rstream/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,8 @@ class ConsumerOffsetSpecification:

SUBSCRIPTION_PROPERTY_FILTER_PREFIX = "filter."
SUBSCRIPTION_PROPERTY_MATCH_UNFILTERED = "match-unfiltered"

# max consumers or producers allowed per connection
# even the protocol supports 256 items, we limit it to 200
# share to many consumers/producers can lead to performance issues
Comment thread
Gsantomaggio marked this conversation as resolved.
Outdated
MAX_ITEM_ALLOWED = 200
Loading