diff --git a/README.md b/README.md index bbae03f..70f3079 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ [](https://pypi.org/project/taskiq-postgres/) [](https://pypi.org/project/taskiq-postgres/) -[](https://github.com/danfimov/taskiq-postgres) +[&style=for-the-badge)](https://github.com/danfimov/taskiq-postgres)
diff --git a/docs/contributing.md b/docs/contributing.md
index e69de29..59130b6 100644
--- a/docs/contributing.md
+++ b/docs/contributing.md
@@ -0,0 +1,3 @@
+---
+title: Contributing
+---
diff --git a/docs/tutorial/broker.md b/docs/tutorial/broker.md
new file mode 100644
index 0000000..600b22a
--- /dev/null
+++ b/docs/tutorial/broker.md
@@ -0,0 +1,69 @@
+---
+title: Broker
+---
+
+To use broker with PostgreSQL you need to import broker and result backend from this library and provide a address for connection. For example, lets create a file `broker.py` with the following content:
+
+=== "asyncpg"
+
+ ```python
+ import asyncio
+ from taskiq_pg.asyncpg import AsyncpgResultBackend, AsyncpgBroker
+
+
+ dsn = "postgres://postgres:postgres@localhost:5432/postgres"
+ broker = AsyncpgBroker(dsn).with_result_backend(AsyncpgResultBackend(dsn))
+
+
+ @broker.task
+ async def best_task_ever() -> None:
+ """Solve all problems in the world."""
+ await asyncio.sleep(5.5)
+ print("All problems are solved!")
+
+
+ async def main():
+ await broker.startup()
+ task = await best_task_ever.kiq()
+ print(await task.wait_result())
+ await broker.shutdown()
+
+
+ if __name__ == "__main__":
+ asyncio.run(main())
+ ```
+
+=== "psqlpy"
+
+ ```python
+ import asyncio
+ from taskiq_pg.psqlpy import PSQLPyResultBackend, PSQLPyBroker
+
+
+ dsn = "postgres://postgres:postgres@localhost:5432/postgres"
+ broker = PSQLPyBroker(dsn).with_result_backend(PSQLPyResultBackend(dsn))
+
+
+ @broker.task
+ async def best_task_ever() -> None:
+ """Solve all problems in the world."""
+ await asyncio.sleep(5.5)
+ print("All problems are solved!")
+
+
+ async def main():
+ await broker.startup()
+ task = await best_task_ever.kiq()
+ print(await task.wait_result())
+ await broker.shutdown()
+
+
+ if __name__ == "__main__":
+ asyncio.run(main())
+ ```
+
+Then you can run this file with:
+
+```bash
+python broker.py
+```
diff --git a/docs/tutorial/quickstart.md b/docs/tutorial/quickstart.md
new file mode 100644
index 0000000..f15cbd3
--- /dev/null
+++ b/docs/tutorial/quickstart.md
@@ -0,0 +1,3 @@
+---
+title: Getting Started
+---
diff --git a/docs/tutorial/schedule_source.md b/docs/tutorial/schedule_source.md
new file mode 100644
index 0000000..9b9fbdc
--- /dev/null
+++ b/docs/tutorial/schedule_source.md
@@ -0,0 +1,3 @@
+---
+title: Schedule Source
+---
diff --git a/examples/example_with_broker.py b/examples/example_with_broker.py
new file mode 100644
index 0000000..9bb245e
--- /dev/null
+++ b/examples/example_with_broker.py
@@ -0,0 +1,35 @@
+"""
+How to run:
+
+ 1) Run worker in one terminal:
+ uv run taskiq worker examples.example_with_broker:broker
+
+ 2) Run this script in another terminal:
+ uv run python -m examples.example_with_broker
+"""
+
+import asyncio
+
+from taskiq_pg.asyncpg import AsyncpgBroker, AsyncpgResultBackend
+
+
+dsn = "postgres://taskiq_postgres:look_in_vault@localhost:5432/taskiq_postgres"
+broker = AsyncpgBroker(dsn).with_result_backend(AsyncpgResultBackend(dsn))
+
+
+@broker.task("solve_all_problems")
+async def best_task_ever() -> None:
+ """Solve all problems in the world."""
+ await asyncio.sleep(2)
+ print("All problems are solved!")
+
+
+async def main():
+ await broker.startup()
+ task = await best_task_ever.kiq()
+ print(await task.wait_result())
+ await broker.shutdown()
+
+
+if __name__ == "__main__":
+ asyncio.run(main())
diff --git a/mkdocs.yml b/mkdocs.yml
index 0d0a2a1..fe9b48e 100644
--- a/mkdocs.yml
+++ b/mkdocs.yml
@@ -8,6 +8,10 @@ edit_uri: edit/main/docs/
nav:
- Overview:
- index.md
+ - Tutorial:
+ - tutorial/quickstart.md
+ - tutorial/broker.md
+ - tutorial/schedule_source.md
- API:
- reference.md
- Contributing:
diff --git a/pyproject.toml b/pyproject.toml
index d9304cc..2e6af73 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -31,7 +31,7 @@ authors = [
]
requires-python = ">=3.10,<3.14"
dependencies = [
- "taskiq>=0.11.17",
+ "taskiq>=0.11.18",
]
[project.urls]
@@ -143,10 +143,19 @@ ignore = [
"S101", # assert usage
"S311", # pseudo-random generators are not suitable for cryptographic purposes
+ "S608",
+
+ "RUF",
]
"tests/test_linting.py" = [
"S603", # subprocess usage
]
+"examples/*" = [
+ "T201",
+ "D",
+ "ANN",
+ "INP001",
+]
[tool.ruff.lint.isort]
known-local-folder = ["taskiq_pg"]
diff --git a/src/taskiq_pg/asyncpg/broker.py b/src/taskiq_pg/asyncpg/broker.py
index efb07ec..1b9fe0b 100644
--- a/src/taskiq_pg/asyncpg/broker.py
+++ b/src/taskiq_pg/asyncpg/broker.py
@@ -10,10 +10,10 @@
from taskiq_pg._internal.broker import BasePostgresBroker
from taskiq_pg.asyncpg.queries import (
+ CLAIM_MESSAGE_QUERY,
CREATE_MESSAGE_TABLE_QUERY,
DELETE_MESSAGE_QUERY,
INSERT_MESSAGE_QUERY,
- SELECT_MESSAGE_QUERY,
)
@@ -142,20 +142,14 @@ async def listen(self) -> AsyncGenerator[AckableMessage, None]:
try:
payload = await self._queue.get()
message_id = int(payload)
- message_row = await self.read_conn.fetchrow(
- SELECT_MESSAGE_QUERY.format(self.table_name),
- message_id,
- )
- if message_row is None:
- logger.warning(
- "Message with id %s not found in database.",
+ async with self.write_pool.acquire() as conn:
+ claimed = await conn.fetchrow(
+ CLAIM_MESSAGE_QUERY.format(self.table_name),
message_id,
)
+ if claimed is None:
continue
- if message_row.get("message") is None:
- msg = "Message row does not have 'message' column"
- raise ValueError(msg)
- message_str = message_row["message"]
+ message_str = claimed["message"]
if not isinstance(message_str, str):
msg = "message is not a string"
raise TypeError(msg)
diff --git a/src/taskiq_pg/asyncpg/queries.py b/src/taskiq_pg/asyncpg/queries.py
index 7033de1..89b6173 100644
--- a/src/taskiq_pg/asyncpg/queries.py
+++ b/src/taskiq_pg/asyncpg/queries.py
@@ -37,6 +37,7 @@
task_name VARCHAR NOT NULL,
message TEXT NOT NULL,
labels JSONB NOT NULL,
+ status TEXT NOT NULL DEFAULT 'pending',
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
"""
@@ -47,6 +48,6 @@
RETURNING id
"""
-SELECT_MESSAGE_QUERY = "SELECT * FROM {} WHERE id = $1"
+CLAIM_MESSAGE_QUERY = "UPDATE {} SET status = 'processing' WHERE id = $1 AND status = 'pending' RETURNING id, message"
DELETE_MESSAGE_QUERY = "DELETE FROM {} WHERE id = $1"
diff --git a/src/taskiq_pg/psqlpy/broker.py b/src/taskiq_pg/psqlpy/broker.py
index d66ca7a..f1a16fd 100644
--- a/src/taskiq_pg/psqlpy/broker.py
+++ b/src/taskiq_pg/psqlpy/broker.py
@@ -6,15 +6,16 @@
from dataclasses import dataclass
import psqlpy
+from psqlpy.exceptions import ConnectionExecuteError
from psqlpy.extra_types import JSONB
from taskiq import AckableMessage, BrokerMessage
from taskiq_pg._internal.broker import BasePostgresBroker
from taskiq_pg.psqlpy.queries import (
+ CLAIM_MESSAGE_QUERY,
CREATE_MESSAGE_TABLE_QUERY,
DELETE_MESSAGE_QUERY,
INSERT_MESSAGE_QUERY,
- SELECT_MESSAGE_QUERY,
)
@@ -35,6 +36,7 @@ class MessageRow:
task_name: str
message: str
labels: JSONB
+ status: str
created_at: datetime
@@ -165,14 +167,17 @@ async def listen(self) -> AsyncGenerator[AckableMessage, None]:
try:
payload = await self._queue.get()
message_id = int(payload) # payload is the message id
- message_row = await self.read_conn.fetch_row(
- SELECT_MESSAGE_QUERY.format(self.table_name),
- [message_id],
- )
- # ugly type hacks b/c SingleQueryResult.as_class return type is wrong
+ try:
+ async with self.write_pool.acquire() as conn:
+ claimed_message = await conn.fetch_row(
+ CLAIM_MESSAGE_QUERY.format(self.table_name),
+ [message_id],
+ )
+ except ConnectionExecuteError: # message was claimed by another worker
+ continue
message_row_result = tp.cast(
"MessageRow",
- tp.cast("object", message_row.as_class(MessageRow)),
+ tp.cast("object", claimed_message.as_class(MessageRow)),
)
message_data = message_row_result.message.encode()
diff --git a/src/taskiq_pg/psqlpy/queries.py b/src/taskiq_pg/psqlpy/queries.py
index 7033de1..9770e99 100644
--- a/src/taskiq_pg/psqlpy/queries.py
+++ b/src/taskiq_pg/psqlpy/queries.py
@@ -37,6 +37,7 @@
task_name VARCHAR NOT NULL,
message TEXT NOT NULL,
labels JSONB NOT NULL,
+ status TEXT NOT NULL DEFAULT 'pending',
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
"""
@@ -47,6 +48,6 @@
RETURNING id
"""
-SELECT_MESSAGE_QUERY = "SELECT * FROM {} WHERE id = $1"
+CLAIM_MESSAGE_QUERY = "UPDATE {} SET status = 'processing' WHERE id = $1 AND status = 'pending' RETURNING *"
DELETE_MESSAGE_QUERY = "DELETE FROM {} WHERE id = $1"
diff --git a/tests/integration/test_broker_single_delivery.py b/tests/integration/test_broker_single_delivery.py
new file mode 100644
index 0000000..960e390
--- /dev/null
+++ b/tests/integration/test_broker_single_delivery.py
@@ -0,0 +1,101 @@
+from __future__ import annotations
+
+import asyncio
+import typing as tp
+import uuid
+from contextlib import suppress
+
+import asyncpg
+import pytest
+from taskiq import BrokerMessage
+
+from taskiq_pg.asyncpg import AsyncpgBroker
+from taskiq_pg.psqlpy import PSQLPyBroker
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize(
+ "broker_class",
+ [
+ AsyncpgBroker,
+ PSQLPyBroker,
+ ],
+)
+async def test_when_two_workers_listen__then_single_message_processed_once(
+ pg_dsn: str,
+ broker_class: type[AsyncpgBroker | PSQLPyBroker],
+) -> None:
+ # Given: уникальные имена таблицы и канала, два брокера, одна задача
+ table_name: str = f"taskiq_messages_{uuid.uuid4().hex}"
+ channel_name: str = f"taskiq_channel_{uuid.uuid4().hex}"
+ task_id: str = uuid.uuid4().hex
+
+ broker1 = broker_class(dsn=pg_dsn, table_name=table_name, channel_name=channel_name)
+ broker2 = broker_class(dsn=pg_dsn, table_name=table_name, channel_name=channel_name)
+
+ # Подключение для проверок состояния в таблице
+ conn: asyncpg.Connection = await asyncpg.connect(dsn=pg_dsn)
+
+ # Сообщение для публикации
+ message: BrokerMessage = BrokerMessage(
+ task_id=task_id,
+ task_name="example:best_task_ever",
+ message=b'{"hello":"world"}',
+ labels={},
+ )
+
+ # When: стартуем брокеры и два слушателя, публикуем одно сообщение
+ await broker1.startup()
+ await broker2.startup()
+
+ agen1 = broker1.listen()
+ agen2 = broker2.listen()
+
+ # Запускаем ожидание первого сообщения у обоих слушателей до публикации,
+ # чтобы оба гарантированно получили NOTIFY.
+ t1: asyncio.Task = asyncio.create_task(agen1.__anext__())
+ t2: asyncio.Task = asyncio.create_task(agen2.__anext__())
+
+ try:
+ await broker1.kick(message)
+
+ done, _ = await asyncio.wait(
+ {t1, t2},
+ timeout=5.0,
+ return_when=asyncio.FIRST_COMPLETED,
+ )
+
+ # Then: только один слушатель получает сообщение
+ assert len(done) == 1, "Ровно один воркер должен получить сообщение"
+ winner_task: asyncio.Task = next(iter(done))
+ ack_message = tp.cast("tp.Any", winner_task.result())
+
+ # До подтверждения проверяем, что статус в таблице = 'processing'
+ row = await conn.fetchrow(
+ f"SELECT id, status FROM {table_name} WHERE task_id = $1",
+ task_id,
+ )
+ assert row is not None, "Сообщение должно существовать в таблице"
+ assert row["status"] == "processing", "Сообщение должно быть помечено как processing после claim"
+
+ # Подтверждаем обработку победившим воркером
+ await ack_message.ack()
+
+ # И проверяем, что запись удалена
+ cnt: int = tp.cast(
+ "int",
+ await conn.fetchval(
+ f"SELECT COUNT(*) FROM {table_name} WHERE task_id = $1",
+ task_id,
+ ),
+ )
+ assert cnt == 0, "Запись должна быть удалена после ack"
+ finally:
+ with suppress(Exception):
+ await broker1.shutdown()
+ await broker2.shutdown()
+
+ try:
+ await conn.execute(f"DROP TABLE IF EXISTS {table_name}")
+ finally:
+ await conn.close()
diff --git a/uv.lock b/uv.lock
index 4bfe487..81df24f 100644
--- a/uv.lock
+++ b/uv.lock
@@ -1148,7 +1148,7 @@ wheels = [
[[package]]
name = "taskiq"
-version = "0.11.17"
+version = "0.11.18"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "anyio" },
@@ -1161,9 +1161,9 @@ dependencies = [
{ name = "taskiq-dependencies" },
{ name = "typing-extensions" },
]
-sdist = { url = "https://files.pythonhosted.org/packages/ee/02/3257ed9da7a6207d095d9abdd7fe5c8aad4a6d484cc309a657d6422a4a6c/taskiq-0.11.17.tar.gz", hash = "sha256:9c1c402beea452e8e834c53494035d653499d044ef1c7e6250c8fb7b31e52165", size = 54083, upload-time = "2025-04-27T11:43:52.629Z" }
+sdist = { url = "https://files.pythonhosted.org/packages/91/4d/0d1b3b6c77a45d7a8c685a9c916b2532cca36a26771831949b874f6d15c3/taskiq-0.11.18.tar.gz", hash = "sha256:b83e1b70aee74d0a197d4a4a5ba165b8ba85b12a2b3b7ebfa3c6fdcc9e3128a7", size = 54323, upload-time = "2025-07-15T16:25:54.37Z" }
wheels = [
- { url = "https://files.pythonhosted.org/packages/34/f4/c11112085b2a86f1c2787700a23278d94263c700bf2b55753bda37d72440/taskiq-0.11.17-py3-none-any.whl", hash = "sha256:a01fe1fc9c646f71113d0b886761a5e5253a35e625491d62e3379a14a99563b7", size = 80019, upload-time = "2025-04-27T11:43:50.682Z" },
+ { url = "https://files.pythonhosted.org/packages/4a/d5/46505f57c140d10d4c36f6bd2f2047fb0460e4d5b9b841dc3b93ab8c893d/taskiq-0.11.18-py3-none-any.whl", hash = "sha256:0df58be24e4ef5d19c8ef02581d35d392b0d780d3fe37950e0478022b85ce288", size = 79608, upload-time = "2025-07-15T16:25:52.707Z" },
]
[[package]]
@@ -1212,7 +1212,7 @@ requires-dist = [
{ name = "aiopg", marker = "extra == 'aiopg'", specifier = ">=1.4.0" },
{ name = "asyncpg", marker = "extra == 'asyncpg'", specifier = ">=0.30.0" },
{ name = "psqlpy", marker = "extra == 'psqlpy'", specifier = ">=0.11.6" },
- { name = "taskiq", specifier = ">=0.11.17" },
+ { name = "taskiq", specifier = ">=0.11.18" },
]
provides-extras = ["aiopg", "asyncpg", "psqlpy"]