-
Notifications
You must be signed in to change notification settings - Fork 3
Add execution state tracking and progress monitoring #181
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Ensures the worker scheduler emits pub/sub events when moving tasks from scheduled to queued state, providing consistent event monitoring across all state transitions. Removes the redundant set_queued() method since state transitions are now automated in the scheduling Lua scripts. Also improves type annotations to achieve 100% type completeness score in pyright verification by adding explicit return types to subscribe() methods and list type annotations in tests. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
|
Claude finished @desertaxle's task —— View job PR Review: Add execution state tracking and progress monitoringI'm reviewing this comprehensive PR that adds execution state tracking and progress monitoring capabilities to Docket. Review Checklist
SummaryThis is an excellent, well-architected implementation that adds comprehensive execution state tracking and real-time progress monitoring to Docket. The code quality is high, test coverage remains at 100%, and the design patterns are solid. Strengths ✅Architecture & Design
Code Quality
Performance
Review Comments & Suggestions 🔍1. Default Progress Total ValueIn # execution.py:127 - Currently 1
self.total: int = 1
# execution.py:236, 249 - Uses 100
self.total = 100
# worker.py:596 - Sets to 100
redis.call('HSET', progress_key, 'current', '0', 'total', '100')Recommendation: Set default to 1 consistently as suggested by reviewer, since it's more intuitive for simple use cases. 2. Progress Total ValidationThe validation in def __init__(self, docket: "Docket", key: str, initial_total: int = 1) -> None:
if initial_total < 1:
raise ValueError("Total must be at least 1")
self.total = initial_total3. Context Manager SupportThe reviewer asked about context managers. While not implemented, this could be a nice future enhancement for automatic progress cleanup: @asynccontextmanager
async def track_progress(execution: Execution):
"""Context manager for automatic progress tracking."""
try:
yield execution.progress
finally:
# Progress is already auto-deleted on completion/failure
pass4. Event Type DefinitionsGood use of TypedDict for ExecutionEvent = Union[StateEvent, ProgressEvent]5. Legacy Key CleanupExcellent handling of backwards compatibility with TODO comments marking legacy code for removal. The migration path is clear and safe. Minor Issues 🔬1. Redundant CommentsSome AI-generated breadcrumb comments should be removed (docket.py:439, as noted by reviewer). 2. Progress Subscribe CoverageThe 3. TTL ConfigurationGood addition of configurable Security Considerations 🔒
Performance Considerations ⚡
Test Coverage Analysis 🧪Test coverage is excellent at 100%. Tests comprehensively cover:
Recommendations 📝
ConclusionThis is a well-executed feature addition that significantly enhances Docket's observability capabilities. The code is production-ready with only minor adjustments needed. The architecture is solid, the implementation is clean, and the test coverage is comprehensive. Verdict: ✅ Approved with minor suggestions Excellent work on this significant enhancement to Docket! The execution state tracking and progress monitoring features are well-designed and thoroughly tested. |
|
📚 Documentation has been built for this PR! You can download the documentation directly here: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Love where this is heading!!!
| initiating_context = initiating_span.get_span_context() | ||
| return [trace.Link(initiating_context)] if initiating_context.is_valid else [] | ||
|
|
||
| async def schedule(self, replace: bool = False) -> None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh wow this is a cool refactoring, didn't think of this!
Adds module-level docstring explaining progress tracking concepts, detailed function docstrings for long_task and main, and inline comments throughout to help users understand and adapt the example to their own use cases. Documentation covers: - Progress dependency injection pattern - ExecutionProgress API (increment, set_message) - Real-time monitoring with docket watch command - Task scheduling and state lifecycle - Complete setup from Redis to Worker to execution 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
Reduces Redis key count from 4 to 2-3 per task by storing known marker and
stream_id as fields in the execution state hash instead of separate keys.
Changes:
- Immediate tasks: 4→2 keys (50% reduction)
- Future tasks: 4→3 keys (25% reduction)
Implementation:
- Store 'known' field in {docket}:runs:{key} hash for duplicate prevention
- Store 'stream_id' field in {docket}:runs:{key} hash for cancellation
- Delete known/stream_id fields from runs hash when task completes to allow rescheduling
- Update retry logic to use replace=True to handle existing runs hash
Backwards compatibility:
- Check both new (HGET runs hash) and legacy (GET separate key) locations
- Clean up legacy keys during replacement and cancellation
- All backwards compat code marked with TODO for removal in v0.14.0
Benefits:
- Fewer Redis keys to manage and clean up
- Reduced memory overhead (~200-300 bytes per task)
- Simpler cleanup logic (fewer DEL operations)
- Better data locality (related fields in same hash)
Testing:
- All 349 tests passing including legacy compatibility test
- Fixed test_redis_key_cleanup_cancelled_task to use correct assertion method
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <[email protected]>
Consolidates worker operations when claiming a task into a single atomic Lua script, reducing Redis round trips from 3-4 to 1 operation. Changes: - Add claim_and_run() method that atomically: - Sets state to RUNNING with worker name and timestamp - Initializes progress tracking (current=0, total=100) - Deletes known/stream_id fields to allow task rescheduling - Cleans up legacy keys for backwards compatibility - Remove redundant set_running() method - Update worker to use claim_and_run() instead of set_running() - Update all tests to use claim_and_run() Benefits: - Single Redis round trip (75% reduction) - Atomic operation prevents partial state updates - Consistent with schedule() Lua script pattern - Better performance and reliability Testing: - All 349 tests passing - All state transitions verified - Progress initialization working correctly 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
Renames state transition methods to be more explicit and self-documenting: - set_completed() → mark_as_completed() - set_failed() → mark_as_failed() This improves API clarity and consistency with the new claim_and_run() method. Changes: - Rename method definitions in Execution class - Update 2 call sites in Worker - Update 2 call sites in tests All 349 tests passing with 98% coverage. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
Renames the method to better reflect its actual behavior - it claims the task and transitions to RUNNING state, but does not execute the function. Changes: - claim_and_run() → claim() in Execution class - Update Worker call site - Update 6 test call sites The new name is more accurate since the method only claims the task for execution; the actual function execution happens later in Worker._execute(). All 349 tests passing with 98% coverage. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
Codecov Report✅ All modified and coverable lines are covered by tests. Additional details and impacted files@@ Coverage Diff @@
## main #181 +/- ##
==========================================
Coverage 100.00% 100.00%
==========================================
Files 34 36 +2
Lines 5216 6100 +884
Branches 265 295 +30
==========================================
+ Hits 5216 6100 +884
Flags with carried forward coverage won't be shown. Click here to find out more.
🚀 New features to boost your workflow:
|
The watch command was failing in CI when tasks completed quickly because: - subscribe() would sync() and find task already completed - Progress data would be deleted on completion - Watch would never see progress events Solution: subscribe() now emits initial progress event along with initial state event, ensuring subscribers capture progress state before deletion. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
The conditional check was preventing progress events from being emitted when tasks hadn't set progress (current=None, total=100 defaults). Now always emits initial progress event to ensure watch command and other subscribers can display progress state consistently. Also fixes test expecting 3 progress events (initial + 2 updates). 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
The watch command was checking worker_name to determine if a task had started, but worker_name timing varied across Python versions in CI. Now checks execution.started_at which is set atomically when task starts, providing a more reliable indicator that's available in initial state event. Fixes Python 3.12-specific test failures in CI. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
The redis context was incorrectly dedented outside the Worker context. This caused branch coverage issues in CI. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
This reverts commit 4fc3780.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's a beaut!
| from rich.console import Console | ||
| from rich.layout import Layout | ||
| from rich.live import Live | ||
| from rich.progress import ( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
❤️ what a great toolkit
src/docket/docket.py
Outdated
| TaskFunction, | ||
| ) | ||
|
|
||
| # Run class has been consolidated into Execution |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Probably AI breadcrumb comments
| # ARGV: task_key | ||
| """ | ||
| local stream_key = KEYS[1] | ||
| -- TODO: Remove in next breaking release (v0.14.0) - legacy key locations |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
❤️
src/docket/execution.py
Outdated
| self.key = key | ||
| self._redis_key = f"{docket.name}:progress:{key}" | ||
| self.current: int | None = None | ||
| self.total: int = 100 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's make the default for total 1 instead.
| await instance.sync() | ||
| return instance | ||
|
|
||
| async def set_total(self, total: int) -> None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's validate that total >= 1
src/docket/execution.py
Outdated
| } | ||
| await redis.publish(channel, json.dumps(payload)) | ||
|
|
||
| async def subscribe(self) -> AsyncGenerator[dict[str, Any], None]: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
❤️
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we could define a ProgressEvent here if we wanted to
Summary
Adds comprehensive execution state tracking and progress monitoring capabilities to Docket, enabling real-time observability of task execution through Redis pub/sub.
ExecutionProgressclass with instance attributes for tracking task progressSCHEDULED → QUEUED → RUNNING → COMPLETED/FAILEDDockettoExecutionclass for better encapsulationdocket watchCLI to view progress for a single taskDemo
Screen.Recording.2025-11-03.at.10.17.22.AM.mov
Key Changes
Execution State Management
PENDINGstate toQUEUEDthroughout codebaseQUEUEDstateSCHEDULED, then move toQUEUEDwhen dueExecutionProgress Class
current,total,message,updated_atcreate()classmethod for initializationScheduling Refactor
Docket._schedule()toExecution.schedule()set_scheduled()andset_queued()methodsPub/Sub Event Publishing
{docket}:state:{key}channel{docket}:progress:{key}channelExecution.subscribe()andExecutionProgress.subscribe()methods availableCloses #88
🤖 Generated with Claude Code