Skip to content

Commit 103dd35

Browse files
committed
raise on sync commit failed partitions for now
1 parent b25fa40 commit 103dd35

1 file changed

Lines changed: 6 additions & 8 deletions

File tree

src/kafkac/consumer.py

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -543,18 +543,16 @@ def _calculate_offsets(
543543

544544
return highest_offsets
545545

546-
async def _commit(self, *, asynchronous: bool) -> None:
547-
out = await self.consumer.commit(asynchronous=asynchronous)
548-
failed = [tp.error() for tp in out if tp.error is not None]
549-
if failed:
550-
# TODO: Remove later
551-
raise Exception("some partitions failed")
552-
553546
async def _ack_messages(self, *, messages: list[Message] | list[TopicPartition]) -> None:
554547
offsets = self._calculate_offsets(messages)
555548
try:
549+
# TODO: Always store + commit, feels excessive here after refactoring.
556550
await self.consumer.store_offsets(offsets=offsets)
557-
await self.consumer.commit(asynchronous=self.async_commit)
551+
result = await self.consumer.commit(asynchronous=self.async_commit)
552+
failed_tp = [tp.error() for tp in result if tp.error is not None]
553+
if failed_tp:
554+
# TODO: Doing anything meaningful here is maybe difficult.
555+
raise # (TODO: Crash)
558556
except KafkaException as exc:
559557
self._log_kafka_exception(exc)
560558
raise # (TODO: Crash)

0 commit comments

Comments
 (0)