Skip to content

Commit 1117c67

Browse files
committed
feat: add schedule source
1 parent 0b01713 commit 1117c67

File tree

16 files changed

+777
-120
lines changed

16 files changed

+777
-120
lines changed

README.md

Lines changed: 94 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -9,70 +9,125 @@
99

1010
PostgreSQL integration for Taskiq with support for asyncpg, psqlpy and aiopg drivers.
1111

12-
See more example of usage in [the documentation](https://danfimov.github.io/taskiq-postgres/).
12+
See more example of usage in [the documentation](https://danfimov.github.io/taskiq-postgres/) or [examples directory](https://github.com/danfimov/taskiq-postgres/examples).
1313

1414
## Installation
1515

16-
Depend on your preferred PostgreSQL driver, you can install this library:
16+
Depending on your preferred PostgreSQL driver, you can install this library with the corresponding extra:
1717

18-
=== "asyncpg"
18+
```bash
19+
# with asyncpg
20+
pip install taskiq-postgres[asyncpg]
1921

20-
```bash
21-
pip install taskiq-postgres[asyncpg]
22-
```
22+
# with psqlpy
23+
pip install taskiq-postgres[psqlpy]
2324

24-
=== "psqlpy"
25+
# with aiopg
26+
pip install taskiq-postgres[aiopg]
27+
```
2528

26-
```bash
27-
pip install taskiq-postgres[psqlpy]
28-
```
29+
## Quick start
2930

30-
=== "aiopg"
31+
### Basic task processing
3132

32-
```bash
33-
pip install taskiq-postgres[aiopg]
34-
```
33+
1. Define your broker with [asyncpg](https://github.com/MagicStack/asyncpg):
3534

35+
```python
36+
# broker_example.py
37+
import asyncio
38+
from taskiq_pg.asyncpg import AsyncpgBroker, AsyncpgResultBackend
3639

37-
## Usage example
3840

39-
Simple example of usage with [asyncpg](https://github.com/MagicStack/asyncpg):
41+
dsn = "postgres://taskiq_postgres:look_in_vault@localhost:5432/taskiq_postgres"
42+
broker = AsyncpgBroker(dsn).with_result_backend(AsyncpgResultBackend(dsn))
4043

41-
```python
42-
# broker.py
43-
import asyncio
4444

45-
from taskiq_pg.asyncpg import AsyncpgResultBackend, AsyncpgBroker
45+
@broker.task("solve_all_problems")
46+
async def best_task_ever() -> None:
47+
"""Solve all problems in the world."""
48+
await asyncio.sleep(2)
49+
print("All problems are solved!")
4650

47-
result_backend = AsyncpgResultBackend(
48-
dsn="postgres://postgres:postgres@localhost:5432/postgres",
49-
)
5051

51-
broker = AsyncpgBroker(
52-
dsn="postgres://postgres:postgres@localhost:5432/postgres",
53-
).with_result_backend(result_backend)
52+
async def main():
53+
await broker.startup()
54+
task = await best_task_ever.kiq()
55+
print(await task.wait_result())
56+
await broker.shutdown()
5457

5558

56-
@broker.task
57-
async def best_task_ever() -> None:
58-
"""Solve all problems in the world."""
59-
await asyncio.sleep(5.5)
60-
print("All problems are solved!")
59+
if __name__ == "__main__":
60+
asyncio.run(main())
61+
```
6162

63+
2. Start a worker to process tasks (by default taskiq runs two instances of worker):
6264

63-
async def main():
64-
await broker.startup()
65-
task = await best_task_ever.kiq()
66-
print(await task.wait_result())
67-
await broker.shutdown()
65+
```bash
66+
taskiq worker broker_example:broker
67+
```
6868

69+
3. Run `broker_example.py` file to send a task to the worker:
6970

70-
if __name__ == "__main__":
71-
asyncio.run(main())
72-
```
71+
```bash
72+
python broker_example.py
73+
```
7374

7475
Your experience with other drivers will be pretty similar. Just change the import statement and that's it.
7576

77+
### Task scheduling
78+
79+
1. Define your broker and schedule source:
80+
81+
```python
82+
# scheduler_example.py
83+
import asyncio
84+
from taskiq import TaskiqScheduler
85+
from taskiq_pg.asyncpg import AsyncpgBroker, AsyncpgScheduleSource
86+
87+
88+
dsn = "postgres://taskiq_postgres:look_in_vault@localhost:5432/taskiq_postgres"
89+
broker = AsyncpgBroker(dsn)
90+
scheduler = TaskiqScheduler(
91+
broker=broker,
92+
sources=[AsyncpgScheduleSource(
93+
dsn=dsn,
94+
broker=broker,
95+
)],
96+
)
97+
98+
99+
@broker.task(
100+
task_name="solve_all_problems",
101+
schedule=[
102+
{
103+
"cron": "*/1 * * * *", # type: str, either cron or time should be specified.
104+
"cron_offset": None, # type: str | timedelta | None, can be omitted.
105+
"time": None, # type: datetime | None, either cron or time should be specified.
106+
"args": [], # type list[Any] | None, can be omitted.
107+
"kwargs": {}, # type: dict[str, Any] | None, can be omitted.
108+
"labels": {}, # type: dict[str, Any] | None, can be omitted.
109+
},
110+
],
111+
)
112+
async def best_task_ever() -> None:
113+
"""Solve all problems in the world."""
114+
await asyncio.sleep(2)
115+
print("All problems are solved!")
116+
117+
```
118+
119+
2. Start worker processes:
120+
121+
```bash
122+
taskiq worker scheduler_example:broker
123+
```
124+
125+
3. Run scheduler process:
126+
127+
```bash
128+
taskiq scheduler scheduler_example:scheduler
129+
```
130+
76131
## Motivation
77132

78133
There are too many libraries for PostgreSQL and Taskiq integration. Although they have different view on interface and different functionality.

docs/contributing.md

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,37 @@
11
---
2-
title: Contributing
2+
title: Contributing and Development
33
---
4+
5+
## Development
6+
7+
This project uses modern Python development tools:
8+
9+
- [uv](https://github.com/astral-sh/uv) — fast Python package installer and resolver
10+
- [ruff](https://github.com/astral-sh/ruff) — extremely fast Python linter and formatter
11+
12+
### Setup Development Environment
13+
14+
```bash
15+
# Clone the repository
16+
git clone https://github.com/danfimov/taskiq-postgres.git
17+
cd taskiq-postgres
18+
19+
# Create a virtual environment (optional but recommended)
20+
make venv
21+
22+
# Install dependencies
23+
make init
24+
```
25+
26+
You can see other useful commands by running `make help`.
27+
28+
29+
## Contributing
30+
31+
Contributions are welcome! Please:
32+
33+
1. Fork the repository
34+
2. Create a feature branch
35+
3. Add tests for new functionality
36+
4. Ensure all tests pass
37+
5. Submit a pull request

0 commit comments

Comments
 (0)