Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 30 additions & 94 deletions codex-rs/core/src/tools/handlers/multi_agents_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1010,7 +1010,7 @@ async fn multi_agent_v2_send_message_rejects_structured_items() {
}

#[tokio::test]
async fn multi_agent_v2_send_message_interrupts_busy_child_without_triggering_turn() {
async fn multi_agent_v2_send_message_rejects_interrupt_parameter() {
let (mut session, mut turn) = make_session_and_context().await;
let manager = thread_manager();
let root = manager
Expand Down Expand Up @@ -1043,107 +1043,43 @@ async fn multi_agent_v2_send_message_interrupts_busy_child_without_triggering_tu
.resolve_agent_reference(session.conversation_id, &turn.session_source, "worker")
.await
.expect("worker should resolve");
let thread = manager
.get_thread(agent_id)
.await
.expect("worker thread should exist");

let active_turn = thread.codex.session.new_default_turn().await;
thread
.codex
.session
.spawn_task(
Arc::clone(&active_turn),
vec![UserInput::Text {
text: "working".to_string(),
text_elements: Vec::new(),
}],
NeverEndingTask,
)
.await;
let invocation = invocation(
session,
turn,
"send_message",
function_payload(json!({
"target": agent_id.to_string(),
"items": [{"type": "text", "text": "continue"}],
"interrupt": true
})),
);

SendMessageHandlerV2
.handle(invocation(
session.clone(),
turn.clone(),
"send_message",
function_payload(json!({
"target": agent_id.to_string(),
"items": [{"type": "text", "text": "continue"}],
"interrupt": true
})),
))
.await
.expect("interrupting v2 send_message should succeed");
let Err(err) = SendMessageHandlerV2.handle(invocation).await else {
panic!("send_message interrupt parameter should be rejected");
};
let FunctionCallError::RespondToModel(message) = err else {
panic!("expected model-facing parse error");
};
assert!(message.starts_with(
"failed to parse function arguments: unknown field `interrupt`, expected `target` or `items`"
));

let ops = manager.captured_ops();
let ops_for_agent: Vec<&Op> = ops
.iter()
.filter_map(|(id, op)| (*id == agent_id).then_some(op))
.collect();
assert!(ops_for_agent.iter().any(|op| matches!(op, Op::Interrupt)));
assert!(ops_for_agent.iter().any(|op| {
matches!(
op,
Op::InterAgentCommunication { communication }
if communication.author == AgentPath::root()
&& communication.recipient.as_str() == "/root/worker"
&& communication.other_recipients.is_empty()
&& communication.content == "continue"
&& !communication.trigger_turn
)
}));

timeout(Duration::from_secs(5), async {
loop {
if !thread.codex.session.has_pending_input().await {
tokio::time::sleep(Duration::from_millis(10)).await;
continue;
}
let history_items = thread
.codex
.session
.clone_history()
.await
.raw_items()
.to_vec();
let saw_envelope = history_contains_inter_agent_communication(
&history_items,
&InterAgentCommunication::new(
AgentPath::root(),
AgentPath::try_from("/root/worker").expect("agent path"),
Vec::new(),
"continue".to_string(),
/*trigger_turn*/ false,
),
);
let saw_user_message = history_items.iter().any(|item| {
matches!(
item,
ResponseItem::Message { role, content, .. }
if role == "user"
&& content.iter().any(|content_item| matches!(
content_item,
ContentItem::InputText { text } if text == "continue"
))
)
});
if saw_envelope && !saw_user_message {
panic!("send_message should not materialize the envelope into history");
}
if !saw_user_message {
break;
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
})
.await
.expect("interrupting v2 send_message should queue the redirected message without a turn");

let _ = thread
.submit(Op::Shutdown {})
.await
.expect("shutdown should submit");
assert!(!ops_for_agent.iter().any(|op| matches!(op, Op::Interrupt)));
assert!(!ops_for_agent.iter().any(|op| matches!(
op,
Op::InterAgentCommunication { communication }
if communication.author == AgentPath::root()
&& communication.recipient.as_str() == "/root/worker"
&& communication.other_recipients.is_empty()
&& communication.content == "continue"
&& !communication.trigger_turn
)));
}

#[tokio::test]
Expand Down
12 changes: 11 additions & 1 deletion codex-rs/core/src/tools/handlers/multi_agents_v2/assign_task.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use super::message_tool::AssignTaskArgs;
use super::message_tool::MessageDeliveryMode;
use super::message_tool::MessageToolResult;
use super::message_tool::handle_message_tool;
Expand All @@ -18,6 +19,15 @@ impl ToolHandler for Handler {
}

async fn handle(&self, invocation: ToolInvocation) -> Result<Self::Output, FunctionCallError> {
handle_message_tool(invocation, MessageDeliveryMode::TriggerTurn).await
let arguments = function_arguments(invocation.payload.clone())?;
let args: AssignTaskArgs = parse_arguments(&arguments)?;
handle_message_tool(
invocation,
MessageDeliveryMode::TriggerTurn,
args.target,
args.items,
args.interrupt,
)
.await
}
}
25 changes: 18 additions & 7 deletions codex-rs/core/src/tools/handlers/multi_agents_v2/message_tool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,17 @@ impl MessageDeliveryMode {
}

#[derive(Debug, Deserialize)]
/// Input shared by the MultiAgentV2 `send_message` and `assign_task` tools.
pub(crate) struct MessageToolArgs {
#[serde(deny_unknown_fields)]
/// Input for the MultiAgentV2 `send_message` tool.
pub(crate) struct SendMessageArgs {
pub(crate) target: String,
pub(crate) items: Vec<UserInput>,
}

#[derive(Debug, Deserialize)]
#[serde(deny_unknown_fields)]
/// Input for the MultiAgentV2 `assign_task` tool.
pub(crate) struct AssignTaskArgs {
pub(crate) target: String,
pub(crate) items: Vec<UserInput>,
#[serde(default)]
Expand Down Expand Up @@ -95,6 +104,9 @@ fn text_content(
pub(crate) async fn handle_message_tool(
invocation: ToolInvocation,
mode: MessageDeliveryMode,
target: String,
items: Vec<UserInput>,
interrupt: bool,
) -> Result<MessageToolResult, FunctionCallError> {
let ToolInvocation {
session,
Expand All @@ -103,16 +115,15 @@ pub(crate) async fn handle_message_tool(
call_id,
..
} = invocation;
let arguments = function_arguments(payload)?;
let args: MessageToolArgs = parse_arguments(&arguments)?;
let receiver_thread_id = resolve_agent_target(&session, &turn, &args.target).await?;
let prompt = text_content(&args.items, mode)?;
let _ = payload;
let receiver_thread_id = resolve_agent_target(&session, &turn, &target).await?;
let prompt = text_content(&items, mode)?;
let receiver_agent = session
.services
.agent_control
.get_agent_metadata(receiver_thread_id)
.unwrap_or_default();
if args.interrupt {
if interrupt {
session
.services
.agent_control
Expand Down
12 changes: 11 additions & 1 deletion codex-rs/core/src/tools/handlers/multi_agents_v2/send_message.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use super::message_tool::MessageDeliveryMode;
use super::message_tool::MessageToolResult;
use super::message_tool::SendMessageArgs;
use super::message_tool::handle_message_tool;
use super::*;

Expand All @@ -18,6 +19,15 @@ impl ToolHandler for Handler {
}

async fn handle(&self, invocation: ToolInvocation) -> Result<Self::Output, FunctionCallError> {
handle_message_tool(invocation, MessageDeliveryMode::QueueOnly).await
let arguments = function_arguments(invocation.payload.clone())?;
let args: SendMessageArgs = parse_arguments(&arguments)?;
handle_message_tool(
invocation,
MessageDeliveryMode::QueueOnly,
args.target,
args.items,
/*interrupt*/ false,
)
.await
}
}
1 change: 1 addition & 0 deletions codex-rs/core/src/tools/spec_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,7 @@ fn test_build_specs_multi_agent_v2_uses_task_names_and_hides_resume() {
panic!("send_message should use object params");
};
assert!(properties.contains_key("target"));
assert!(!properties.contains_key("interrupt"));
assert!(!properties.contains_key("message"));
assert_eq!(
required.as_ref(),
Expand Down
11 changes: 1 addition & 10 deletions codex-rs/tools/src/agent_tool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,20 +128,11 @@ pub fn create_send_message_tool() -> ToolSpec {
},
),
("items".to_string(), create_collab_input_items_schema()),
(
"interrupt".to_string(),
JsonSchema::Boolean {
description: Some(
"When true, stop the agent's current task and handle this immediately. When false (default), queue this message."
.to_string(),
),
},
),
]);

ToolSpec::Function(ResponsesApiTool {
name: "send_message".to_string(),
description: "Add a message to an existing agent without triggering a new turn. Use interrupt=true to stop the current task first. In MultiAgentV2, this tool currently supports text content only."
description: "Add a message to an existing agent without triggering a new turn. In MultiAgentV2, this tool currently supports text content only."
.to_string(),
strict: false,
defer_loading: None,
Expand Down
1 change: 1 addition & 0 deletions codex-rs/tools/src/agent_tool_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ fn send_message_tool_requires_items_and_uses_submission_output() {
};
assert!(properties.contains_key("target"));
assert!(properties.contains_key("items"));
assert!(!properties.contains_key("interrupt"));
assert!(!properties.contains_key("message"));
assert_eq!(
required,
Expand Down
Loading