Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
72a0ded
Initial implementation
desertaxle Oct 31, 2025
c3e5c50
Add pub/sub events for worker scheduler and improve type annotations
desertaxle Oct 31, 2025
4a08a0a
Add progress example
desertaxle Nov 3, 2025
6be37f5
Add comprehensive documentation to task progress example
desertaxle Nov 3, 2025
0a099fa
Consolidate Redis records by merging known and stream_id into runs hash
desertaxle Nov 3, 2025
eb5b1e4
Replace set_running with atomic claim_and_run method
desertaxle Nov 3, 2025
ba0ba37
Rename set_completed and set_failed for clarity
desertaxle Nov 3, 2025
7ed29c3
Rename claim_and_run to claim for clarity
desertaxle Nov 3, 2025
665a804
Get to 100% test coverage
desertaxle Nov 4, 2025
282cfce
Add fundamental tests
desertaxle Nov 4, 2025
7754f0d
Merge branch 'main' into exectuion-expansion
desertaxle Nov 4, 2025
0519481
Fix test failures after updates from `main`
desertaxle Nov 4, 2025
64ad325
Add ability to specify custom ttl
desertaxle Nov 4, 2025
18b91fc
Fixes test failure
desertaxle Nov 4, 2025
5aea763
Fix memory test failures
desertaxle Nov 4, 2025
5f37218
Fix race condition in subscribe() for fast-completing tasks
desertaxle Nov 4, 2025
ab84077
Always emit initial progress event in subscribe()
desertaxle Nov 4, 2025
bdabee2
Add verbose logging for Python 3.12
desertaxle Nov 4, 2025
fd2ed28
Remove checks for progress percentage in CLI tests
desertaxle Nov 4, 2025
921e2f6
Use started_at check instead of worker_name for progress display
desertaxle Nov 4, 2025
b6732ea
Increase timeout
desertaxle Nov 4, 2025
4fc3780
Fix indentation in test_custom_execution_ttl
desertaxle Nov 4, 2025
18697d3
Revert "Fix indentation in test_custom_execution_ttl"
desertaxle Nov 4, 2025
0c79581
Improve test timing
desertaxle Nov 4, 2025
6abe56a
Adjust test timing
desertaxle Nov 4, 2025
1a6e8aa
Fix test
desertaxle Nov 4, 2025
a8ba66b
Fix assertion
desertaxle Nov 4, 2025
c7854f4
Merge branch 'main' into exectuion-expansion
desertaxle Nov 4, 2025
aabacc9
Address review comments
desertaxle Nov 4, 2025
d668e4b
Add coverage
desertaxle Nov 4, 2025
8d0029f
Remove verbose pytest flag
desertaxle Nov 4, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion src/docket/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@
Depends,
ExponentialRetry,
Perpetual,
Progress,
Retry,
TaskArgument,
TaskKey,
TaskLogger,
Timeout,
)
from .docket import Docket
from .execution import Execution
from .execution import Execution, ExecutionState
from .worker import Worker

