Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,8 @@ jobs:
run: poetry run black --check .
- name: flake8
run: poetry run flake8 --exclude=venv,local_tests,docs/examples --max-line-length=120 --ignore=E203,W503,E701,E704
# - name: mypy
# run: |
# poetry add --group dev requests
# poetry add --group dev types-requests
# poetry run mypy .
- name: mypy
run: |
poetry run mypy .
- name: poetry run pytest
run: poetry run pytest
19 changes: 19 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
all: test

install:
pip3 install poetry
poetry install


format:
which poetry
poetry --version
poetry run isort --skip venv .
poetry run black --exclude=venv .
poetry run flake8 --exclude=venv,local_tests,docs/examples --max-line-length=120 --ignore=E203,W503,E701,E704
poetry run mypy .

test: format
poetry run pytest .
help:
cat Makefile
4 changes: 2 additions & 2 deletions docs/examples/reliable_client/BestPracticesClient.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ async def routing_extractor(message: AMQPMessage) -> str:


# Make producers (producer or superstream producer)
async def make_producer(rabbitmq_data: dict) -> Producer | SuperStreamProducer:
async def make_producer(rabbitmq_data: dict) -> Producer | SuperStreamProducer: # type: ignore
host = rabbitmq_data["Host"]
username = rabbitmq_data["Username"]
password = rabbitmq_data["Password"]
Expand Down Expand Up @@ -124,7 +124,7 @@ async def on_close_connection(on_closed_info: OnClosedErrorInfo) -> None:


