SEP-1686: Tasks #1645
Conversation
Force-pushed cdf6aa0 to 9bd2aa8
Analyzed the official SEP-1686 specification against both the MCP SDK's draft implementation (PR #1645) and FastMCP's current shims. Made corrections to match the spec exactly.

**Key changes:**

1. **Removed `error` field** - Spec only defines `statusMessage` for error details. Changed all handlers to use `statusMessage` instead of a separate `error` field.
2. **Removed non-spec status values** - Spec defines exactly 5 statuses: `working`, `input_required`, `completed`, `failed`, `cancelled`. Removed FastMCP's `"submitted"` and `"unknown"` extensions.
3. **Non-existent tasks raise errors** - Aligned with SDK behavior: `tasks/get` for non-existent/deleted tasks raises `ValueError` (JSON-RPC error) instead of returning a synthetic `status="unknown"`.
4. **Test updates** - Fixed 12+ tests expecting removed statuses. Changed assertions to expect JSON-RPC errors for not-found scenarios.

**What stayed the same:**

- Client already sends both `task=` (spec-compliant) and `_meta=` (SDK compatibility)
- Server monkeypatches work correctly for request params
- `createdAt` as ISO 8601 string matches spec (SDK uses datetime but serializes the same)
- `ttl` field naming confirmed correct in both spec and SDK

All 3270 tests passing. FastMCP is now fully aligned with the SEP-1686 final specification.

Related: modelcontextprotocol/python-sdk#1645

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <[email protected]>
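The status set described above can be sketched in a few lines (these constant and function names are illustrative, not FastMCP's or the SDK's actual identifiers):

```python
# Hedged sketch: the five task statuses SEP-1686 defines, per the commit
# message above. TASK_STATUSES / TERMINAL_STATUSES / is_terminal are
# illustrative names, not real SDK API.
TASK_STATUSES = {"working", "input_required", "completed", "failed", "cancelled"}
TERMINAL_STATUSES = {"completed", "failed", "cancelled"}

def is_terminal(status: str) -> bool:
    # Unknown statuses (e.g. the removed "submitted"/"unknown" extensions)
    # are rejected rather than treated as a synthetic state.
    if status not in TASK_STATUSES:
        raise ValueError(f"Unknown task status: {status}")
    return status in TERMINAL_STATUSES
```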
Force-pushed 6c8c071 to 8460a5f
- Move task handler protocols to experimental/task_handlers.py
- Add build_client_tasks_capability() helper to auto-build ClientTasksCapability from handlers
- ClientSession now automatically infers tasks capability from provided handlers
- Add Resolver class for async result handling in task message queues
- Refactor result_handler to use Resolver pattern
- Add test for auto-built capabilities from handlers
- Replace 6 individual task handler parameters with single `experimental_task_handlers: ExperimentalTaskHandlers` (keyword-only)
- ExperimentalTaskHandlers dataclass groups all handlers and provides:
  - `build_capability()` - auto-builds ClientTasksCapability from handlers
  - `handles_request()` - checks if request is task-related
  - `handle_request()` - dispatches to appropriate handler
- Simplify ClientSession._received_request by delegating task requests
- Update tests to use new ExperimentalTaskHandlers API
This commit adds working examples for the Tasks SEP demonstrating elicitation and sampling flows, along with supporting infrastructure changes.

Examples:
- simple-task-interactive server: Exposes confirm_delete (elicitation) and write_haiku (sampling) tools that run as tasks
- simple-task-interactive-client: Connects to server, handles callbacks, and demonstrates the correct task result retrieval pattern

Key changes:
- Move call_tool_as_task() from ClientSession to session.experimental.call_tool_as_task() for API consistency
- Add comprehensive tests mirroring the example patterns
- Add server-side print outputs for visibility into task execution

The critical insight: clients must call get_task_result() to receive elicitation/sampling requests - simply polling get_task() will not trigger the callbacks.
Update ToolExecution.taskSupport values per the latest MCP tasks spec:
- "never" → "forbidden"
- "always" → "required"
- "optional" unchanged

Add typed constants TASK_FORBIDDEN, TASK_OPTIONAL, TASK_REQUIRED for consistent usage throughout the codebase instead of hardcoded strings.

Update all examples, tests, and documentation to use the new terminology.
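To make the rename concrete, here is a hedged sketch; the constant values follow the commit message, but the migration helper is hypothetical, not part of the SDK:

```python
# Values per the commit message above; migrate_task_support is a hypothetical
# helper showing the old -> new mapping, not real SDK API.
TASK_FORBIDDEN = "forbidden"  # was "never"
TASK_OPTIONAL = "optional"    # unchanged
TASK_REQUIRED = "required"    # was "always"

LEGACY_TO_CURRENT = {
    "never": TASK_FORBIDDEN,
    "always": TASK_REQUIRED,
    "optional": TASK_OPTIONAL,
}

def migrate_task_support(value: str) -> str:
    # Pass through values already in the new vocabulary unchanged.
    return LEGACY_TO_CURRENT.get(value, value)
```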
This addresses two critical spec compliance gaps:
1. Add `lastUpdatedAt` field to Task model
- Required by spec: ISO 8601 timestamp updated on every status change
- Added to Task model in types.py
- Initialized alongside createdAt in create_task_state()
- Updated in InMemoryTaskStore.update_task() on any change
- Included in all Task responses and notifications
2. Add related-task metadata to tasks/result response
- Per spec: tasks/result MUST include _meta with
io.modelcontextprotocol/related-task containing the taskId
- Required because result structure doesn't contain task ID
- Merges with any existing _meta from stored result
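The _meta merge described in point 2 can be sketched minimally on plain dicts (the real implementation works on typed result models; the helper name here is illustrative):

```python
# Hedged sketch of attaching io.modelcontextprotocol/related-task metadata to
# a tasks/result payload while preserving any existing _meta keys.
RELATED_TASK_KEY = "io.modelcontextprotocol/related-task"

def with_related_task(result: dict, task_id: str) -> dict:
    # Merge with any existing _meta rather than overwriting it.
    meta = dict(result.get("_meta") or {})
    meta[RELATED_TASK_KEY] = {"taskId": task_id}
    return {**result, "_meta": meta}
```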
```python
@dataclass
class Experimental:
```
[q] is this a single Experimental class that will be used for all experimental features?
[nit] the name doesn't match the file, wonder if it makes sense to change it.
will leave for now as it requires a bit of refactoring I'd rather defer for later
```python
return types.CallToolResult(content=[types.TextContent(type="text", text="Task completed!")])

return await ctx.experimental.run_task(work)
```
[q] so here if I wanted to run the task outside this process, I would instead have to use ctx.experimental.create_task() and then whatever code I have to start the task?
From Claude, way better at writing than I am:
Yes, you can run tasks outside the current process. The key is using a shared store (not in-memory) that both the MCP server and external worker can access.
```python
# === MCP Server ===
from mcp.server.lowlevel import Server
from your_app import RedisTaskStore, RedisTaskMessageQueue

server = Server("my-server")

# Use shared Redis store
server.experimental.enable_tasks(
    store=RedisTaskStore(redis_url),
    queue=RedisTaskMessageQueue(redis_url),
)

@server.call_tool()
async def handle_expensive_tool(name: str, arguments: dict) -> CreateTaskResult:
    ctx = server.request_context
    task_support = server._experimental._task_support

    # Create task in shared store
    task = await task_support.store.create_task(ctx.experimental.task_metadata)

    # Dispatch to external worker (Celery, Redis queue, etc.)
    await celery_app.send_task("run_expensive_work", args=[task.taskId, arguments])

    # Return immediately
    return CreateTaskResult(task=task)
```

```python
# === External Worker (separate process) ===
from your_app import RedisTaskStore
from mcp.types import CallToolResult, TextContent

async def run_expensive_work(task_id: str, arguments: dict):
    # Connect to the SAME shared store
    store = RedisTaskStore(redis_url)
    try:
        await store.update_task(task_id, status="working", status_message="Processing...")
        result = await do_expensive_computation(arguments)

        # Store result and mark complete
        await store.store_result(
            task_id,
            CallToolResult(content=[TextContent(type="text", text=result)]),
        )
        await store.update_task(task_id, status="completed")

        # CRITICAL: Wake up the MCP server waiting on tasks/result
        await store.notify_update(task_id)
    except Exception as e:
        await store.update_task(task_id, status="failed", status_message=str(e))
        await store.notify_update(task_id)
```

The MCP server's tasks/result handler loops on store.wait_for_update() - when the worker calls notify_update(), it unblocks and reads the result from store.get_result().
Limitation: Elicitation/sampling from external workers isn't supported yet - the ServerTaskContext needs an active session to send requests back to the client. This is a good use case for the TaskRunner abstraction you suggested.
```python
return False
return True

async def run_task(
```
[nit] would start_task be more accurate? my brain sees this and thinks this is actually running the task
Hmm yea potentially, I'll leave it for now as we'll do some refactoring later when adding in more helper stuff and integration into fastmcp. This API is gonna change anyway as it's still marked experimental
```python
# Poll until done
while True:
    status = await session.experimental.get_task(task_id)
```
[q] are all experimental features going to have top level methods in experimental?
Would be good to fix this, but for now I'd prefer to keep it as is until we need to add more just cause it'd require quite a bit of refactoring. Could put this in a followup PR/issue?
```python
print("Calling confirm_delete tool...")

result = await session.experimental.call_tool_as_task("confirm_delete", {"filename": "important.txt"})
task_id = result.task.taskId
```
[very minor - nit] should we use different names here for the different tasks? makes it easier to read imo
```python
]

@server.call_tool()
```
[q] this is not using fastMCP, right? is this because experimental needs lower level?
yes for now. Adding to FastMCP can be done in a later PR
```python
ctx = server.request_context

# Validate task mode - this tool requires task augmentation
ctx.experimental.validate_task_mode(types.TASK_REQUIRED)
```
[q] I understand validate_task_mode but I don't follow why this is needed before anything else?
by reading the function I sort of expected this to be done on a tool level based on the schema.
Yea it really should be. In this example it's just that this is the only tool. I'll refactor the example to be a bit more clear
```python
response_data = await resolver.wait()
await self._store.update_task(self.task_id, status=TASK_STATUS_WORKING)
return ElicitResult.model_validate(response_data)
```
[q] does this handle the case of augmented elicitation?
It should now!
```python
Returns:
    True if response was routed, False if no pending request
"""
resolver = self._pending_requests.pop(request_id, None)
```
[q] is it possible that on two requests sent (let's say 1 and 2), we get a response from 2 before 1? this seems to assume that this response is for the last request. is that a correct assumption?
I don't think that's right. This is popping the value out of a dictionary based off of the unique request_id. Doesn't matter what order those responses come in at.
ah yeah that makes sense, I misread this
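The routing behavior discussed here can be demonstrated with a small self-contained sketch (Resolver and the pending dict are simplified synchronous stand-ins for the real async implementation):

```python
# Hedged sketch: responses are matched by request_id, so arrival order does
# not matter. Resolver is a simplified stand-in class.
class Resolver:
    def __init__(self) -> None:
        self.value = None

    def resolve(self, value) -> None:
        self.value = value

pending: dict[int, Resolver] = {}

def route_response(request_id: int, payload) -> bool:
    # Pop the resolver registered for exactly this request id.
    resolver = pending.pop(request_id, None)
    if resolver is None:
        return False  # no pending request with this id
    resolver.resolve(payload)
    return True
```

Even if the response for request 2 arrives before the response for request 1, each one is delivered to its own resolver.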
src/mcp/shared/session.py (outdated)

```python
# Type guard: this method is only called for responses/errors
if not isinstance(root, JSONRPCResponse | JSONRPCError):  # pragma: no cover
    return
```
looking at diff only, this seems like different behavior from before? this is likely intentional, but I can't see why it's needed in order to first check the response routers.
when I say "by looking at diff only": before, this was the logic:

```python
stream = self._response_streams.pop(response_id, None)
if stream:  # pragma: no cover
    await stream.send(root)
else:  # pragma: no cover
    await self._handle_incoming(RuntimeError(f"Received response with an unknown request ID: {message}"))
```

there was no isinstance check
This was actually indirectly true in the previous code. When it was inside the _receive_loop function it was inside an if/elif/else block which narrowed the type from JSONRPCRequest | JSONRPCNotification | JSONRPCResponse | JSONRPCError down to JSONRPCResponse | JSONRPCError (since the first if is for JSONRPCRequest and the second is for JSONRPCNotification).
But since we're now inside a function whose SessionMessage parameter has a message field that is still a union of all the previous types, not just the narrowed ones, we need to re-narrow the types inside this function even though we know it was done previously.
The full message object is still needed for the final RuntimeError we create in case there's no stream for the message.
I've added a comment to make this clearer for future readers; do you think that's good, or would you prefer a refactor instead?
Test improvements:
- Replace unreachable handler bodies with raise NotImplementedError
- Use pragma: no branch for no-op message handlers (ellipsis bodies)
- Remove dead code (unused helper functions)
- Add try/finally blocks for stream cleanup with pragma: no cover
- Simplify handler error paths to use assert instead of if/return
- Remove unnecessary store.cleanup() calls after cancelled task groups

Source improvements:
- Add pragma: no cover for defensive _meta checks (unreachable code paths)
The MCP Tasks spec requires clients to poll tasks/get watching for status changes, then call tasks/result when status becomes input_required to receive elicitation/sampling requests.

- Add poll_task() async iterator to ExperimentalClientFeatures that yields status on each poll and respects the server's pollInterval hint
- Update simple-task-client to use poll_task() instead of manual loop
- Update simple-task-interactive-client to poll first, then call tasks/result on input_required per the spec pattern
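A hedged sketch of what a poll_task-style iterator can look like; get_task here is an injected stand-in callable, while the real method lives on ExperimentalClientFeatures and uses the session:

```python
import asyncio

# Hedged sketch: yield each observed status, stop on a terminal one, and
# honor a server-provided pollInterval hint between polls.
async def poll_task(get_task, task_id: str):
    while True:
        task = await get_task(task_id)
        yield task["status"]
        if task["status"] in {"completed", "failed", "cancelled"}:
            return
        await asyncio.sleep(task.get("pollInterval", 0.5))
```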
This implements the bidirectional task-augmented request pattern where the server can send task-augmented elicitation/sampling requests to the client, and the client can defer processing by returning CreateTaskResult.

Key changes:
- Add ExperimentalServerSessionFeatures with get_task(), get_task_result(), poll_task(), elicit_as_task(), and create_message_as_task() methods for server→client task operations
- Add shared polling utility (poll_until_terminal) used by both client and server to avoid code duplication
- Add elicit_as_task() and create_message_as_task() to ServerTaskContext for use inside task-augmented tool calls
- Add capability checks for task-augmented elicitation/sampling in ServerSession.check_client_capability()
- Add comprehensive tests for all four elicitation scenarios:
  1. Normal tool call + normal elicitation
  2. Normal tool call + task-augmented elicitation
  3. Task-augmented tool call + normal elicitation
  4. Task-augmented tool call + task-augmented elicitation

The implementation correctly handles the complex bidirectional flow where the server polls the client while the client's tasks/result call is still blocking, waiting for the tool task to complete.
Move all task-related capability checking logic into mcp/shared/experimental/tasks/capabilities.py to keep tasks code isolated from core session code.

Changes:
- Create capabilities.py with check_tasks_capability() and require_* helpers
- Update ServerSession to import and use the shared function
- Update ServerTaskContext to use require_* helpers instead of inline checks
- Add missing capability checks to ExperimentalServerSessionFeatures

This improves code organization and fixes a bug where session.experimental.elicit_as_task() wasn't checking capabilities.
- Add test_capabilities.py with unit tests for all capability checking functions
- Add tests for elicit_as_task and create_message_as_task without handler
- Add scenario 4 sampling test (task-augmented tool call + task-augmented sampling)
- Replace sleep-based polling with event-based synchronization for faster, deterministic tests
- Simplify for/else patterns in test code
- Add additional check_tasks_capability edge case tests

Test coverage improved to 99.94% with 0 missing statements.
This refactoring ensures all sampling and elicitation code paths use consistent validation and support the same features.

Sampling changes:
- Add shared validation module (mcp/server/validation.py) with validate_sampling_tools() and validate_tool_use_result_messages()
- Add tools and tool_choice parameters to all sampling methods:
  - _build_create_message_request()
  - ExperimentalServerSessionFeatures.create_message_as_task()
  - ServerTaskContext.create_message()
  - ServerTaskContext.create_message_as_task()
- Refactor ServerSession.create_message() to use shared validation

Elicitation changes:
- Rename _build_elicit_request to _build_elicit_form_request for clarity
- Add _build_elicit_url_request() for URL mode elicitation
- Add ServerTaskContext.elicit_url() so URL elicitation can be used from inside task-augmented tool calls (e.g., for OAuth flows)

This fixes a gap where task-augmented code paths were missing:
- tools/tool_choice parameters for sampling
- URL mode for elicitation
- Add tests for validation.py (check_sampling_tools_capability, validate_sampling_tools, validate_tool_use_result_messages)
- Add flow test for elicit_url() in ServerTaskContext
- Add pragma no cover comments to defensive _meta checks in builder methods (model_dump never includes _meta with current types)
- Fix test code to use assertions instead of conditional branches
- Add pragma no branch to polling loops in test scenarios
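The capability-gating idea behind the sampling-tools validation can be sketched minimally (the function name follows the commit messages above, but this signature and logic are illustrative, not the SDK's actual implementation):

```python
# Hedged sketch: tools may only be passed to a sampling request when the
# client declared the matching capability; otherwise raise before sending.
def validate_sampling_tools(tools, client_supports_tools: bool) -> None:
    if tools and not client_supports_tools:
        raise ValueError("Client does not support sampling with tools")
```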
Extract tool-specific logic into separate handler functions, keeping the call_tool decorator handler simple - it just dispatches based on tool name and returns an error for unknown tools.
- Remove section header comments from test files
- Move all inline imports to top of files
- Replace hardcoded error codes (-32600) with INVALID_REQUEST constant
- Replace arbitrary sleeps with polling loops for deterministic tests
- Add pragma no branch to polling conditions that always succeed on first try
…vents

- Remove redundant '# Should not raise' comments in test_capabilities.py
- Remove redundant '# No handler' comments in test_server_task_context.py
- Replace arbitrary sleeps with deterministic event-based synchronization in test_task_result_handler.py (poll for wait events before proceeding)
Completely rewrote the experimental tasks documentation to cover the new simplified API and advanced features:

tasks.md (Overview):
- Clear task lifecycle diagram
- Bidirectional flow explanation (client↔server)
- Key concepts (metadata, store, capabilities)
- Quick example with new enable_tasks() + run_task() API

tasks-server.md (Server Guide):
- Quick start with enable_tasks() + run_task()
- Tool declaration (TASK_REQUIRED/OPTIONAL/FORBIDDEN)
- Status updates and progress
- Elicitation within tasks (form and URL modes)
- Sampling within tasks
- Cancellation support
- Custom task stores
- HTTP transport example
- Testing patterns
- Best practices

tasks-client.md (Client Guide):
- Quick start with poll_task() iterator
- Handling input_required status
- Elicitation and sampling callbacks
- Client as task receiver (advanced)
- Client-side task handlers
- Error handling patterns
- Complete working examples
- Update TaskSession references to ServerTaskContext
- Update task_execution() to run_task()
- Fix result.taskSupport.taskId to result.task.taskId
Replace NotImplementedError with pass since task requests are handled earlier by _task_handlers. The catch-all satisfies pyright's exhaustiveness check while making it clear these cases are intentionally handled elsewhere.
src/mcp/client/session.py (outdated)

```python
return await responder.respond(types.ClientResult(root=types.EmptyResult()))

case _:  # pragma: no cover
    raise NotImplementedError()
```
do we need this?
fixed
Also, yes, we do unfortunately need a pragma here. It's never hit because the list of types is exhausted, but we need a default case, otherwise pyright errors.
src/mcp/server/session.py (outdated)

```python
return True

def set_task_result_handler(self, handler: ResponseRouter) -> None:
```
All this does is wrap another function; do we need it? Can't we just call self.add_response_router directly somewhere?
src/mcp/server/session.py (outdated)

```python
# =========================================================================
# Request builders for task queueing (internal use)
# =========================================================================
#
# These methods build JSON-RPC requests without sending them. They are used
# by TaskContext to construct requests that will be queued instead of sent
# directly, avoiding code duplication between ServerSession and TaskContext.
```
can we please remove
```python
    params=params_data,
)

async def send_message(self, message: SessionMessage) -> None:
```
again just a wrapper, seems kind of redundant
This is needed as there's no way to access _write_stream externally
src/mcp/shared/context.py (outdated)

```python
meta: RequestParams.Meta | None
session: SessionT
lifespan_context: LifespanContextT
experimental: Any = field(default=None)  # Set to Experimental instance by Server
```
this looks a little fishy, should it really be Any or Experimental | None or something?
```python
self._response_routers = []
self._exit_stack = AsyncExitStack()

def add_response_router(self, router: ResponseRouter) -> None:
```
another 1 line wrapper
Hmm what would you recommend instead? I'd rather external code not access private fields and instead provide a way users should use this.
fair, didn't realize it was being passed through elsewhere
src/mcp/types.py (outdated)

```python
taskId: str | None = None
"""Deprecated: Use the `tasks/cancel` request instead of this notification for task cancellation."""
```
This looks like the typescript thing we had where we had a deprecated field - I think this needs to be removed?
remove
removed
tests/client/test_stdio.py (outdated)

```python
Test basic parent-child process cleanup.

Parent spawns a single child process that writes continuously to a file.
"""
return
```
was this necessary to make coverage pass?
removed
tests/client/test_stdio.py (outdated)

```python
return
# Create temporary files for each process level
with tempfile.NamedTemporaryFile(mode="w", delete=False) as f1:
    parent_file = f1.name
```
this looks very strange - why are we returning before a statement
removed
```yaml
- Low-Level Server: low-level-server.md
- Authorization: authorization.md
- Testing: testing.md
- Experimental:
```
looks like we didn't add anything about tasks to the README - should we at least point people to these docs?
added
- Remove taskId from CancelledNotificationParams (removed in spec PR #1833)
- Update requestId docstring with MUST/MUST NOT requirements from spec
- Add missing docstrings for TaskMetadata, RelatedTaskMetadata.taskId, and Task.pollInterval to match schema.ts
- Remove set_task_result_handler wrapper method and its test
- Remove internal comment block for request builders
- Mark add_response_router as experimental in docstring
- Add experimental tasks documentation link to README
- Add comment explaining why experimental field uses Any type (circular import: mcp.server.__init__ -> fastmcp -> context)
Rename variables in the two demo sections to be more descriptive and avoid reusing the same names, making the example easier to follow.
SEP-1686: Tasks Implementation
This PR implements the experimental Tasks feature from the MCP specification, enabling asynchronous request handling with polling-based result retrieval.
Summary
Tasks allow servers to handle long-running operations asynchronously. Instead of blocking until completion, the server creates a task, returns immediately, and the client polls for status and retrieves results when ready.
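The client-side flow this enables can be sketched as follows (method names follow the PR summary; the session object, statuses, and helper name are assumptions, not guaranteed API):

```python
# Hedged usage sketch: create the task, poll until input is needed or the
# task reaches a terminal state, then fetch the final result via tasks/result.
async def run_tool_as_task(session, name: str, args: dict):
    create = await session.experimental.call_tool_as_task(name, args)
    task_id = create.task.taskId
    async for status in session.experimental.poll_task(task_id):
        if status == "input_required":
            # tasks/result is what delivers pending elicitation/sampling
            # requests, so stop polling and call it.
            break
    return await session.experimental.get_task_result(task_id)
```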
Key Features
Server-side:
- `server.experimental.enable_tasks()` - One-line setup that registers all task handlers
- `ctx.experimental.run_task(work)` - Simplified pattern for spawning background work
- `ServerTaskContext` with `elicit()`, `elicit_url()`, `create_message()` for user interaction
- `TaskStore` interface for production deployments

Client-side:
- `session.experimental.call_tool_as_task()` - Call tools with task augmentation
- `session.experimental.poll_task()` - Async iterator for polling until terminal state
- `session.experimental.get_task_result()` - Retrieve final results
- `ExperimentalTaskHandlers` - Handle task-augmented requests from servers

Bidirectional Flow:
Implementation Details
Core Components
- `TaskStore` / `InMemoryTaskStore`
- `TaskMessageQueue` (`tasks/result`)
- `TaskResultHandler` - handles `tasks/result` with dequeue-send-wait pattern
- `ServerTaskContext`
- `ExperimentalServerSessionFeatures`
- `ExperimentalClientFeatures`

Task Lifecycle
Validation
- Shared validation (`mcp/server/validation.py`) for sampling tools and message structure
- Capability checks in `mcp/shared/experimental/tasks/capabilities.py`

Testing
Documentation
Comprehensive documentation rewritten from scratch:
- `docs/experimental/tasks.md` - Overview, lifecycle, concepts
- `docs/experimental/tasks-server.md` - Server implementation guide
- `docs/experimental/tasks-client.md` - Client usage guide

Examples
Two example servers demonstrating tasks:
- `examples/servers/simple-task/` - Basic task with status updates
- `examples/servers/simple-task-interactive/` - Tasks with elicitation and sampling

Breaking Changes
None - all task functionality is under the `experimental` namespace.

Types of changes
Checklist