Skip to content

Conversation

@desertaxle
Copy link
Collaborator

@desertaxle desertaxle commented Nov 5, 2025

Summary

Adds automatic result persistence for task executions using py-key-value-aio storage, plus critical bug fixes for resource cleanup and improved test coverage.

✨ New Feature: Result Persistence

Tasks can now store return values and exceptions for later retrieval:

async def calculate() -> int:
    return 42

execution = await docket.add(calculate)()
await worker.run_until_finished()

# Retrieve result (waits if task still running)
result = await execution.get_result()  # Returns 42

Key Features

  • Automatic serialization: Uses cloudpickle for any Python object
  • Exception storage: Failed tasks store exceptions, which are re-raised on get_result()
  • Smart skipping: Tasks returning None skip persistence
  • TTL management: Results expire with execution_ttl (default: 1 hour)
  • Timeout support: get_result(timeout=...) with graceful timeout handling
  • Pub/sub integration: Waits for completion via existing state subscription

Implementation Details

  • Docket.result_storage: RedisStore or MemoryStore backend
  • Execution.result_key: Tracks where result is stored
  • Execution.get_result(): Retrieves results, waiting via pub/sub if needed
  • Worker captures return values/exceptions after task execution
  • Base64-encoded JSON for storage compatibility

Storage Backend

Uses py-key-value-aio with pluggable storage:

  • Redis: RedisStore for production (separate connection pool)
  • Memory: MemoryStore for memory:// URLs (testing)

📚 API Changes

New Public Methods

execution.get_result(timeout=...)  # New method

Docket Configuration

Docket(
    result_storage=custom_storage  # Optional: provide custom AsyncKeyValue
)

Closes #166


🤖 Generated with Claude Code

desertaxle and others added 2 commits November 5, 2025 10:49
Add comprehensive result storage and retrieval for task executions using
py-key-value-aio as the backend. Results are serialized with cloudpickle,
base64-encoded for JSON compatibility, and stored with TTL matching
execution_ttl.

Key features:
- Tasks returning non-None values store results in result_storage
- Tasks annotated with -> None or returning None skip persistence
- Execution.get_result() retrieves results, waiting via pub/sub if needed
- Failed tasks store exceptions which are re-raised on retrieval
- Result TTL matches execution_ttl for consistent expiration
- Added returns_none() helper to check type annotations

Implementation:
- Added result field to Execution class for tracking result location
- Updated mark_as_completed() and mark_as_failed() to accept result_key
- Worker._execute() captures results and stores them after completion
- Worker._execute() stores exceptions for failed tasks
- Comprehensive test coverage for various scenarios

Addresses #166

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <[email protected]>
This commit fixes several resource cleanup issues and adds comprehensive
tests to achieve 100% code coverage.

## Fixes

### RedisStore Cleanup (docket.py)
- Add proper cleanup of result_storage in Docket.__aexit__()
- RedisStore maintains its own connection pool that wasn't being closed
- Fixes ResourceWarning about unclosed connections

### Test Mock Cleanup (test_execution_progress.py)
- Fix incomplete Redis async context manager mocks
- Add __aexit__ configuration to prevent connection cleanup warnings
- Applied to test_execution_sync_with_missing_state_field and
  test_execution_sync_with_string_state_value

### Exception Pickling (test_results.py)
- Fix CustomError to properly pickle/unpickle
- Pass both args to super().__init__() to preserve exception state

### Timeout Handling (execution.py)
- Fix get_result() timeout to work even when no events arrive
- Wrap subscribe loop with asyncio.wait_for()
- Add early check for already-expired timeouts

## New Tests

Added 4 tests to test_results.py for 100% coverage:
- test_get_result_with_expired_timeout
- test_get_result_failed_task_without_result_key
- test_get_result_with_malformed_result_data
- test_get_result_failed_task_with_missing_exception_data

## Results
- All 395 tests passing
- 100% code coverage achieved
- No resource warnings

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <[email protected]>
@claude
Copy link
Contributor

claude bot commented Nov 5, 2025

Claude finished @desertaxle's task —— View job


Code Review: Result Persistence PR

✅ Overall Assessment

This PR successfully implements result persistence for task executions with a well-designed API and comprehensive test coverage. The implementation is clean, follows existing patterns in the codebase, and maintains 100% test coverage as required.

