Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 132 additions & 0 deletions docs/advanced-patterns.md
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,138 @@ async def process_single_order(order_id: int) -> None:

This pattern separates discovery (finding work) from execution (doing work), allowing for better load distribution and fault isolation. The perpetual task stays lightweight and fast, while the actual work is distributed across many workers.

## Task Scattering with Agenda

For "find-and-flood" workloads, you often want to distribute a batch of tasks over time rather than scheduling them all immediately. The `Agenda` class collects related tasks and scatters them evenly across a time window.

### Basic Scattering

```python
from datetime import timedelta
from docket import Agenda, Docket

async def process_item(item_id: int) -> None:
await perform_expensive_operation(item_id)
await update_database(item_id)

async with Docket() as docket:
# Build an agenda of tasks
agenda = Agenda()
for item_id in range(1, 101): # 100 items to process
agenda.add(process_item)(item_id)

# Scatter them evenly over 50 minutes to avoid overwhelming the system
executions = await agenda.scatter(docket, over=timedelta(minutes=50))
print(f"Scheduled {len(executions)} tasks over 50 minutes")
```

Tasks are distributed evenly across the time window. For 100 tasks over 50 minutes, they'll be scheduled approximately 30 seconds apart.

### Jitter for Thundering Herd Prevention

Add random jitter to prevent multiple processes from scheduling identical work at exactly the same times:

```python
# Scatter with ±30 second jitter around each scheduled time
await agenda.scatter(
docket,
over=timedelta(minutes=50),
jitter=timedelta(seconds=30)
)
```

### Future Scatter Windows

Schedule the entire batch to start at a specific time in the future:

```python
from datetime import datetime, timezone

# Start scattering in 2 hours, spread over 30 minutes
start_time = datetime.now(timezone.utc) + timedelta(hours=2)
await agenda.scatter(
docket,
start=start_time,
over=timedelta(minutes=30)
)
```

### Mixed Task Types

Agendas can contain different types of tasks:

```python
async def send_email(user_id: str, template: str) -> None:
await email_service.send(user_id, template)

async def update_analytics(event_data: dict[str, str]) -> None:
await analytics_service.track(event_data)

# Create a mixed agenda
agenda = Agenda()
agenda.add(process_item)(item_id=1001)
agenda.add(send_email)("user123", "welcome")
agenda.add(update_analytics)({"event": "signup", "user": "user123"})
agenda.add(process_item)(item_id=1002)

# All tasks will be scattered in the order they were added
await agenda.scatter(docket, over=timedelta(minutes=10))
```

### Single Task Positioning

When an agenda contains only a single task, that task is positioned at the midpoint of the time window:

```python
agenda = Agenda()
agenda.add(process_item)(item_id=42)

# This task will be scheduled 5 minutes from now (middle of 10-minute window)
await agenda.scatter(docket, over=timedelta(minutes=10))
```

### Agenda Reusability

Agendas can be reused for multiple scatter operations:

```python
# Create a reusable template
daily_cleanup_agenda = Agenda()
daily_cleanup_agenda.add(cleanup_temp_files)()
daily_cleanup_agenda.add(compress_old_logs)()
daily_cleanup_agenda.add(update_metrics)()

# Use it multiple times with different timing
await daily_cleanup_agenda.scatter(docket, over=timedelta(hours=1))

# Later, scatter the same tasks over a different window
tomorrow = datetime.now(timezone.utc) + timedelta(days=1)
await daily_cleanup_agenda.scatter(
docket,
start=tomorrow,
over=timedelta(minutes=30)
)
```

### Failure Behavior

Keep in mind that if an error occurs partway through scheduling, the tasks ahead of the failing one have already been scheduled successfully, while the tasks after it are never scheduled:

```python
agenda = Agenda()
agenda.add(valid_task)("arg1")
agenda.add(valid_task)("arg2")
agenda.add("nonexistent_task")("arg3") # This will cause an error
agenda.add(valid_task)("arg4")

try:
await agenda.scatter(docket, over=timedelta(minutes=10))
except KeyError:
# The first two tasks were scheduled successfully
# The error prevented the fourth task from being scheduled
pass
```

## Striking and Restoring Tasks

Striking allows you to temporarily disable tasks without redeploying code. This is invaluable for incident response, gradual rollouts, or handling problematic customers.
Expand Down
128 changes: 128 additions & 0 deletions examples/agenda_scatter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
#!/usr/bin/env python
"""
Example demonstrating the Agenda scatter functionality for rate-limited workloads.

This example shows a real-world scenario: sending bulk notifications while respecting
rate limits to avoid overwhelming your notification service or triggering spam filters.

Without scatter: All 26 notifications would try to send immediately, potentially:
- Overwhelming your notification service
- Triggering rate limits or spam detection
- Creating a poor user experience with delayed/failed sends

With scatter: Notifications are distributed evenly over time, respecting limits.
"""

import asyncio
import logging
from datetime import datetime, timedelta, timezone

from docket import Agenda, CurrentExecution, Docket, Execution, Worker

