Skip to content

Commit cdaf8d1

Browse files
committed
working delayed route and scheduled route
1 parent f2757fc commit cdaf8d1

File tree

9 files changed

+160
-99
lines changed

9 files changed

+160
-99
lines changed

README.md

Lines changed: 45 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -30,16 +30,25 @@ If we host the task worker on Cloud Run, we get autoscaling workers.
3030
In practice, this is what it looks like:
3131

3232
```python
33-
router = APIRouter(route_class=TaskRouteBuilder(...))
33+
delayed_router = APIRouter(route_class=DelayedRouteBuilder(...))
34+
scheduled_router = APIRouter(route_class=ScheduledRouteBuilder(...))
3435

3536
class Recipe(BaseModel):
3637
ingredients: List[str]
3738

38-
@router.post("/{restaurant}/make_dinner")
39-
async def make_dinner(restaurant: str, recipe: Recipe,):
39+
@delayed_router.post("/{restaurant}/make_dinner")
40+
async def make_dinner(restaurant: str, recipe: Recipe):
4041
# Do a ton of work here.
4142

42-
app.include_router(router)
43+
@scheduled_router.post("/home_cook")
44+
async def home_cook(recipe: Recipe):
45+
# Make my own food
46+
47+
app.include_router(delayed_router)
48+
app.include_router(scheduled_router)
49+
50+
# If you wan to make your own breakfast every morning at 7AM IST.
51+
home_cook.scheduler(name="test-home-cook-at-7AM-IST", schedule="0 7 * * *", time_zone="Asia/Kolkata").schedule(recipe=Recipe(ingredients=["Milk","Cereal"]))
4352
```
4453

4554
Now we can trigger the task with
@@ -54,11 +63,6 @@ If we want to trigger the task 30 minutes later
5463
make_dinner.options(countdown=1800).delay(...)
5564
```
5665

57-
If we want to schedule it to run every evening at 7PM IST
58-
59-
```python
60-
make_dinner.scheduler(name="test-make-dinner-at-7PM-IST", schedule="0 19 * * *", time_zone="Asia/Kolkata").schedule(...)
61-
```
6266
## Running
6367

6468
### Local
@@ -83,9 +87,9 @@ Forwarding http://feda-49-207-221-153.ngrok.io -> http://loca
8387
```python
8488
# complete file: examples/simple/main.py
8589

