Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[![PyPI - Python Version](https://img.shields.io/pypi/pyversions/taskiq-postgres?style=for-the-badge&logo=python)](https://pypi.org/project/taskiq-postgres/)
[![PyPI](https://img.shields.io/pypi/v/taskiq-postgres?style=for-the-badge&logo=pypi)](https://pypi.org/project/taskiq-postgres/)
[![Checks](https://img.shields.io/github/check-runs/danfimov/taskiq-postgres/main?style=for-the-badge&logo=pytest)](https://github.com/danfimov/taskiq-postgres)
[![Checks](https://img.shields.io/github/check-runs/danfimov/taskiq-postgres/main?nameFilter=Tests%20(3.12)&style=for-the-badge)](https://github.com/danfimov/taskiq-postgres)

<div align="center">
<a href="https://github.com/danfimov/taskiq-postgres/"><img src="https://raw.githubusercontent.com/danfimov/taskiq-postgres/main/assets/logo.png" width=400></a>
Expand Down
3 changes: 3 additions & 0 deletions docs/contributing.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
---
title: Contributing
---
69 changes: 69 additions & 0 deletions docs/tutorial/broker.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
---
title: Broker
---

To use broker with PostgreSQL you need to import broker and result backend from this library and provide a address for connection. For example, lets create a file `broker.py` with the following content:

=== "asyncpg"

```python
import asyncio
from taskiq_pg.asyncpg import AsyncpgResultBackend, AsyncpgBroker


dsn = "postgres://postgres:postgres@localhost:5432/postgres"
broker = AsyncpgBroker(dsn).with_result_backend(AsyncpgResultBackend(dsn))


@broker.task
async def best_task_ever() -> None:
"""Solve all problems in the world."""
await asyncio.sleep(5.5)
print("All problems are solved!")


async def main():
await broker.startup()
task = await best_task_ever.kiq()
print(await task.wait_result())
await broker.shutdown()


if __name__ == "__main__":
asyncio.run(main())
```

=== "psqlpy"

```python
import asyncio
from taskiq_pg.psqlpy import PSQLPyResultBackend, PSQLPyBroker


dsn = "postgres://postgres:postgres@localhost:5432/postgres"
broker = PSQLPyBroker(dsn).with_result_backend(PSQLPyResultBackend(dsn))


@broker.task
async def best_task_ever() -> None:
"""Solve all problems in the world."""
await asyncio.sleep(5.5)
print("All problems are solved!")


async def main():
await broker.startup()
task = await best_task_ever.kiq()
print(await task.wait_result())
await broker.shutdown()


if __name__ == "__main__":
asyncio.run(main())
```

Then you can run this file with:

```bash
python broker.py
```
3 changes: 3 additions & 0 deletions docs/tutorial/quickstart.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
---
title: Getting Started
---
3 changes: 3 additions & 0 deletions docs/tutorial/schedule_source.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
---
title: Schedule Source
---
35 changes: 35 additions & 0 deletions examples/example_with_broker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
"""
How to run:

1) Run worker in one terminal:
uv run taskiq worker examples.example_with_broker:broker

2) Run this script in another terminal:
uv run python -m examples.example_with_broker
"""

import asyncio

from taskiq_pg.asyncpg import AsyncpgBroker, AsyncpgResultBackend


dsn = "postgres://taskiq_postgres:look_in_vault@localhost:5432/taskiq_postgres"
broker = AsyncpgBroker(dsn).with_result_backend(AsyncpgResultBackend(dsn))


@broker.task("solve_all_problems")
async def best_task_ever() -> None:
"""Solve all problems in the world."""
await asyncio.sleep(2)
print("All problems are solved!")


async def main():
await broker.startup()
task = await best_task_ever.kiq()
print(await task.wait_result())
await broker.shutdown()


if __name__ == "__main__":
asyncio.run(main())
4 changes: 4 additions & 0 deletions mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ edit_uri: edit/main/docs/
nav:
- Overview:
- index.md
- Tutorial:
- tutorial/quickstart.md
- tutorial/broker.md
- tutorial/schedule_source.md
- API:
- reference.md
- Contributing:
Expand Down
11 changes: 10 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ authors = [
]
requires-python = ">=3.10,<3.14"
dependencies = [
"taskiq>=0.11.17",
"taskiq>=0.11.18",
]

[project.urls]
Expand Down Expand Up @@ -143,10 +143,19 @@ ignore = [

"S101", # assert usage
"S311", # pseudo-random generators are not suitable for cryptographic purposes
"S608",

"RUF",
]
"tests/test_linting.py" = [
"S603", # subprocess usage
]
"examples/*" = [
"T201",
"D",
"ANN",
"INP001",
]

[tool.ruff.lint.isort]
known-local-folder = ["taskiq_pg"]
Expand Down
18 changes: 6 additions & 12 deletions src/taskiq_pg/asyncpg/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@

from taskiq_pg._internal.broker import BasePostgresBroker
from taskiq_pg.asyncpg.queries import (
CLAIM_MESSAGE_QUERY,
CREATE_MESSAGE_TABLE_QUERY,
DELETE_MESSAGE_QUERY,
INSERT_MESSAGE_QUERY,
SELECT_MESSAGE_QUERY,
)


Expand Down Expand Up @@ -142,20 +142,14 @@ async def listen(self) -> AsyncGenerator[AckableMessage, None]:
try:
payload = await self._queue.get()
message_id = int(payload)
message_row = await self.read_conn.fetchrow(
SELECT_MESSAGE_QUERY.format(self.table_name),
message_id,
)
if message_row is None:
logger.warning(
"Message with id %s not found in database.",
async with self.write_pool.acquire() as conn:
claimed = await conn.fetchrow(
CLAIM_MESSAGE_QUERY.format(self.table_name),
message_id,
)
if claimed is None:
continue
if message_row.get("message") is None:
msg = "Message row does not have 'message' column"
raise ValueError(msg)
message_str = message_row["message"]
message_str = claimed["message"]
if not isinstance(message_str, str):
msg = "message is not a string"
raise TypeError(msg)
Expand Down
3 changes: 2 additions & 1 deletion src/taskiq_pg/asyncpg/queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
task_name VARCHAR NOT NULL,
message TEXT NOT NULL,
labels JSONB NOT NULL,
status TEXT NOT NULL DEFAULT 'pending',
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
"""
Expand All @@ -47,6 +48,6 @@
RETURNING id
"""

SELECT_MESSAGE_QUERY = "SELECT * FROM {} WHERE id = $1"
CLAIM_MESSAGE_QUERY = "UPDATE {} SET status = 'processing' WHERE id = $1 AND status = 'pending' RETURNING id, message"
Copy link

Copilot AI Oct 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The CLAIM_MESSAGE_QUERY only returns 'id' and 'message' fields, but the psqlpy version returns all fields (*). This inconsistency could lead to maintenance issues if additional fields need to be accessed in the future."

Suggested change
CLAIM_MESSAGE_QUERY = "UPDATE {} SET status = 'processing' WHERE id = $1 AND status = 'pending' RETURNING id, message"
CLAIM_MESSAGE_QUERY = "UPDATE {} SET status = 'processing' WHERE id = $1 AND status = 'pending' RETURNING *"

Copilot uses AI. Check for mistakes.

DELETE_MESSAGE_QUERY = "DELETE FROM {} WHERE id = $1"
19 changes: 12 additions & 7 deletions src/taskiq_pg/psqlpy/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,16 @@
from dataclasses import dataclass

import psqlpy
from psqlpy.exceptions import ConnectionExecuteError
from psqlpy.extra_types import JSONB
from taskiq import AckableMessage, BrokerMessage

from taskiq_pg._internal.broker import BasePostgresBroker
from taskiq_pg.psqlpy.queries import (
CLAIM_MESSAGE_QUERY,
CREATE_MESSAGE_TABLE_QUERY,
DELETE_MESSAGE_QUERY,
INSERT_MESSAGE_QUERY,
SELECT_MESSAGE_QUERY,
)


Expand All @@ -35,6 +36,7 @@ class MessageRow:
task_name: str
message: str
labels: JSONB
status: str
created_at: datetime


Expand Down Expand Up @@ -165,14 +167,17 @@ async def listen(self) -> AsyncGenerator[AckableMessage, None]:
try:
payload = await self._queue.get()
message_id = int(payload) # payload is the message id
message_row = await self.read_conn.fetch_row(
SELECT_MESSAGE_QUERY.format(self.table_name),
[message_id],
)
# ugly type hacks b/c SingleQueryResult.as_class return type is wrong
try:
async with self.write_pool.acquire() as conn:
claimed_message = await conn.fetch_row(
CLAIM_MESSAGE_QUERY.format(self.table_name),
[message_id],
)
except ConnectionExecuteError: # message was claimed by another worker
Copy link

Copilot AI Oct 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Catching the broad ConnectionExecuteError exception could mask other database connection issues unrelated to message claiming. Consider catching a more specific exception or adding additional error handling to distinguish between claim conflicts and genuine connection problems."

Suggested change
except ConnectionExecuteError: # message was claimed by another worker
except ConnectionExecuteError as exc: # message was claimed by another worker or other connection issue
# Check if the error is due to a claim conflict (e.g., unique violation)
# Adjust the condition below to match your DB's claim conflict error code/message
if hasattr(exc, "pgcode") and exc.pgcode == "23505": # unique_violation
# Message was claimed by another worker
continue
logger.exception("Database connection error while claiming message")

Copilot uses AI. Check for mistakes.
continue
message_row_result = tp.cast(
"MessageRow",
tp.cast("object", message_row.as_class(MessageRow)),
tp.cast("object", claimed_message.as_class(MessageRow)),
)
message_data = message_row_result.message.encode()

Expand Down
3 changes: 2 additions & 1 deletion src/taskiq_pg/psqlpy/queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
task_name VARCHAR NOT NULL,
message TEXT NOT NULL,
labels JSONB NOT NULL,
status TEXT NOT NULL DEFAULT 'pending',
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
"""
Expand All @@ -47,6 +48,6 @@
RETURNING id
"""

SELECT_MESSAGE_QUERY = "SELECT * FROM {} WHERE id = $1"
CLAIM_MESSAGE_QUERY = "UPDATE {} SET status = 'processing' WHERE id = $1 AND status = 'pending' RETURNING *"

DELETE_MESSAGE_QUERY = "DELETE FROM {} WHERE id = $1"
101 changes: 101 additions & 0 deletions tests/integration/test_broker_single_delivery.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
from __future__ import annotations

import asyncio
import typing as tp
import uuid
from contextlib import suppress

import asyncpg
import pytest
from taskiq import BrokerMessage

from taskiq_pg.asyncpg import AsyncpgBroker
from taskiq_pg.psqlpy import PSQLPyBroker


@pytest.mark.integration
@pytest.mark.parametrize(
"broker_class",
[
AsyncpgBroker,
PSQLPyBroker,
],
)
async def test_when_two_workers_listen__then_single_message_processed_once(
pg_dsn: str,
broker_class: type[AsyncpgBroker | PSQLPyBroker],
) -> None:
# Given: уникальные имена таблицы и канала, два брокера, одна задача
Copy link

Copilot AI Oct 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] The test contains Russian comments which may hinder code maintainability for international contributors. Consider using English comments for better code accessibility."

Copilot uses AI. Check for mistakes.
table_name: str = f"taskiq_messages_{uuid.uuid4().hex}"
channel_name: str = f"taskiq_channel_{uuid.uuid4().hex}"
task_id: str = uuid.uuid4().hex

broker1 = broker_class(dsn=pg_dsn, table_name=table_name, channel_name=channel_name)
broker2 = broker_class(dsn=pg_dsn, table_name=table_name, channel_name=channel_name)

# Подключение для проверок состояния в таблице
conn: asyncpg.Connection = await asyncpg.connect(dsn=pg_dsn)

# Сообщение для публикации
Comment on lines +36 to +39
Copy link

Copilot AI Oct 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] The test contains Russian comments which may hinder code maintainability for international contributors. Consider using English comments for better code accessibility."

Copilot uses AI. Check for mistakes.
message: BrokerMessage = BrokerMessage(
task_id=task_id,
task_name="example:best_task_ever",
message=b'{"hello":"world"}',
labels={},
)

# When: стартуем брокеры и два слушателя, публикуем одно сообщение
Copy link

Copilot AI Oct 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] The test contains Russian comments which may hinder code maintainability for international contributors. Consider using English comments for better code accessibility."

Copilot uses AI. Check for mistakes.
await broker1.startup()
await broker2.startup()

agen1 = broker1.listen()
agen2 = broker2.listen()

# Запускаем ожидание первого сообщения у обоих слушателей до публикации,
# чтобы оба гарантированно получили NOTIFY.
Comment on lines +54 to +55
Copy link

Copilot AI Oct 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] The test contains Russian comments which may hinder code maintainability for international contributors. Consider using English comments for better code accessibility."

Copilot uses AI. Check for mistakes.
t1: asyncio.Task = asyncio.create_task(agen1.__anext__())
t2: asyncio.Task = asyncio.create_task(agen2.__anext__())

try:
await broker1.kick(message)

done, _ = await asyncio.wait(
{t1, t2},
timeout=5.0,
return_when=asyncio.FIRST_COMPLETED,
)

# Then: только один слушатель получает сообщение
Copy link

Copilot AI Oct 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] The test contains Russian comments which may hinder code maintainability for international contributors. Consider using English comments for better code accessibility."

Copilot uses AI. Check for mistakes.
assert len(done) == 1, "Ровно один воркер должен получить сообщение"
winner_task: asyncio.Task = next(iter(done))
ack_message = tp.cast("tp.Any", winner_task.result())

# До подтверждения проверяем, что статус в таблице = 'processing'
Copy link

Copilot AI Oct 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] The test contains Russian comments which may hinder code maintainability for international contributors. Consider using English comments for better code accessibility."

Copilot uses AI. Check for mistakes.
row = await conn.fetchrow(
f"SELECT id, status FROM {table_name} WHERE task_id = $1",
task_id,
)
assert row is not None, "Сообщение должно существовать в таблице"
assert row["status"] == "processing", "Сообщение должно быть помечено как processing после claim"

# Подтверждаем обработку победившим воркером
await ack_message.ack()

# И проверяем, что запись удалена
cnt: int = tp.cast(
"int",
await conn.fetchval(
f"SELECT COUNT(*) FROM {table_name} WHERE task_id = $1",
task_id,
),
)
assert cnt == 0, "Запись должна быть удалена после ack"
finally:
with suppress(Exception):
await broker1.shutdown()
await broker2.shutdown()

try:
await conn.execute(f"DROP TABLE IF EXISTS {table_name}")
finally:
await conn.close()
Loading