|
9 | 9 |
|
10 | 10 | PostgreSQL integration for Taskiq with support for asyncpg, psqlpy and aiopg drivers. |
11 | 11 |
|
12 | | -See more example of usage in [the documentation](https://danfimov.github.io/taskiq-postgres/). |
| 12 | +See more example of usage in [the documentation](https://danfimov.github.io/taskiq-postgres/) or [examples directory](https://github.com/danfimov/taskiq-postgres/examples). |
13 | 13 |
|
14 | 14 | ## Installation |
15 | 15 |
|
16 | | -Depend on your preferred PostgreSQL driver, you can install this library: |
| 16 | +Depending on your preferred PostgreSQL driver, you can install this library with the corresponding extra: |
17 | 17 |
|
18 | | -=== "asyncpg" |
| 18 | +```bash |
| 19 | +# with asyncpg |
| 20 | +pip install taskiq-postgres[asyncpg] |
19 | 21 |
|
20 | | - ```bash |
21 | | - pip install taskiq-postgres[asyncpg] |
22 | | - ``` |
| 22 | +# with psqlpy |
| 23 | +pip install taskiq-postgres[psqlpy] |
23 | 24 |
|
24 | | -=== "psqlpy" |
| 25 | +# with aiopg |
| 26 | +pip install taskiq-postgres[aiopg] |
| 27 | +``` |
25 | 28 |
|
26 | | - ```bash |
27 | | - pip install taskiq-postgres[psqlpy] |
28 | | - ``` |
| 29 | +## Quick start |
29 | 30 |
|
30 | | -=== "aiopg" |
| 31 | +### Basic task processing |
31 | 32 |
|
32 | | - ```bash |
33 | | - pip install taskiq-postgres[aiopg] |
34 | | - ``` |
| 33 | +1. Define your broker with [asyncpg](https://github.com/MagicStack/asyncpg): |
35 | 34 |
|
| 35 | + ```python |
| 36 | + # broker_example.py |
| 37 | + import asyncio |
| 38 | + from taskiq_pg.asyncpg import AsyncpgBroker, AsyncpgResultBackend |
36 | 39 |
|
37 | | -## Usage example |
38 | 40 |
|
39 | | -Simple example of usage with [asyncpg](https://github.com/MagicStack/asyncpg): |
| 41 | + dsn = "postgres://taskiq_postgres:look_in_vault@localhost:5432/taskiq_postgres" |
| 42 | + broker = AsyncpgBroker(dsn).with_result_backend(AsyncpgResultBackend(dsn)) |
40 | 43 |
|
41 | | -```python |
42 | | -# broker.py |
43 | | -import asyncio |
44 | 44 |
|
45 | | -from taskiq_pg.asyncpg import AsyncpgResultBackend, AsyncpgBroker |
| 45 | + @broker.task("solve_all_problems") |
| 46 | + async def best_task_ever() -> None: |
| 47 | + """Solve all problems in the world.""" |
| 48 | + await asyncio.sleep(2) |
| 49 | + print("All problems are solved!") |
46 | 50 |
|
47 | | -result_backend = AsyncpgResultBackend( |
48 | | - dsn="postgres://postgres:postgres@localhost:5432/postgres", |
49 | | -) |
50 | 51 |
|
51 | | -broker = AsyncpgBroker( |
52 | | - dsn="postgres://postgres:postgres@localhost:5432/postgres", |
53 | | -).with_result_backend(result_backend) |
| 52 | + async def main(): |
| 53 | + await broker.startup() |
| 54 | + task = await best_task_ever.kiq() |
| 55 | + print(await task.wait_result()) |
| 56 | + await broker.shutdown() |
54 | 57 |
|
55 | 58 |
|
56 | | -@broker.task |
57 | | -async def best_task_ever() -> None: |
58 | | - """Solve all problems in the world.""" |
59 | | - await asyncio.sleep(5.5) |
60 | | - print("All problems are solved!") |
| 59 | + if __name__ == "__main__": |
| 60 | + asyncio.run(main()) |
| 61 | + ``` |
61 | 62 |
|
| 63 | +2. Start a worker to process tasks (by default taskiq runs two instances of worker): |
62 | 64 |
|
63 | | -async def main(): |
64 | | - await broker.startup() |
65 | | - task = await best_task_ever.kiq() |
66 | | - print(await task.wait_result()) |
67 | | - await broker.shutdown() |
| 65 | + ```bash |
| 66 | + taskiq worker broker_example:broker |
| 67 | + ``` |
68 | 68 |
|
| 69 | +3. Run `broker_example.py` file to send a task to the worker: |
69 | 70 |
|
70 | | -if __name__ == "__main__": |
71 | | - asyncio.run(main()) |
72 | | -``` |
| 71 | + ```bash |
| 72 | + python broker_example.py |
| 73 | + ``` |
73 | 74 |
|
74 | 75 | Your experience with other drivers will be pretty similar. Just change the import statement and that's it. |
75 | 76 |
|
| 77 | +### Task scheduling |
| 78 | + |
| 79 | +1. Define your broker and schedule source: |
| 80 | + |
| 81 | + ```python |
| 82 | + # scheduler_example.py |
| 83 | + import asyncio |
| 84 | + from taskiq import TaskiqScheduler |
| 85 | + from taskiq_pg.asyncpg import AsyncpgBroker, AsyncpgScheduleSource |
| 86 | + |
| 87 | + |
| 88 | + dsn = "postgres://taskiq_postgres:look_in_vault@localhost:5432/taskiq_postgres" |
| 89 | + broker = AsyncpgBroker(dsn) |
| 90 | + scheduler = TaskiqScheduler( |
| 91 | + broker=broker, |
| 92 | + sources=[AsyncpgScheduleSource( |
| 93 | + dsn=dsn, |
| 94 | + broker=broker, |
| 95 | + )], |
| 96 | + ) |
| 97 | + |
| 98 | + |
| 99 | + @broker.task( |
| 100 | + task_name="solve_all_problems", |
| 101 | + schedule=[ |
| 102 | + { |
| 103 | + "cron": "*/1 * * * *", # type: str, either cron or time should be specified. |
| 104 | + "cron_offset": None, # type: str | timedelta | None, can be omitted. |
| 105 | + "time": None, # type: datetime | None, either cron or time should be specified. |
| 106 | + "args": [], # type list[Any] | None, can be omitted. |
| 107 | + "kwargs": {}, # type: dict[str, Any] | None, can be omitted. |
| 108 | + "labels": {}, # type: dict[str, Any] | None, can be omitted. |
| 109 | + }, |
| 110 | + ], |
| 111 | + ) |
| 112 | + async def best_task_ever() -> None: |
| 113 | + """Solve all problems in the world.""" |
| 114 | + await asyncio.sleep(2) |
| 115 | + print("All problems are solved!") |
| 116 | + |
| 117 | + ``` |
| 118 | + |
| 119 | +2. Start worker processes: |
| 120 | + |
| 121 | + ```bash |
| 122 | + taskiq worker scheduler_example:broker |
| 123 | + ``` |
| 124 | + |
| 125 | +3. Run scheduler process: |
| 126 | + |
| 127 | + ```bash |
| 128 | + taskiq scheduler scheduler_example:scheduler |
| 129 | + ``` |
| 130 | + |
76 | 131 | ## Motivation |
77 | 132 |
|
78 | 133 | There are too many libraries for PostgreSQL and Taskiq integration. Although they have different view on interface and different functionality. |
|
0 commit comments