86-
# First we construct our TaskRoute class with all relevant settings
90+
# First we construct our DelayedRoute class with all relevant settings
8791
# This can be done once across the entire project
88-
TaskRoute = TaskRouteBuilder(
92+
DelayedRoute = DelayedRouteBuilder(
8993
base_url="http://feda-49-207-221-153.ngrok.io",
9094
queue_path=queue_path(
9195
project="gcp-project-id",
@@ -94,13 +98,12 @@ TaskRoute = TaskRouteBuilder(
9498
),
9599
)
96100

97-
# Wherever we use
98-
task_router = APIRouter(route_class=TaskRoute, prefix="/tasks")
101+
delayed_router = APIRouter(route_class=DelayedRoute, prefix="/tasks")
99102

100103
class Payload(BaseModel):
101104
message: str
102105

103-
@task_router.post("/hello")
106+
@delayed_router.post("/hello")
104107
async def hello(p: Payload = Payload(message="Default")):
105108
logger.warning(f"Hello task ran with payload: {p.message}")
106109

@@ -114,7 +117,7 @@ async def trigger():
114117
hello.delay(p=Payload(message="Triggered task"))
115118
return {"message": "Hello task triggered"}
116119

117-
app.include_router(task_router)
120+
app.include_router(delayed_router)
118121

119122
```
120123

@@ -156,7 +159,7 @@ We'll only edit the parts from above that we need changed from above example.
156159
# URL of the Cloud Run service
157160
base_url = "https://hello-randomchars-el.a.run.app"
158161

159-
TaskRoute = TaskRouteBuilder(
162+
DelayedRoute = DelayedRouteBuilder(
160163
base_url=base_url,
161164
# Task queue, same as above.
162165
queue_path=queue_path(...),
@@ -174,15 +177,15 @@ Check the fleshed out example at [`examples/full/tasks.py`](examples/full/tasks.
174177

175178
## Configuration
176179

177-
### TaskRouteBuilder
180+
### DelayedRouteBuilder
178181

179182
Usage:
180183

181184
```python
182-
TaskRoute = TaskRouteBuilder(...)
183-
task_router = APIRouter(route_class=TaskRoute)
185+
DelayedRoute = DelayedRouteBuilder(...)
186+
delayed_router = APIRouter(route_class=DelayedRoute)
184187

185-
@task_router.get("/simple_task")
188+
@delayed_router.get("/simple_task")
186189
def mySimpleTask():
187190
return {}
188191
```
@@ -197,7 +200,7 @@ def mySimpleTask():
197200

198201
- `client` - If you need to override the Cloud Tasks client, pass the client here. (eg: changing credentials, transport etc)
199202

200-
### Task level default options
203+
#### Task level default options
201204

202205
Usage:
203206

@@ -225,15 +228,15 @@ def mySimpleTask():
225228
return {}
226229
```
227230

228-
### Delayer Options
231+
#### Delayer Options
229232

230233
Usage:
231234

232235
```python
233236
mySimpleTask.options(...).delay()
234237
```
235238

236-
All options from above can be overriden per call (including TaskRouteBuilder options like `base_url`) with kwargs to the `options` function before calling delay.
239+
All options from above can be overriden per call (including DelayedRouteBuilder options like `base_url`) with kwargs to the `options` function before calling delay.
237240

238241
Example:
239242

@@ -242,14 +245,31 @@ Example:
242245
mySimpleTask.options(countdown=120).delay()
243246
```
244247

248+
### ScheduledRouteBuilder
249+
250+
Usage:
251+
252+
```python
253+
ScheduledRoute = ScheduledRouteBuilder(...)
254+
scheduled_router = APIRouter(route_class=ScheduledRoute)
255+
256+
@scheduled_router.get("/simple_scheduled_task")
257+
def mySimpleScheduledTask():
258+
return {}
259+
260+
261+
mySimpleScheduledTask.scheduler(name="simple_scheduled_task", schedule="* * * * *").schedule()
262+
```
263+
264+
245265
## Hooks
246266

247267
We might need to override things in the task being sent to Cloud Tasks. The `pre_create_hook` allows us to do that.
248268

249269
Some hooks are included in the library.
250270

251-
- `oidc_task_hook` / `oidc_scheduler_hook` - Used to pass OIDC token (for Cloud Run etc).
252-
- `deadline_task_hook` / `deadline_scheduler_hook` - Used to change the timeout for the worker of a task. (PS: this deadline is decided by the sender to the queue and not the worker)
271+
- `oidc_delayed_hook` / `oidc_scheduled_hook` - Used to pass OIDC token (for Cloud Run etc).
272+
- `deadline_delayed_hook` / `deadline_scheduled_hook` - Used to change the timeout for the worker of a task. (PS: this deadline is decided by the sender to the queue and not the worker)
253273
- `chained_hook` - If you need to chain multiple hooks together, you can do that with `chained_hook(hook1, hook2)`
254274

255275
## Future work

examples/full/settings.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,13 @@
66
from google.cloud import tasks_v2
77

88
# Imports from this repository
9+
from fastapi_cloud_tasks.utils import location_path
910
from fastapi_cloud_tasks.utils import queue_path
1011

11-
TASK_LISTENER_BASE_URL = os.getenv("TASK_LISTENER_BASE_URL", default="https://example.com")
12+
TASK_LISTENER_BASE_URL = os.getenv("TASK_LISTENER_BASE_URL", default="https://65fb-35-207-241-4.ngrok.io")
1213
TASK_PROJECT_ID = os.getenv("TASK_PROJECT_ID", default="sample-project")
1314
TASK_LOCATION = os.getenv("TASK_LOCATION", default="asia-south1")
15+
SCHEDULED_LOCATION = os.getenv("SCHEDULED_LOCATION", default="us-central1")
1416
TASK_QUEUE = os.getenv("TASK_QUEUE", default="test-queue")
1517

1618
TASK_SERVICE_ACCOUNT = os.getenv(
@@ -24,7 +26,12 @@
2426
queue=TASK_QUEUE,
2527
)
2628

29+
SCHEDULED_LOCATION_PATH = location_path(
30+
project=TASK_PROJECT_ID,
31+
location=SCHEDULED_LOCATION,
32+
)
33+
2734
TASK_OIDC_TOKEN = tasks_v2.OidcToken(service_account_email=TASK_SERVICE_ACCOUNT, audience=TASK_LISTENER_BASE_URL)
28-
SCHEDULER_OIDC_TOKEN = scheduler_v1.OidcToken(
35+
SCHEDULED_OIDC_TOKEN = scheduler_v1.OidcToken(
2936
service_account_email=TASK_SERVICE_ACCOUNT, audience=TASK_LISTENER_BASE_URL
3037
)

examples/full/tasks.py

Lines changed: 29 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -8,55 +8,65 @@
88

99
# Imports from this repository
1010
from examples.full.serializer import Payload
11-
from examples.full.settings import SCHEDULER_OIDC_TOKEN
11+
from examples.full.settings import SCHEDULED_LOCATION_PATH
12+
from examples.full.settings import SCHEDULED_OIDC_TOKEN
1213
from examples.full.settings import TASK_LISTENER_BASE_URL
1314
from examples.full.settings import TASK_OIDC_TOKEN
1415
from examples.full.settings import TASK_QUEUE_PATH
16+
from fastapi_cloud_tasks.delayed_route import DelayedRouteBuilder
1517
from fastapi_cloud_tasks.hooks import chained_hook
16-
from fastapi_cloud_tasks.hooks import deadline_scheduler_hook
17-
from fastapi_cloud_tasks.hooks import deadline_task_hook
18-
from fastapi_cloud_tasks.hooks import oidc_scheduler_hook
19-
from fastapi_cloud_tasks.hooks import oidc_task_hook
20-
from fastapi_cloud_tasks.taskroute import TaskRouteBuilder
18+
from fastapi_cloud_tasks.hooks import deadline_delayed_hook
19+
from fastapi_cloud_tasks.hooks import deadline_scheduled_hook
20+
from fastapi_cloud_tasks.hooks import oidc_delayed_hook
21+
from fastapi_cloud_tasks.hooks import oidc_scheduled_hook
22+
from fastapi_cloud_tasks.scheduled_route import ScheduledRouteBuilder
2123

2224
app = FastAPI()
2325

2426

2527
logger = logging.getLogger("uvicorn")
2628

27-
TaskRoute = TaskRouteBuilder(
29+
DelayedRoute = DelayedRouteBuilder(
2830
base_url=TASK_LISTENER_BASE_URL,
2931
queue_path=TASK_QUEUE_PATH,
3032
# Chain multiple hooks together
3133
pre_create_hook=chained_hook(
3234
# Add service account for cloud run
33-
oidc_task_hook(
35+
oidc_delayed_hook(
3436
token=TASK_OIDC_TOKEN,
3537
),
3638
# Wait for half an hour
37-
deadline_task_hook(duration=duration_pb2.Duration(seconds=1800)),
39+
deadline_delayed_hook(duration=duration_pb2.Duration(seconds=1800)),
3840
),
39-
pre_scheduler_hook=chained_hook(
41+
)
42+
43+
ScheduledRoute = ScheduledRouteBuilder(
44+
base_url=TASK_LISTENER_BASE_URL,
45+
location_path=SCHEDULED_LOCATION_PATH,
46+
pre_create_hook=chained_hook(
4047
# Add service account for cloud run
41-
oidc_scheduler_hook(
42-
token=SCHEDULER_OIDC_TOKEN,
48+
oidc_scheduled_hook(
49+
token=SCHEDULED_OIDC_TOKEN,
4350
),
4451
# Wait for half an hour
45-
deadline_scheduler_hook(duration=duration_pb2.Duration(seconds=1800)),
52+
deadline_scheduled_hook(duration=duration_pb2.Duration(seconds=1800)),
4653
),
4754
)
4855

49-
router = APIRouter(route_class=TaskRoute, prefix="/tasks")
56+
task_router = APIRouter(route_class=DelayedRoute, prefix="/tasks")
5057

5158

52-
@router.post("/hello")
59+
@task_router.post("/hello")
5360
async def hello(p: Payload = Payload(message="Default")):
5461
message = f"Hello task ran with payload: {p.message}"
5562
logger.warning(message)
5663
return {"message": message}
5764

5865

59-
@router.post("/timed_hello")
66+
scheduled_router = APIRouter(route_class=ScheduledRoute, prefix="/scheduled")
67+
68+
69+
@scheduled_router.post("/timed_hello")
6070
async def scheduled_hello(p: Payload = Payload(message="Default")):
6171
message = f"Scheduled hello task ran with payload: {p.message}"
6272
logger.warning(message)
@@ -65,8 +75,9 @@ async def scheduled_hello(p: Payload = Payload(message="Default")):
6575

6676
scheduled_hello.scheduler(
6777
name="testing-examples-scheduled-hello",
68-
schedule="*/5 * * * *",
78+
schedule="* * * * *",
6979
time_zone="Asia/Kolkata",
7080
).schedule(p=Payload(message="Scheduled"))
7181

72-
app.include_router(router)
82+
app.include_router(task_router)
83+
app.include_router(scheduled_router)

examples/simple/main.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,10 @@
1010
from pydantic import BaseModel
1111

1212
# Imports from this repository
13-
from fastapi_cloud_tasks.taskroute import TaskRouteBuilder
13+
from fastapi_cloud_tasks.delayed_route import DelayedRouteBuilder
1414
from fastapi_cloud_tasks.utils import queue_path
1515

16-
TaskRoute = TaskRouteBuilder(
16+
DelayedRoute = DelayedRouteBuilder(
1717
# Base URL where the task server will get hosted
1818
base_url=os.getenv("TASK_LISTENER_BASE_URL", default="https://d860-35-208-83-220.ngrok.io"),
1919
# Full queue path to which we'll send tasks.
@@ -25,7 +25,7 @@
2525
),
2626
)
2727

28-
task_router = APIRouter(route_class=TaskRoute, prefix="/tasks")
28+
task_router = APIRouter(route_class=DelayedRoute, prefix="/tasks")
2929

3030
logger = logging.getLogger("uvicorn")
3131

Lines changed: 3 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -8,46 +8,31 @@
88

99
# Imports from this repository
1010
from fastapi_cloud_tasks.delayer import Delayer
11-
from fastapi_cloud_tasks.hooks import SchedulerHook
12-
from fastapi_cloud_tasks.hooks import TaskHook
11+
from fastapi_cloud_tasks.hooks import DelayedTaskHook
1312
from fastapi_cloud_tasks.hooks import noop_hook
1413
from fastapi_cloud_tasks.scheduler import Scheduler
1514

1615

17-
def TaskRouteBuilder(
16+
def DelayedRouteBuilder(
1817
*,
1918
base_url: str,
2019
queue_path: str,
2120
task_create_timeout: float = 10.0,
22-
schedule_create_timeout: float = 10.0,
23-
pre_create_hook: TaskHook = None,
24-
pre_scheduler_hook: SchedulerHook = None,
21+
pre_create_hook: DelayedTaskHook = None,
2522
client=None,
2623
scheduler_client=None,
2724
):
2825
if client is None:
2926
client = tasks_v2.CloudTasksClient()
3027

31-
if scheduler_client is None:
32-
scheduler_client = scheduler_v1.CloudSchedulerClient()
33-
3428
if pre_create_hook is None:
3529
pre_create_hook = noop_hook
3630

37-
if pre_scheduler_hook is None:
38-
pre_scheduler_hook = noop_hook
39-
40-
q_path = client.parse_queue_path(queue_path)
41-
default_location_path = scheduler_client.common_location_path(
42-
project=q_path["project"], location=q_path["location"]
43-
)
44-
4531
class TaskRouteMixin(APIRoute):
4632
def get_route_handler(self) -> Callable:
4733
original_route_handler = super().get_route_handler()
4834
self.endpoint.options = self.delayOptions
4935
self.endpoint.delay = self.delay
50-
self.endpoint.scheduler = self.schedulerOptions
5136
return original_route_handler
5237

5338
def delayOptions(self, **options) -> Delayer:
@@ -70,19 +55,4 @@ def delayOptions(self, **options) -> Delayer:
7055
def delay(self, **kwargs):
7156
return self.delayOptions().delay(**kwargs)
7257

73-
def schedulerOptions(self, *, name, schedule, **options) -> Scheduler:
74-
schedulerOpts = dict(
75-
base_url=base_url,
76-
location_path=default_location_path,
77-
client=scheduler_client,
78-
pre_scheduler_hook=pre_scheduler_hook,
79-
schedule_create_timeout=schedule_create_timeout,
80-
name=name,
81-
schedule=schedule,
82-
)
83-
84-
schedulerOpts.update(options)
85-
86-
return Scheduler(route=self, **schedulerOpts)
87-
8858
return TaskRouteMixin

0 commit comments

Comments
 (0)