🎯 Code Quality & Best Practices

Strengths:

  • Clean API Design: The get_result() method provides an intuitive interface with proper async/await support and timeout handling
  • Smart Storage Optimization: Skips persistence for None values to avoid unnecessary storage
  • Proper TTL Management: Results expire along with execution records, preventing orphaned data
  • Excellent Test Coverage: Comprehensive test suite covering edge cases, concurrent access, timeouts, and error conditions
  • Good Separation of Concerns: Uses py-key-value-aio for pluggable storage backends

Minor Suggestions:

  1. Line 739 in execution.py: The empty line could be removed as suggested in the review
  2. Line 741 in execution.py: Consider adding # pragma: no branch as suggested for the async for loop to handle uncoverable branches

🔒 Security Considerations

Potential Concerns:

  • Cloudpickle Usage: The use of cloudpickle for serializing arbitrary Python objects poses inherent security risks. Unpickling data from untrusted sources can lead to arbitrary code execution.

Mitigations in Place:

  • Results are only stored and retrieved within the same trusted system
  • Base64 encoding provides transport safety but not security
  • TTL ensures data doesn't persist indefinitely

Recommendation: Consider documenting that this feature should only be used in trusted environments where all task results come from known sources.

⚡ Performance Considerations

Positives:

  • Efficient Storage: Only stores non-None results, reducing storage overhead
  • Pub/Sub for Waiting: Uses Redis pub/sub for efficient waiting rather than polling
  • Connection Pooling: Properly uses connection pools for Redis operations

Potential Improvements:

  1. Large Object Handling: Consider implementing size limits or compression for large results to prevent memory issues
  2. Batch Operations: When multiple get_result() calls happen concurrently, they each create separate pub/sub connections - this is fine but worth monitoring in high-concurrency scenarios

🐛 Potential Issues

  1. Resource Cleanup: The result_storage cleanup in __aexit__ looks correct, properly handling both BaseContextManagerStore and regular stores

  2. Error Handling: The graceful fallback when result data is missing (returning None or generic exception) is well implemented

  3. Race Conditions: The implementation properly handles the case where a task completes between checking state and subscribing to updates

📚 Documentation & Testing

Test Coverage Highlights:

  • ✅ Various data types (int, str, dict, custom objects)
  • ✅ None handling (both annotated and runtime)
  • ✅ Exception storage and re-raising with preserved details
  • ✅ Timeout behavior
  • ✅ Concurrent access patterns
  • ✅ TTL expiration
  • ✅ Edge cases (missing data, malformed results)

Suggestion: The reviewer's request to copy the test to test_fundamentals.py has been addressed (line 739-752).

🔄 Backwards Compatibility

The implementation maintains backwards compatibility:

  • Result persistence is automatic but doesn't break existing code
  • Tasks returning None don't incur storage overhead
  • The result_key field is optional in execution records

📝 Summary

This is a well-implemented feature that adds significant value to the Docket system. The code is production-ready with only minor cosmetic suggestions. The comprehensive test suite and proper error handling demonstrate thorough engineering.

Recommended Actions:

  1. Address the minor line formatting suggestions from the reviewer
  2. Consider adding documentation about security implications of cloudpickle
  3. Monitor performance with large objects in production

Great work on this implementation! The feature is well-designed and thoroughly tested. 🎉

@github-actions
Copy link

github-actions bot commented Nov 5, 2025

📚 Documentation has been built for this PR!

You can download the documentation directly here:
https://github.com/chrisguidry/docket/actions/runs/19117058012/artifacts/4479942352

@codecov-commenter
Copy link

codecov-commenter commented Nov 5, 2025

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 100.00%. Comparing base (4cfbdd3) to head (2d565ac).
⚠️ Report is 1 commits behind head on main.

Additional details and impacted files

Impacted file tree graph

@@            Coverage Diff             @@
##              main      #184    +/-   ##
==========================================
  Coverage   100.00%   100.00%            
==========================================
  Files           36        37     +1     
  Lines         6100      6400   +300     
  Branches       295       312    +17     
==========================================
+ Hits          6100      6400   +300     
Flag Coverage Δ
python-3.10 100.00% <100.00%> (ø)
python-3.11 99.01% <100.00%> (+0.04%) ⬆️
python-3.12 100.00% <100.00%> (ø)
python-3.13 100.00% <100.00%> (ø)
python-3.14 100.00% <100.00%> (ø)

