Skip to content

Commit bee9520

Browse files
abhipatel12ruomengz
authored andcommitted
feat(core): implement model-driven parallel tool scheduler (#21933)
1 parent 993fdf7 commit bee9520

File tree

10 files changed

+187
-33
lines changed

10 files changed

+187
-33
lines changed

packages/core/src/agents/subagent-tool-wrapper.test.ts

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -103,9 +103,19 @@ describe('SubagentToolWrapper', () => {
103103

104104
expect(schema.name).toBe(mockDefinition.name);
105105
expect(schema.description).toBe(mockDefinition.description);
106-
expect(schema.parametersJsonSchema).toEqual(
107-
mockDefinition.inputConfig.inputSchema,
108-
);
106+
expect(schema.parametersJsonSchema).toEqual({
107+
...(mockDefinition.inputConfig.inputSchema as Record<string, unknown>),
108+
properties: {
109+
...((
110+
mockDefinition.inputConfig.inputSchema as Record<string, unknown>
111+
)['properties'] as Record<string, unknown>),
112+
wait_for_previous: {
113+
type: 'boolean',
114+
description:
115+
'Set to true to wait for all previously requested tools in this turn to complete before starting. Set to false (or omit) to run in parallel. Use true when this tool depends on the output of previous tools.',
116+
},
117+
},
118+
});
109119
});
110120
});
111121

packages/core/src/core/__snapshots__/prompts.test.ts.snap

Lines changed: 38 additions & 19 deletions
Large diffs are not rendered by default.

