Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ jobs:
strategy:
max-parallel: 5
matrix:
python: [3.9, "3.10", "3.11", "3.12", "3.13"]
python: [3.9, "3.10", "3.11", "3.12", "3.13", "3.14"]
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Python 3.9 is EOL; are we able to drop it?

timeout-minutes: 10
steps:
- uses: actions/checkout@v2
Expand Down
71 changes: 49 additions & 22 deletions arroyo/backends/kafka/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ def assignment_callback(
try:
assignment: MutableSequence[ConfluentTopicPartition] = []

for partition in self.__consumer.committed(partitions):
for partition in self.__consumer.committed(list(partitions)):
if partition.offset >= 0:
assignment.append(partition)
elif partition.offset == OFFSET_INVALID:
Expand All @@ -364,7 +364,7 @@ def assignment_callback(

# Ensure that all partitions are resumed on assignment to avoid
# carrying over state from a previous assignment.
self.resume([p for p in offsets])
self.resume(list(offsets.keys()))

except Exception:
self.__state = KafkaConsumerState.ERROR
Expand All @@ -382,13 +382,15 @@ def revocation_callback(
) -> None:
self.__state = KafkaConsumerState.REVOKING

partitions = [Partition(Topic(i.topic), i.partition) for i in partitions]
revoked_partitions = [
Partition(Topic(i.topic), i.partition) for i in partitions
]

try:
if on_revoke is not None:
on_revoke(partitions)
on_revoke(revoked_partitions)
finally:
for partition in partitions:
for partition in revoked_partitions:
# Staged offsets are deleted during partition revocation to
# prevent later committing offsets for partitions that are
# no longer owned by this consumer.
Expand Down Expand Up @@ -476,8 +478,11 @@ def poll(
code = error.code()
if code == KafkaError._PARTITION_EOF:
raise EndOfPartition(
Partition(Topic(message.topic()), message.partition()),
message.offset(),
Partition(
Topic(cast(str, message.topic())),
cast(int, message.partition()),
),
cast(int, message.offset()),
)
elif code == KafkaError._TRANSPORT:
raise TransportError(str(error))
Expand All @@ -489,15 +494,33 @@ def poll(
else:
raise ConsumerError(str(error))

headers: Optional[Headers] = message.headers()
raw_headers = message.headers()
if raw_headers is None:
headers = []
elif isinstance(raw_headers, dict):
headers = [
(k, v if isinstance(v, bytes) else b"")
for k, v in raw_headers.items()
]
else:
headers = [
(k, v if isinstance(v, bytes) else b"")
for k, v in raw_headers
]
value = message.value()
if value is None:
value = b""
broker_value = BrokerValue(
KafkaPayload(
message.key(),
message.value(),
headers if headers is not None else [],
value,
headers,
),
Partition(
Topic(cast(str, message.topic())),
cast(int, message.partition()),
),
Partition(Topic(message.topic()), message.partition()),
message.offset(),
cast(int, message.offset()),
datetime.utcfromtimestamp(message.timestamp()[1] / 1000.0),
)
self.__offsets[broker_value.partition] = broker_value.next_offset
Expand Down Expand Up @@ -737,8 +760,8 @@ class KafkaProducer(Producer[KafkaPayload]):
def __init__(
self, configuration: Mapping[str, Any], use_simple_futures: bool = False
) -> None:
self.__configuration = configuration
self.__producer = ConfluentKafkaProducer(configuration)
self.__configuration = dict(configuration)
self.__producer = ConfluentKafkaProducer(self.__configuration)
self.__shutdown_requested = Event()

# The worker must execute in a separate thread to ensure that callbacks
Expand Down Expand Up @@ -778,8 +801,11 @@ def __delivery_callback(
future.set_result(
BrokerValue(
payload,
Partition(Topic(message.topic()), message.partition()),
message.offset(),
Partition(
Topic(cast(str, message.topic())),
cast(int, message.partition()),
),
cast(int, message.offset()),
datetime.utcfromtimestamp(timestamp_value / 1000.0),
),
)
Expand Down Expand Up @@ -815,7 +841,7 @@ def produce(
produce(
value=payload.value,
key=payload.key,
headers=payload.headers,
headers=cast(Any, payload.headers),
on_delivery=partial(self.__delivery_callback, future, payload),
)
return future
Expand All @@ -832,14 +858,15 @@ def close(self) -> Future[None]:
METRICS_FREQUENCY_SEC = 1.0


class ConfluentProducer(ConfluentKafkaProducer): # type: ignore[misc]
class ConfluentProducer(ConfluentKafkaProducer):
"""
A thin wrapper for confluent_kafka.Producer that adds metrics reporting.
"""

def __init__(self, configuration: Mapping[str, Any]) -> None:
super().__init__(configuration)
self.producer_name = configuration.get("client.id") or None
config_dict = dict(configuration)
super().__init__(config_dict)
self.producer_name = config_dict.get("client.id") or None
self.__metrics = get_metrics()
self.__produce_counters: MutableMapping[str, int] = defaultdict(int)
self.__reset_metrics()
Expand Down Expand Up @@ -873,7 +900,7 @@ def produce(self, *args: Any, **kwargs: Any) -> None:
on_delivery = kwargs.pop("on_delivery", None)
user_callback = callback or on_delivery
wrapped_callback = self.__delivery_callback(user_callback)
super().produce(*args, on_delivery=wrapped_callback, **kwargs)
super().produce(*args, on_delivery=wrapped_callback, **kwargs) # type: ignore[misc]

def __flush_metrics(self) -> None:
for status, count in self.__produce_counters.items():
Expand All @@ -890,7 +917,7 @@ def __flush_metrics(self) -> None:
def flush(self, timeout: float = -1) -> int:
# Kafka producer flush should flush metrics too
self.__flush_metrics()
return cast(int, super().flush(timeout))
return super().flush(timeout)

def __reset_metrics(self) -> None:
self.__produce_counters.clear()
Expand Down
4 changes: 3 additions & 1 deletion arroyo/backends/local/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,10 +216,12 @@ def unsubscribe(self) -> None:
if self.__closed:
raise RuntimeError("consumer is closed")

subscription = self.__subscription
assert subscription is not None
self.__pending_callbacks.append(
partial(
self.__revoke,
self.__subscription,
subscription,
self.__broker.unsubscribe(self),
)
)
Expand Down
10 changes: 8 additions & 2 deletions arroyo/processing/strategies/run_task_with_multiprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,15 @@ def __getitem__(self, index: int) -> TBatchValue:
# was still "alive" in a different part of the processing pipeline, the
# contents of the message would be liable to be corrupted at best --
# and could cause a data leak/security issue at worst.
buf = self.block.buf
if buf is None:
raise RuntimeError("shared memory block has no buffer")
return cast(
TBatchValue,
pickle.loads(
data,
buffers=[
self.block.buf[offset : offset + length].tobytes()
buf[offset : offset + length].tobytes()
for offset, length in buffers
],
),
Expand Down Expand Up @@ -159,7 +162,10 @@ def buffer_callback(buffer: PickleBuffer) -> None:
f"Value exceeds available space in block, {length} "
f"bytes needed but {self.block.size - offset} bytes free."
)
self.block.buf[offset : offset + length] = value
buf = self.block.buf
if buf is None:
raise RuntimeError("shared memory block has no buffer")
buf[offset : offset + length] = value
self.__offset += length
buffers.append((offset, length))

Expand Down
2 changes: 1 addition & 1 deletion requirements-test.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
pytest==8.4.1
pytest-benchmark==4.0.0
mypy==0.961
mypy>=1.19.1
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
confluent-kafka>=2.11.0,<2.12.0
confluent-kafka>=2.12.1
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing upper bound on confluent-kafka dependency version

Medium Severity

The confluent-kafka dependency previously had an upper bound (<2.12.0) but the new specification (>=2.12.1) removes it entirely. Since this is a published library (sentry-arroyo), a future major version of confluent-kafka (e.g., 3.x) with breaking API changes would be accepted by this constraint, potentially breaking downstream consumers at install time.

Fix in Cursor Fix in Web

12 changes: 9 additions & 3 deletions tests/backends/test_confluent_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ def test_metrics_callback_records_success(self) -> None:
{"bootstrap.servers": "fake:9092", "client.id": "test-producer-name"}
)
mock_message = mock.Mock(spec=ConfluentMessage)
producer._ConfluentProducer__metrics_delivery_callback(None, mock_message)
getattr(
producer, "_ConfluentProducer__metrics_delivery_callback"
)(None, mock_message)
producer.flush() # Flush buffered metrics
assert (
Increment(
Expand All @@ -44,7 +46,9 @@ def test_metrics_callback_records_error(self) -> None:
producer = ConfluentProducer({"bootstrap.servers": "fake:9092"})
mock_error = mock.Mock(spec=KafkaError)
mock_message = mock.Mock(spec=ConfluentMessage)
producer._ConfluentProducer__metrics_delivery_callback(mock_error, mock_message)
getattr(
producer, "_ConfluentProducer__metrics_delivery_callback"
)(mock_error, mock_message)
producer.flush() # Flush buffered metrics
assert (
Increment("arroyo.producer.produce_status", 1, {"status": "error"})
Expand All @@ -63,7 +67,9 @@ def user_callback(
) -> None:
user_callback_invoked.append((error, message))

wrapped = producer._ConfluentProducer__delivery_callback(user_callback)
wrapped = getattr(
producer, "_ConfluentProducer__delivery_callback"
)(user_callback)
mock_message = mock.Mock(spec=ConfluentMessage)
wrapped(None, mock_message)
producer.flush() # Flush buffered metrics
Expand Down
2 changes: 1 addition & 1 deletion tests/backends/test_kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from unittest import mock

import pytest
from confluent_kafka.admin import AdminClient, NewTopic
from confluent_kafka.admin import AdminClient, NewTopic # type: ignore[attr-defined]

from arroyo.backends.kafka import KafkaConsumer, KafkaPayload, KafkaProducer
from arroyo.backends.kafka.commit import CommitCodec
Expand Down
2 changes: 1 addition & 1 deletion tests/processing/strategies/test_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ def test_function(message: Message[bool]) -> bool:
def test_backpressure_in_join() -> None:
topic = Topic("topic")
next_step = Mock()
next_step.submit.side_effect = [None] * 6 + [MessageRejected] # type: ignore
next_step.submit.side_effect = [None] * 6 + [MessageRejected]

now = datetime.now()

Expand Down
2 changes: 1 addition & 1 deletion tests/test_kip848_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from contextlib import closing
from typing import Any, Iterator, Mapping

from confluent_kafka.admin import AdminClient, NewTopic
from confluent_kafka.admin import AdminClient, NewTopic # type: ignore[attr-defined]

from arroyo.backends.kafka import KafkaProducer
from arroyo.backends.kafka.configuration import build_kafka_consumer_configuration
Expand Down