Flags with carried forward coverage won't be shown. Click here to find out more.

Files with missing lines Coverage Δ
src/docket/cli.py 100.00% <100.00%> (ø)
src/docket/docket.py 100.00% <100.00%> (ø)
src/docket/execution.py 100.00% <100.00%> (ø)
src/docket/worker.py 100.00% <100.00%> (ø)
tests/cli/test_snapshot.py 100.00% <100.00%> (ø)
tests/conftest.py 100.00% <100.00%> (ø)
tests/test_execution_progress.py 100.00% <100.00%> (ø)
tests/test_fundamentals.py 100.00% <100.00%> (ø)
tests/test_results.py 100.00% <100.00%> (ø)
🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@desertaxle desertaxle changed the title Add result persistence and fix resource cleanup Add result persistence Nov 5, 2025
@desertaxle desertaxle marked this pull request as ready for review November 5, 2025 20:28
@desertaxle desertaxle merged commit 7acf0eb into main Nov 5, 2025
25 checks passed
@desertaxle desertaxle deleted the results-persistence branch November 5, 2025 21:38
desertaxle added a commit that referenced this pull request Nov 7, 2025
… persistence

Documents features added in PRs #181, #184, and #189:

- Add comprehensive "Task State and Progress Monitoring" section to advanced-patterns.md
  covering execution state machine, Redis data model, pub/sub events, progress reporting,
  result retrieval, and CLI monitoring

- Add Progress() dependency documentation to dependencies.md with usage examples
  and links to detailed patterns

- Add "State and Result Storage" section to production.md documenting execution_ttl
  configuration, fire-and-forget mode (execution_ttl=0), and custom result storage backends

- Add "Task Observability" section to getting-started.md introducing state tracking,
  progress reporting, and result retrieval with links to detailed documentation

All documentation includes working code examples and follows the existing tone and style.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <[email protected]>
chrisguidry pushed a commit that referenced this pull request Nov 9, 2025
… persistence (#191)

## Summary

Adds comprehensive documentation for the execution state tracking,
progress monitoring, and result persistence features introduced in PRs
#181, #184, and #189.

**Related PRs:**
- #181 - Add execution state tracking and progress monitoring
- #184 - Add result persistence  
- #189 - Add support for execution_ttl=0 and timeout/deadline parameters

## Documentation Changes

### docs/advanced-patterns.md
Added a new major section **"Task State and Progress Monitoring"**
covering:

- **High-Level Design** - Architecture overview explaining the execution
state machine, Redis data model, pub/sub event system, and result
storage
- **Tracking Execution State** - How to query and monitor task states
- **Reporting Task Progress** - Using the `Progress()` dependency with
`ExecutionProgress` API
- **Monitoring Progress in Real-Time** - Subscribing to progress and
state events programmatically
- **Advanced Progress Patterns** - Incremental progress, batch updates,
and nested tracking patterns
- **Retrieving Task Results** - Using `get_result()` with
timeout/deadline parameters and exception handling
- **CLI Monitoring with Watch** - Using the `docket watch` command for
real-time monitoring
- **Fire-and-Forget Mode** - Documenting `execution_ttl=0` for
high-throughput scenarios

### docs/dependencies.md
Added **"Reporting Task Progress"** subsection to Built-in Context
Dependencies:

- Documents the `Progress()` dependency injection pattern
- Explains `ExecutionProgress` methods (`set_total`, `increment`,
`set_message`, `sync`)
- Highlights key characteristics (atomic, real-time, observable,
ephemeral)
- Links to detailed documentation in advanced-patterns.md

### docs/production.md
Added **"State and Result Storage"** section:

- Execution state tracking configuration with `execution_ttl`
- Fire-and-forget mode using `execution_ttl=0` for maximum throughput
- Result storage configuration with custom backends (RedisStore,
MemoryStore)
- Best practices for TTL management, separate databases, and large
results
- Monitoring state and result storage in Redis

### docs/getting-started.md
Added **"Task Observability"** section:

- Introduction to execution state tracking
- Basic progress reporting examples
- Task result retrieval with `get_result()`
- Links to detailed documentation in advanced-patterns.md

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-authored-by: Claude <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Task result storage and retrieval

4 participants