packages/core/src/prompts/snippets.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -355,7 +355,8 @@ export function renderOperationalGuidelines(
355355
- **Security First:** Always apply security best practices. Never introduce code that exposes, logs, or commits secrets, API keys, or other sensitive information.
356356
357357
## Tool Usage
358-
- **Parallelism:** Execute multiple independent tool calls in parallel when feasible (i.e. searching the codebase).
358+
- **Parallelism & Sequencing:** Tools execute in parallel by default. Execute multiple independent tool calls in parallel when feasible (e.g., searching, reading files, independent shell commands, or editing *different* files). If a tool depends on the output or side-effects of a previous tool in the same turn (e.g., running a shell command that depends on the success of a previous command), you MUST set the \`wait_for_previous\` parameter to \`true\` on the dependent tool to ensure sequential execution.
359+
- **File Editing Collisions:** Do NOT make multiple calls to the ${formatToolName(EDIT_TOOL_NAME)} tool for the SAME file in a single turn. To make multiple edits to the same file, you MUST perform them sequentially across multiple conversational turns to prevent race conditions and ensure the file state is accurate before each edit.
359360
- **Command Execution:** Use the ${formatToolName(SHELL_TOOL_NAME)} tool for running shell commands, remembering the safety rule to explain modifying commands first.${toolUsageInteractive(
360361
options.interactive,
361362
options.interactiveShellEnabled,

packages/core/src/scheduler/scheduler.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ describe('Scheduler (Orchestrator)', () => {
134134
const req2: ToolCallRequestInfo = {
135135
callId: 'call-2',
136136
name: 'test-tool',
137-
args: { foo: 'baz' },
137+
args: { foo: 'baz', wait_for_previous: true },
138138
isClientInitiated: false,
139139
prompt_id: 'prompt-1',
140140
schedulerId: ROOT_SCHEDULER_ID,

packages/core/src/scheduler/scheduler.ts

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ import { PolicyDecision, type ApprovalMode } from '../policy/types.js';
2929
import {
3030
ToolConfirmationOutcome,
3131
type AnyDeclarativeTool,
32-
Kind,
3332
} from '../tools/tools.js';
3433
import { getToolSuggestion } from '../utils/tool-utils.js';
3534
import { runInDevTraceSpan } from '../telemetry/trace.js';
@@ -434,10 +433,10 @@ export class Scheduler {
434433
}
435434

436435
// If the first tool is parallelizable, batch all contiguous parallelizable tools.
437-
if (this._isParallelizable(next.tool)) {
436+
if (this._isParallelizable(next.request)) {
438437
while (this.state.queueLength > 0) {
439438
const peeked = this.state.peekQueue();
440-
if (peeked && this._isParallelizable(peeked.tool)) {
439+
if (peeked && this._isParallelizable(peeked.request)) {
441440
this.state.dequeue();
442441
} else {
443442
break;
@@ -522,9 +521,16 @@ export class Scheduler {
522521
return false;
523522
}
524523

525-
private _isParallelizable(tool?: AnyDeclarativeTool): boolean {
526-
if (!tool) return false;
527-
return tool.isReadOnly || tool.kind === Kind.Agent;
524+
private _isParallelizable(request: ToolCallRequestInfo): boolean {
525+
if (request.args) {
526+
const wait = request.args['wait_for_previous'];
527+
if (typeof wait === 'boolean') {
528+
return !wait;
529+
}
530+
}
531+
532+
// Default to parallel if the flag is omitted.
533+
return true;
528534
}
529535

530536
private async _processValidatingCall(

packages/core/src/scheduler/scheduler_parallel.test.ts

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ describe('Scheduler Parallel Execution', () => {
119119
const req3: ToolCallRequestInfo = {
120120
callId: 'call-3',
121121
name: 'write-tool',
122-
args: { path: 'c.txt', content: 'hi' },
122+
args: { path: 'c.txt', content: 'hi', wait_for_previous: true },
123123
isClientInitiated: false,
124124
prompt_id: 'p1',
125125
schedulerId: ROOT_SCHEDULER_ID,
@@ -505,4 +505,50 @@ describe('Scheduler Parallel Execution', () => {
505505
const start1 = executionLog.indexOf('start-call-1');
506506
expect(start1).toBeGreaterThan(end3);
507507
});
508+
509+
it('should execute non-read-only tools in parallel if wait_for_previous is false', async () => {
510+
const executionLog: string[] = [];
511+
mockExecutor.execute.mockImplementation(async ({ call }) => {
512+
const id = call.request.callId;
513+
executionLog.push(`start-${id}`);
514+
await new Promise<void>((resolve) => setTimeout(resolve, 10));
515+
executionLog.push(`end-${id}`);
516+
return {
517+
status: 'success',
518+
response: { callId: id, responseParts: [] },
519+
} as unknown as SuccessfulToolCall;
520+
});
521+
522+
const w1 = { ...req3, callId: 'w1', args: { wait_for_previous: false } };
523+
const w2 = { ...req3, callId: 'w2', args: { wait_for_previous: false } };
524+
525+
await scheduler.schedule([w1, w2], signal);
526+
527+
expect(executionLog.slice(0, 2)).toContain('start-w1');
528+
expect(executionLog.slice(0, 2)).toContain('start-w2');
529+
});
530+
531+
it('should execute read-only tools sequentially if wait_for_previous is true', async () => {
532+
const executionLog: string[] = [];
533+
mockExecutor.execute.mockImplementation(async ({ call }) => {
534+
const id = call.request.callId;
535+
executionLog.push(`start-${id}`);
536+
await new Promise<void>((resolve) => setTimeout(resolve, 10));
537+
executionLog.push(`end-${id}`);
538+
return {
539+
status: 'success',
540+
response: { callId: id, responseParts: [] },
541+
} as unknown as SuccessfulToolCall;
542+
});
543+
544+
const r1 = { ...req1, callId: 'r1', args: { wait_for_previous: false } };
545+
const r2 = { ...req1, callId: 'r2', args: { wait_for_previous: true } };
546+
547+
await scheduler.schedule([r1, r2], signal);
548+
549+
expect(executionLog[0]).toBe('start-r1');
550+
expect(executionLog[1]).toBe('end-r1');
551+
expect(executionLog[2]).toBe('start-r2');
552+
expect(executionLog[3]).toBe('end-r2');
553+
});
508554
});

packages/core/src/tools/mcp-client.test.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -752,6 +752,11 @@ describe('mcp-client', () => {
752752
param1: {
753753
$ref: '#/$defs/MyType',
754754
},
755+
wait_for_previous: {
756+
type: 'boolean',
757+
description:
758+
'Set to true to wait for all previously requested tools in this turn to complete before starting. Set to false (or omit) to run in parallel. Use true when this tool depends on the output of previous tools.',
759+
},
755760
},
756761
$defs: {
757762
MyType: {

packages/core/src/tools/mcp-tool.test.ts

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,17 @@ describe('DiscoveredMCPTool', () => {
150150
);
151151
expect(tool.schema.description).toBe(baseDescription);
152152
expect(tool.schema.parameters).toBeUndefined();
153-
expect(tool.schema.parametersJsonSchema).toEqual(inputSchema);
153+
expect(tool.schema.parametersJsonSchema).toEqual({
154+
...inputSchema,
155+
properties: {
156+
...(inputSchema['properties'] as Record<string, unknown>),
157+
wait_for_previous: {
158+
type: 'boolean',
159+
description:
160+
'Set to true to wait for all previously requested tools in this turn to complete before starting. Set to false (or omit) to run in parallel. Use true when this tool depends on the output of previous tools.',
161+
},
162+
},
163+
});
154164
expect(tool.serverToolName).toBe(serverToolName);
155165
});
156166
});

packages/core/src/tools/tool-registry.test.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -541,6 +541,11 @@ describe('ToolRegistry', () => {
541541
type: 'string',
542542
format: 'uuid',
543543
},
544+
wait_for_previous: {
545+
type: 'boolean',
546+
description:
547+
'Set to true to wait for all previously requested tools in this turn to complete before starting. Set to false (or omit) to run in parallel. Use true when this tool depends on the output of previous tools.',
548+
},
544549
},
545550
});
546551
});

packages/core/src/tools/tools.ts

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import type { ShellExecutionConfig } from '../services/shellExecutionService.js'
1111
import { SchemaValidator } from '../utils/schemaValidator.js';
1212
import type { AnsiOutput } from '../utils/terminalSerializer.js';
1313
import type { MessageBus } from '../confirmation-bus/message-bus.js';
14+
import { isRecord } from '../utils/markdownUtils.js';
1415
import { randomUUID } from 'node:crypto';
1516
import {
1617
MessageBusType,
@@ -394,6 +395,15 @@ export interface ToolBuilder<
394395
build(params: TParams): ToolInvocation<TParams, TResult>;
395396
}
396397

398+
/**
399+
* Represents the expected JSON Schema structure for tool parameters.
400+
*/
401+
export interface ToolParameterSchema {
402+
type: string;
403+
properties?: unknown;
404+
[key: string]: unknown;
405+
}
406+
397407
/**
398408
* New base class for tools that separates validation from execution.
399409
* New tools should extend this class.
@@ -428,7 +438,49 @@ export abstract class DeclarativeTool<
428438
return {
429439
name: this.name,
430440
description: this.description,
431-
parametersJsonSchema: this.parameterSchema,
441+
parametersJsonSchema: this.addWaitForPreviousParameter(
442+
this.parameterSchema,
443+
),
444+
};
445+
}
446+
447+
/**
448+
* Type guard to check if an unknown value represents a ToolParameterSchema object.
449+
*/
450+
private isParameterSchema(obj: unknown): obj is ToolParameterSchema {
451+
return isRecord(obj) && 'type' in obj;
452+
}
453+
454+
/**
455+
* Adds the `wait_for_previous` parameter to the tool's schema.
456+
* This allows the model to explicitly control parallel vs sequential execution.
457+
*/
458+
private addWaitForPreviousParameter(schema: unknown): unknown {
459+
if (!this.isParameterSchema(schema) || schema.type !== 'object') {
460+
return schema;
461+
}
462+
463+
const props = schema.properties;
464+
let propertiesObj: Record<string, unknown> = {};
465+
466+
if (props !== undefined) {
467+
if (!isRecord(props)) {
468+
// properties exists but is not an object, so it's a malformed schema.
469+
return schema;
470+
}
471+
propertiesObj = props;
472+
}
473+
474+
return {
475+
...schema,
476+
properties: {
477+
...propertiesObj,
478+
wait_for_previous: {
479+
type: 'boolean',
480+
description:
481+
'Set to true to wait for all previously requested tools in this turn to complete before starting. Set to false (or omit) to run in parallel. Use true when this tool depends on the output of previous tools.',
482+
},
483+
},
432484
};
433485
}
434486

0 commit comments

Comments
 (0)