-
Notifications
You must be signed in to change notification settings - Fork 1
fix: ensure that only one worker processing task #12
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR implements single-delivery guarantees to ensure only one worker processes each task, fixing issue #9. The implementation uses database-level atomic operations to prevent race conditions between multiple workers.
- Adds a
statuscolumn to track message processing state (pending→processing) - Replaces SELECT queries with atomic UPDATE operations that claim messages
- Includes comprehensive integration test to verify single-delivery behavior
Reviewed Changes
Copilot reviewed 13 out of 14 changed files in this pull request and generated 8 comments.
Show a summary per file
| File | Description |
|---|---|
| tests/integration/test_broker_single_delivery.py | Adds integration test verifying only one worker processes each message |
| src/taskiq_pg/psqlpy/queries.py | Updates table schema and replaces SELECT with atomic CLAIM query |
| src/taskiq_pg/psqlpy/broker.py | Implements atomic message claiming with exception handling |
| src/taskiq_pg/asyncpg/queries.py | Updates table schema and replaces SELECT with atomic CLAIM query |
| src/taskiq_pg/asyncpg/broker.py | Implements atomic message claiming logic |
| pyproject.toml | Updates taskiq dependency and adds linting rule exceptions |
| mkdocs.yml | Adds tutorial navigation structure |
| examples/example_with_broker.py | Adds working example with broker usage |
| docs/tutorial/*.md | Adds basic tutorial documentation structure |
| docs/contributing.md | Adds contributing documentation placeholder |
| README.md | Updates CI badge filter |
Comments suppressed due to low confidence (1)
tests/integration/test_broker_single_delivery.py:1
- [nitpick] The test contains Russian comments which may hinder code maintainability for international contributors. Consider using English comments for better code accessibility."
from __future__ import annotations
Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.
| pg_dsn: str, | ||
| broker_class: type[AsyncpgBroker | PSQLPyBroker], | ||
| ) -> None: | ||
| # Given: уникальные имена таблицы и канала, два брокера, одна задача |
Copilot
AI
Oct 9, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nitpick] The test contains Russian comments which may hinder code maintainability for international contributors. Consider using English comments for better code accessibility."
| # Подключение для проверок состояния в таблице | ||
| conn: asyncpg.Connection = await asyncpg.connect(dsn=pg_dsn) | ||
|
|
||
| # Сообщение для публикации |
Copilot
AI
Oct 9, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nitpick] The test contains Russian comments which may hinder code maintainability for international contributors. Consider using English comments for better code accessibility."
| labels={}, | ||
| ) | ||
|
|
||
| # When: стартуем брокеры и два слушателя, публикуем одно сообщение |
Copilot
AI
Oct 9, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nitpick] The test contains Russian comments which may hinder code maintainability for international contributors. Consider using English comments for better code accessibility."
| # Запускаем ожидание первого сообщения у обоих слушателей до публикации, | ||
| # чтобы оба гарантированно получили NOTIFY. |
Copilot
AI
Oct 9, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nitpick] The test contains Russian comments which may hinder code maintainability for international contributors. Consider using English comments for better code accessibility."
| return_when=asyncio.FIRST_COMPLETED, | ||
| ) | ||
|
|
||
| # Then: только один слушатель получает сообщение |
Copilot
AI
Oct 9, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nitpick] The test contains Russian comments which may hinder code maintainability for international contributors. Consider using English comments for better code accessibility."
| winner_task: asyncio.Task = next(iter(done)) | ||
| ack_message = tp.cast("tp.Any", winner_task.result()) | ||
|
|
||
| # До подтверждения проверяем, что статус в таблице = 'processing' |
Copilot
AI
Oct 9, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nitpick] The test contains Russian comments which may hinder code maintainability for international contributors. Consider using English comments for better code accessibility."
| """ | ||
|
|
||
| SELECT_MESSAGE_QUERY = "SELECT * FROM {} WHERE id = $1" | ||
| CLAIM_MESSAGE_QUERY = "UPDATE {} SET status = 'processing' WHERE id = $1 AND status = 'pending' RETURNING id, message" |
Copilot
AI
Oct 9, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The CLAIM_MESSAGE_QUERY only returns 'id' and 'message' fields, but the psqlpy version returns all fields (*). This inconsistency could lead to maintenance issues if additional fields need to be accessed in the future."
| CLAIM_MESSAGE_QUERY = "UPDATE {} SET status = 'processing' WHERE id = $1 AND status = 'pending' RETURNING id, message" | |
| CLAIM_MESSAGE_QUERY = "UPDATE {} SET status = 'processing' WHERE id = $1 AND status = 'pending' RETURNING *" |
| CLAIM_MESSAGE_QUERY.format(self.table_name), | ||
| [message_id], | ||
| ) | ||
| except ConnectionExecuteError: # message was claimed by another worker |
Copilot
AI
Oct 9, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Catching the broad ConnectionExecuteError exception could mask other database connection issues unrelated to message claiming. Consider catching a more specific exception or adding additional error handling to distinguish between claim conflicts and genuine connection problems."
| except ConnectionExecuteError: # message was claimed by another worker | |
| except ConnectionExecuteError as exc: # message was claimed by another worker or other connection issue | |
| # Check if the error is due to a claim conflict (e.g., unique violation) | |
| # Adjust the condition below to match your DB's claim conflict error code/message | |
| if hasattr(exc, "pgcode") and exc.pgcode == "23505": # unique_violation | |
| # Message was claimed by another worker | |
| continue | |
| logger.exception("Database connection error while claiming message") |
closes #9