# Make consumers
async def make_consumer(rabbitmq_data: dict) -> Consumer | SuperStreamConsumer:
async def make_consumer(rabbitmq_data: dict) -> Consumer | SuperStreamConsumer: # type: ignore
host = rabbitmq_data["Host"]
username = rabbitmq_data["Username"]
password = rabbitmq_data["Password"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ async def routing_extractor(message: AMQPMessage) -> str:

async def publish():
global counter
counter = 0
sent = 0
# counter = 0
# sent = 0
async with SuperStreamProducer(
"localhost",
username="guest",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from rstream import (
AMQPMessage,
ConsumerOffsetSpecification,
EventContext,
MessageContext,
OffsetSpecification,
Expand Down Expand Up @@ -62,16 +63,14 @@ async def consume():
await consumer.start()

# properties of the consumer (enabling single active mode)
properties: dict[str, str] = defaultdict(str)
properties: dict[str, str] = defaultdict(str) # type: ignore
properties["single-active-consumer"] = "true"
properties["name"] = "consumer-group-1"
properties["super-stream"] = "invoices"

offset_specification = OffsetSpecification(OffsetType.FIRST, None)

await consumer.subscribe(
callback=on_message,
offset_specification=offset_specification,
offset_specification=ConsumerOffsetSpecification(offset_type=OffsetType.FIRST),
decoder=amqp_decoder,
properties=properties,
consumer_update_listener=consumer_update_handler_offset,
Expand Down
1 change: 1 addition & 0 deletions docs/examples/super_stream/super_stream_producer_key.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from rstream import (
AMQPMessage,
RouteType,
SuperStreamCreationOption,
SuperStreamProducer,
)

Expand Down
20 changes: 11 additions & 9 deletions rstream/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,23 +9,25 @@

from .utils import FilterConfiguration, OnClosedErrorInfo # noqa: E402

from .amqp import AMQPMessage, amqp_decoder # noqa: E402
from ._pyamqp.message import Properties # type: ignore
from ._pyamqp.message import Header # type: ignore
from .compression import CompressionType # type: ignore
from .constants import ( # noqa: E402
ConsumerOffsetSpecification,
SlasMechanism,
)

try:
__version__ = metadata.version(__package__)
__license__ = metadata.metadata(__package__)["license"]
except metadata.PackageNotFoundError:
__version__ = "dev"
__license__ = None
__license__ = "MIT"

del metadata

from .amqp import AMQPMessage, amqp_decoder # noqa: E402
from ._pyamqp.message import Properties # noqa: E402
from ._pyamqp.message import Header # noqa: E402
from .compression import CompressionType # noqa: E402
from .constants import ( # noqa: E402
ConsumerOffsetSpecification,
SlasMechanism,
)

from .consumer import ( # noqa: E402
Consumer,
EventContext,
Expand Down
6 changes: 3 additions & 3 deletions rstream/amqp.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@

from typing import Any, Optional, Protocol

from ._pyamqp._decode import decode_payload
from ._pyamqp._encode import encode_payload
from ._pyamqp.message import Message
from ._pyamqp._decode import decode_payload # type: ignore
from ._pyamqp._encode import encode_payload # type: ignore
from ._pyamqp.message import Message # type: ignore

# import uamqp

Expand Down
8 changes: 4 additions & 4 deletions rstream/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,11 @@ def __init__(

def start_task(self, name: str, coro: Awaitable[None]) -> None:
assert name not in self._tasks
task = self._tasks[name] = asyncio.create_task(coro)
task = self._tasks[name] = asyncio.create_task(coro) # type: ignore

def on_task_done(task: asyncio.Task[Any]) -> None:
if not task.cancelled():
task.result()
def on_task_done(task_done: asyncio.Task[Any]) -> None:
if not task_done.cancelled():
task_done.result()

task.add_done_callback(on_task_done)
logger.debug("Started task %s", name)
Expand Down
4 changes: 2 additions & 2 deletions rstream/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def __init__(
max_retries: int = 20,
max_subscribers_by_connection: int = 256,
on_close_handler: Optional[CB_CONN[OnClosedErrorInfo]] = None,
connection_name: str = None,
connection_name: str = "",
sasl_configuration_mechanism: SlasMechanism = SlasMechanism.MechanismPlain,
):
self._pool = ClientPool(
Expand All @@ -108,7 +108,7 @@ def __init__(
self._on_close_handler = on_close_handler
self._connection_name = connection_name
self._sasl_configuration_mechanism = sasl_configuration_mechanism
if self._connection_name is None:
if self._connection_name is None or self._connection_name == "":
self._connection_name = "rstream-consumer"
self._max_subscribers_by_connection = max_subscribers_by_connection

Expand Down
6 changes: 3 additions & 3 deletions rstream/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ def __init__(
max_publishers_by_connection=256,
default_batch_publishing_delay: float = 3,
default_context_switch_value: int = 1000,
connection_name: str = None,
connection_name: str = "",
sasl_configuration_mechanism: SlasMechanism = SlasMechanism.MechanismPlain,
filter_value_extractor: Optional[CB_F[Any]] = None,
):
Expand Down Expand Up @@ -127,7 +127,7 @@ def __init__(
self.publisher_id = 0
self._max_publishers_by_connection = max_publishers_by_connection

if self._connection_name is None:
if self._connection_name is None or self._connection_name == "":
self._connection_name = "rstream-producer"

@property
Expand Down Expand Up @@ -466,7 +466,7 @@ async def _send_batch_async(
value_filter = await self._filter_value_extractor(msg)
messages.append(
schema.Message(
publishing_id=msg.publishing_id,
publishing_id=msg.publishing_id, # type: ignore[arg-type]
filter_value=value_filter,
data=bytes(msg),
)
Expand Down
11 changes: 7 additions & 4 deletions rstream/superstream.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@


class Metadata(abc.ABC):
@abc.abstractmethod
async def partitions(self) -> list[str]:
pass

@abc.abstractmethod
async def routes(self, routing_key: str) -> list[str]:
pass

Expand Down Expand Up @@ -61,22 +63,23 @@ async def routes(self, routing_key: str) -> list[str]:


class RoutingStrategy(abc.ABC):
@abc.abstractmethod
async def route(self, message: MessageT, metadata: Metadata) -> list[str]:
pass


class RoutingKeyRoutingStrategy(RoutingStrategy):
def __init__(self, routingKeyExtractor: CB[Any]):
self.routingKeyExtractor: CB[Any] = routingKeyExtractor
def __init__(self, routing_key_extractor: CB[Any]):
self.routingKeyExtractor: CB[Any] = routing_key_extractor

async def route(self, message: MessageT, metadata: Metadata) -> list[str]:
key = await self.routingKeyExtractor(message)
return await metadata.routes(str(key))


class HashRoutingMurmurStrategy(RoutingStrategy):
def __init__(self, routingKeyExtractor: CB[Any]):
self.routingKeyExtractor: CB[Any] = routingKeyExtractor
def __init__(self, routing_key_extractor: CB[Any]):
self.routingKeyExtractor: CB[Any] = routing_key_extractor

async def route(self, message: MessageT, metadata: Metadata) -> list[str]:
logger.debug("route() Compute routing")
Expand Down
4 changes: 2 additions & 2 deletions rstream/superstream_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def __init__(
max_subscribers_by_connection: int = 256,
super_stream: str,
super_stream_creation_option: Optional[SuperStreamCreationOption] = None,
connection_name: str = None,
connection_name: str = "",
on_close_handler: Optional[CB[OnClosedErrorInfo]] = None,
):
self._pool = ClientPool(
Expand Down Expand Up @@ -89,7 +89,7 @@ def __init__(
self._subscribers: dict[str, str] = defaultdict(str)
self._on_close_handler = on_close_handler
self._connection_name = connection_name
if self._connection_name is None:
if self._connection_name is None or self._connection_name == "":
self._connection_name = "rstream-consumer"

self.super_stream_creation_option = super_stream_creation_option
Expand Down
6 changes: 3 additions & 3 deletions rstream/superstream_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ def __init__(
max_retries: int = 20,
max_publishers_by_connection=256,
default_batch_publishing_delay: float = 0.2,
connection_name: str = None,
connection_name: str = "",
filter_value_extractor: Optional[CB_F[Any]] = None,
):
self._pool = ClientPool(
Expand Down Expand Up @@ -89,10 +89,10 @@ def __init__(
self.max_retries = max_retries
self.default_batch_publishing_delay = default_batch_publishing_delay
self._default_client: Optional[Client] = None
self._producer: Producer | None = None
self._producer: Optional[Producer] = None
self._routing_strategy: RoutingStrategy
self._connection_name = connection_name
if self._connection_name is None:
if self._connection_name is None or self._connection_name == "":
self._connection_name = "rstream-producer"
self._filter_value_extractor: Optional[CB_F[Any]] = filter_value_extractor
self.super_stream_creation_option = super_stream_creation_option
Expand Down