From b9dc0906f2a9e438efda53fcf11f6aa80d9b5ebc Mon Sep 17 00:00:00 2001 From: pakrym-oai Date: Mon, 20 Oct 2025 16:44:50 -0700 Subject: [PATCH 1/9] Add new thread items and rewire event parsing to use them --- .../app-server/src/codex_message_processor.rs | 32 +- .../suite/codex_message_processor_flow.rs | 38 --- codex-rs/core/src/chat_completions.rs | 8 +- codex-rs/core/src/codex.rs | 55 +--- codex-rs/core/src/codex/compact.rs | 29 +- codex-rs/core/src/conversation_manager.rs | 11 +- codex-rs/core/src/event_mapping.rs | 298 ++++++++++++------ codex-rs/core/src/lib.rs | 3 +- codex-rs/core/src/rollout/tests.rs | 4 - codex-rs/core/src/state/item_collector.rs | 26 +- .../core/tests/chat_completions_payload.rs | 2 +- codex-rs/core/tests/chat_completions_sse.rs | 5 +- codex-rs/core/tests/suite/client.rs | 2 +- .../core/tests/suite/fork_conversation.rs | 19 +- codex-rs/core/tests/suite/items.rs | 8 +- codex-rs/protocol/src/items.rs | 131 ++++++++ codex-rs/protocol/src/protocol.rs | 56 ---- codex-rs/tui/src/chatwidget.rs | 15 +- codex-rs/tui/src/chatwidget/tests.rs | 2 - codex-rs/tui/src/resume_picker.rs | 37 +-- 20 files changed, 407 insertions(+), 374 deletions(-) diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs index 04cfbcfda2..1e94f02988 100644 --- a/codex-rs/app-server/src/codex_message_processor.rs +++ b/codex-rs/app-server/src/codex_message_processor.rs @@ -87,9 +87,8 @@ use codex_login::ShutdownHandle; use codex_login::run_login_server; use codex_protocol::ConversationId; use codex_protocol::config_types::ForcedLoginMethod; -use codex_protocol::models::ContentItem; +use codex_protocol::items::TurnItem; use codex_protocol::models::ResponseItem; -use codex_protocol::protocol::InputMessageKind; use codex_protocol::protocol::RateLimitSnapshot; use codex_protocol::protocol::USER_MESSAGE_BEGIN; use codex_protocol::user_input::UserInput as CoreInputItem; @@ -883,18 +882,9 @@ impl CodexMessageProcessor { }, )) .await; - let initial_messages = session_configured.initial_messages.map(|msgs| { - msgs.into_iter() - .filter(|event| { - // Don't send non-plain user messages (like user instructions - // or environment context) back so they don't get rendered. - if let EventMsg::UserMessage(user_message) = event { - return matches!(user_message.kind, Some(InputMessageKind::Plain)); - } - true - }) - .collect() - }); + let initial_messages = session_configured + .initial_messages + .map(|msgs| msgs.into_iter().collect()); // Reply with conversation id + model and initial messages (when present) let response = codex_app_server_protocol::ResumeConversationResponse { @@ -1541,18 +1531,8 @@ fn extract_conversation_summary( let preview = head .iter() .filter_map(|value| serde_json::from_value::(value.clone()).ok()) - .find_map(|item| match item { - ResponseItem::Message { content, .. } => { - content.into_iter().find_map(|content| match content { - ContentItem::InputText { text } => { - match InputMessageKind::from(("user", &text)) { - InputMessageKind::Plain => Some(text), - _ => None, - } - } - _ => None, - }) - } + .find_map(|item| match codex_core::parse_turn_item(&item) { + Some(TurnItem::UserMessage(user)) => Some(user.message()), _ => None, })?; diff --git a/codex-rs/app-server/tests/suite/codex_message_processor_flow.rs b/codex-rs/app-server/tests/suite/codex_message_processor_flow.rs index 27731f8430..30b90f6e4a 100644 --- a/codex-rs/app-server/tests/suite/codex_message_processor_flow.rs +++ b/codex-rs/app-server/tests/suite/codex_message_processor_flow.rs @@ -30,7 +30,6 @@ use codex_protocol::config_types::SandboxMode; use codex_protocol::parse_command::ParsedCommand; use codex_protocol::protocol::Event; use codex_protocol::protocol::EventMsg; -use codex_protocol::protocol::InputMessageKind; use pretty_assertions::assert_eq; use std::env; use tempfile::TempDir; @@ -528,43 +527,6 @@ async fn test_send_user_turn_updates_sandbox_and_cwd_between_turns() { .expect("sendUserTurn 2 timeout") .expect("sendUserTurn 2 resp"); - let mut env_message: Option = None; - let second_cwd_str = second_cwd.to_string_lossy().into_owned(); - for _ in 0..10 { - let notification = timeout( - DEFAULT_READ_TIMEOUT, - mcp.read_stream_until_notification_message("codex/event/user_message"), - ) - .await - .expect("user_message timeout") - .expect("user_message notification"); - let params = notification - .params - .clone() - .expect("user_message should include params"); - let event: Event = serde_json::from_value(params).expect("deserialize user_message event"); - if let EventMsg::UserMessage(user) = event.msg - && matches!(user.kind, Some(InputMessageKind::EnvironmentContext)) - && user.message.contains(&second_cwd_str) - { - env_message = Some(user.message); - break; - } - } - let env_message = env_message.expect("expected environment context update"); - assert!( - env_message.contains("danger-full-access"), - "env context should reflect new sandbox mode: {env_message}" - ); - assert!( - env_message.contains("enabled"), - "env context should enable network access for danger-full-access policy: {env_message}" - ); - assert!( - env_message.contains(&second_cwd_str), - "env context should include updated cwd: {env_message}" - ); - let exec_begin_notification = timeout( DEFAULT_READ_TIMEOUT, mcp.read_stream_until_notification_message("codex/event/exec_command_begin"), diff --git a/codex-rs/core/src/chat_completions.rs b/codex-rs/core/src/chat_completions.rs index aeea012948..27937a48d0 100644 --- a/codex-rs/core/src/chat_completions.rs +++ b/codex-rs/core/src/chat_completions.rs @@ -104,10 +104,10 @@ pub(crate) async fn stream_chat_completions( } = item { let mut text = String::new(); - for c in items { - match c { - ReasoningItemContent::ReasoningText { text: t } - | ReasoningItemContent::Text { text: t } => text.push_str(t), + for entry in items { + match entry { + ReasoningItemContent::ReasoningText { text: segment } + | ReasoningItemContent::Text { text: segment } => text.push_str(segment), } } if text.trim().is_empty() { diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 6c3b6ee472..374374f826 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -6,9 +6,9 @@ use std::sync::atomic::AtomicU64; use crate::AuthManager; use crate::client_common::REVIEW_PROMPT; -use crate::event_mapping::map_response_item_to_event_messages; use crate::function_tool::FunctionCallError; use crate::parse_command::parse_command; +use crate::parse_turn_item; use crate::review_format::format_review_findings_block; use crate::state::ItemCollector; use crate::terminal; @@ -945,17 +945,10 @@ impl Session { .await; // Derive user message events and persist only UserMessage to rollout - let msgs = - map_response_item_to_event_messages(&response_item, self.show_raw_agent_reasoning()); - let user_msgs: Vec = msgs - .into_iter() - .filter_map(|m| match m { - EventMsg::UserMessage(ev) => Some(RolloutItem::EventMsg(EventMsg::UserMessage(ev))), - _ => None, - }) - .collect(); - if !user_msgs.is_empty() { - self.persist_rollout_items(&user_msgs).await; + let turn_item = parse_turn_item(&response_item); + if let Some(TurnItem::UserMessage(user_msg)) = turn_item { + self.persist_rollout_items(&[RolloutItem::EventMsg(user_msg.as_legacy_event())]) + .await; } } @@ -1152,23 +1145,8 @@ async fn submission_loop(sess: Arc, config: Arc, rx_sub: Receiv { sess.record_conversation_items(std::slice::from_ref(&env_item)) .await; - for msg in map_response_item_to_event_messages( - &env_item, - sess.show_raw_agent_reasoning(), - ) { - let event = Event { - id: sub.id.clone(), - msg, - }; - sess.send_event(event).await; - } } - current_context - .item_collector - .started_completed(TurnItem::UserMessage(UserMessageItem::new(&items))) - .await; - sess.spawn_task(Arc::clone(¤t_context), sub.id, items, RegularTask) .await; previous_context = Some(current_context); @@ -2032,10 +2010,9 @@ async fn try_run_turn( } Ok(None) => { let response = handle_non_tool_response_item( - Arc::clone(&sess), Arc::clone(&turn_context), - sub_id, item.clone(), + sess.show_raw_agent_reasoning(), ) .await?; add_completed(ProcessedResponseItem { item, response }); @@ -2168,10 +2145,9 @@ async fn try_run_turn( } async fn handle_non_tool_response_item( - sess: Arc, turn_context: Arc, - sub_id: &str, item: ResponseItem, + show_raw_agent_reasoning: bool, ) -> CodexResult> { debug!(?item, "Output item"); @@ -2179,19 +2155,18 @@ async fn handle_non_tool_response_item( ResponseItem::Message { .. } | ResponseItem::Reasoning { .. } | ResponseItem::WebSearchCall { .. } => { - let msgs = match &item { + let turn_item = match &item { ResponseItem::Message { .. } if turn_context.is_review_mode => { trace!("suppressing assistant Message in review mode"); - Vec::new() + None } - _ => map_response_item_to_event_messages(&item, sess.show_raw_agent_reasoning()), + _ => parse_turn_item(&item), }; - for msg in msgs { - let event = Event { - id: sub_id.to_string(), - msg, - }; - sess.send_event(event).await; + if let Some(turn_item) = turn_item { + turn_context + .item_collector + .started_completed(turn_item, show_raw_agent_reasoning) + .await; } } ResponseItem::FunctionCallOutput { .. } | ResponseItem::CustomToolCallOutput { .. } => { diff --git a/codex-rs/core/src/codex/compact.rs b/codex-rs/core/src/codex/compact.rs index a6bb8a8fb4..3a3c63a2cc 100644 --- a/codex-rs/core/src/codex/compact.rs +++ b/codex-rs/core/src/codex/compact.rs @@ -12,13 +12,13 @@ use crate::protocol::CompactedItem; use crate::protocol::ErrorEvent; use crate::protocol::Event; use crate::protocol::EventMsg; -use crate::protocol::InputMessageKind; use crate::protocol::TaskStartedEvent; use crate::protocol::TurnContextItem; use crate::state::TaskKind; use crate::truncate::truncate_middle; use crate::util::backoff; use askama::Template; +use codex_protocol::items::TurnItem; use codex_protocol::models::ContentItem; use codex_protocol::models::ResponseInputItem; use codex_protocol::models::ResponseItem; @@ -199,23 +199,13 @@ pub fn content_items_to_text(content: &[ContentItem]) -> Option { pub(crate) fn collect_user_messages(items: &[ResponseItem]) -> Vec { items .iter() - .filter_map(|item| match item { - ResponseItem::Message { role, content, .. } if role == "user" => { - content_items_to_text(content) - } + .filter_map(|item| match crate::event_mapping::parse_turn_item(item) { + Some(TurnItem::UserMessage(user)) => Some(user.message()), _ => None, }) - .filter(|text| !is_session_prefix_message(text)) .collect() } -pub fn is_session_prefix_message(text: &str) -> bool { - matches!( - InputMessageKind::from(("user", text)), - InputMessageKind::UserInstructions | InputMessageKind::EnvironmentContext - ) -} - pub(crate) fn build_compacted_history( initial_context: Vec, user_messages: &[String], @@ -338,21 +328,16 @@ mod tests { ResponseItem::Message { id: Some("user".to_string()), role: "user".to_string(), - content: vec![ - ContentItem::InputText { - text: "first".to_string(), - }, - ContentItem::OutputText { - text: "second".to_string(), - }, - ], + content: vec![ContentItem::InputText { + text: "first".to_string(), + }], }, ResponseItem::Other, ]; let collected = collect_user_messages(&items); - assert_eq!(vec!["first\nsecond".to_string()], collected); + assert_eq!(vec!["first".to_string()], collected); } #[test] diff --git a/codex-rs/core/src/conversation_manager.rs b/codex-rs/core/src/conversation_manager.rs index aeb0780725..a30038f2eb 100644 --- a/codex-rs/core/src/conversation_manager.rs +++ b/codex-rs/core/src/conversation_manager.rs @@ -3,8 +3,6 @@ use crate::CodexAuth; use crate::codex::Codex; use crate::codex::CodexSpawnOk; use crate::codex::INITIAL_SUBMIT_ID; -use crate::codex::compact::content_items_to_text; -use crate::codex::compact::is_session_prefix_message; use crate::codex_conversation::CodexConversation; use crate::config::Config; use crate::error::CodexErr; @@ -14,6 +12,7 @@ use crate::protocol::EventMsg; use crate::protocol::SessionConfiguredEvent; use crate::rollout::RolloutRecorder; use codex_protocol::ConversationId; +use codex_protocol::items::TurnItem; use codex_protocol::models::ResponseItem; use codex_protocol::protocol::InitialHistory; use codex_protocol::protocol::RolloutItem; @@ -182,9 +181,11 @@ fn truncate_before_nth_user_message(history: InitialHistory, n: usize) -> Initia // Find indices of user message inputs in rollout order. let mut user_positions: Vec = Vec::new(); for (idx, item) in items.iter().enumerate() { - if let RolloutItem::ResponseItem(ResponseItem::Message { role, content, .. }) = item - && role == "user" - && content_items_to_text(content).is_some_and(|text| !is_session_prefix_message(&text)) + if let RolloutItem::ResponseItem(item @ ResponseItem::Message { .. }) = item + && matches!( + crate::event_mapping::parse_turn_item(item), + Some(TurnItem::UserMessage(_)) + ) { user_positions.push(idx); } diff --git a/codex-rs/core/src/event_mapping.rs b/codex-rs/core/src/event_mapping.rs index cbbf1f444f..f5fb7c8e85 100644 --- a/codex-rs/core/src/event_mapping.rs +++ b/codex-rs/core/src/event_mapping.rs @@ -1,135 +1,144 @@ -use crate::protocol::AgentMessageEvent; -use crate::protocol::AgentReasoningEvent; -use crate::protocol::AgentReasoningRawContentEvent; use crate::protocol::EventMsg; -use crate::protocol::InputMessageKind; -use crate::protocol::UserMessageEvent; -use crate::protocol::WebSearchEndEvent; +use codex_protocol::items::AgentMessageContent; +use codex_protocol::items::AgentMessageItem; +use codex_protocol::items::ReasoningItem; +use codex_protocol::items::TurnItem; +use codex_protocol::items::UserMessageItem; +use codex_protocol::items::WebSearchItem; use codex_protocol::models::ContentItem; use codex_protocol::models::ReasoningItemContent; use codex_protocol::models::ReasoningItemReasoningSummary; use codex_protocol::models::ResponseItem; use codex_protocol::models::WebSearchAction; +use codex_protocol::user_input::UserInput; +use tracing::warn; -/// Convert a `ResponseItem` into zero or more `EventMsg` values that the UI can render. -/// -/// When `show_raw_agent_reasoning` is false, raw reasoning content events are omitted. -pub(crate) fn map_response_item_to_event_messages( - item: &ResponseItem, - show_raw_agent_reasoning: bool, -) -> Vec { - match item { - ResponseItem::Message { role, content, .. } => { - // Do not surface system messages as user events. - if role == "system" { - return Vec::new(); - } +fn is_session_prefix(text: &str) -> bool { + let trimmed = text.trim_start(); + let lowered = trimmed.to_ascii_lowercase(); + lowered.starts_with("") || lowered.starts_with("") +} + +fn parse_user_message(message: &[ContentItem]) -> Option { + let mut content: Vec = Vec::new(); - let mut events: Vec = Vec::new(); - let mut message_parts: Vec = Vec::new(); - let mut images: Vec = Vec::new(); - let mut kind: Option = None; - - for content_item in content.iter() { - match content_item { - ContentItem::InputText { text } => { - if kind.is_none() { - let trimmed = text.trim_start(); - kind = if trimmed.starts_with("") { - Some(InputMessageKind::EnvironmentContext) - } else if trimmed.starts_with("") { - Some(InputMessageKind::UserInstructions) - } else { - Some(InputMessageKind::Plain) - }; - } - message_parts.push(text.clone()); - } - ContentItem::InputImage { image_url } => { - images.push(image_url.clone()); - } - ContentItem::OutputText { text } => { - events.push(EventMsg::AgentMessage(AgentMessageEvent { - message: text.clone(), - })); - } + for content_item in message.iter() { + match content_item { + ContentItem::InputText { text } => { + if is_session_prefix(text) { + return None; } + content.push(UserInput::Text { text: text.clone() }); } - - if !message_parts.is_empty() || !images.is_empty() { - let message = if message_parts.is_empty() { - String::new() - } else { - message_parts.join("") - }; - let images = if images.is_empty() { - None - } else { - Some(images) - }; - - events.push(EventMsg::UserMessage(UserMessageEvent { - message, - kind, - images, - })); + ContentItem::InputImage { image_url } => { + content.push(UserInput::Image { + image_url: image_url.clone(), + }); + } + ContentItem::OutputText { text } => { + if is_session_prefix(text) { + return None; + } + warn!("Output text in user message: {}", text); } - - events } + } - ResponseItem::Reasoning { - summary, content, .. - } => { - let mut events = Vec::new(); - for ReasoningItemReasoningSummary::SummaryText { text } in summary { - events.push(EventMsg::AgentReasoning(AgentReasoningEvent { - text: text.clone(), - })); + Some(UserMessageItem::new(&content)) +} + +fn parse_agent_message(message: &[ContentItem]) -> AgentMessageItem { + let mut output: String = String::new(); + for content_item in message.iter() { + match content_item { + ContentItem::OutputText { text } => { + output = text.to_string(); } - if let Some(items) = content.as_ref().filter(|_| show_raw_agent_reasoning) { - for c in items { - let text = match c { - ReasoningItemContent::ReasoningText { text } - | ReasoningItemContent::Text { text } => text, - }; - events.push(EventMsg::AgentReasoningRawContent( - AgentReasoningRawContentEvent { text: text.clone() }, - )); - } + _ => { + warn!( + "Unexpected content item in agent message: {:?}", + content_item + ); } - events } + } + AgentMessageItem::new(&[AgentMessageContent::Text { text: output }]) +} - ResponseItem::WebSearchCall { id, action, .. } => match action { - WebSearchAction::Search { query } => { - let call_id = id.clone().unwrap_or_else(|| "".to_string()); - vec![EventMsg::WebSearchEnd(WebSearchEndEvent { - call_id, - query: query.clone(), - })] - } - WebSearchAction::Other => Vec::new(), +pub fn parse_turn_item(item: &ResponseItem) -> Option { + match item { + ResponseItem::Message { role, content, .. } => match role.as_str() { + "user" => parse_user_message(content).map(TurnItem::UserMessage), + "assistant" => Some(TurnItem::AgentMessage(parse_agent_message(content))), + "system" => None, + _ => None, }, + ResponseItem::Reasoning { + id, + summary, + content, + encrypted_content, + } => { + let summary_text = summary + .iter() + .map(|entry| match entry { + ReasoningItemReasoningSummary::SummaryText { text } => text.clone(), + }) + .collect(); + let raw_content = content + .clone() + .unwrap_or_default() + .into_iter() + .map(|entry| match entry { + ReasoningItemContent::ReasoningText { text } + | ReasoningItemContent::Text { text } => text, + }) + .collect(); + Some(TurnItem::Reasoning(ReasoningItem { + id: id.clone(), + summary_text, + raw_content, + encrypted_content: encrypted_content.clone(), + })) + } + ResponseItem::WebSearchCall { + id, + action: WebSearchAction::Search { query }, + .. + } => Some(TurnItem::WebSearch(WebSearchItem { + id: id.clone().unwrap_or_default(), + query: query.clone(), + })), + _ => None, + } +} - // Variants that require side effects are handled by higher layers and do not emit events here. - ResponseItem::FunctionCall { .. } - | ResponseItem::FunctionCallOutput { .. } - | ResponseItem::LocalShellCall { .. } - | ResponseItem::CustomToolCall { .. } - | ResponseItem::CustomToolCallOutput { .. } - | ResponseItem::Other => Vec::new(), +/// Convert a `ResponseItem` into zero or more `EventMsg` values that the UI can render. +/// +/// When `show_raw_agent_reasoning` is false, raw reasoning content events are omitted. +pub(crate) fn map_response_item_to_event_messages( + item: &ResponseItem, + show_raw_agent_reasoning: bool, +) -> Vec { + if let Some(turn_item) = parse_turn_item(item) { + return turn_item.legacy_events(show_raw_agent_reasoning); } + + // Variants that require side effects are handled by higher layers and do not emit events here. + Vec::new() } #[cfg(test)] mod tests { use super::map_response_item_to_event_messages; use crate::protocol::EventMsg; - use crate::protocol::InputMessageKind; - use assert_matches::assert_matches; + use crate::protocol::WebSearchEndEvent; + use codex_protocol::models::ContentItem; + use codex_protocol::models::ReasoningItemContent; + use codex_protocol::models::ReasoningItemReasoningSummary; use codex_protocol::models::ResponseItem; + use codex_protocol::models::WebSearchAction; use pretty_assertions::assert_eq; #[test] @@ -159,10 +168,89 @@ mod tests { match &events[0] { EventMsg::UserMessage(user) => { assert_eq!(user.message, "Hello world"); - assert_matches!(user.kind, Some(InputMessageKind::Plain)); assert_eq!(user.images, Some(vec![img1, img2])); } other => panic!("expected UserMessage, got {other:?}"), } } + + #[test] + fn maps_reasoning_summary_without_raw_content() { + let item = ResponseItem::Reasoning { + id: "reasoning_1".to_string(), + summary: vec![ + ReasoningItemReasoningSummary::SummaryText { + text: "Step 1".to_string(), + }, + ReasoningItemReasoningSummary::SummaryText { + text: "Step 2".to_string(), + }, + ], + content: Some(vec![ReasoningItemContent::ReasoningText { + text: "raw details".to_string(), + }]), + encrypted_content: None, + }; + + let events = map_response_item_to_event_messages(&item, false); + + assert_eq!(events.len(), 2, "expected only reasoning summaries"); + assert!( + events + .iter() + .all(|event| matches!(event, EventMsg::AgentReasoning(_))) + ); + } + + #[test] + fn maps_reasoning_including_raw_content_when_enabled() { + let item = ResponseItem::Reasoning { + id: "reasoning_2".to_string(), + summary: vec![ReasoningItemReasoningSummary::SummaryText { + text: "Summarized step".to_string(), + }], + content: Some(vec![ + ReasoningItemContent::ReasoningText { + text: "raw step".to_string(), + }, + ReasoningItemContent::Text { + text: "final thought".to_string(), + }, + ]), + encrypted_content: None, + }; + + let events = map_response_item_to_event_messages(&item, true); + + assert_eq!( + events.len(), + 3, + "expected summary and raw reasoning content events" + ); + assert!(matches!(events[0], EventMsg::AgentReasoning(_))); + assert!(matches!(events[1], EventMsg::AgentReasoningRawContent(_))); + assert!(matches!(events[2], EventMsg::AgentReasoningRawContent(_))); + } + + #[test] + fn maps_web_search_call() { + let item = ResponseItem::WebSearchCall { + id: Some("ws_1".to_string()), + status: Some("completed".to_string()), + action: WebSearchAction::Search { + query: "weather".to_string(), + }, + }; + + let events = map_response_item_to_event_messages(&item, false); + assert_eq!(events.len(), 1, "expected a single web search event"); + + match &events[0] { + EventMsg::WebSearchEnd(WebSearchEndEvent { call_id, query }) => { + assert_eq!(call_id, "ws_1"); + assert_eq!(query, "weather"); + } + other => panic!("expected WebSearchEnd, got {other:?}"), + } + } } diff --git a/codex-rs/core/src/lib.rs b/codex-rs/core/src/lib.rs index fa307b1ea7..5e5b4e44a9 100644 --- a/codex-rs/core/src/lib.rs +++ b/codex-rs/core/src/lib.rs @@ -98,11 +98,10 @@ pub use client_common::REVIEW_PROMPT; pub use client_common::ResponseEvent; pub use client_common::ResponseStream; pub use codex::compact::content_items_to_text; -pub use codex::compact::is_session_prefix_message; pub use codex_protocol::models::ContentItem; pub use codex_protocol::models::LocalShellAction; pub use codex_protocol::models::LocalShellExecAction; pub use codex_protocol::models::LocalShellStatus; -pub use codex_protocol::models::ReasoningItemContent; pub use codex_protocol::models::ResponseItem; +pub use event_mapping::parse_turn_item; pub mod otel_init; diff --git a/codex-rs/core/src/rollout/tests.rs b/codex-rs/core/src/rollout/tests.rs index b1f34e3a9c..617e5189c2 100644 --- a/codex-rs/core/src/rollout/tests.rs +++ b/codex-rs/core/src/rollout/tests.rs @@ -24,7 +24,6 @@ use codex_protocol::models::ContentItem; use codex_protocol::models::ResponseItem; use codex_protocol::protocol::CompactedItem; use codex_protocol::protocol::EventMsg; -use codex_protocol::protocol::InputMessageKind; use codex_protocol::protocol::RolloutItem; use codex_protocol::protocol::RolloutLine; use codex_protocol::protocol::SessionMeta; @@ -543,7 +542,6 @@ async fn test_tail_includes_last_response_items() -> Result<()> { timestamp: ts.to_string(), item: RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent { message: "hello".into(), - kind: Some(InputMessageKind::Plain), images: None, })), }; @@ -627,7 +625,6 @@ async fn test_tail_handles_short_sessions() -> Result<()> { timestamp: ts.to_string(), item: RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent { message: "hi".into(), - kind: Some(InputMessageKind::Plain), images: None, })), }; @@ -712,7 +709,6 @@ async fn test_tail_skips_trailing_non_responses() -> Result<()> { timestamp: ts.to_string(), item: RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent { message: "hello".into(), - kind: Some(InputMessageKind::Plain), images: None, })), }; diff --git a/codex-rs/core/src/state/item_collector.rs b/codex-rs/core/src/state/item_collector.rs index bd1c4329d8..ce9de7038a 100644 --- a/codex-rs/core/src/state/item_collector.rs +++ b/codex-rs/core/src/state/item_collector.rs @@ -44,7 +44,7 @@ impl ItemCollector { } } - pub async fn completed(&self, item: TurnItem) { + pub async fn completed(&self, item: TurnItem, emit_raw_agent_reasoning: bool) { let err = self .tx_event .send(Event { @@ -52,17 +52,35 @@ impl ItemCollector { msg: EventMsg::ItemCompleted(ItemCompletedEvent { thread_id: self.thread_id, turn_id: self.turn_id.clone(), - item, + item: item.clone(), }), }) .await; if let Err(e) = err { error!("failed to send item completed event: {e}"); } + + self.trigger_legacy_events(item, emit_raw_agent_reasoning) + .await; } - pub async fn started_completed(&self, item: TurnItem) { + pub async fn started_completed(&self, item: TurnItem, emit_raw_agent_reasoning: bool) { self.started(item.clone()).await; - self.completed(item).await; + self.completed(item, emit_raw_agent_reasoning).await; + } + + async fn trigger_legacy_events(&self, item: TurnItem, emit_raw_agent_reasoning: bool) { + for event in item.legacy_events(emit_raw_agent_reasoning) { + if let Err(e) = self + .tx_event + .send(Event { + id: self.turn_id.clone(), + msg: event.clone(), + }) + .await + { + error!("failed to send legacy event: {e}"); + } + } } } diff --git a/codex-rs/core/tests/chat_completions_payload.rs b/codex-rs/core/tests/chat_completions_payload.rs index 0d0d60d47f..f1a4527628 100644 --- a/codex-rs/core/tests/chat_completions_payload.rs +++ b/codex-rs/core/tests/chat_completions_payload.rs @@ -8,12 +8,12 @@ use codex_core::LocalShellStatus; use codex_core::ModelClient; use codex_core::ModelProviderInfo; use codex_core::Prompt; -use codex_core::ReasoningItemContent; use codex_core::ResponseItem; use codex_core::WireApi; use codex_core::spawn::CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR; use codex_otel::otel_event_manager::OtelEventManager; use codex_protocol::ConversationId; +use codex_protocol::models::ReasoningItemContent; use core_test_support::load_default_config_for_test; use futures::StreamExt; use serde_json::Value; diff --git a/codex-rs/core/tests/chat_completions_sse.rs b/codex-rs/core/tests/chat_completions_sse.rs index dffc9e4213..5a3768567a 100644 --- a/codex-rs/core/tests/chat_completions_sse.rs +++ b/codex-rs/core/tests/chat_completions_sse.rs @@ -13,6 +13,7 @@ use codex_core::WireApi; use codex_core::spawn::CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR; use codex_otel::otel_event_manager::OtelEventManager; use codex_protocol::ConversationId; +use codex_protocol::models::ReasoningItemContent; use core_test_support::load_default_config_for_test; use futures::StreamExt; use tempfile::TempDir; @@ -142,8 +143,8 @@ fn assert_reasoning(item: &ResponseItem, expected: &str) { let mut combined = String::new(); for part in parts { match part { - codex_core::ReasoningItemContent::ReasoningText { text } - | codex_core::ReasoningItemContent::Text { text } => combined.push_str(text), + ReasoningItemContent::ReasoningText { text } + | ReasoningItemContent::Text { text } => combined.push_str(text), } } assert_eq!(combined, expected); diff --git a/codex-rs/core/tests/suite/client.rs b/codex-rs/core/tests/suite/client.rs index ed16cdb35d..3034c70974 100644 --- a/codex-rs/core/tests/suite/client.rs +++ b/codex-rs/core/tests/suite/client.rs @@ -9,7 +9,6 @@ use codex_core::ModelClient; use codex_core::ModelProviderInfo; use codex_core::NewConversation; use codex_core::Prompt; -use codex_core::ReasoningItemContent; use codex_core::ResponseEvent; use codex_core::ResponseItem; use codex_core::WireApi; @@ -21,6 +20,7 @@ use codex_core::protocol::Op; use codex_core::protocol::SessionSource; use codex_otel::otel_event_manager::OtelEventManager; use codex_protocol::ConversationId; +use codex_protocol::models::ReasoningItemContent; use codex_protocol::models::ReasoningItemReasoningSummary; use codex_protocol::models::WebSearchAction; use codex_protocol::user_input::UserInput; diff --git a/codex-rs/core/tests/suite/fork_conversation.rs b/codex-rs/core/tests/suite/fork_conversation.rs index 28f5d0175d..da2ff8c3f6 100644 --- a/codex-rs/core/tests/suite/fork_conversation.rs +++ b/codex-rs/core/tests/suite/fork_conversation.rs @@ -1,17 +1,15 @@ use codex_core::CodexAuth; -use codex_core::ContentItem; use codex_core::ConversationManager; use codex_core::ModelProviderInfo; use codex_core::NewConversation; -use codex_core::ResponseItem; use codex_core::built_in_model_providers; -use codex_core::content_items_to_text; -use codex_core::is_session_prefix_message; +use codex_core::parse_turn_item; use codex_core::protocol::ConversationPathResponseEvent; use codex_core::protocol::EventMsg; use codex_core::protocol::Op; use codex_core::protocol::RolloutItem; use codex_core::protocol::RolloutLine; +use codex_protocol::items::TurnItem; use codex_protocol::user_input::UserInput; use core_test_support::load_default_config_for_test; use core_test_support::skip_if_no_network; @@ -115,19 +113,12 @@ async fn fork_conversation_twice_drops_to_first_message() { let find_user_input_positions = |items: &[RolloutItem]| -> Vec { let mut pos = Vec::new(); for (i, it) in items.iter().enumerate() { - if let RolloutItem::ResponseItem(ResponseItem::Message { role, content, .. }) = it - && role == "user" - && content_items_to_text(content) - .is_some_and(|text| !is_session_prefix_message(&text)) + if let RolloutItem::ResponseItem(response_item) = it + && let Some(TurnItem::UserMessage(_)) = parse_turn_item(response_item) { // Consider any user message as an input boundary; recorder stores both EventMsg and ResponseItem. // We specifically look for input items, which are represented as ContentItem::InputText. - if content - .iter() - .any(|c| matches!(c, ContentItem::InputText { .. })) - { - pos.push(i); - } + pos.push(i); } } pos diff --git a/codex-rs/core/tests/suite/items.rs b/codex-rs/core/tests/suite/items.rs index a64f51187a..63c3c79cd4 100644 --- a/codex-rs/core/tests/suite/items.rs +++ b/codex-rs/core/tests/suite/items.rs @@ -48,8 +48,12 @@ async fn user_message_item_is_emitted() -> anyhow::Result<()> { }) .await; - let TurnItem::UserMessage(started_item) = started.item; - let TurnItem::UserMessage(completed_item) = completed.item; + let TurnItem::UserMessage(started_item) = started.item else { + panic!("expected user message item"); + }; + let TurnItem::UserMessage(completed_item) = completed.item else { + panic!("expected user message item"); + }; assert_eq!(started_item.id, completed_item.id); assert_eq!( diff --git a/codex-rs/protocol/src/items.rs b/codex-rs/protocol/src/items.rs index e1efc9525b..cd71e94b21 100644 --- a/codex-rs/protocol/src/items.rs +++ b/codex-rs/protocol/src/items.rs @@ -1,3 +1,10 @@ +use crate::models::WebSearchAction; +use crate::protocol::AgentMessageEvent; +use crate::protocol::AgentReasoningEvent; +use crate::protocol::AgentReasoningRawContentEvent; +use crate::protocol::EventMsg; +use crate::protocol::UserMessageEvent; +use crate::protocol::WebSearchEndEvent; use crate::user_input::UserInput; use schemars::JsonSchema; use serde::Deserialize; @@ -7,6 +14,9 @@ use ts_rs::TS; #[derive(Debug, Clone, Deserialize, Serialize, TS, JsonSchema)] pub enum TurnItem { UserMessage(UserMessageItem), + AgentMessage(AgentMessageItem), + Reasoning(ReasoningItem), + WebSearch(WebSearchItem), } #[derive(Debug, Clone, Deserialize, Serialize, TS, JsonSchema)] @@ -15,6 +25,33 @@ pub struct UserMessageItem { pub content: Vec, } +#[derive(Debug, Clone, Deserialize, Serialize, TS, JsonSchema)] +pub enum AgentMessageContent { + Text { text: String }, +} + +#[derive(Debug, Clone, Deserialize, Serialize, TS, JsonSchema)] +pub struct AgentMessageItem { + pub id: String, + pub content: Vec, +} + +#[derive(Debug, Clone, Deserialize, Serialize, TS, JsonSchema)] +pub struct ReasoningItem { + pub id: String, + pub summary_text: Vec, + #[serde(default)] + pub raw_content: Vec, + #[serde(default)] + pub encrypted_content: Option, +} + +#[derive(Debug, Clone, Deserialize, Serialize, TS, JsonSchema)] +pub struct WebSearchItem { + pub id: String, + pub query: String, +} + impl UserMessageItem { pub fn new(content: &[UserInput]) -> Self { Self { @@ -22,12 +59,106 @@ impl UserMessageItem { content: content.to_vec(), } } + + pub fn as_legacy_event(&self) -> EventMsg { + EventMsg::UserMessage(UserMessageEvent { + message: self.message(), + images: Some(self.image_urls()), + }) + } + + pub fn message(&self) -> String { + self.content + .iter() + .map(|c| match c { + UserInput::Text { text } => text.clone(), + _ => String::new(), + }) + .collect::>() + .join("") + } + + pub fn image_urls(&self) -> Vec { + self.content + .iter() + .filter_map(|c| match c { + UserInput::Image { image_url } => Some(image_url.clone()), + _ => None, + }) + .collect() + } +} + +impl AgentMessageItem { + pub fn new(content: &[AgentMessageContent]) -> Self { + Self { + id: uuid::Uuid::new_v4().to_string(), + content: content.to_vec(), + } + } + + pub fn as_legacy_event(&self) -> EventMsg { + let message = self + .content + .iter() + .map(|c| match c { + AgentMessageContent::Text { text } => text.clone(), + }) + .collect::>() + .join(""); + + EventMsg::AgentMessage(AgentMessageEvent { message }) + } +} + +impl ReasoningItem { + pub fn legacy_events(&self, show_raw_agent_reasoning: bool) -> Vec { + let mut events = Vec::new(); + for summary in &self.summary_text { + events.push(EventMsg::AgentReasoning(AgentReasoningEvent { + text: summary.clone(), + })); + } + + if show_raw_agent_reasoning { + for entry in &self.raw_content { + events.push(EventMsg::AgentReasoningRawContent( + AgentReasoningRawContentEvent { + text: entry.clone(), + }, + )); + } + } + + events + } +} + +impl WebSearchItem { + pub fn as_legacy_event(&self) -> EventMsg { + EventMsg::WebSearchEnd(WebSearchEndEvent { + call_id: self.id.clone(), + query: self.query.clone(), + }) + } } impl TurnItem { pub fn id(&self) -> String { match self { TurnItem::UserMessage(item) => item.id.clone(), + TurnItem::AgentMessage(item) => item.id.clone(), + TurnItem::Reasoning(item) => item.id.clone(), + TurnItem::WebSearch(item) => item.id.clone(), + } + } + + pub fn legacy_events(&self, show_raw_agent_reasoning: bool) -> Vec { + match self { + TurnItem::UserMessage(item) => vec![item.as_legacy_event()], + TurnItem::AgentMessage(item) => vec![item.as_legacy_event()], + TurnItem::WebSearch(item) => vec![item.as_legacy_event()], + TurnItem::Reasoning(item) => item.legacy_events(show_raw_agent_reasoning), } } } diff --git a/codex-rs/protocol/src/protocol.rs b/codex-rs/protocol/src/protocol.rs index 11747c949f..105f028049 100644 --- a/codex-rs/protocol/src/protocol.rs +++ b/codex-rs/protocol/src/protocol.rs @@ -770,69 +770,13 @@ pub struct AgentMessageEvent { pub message: String, } -#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, TS)] -#[serde(rename_all = "snake_case")] -pub enum InputMessageKind { - /// Plain user text (default) - Plain, - /// XML-wrapped user instructions (...) - UserInstructions, - /// XML-wrapped environment context (...) - EnvironmentContext, -} - #[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, TS)] pub struct UserMessageEvent { pub message: String, #[serde(skip_serializing_if = "Option::is_none")] - pub kind: Option, - #[serde(skip_serializing_if = "Option::is_none")] pub images: Option>, } -impl From<(T, U)> for InputMessageKind -where - T: AsRef, - U: AsRef, -{ - fn from(value: (T, U)) -> Self { - let (_role, message) = value; - let message = message.as_ref(); - let trimmed = message.trim(); - if starts_with_ignore_ascii_case(trimmed, ENVIRONMENT_CONTEXT_OPEN_TAG) - && ends_with_ignore_ascii_case(trimmed, ENVIRONMENT_CONTEXT_CLOSE_TAG) - { - InputMessageKind::EnvironmentContext - } else if starts_with_ignore_ascii_case(trimmed, USER_INSTRUCTIONS_OPEN_TAG) - && ends_with_ignore_ascii_case(trimmed, USER_INSTRUCTIONS_CLOSE_TAG) - { - InputMessageKind::UserInstructions - } else { - InputMessageKind::Plain - } - } -} - -fn starts_with_ignore_ascii_case(text: &str, prefix: &str) -> bool { - let text_bytes = text.as_bytes(); - let prefix_bytes = prefix.as_bytes(); - text_bytes.len() >= prefix_bytes.len() - && text_bytes - .iter() - .zip(prefix_bytes.iter()) - .all(|(a, b)| a.eq_ignore_ascii_case(b)) -} - -fn ends_with_ignore_ascii_case(text: &str, suffix: &str) -> bool { - let text_bytes = text.as_bytes(); - let suffix_bytes = suffix.as_bytes(); - text_bytes.len() >= suffix_bytes.len() - && text_bytes[text_bytes.len() - suffix_bytes.len()..] - .iter() - .zip(suffix_bytes.iter()) - .all(|(a, b)| a.eq_ignore_ascii_case(b)) -} - #[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, TS)] pub struct AgentMessageDeltaEvent { pub delta: String, diff --git a/codex-rs/tui/src/chatwidget.rs b/codex-rs/tui/src/chatwidget.rs index 18bda8837c..b881c901f5 100644 --- a/codex-rs/tui/src/chatwidget.rs +++ b/codex-rs/tui/src/chatwidget.rs @@ -23,7 +23,6 @@ use codex_core::protocol::ExecApprovalRequestEvent; use codex_core::protocol::ExecCommandBeginEvent; use codex_core::protocol::ExecCommandEndEvent; use codex_core::protocol::ExitedReviewModeEvent; -use codex_core::protocol::InputMessageKind; use codex_core::protocol::ListCustomPromptsResponseEvent; use codex_core::protocol::McpListToolsResponseEvent; use codex_core::protocol::McpToolCallBeginEvent; @@ -1563,17 +1562,9 @@ impl ChatWidget { } fn on_user_message_event(&mut self, event: UserMessageEvent) { - match event.kind { - Some(InputMessageKind::EnvironmentContext) - | Some(InputMessageKind::UserInstructions) => { - // Skip XML‑wrapped context blocks in the transcript. - } - Some(InputMessageKind::Plain) | None => { - let message = event.message.trim(); - if !message.is_empty() { - self.add_to_history(history_cell::new_user_prompt(message.to_string())); - } - } + let message = event.message.trim(); + if !message.is_empty() { + self.add_to_history(history_cell::new_user_prompt(message.to_string())); } } diff --git a/codex-rs/tui/src/chatwidget/tests.rs b/codex-rs/tui/src/chatwidget/tests.rs index dd6e59af42..7a3459ac10 100644 --- a/codex-rs/tui/src/chatwidget/tests.rs +++ b/codex-rs/tui/src/chatwidget/tests.rs @@ -23,7 +23,6 @@ use codex_core::protocol::ExecCommandBeginEvent; use codex_core::protocol::ExecCommandEndEvent; use codex_core::protocol::ExitedReviewModeEvent; use codex_core::protocol::FileChange; -use codex_core::protocol::InputMessageKind; use codex_core::protocol::Op; use codex_core::protocol::PatchApplyBeginEvent; use codex_core::protocol::PatchApplyEndEvent; @@ -104,7 +103,6 @@ fn resumed_initial_messages_render_history() { initial_messages: Some(vec![ EventMsg::UserMessage(UserMessageEvent { message: "hello from user".to_string(), - kind: Some(InputMessageKind::Plain), images: None, }), EventMsg::AgentMessage(AgentMessageEvent { diff --git a/codex-rs/tui/src/resume_picker.rs b/codex-rs/tui/src/resume_picker.rs index eb2a9cd4ea..0d3ae07eb2 100644 --- a/codex-rs/tui/src/resume_picker.rs +++ b/codex-rs/tui/src/resume_picker.rs @@ -10,6 +10,7 @@ use codex_core::ConversationsPage; use codex_core::Cursor; use codex_core::INTERACTIVE_SESSION_SOURCES; use codex_core::RolloutRecorder; +use codex_protocol::items::TurnItem; use color_eyre::eyre::Result; use crossterm::event::KeyCode; use crossterm::event::KeyEvent; @@ -30,10 +31,7 @@ use crate::text_formatting::truncate_text; use crate::tui::FrameRequester; use crate::tui::Tui; use crate::tui::TuiEvent; -use codex_protocol::models::ContentItem; use codex_protocol::models::ResponseItem; -use codex_protocol::protocol::InputMessageKind; -use codex_protocol::protocol::USER_MESSAGE_BEGIN; const PAGE_SIZE: usize = 25; const LOAD_NEAR_THRESHOLD: usize = 5; @@ -616,37 +614,8 @@ fn extract_timestamp(value: &serde_json::Value) -> Option> { fn preview_from_head(head: &[serde_json::Value]) -> Option { head.iter() .filter_map(|value| serde_json::from_value::(value.clone()).ok()) - .find_map(|item| match item { - ResponseItem::Message { content, .. } => { - // Find the actual user message (as opposed to user instructions or ide context) - let preview = content - .into_iter() - .filter_map(|content| match content { - ContentItem::InputText { text } - if matches!( - InputMessageKind::from(("user", text.as_str())), - InputMessageKind::Plain - ) => - { - // Strip ide context. - let text = match text.find(USER_MESSAGE_BEGIN) { - Some(idx) => { - text[idx + USER_MESSAGE_BEGIN.len()..].trim().to_string() - } - None => text, - }; - Some(text) - } - _ => None, - }) - .collect::(); - - if preview.is_empty() { - None - } else { - Some(preview) - } - } + .find_map(|item| match codex_core::parse_turn_item(&item) { + Some(TurnItem::UserMessage(user)) => Some(user.message()), _ => None, }) } From b42ef6d75fe4593536a7a057816e688653fe2fe9 Mon Sep 17 00:00:00 2001 From: pakrym-oai Date: Tue, 21 Oct 2025 10:34:24 -0700 Subject: [PATCH 2/9] core: emit ItemStarted and ItemCompleted events directly from Session Drop the ItemCollector abstraction and move item lifecycle event emission into Session. Emit ItemStarted and ItemCompleted events (including legacy events) for each user and agent item directly via new Session methods. Update affected code paths and tests. Remove obsolete item_collector module. --- codex-rs/core/src/codex.rs | 105 +++++++++--- codex-rs/core/src/state/item_collector.rs | 86 ---------- codex-rs/core/src/state/mod.rs | 2 - codex-rs/core/tests/common/responses.rs | 50 ++++++ codex-rs/core/tests/suite/items.rs | 192 ++++++++++++++++++++-- 5 files changed, 308 insertions(+), 127 deletions(-) delete mode 100644 codex-rs/core/src/state/item_collector.rs diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 8890172819..b4e1218165 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -12,7 +12,6 @@ use crate::mcp::auth::McpAuthStatusEntry; use crate::parse_command::parse_command; use crate::parse_turn_item; use crate::review_format::format_review_findings_block; -use crate::state::ItemCollector; use crate::terminal; use crate::user_notification::UserNotifier; use async_channel::Receiver; @@ -20,9 +19,10 @@ use async_channel::Sender; use codex_apply_patch::ApplyPatchAction; use codex_protocol::ConversationId; use codex_protocol::items::TurnItem; -use codex_protocol::items::UserMessageItem; use codex_protocol::protocol::ConversationPathResponseEvent; use codex_protocol::protocol::ExitedReviewModeEvent; +use codex_protocol::protocol::ItemCompletedEvent; +use codex_protocol::protocol::ItemStartedEvent; use codex_protocol::protocol::ReviewRequest; use codex_protocol::protocol::RolloutItem; use codex_protocol::protocol::SessionSource; @@ -270,7 +270,6 @@ pub(crate) struct TurnContext { pub(crate) is_review_mode: bool, pub(crate) final_output_json_schema: Option, pub(crate) codex_linux_sandbox_exe: Option, - pub(crate) item_collector: ItemCollector, } impl TurnContext { @@ -359,7 +358,6 @@ impl Session { provider: ModelProviderInfo, session_configuration: &SessionConfiguration, conversation_id: ConversationId, - tx_event: Sender, sub_id: String, ) -> TurnContext { let config = session_configuration.original_config_do_not_use.clone(); @@ -394,8 +392,6 @@ impl Session { features: &config.features, }); - let item_collector = ItemCollector::new(tx_event, conversation_id, sub_id.clone()); - TurnContext { sub_id, client, @@ -409,7 +405,6 @@ impl Session { is_review_mode: false, final_output_json_schema: None, codex_linux_sandbox_exe: config.codex_linux_sandbox_exe.clone(), - item_collector, } } @@ -665,7 +660,6 @@ impl Session { session_configuration.provider.clone(), &session_configuration, self.conversation_id, - self.get_tx_event(), sub_id, ); if let Some(final_schema) = updates.final_output_json_schema { @@ -710,6 +704,59 @@ impl Session { } } + async fn emit_turn_item_started(&self, turn_context: &TurnContext, item: &TurnItem) { + self.send_event( + &turn_context, + EventMsg::ItemStarted(ItemStartedEvent { + thread_id: self.conversation_id, + turn_id: turn_context.sub_id.clone(), + item: item.clone(), + }), + ) + .await; + } + + async fn emit_turn_item_completed( + &self, + turn_context: &TurnContext, + item: TurnItem, + emit_raw_agent_reasoning: bool, + ) { + self.send_event( + turn_context, + EventMsg::ItemCompleted(ItemCompletedEvent { + thread_id: self.conversation_id, + turn_id: turn_context.sub_id.clone(), + item: item.clone(), + }), + ) + .await; + self.emit_turn_item_legacy_events(turn_context, &item, emit_raw_agent_reasoning) + .await; + } + + async fn emit_turn_item_started_completed( + &self, + turn_context: &TurnContext, + item: TurnItem, + emit_raw_agent_reasoning: bool, + ) { + self.emit_turn_item_started(turn_context, &item).await; + self.emit_turn_item_completed(turn_context, item, emit_raw_agent_reasoning) + .await; + } + + async fn emit_turn_item_legacy_events( + &self, + turn_context: &TurnContext, + item: &TurnItem, + emit_raw_agent_reasoning: bool, + ) { + for event in item.legacy_events(emit_raw_agent_reasoning) { + self.send_event(turn_context, event).await; + } + } + /// Emit an exec approval request event and await the user's decision. /// /// The request is keyed by `sub_id`/`call_id` so matching responses are delivered @@ -946,7 +993,11 @@ impl Session { /// Record a user input item to conversation history and also persist a /// corresponding UserMessage EventMsg to rollout. - async fn record_input_and_rollout_usermsg(&self, response_input: &ResponseInputItem) { + async fn record_input_and_rollout_usermsg( + &self, + turn_context: &TurnContext, + response_input: &ResponseInputItem, + ) { let response_item: ResponseItem = response_input.clone().into(); // Add to conversation history and persist response item to rollout self.record_conversation_items(std::slice::from_ref(&response_item)) @@ -954,9 +1005,17 @@ impl Session { // Derive user message events and persist only UserMessage to rollout let turn_item = parse_turn_item(&response_item); - if let Some(TurnItem::UserMessage(user_msg)) = turn_item { - self.persist_rollout_items(&[RolloutItem::EventMsg(user_msg.as_legacy_event())]) - .await; + if let Some(turn_item) = turn_item { + self.emit_turn_item_started_completed( + turn_context, + turn_item.clone(), + self.show_raw_agent_reasoning(), + ) + .await; + if let TurnItem::UserMessage(user_msg) = turn_item { + self.persist_rollout_items(&[RolloutItem::EventMsg(user_msg.as_legacy_event())]) + .await; + } } } @@ -1151,7 +1210,6 @@ async fn submission_loop(sess: Arc, config: Arc, rx_sub: Receiv { sess.record_conversation_items(std::slice::from_ref(&env_item)) .await; - sess.send_event(event).await; } sess.spawn_task(Arc::clone(¤t_context), items, RegularTask) @@ -1427,11 +1485,6 @@ async fn spawn_review_thread( is_review_mode: true, final_output_json_schema: None, codex_linux_sandbox_exe: parent_turn_context.codex_linux_sandbox_exe.clone(), - item_collector: ItemCollector::new( - sess.get_tx_event(), - sess.conversation_id, - sub_id.to_string(), - ), }; // Seed the child task with the review prompt as the initial user message. @@ -1489,7 +1542,7 @@ pub(crate) async fn run_task( review_thread_history.extend(sess.build_initial_context(turn_context.as_ref())); review_thread_history.push(initial_input_for_turn.into()); } else { - sess.record_input_and_rollout_usermsg(&initial_input_for_turn) + sess.record_input_and_rollout_usermsg(turn_context.as_ref(), &initial_input_for_turn) .await; } @@ -2006,6 +2059,7 @@ async fn try_run_turn( } Ok(None) => { let response = handle_non_tool_response_item( + sess.as_ref(), Arc::clone(&turn_context), item.clone(), sess.show_raw_agent_reasoning(), @@ -2127,6 +2181,7 @@ async fn try_run_turn( } async fn handle_non_tool_response_item( + sess: &Session, turn_context: Arc, item: ResponseItem, show_raw_agent_reasoning: bool, @@ -2145,10 +2200,12 @@ async fn handle_non_tool_response_item( _ => parse_turn_item(&item), }; if let Some(turn_item) = turn_item { - turn_context - .item_collector - .started_completed(turn_item, show_raw_agent_reasoning) - .await; + sess.emit_turn_item_started_completed( + turn_context.as_ref(), + turn_item, + show_raw_agent_reasoning, + ) + .await; } } ResponseItem::FunctionCallOutput { .. } | ResponseItem::CustomToolCallOutput { .. } => { @@ -2635,7 +2692,6 @@ mod tests { session_configuration.provider.clone(), &session_configuration, conversation_id, - tx_event.clone(), "turn_id".to_string(), ); @@ -2704,7 +2760,6 @@ mod tests { session_configuration.provider.clone(), &session_configuration, conversation_id, - tx_event.clone(), "turn_id".to_string(), )); diff --git a/codex-rs/core/src/state/item_collector.rs b/codex-rs/core/src/state/item_collector.rs deleted file mode 100644 index ce9de7038a..0000000000 --- a/codex-rs/core/src/state/item_collector.rs +++ /dev/null @@ -1,86 +0,0 @@ -use async_channel::Sender; -use codex_protocol::ConversationId; -use codex_protocol::items::TurnItem; -use codex_protocol::protocol::Event; -use codex_protocol::protocol::EventMsg; -use codex_protocol::protocol::ItemCompletedEvent; -use codex_protocol::protocol::ItemStartedEvent; -use tracing::error; - -#[derive(Debug)] -pub(crate) struct ItemCollector { - thread_id: ConversationId, - turn_id: String, - tx_event: Sender, -} - -impl ItemCollector { - pub fn new( - tx_event: Sender, - thread_id: ConversationId, - turn_id: String, - ) -> ItemCollector { - ItemCollector { - tx_event, - thread_id, - turn_id, - } - } - - pub async fn started(&self, item: TurnItem) { - let err = self - .tx_event - .send(Event { - id: self.turn_id.clone(), - msg: EventMsg::ItemStarted(ItemStartedEvent { - thread_id: self.thread_id, - turn_id: self.turn_id.clone(), - item, - }), - }) - .await; - if let Err(e) = err { - error!("failed to send item started event: {e}"); - } - } - - pub async fn completed(&self, item: TurnItem, emit_raw_agent_reasoning: bool) { - let err = self - .tx_event - .send(Event { - id: self.turn_id.clone(), - msg: EventMsg::ItemCompleted(ItemCompletedEvent { - thread_id: self.thread_id, - turn_id: self.turn_id.clone(), - item: item.clone(), - }), - }) - .await; - if let Err(e) = err { - error!("failed to send item completed event: {e}"); - } - - self.trigger_legacy_events(item, emit_raw_agent_reasoning) - .await; - } - - pub async fn started_completed(&self, item: TurnItem, emit_raw_agent_reasoning: bool) { - self.started(item.clone()).await; - self.completed(item, emit_raw_agent_reasoning).await; - } - - async fn trigger_legacy_events(&self, item: TurnItem, emit_raw_agent_reasoning: bool) { - for event in item.legacy_events(emit_raw_agent_reasoning) { - if let Err(e) = self - .tx_event - .send(Event { - id: self.turn_id.clone(), - msg: event.clone(), - }) - .await - { - error!("failed to send legacy event: {e}"); - } - } - } -} diff --git a/codex-rs/core/src/state/mod.rs b/codex-rs/core/src/state/mod.rs index 7ba5f37083..642433a786 100644 --- a/codex-rs/core/src/state/mod.rs +++ b/codex-rs/core/src/state/mod.rs @@ -1,9 +1,7 @@ -mod item_collector; mod service; mod session; mod turn; -pub(crate) use item_collector::ItemCollector; pub(crate) use service::SessionServices; pub(crate) use session::SessionState; pub(crate) use turn::ActiveTurn; diff --git a/codex-rs/core/tests/common/responses.rs b/codex-rs/core/tests/common/responses.rs index a8a777ae4f..a4703d9e60 100644 --- a/codex-rs/core/tests/common/responses.rs +++ b/codex-rs/core/tests/common/responses.rs @@ -167,6 +167,56 @@ pub fn ev_assistant_message(id: &str, text: &str) -> Value { }) } +pub fn ev_reasoning_item(id: &str, summary: &[&str], raw_content: &[&str]) -> Value { + let summary_entries: Vec = summary + .iter() + .map(|text| serde_json::json!({"type": "summary_text", "text": text})) + .collect(); + + let mut event = serde_json::json!({ + "type": "response.output_item.done", + "item": { + "type": "reasoning", + "id": id, + "summary": summary_entries, + } + }); + + if !raw_content.is_empty() { + let content_entries: Vec = raw_content + .iter() + .map(|text| serde_json::json!({"type": "reasoning_text", "text": text})) + .collect(); + event["item"]["content"] = Value::Array(content_entries); + } + + event +} + +pub fn ev_web_search_call_added(id: &str, status: &str, query: &str) -> Value { + serde_json::json!({ + "type": "response.output_item.added", + "item": { + "type": "web_search_call", + "id": id, + "status": status, + "action": {"type": "search", "query": query} + } + }) +} + +pub fn ev_web_search_call_done(id: &str, status: &str, query: &str) -> Value { + serde_json::json!({ + "type": "response.output_item.done", + "item": { + "type": "web_search_call", + "id": id, + "status": status, + "action": {"type": "search", "query": query} + } + }) +} + pub fn ev_function_call(call_id: &str, name: &str, arguments: &str) -> Value { serde_json::json!({ "type": "response.output_item.done", diff --git a/codex-rs/core/tests/suite/items.rs b/codex-rs/core/tests/suite/items.rs index 63c3c79cd4..13c95003e0 100644 --- a/codex-rs/core/tests/suite/items.rs +++ b/codex-rs/core/tests/suite/items.rs @@ -2,12 +2,18 @@ use anyhow::Ok; use codex_core::protocol::EventMsg; +use codex_core::protocol::ItemCompletedEvent; +use codex_core::protocol::ItemStartedEvent; use codex_core::protocol::Op; use codex_protocol::items::TurnItem; use codex_protocol::user_input::UserInput; -use core_test_support::responses; +use core_test_support::responses::ev_assistant_message; use core_test_support::responses::ev_completed; +use core_test_support::responses::ev_reasoning_item; use core_test_support::responses::ev_response_created; +use core_test_support::responses::ev_web_search_call_added; +use core_test_support::responses::ev_web_search_call_done; +use core_test_support::responses::mount_sse_once_match; use core_test_support::responses::sse; use core_test_support::responses::start_mock_server; use core_test_support::skip_if_no_network; @@ -26,7 +32,7 @@ async fn user_message_item_is_emitted() -> anyhow::Result<()> { let TestCodex { codex, .. } = test_codex().build(&server).await?; let first_response = sse(vec![ev_response_created("resp-1"), ev_completed("resp-1")]); - responses::mount_sse_once_match(&server, any(), first_response).await; + mount_sse_once_match(&server, any(), first_response).await; codex .submit(Op::UserInput { @@ -36,25 +42,23 @@ async fn user_message_item_is_emitted() -> anyhow::Result<()> { }) .await?; - let started = wait_for_event_match(&codex, |ev| match ev { - EventMsg::ItemStarted(e) => Some(e.clone()), + let started_item = wait_for_event_match(&codex, |ev| match ev { + EventMsg::ItemStarted(ItemStartedEvent { + item: TurnItem::UserMessage(item), + .. + }) => Some(item.clone()), _ => None, }) .await; - - let completed = wait_for_event_match(&codex, |ev| match ev { - EventMsg::ItemCompleted(e) => Some(e.clone()), + let completed_item = wait_for_event_match(&codex, |ev| match ev { + EventMsg::ItemCompleted(ItemCompletedEvent { + item: TurnItem::UserMessage(item), + .. + }) => Some(item.clone()), _ => None, }) .await; - let TurnItem::UserMessage(started_item) = started.item else { - panic!("expected user message item"); - }; - let TurnItem::UserMessage(completed_item) = completed.item else { - panic!("expected user message item"); - }; - assert_eq!(started_item.id, completed_item.id); assert_eq!( started_item.content, @@ -70,3 +74,163 @@ async fn user_message_item_is_emitted() -> anyhow::Result<()> { ); Ok(()) } + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn assistant_message_item_is_emitted() -> anyhow::Result<()> { + skip_if_no_network!(Ok(())); + + let server = start_mock_server().await; + + let TestCodex { codex, .. } = test_codex().build(&server).await?; + + let first_response = sse(vec![ + ev_response_created("resp-1"), + ev_assistant_message("msg-1", "all done"), + ev_completed("resp-1"), + ]); + mount_sse_once_match(&server, any(), first_response).await; + + codex + .submit(Op::UserInput { + items: vec![UserInput::Text { + text: "please summarize results".into(), + }], + }) + .await?; + + let started = wait_for_event_match(&codex, |ev| match ev { + EventMsg::ItemStarted(ItemStartedEvent { + item: TurnItem::AgentMessage(item), + .. + }) => Some(item.clone()), + _ => None, + }) + .await; + let completed = wait_for_event_match(&codex, |ev| match ev { + EventMsg::ItemCompleted(ItemCompletedEvent { + item: TurnItem::AgentMessage(item), + .. + }) => Some(item.clone()), + _ => None, + }) + .await; + + assert_eq!(started.id, completed.id); + let Some(codex_protocol::items::AgentMessageContent::Text { text }) = completed.content.first() + else { + panic!("expected agent message text content"); + }; + assert_eq!(text, "all done"); + + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn reasoning_item_is_emitted() -> anyhow::Result<()> { + skip_if_no_network!(Ok(())); + + let server = start_mock_server().await; + + let TestCodex { codex, .. } = test_codex().build(&server).await?; + + let reasoning_item = ev_reasoning_item( + "reasoning-1", + &["Consider inputs", "Compute output"], + &["Detailed reasoning trace"], + ); + + let first_response = sse(vec![ + ev_response_created("resp-1"), + reasoning_item, + ev_completed("resp-1"), + ]); + mount_sse_once_match(&server, any(), first_response).await; + + codex + .submit(Op::UserInput { + items: vec![UserInput::Text { + text: "explain your reasoning".into(), + }], + }) + .await?; + + let started = wait_for_event_match(&codex, |ev| match ev { + EventMsg::ItemStarted(ItemStartedEvent { + item: TurnItem::Reasoning(item), + .. + }) => Some(item.clone()), + _ => None, + }) + .await; + let completed = wait_for_event_match(&codex, |ev| match ev { + EventMsg::ItemCompleted(ItemCompletedEvent { + item: TurnItem::Reasoning(item), + .. + }) => Some(item.clone()), + _ => None, + }) + .await; + + assert_eq!(started.id, completed.id); + assert_eq!( + completed.summary_text, + vec!["Consider inputs".to_string(), "Compute output".to_string()] + ); + assert_eq!( + completed.raw_content, + vec!["Detailed reasoning trace".to_string()] + ); + + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn web_search_item_is_emitted() -> anyhow::Result<()> { + skip_if_no_network!(Ok(())); + + let server = start_mock_server().await; + + let TestCodex { codex, .. } = test_codex().build(&server).await?; + + let web_search_added = + ev_web_search_call_added("web-search-1", "in_progress", "weather seattle"); + let web_search_done = ev_web_search_call_done("web-search-1", "completed", "weather seattle"); + + let first_response = sse(vec![ + ev_response_created("resp-1"), + web_search_added, + web_search_done, + ev_completed("resp-1"), + ]); + mount_sse_once_match(&server, any(), first_response).await; + + codex + .submit(Op::UserInput { + items: vec![UserInput::Text { + text: "find the weather".into(), + }], + }) + .await?; + + let started = wait_for_event_match(&codex, |ev| match ev { + EventMsg::ItemStarted(ItemStartedEvent { + item: TurnItem::WebSearch(item), + .. + }) => Some(item.clone()), + _ => None, + }) + .await; + let completed = wait_for_event_match(&codex, |ev| match ev { + EventMsg::ItemCompleted(ItemCompletedEvent { + item: TurnItem::WebSearch(item), + .. + }) => Some(item.clone()), + _ => None, + }) + .await; + + assert_eq!(started.id, completed.id); + assert_eq!(completed.query, "weather seattle"); + + Ok(()) +} From a8fa24a9529d81c4131b165559f193bad2231d8b Mon Sep 17 00:00:00 2001 From: pakrym-oai Date: Tue, 21 Oct 2025 12:07:17 -0700 Subject: [PATCH 3/9] core: simplify user input recording and rollout logic; fix resume to include reasoning events in initial messages - Remove unnecessary turn_context param from record_input_and_rollout_usermsg and related invocations. - Only persist UserMessage events instead of additionally emitting started/completed events during input recording. - Update resume logic and add test to ensure reasoning events (AgentReasoning/AgentReasoningRawContent) are included in initial_messages after resume. - Minor cleanup of unused imports. --- codex-rs/core/src/codex.rs | 24 +++--------- codex-rs/core/tests/suite/resume.rs | 57 +++++++++++++++++++++++++++++ codex-rs/protocol/src/items.rs | 1 - 3 files changed, 63 insertions(+), 19 deletions(-) diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index b4e1218165..887d2013d3 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -706,7 +706,7 @@ impl Session { async fn emit_turn_item_started(&self, turn_context: &TurnContext, item: &TurnItem) { self.send_event( - &turn_context, + turn_context, EventMsg::ItemStarted(ItemStartedEvent { thread_id: self.conversation_id, turn_id: turn_context.sub_id.clone(), @@ -993,11 +993,7 @@ impl Session { /// Record a user input item to conversation history and also persist a /// corresponding UserMessage EventMsg to rollout. - async fn record_input_and_rollout_usermsg( - &self, - turn_context: &TurnContext, - response_input: &ResponseInputItem, - ) { + async fn record_input_and_rollout_usermsg(&self, response_input: &ResponseInputItem) { let response_item: ResponseItem = response_input.clone().into(); // Add to conversation history and persist response item to rollout self.record_conversation_items(std::slice::from_ref(&response_item)) @@ -1005,17 +1001,9 @@ impl Session { // Derive user message events and persist only UserMessage to rollout let turn_item = parse_turn_item(&response_item); - if let Some(turn_item) = turn_item { - self.emit_turn_item_started_completed( - turn_context, - turn_item.clone(), - self.show_raw_agent_reasoning(), - ) - .await; - if let TurnItem::UserMessage(user_msg) = turn_item { - self.persist_rollout_items(&[RolloutItem::EventMsg(user_msg.as_legacy_event())]) - .await; - } + if let Some(TurnItem::UserMessage(user_msg)) = turn_item { + self.persist_rollout_items(&[RolloutItem::EventMsg(user_msg.as_legacy_event())]) + .await; } } @@ -1542,7 +1530,7 @@ pub(crate) async fn run_task( review_thread_history.extend(sess.build_initial_context(turn_context.as_ref())); review_thread_history.push(initial_input_for_turn.into()); } else { - sess.record_input_and_rollout_usermsg(turn_context.as_ref(), &initial_input_for_turn) + sess.record_input_and_rollout_usermsg(&initial_input_for_turn) .await; } diff --git a/codex-rs/core/tests/suite/resume.rs b/codex-rs/core/tests/suite/resume.rs index 9b1707a05e..3a902b6b6f 100644 --- a/codex-rs/core/tests/suite/resume.rs +++ b/codex-rs/core/tests/suite/resume.rs @@ -4,6 +4,7 @@ use codex_core::protocol::Op; use codex_protocol::user_input::UserInput; use core_test_support::responses::ev_assistant_message; use core_test_support::responses::ev_completed; +use core_test_support::responses::ev_reasoning_item; use core_test_support::responses::ev_response_created; use core_test_support::responses::mount_sse_once_match; use core_test_support::responses::sse; @@ -62,3 +63,59 @@ async fn resume_includes_initial_messages_from_rollout_events() -> Result<()> { Ok(()) } + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn resume_includes_initial_messages_from_reasoning_events() -> Result<()> { + skip_if_no_network!(Ok(())); + + let server = start_mock_server().await; + let mut builder = test_codex().with_config(|config| { + config.show_raw_agent_reasoning = true; + }); + let initial = builder.build(&server).await?; + let codex = Arc::clone(&initial.codex); + let home = initial.home.clone(); + let rollout_path = initial.session_configured.rollout_path.clone(); + + let initial_sse = sse(vec![ + ev_response_created("resp-initial"), + ev_reasoning_item("reason-1", &["Summarized step"], &["raw detail"]), + ev_assistant_message("msg-1", "Completed reasoning turn"), + ev_completed("resp-initial"), + ]); + mount_sse_once_match(&server, any(), initial_sse).await; + + codex + .submit(Op::UserInput { + items: vec![UserInput::Text { + text: "Record reasoning messages".into(), + }], + }) + .await?; + + wait_for_event(&codex, |event| matches!(event, EventMsg::TaskComplete(_))).await; + + let resumed = builder.resume(&server, home, rollout_path).await?; + let initial_messages = resumed + .session_configured + .initial_messages + .expect("expected initial messages to be present for resumed session"); + match initial_messages.as_slice() { + [ + EventMsg::UserMessage(first_user), + EventMsg::TokenCount(_), + EventMsg::AgentReasoning(reasoning), + EventMsg::AgentReasoningRawContent(raw), + EventMsg::AgentMessage(assistant_message), + EventMsg::TokenCount(_), + ] => { + assert_eq!(first_user.message, "Record reasoning messages"); + assert_eq!(reasoning.text, "Summarized step"); + assert_eq!(raw.text, "raw detail"); + assert_eq!(assistant_message.message, "Completed reasoning turn"); + } + other => panic!("unexpected initial messages after resume: {other:#?}"), + } + + Ok(()) +} diff --git a/codex-rs/protocol/src/items.rs b/codex-rs/protocol/src/items.rs index cd71e94b21..dc521fce68 100644 --- a/codex-rs/protocol/src/items.rs +++ b/codex-rs/protocol/src/items.rs @@ -1,4 +1,3 @@ -use crate::models::WebSearchAction; use crate::protocol::AgentMessageEvent; use crate::protocol::AgentReasoningEvent; use crate::protocol::AgentReasoningRawContentEvent; From e08f5b2017d8cff9691ff4ddec214d364b0c20e1 Mon Sep 17 00:00:00 2001 From: pakrym-oai Date: Tue, 21 Oct 2025 12:27:31 -0700 Subject: [PATCH 4/9] core, protocol: rename legacy_events to as_legacy_events on TurnItem, ReasoningItem, AgentMessageItem Refactor event conversion methods from legacy_events to as_legacy_events for clarity and consistency. Update all call sites and tests accordingly. Adapt AgentMessageItem to return Vec from as_legacy_events. Update event_mapping tests to directly test parse_turn_item and TurnItem variants. --- codex-rs/core/src/codex.rs | 2 +- codex-rs/core/src/event_mapping.rs | 89 +++++++++++++++++------------- codex-rs/protocol/src/items.rs | 22 ++++---- 3 files changed, 61 insertions(+), 52 deletions(-) diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 887d2013d3..55b1b4f2e1 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -752,7 +752,7 @@ impl Session { item: &TurnItem, emit_raw_agent_reasoning: bool, ) { - for event in item.legacy_events(emit_raw_agent_reasoning) { + for event in item.as_legacy_events(emit_raw_agent_reasoning) { self.send_event(turn_context, event).await; } } diff --git a/codex-rs/core/src/event_mapping.rs b/codex-rs/core/src/event_mapping.rs index f5fb7c8e85..d58e89ab82 100644 --- a/codex-rs/core/src/event_mapping.rs +++ b/codex-rs/core/src/event_mapping.rs @@ -121,7 +121,7 @@ pub(crate) fn map_response_item_to_event_messages( show_raw_agent_reasoning: bool, ) -> Vec { if let Some(turn_item) = parse_turn_item(item) { - return turn_item.legacy_events(show_raw_agent_reasoning); + return turn_item.as_legacy_events(show_raw_agent_reasoning); } // Variants that require side effects are handled by higher layers and do not emit events here. @@ -130,19 +130,18 @@ pub(crate) fn map_response_item_to_event_messages( #[cfg(test)] mod tests { - use super::map_response_item_to_event_messages; - use crate::protocol::EventMsg; - use crate::protocol::WebSearchEndEvent; - + use super::parse_turn_item; + use codex_protocol::items::TurnItem; use codex_protocol::models::ContentItem; use codex_protocol::models::ReasoningItemContent; use codex_protocol::models::ReasoningItemReasoningSummary; use codex_protocol::models::ResponseItem; use codex_protocol::models::WebSearchAction; + use codex_protocol::user_input::UserInput; use pretty_assertions::assert_eq; #[test] - fn maps_user_message_with_text_and_two_images() { + fn parses_user_message_with_text_and_two_images() { let img1 = "https://example.com/one.png".to_string(); let img2 = "https://example.com/two.jpg".to_string(); @@ -162,20 +161,25 @@ mod tests { ], }; - let events = map_response_item_to_event_messages(&item, false); - assert_eq!(events.len(), 1, "expected a single user message event"); + let turn_item = parse_turn_item(&item).expect("expected user message turn item"); - match &events[0] { - EventMsg::UserMessage(user) => { - assert_eq!(user.message, "Hello world"); - assert_eq!(user.images, Some(vec![img1, img2])); + match turn_item { + TurnItem::UserMessage(user) => { + let expected_content = vec![ + UserInput::Text { + text: "Hello world".to_string(), + }, + UserInput::Image { image_url: img1 }, + UserInput::Image { image_url: img2 }, + ]; + assert_eq!(user.content, expected_content); } - other => panic!("expected UserMessage, got {other:?}"), + other => panic!("expected TurnItem::UserMessage, got {other:?}"), } } #[test] - fn maps_reasoning_summary_without_raw_content() { + fn parses_reasoning_summary_and_raw_content() { let item = ResponseItem::Reasoning { id: "reasoning_1".to_string(), summary: vec![ @@ -192,18 +196,23 @@ mod tests { encrypted_content: None, }; - let events = map_response_item_to_event_messages(&item, false); + let turn_item = parse_turn_item(&item).expect("expected reasoning turn item"); - assert_eq!(events.len(), 2, "expected only reasoning summaries"); - assert!( - events - .iter() - .all(|event| matches!(event, EventMsg::AgentReasoning(_))) - ); + match turn_item { + TurnItem::Reasoning(reasoning) => { + assert_eq!( + reasoning.summary_text, + vec!["Step 1".to_string(), "Step 2".to_string()] + ); + assert_eq!(reasoning.raw_content, vec!["raw details".to_string()]); + assert_eq!(reasoning.encrypted_content, None); + } + other => panic!("expected TurnItem::Reasoning, got {other:?}"), + } } #[test] - fn maps_reasoning_including_raw_content_when_enabled() { + fn parses_reasoning_including_raw_content() { let item = ResponseItem::Reasoning { id: "reasoning_2".to_string(), summary: vec![ReasoningItemReasoningSummary::SummaryText { @@ -220,20 +229,23 @@ mod tests { encrypted_content: None, }; - let events = map_response_item_to_event_messages(&item, true); + let turn_item = parse_turn_item(&item).expect("expected reasoning turn item"); - assert_eq!( - events.len(), - 3, - "expected summary and raw reasoning content events" - ); - assert!(matches!(events[0], EventMsg::AgentReasoning(_))); - assert!(matches!(events[1], EventMsg::AgentReasoningRawContent(_))); - assert!(matches!(events[2], EventMsg::AgentReasoningRawContent(_))); + match turn_item { + TurnItem::Reasoning(reasoning) => { + assert_eq!(reasoning.summary_text, vec!["Summarized step".to_string()]); + assert_eq!( + reasoning.raw_content, + vec!["raw step".to_string(), "final thought".to_string()] + ); + assert_eq!(reasoning.encrypted_content, None); + } + other => panic!("expected TurnItem::Reasoning, got {other:?}"), + } } #[test] - fn maps_web_search_call() { + fn parses_web_search_call() { let item = ResponseItem::WebSearchCall { id: Some("ws_1".to_string()), status: Some("completed".to_string()), @@ -242,15 +254,14 @@ mod tests { }, }; - let events = map_response_item_to_event_messages(&item, false); - assert_eq!(events.len(), 1, "expected a single web search event"); + let turn_item = parse_turn_item(&item).expect("expected web search turn item"); - match &events[0] { - EventMsg::WebSearchEnd(WebSearchEndEvent { call_id, query }) => { - assert_eq!(call_id, "ws_1"); - assert_eq!(query, "weather"); + match turn_item { + TurnItem::WebSearch(search) => { + assert_eq!(search.id, "ws_1"); + assert_eq!(search.query, "weather"); } - other => panic!("expected WebSearchEnd, got {other:?}"), + other => panic!("expected TurnItem::WebSearch, got {other:?}"), } } } diff --git a/codex-rs/protocol/src/items.rs b/codex-rs/protocol/src/items.rs index dc521fce68..b1ba002652 100644 --- a/codex-rs/protocol/src/items.rs +++ b/codex-rs/protocol/src/items.rs @@ -96,22 +96,20 @@ impl AgentMessageItem { } } - pub fn as_legacy_event(&self) -> EventMsg { - let message = self - .content + pub fn as_legacy_events(&self) -> Vec { + self.content .iter() .map(|c| match c { - AgentMessageContent::Text { text } => text.clone(), + AgentMessageContent::Text { text } => EventMsg::AgentMessage(AgentMessageEvent { + message: text.clone(), + }), }) - .collect::>() - .join(""); - - EventMsg::AgentMessage(AgentMessageEvent { message }) + .collect() } } impl ReasoningItem { - pub fn legacy_events(&self, show_raw_agent_reasoning: bool) -> Vec { + pub fn as_legacy_events(&self, show_raw_agent_reasoning: bool) -> Vec { let mut events = Vec::new(); for summary in &self.summary_text { events.push(EventMsg::AgentReasoning(AgentReasoningEvent { @@ -152,12 +150,12 @@ impl TurnItem { } } - pub fn legacy_events(&self, show_raw_agent_reasoning: bool) -> Vec { + pub fn as_legacy_events(&self, show_raw_agent_reasoning: bool) -> Vec { match self { TurnItem::UserMessage(item) => vec![item.as_legacy_event()], - TurnItem::AgentMessage(item) => vec![item.as_legacy_event()], + TurnItem::AgentMessage(item) => item.as_legacy_events(), TurnItem::WebSearch(item) => vec![item.as_legacy_event()], - TurnItem::Reasoning(item) => item.legacy_events(show_raw_agent_reasoning), + TurnItem::Reasoning(item) => item.as_legacy_events(show_raw_agent_reasoning), } } } From f093a5e49c9f1fbe645cbabfb5e8967200ca3775 Mon Sep 17 00:00:00 2001 From: pakrym-oai Date: Tue, 21 Oct 2025 12:32:43 -0700 Subject: [PATCH 5/9] core/event_mapping.rs: remove unused map_response_item_to_event_messages function --- codex-rs/core/src/event_mapping.rs | 15 --------------- 1 file changed, 15 deletions(-) diff --git a/codex-rs/core/src/event_mapping.rs b/codex-rs/core/src/event_mapping.rs index d58e89ab82..4f6ffcaac0 100644 --- a/codex-rs/core/src/event_mapping.rs +++ b/codex-rs/core/src/event_mapping.rs @@ -113,21 +113,6 @@ pub fn parse_turn_item(item: &ResponseItem) -> Option { } } -/// Convert a `ResponseItem` into zero or more `EventMsg` values that the UI can render. -/// -/// When `show_raw_agent_reasoning` is false, raw reasoning content events are omitted. -pub(crate) fn map_response_item_to_event_messages( - item: &ResponseItem, - show_raw_agent_reasoning: bool, -) -> Vec { - if let Some(turn_item) = parse_turn_item(item) { - return turn_item.as_legacy_events(show_raw_agent_reasoning); - } - - // Variants that require side effects are handled by higher layers and do not emit events here. - Vec::new() -} - #[cfg(test)] mod tests { use super::parse_turn_item; From 2293a1a81c2ddf55fee0c7b293025b42c1d03961 Mon Sep 17 00:00:00 2001 From: pakrym-oai Date: Tue, 21 Oct 2025 12:59:09 -0700 Subject: [PATCH 6/9] session: require turn_context when recording input and persisting user message event event_mapping: fix agent message parsing to support multiple content items; add test for agent message parsing --- codex-rs/core/src/codex.rs | 13 +++++++++---- codex-rs/core/src/event_mapping.rs | 31 ++++++++++++++++++++++++++---- 2 files changed, 36 insertions(+), 8 deletions(-) diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 55b1b4f2e1..f0b2f61b68 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -993,7 +993,11 @@ impl Session { /// Record a user input item to conversation history and also persist a /// corresponding UserMessage EventMsg to rollout. - async fn record_input_and_rollout_usermsg(&self, response_input: &ResponseInputItem) { + async fn record_input_and_rollout_usermsg( + &self, + turn_context: &TurnContext, + response_input: &ResponseInputItem, + ) { let response_item: ResponseItem = response_input.clone().into(); // Add to conversation history and persist response item to rollout self.record_conversation_items(std::slice::from_ref(&response_item)) @@ -1001,8 +1005,9 @@ impl Session { // Derive user message events and persist only UserMessage to rollout let turn_item = parse_turn_item(&response_item); - if let Some(TurnItem::UserMessage(user_msg)) = turn_item { - self.persist_rollout_items(&[RolloutItem::EventMsg(user_msg.as_legacy_event())]) + + if let Some(item @ TurnItem::UserMessage(_)) = turn_item { + self.emit_turn_item_started_completed(turn_context, item, false) .await; } } @@ -1530,7 +1535,7 @@ pub(crate) async fn run_task( review_thread_history.extend(sess.build_initial_context(turn_context.as_ref())); review_thread_history.push(initial_input_for_turn.into()); } else { - sess.record_input_and_rollout_usermsg(&initial_input_for_turn) + sess.record_input_and_rollout_usermsg(turn_context.as_ref(), &initial_input_for_turn) .await; } diff --git a/codex-rs/core/src/event_mapping.rs b/codex-rs/core/src/event_mapping.rs index 4f6ffcaac0..a66eebfe5e 100644 --- a/codex-rs/core/src/event_mapping.rs +++ b/codex-rs/core/src/event_mapping.rs @@ -1,4 +1,3 @@ -use crate::protocol::EventMsg; use codex_protocol::items::AgentMessageContent; use codex_protocol::items::AgentMessageItem; use codex_protocol::items::ReasoningItem; @@ -48,11 +47,11 @@ fn parse_user_message(message: &[ContentItem]) -> Option { } fn parse_agent_message(message: &[ContentItem]) -> AgentMessageItem { - let mut output: String = String::new(); + let mut content: Vec = Vec::new(); for content_item in message.iter() { match content_item { ContentItem::OutputText { text } => { - output = text.to_string(); + content.push(AgentMessageContent::Text { text: text.clone() }); } _ => { warn!( @@ -62,7 +61,7 @@ fn parse_agent_message(message: &[ContentItem]) -> AgentMessageItem { } } } - AgentMessageItem::new(&[AgentMessageContent::Text { text: output }]) + AgentMessageItem::new(&content) } pub fn parse_turn_item(item: &ResponseItem) -> Option { @@ -116,6 +115,7 @@ pub fn parse_turn_item(item: &ResponseItem) -> Option { #[cfg(test)] mod tests { use super::parse_turn_item; + use codex_protocol::items::AgentMessageContent; use codex_protocol::items::TurnItem; use codex_protocol::models::ContentItem; use codex_protocol::models::ReasoningItemContent; @@ -163,6 +163,29 @@ mod tests { } } + #[test] + fn parses_agent_message() { + let item = ResponseItem::Message { + id: Some("msg-1".to_string()), + role: "assistant".to_string(), + content: vec![ContentItem::OutputText { + text: "Hello from Codex".to_string(), + }], + }; + + let turn_item = parse_turn_item(&item).expect("expected agent message turn item"); + + match turn_item { + TurnItem::AgentMessage(message) => { + let Some(AgentMessageContent::Text { text }) = message.content.first() else { + panic!("expected agent message text content"); + }; + assert_eq!(text, "Hello from Codex"); + } + other => panic!("expected TurnItem::AgentMessage, got {other:?}"), + } + } + #[test] fn parses_reasoning_summary_and_raw_content() { let item = ResponseItem::Reasoning { From 2751a50ea0d3e6f38a1dd89bc3fb091d587fc035 Mon Sep 17 00:00:00 2001 From: pakrym-oai Date: Tue, 21 Oct 2025 13:12:18 -0700 Subject: [PATCH 7/9] core/event_mapping.rs, protocol/items.rs: remove encrypted_content from ReasoningItem and parsing logic --- codex-rs/core/src/event_mapping.rs | 1 - codex-rs/protocol/src/items.rs | 2 -- 2 files changed, 3 deletions(-) diff --git a/codex-rs/core/src/event_mapping.rs b/codex-rs/core/src/event_mapping.rs index a66eebfe5e..cf275b9828 100644 --- a/codex-rs/core/src/event_mapping.rs +++ b/codex-rs/core/src/event_mapping.rs @@ -97,7 +97,6 @@ pub fn parse_turn_item(item: &ResponseItem) -> Option { id: id.clone(), summary_text, raw_content, - encrypted_content: encrypted_content.clone(), })) } ResponseItem::WebSearchCall { diff --git a/codex-rs/protocol/src/items.rs b/codex-rs/protocol/src/items.rs index b1ba002652..27a9b94743 100644 --- a/codex-rs/protocol/src/items.rs +++ b/codex-rs/protocol/src/items.rs @@ -41,8 +41,6 @@ pub struct ReasoningItem { pub summary_text: Vec, #[serde(default)] pub raw_content: Vec, - #[serde(default)] - pub encrypted_content: Option, } #[derive(Debug, Clone, Deserialize, Serialize, TS, JsonSchema)] From d6f6bc0b095206a5d6e682b5962e6a6a757a5818 Mon Sep 17 00:00:00 2001 From: pakrym-oai Date: Tue, 21 Oct 2025 13:47:36 -0700 Subject: [PATCH 8/9] event_mapping: remove encrypted_content checks and struct propagation in parse_turn_item and tests --- codex-rs/core/src/event_mapping.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/codex-rs/core/src/event_mapping.rs b/codex-rs/core/src/event_mapping.rs index cf275b9828..0c41951ee5 100644 --- a/codex-rs/core/src/event_mapping.rs +++ b/codex-rs/core/src/event_mapping.rs @@ -76,7 +76,7 @@ pub fn parse_turn_item(item: &ResponseItem) -> Option { id, summary, content, - encrypted_content, + .. } => { let summary_text = summary .iter() @@ -212,7 +212,6 @@ mod tests { vec!["Step 1".to_string(), "Step 2".to_string()] ); assert_eq!(reasoning.raw_content, vec!["raw details".to_string()]); - assert_eq!(reasoning.encrypted_content, None); } other => panic!("expected TurnItem::Reasoning, got {other:?}"), } @@ -245,7 +244,6 @@ mod tests { reasoning.raw_content, vec!["raw step".to_string(), "final thought".to_string()] ); - assert_eq!(reasoning.encrypted_content, None); } other => panic!("expected TurnItem::Reasoning, got {other:?}"), } From 48fd46b5ba32dc959aa452c8479f7358f96ff8fc Mon Sep 17 00:00:00 2001 From: pakrym-oai Date: Tue, 21 Oct 2025 13:58:47 -0700 Subject: [PATCH 9/9] tests: add environment_context message to test conversation flow --- codex-rs/tui/src/resume_picker.rs | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/codex-rs/tui/src/resume_picker.rs b/codex-rs/tui/src/resume_picker.rs index 0d3ae07eb2..b735ff9374 100644 --- a/codex-rs/tui/src/resume_picker.rs +++ b/codex-rs/tui/src/resume_picker.rs @@ -968,6 +968,19 @@ mod tests { "role": "user", "content": [ { "type": "input_text", "text": "hi" }, + ] + }), + json!({ + "type": "message", + "role": "user", + "content": [ + { "type": "input_text", "text": "..." }, + ] + }), + json!({ + "type": "message", + "role": "user", + "content": [ { "type": "input_text", "text": "real question" }, { "type": "input_image", "image_url": "ignored" } ]