Skip to content

Commit c870142

Browse files
authored
feat: add psycopg3 support (#19)
2 parents e1bf60f + 4767bf8 commit c870142

19 files changed

+1171
-333
lines changed

README.md

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,18 @@
77
<hr/>
88
</div>
99

10-
PostgreSQL integration for Taskiq with support for asyncpg, psqlpy and aiopg drivers.
10+
PostgreSQL integration for Taskiq with support for asyncpg, psqlpy, psycopg and aiopg drivers.
1111

12-
See more example of usage in [the documentation](https://danfimov.github.io/taskiq-postgres/) or [examples directory](https://github.com/danfimov/taskiq-postgres/examples).
12+
## Features
13+
14+
- **PostgreSQL Broker** - high-performance message broker using PostgreSQL LISTEN/NOTIFY;
15+
- **Result Backend** - persistent task result storage with configurable retention;
16+
- **Scheduler Source** - cron-like task scheduling with PostgreSQL persistence;
17+
- **Multiple Drivers** - support for asyncpg, psycopg3, psqlpy and aiopg;
18+
- **Flexible Configuration** - customizable table names, field types, and connection options;
19+
- **Multiple Serializers** - support for different serialization methods (Pickle, JSON, etc.).
20+
21+
See usage guide in [documentation](https://danfimov.github.io/taskiq-postgres/) or explore examples in [separate directory](https://github.com/danfimov/taskiq-postgres/examples).
1322

1423
## Installation
1524

@@ -22,6 +31,9 @@ pip install taskiq-postgres[asyncpg]
2231
# with psqlpy
2332
pip install taskiq-postgres[psqlpy]
2433

34+
# with psycopg3
35+
pip install taskiq-postgres[psycopg]
36+
2537
# with aiopg
2638
pip install taskiq-postgres[aiopg]
2739
```
@@ -101,7 +113,7 @@ Your experience with other drivers will be pretty similar. Just change the impor
101113
schedule=[
102114
{
103115
"cron": "*/1 * * * *", # type: str, either cron or time should be specified.
104-
"cron_offset": None, # type: str | timedelta | None, can be omitted.
116+
"cron_offset": None, # type: str | None, can be omitted. For example "Europe/Berlin".
105117
"time": None, # type: datetime | None, either cron or time should be specified.
106118
"args": [], # type list[Any] | None, can be omitted.
107119
"kwargs": {}, # type: dict[str, Any] | None, can be omitted.

docs/index.md

Lines changed: 80 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ title: Overview
1111
<hr/>
1212
</div>
1313

14-
PostgreSQL integration for Taskiq with support for asyncpg, psqlpy and aiopg drivers.
14+
PostgreSQL integration for Taskiq with support for asyncpg, psqlpy, psycopg and aiopg drivers.
1515

1616
## Motivation
1717

@@ -22,7 +22,6 @@ To address this issue I created this library with a common interface for most po
2222
- brokers;
2323
- schedule sources.
2424

25-
2625
## Installation
2726

2827
Depending on your preferred PostgreSQL driver, you can install this library with the corresponding extra:
@@ -39,6 +38,12 @@ Depending on your preferred PostgreSQL driver, you can install this library with
3938
pip install taskiq-postgres[psqlpy]
4039
```
4140

41+
=== "psycopg"
42+
43+
```bash
44+
pip install taskiq-postgres[psycopg]
45+
```
46+
4247
=== "aiopg"
4348

4449
```bash
@@ -93,6 +98,36 @@ Depending on your preferred PostgreSQL driver, you can install this library with
9398
broker = PSQLPyBroker(dsn).with_result_backend(PSQLPyResultBackend(dsn))
9499

95100

101+
@broker.task("solve_all_problems")
102+
async def best_task_ever() -> None:
103+
"""Solve all problems in the world."""
104+
await asyncio.sleep(2)
105+
print("All problems are solved!")
106+
107+
108+
async def main():
109+
await broker.startup()
110+
task = await best_task_ever.kiq()
111+
print(await task.wait_result())
112+
await broker.shutdown()
113+
114+
115+
if __name__ == "__main__":
116+
asyncio.run(main())
117+
```
118+
119+
=== "psycopg"
120+
121+
```python
122+
# broker_example.py
123+
import asyncio
124+
from taskiq_pg.psycopg import PsycopgBroker, PsycopgResultBackend
125+
126+
127+
dsn = "postgres://taskiq_postgres:look_in_vault@localhost:5432/taskiq_postgres"
128+
broker = PsycopgBroker(dsn).with_result_backend(PsycopgResultBackend(dsn))
129+
130+
96131
@broker.task("solve_all_problems")
97132
async def best_task_ever() -> None:
98133
"""Solve all problems in the world."""
@@ -184,7 +219,7 @@ Your experience with other drivers will be pretty similar. Just change the impor
184219
schedule=[
185220
{
186221
"cron": "*/1 * * * *", # type: str, either cron or time should be specified.
187-
"cron_offset": None, # type: str | timedelta | None, can be omitted.
222+
"cron_offset": None, # type: str | None, can be omitted.
188223
"time": None, # type: datetime | None, either cron or time should be specified.
189224
"args": [], # type list[Any] | None, can be omitted.
190225
"kwargs": {}, # type: dict[str, Any] | None, can be omitted.
@@ -223,7 +258,47 @@ Your experience with other drivers will be pretty similar. Just change the impor
223258
schedule=[
224259
{
225260
"cron": "*/1 * * * *", # type: str, either cron or time should be specified.
226-
"cron_offset": None, # type: str | timedelta | None, can be omitted.
261+
"cron_offset": None, # type: str | None, can be omitted.
262+
"time": None, # type: datetime | None, either cron or time should be specified.
263+
"args": [], # type list[Any] | None, can be omitted.
264+
"kwargs": {}, # type: dict[str, Any] | None, can be omitted.
265+
"labels": {}, # type: dict[str, Any] | None, can be omitted.
266+
},
267+
],
268+
)
269+
async def best_task_ever() -> None:
270+
"""Solve all problems in the world."""
271+
await asyncio.sleep(2)
272+
print("All problems are solved!")
273+
274+
```
275+
276+
=== "psycopg"
277+
278+
```python
279+
# scheduler_example.py
280+
import asyncio
281+
from taskiq import TaskiqScheduler
282+
from taskiq_pg.psycopg import PsycopgBroker, PsycopgScheduleSource
283+
284+
285+
dsn = "postgres://taskiq_postgres:look_in_vault@localhost:5432/taskiq_postgres"
286+
broker = PsycopgBroker(dsn)
287+
scheduler = TaskiqScheduler(
288+
broker=broker,
289+
sources=[PsycopgScheduleSource(
290+
dsn=dsn,
291+
broker=broker,
292+
)],
293+
)
294+
295+
296+
@broker.task(
297+
task_name="solve_all_problems",
298+
schedule=[
299+
{
300+
"cron": "*/1 * * * *", # type: str, either cron or time should be specified.
301+
"cron_offset": None, # type: str | None, can be omitted.
227302
"time": None, # type: datetime | None, either cron or time should be specified.
228303
"args": [], # type list[Any] | None, can be omitted.
229304
"kwargs": {}, # type: dict[str, Any] | None, can be omitted.
@@ -263,7 +338,7 @@ Your experience with other drivers will be pretty similar. Just change the impor
263338
schedule=[
264339
{
265340
"cron": "*/1 * * * *", # type: str, either cron or time should be specified.
266-
"cron_offset": None, # type: str | timedelta | None, can be omitted.
341+
"cron_offset": None, # type: str | None, can be omitted.
267342
"time": None, # type: datetime | None, either cron or time should be specified.
268343
"args": [], # type list[Any] | None, can be omitted.
269344
"kwargs": {}, # type: dict[str, Any] | None, can be omitted.

examples/example_with_broker.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,19 +2,19 @@
22
How to run:
33
44
1) Run worker in one terminal:
5-
uv run taskiq worker examples.example_with_broker:broker
5+
uv run taskiq worker examples.example_with_broker:broker --workers 1
66
77
2) Run this script in another terminal:
88
uv run python -m examples.example_with_broker
99
"""
1010

1111
import asyncio
1212

13-
from taskiq_pg.asyncpg import AsyncpgBroker, AsyncpgResultBackend
13+
from taskiq_pg.psycopg import PsycopgBroker, PsycopgResultBackend
1414

1515

1616
dsn = "postgres://taskiq_postgres:look_in_vault@localhost:5432/taskiq_postgres"
17-
broker = AsyncpgBroker(dsn).with_result_backend(AsyncpgResultBackend(dsn))
17+
broker = PsycopgBroker(dsn).with_result_backend(PsycopgResultBackend(dsn))
1818

1919

2020
@broker.task("solve_all_problems")

examples/example_with_schedule_source.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
How to run:
33
44
1) Run worker in one terminal:
5-
uv run taskiq worker examples.example_with_schedule_source:broker
5+
uv run taskiq worker examples.example_with_schedule_source:broker --workers 1
66
77
2) Run scheduler in another terminal:
88
uv run taskiq scheduler examples.example_with_schedule_source:scheduler
@@ -12,15 +12,15 @@
1212

1313
from taskiq import TaskiqScheduler
1414

15-
from taskiq_pg.asyncpg import AsyncpgBroker, AsyncpgScheduleSource
15+
from taskiq_pg.psycopg import PsycopgBroker, PsycopgScheduleSource
1616

1717

1818
dsn = "postgres://taskiq_postgres:look_in_vault@localhost:5432/taskiq_postgres"
19-
broker = AsyncpgBroker(dsn)
19+
broker = PsycopgBroker(dsn)
2020
scheduler = TaskiqScheduler(
2121
broker=broker,
2222
sources=[
23-
AsyncpgScheduleSource(
23+
PsycopgScheduleSource(
2424
dsn=dsn,
2525
broker=broker,
2626
),

pyproject.toml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,9 @@ asyncpg = [
4848
psqlpy = [
4949
"psqlpy>=0.11.6",
5050
]
51+
psycopg = [
52+
"psycopg[binary,pool]>=3.2.10",
53+
]
5154

5255
[dependency-groups]
5356
dev = [
@@ -60,6 +63,8 @@ dev = [
6063
"pytest>=8.4.2",
6164
"pytest-asyncio>=1.1.0",
6265
"pytest-cov>=7.0.0",
66+
# for database in tests
67+
"sqlalchemy-utils>=0.42.0",
6368
# pre-commit hooks
6469
"prek>=0.2.4",
6570
# docs
@@ -149,6 +154,7 @@ ignore = [
149154
"RUF",
150155

151156
"PLR2004", # magic numbers in tests
157+
"ANN", # missing type annotations in tests
152158
]
153159
"tests/test_linting.py" = [
154160
"S603", # subprocess usage

src/taskiq_pg/asyncpg/broker.py

Lines changed: 22 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -27,33 +27,34 @@
2727
class AsyncpgBroker(BasePostgresBroker):
2828
"""Broker that uses asyncpg as driver and PostgreSQL with LISTEN/NOTIFY mechanism."""
2929

30-
read_conn: asyncpg.Connection[asyncpg.Record] | None = None
31-
write_pool: asyncpg.pool.Pool[asyncpg.Record] | None = None
30+
_read_conn: asyncpg.Connection[asyncpg.Record] | None = None
31+
_write_pool: asyncpg.pool.Pool[asyncpg.Record] | None = None
3232

3333
async def startup(self) -> None:
3434
"""Initialize the broker."""
3535
await super().startup()
3636

37-
self.read_conn = await asyncpg.connect(self.dsn, **self.read_kwargs)
38-
self.write_pool = await asyncpg.create_pool(self.dsn, **self.write_kwargs)
37+
self._read_conn = await asyncpg.connect(self.dsn, **self.read_kwargs)
38+
self._write_pool = await asyncpg.create_pool(self.dsn, **self.write_kwargs)
3939

40-
if self.read_conn is None:
41-
msg = "read_conn not initialized"
40+
if self._read_conn is None:
41+
msg = "_read_conn not initialized"
4242
raise RuntimeError(msg)
4343

44-
async with self.write_pool.acquire() as conn:
45-
_ = await conn.execute(CREATE_MESSAGE_TABLE_QUERY.format(self.table_name))
44+
async with self._write_pool.acquire() as conn:
45+
await conn.execute(CREATE_MESSAGE_TABLE_QUERY.format(self.table_name))
4646

47-
await self.read_conn.add_listener(self.channel_name, self._notification_handler)
47+
await self._read_conn.add_listener(self.channel_name, self._notification_handler)
4848
self._queue = asyncio.Queue()
4949

5050
async def shutdown(self) -> None:
5151
"""Close all connections on shutdown."""
5252
await super().shutdown()
53-
if self.read_conn is not None:
54-
await self.read_conn.close()
55-
if self.write_pool is not None:
56-
await self.write_pool.close()
53+
if self._read_conn is not None:
54+
await self._read_conn.remove_listener(self.channel_name, self._notification_handler)
55+
await self._read_conn.close()
56+
if self._write_pool is not None:
57+
await self._write_pool.close()
5758

5859
def _notification_handler(
5960
self,
@@ -85,11 +86,11 @@ async def kick(self, message: BrokerMessage) -> None:
8586
8687
:param message: Message to send.
8788
"""
88-
if self.write_pool is None:
89+
if self._write_pool is None:
8990
msg = "Please run startup before kicking."
9091
raise ValueError(msg)
9192

92-
async with self.write_pool.acquire() as conn:
93+
async with self._write_pool.acquire() as conn:
9394
# Insert the message into the database
9495
message_inserted_id = tp.cast(
9596
"int",
@@ -117,9 +118,9 @@ async def kick(self, message: BrokerMessage) -> None:
117118
async def _schedule_notification(self, message_id: int, delay_seconds: int) -> None:
118119
"""Schedule a notification to be sent after a delay."""
119120
await asyncio.sleep(delay_seconds)
120-
if self.write_pool is None:
121+
if self._write_pool is None:
121122
return
122-
async with self.write_pool.acquire() as conn:
123+
async with self._write_pool.acquire() as conn:
123124
# Send NOTIFY
124125
_ = await conn.execute(f"NOTIFY {self.channel_name}, '{message_id}'")
125126

@@ -131,7 +132,7 @@ async def listen(self) -> AsyncGenerator[AckableMessage, None]:
131132
132133
:yields: AckableMessage instances.
133134
"""
134-
if self.write_pool is None:
135+
if self._write_pool is None:
135136
msg = "Call startup before starting listening."
136137
raise ValueError(msg)
137138
if self._queue is None:
@@ -142,7 +143,7 @@ async def listen(self) -> AsyncGenerator[AckableMessage, None]:
142143
try:
143144
payload = await self._queue.get()
144145
message_id = int(payload)
145-
async with self.write_pool.acquire() as conn:
146+
async with self._write_pool.acquire() as conn:
146147
claimed = await conn.fetchrow(
147148
CLAIM_MESSAGE_QUERY.format(self.table_name),
148149
message_id,
@@ -156,11 +157,11 @@ async def listen(self) -> AsyncGenerator[AckableMessage, None]:
156157
message_data = message_str.encode()
157158

158159
async def ack(*, _message_id: int = message_id) -> None:
159-
if self.write_pool is None:
160+
if self._write_pool is None:
160161
msg = "Call startup before starting listening."
161162
raise ValueError(msg)
162163

163-
async with self.write_pool.acquire() as conn:
164+
async with self._write_pool.acquire() as conn:
164165
_ = await conn.execute(
165166
DELETE_MESSAGE_QUERY.format(self.table_name),
166167
_message_id,

0 commit comments

Comments
 (0)