# Configure logging at INFO level with a "time - level - message" line format
# for every message emitted by this example.
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
# Module-level logger used by the task and the driver in this file.
logger = logging.getLogger(__name__)


async def send_notification(
    user: str, message: str, execution: Execution = CurrentExecution()
) -> None:
    """Log and simulate the delivery of one notification to ``user``.

    The execution's ``when`` is compared against the current UTC time. If it
    is still more than 0.1 seconds away, only the remaining wait is logged.
    Otherwise the send is logged, a 0.2 second sleep stands in for the call to
    the notification service, and the delivery is logged.
    """
    due_at = execution.when
    seconds_until_due = (due_at - datetime.now(timezone.utc)).total_seconds()

    if seconds_until_due > 0.1:
        logger.info(
            f"📅 Notification for {user} scheduled {seconds_until_due:.1f}s from now"
        )
        return

    logger.info(f"📧 Sending to {user}: '{message}'")
    # Stand-in for the real notification-service API call.
    await asyncio.sleep(0.2)
    logger.info(f"✓ Delivered to {user}")


async def main() -> None:
    """Demonstrate scatter for rate-limited notification sending.

    Registers ``send_notification`` on a docket, adds one notification per
    user to an ``Agenda``, scatters the agenda over 60 seconds with 0.5
    seconds of jitter, logs a preview of the first and last three scheduled
    offsets, then runs a ``Worker`` (concurrency 2) until
    ``run_until_finished()`` returns and logs the elapsed time.
    """

    async with Docket(name="notification-scatter") as docket:
        docket.register(send_notification)

        logger.info("=== Bulk Notification Campaign ===")
        logger.info("Scenario: Alert 26 users about a flash sale")
        logger.info("Constraint: Notification service allows max 30 messages/minute")
        logger.info("Strategy: Scatter over 60 seconds (~1 message every 2.3 seconds)")
        logger.info("")

        # Build the list of users to notify (e.g., from a database query).
        # Placeholder addresses on the reserved example.com domain; each one is
        # distinct so the per-user log lines can be told apart.
        users = [
            "alice@example.com",
            "bob@example.com",
            "charlie@example.com",
            "diana@example.com",
            "eve@example.com",
            "frank@example.com",
            "grace@example.com",
            "henry@example.com",
            "iris@example.com",
            "jack@example.com",
            "kate@example.com",
            "liam@example.com",
            "maya@example.com",
            "noah@example.com",
            "olivia@example.com",
            "peter@example.com",
            "quinn@example.com",
            "rachel@example.com",
            "sam@example.com",
            "tina@example.com",
            "uma@example.com",
            "victor@example.com",
            "wendy@example.com",
            "xavier@example.com",
            "yara@example.com",
            "zoe@example.com",
        ]

        agenda = Agenda()

        # Queue all notifications
        logger.info(f"📋 Preparing notifications for {len(users)} users...")
        for user in users:
            agenda.add(send_notification)(user, "Flash Sale: 50% off for next hour!")

        # Scatter over 60 seconds to respect rate limit
        logger.info("🎯 Scattering notifications over 60 seconds...")
        logger.info("")

        executions = await agenda.scatter(
            docket,
            over=timedelta(seconds=60),
            jitter=timedelta(seconds=0.5),  # Small jitter for natural spacing
        )

        # Show the distribution preview. A single reference time is used so
        # every offset in the preview is measured from the same instant.
        preview_reference = datetime.now(timezone.utc)
        first_three = executions[:3]
        last_three = executions[-3:]
        for position, scheduled in enumerate(first_three, 1):
            delay = (scheduled.when - preview_reference).total_seconds()
            logger.info(f" Message #{position} scheduled for +{delay:.1f}s")
        logger.info(f" ... {len(executions) - 6} more evenly distributed ...")
        # Number the tail so the last entry is labelled len(executions).
        for position, scheduled in enumerate(last_three, len(executions) - 2):
            delay = (scheduled.when - preview_reference).total_seconds()
            logger.info(f" Message #{position} scheduled for +{delay:.1f}s")
        logger.info("")

        # Run worker to process the scattered notifications
        logger.info("🚀 Starting notification sender...")
        logger.info(" Watch how notifications flow steadily, not in a flood!")
        logger.info("")

        start_time = datetime.now(timezone.utc)
        async with Worker(docket, concurrency=2) as worker:
            await worker.run_until_finished()

        elapsed = (datetime.now(timezone.utc) - start_time).total_seconds()
        logger.info("")
        logger.info(f"✅ All {len(users)} notifications sent in {elapsed:.1f} seconds")


# Run the demo only when this file is executed as a script, not on import.
if __name__ == "__main__":
    asyncio.run(main())
2 changes: 2 additions & 0 deletions src/docket/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

__version__ = version("pydocket")

from .agenda import Agenda
from .annotations import Logged
from .dependencies import (
ConcurrencyLimit,
Expand All @@ -29,6 +30,7 @@

__all__ = [
"__version__",
"Agenda",
"ConcurrencyLimit",
"CurrentDocket",
"CurrentExecution",
Expand Down
Loading