Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
72a0ded
Initial implementation
desertaxle Oct 31, 2025
c3e5c50
Add pub/sub events for worker scheduler and improve type annotations
desertaxle Oct 31, 2025
4a08a0a
Add progress example
desertaxle Nov 3, 2025
6be37f5
Add comprehensive documentation to task progress example
desertaxle Nov 3, 2025
0a099fa
Consolidate Redis records by merging known and stream_id into runs hash
desertaxle Nov 3, 2025
eb5b1e4
Replace set_running with atomic claim_and_run method
desertaxle Nov 3, 2025
ba0ba37
Rename set_completed and set_failed for clarity
desertaxle Nov 3, 2025
7ed29c3
Rename claim_and_run to claim for clarity
desertaxle Nov 3, 2025
665a804
Get to 100% test coverage
desertaxle Nov 4, 2025
282cfce
Add fundamental tests
desertaxle Nov 4, 2025
7754f0d
Merge branch 'main' into exectuion-expansion
desertaxle Nov 4, 2025
0519481
Fix test failures after updates from `main`
desertaxle Nov 4, 2025
64ad325
Add ability to specify custom ttl
desertaxle Nov 4, 2025
18b91fc
Fixes test failure
desertaxle Nov 4, 2025
5aea763
Fix memory test failures
desertaxle Nov 4, 2025
5f37218
Fix race condition in subscribe() for fast-completing tasks
desertaxle Nov 4, 2025
ab84077
Always emit initial progress event in subscribe()
desertaxle Nov 4, 2025
bdabee2
Add verbose logging for Python 3.12
desertaxle Nov 4, 2025
fd2ed28
Remove checks for progress percentage in CLI tests
desertaxle Nov 4, 2025
921e2f6
Use started_at check instead of worker_name for progress display
desertaxle Nov 4, 2025
b6732ea
Increase timeout
desertaxle Nov 4, 2025
4fc3780
Fix indentation in test_custom_execution_ttl
desertaxle Nov 4, 2025
18697d3
Revert "Fix indentation in test_custom_execution_ttl"
desertaxle Nov 4, 2025
0c79581
Improve test timing
desertaxle Nov 4, 2025
6abe56a
Adjust test timing
desertaxle Nov 4, 2025
1a6e8aa
Fix test
desertaxle Nov 4, 2025
a8ba66b
Fix assertion
desertaxle Nov 4, 2025
c7854f4
Merge branch 'main' into exectuion-expansion
desertaxle Nov 4, 2025
aabacc9
Address review comments
desertaxle Nov 4, 2025
d668e4b
Add coverage
desertaxle Nov 4, 2025
8d0029f
Remove verbose pytest flag
desertaxle Nov 4, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 111 additions & 0 deletions examples/task_progress.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
"""Example demonstrating task progress tracking and real-time monitoring.

This example shows how to:
- Report progress from within a task using ExecutionProgress
- Track progress with current value, total, and status messages
- Monitor task progress in real-time using the 'docket watch' command
- Schedule tasks for future execution

Key Concepts:
- ExecutionProgress: Tracks task progress (current/total) and status messages
- Progress dependency: Injected into tasks via Progress() default parameter
- Real-time monitoring: Use 'docket watch' CLI to monitor running tasks
- State tracking: Tasks transition through SCHEDULED → QUEUED → RUNNING → COMPLETED

Run this example with `uv run -m examples.task_progress` and use the printed 'docket watch' command to see live progress updates.
"""

from datetime import datetime, timedelta, timezone
from docket import Docket, Progress, Worker
import asyncio
import rich.console

from docket.execution import ExecutionProgress

from .common import run_redis


async def long_task(progress: ExecutionProgress = Progress()) -> None:
"""A long-running task that reports progress as it executes.

This demonstrates the key progress tracking patterns:
- Progress dependency injection via Progress() default parameter
- Incremental progress updates with increment()
- Status messages with set_message()

The ExecutionProgress object has a default total of 100, so we don't need
to call set_total() in this example. The progress automatically increments
from 0 to 100.

Args:
progress: Injected ExecutionProgress tracker (automatically provided by Docket)

Pattern for your own tasks:
1. Add progress parameter with Progress() default
2. Call increment() as work progresses (or set_total + increment)
3. Optionally set_message() to show current status
4. Monitor with: docket watch --url <redis_url> --docket <name> <task_key>
"""
# Simulate 100 steps of work, each taking 1 second
for i in range(1, 101):
await asyncio.sleep(1) # Simulate work being done

# Increment progress by 1 (tracks that one more unit is complete)
await progress.increment()

# Update status message every 10 items for demonstration
if i % 10 == 0:
await progress.set_message(f"{i} splines retriculated")


# Export tasks for docket CLI to discover
tasks = [long_task]

# Console for printing user-friendly messages
console = rich.console.Console()


async def main():
"""Run the progress tracking example.

This function demonstrates the complete lifecycle:
1. Start a Redis container for testing
2. Create a Docket (task queue)
3. Start a Worker (executes tasks)
4. Register and schedule a task
5. Monitor progress with the 'docket watch' command

The task is scheduled 20 seconds in the future to give you time to
run the watch command and see the task transition through states:
SCHEDULED → QUEUED → RUNNING → COMPLETED
"""
# Start a temporary Redis container for this example
# In production, you'd connect to your existing Redis instance
async with run_redis("7.4.2") as redis_url:
# Create a Docket connected to Redis
async with Docket(name="task-progress", url=redis_url) as docket:
# Start a Worker to execute tasks from the docket
async with Worker(docket, name="task-progress-worker") as worker:
# Register the task so the worker knows how to execute it
docket.register(long_task)

# Schedule the task to run 20 seconds from now
# This gives you time to run the watch command before it starts
in_twenty_seconds = datetime.now(timezone.utc) + timedelta(seconds=20)
execution = await docket.add(
long_task, key="long-task", when=in_twenty_seconds
)()

# Print instructions for monitoring
console.print(f"Execution {execution.key} started!")
console.print(
f"Run [blue]docket watch --url {redis_url} --docket {docket.name} {execution.key}[/blue] to see the progress!"
)

# Run the worker until all tasks complete
# The worker will wait for the scheduled time, then execute the task
await worker.run_until_finished()


if __name__ == "__main__":
asyncio.run(main())
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ dev = [
"pytest>=8.3.4",
"pytest-asyncio>=0.24.0",
"pytest-cov>=6.0.0",
"pytest-timeout>=2.4.0",
"pytest-xdist>=3.6.1",
"ruff>=0.9.7",
]
Expand Down
5 changes: 4 additions & 1 deletion src/docket/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@
Depends,
ExponentialRetry,
Perpetual,
Progress,
Retry,
TaskArgument,
TaskKey,
TaskLogger,
Timeout,
)
from .docket import Docket
from .execution import Execution
from .execution import Execution, ExecutionState
from .worker import Worker

__all__ = [
Expand All @@ -38,9 +39,11 @@
"Depends",
"Docket",
"Execution",
"ExecutionState",
"ExponentialRetry",
"Logged",
"Perpetual",
"Progress",
"Retry",
"TaskArgument",
"TaskKey",
Expand Down
1 change: 1 addition & 0 deletions src/docket/agenda.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ async def scatter(
# Create execution with unique key
key = str(uuid7())
execution = Execution(
docket=docket,
function=resolved_func,
args=args,
kwargs=kwargs,
Expand Down
Loading