__all__ = [
Expand All @@ -38,9 +39,11 @@
"Depends",
"Docket",
"Execution",
"ExecutionState",
"ExponentialRetry",
"Logged",
"Perpetual",
"Progress",
"Retry",
"TaskArgument",
"TaskKey",
Expand Down
1 change: 1 addition & 0 deletions src/docket/agenda.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ async def scatter(
# Create execution with unique key
key = str(uuid7())
execution = Execution(
docket=docket,
function=resolved_func,
args=args,
kwargs=kwargs,
Expand Down
183 changes: 166 additions & 17 deletions src/docket/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,15 @@
from datetime import datetime, timedelta, timezone
from functools import partial
from typing import Annotated, Any, Collection
from unittest.mock import AsyncMock

import typer
from rich.console import Console
from rich.table import Table

from . import __version__, tasks
from .docket import Docket, DocketSnapshot, WorkerInfo
from .execution import Operator
from .execution import Execution, ExecutionState, Operator
from .worker import Worker

app: typer.Typer = typer.Typer(
Expand Down Expand Up @@ -376,7 +377,7 @@ async def run() -> None:
asyncio.run(run())


@app.command(help="Clear all pending and scheduled tasks from the docket")
@app.command(help="Clear all queued and scheduled tasks from the docket")
def clear(
docket_: Annotated[
str,
Expand Down Expand Up @@ -508,11 +509,8 @@ def trace(
async def run() -> None:
async with Docket(name=docket_, url=url) as docket:
when = datetime.now(timezone.utc) + delay
execution = await docket.add(tasks.trace, when)(message)
print(
f"Added {execution.function.__name__} task {execution.key!r} to "
f"the docket {docket.name!r}"
)
task_run = await docket.add(tasks.trace, when)(message)
print(f"Added trace task {task_run.key!r} to the docket {docket.name!r}")

asyncio.run(run())

Expand Down Expand Up @@ -552,11 +550,8 @@ def fail(
async def run() -> None:
async with Docket(name=docket_, url=url) as docket:
when = datetime.now(timezone.utc) + delay
execution = await docket.add(tasks.fail, when)(message)
print(
f"Added {execution.function.__name__} task {execution.key!r} to "
f"the docket {docket.name!r}"
)
task_run = await docket.add(tasks.fail, when)(message)
print(f"Added fail task {task_run.key!r} to the docket {docket.name!r}")

asyncio.run(run())

Expand Down Expand Up @@ -596,11 +591,8 @@ def sleep(
async def run() -> None:
async with Docket(name=docket_, url=url) as docket:
when = datetime.now(timezone.utc) + delay
execution = await docket.add(tasks.sleep, when)(seconds)
print(
f"Added {execution.function.__name__} task {execution.key!r} to "
f"the docket {docket.name!r}"
)
task_run = await docket.add(tasks.sleep, when)(seconds)
print(f"Added sleep task {task_run.key!r} to the docket {docket.name!r}")

asyncio.run(run())

Expand Down Expand Up @@ -810,6 +802,163 @@ async def run() -> DocketSnapshot:
console.print(stats_table)


@app.command(help="Monitor progress of a specific task execution")
def progress(
key: Annotated[str, typer.Argument(help="The task execution key to monitor")],
url: Annotated[
str,
typer.Option(
"--url",
"-u",
envvar="DOCKET_REDIS_URL",
help="Redis URL (e.g., redis://localhost:6379/0)",
),
] = "redis://localhost:6379/0",
docket_name: Annotated[
str,
typer.Option(
"--docket",
"-d",
envvar="DOCKET_NAME",
help="Docket name",
),
] = "docket",
) -> None:
"""Monitor the progress of a specific task execution in real-time using event-driven updates."""

async def monitor() -> None:
docket = Docket(docket_name, url)
execution = Execution(
docket, AsyncMock(), (), {}, datetime.now(timezone.utc), key, 1
) # TODO: Replace AsyncMock with actual task function
console = Console()

# State colors for display
state_colors = {
ExecutionState.SCHEDULED: "yellow",
ExecutionState.QUEUED: "cyan",
ExecutionState.RUNNING: "blue",
ExecutionState.COMPLETED: "green",
ExecutionState.FAILED: "red",
}

# Load initial snapshot
initial_state = await execution.get_state()
await execution.progress.sync()

if initial_state is None:
console.print(f"[red]No state found for task key: {key}[/red]")
return

# Track current state for display
current_state = initial_state
current_progress = {
"current": execution.progress.current
if execution.progress.current is not None
else 0,
"total": execution.progress.total,
"message": execution.progress.message,
}
start_time = datetime.now(timezone.utc)
worker_name: str | None = None
error_message: str | None = None

def display_status() -> None:
"""Display current task status."""
console.clear()
console.print(f"[bold]Task Key:[/bold] {key}")
console.print(f"[bold]Docket:[/bold] {docket_name}")
console.print()

# Display state with color
state_color = state_colors.get(current_state, "white")
console.print(
f"[bold]State:[/bold] [{state_color}]{current_state.value.upper()}[/{state_color}]"
)

# Display worker if running
if worker_name:
console.print(f"[bold]Worker:[/bold] {worker_name}")

# Display elapsed time
elapsed = datetime.now(timezone.utc) - start_time
elapsed_str = str(elapsed).split(".")[0] # Remove microseconds
console.print(f"[bold]Elapsed:[/bold] {elapsed_str}")
console.print()

# Display progress if available
if current_progress["current"] > 0 or current_progress["total"] is not None:
current_val = current_progress["current"]
total_val = current_progress["total"]

if total_val is not None and total_val > 0:
percent = current_val / total_val * 100
console.print(
f"[bold]Progress:[/bold] {current_val}/{total_val} ({percent:.1f}%)"
)

# Simple progress bar
bar_width = 40
filled = int(bar_width * percent / 100)
bar = "█" * filled + "░" * (bar_width - filled)
console.print(f"[cyan]{bar}[/cyan]")
else:
console.print(f"[bold]Progress:[/bold] {current_val}")

if current_progress["message"]:
console.print(
f"[bold]Message:[/bold] {current_progress['message']}"
)

# Display error if failed
if error_message:
console.print()
console.print(f"[red bold]Error:[/red bold] {error_message}")

# Display completion status
if current_state == ExecutionState.COMPLETED:
console.print()
console.print("[green bold]✓ Task completed successfully[/green bold]")
elif current_state == ExecutionState.FAILED:
console.print()
console.print("[red bold]✗ Task failed[/red bold]")

# Display initial snapshot
display_status()

# If already in terminal state, exit
if current_state in (ExecutionState.COMPLETED, ExecutionState.FAILED):
return

# Subscribe to events and update display
async for event in execution.subscribe():
if event["type"] == "state":
# Update state information
current_state = ExecutionState(event["state"])
if "worker" in event:
worker_name = event["worker"]
if "error" in event:
error_message = event["error"]

display_status()

# Exit if terminal state reached
if current_state in (ExecutionState.COMPLETED, ExecutionState.FAILED):
break

elif event["type"] == "progress":
# Update progress information
current_progress["current"] = event["current"]
if event.get("total") is not None:
current_progress["total"] = event["total"]
if event.get("message") is not None:
current_progress["message"] = event["message"]

display_status()

asyncio.run(monitor())


workers_app: typer.Typer = typer.Typer(
help="Look at the workers on a docket", no_args_is_help=True
)
Expand Down
30 changes: 29 additions & 1 deletion src/docket/dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@
)

from .docket import Docket
from .execution import Execution, TaskFunction, get_signature
from .execution import Execution, ExecutionProgress, TaskFunction, get_signature
from .instrumentation import CACHE_SIZE
# Run and RunProgress have been consolidated into Execution

if TYPE_CHECKING: # pragma: no cover
from .worker import Worker
Expand Down Expand Up @@ -192,6 +193,33 @@ async def my_task(logger: "LoggerAdapter[Logger]" = TaskLogger()) -> None:
return cast("logging.LoggerAdapter[logging.Logger]", _TaskLogger())


class _Progress(Dependency):
async def __aenter__(self) -> ExecutionProgress:
execution = self.execution.get()
return execution.progress


def Progress() -> ExecutionProgress:
"""A dependency to report progress updates for the currently executing task.

Tasks can use this to report their current progress (current/total values) and
status messages to external observers.

Example:

```python
@task
async def process_records(records: list, progress: ExecutionProgress = Progress()) -> None:
await progress.set_total(len(records))
for i, record in enumerate(records):
await process(record)
await progress.increment()
await progress.set_message(f"Processed {record.id}")
```
"""
return cast(ExecutionProgress, _Progress())


class ForcedRetry(Exception):
"""Raised when a task requests a retry via `in_` or `at`"""

Expand Down
Loading
Loading