diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs index a5dd0d92fc..39992e8391 100644 --- a/codex-rs/app-server/src/codex_message_processor.rs +++ b/codex-rs/app-server/src/codex_message_processor.rs @@ -90,9 +90,8 @@ use codex_login::ShutdownHandle; use codex_login::run_login_server; use codex_protocol::ConversationId; use codex_protocol::config_types::ForcedLoginMethod; -use codex_protocol::models::ContentItem; +use codex_protocol::items::TurnItem; use codex_protocol::models::ResponseItem; -use codex_protocol::protocol::InputMessageKind; use codex_protocol::protocol::RateLimitSnapshot; use codex_protocol::protocol::USER_MESSAGE_BEGIN; use codex_protocol::user_input::UserInput as CoreInputItem; @@ -940,18 +939,9 @@ impl CodexMessageProcessor { }, )) .await; - let initial_messages = session_configured.initial_messages.map(|msgs| { - msgs.into_iter() - .filter(|event| { - // Don't send non-plain user messages (like user instructions - // or environment context) back so they don't get rendered. - if let EventMsg::UserMessage(user_message) = event { - return matches!(user_message.kind, Some(InputMessageKind::Plain)); - } - true - }) - .collect() - }); + let initial_messages = session_configured + .initial_messages + .map(|msgs| msgs.into_iter().collect()); // Reply with conversation id + model and initial messages (when present) let response = codex_app_server_protocol::ResumeConversationResponse { @@ -1596,18 +1586,8 @@ fn extract_conversation_summary( let preview = head .iter() .filter_map(|value| serde_json::from_value::(value.clone()).ok()) - .find_map(|item| match item { - ResponseItem::Message { content, .. } => { - content.into_iter().find_map(|content| match content { - ContentItem::InputText { text } => { - match InputMessageKind::from(("user", &text)) { - InputMessageKind::Plain => Some(text), - _ => None, - } - } - _ => None, - }) - } + .find_map(|item| match codex_core::parse_turn_item(&item) { + Some(TurnItem::UserMessage(user)) => Some(user.message()), _ => None, })?; diff --git a/codex-rs/app-server/tests/suite/codex_message_processor_flow.rs b/codex-rs/app-server/tests/suite/codex_message_processor_flow.rs index 27731f8430..30b90f6e4a 100644 --- a/codex-rs/app-server/tests/suite/codex_message_processor_flow.rs +++ b/codex-rs/app-server/tests/suite/codex_message_processor_flow.rs @@ -30,7 +30,6 @@ use codex_protocol::config_types::SandboxMode; use codex_protocol::parse_command::ParsedCommand; use codex_protocol::protocol::Event; use codex_protocol::protocol::EventMsg; -use codex_protocol::protocol::InputMessageKind; use pretty_assertions::assert_eq; use std::env; use tempfile::TempDir; @@ -528,43 +527,6 @@ async fn test_send_user_turn_updates_sandbox_and_cwd_between_turns() { .expect("sendUserTurn 2 timeout") .expect("sendUserTurn 2 resp"); - let mut env_message: Option = None; - let second_cwd_str = second_cwd.to_string_lossy().into_owned(); - for _ in 0..10 { - let notification = timeout( - DEFAULT_READ_TIMEOUT, - mcp.read_stream_until_notification_message("codex/event/user_message"), - ) - .await - .expect("user_message timeout") - .expect("user_message notification"); - let params = notification - .params - .clone() - .expect("user_message should include params"); - let event: Event = serde_json::from_value(params).expect("deserialize user_message event"); - if let EventMsg::UserMessage(user) = event.msg - && matches!(user.kind, Some(InputMessageKind::EnvironmentContext)) - && user.message.contains(&second_cwd_str) - { - env_message = Some(user.message); - break; - } - } - let env_message = env_message.expect("expected environment context update"); - assert!( - env_message.contains("danger-full-access"), - "env context should reflect new sandbox mode: {env_message}" - ); - assert!( - env_message.contains("enabled"), - "env context should enable network access for danger-full-access policy: {env_message}" - ); - assert!( - env_message.contains(&second_cwd_str), - "env context should include updated cwd: {env_message}" - ); - let exec_begin_notification = timeout( DEFAULT_READ_TIMEOUT, mcp.read_stream_until_notification_message("codex/event/exec_command_begin"), diff --git a/codex-rs/core/src/chat_completions.rs b/codex-rs/core/src/chat_completions.rs index aeea012948..27937a48d0 100644 --- a/codex-rs/core/src/chat_completions.rs +++ b/codex-rs/core/src/chat_completions.rs @@ -104,10 +104,10 @@ pub(crate) async fn stream_chat_completions( } = item { let mut text = String::new(); - for c in items { - match c { - ReasoningItemContent::ReasoningText { text: t } - | ReasoningItemContent::Text { text: t } => text.push_str(t), + for entry in items { + match entry { + ReasoningItemContent::ReasoningText { text: segment } + | ReasoningItemContent::Text { text: segment } => text.push_str(segment), } } if text.trim().is_empty() { diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 785d48ec58..f0b2f61b68 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -7,12 +7,11 @@ use std::sync::atomic::AtomicU64; use crate::AuthManager; use crate::client_common::REVIEW_PROMPT; -use crate::event_mapping::map_response_item_to_event_messages; use crate::function_tool::FunctionCallError; use crate::mcp::auth::McpAuthStatusEntry; use crate::parse_command::parse_command; +use crate::parse_turn_item; use crate::review_format::format_review_findings_block; -use crate::state::ItemCollector; use crate::terminal; use crate::user_notification::UserNotifier; use async_channel::Receiver; @@ -20,9 +19,10 @@ use async_channel::Sender; use codex_apply_patch::ApplyPatchAction; use codex_protocol::ConversationId; use codex_protocol::items::TurnItem; -use codex_protocol::items::UserMessageItem; use codex_protocol::protocol::ConversationPathResponseEvent; use codex_protocol::protocol::ExitedReviewModeEvent; +use codex_protocol::protocol::ItemCompletedEvent; +use codex_protocol::protocol::ItemStartedEvent; use codex_protocol::protocol::ReviewRequest; use codex_protocol::protocol::RolloutItem; use codex_protocol::protocol::SessionSource; @@ -270,7 +270,6 @@ pub(crate) struct TurnContext { pub(crate) is_review_mode: bool, pub(crate) final_output_json_schema: Option, pub(crate) codex_linux_sandbox_exe: Option, - pub(crate) item_collector: ItemCollector, } impl TurnContext { @@ -359,7 +358,6 @@ impl Session { provider: ModelProviderInfo, session_configuration: &SessionConfiguration, conversation_id: ConversationId, - tx_event: Sender, sub_id: String, ) -> TurnContext { let config = session_configuration.original_config_do_not_use.clone(); @@ -394,8 +392,6 @@ impl Session { features: &config.features, }); - let item_collector = ItemCollector::new(tx_event, conversation_id, sub_id.clone()); - TurnContext { sub_id, client, @@ -409,7 +405,6 @@ impl Session { is_review_mode: false, final_output_json_schema: None, codex_linux_sandbox_exe: config.codex_linux_sandbox_exe.clone(), - item_collector, } } @@ -665,7 +660,6 @@ impl Session { session_configuration.provider.clone(), &session_configuration, self.conversation_id, - self.get_tx_event(), sub_id, ); if let Some(final_schema) = updates.final_output_json_schema { @@ -710,6 +704,59 @@ impl Session { } } + async fn emit_turn_item_started(&self, turn_context: &TurnContext, item: &TurnItem) { + self.send_event( + turn_context, + EventMsg::ItemStarted(ItemStartedEvent { + thread_id: self.conversation_id, + turn_id: turn_context.sub_id.clone(), + item: item.clone(), + }), + ) + .await; + } + + async fn emit_turn_item_completed( + &self, + turn_context: &TurnContext, + item: TurnItem, + emit_raw_agent_reasoning: bool, + ) { + self.send_event( + turn_context, + EventMsg::ItemCompleted(ItemCompletedEvent { + thread_id: self.conversation_id, + turn_id: turn_context.sub_id.clone(), + item: item.clone(), + }), + ) + .await; + self.emit_turn_item_legacy_events(turn_context, &item, emit_raw_agent_reasoning) + .await; + } + + async fn emit_turn_item_started_completed( + &self, + turn_context: &TurnContext, + item: TurnItem, + emit_raw_agent_reasoning: bool, + ) { + self.emit_turn_item_started(turn_context, &item).await; + self.emit_turn_item_completed(turn_context, item, emit_raw_agent_reasoning) + .await; + } + + async fn emit_turn_item_legacy_events( + &self, + turn_context: &TurnContext, + item: &TurnItem, + emit_raw_agent_reasoning: bool, + ) { + for event in item.as_legacy_events(emit_raw_agent_reasoning) { + self.send_event(turn_context, event).await; + } + } + /// Emit an exec approval request event and await the user's decision. /// /// The request is keyed by `sub_id`/`call_id` so matching responses are delivered @@ -946,24 +993,22 @@ impl Session { /// Record a user input item to conversation history and also persist a /// corresponding UserMessage EventMsg to rollout. - async fn record_input_and_rollout_usermsg(&self, response_input: &ResponseInputItem) { + async fn record_input_and_rollout_usermsg( + &self, + turn_context: &TurnContext, + response_input: &ResponseInputItem, + ) { let response_item: ResponseItem = response_input.clone().into(); // Add to conversation history and persist response item to rollout self.record_conversation_items(std::slice::from_ref(&response_item)) .await; // Derive user message events and persist only UserMessage to rollout - let msgs = - map_response_item_to_event_messages(&response_item, self.show_raw_agent_reasoning()); - let user_msgs: Vec = msgs - .into_iter() - .filter_map(|m| match m { - EventMsg::UserMessage(ev) => Some(RolloutItem::EventMsg(EventMsg::UserMessage(ev))), - _ => None, - }) - .collect(); - if !user_msgs.is_empty() { - self.persist_rollout_items(&user_msgs).await; + let turn_item = parse_turn_item(&response_item); + + if let Some(item @ TurnItem::UserMessage(_)) = turn_item { + self.emit_turn_item_started_completed(turn_context, item, false) + .await; } } @@ -1158,19 +1203,8 @@ async fn submission_loop(sess: Arc, config: Arc, rx_sub: Receiv { sess.record_conversation_items(std::slice::from_ref(&env_item)) .await; - for msg in map_response_item_to_event_messages( - &env_item, - sess.show_raw_agent_reasoning(), - ) { - sess.send_event(¤t_context, msg).await; - } } - current_context - .item_collector - .started_completed(TurnItem::UserMessage(UserMessageItem::new(&items))) - .await; - sess.spawn_task(Arc::clone(¤t_context), items, RegularTask) .await; previous_context = Some(current_context); @@ -1444,11 +1478,6 @@ async fn spawn_review_thread( is_review_mode: true, final_output_json_schema: None, codex_linux_sandbox_exe: parent_turn_context.codex_linux_sandbox_exe.clone(), - item_collector: ItemCollector::new( - sess.get_tx_event(), - sess.conversation_id, - sub_id.to_string(), - ), }; // Seed the child task with the review prompt as the initial user message. @@ -1506,7 +1535,7 @@ pub(crate) async fn run_task( review_thread_history.extend(sess.build_initial_context(turn_context.as_ref())); review_thread_history.push(initial_input_for_turn.into()); } else { - sess.record_input_and_rollout_usermsg(&initial_input_for_turn) + sess.record_input_and_rollout_usermsg(turn_context.as_ref(), &initial_input_for_turn) .await; } @@ -2023,9 +2052,10 @@ async fn try_run_turn( } Ok(None) => { let response = handle_non_tool_response_item( - Arc::clone(&sess), + sess.as_ref(), Arc::clone(&turn_context), item.clone(), + sess.show_raw_agent_reasoning(), ) .await?; add_completed(ProcessedResponseItem { item, response }); @@ -2144,9 +2174,10 @@ async fn try_run_turn( } async fn handle_non_tool_response_item( - sess: Arc, + sess: &Session, turn_context: Arc, item: ResponseItem, + show_raw_agent_reasoning: bool, ) -> CodexResult> { debug!(?item, "Output item"); @@ -2154,15 +2185,20 @@ async fn handle_non_tool_response_item( ResponseItem::Message { .. } | ResponseItem::Reasoning { .. } | ResponseItem::WebSearchCall { .. } => { - let msgs = match &item { + let turn_item = match &item { ResponseItem::Message { .. } if turn_context.is_review_mode => { trace!("suppressing assistant Message in review mode"); - Vec::new() + None } - _ => map_response_item_to_event_messages(&item, sess.show_raw_agent_reasoning()), + _ => parse_turn_item(&item), }; - for msg in msgs { - sess.send_event(&turn_context, msg).await; + if let Some(turn_item) = turn_item { + sess.emit_turn_item_started_completed( + turn_context.as_ref(), + turn_item, + show_raw_agent_reasoning, + ) + .await; } } ResponseItem::FunctionCallOutput { .. } | ResponseItem::CustomToolCallOutput { .. } => { @@ -2649,7 +2685,6 @@ mod tests { session_configuration.provider.clone(), &session_configuration, conversation_id, - tx_event.clone(), "turn_id".to_string(), ); @@ -2718,7 +2753,6 @@ mod tests { session_configuration.provider.clone(), &session_configuration, conversation_id, - tx_event.clone(), "turn_id".to_string(), )); diff --git a/codex-rs/core/src/codex/compact.rs b/codex-rs/core/src/codex/compact.rs index bbc1b97648..7125fc4d0b 100644 --- a/codex-rs/core/src/codex/compact.rs +++ b/codex-rs/core/src/codex/compact.rs @@ -11,13 +11,13 @@ use crate::protocol::AgentMessageEvent; use crate::protocol::CompactedItem; use crate::protocol::ErrorEvent; use crate::protocol::EventMsg; -use crate::protocol::InputMessageKind; use crate::protocol::TaskStartedEvent; use crate::protocol::TurnContextItem; use crate::state::TaskKind; use crate::truncate::truncate_middle; use crate::util::backoff; use askama::Template; +use codex_protocol::items::TurnItem; use codex_protocol::models::ContentItem; use codex_protocol::models::ResponseInputItem; use codex_protocol::models::ResponseItem; @@ -181,23 +181,13 @@ pub fn content_items_to_text(content: &[ContentItem]) -> Option { pub(crate) fn collect_user_messages(items: &[ResponseItem]) -> Vec { items .iter() - .filter_map(|item| match item { - ResponseItem::Message { role, content, .. } if role == "user" => { - content_items_to_text(content) - } + .filter_map(|item| match crate::event_mapping::parse_turn_item(item) { + Some(TurnItem::UserMessage(user)) => Some(user.message()), _ => None, }) - .filter(|text| !is_session_prefix_message(text)) .collect() } -pub fn is_session_prefix_message(text: &str) -> bool { - matches!( - InputMessageKind::from(("user", text)), - InputMessageKind::UserInstructions | InputMessageKind::EnvironmentContext - ) -} - pub(crate) fn build_compacted_history( initial_context: Vec, user_messages: &[String], @@ -319,21 +309,16 @@ mod tests { ResponseItem::Message { id: Some("user".to_string()), role: "user".to_string(), - content: vec![ - ContentItem::InputText { - text: "first".to_string(), - }, - ContentItem::OutputText { - text: "second".to_string(), - }, - ], + content: vec![ContentItem::InputText { + text: "first".to_string(), + }], }, ResponseItem::Other, ]; let collected = collect_user_messages(&items); - assert_eq!(vec!["first\nsecond".to_string()], collected); + assert_eq!(vec!["first".to_string()], collected); } #[test] diff --git a/codex-rs/core/src/conversation_manager.rs b/codex-rs/core/src/conversation_manager.rs index aeb0780725..a30038f2eb 100644 --- a/codex-rs/core/src/conversation_manager.rs +++ b/codex-rs/core/src/conversation_manager.rs @@ -3,8 +3,6 @@ use crate::CodexAuth; use crate::codex::Codex; use crate::codex::CodexSpawnOk; use crate::codex::INITIAL_SUBMIT_ID; -use crate::codex::compact::content_items_to_text; -use crate::codex::compact::is_session_prefix_message; use crate::codex_conversation::CodexConversation; use crate::config::Config; use crate::error::CodexErr; @@ -14,6 +12,7 @@ use crate::protocol::EventMsg; use crate::protocol::SessionConfiguredEvent; use crate::rollout::RolloutRecorder; use codex_protocol::ConversationId; +use codex_protocol::items::TurnItem; use codex_protocol::models::ResponseItem; use codex_protocol::protocol::InitialHistory; use codex_protocol::protocol::RolloutItem; @@ -182,9 +181,11 @@ fn truncate_before_nth_user_message(history: InitialHistory, n: usize) -> Initia // Find indices of user message inputs in rollout order. let mut user_positions: Vec = Vec::new(); for (idx, item) in items.iter().enumerate() { - if let RolloutItem::ResponseItem(ResponseItem::Message { role, content, .. }) = item - && role == "user" - && content_items_to_text(content).is_some_and(|text| !is_session_prefix_message(&text)) + if let RolloutItem::ResponseItem(item @ ResponseItem::Message { .. }) = item + && matches!( + crate::event_mapping::parse_turn_item(item), + Some(TurnItem::UserMessage(_)) + ) { user_positions.push(idx); } diff --git a/codex-rs/core/src/event_mapping.rs b/codex-rs/core/src/event_mapping.rs index cbbf1f444f..0c41951ee5 100644 --- a/codex-rs/core/src/event_mapping.rs +++ b/codex-rs/core/src/event_mapping.rs @@ -1,139 +1,131 @@ -use crate::protocol::AgentMessageEvent; -use crate::protocol::AgentReasoningEvent; -use crate::protocol::AgentReasoningRawContentEvent; -use crate::protocol::EventMsg; -use crate::protocol::InputMessageKind; -use crate::protocol::UserMessageEvent; -use crate::protocol::WebSearchEndEvent; +use codex_protocol::items::AgentMessageContent; +use codex_protocol::items::AgentMessageItem; +use codex_protocol::items::ReasoningItem; +use codex_protocol::items::TurnItem; +use codex_protocol::items::UserMessageItem; +use codex_protocol::items::WebSearchItem; use codex_protocol::models::ContentItem; use codex_protocol::models::ReasoningItemContent; use codex_protocol::models::ReasoningItemReasoningSummary; use codex_protocol::models::ResponseItem; use codex_protocol::models::WebSearchAction; +use codex_protocol::user_input::UserInput; +use tracing::warn; -/// Convert a `ResponseItem` into zero or more `EventMsg` values that the UI can render. -/// -/// When `show_raw_agent_reasoning` is false, raw reasoning content events are omitted. -pub(crate) fn map_response_item_to_event_messages( - item: &ResponseItem, - show_raw_agent_reasoning: bool, -) -> Vec { - match item { - ResponseItem::Message { role, content, .. } => { - // Do not surface system messages as user events. - if role == "system" { - return Vec::new(); - } +fn is_session_prefix(text: &str) -> bool { + let trimmed = text.trim_start(); + let lowered = trimmed.to_ascii_lowercase(); + lowered.starts_with("") || lowered.starts_with("") +} + +fn parse_user_message(message: &[ContentItem]) -> Option { + let mut content: Vec = Vec::new(); - let mut events: Vec = Vec::new(); - let mut message_parts: Vec = Vec::new(); - let mut images: Vec = Vec::new(); - let mut kind: Option = None; - - for content_item in content.iter() { - match content_item { - ContentItem::InputText { text } => { - if kind.is_none() { - let trimmed = text.trim_start(); - kind = if trimmed.starts_with("") { - Some(InputMessageKind::EnvironmentContext) - } else if trimmed.starts_with("") { - Some(InputMessageKind::UserInstructions) - } else { - Some(InputMessageKind::Plain) - }; - } - message_parts.push(text.clone()); - } - ContentItem::InputImage { image_url } => { - images.push(image_url.clone()); - } - ContentItem::OutputText { text } => { - events.push(EventMsg::AgentMessage(AgentMessageEvent { - message: text.clone(), - })); - } + for content_item in message.iter() { + match content_item { + ContentItem::InputText { text } => { + if is_session_prefix(text) { + return None; } + content.push(UserInput::Text { text: text.clone() }); } - - if !message_parts.is_empty() || !images.is_empty() { - let message = if message_parts.is_empty() { - String::new() - } else { - message_parts.join("") - }; - let images = if images.is_empty() { - None - } else { - Some(images) - }; - - events.push(EventMsg::UserMessage(UserMessageEvent { - message, - kind, - images, - })); + ContentItem::InputImage { image_url } => { + content.push(UserInput::Image { + image_url: image_url.clone(), + }); + } + ContentItem::OutputText { text } => { + if is_session_prefix(text) { + return None; + } + warn!("Output text in user message: {}", text); } - - events } + } - ResponseItem::Reasoning { - summary, content, .. - } => { - let mut events = Vec::new(); - for ReasoningItemReasoningSummary::SummaryText { text } in summary { - events.push(EventMsg::AgentReasoning(AgentReasoningEvent { - text: text.clone(), - })); + Some(UserMessageItem::new(&content)) +} + +fn parse_agent_message(message: &[ContentItem]) -> AgentMessageItem { + let mut content: Vec = Vec::new(); + for content_item in message.iter() { + match content_item { + ContentItem::OutputText { text } => { + content.push(AgentMessageContent::Text { text: text.clone() }); } - if let Some(items) = content.as_ref().filter(|_| show_raw_agent_reasoning) { - for c in items { - let text = match c { - ReasoningItemContent::ReasoningText { text } - | ReasoningItemContent::Text { text } => text, - }; - events.push(EventMsg::AgentReasoningRawContent( - AgentReasoningRawContentEvent { text: text.clone() }, - )); - } + _ => { + warn!( + "Unexpected content item in agent message: {:?}", + content_item + ); } - events } + } + AgentMessageItem::new(&content) +} - ResponseItem::WebSearchCall { id, action, .. } => match action { - WebSearchAction::Search { query } => { - let call_id = id.clone().unwrap_or_else(|| "".to_string()); - vec![EventMsg::WebSearchEnd(WebSearchEndEvent { - call_id, - query: query.clone(), - })] - } - WebSearchAction::Other => Vec::new(), +pub fn parse_turn_item(item: &ResponseItem) -> Option { + match item { + ResponseItem::Message { role, content, .. } => match role.as_str() { + "user" => parse_user_message(content).map(TurnItem::UserMessage), + "assistant" => Some(TurnItem::AgentMessage(parse_agent_message(content))), + "system" => None, + _ => None, }, - - // Variants that require side effects are handled by higher layers and do not emit events here. - ResponseItem::FunctionCall { .. } - | ResponseItem::FunctionCallOutput { .. } - | ResponseItem::LocalShellCall { .. } - | ResponseItem::CustomToolCall { .. } - | ResponseItem::CustomToolCallOutput { .. } - | ResponseItem::Other => Vec::new(), + ResponseItem::Reasoning { + id, + summary, + content, + .. + } => { + let summary_text = summary + .iter() + .map(|entry| match entry { + ReasoningItemReasoningSummary::SummaryText { text } => text.clone(), + }) + .collect(); + let raw_content = content + .clone() + .unwrap_or_default() + .into_iter() + .map(|entry| match entry { + ReasoningItemContent::ReasoningText { text } + | ReasoningItemContent::Text { text } => text, + }) + .collect(); + Some(TurnItem::Reasoning(ReasoningItem { + id: id.clone(), + summary_text, + raw_content, + })) + } + ResponseItem::WebSearchCall { + id, + action: WebSearchAction::Search { query }, + .. + } => Some(TurnItem::WebSearch(WebSearchItem { + id: id.clone().unwrap_or_default(), + query: query.clone(), + })), + _ => None, } } #[cfg(test)] mod tests { - use super::map_response_item_to_event_messages; - use crate::protocol::EventMsg; - use crate::protocol::InputMessageKind; - use assert_matches::assert_matches; + use super::parse_turn_item; + use codex_protocol::items::AgentMessageContent; + use codex_protocol::items::TurnItem; use codex_protocol::models::ContentItem; + use codex_protocol::models::ReasoningItemContent; + use codex_protocol::models::ReasoningItemReasoningSummary; use codex_protocol::models::ResponseItem; + use codex_protocol::models::WebSearchAction; + use codex_protocol::user_input::UserInput; use pretty_assertions::assert_eq; #[test] - fn maps_user_message_with_text_and_two_images() { + fn parses_user_message_with_text_and_two_images() { let img1 = "https://example.com/one.png".to_string(); let img2 = "https://example.com/two.jpg".to_string(); @@ -153,16 +145,128 @@ mod tests { ], }; - let events = map_response_item_to_event_messages(&item, false); - assert_eq!(events.len(), 1, "expected a single user message event"); + let turn_item = parse_turn_item(&item).expect("expected user message turn item"); + + match turn_item { + TurnItem::UserMessage(user) => { + let expected_content = vec![ + UserInput::Text { + text: "Hello world".to_string(), + }, + UserInput::Image { image_url: img1 }, + UserInput::Image { image_url: img2 }, + ]; + assert_eq!(user.content, expected_content); + } + other => panic!("expected TurnItem::UserMessage, got {other:?}"), + } + } + + #[test] + fn parses_agent_message() { + let item = ResponseItem::Message { + id: Some("msg-1".to_string()), + role: "assistant".to_string(), + content: vec![ContentItem::OutputText { + text: "Hello from Codex".to_string(), + }], + }; + + let turn_item = parse_turn_item(&item).expect("expected agent message turn item"); + + match turn_item { + TurnItem::AgentMessage(message) => { + let Some(AgentMessageContent::Text { text }) = message.content.first() else { + panic!("expected agent message text content"); + }; + assert_eq!(text, "Hello from Codex"); + } + other => panic!("expected TurnItem::AgentMessage, got {other:?}"), + } + } + + #[test] + fn parses_reasoning_summary_and_raw_content() { + let item = ResponseItem::Reasoning { + id: "reasoning_1".to_string(), + summary: vec![ + ReasoningItemReasoningSummary::SummaryText { + text: "Step 1".to_string(), + }, + ReasoningItemReasoningSummary::SummaryText { + text: "Step 2".to_string(), + }, + ], + content: Some(vec![ReasoningItemContent::ReasoningText { + text: "raw details".to_string(), + }]), + encrypted_content: None, + }; + + let turn_item = parse_turn_item(&item).expect("expected reasoning turn item"); + + match turn_item { + TurnItem::Reasoning(reasoning) => { + assert_eq!( + reasoning.summary_text, + vec!["Step 1".to_string(), "Step 2".to_string()] + ); + assert_eq!(reasoning.raw_content, vec!["raw details".to_string()]); + } + other => panic!("expected TurnItem::Reasoning, got {other:?}"), + } + } + + #[test] + fn parses_reasoning_including_raw_content() { + let item = ResponseItem::Reasoning { + id: "reasoning_2".to_string(), + summary: vec![ReasoningItemReasoningSummary::SummaryText { + text: "Summarized step".to_string(), + }], + content: Some(vec![ + ReasoningItemContent::ReasoningText { + text: "raw step".to_string(), + }, + ReasoningItemContent::Text { + text: "final thought".to_string(), + }, + ]), + encrypted_content: None, + }; + + let turn_item = parse_turn_item(&item).expect("expected reasoning turn item"); + + match turn_item { + TurnItem::Reasoning(reasoning) => { + assert_eq!(reasoning.summary_text, vec!["Summarized step".to_string()]); + assert_eq!( + reasoning.raw_content, + vec!["raw step".to_string(), "final thought".to_string()] + ); + } + other => panic!("expected TurnItem::Reasoning, got {other:?}"), + } + } + + #[test] + fn parses_web_search_call() { + let item = ResponseItem::WebSearchCall { + id: Some("ws_1".to_string()), + status: Some("completed".to_string()), + action: WebSearchAction::Search { + query: "weather".to_string(), + }, + }; + + let turn_item = parse_turn_item(&item).expect("expected web search turn item"); - match &events[0] { - EventMsg::UserMessage(user) => { - assert_eq!(user.message, "Hello world"); - assert_matches!(user.kind, Some(InputMessageKind::Plain)); - assert_eq!(user.images, Some(vec![img1, img2])); + match turn_item { + TurnItem::WebSearch(search) => { + assert_eq!(search.id, "ws_1"); + assert_eq!(search.query, "weather"); } - other => panic!("expected UserMessage, got {other:?}"), + other => panic!("expected TurnItem::WebSearch, got {other:?}"), } } } diff --git a/codex-rs/core/src/lib.rs b/codex-rs/core/src/lib.rs index fa307b1ea7..5e5b4e44a9 100644 --- a/codex-rs/core/src/lib.rs +++ b/codex-rs/core/src/lib.rs @@ -98,11 +98,10 @@ pub use client_common::REVIEW_PROMPT; pub use client_common::ResponseEvent; pub use client_common::ResponseStream; pub use codex::compact::content_items_to_text; -pub use codex::compact::is_session_prefix_message; pub use codex_protocol::models::ContentItem; pub use codex_protocol::models::LocalShellAction; pub use codex_protocol::models::LocalShellExecAction; pub use codex_protocol::models::LocalShellStatus; -pub use codex_protocol::models::ReasoningItemContent; pub use codex_protocol::models::ResponseItem; +pub use event_mapping::parse_turn_item; pub mod otel_init; diff --git a/codex-rs/core/src/rollout/tests.rs b/codex-rs/core/src/rollout/tests.rs index b1f34e3a9c..617e5189c2 100644 --- a/codex-rs/core/src/rollout/tests.rs +++ b/codex-rs/core/src/rollout/tests.rs @@ -24,7 +24,6 @@ use codex_protocol::models::ContentItem; use codex_protocol::models::ResponseItem; use codex_protocol::protocol::CompactedItem; use codex_protocol::protocol::EventMsg; -use codex_protocol::protocol::InputMessageKind; use codex_protocol::protocol::RolloutItem; use codex_protocol::protocol::RolloutLine; use codex_protocol::protocol::SessionMeta; @@ -543,7 +542,6 @@ async fn test_tail_includes_last_response_items() -> Result<()> { timestamp: ts.to_string(), item: RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent { message: "hello".into(), - kind: Some(InputMessageKind::Plain), images: None, })), }; @@ -627,7 +625,6 @@ async fn test_tail_handles_short_sessions() -> Result<()> { timestamp: ts.to_string(), item: RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent { message: "hi".into(), - kind: Some(InputMessageKind::Plain), images: None, })), }; @@ -712,7 +709,6 @@ async fn test_tail_skips_trailing_non_responses() -> Result<()> { timestamp: ts.to_string(), item: RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent { message: "hello".into(), - kind: Some(InputMessageKind::Plain), images: None, })), }; diff --git a/codex-rs/core/src/state/item_collector.rs b/codex-rs/core/src/state/item_collector.rs deleted file mode 100644 index bd1c4329d8..0000000000 --- a/codex-rs/core/src/state/item_collector.rs +++ /dev/null @@ -1,68 +0,0 @@ -use async_channel::Sender; -use codex_protocol::ConversationId; -use codex_protocol::items::TurnItem; -use codex_protocol::protocol::Event; -use codex_protocol::protocol::EventMsg; -use codex_protocol::protocol::ItemCompletedEvent; -use codex_protocol::protocol::ItemStartedEvent; -use tracing::error; - -#[derive(Debug)] -pub(crate) struct ItemCollector { - thread_id: ConversationId, - turn_id: String, - tx_event: Sender, -} - -impl ItemCollector { - pub fn new( - tx_event: Sender, - thread_id: ConversationId, - turn_id: String, - ) -> ItemCollector { - ItemCollector { - tx_event, - thread_id, - turn_id, - } - } - - pub async fn started(&self, item: TurnItem) { - let err = self - .tx_event - .send(Event { - id: self.turn_id.clone(), - msg: EventMsg::ItemStarted(ItemStartedEvent { - thread_id: self.thread_id, - turn_id: self.turn_id.clone(), - item, - }), - }) - .await; - if let Err(e) = err { - error!("failed to send item started event: {e}"); - } - } - - pub async fn completed(&self, item: TurnItem) { - let err = self - .tx_event - .send(Event { - id: self.turn_id.clone(), - msg: EventMsg::ItemCompleted(ItemCompletedEvent { - thread_id: self.thread_id, - turn_id: self.turn_id.clone(), - item, - }), - }) - .await; - if let Err(e) = err { - error!("failed to send item completed event: {e}"); - } - } - - pub async fn started_completed(&self, item: TurnItem) { - self.started(item.clone()).await; - self.completed(item).await; - } -} diff --git a/codex-rs/core/src/state/mod.rs b/codex-rs/core/src/state/mod.rs index 7ba5f37083..642433a786 100644 --- a/codex-rs/core/src/state/mod.rs +++ b/codex-rs/core/src/state/mod.rs @@ -1,9 +1,7 @@ -mod item_collector; mod service; mod session; mod turn; -pub(crate) use item_collector::ItemCollector; pub(crate) use service::SessionServices; pub(crate) use session::SessionState; pub(crate) use turn::ActiveTurn; diff --git a/codex-rs/core/tests/chat_completions_payload.rs b/codex-rs/core/tests/chat_completions_payload.rs index 0d0d60d47f..f1a4527628 100644 --- a/codex-rs/core/tests/chat_completions_payload.rs +++ b/codex-rs/core/tests/chat_completions_payload.rs @@ -8,12 +8,12 @@ use codex_core::LocalShellStatus; use codex_core::ModelClient; use codex_core::ModelProviderInfo; use codex_core::Prompt; -use codex_core::ReasoningItemContent; use codex_core::ResponseItem; use codex_core::WireApi; use codex_core::spawn::CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR; use codex_otel::otel_event_manager::OtelEventManager; use codex_protocol::ConversationId; +use codex_protocol::models::ReasoningItemContent; use core_test_support::load_default_config_for_test; use futures::StreamExt; use serde_json::Value; diff --git a/codex-rs/core/tests/chat_completions_sse.rs b/codex-rs/core/tests/chat_completions_sse.rs index dffc9e4213..5a3768567a 100644 --- a/codex-rs/core/tests/chat_completions_sse.rs +++ b/codex-rs/core/tests/chat_completions_sse.rs @@ -13,6 +13,7 @@ use codex_core::WireApi; use codex_core::spawn::CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR; use codex_otel::otel_event_manager::OtelEventManager; use codex_protocol::ConversationId; +use codex_protocol::models::ReasoningItemContent; use core_test_support::load_default_config_for_test; use futures::StreamExt; use tempfile::TempDir; @@ -142,8 +143,8 @@ fn assert_reasoning(item: &ResponseItem, expected: &str) { let mut combined = String::new(); for part in parts { match part { - codex_core::ReasoningItemContent::ReasoningText { text } - | codex_core::ReasoningItemContent::Text { text } => combined.push_str(text), + ReasoningItemContent::ReasoningText { text } + | ReasoningItemContent::Text { text } => combined.push_str(text), } } assert_eq!(combined, expected); diff --git a/codex-rs/core/tests/common/responses.rs b/codex-rs/core/tests/common/responses.rs index a8a777ae4f..a4703d9e60 100644 --- a/codex-rs/core/tests/common/responses.rs +++ b/codex-rs/core/tests/common/responses.rs @@ -167,6 +167,56 @@ pub fn ev_assistant_message(id: &str, text: &str) -> Value { }) } +pub fn ev_reasoning_item(id: &str, summary: &[&str], raw_content: &[&str]) -> Value { + let summary_entries: Vec = summary + .iter() + .map(|text| serde_json::json!({"type": "summary_text", "text": text})) + .collect(); + + let mut event = serde_json::json!({ + "type": "response.output_item.done", + "item": { + "type": "reasoning", + "id": id, + "summary": summary_entries, + } + }); + + if !raw_content.is_empty() { + let content_entries: Vec = raw_content + .iter() + .map(|text| serde_json::json!({"type": "reasoning_text", "text": text})) + .collect(); + event["item"]["content"] = Value::Array(content_entries); + } + + event +} + +pub fn ev_web_search_call_added(id: &str, status: &str, query: &str) -> Value { + serde_json::json!({ + "type": "response.output_item.added", + "item": { + "type": "web_search_call", + "id": id, + "status": status, + "action": {"type": "search", "query": query} + } + }) +} + +pub fn ev_web_search_call_done(id: &str, status: &str, query: &str) -> Value { + serde_json::json!({ + "type": "response.output_item.done", + "item": { + "type": "web_search_call", + "id": id, + "status": status, + "action": {"type": "search", "query": query} + } + }) +} + pub fn ev_function_call(call_id: &str, name: &str, arguments: &str) -> Value { serde_json::json!({ "type": "response.output_item.done", diff --git a/codex-rs/core/tests/suite/client.rs b/codex-rs/core/tests/suite/client.rs index ed16cdb35d..3034c70974 100644 --- a/codex-rs/core/tests/suite/client.rs +++ b/codex-rs/core/tests/suite/client.rs @@ -9,7 +9,6 @@ use codex_core::ModelClient; use codex_core::ModelProviderInfo; use codex_core::NewConversation; use codex_core::Prompt; -use codex_core::ReasoningItemContent; use codex_core::ResponseEvent; use codex_core::ResponseItem; use codex_core::WireApi; @@ -21,6 +20,7 @@ use codex_core::protocol::Op; use codex_core::protocol::SessionSource; use codex_otel::otel_event_manager::OtelEventManager; use codex_protocol::ConversationId; +use codex_protocol::models::ReasoningItemContent; use codex_protocol::models::ReasoningItemReasoningSummary; use codex_protocol::models::WebSearchAction; use codex_protocol::user_input::UserInput; diff --git a/codex-rs/core/tests/suite/fork_conversation.rs b/codex-rs/core/tests/suite/fork_conversation.rs index 28f5d0175d..da2ff8c3f6 100644 --- a/codex-rs/core/tests/suite/fork_conversation.rs +++ b/codex-rs/core/tests/suite/fork_conversation.rs @@ -1,17 +1,15 @@ use codex_core::CodexAuth; -use codex_core::ContentItem; use codex_core::ConversationManager; use codex_core::ModelProviderInfo; use codex_core::NewConversation; -use codex_core::ResponseItem; use codex_core::built_in_model_providers; -use codex_core::content_items_to_text; -use codex_core::is_session_prefix_message; +use codex_core::parse_turn_item; use codex_core::protocol::ConversationPathResponseEvent; use codex_core::protocol::EventMsg; use codex_core::protocol::Op; use codex_core::protocol::RolloutItem; use codex_core::protocol::RolloutLine; +use codex_protocol::items::TurnItem; use codex_protocol::user_input::UserInput; use core_test_support::load_default_config_for_test; use core_test_support::skip_if_no_network; @@ -115,19 +113,12 @@ async fn fork_conversation_twice_drops_to_first_message() { let find_user_input_positions = |items: &[RolloutItem]| -> Vec { let mut pos = Vec::new(); for (i, it) in items.iter().enumerate() { - if let RolloutItem::ResponseItem(ResponseItem::Message { role, content, .. }) = it - && role == "user" - && content_items_to_text(content) - .is_some_and(|text| !is_session_prefix_message(&text)) + if let RolloutItem::ResponseItem(response_item) = it + && let Some(TurnItem::UserMessage(_)) = parse_turn_item(response_item) { // Consider any user message as an input boundary; recorder stores both EventMsg and ResponseItem. // We specifically look for input items, which are represented as ContentItem::InputText. - if content - .iter() - .any(|c| matches!(c, ContentItem::InputText { .. })) - { - pos.push(i); - } + pos.push(i); } } pos diff --git a/codex-rs/core/tests/suite/items.rs b/codex-rs/core/tests/suite/items.rs index a64f51187a..13c95003e0 100644 --- a/codex-rs/core/tests/suite/items.rs +++ b/codex-rs/core/tests/suite/items.rs @@ -2,12 +2,18 @@ use anyhow::Ok; use codex_core::protocol::EventMsg; +use codex_core::protocol::ItemCompletedEvent; +use codex_core::protocol::ItemStartedEvent; use codex_core::protocol::Op; use codex_protocol::items::TurnItem; use codex_protocol::user_input::UserInput; -use core_test_support::responses; +use core_test_support::responses::ev_assistant_message; use core_test_support::responses::ev_completed; +use core_test_support::responses::ev_reasoning_item; use core_test_support::responses::ev_response_created; +use core_test_support::responses::ev_web_search_call_added; +use core_test_support::responses::ev_web_search_call_done; +use core_test_support::responses::mount_sse_once_match; use core_test_support::responses::sse; use core_test_support::responses::start_mock_server; use core_test_support::skip_if_no_network; @@ -26,7 +32,7 @@ async fn user_message_item_is_emitted() -> anyhow::Result<()> { let TestCodex { codex, .. } = test_codex().build(&server).await?; let first_response = sse(vec![ev_response_created("resp-1"), ev_completed("resp-1")]); - responses::mount_sse_once_match(&server, any(), first_response).await; + mount_sse_once_match(&server, any(), first_response).await; codex .submit(Op::UserInput { @@ -36,21 +42,23 @@ async fn user_message_item_is_emitted() -> anyhow::Result<()> { }) .await?; - let started = wait_for_event_match(&codex, |ev| match ev { - EventMsg::ItemStarted(e) => Some(e.clone()), + let started_item = wait_for_event_match(&codex, |ev| match ev { + EventMsg::ItemStarted(ItemStartedEvent { + item: TurnItem::UserMessage(item), + .. + }) => Some(item.clone()), _ => None, }) .await; - - let completed = wait_for_event_match(&codex, |ev| match ev { - EventMsg::ItemCompleted(e) => Some(e.clone()), + let completed_item = wait_for_event_match(&codex, |ev| match ev { + EventMsg::ItemCompleted(ItemCompletedEvent { + item: TurnItem::UserMessage(item), + .. + }) => Some(item.clone()), _ => None, }) .await; - let TurnItem::UserMessage(started_item) = started.item; - let TurnItem::UserMessage(completed_item) = completed.item; - assert_eq!(started_item.id, completed_item.id); assert_eq!( started_item.content, @@ -66,3 +74,163 @@ async fn user_message_item_is_emitted() -> anyhow::Result<()> { ); Ok(()) } + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn assistant_message_item_is_emitted() -> anyhow::Result<()> { + skip_if_no_network!(Ok(())); + + let server = start_mock_server().await; + + let TestCodex { codex, .. } = test_codex().build(&server).await?; + + let first_response = sse(vec![ + ev_response_created("resp-1"), + ev_assistant_message("msg-1", "all done"), + ev_completed("resp-1"), + ]); + mount_sse_once_match(&server, any(), first_response).await; + + codex + .submit(Op::UserInput { + items: vec![UserInput::Text { + text: "please summarize results".into(), + }], + }) + .await?; + + let started = wait_for_event_match(&codex, |ev| match ev { + EventMsg::ItemStarted(ItemStartedEvent { + item: TurnItem::AgentMessage(item), + .. + }) => Some(item.clone()), + _ => None, + }) + .await; + let completed = wait_for_event_match(&codex, |ev| match ev { + EventMsg::ItemCompleted(ItemCompletedEvent { + item: TurnItem::AgentMessage(item), + .. + }) => Some(item.clone()), + _ => None, + }) + .await; + + assert_eq!(started.id, completed.id); + let Some(codex_protocol::items::AgentMessageContent::Text { text }) = completed.content.first() + else { + panic!("expected agent message text content"); + }; + assert_eq!(text, "all done"); + + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn reasoning_item_is_emitted() -> anyhow::Result<()> { + skip_if_no_network!(Ok(())); + + let server = start_mock_server().await; + + let TestCodex { codex, .. } = test_codex().build(&server).await?; + + let reasoning_item = ev_reasoning_item( + "reasoning-1", + &["Consider inputs", "Compute output"], + &["Detailed reasoning trace"], + ); + + let first_response = sse(vec![ + ev_response_created("resp-1"), + reasoning_item, + ev_completed("resp-1"), + ]); + mount_sse_once_match(&server, any(), first_response).await; + + codex + .submit(Op::UserInput { + items: vec![UserInput::Text { + text: "explain your reasoning".into(), + }], + }) + .await?; + + let started = wait_for_event_match(&codex, |ev| match ev { + EventMsg::ItemStarted(ItemStartedEvent { + item: TurnItem::Reasoning(item), + .. + }) => Some(item.clone()), + _ => None, + }) + .await; + let completed = wait_for_event_match(&codex, |ev| match ev { + EventMsg::ItemCompleted(ItemCompletedEvent { + item: TurnItem::Reasoning(item), + .. + }) => Some(item.clone()), + _ => None, + }) + .await; + + assert_eq!(started.id, completed.id); + assert_eq!( + completed.summary_text, + vec!["Consider inputs".to_string(), "Compute output".to_string()] + ); + assert_eq!( + completed.raw_content, + vec!["Detailed reasoning trace".to_string()] + ); + + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn web_search_item_is_emitted() -> anyhow::Result<()> { + skip_if_no_network!(Ok(())); + + let server = start_mock_server().await; + + let TestCodex { codex, .. } = test_codex().build(&server).await?; + + let web_search_added = + ev_web_search_call_added("web-search-1", "in_progress", "weather seattle"); + let web_search_done = ev_web_search_call_done("web-search-1", "completed", "weather seattle"); + + let first_response = sse(vec![ + ev_response_created("resp-1"), + web_search_added, + web_search_done, + ev_completed("resp-1"), + ]); + mount_sse_once_match(&server, any(), first_response).await; + + codex + .submit(Op::UserInput { + items: vec![UserInput::Text { + text: "find the weather".into(), + }], + }) + .await?; + + let started = wait_for_event_match(&codex, |ev| match ev { + EventMsg::ItemStarted(ItemStartedEvent { + item: TurnItem::WebSearch(item), + .. + }) => Some(item.clone()), + _ => None, + }) + .await; + let completed = wait_for_event_match(&codex, |ev| match ev { + EventMsg::ItemCompleted(ItemCompletedEvent { + item: TurnItem::WebSearch(item), + .. + }) => Some(item.clone()), + _ => None, + }) + .await; + + assert_eq!(started.id, completed.id); + assert_eq!(completed.query, "weather seattle"); + + Ok(()) +} diff --git a/codex-rs/core/tests/suite/resume.rs b/codex-rs/core/tests/suite/resume.rs index 9b1707a05e..3a902b6b6f 100644 --- a/codex-rs/core/tests/suite/resume.rs +++ b/codex-rs/core/tests/suite/resume.rs @@ -4,6 +4,7 @@ use codex_core::protocol::Op; use codex_protocol::user_input::UserInput; use core_test_support::responses::ev_assistant_message; use core_test_support::responses::ev_completed; +use core_test_support::responses::ev_reasoning_item; use core_test_support::responses::ev_response_created; use core_test_support::responses::mount_sse_once_match; use core_test_support::responses::sse; @@ -62,3 +63,59 @@ async fn resume_includes_initial_messages_from_rollout_events() -> Result<()> { Ok(()) } + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn resume_includes_initial_messages_from_reasoning_events() -> Result<()> { + skip_if_no_network!(Ok(())); + + let server = start_mock_server().await; + let mut builder = test_codex().with_config(|config| { + config.show_raw_agent_reasoning = true; + }); + let initial = builder.build(&server).await?; + let codex = Arc::clone(&initial.codex); + let home = initial.home.clone(); + let rollout_path = initial.session_configured.rollout_path.clone(); + + let initial_sse = sse(vec![ + ev_response_created("resp-initial"), + ev_reasoning_item("reason-1", &["Summarized step"], &["raw detail"]), + ev_assistant_message("msg-1", "Completed reasoning turn"), + ev_completed("resp-initial"), + ]); + mount_sse_once_match(&server, any(), initial_sse).await; + + codex + .submit(Op::UserInput { + items: vec![UserInput::Text { + text: "Record reasoning messages".into(), + }], + }) + .await?; + + wait_for_event(&codex, |event| matches!(event, EventMsg::TaskComplete(_))).await; + + let resumed = builder.resume(&server, home, rollout_path).await?; + let initial_messages = resumed + .session_configured + .initial_messages + .expect("expected initial messages to be present for resumed session"); + match initial_messages.as_slice() { + [ + EventMsg::UserMessage(first_user), + EventMsg::TokenCount(_), + EventMsg::AgentReasoning(reasoning), + EventMsg::AgentReasoningRawContent(raw), + EventMsg::AgentMessage(assistant_message), + EventMsg::TokenCount(_), + ] => { + assert_eq!(first_user.message, "Record reasoning messages"); + assert_eq!(reasoning.text, "Summarized step"); + assert_eq!(raw.text, "raw detail"); + assert_eq!(assistant_message.message, "Completed reasoning turn"); + } + other => panic!("unexpected initial messages after resume: {other:#?}"), + } + + Ok(()) +} diff --git a/codex-rs/protocol/src/items.rs b/codex-rs/protocol/src/items.rs index e1efc9525b..27a9b94743 100644 --- a/codex-rs/protocol/src/items.rs +++ b/codex-rs/protocol/src/items.rs @@ -1,3 +1,9 @@ +use crate::protocol::AgentMessageEvent; +use crate::protocol::AgentReasoningEvent; +use crate::protocol::AgentReasoningRawContentEvent; +use crate::protocol::EventMsg; +use crate::protocol::UserMessageEvent; +use crate::protocol::WebSearchEndEvent; use crate::user_input::UserInput; use schemars::JsonSchema; use serde::Deserialize; @@ -7,6 +13,9 @@ use ts_rs::TS; #[derive(Debug, Clone, Deserialize, Serialize, TS, JsonSchema)] pub enum TurnItem { UserMessage(UserMessageItem), + AgentMessage(AgentMessageItem), + Reasoning(ReasoningItem), + WebSearch(WebSearchItem), } #[derive(Debug, Clone, Deserialize, Serialize, TS, JsonSchema)] @@ -15,6 +24,31 @@ pub struct UserMessageItem { pub content: Vec, } +#[derive(Debug, Clone, Deserialize, Serialize, TS, JsonSchema)] +pub enum AgentMessageContent { + Text { text: String }, +} + +#[derive(Debug, Clone, Deserialize, Serialize, TS, JsonSchema)] +pub struct AgentMessageItem { + pub id: String, + pub content: Vec, +} + +#[derive(Debug, Clone, Deserialize, Serialize, TS, JsonSchema)] +pub struct ReasoningItem { + pub id: String, + pub summary_text: Vec, + #[serde(default)] + pub raw_content: Vec, +} + +#[derive(Debug, Clone, Deserialize, Serialize, TS, JsonSchema)] +pub struct WebSearchItem { + pub id: String, + pub query: String, +} + impl UserMessageItem { pub fn new(content: &[UserInput]) -> Self { Self { @@ -22,12 +56,104 @@ impl UserMessageItem { content: content.to_vec(), } } + + pub fn as_legacy_event(&self) -> EventMsg { + EventMsg::UserMessage(UserMessageEvent { + message: self.message(), + images: Some(self.image_urls()), + }) + } + + pub fn message(&self) -> String { + self.content + .iter() + .map(|c| match c { + UserInput::Text { text } => text.clone(), + _ => String::new(), + }) + .collect::>() + .join("") + } + + pub fn image_urls(&self) -> Vec { + self.content + .iter() + .filter_map(|c| match c { + UserInput::Image { image_url } => Some(image_url.clone()), + _ => None, + }) + .collect() + } +} + +impl AgentMessageItem { + pub fn new(content: &[AgentMessageContent]) -> Self { + Self { + id: uuid::Uuid::new_v4().to_string(), + content: content.to_vec(), + } + } + + pub fn as_legacy_events(&self) -> Vec { + self.content + .iter() + .map(|c| match c { + AgentMessageContent::Text { text } => EventMsg::AgentMessage(AgentMessageEvent { + message: text.clone(), + }), + }) + .collect() + } +} + +impl ReasoningItem { + pub fn as_legacy_events(&self, show_raw_agent_reasoning: bool) -> Vec { + let mut events = Vec::new(); + for summary in &self.summary_text { + events.push(EventMsg::AgentReasoning(AgentReasoningEvent { + text: summary.clone(), + })); + } + + if show_raw_agent_reasoning { + for entry in &self.raw_content { + events.push(EventMsg::AgentReasoningRawContent( + AgentReasoningRawContentEvent { + text: entry.clone(), + }, + )); + } + } + + events + } +} + +impl WebSearchItem { + pub fn as_legacy_event(&self) -> EventMsg { + EventMsg::WebSearchEnd(WebSearchEndEvent { + call_id: self.id.clone(), + query: self.query.clone(), + }) + } } impl TurnItem { pub fn id(&self) -> String { match self { TurnItem::UserMessage(item) => item.id.clone(), + TurnItem::AgentMessage(item) => item.id.clone(), + TurnItem::Reasoning(item) => item.id.clone(), + TurnItem::WebSearch(item) => item.id.clone(), + } + } + + pub fn as_legacy_events(&self, show_raw_agent_reasoning: bool) -> Vec { + match self { + TurnItem::UserMessage(item) => vec![item.as_legacy_event()], + TurnItem::AgentMessage(item) => item.as_legacy_events(), + TurnItem::WebSearch(item) => vec![item.as_legacy_event()], + TurnItem::Reasoning(item) => item.as_legacy_events(show_raw_agent_reasoning), } } } diff --git a/codex-rs/protocol/src/protocol.rs b/codex-rs/protocol/src/protocol.rs index 11747c949f..105f028049 100644 --- a/codex-rs/protocol/src/protocol.rs +++ b/codex-rs/protocol/src/protocol.rs @@ -770,69 +770,13 @@ pub struct AgentMessageEvent { pub message: String, } -#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, TS)] -#[serde(rename_all = "snake_case")] -pub enum InputMessageKind { - /// Plain user text (default) - Plain, - /// XML-wrapped user instructions (...) - UserInstructions, - /// XML-wrapped environment context (...) - EnvironmentContext, -} - #[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, TS)] pub struct UserMessageEvent { pub message: String, #[serde(skip_serializing_if = "Option::is_none")] - pub kind: Option, - #[serde(skip_serializing_if = "Option::is_none")] pub images: Option>, } -impl From<(T, U)> for InputMessageKind -where - T: AsRef, - U: AsRef, -{ - fn from(value: (T, U)) -> Self { - let (_role, message) = value; - let message = message.as_ref(); - let trimmed = message.trim(); - if starts_with_ignore_ascii_case(trimmed, ENVIRONMENT_CONTEXT_OPEN_TAG) - && ends_with_ignore_ascii_case(trimmed, ENVIRONMENT_CONTEXT_CLOSE_TAG) - { - InputMessageKind::EnvironmentContext - } else if starts_with_ignore_ascii_case(trimmed, USER_INSTRUCTIONS_OPEN_TAG) - && ends_with_ignore_ascii_case(trimmed, USER_INSTRUCTIONS_CLOSE_TAG) - { - InputMessageKind::UserInstructions - } else { - InputMessageKind::Plain - } - } -} - -fn starts_with_ignore_ascii_case(text: &str, prefix: &str) -> bool { - let text_bytes = text.as_bytes(); - let prefix_bytes = prefix.as_bytes(); - text_bytes.len() >= prefix_bytes.len() - && text_bytes - .iter() - .zip(prefix_bytes.iter()) - .all(|(a, b)| a.eq_ignore_ascii_case(b)) -} - -fn ends_with_ignore_ascii_case(text: &str, suffix: &str) -> bool { - let text_bytes = text.as_bytes(); - let suffix_bytes = suffix.as_bytes(); - text_bytes.len() >= suffix_bytes.len() - && text_bytes[text_bytes.len() - suffix_bytes.len()..] - .iter() - .zip(suffix_bytes.iter()) - .all(|(a, b)| a.eq_ignore_ascii_case(b)) -} - #[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, TS)] pub struct AgentMessageDeltaEvent { pub delta: String, diff --git a/codex-rs/tui/src/chatwidget.rs b/codex-rs/tui/src/chatwidget.rs index 3f4de8d355..c1a6dd6769 100644 --- a/codex-rs/tui/src/chatwidget.rs +++ b/codex-rs/tui/src/chatwidget.rs @@ -23,7 +23,6 @@ use codex_core::protocol::ExecApprovalRequestEvent; use codex_core::protocol::ExecCommandBeginEvent; use codex_core::protocol::ExecCommandEndEvent; use codex_core::protocol::ExitedReviewModeEvent; -use codex_core::protocol::InputMessageKind; use codex_core::protocol::ListCustomPromptsResponseEvent; use codex_core::protocol::McpListToolsResponseEvent; use codex_core::protocol::McpToolCallBeginEvent; @@ -1553,17 +1552,9 @@ impl ChatWidget { } fn on_user_message_event(&mut self, event: UserMessageEvent) { - match event.kind { - Some(InputMessageKind::EnvironmentContext) - | Some(InputMessageKind::UserInstructions) => { - // Skip XML‑wrapped context blocks in the transcript. - } - Some(InputMessageKind::Plain) | None => { - let message = event.message.trim(); - if !message.is_empty() { - self.add_to_history(history_cell::new_user_prompt(message.to_string())); - } - } + let message = event.message.trim(); + if !message.is_empty() { + self.add_to_history(history_cell::new_user_prompt(message.to_string())); } } diff --git a/codex-rs/tui/src/chatwidget/tests.rs b/codex-rs/tui/src/chatwidget/tests.rs index 9af5588bb5..c28815ea18 100644 --- a/codex-rs/tui/src/chatwidget/tests.rs +++ b/codex-rs/tui/src/chatwidget/tests.rs @@ -23,7 +23,6 @@ use codex_core::protocol::ExecCommandBeginEvent; use codex_core::protocol::ExecCommandEndEvent; use codex_core::protocol::ExitedReviewModeEvent; use codex_core::protocol::FileChange; -use codex_core::protocol::InputMessageKind; use codex_core::protocol::Op; use codex_core::protocol::PatchApplyBeginEvent; use codex_core::protocol::PatchApplyEndEvent; @@ -104,7 +103,6 @@ fn resumed_initial_messages_render_history() { initial_messages: Some(vec![ EventMsg::UserMessage(UserMessageEvent { message: "hello from user".to_string(), - kind: Some(InputMessageKind::Plain), images: None, }), EventMsg::AgentMessage(AgentMessageEvent { diff --git a/codex-rs/tui/src/resume_picker.rs b/codex-rs/tui/src/resume_picker.rs index eb2a9cd4ea..b735ff9374 100644 --- a/codex-rs/tui/src/resume_picker.rs +++ b/codex-rs/tui/src/resume_picker.rs @@ -10,6 +10,7 @@ use codex_core::ConversationsPage; use codex_core::Cursor; use codex_core::INTERACTIVE_SESSION_SOURCES; use codex_core::RolloutRecorder; +use codex_protocol::items::TurnItem; use color_eyre::eyre::Result; use crossterm::event::KeyCode; use crossterm::event::KeyEvent; @@ -30,10 +31,7 @@ use crate::text_formatting::truncate_text; use crate::tui::FrameRequester; use crate::tui::Tui; use crate::tui::TuiEvent; -use codex_protocol::models::ContentItem; use codex_protocol::models::ResponseItem; -use codex_protocol::protocol::InputMessageKind; -use codex_protocol::protocol::USER_MESSAGE_BEGIN; const PAGE_SIZE: usize = 25; const LOAD_NEAR_THRESHOLD: usize = 5; @@ -616,37 +614,8 @@ fn extract_timestamp(value: &serde_json::Value) -> Option> { fn preview_from_head(head: &[serde_json::Value]) -> Option { head.iter() .filter_map(|value| serde_json::from_value::(value.clone()).ok()) - .find_map(|item| match item { - ResponseItem::Message { content, .. } => { - // Find the actual user message (as opposed to user instructions or ide context) - let preview = content - .into_iter() - .filter_map(|content| match content { - ContentItem::InputText { text } - if matches!( - InputMessageKind::from(("user", text.as_str())), - InputMessageKind::Plain - ) => - { - // Strip ide context. - let text = match text.find(USER_MESSAGE_BEGIN) { - Some(idx) => { - text[idx + USER_MESSAGE_BEGIN.len()..].trim().to_string() - } - None => text, - }; - Some(text) - } - _ => None, - }) - .collect::(); - - if preview.is_empty() { - None - } else { - Some(preview) - } - } + .find_map(|item| match codex_core::parse_turn_item(&item) { + Some(TurnItem::UserMessage(user)) => Some(user.message()), _ => None, }) } @@ -999,6 +968,19 @@ mod tests { "role": "user", "content": [ { "type": "input_text", "text": "hi" }, + ] + }), + json!({ + "type": "message", + "role": "user", + "content": [ + { "type": "input_text", "text": "..." }, + ] + }), + json!({ + "type": "message", + "role": "user", + "content": [ { "type": "input_text", "text": "real question" }, { "type": "input_image", "image_url": "ignored" } ]