diff --git a/codex-rs/app-server/tests/suite/send_message.rs b/codex-rs/app-server/tests/suite/send_message.rs index e755a11b772..725f327b4cb 100644 --- a/codex-rs/app-server/tests/suite/send_message.rs +++ b/codex-rs/app-server/tests/suite/send_message.rs @@ -628,6 +628,7 @@ fn append_rollout_turn_context(path: &Path, timestamp: &str, model: &str) -> std model: model.to_string(), personality: None, collaboration_mode: None, + realtime_active: Some(false), effort: None, summary: ReasoningSummary::Auto, user_instructions: None, diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 885554c4ca0..045e6f966fc 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -166,6 +166,19 @@ pub enum SteerInputError { ExpectedTurnMismatch { expected: String, actual: String }, EmptyInput, } + +/// Notes from the previous real user turn. +/// +/// Conceptually this is the same role that `previous_model` used to fill, but +/// it can carry other prior-turn settings that matter when constructing +/// sensible state-change diffs or full-context reinjection, such as model +/// switches or detecting a prior `realtime_active -> false` transition. +#[derive(Clone, Debug, PartialEq, Eq)] +pub(crate) struct PreviousTurnSettings { + pub(crate) model: String, + pub(crate) realtime_active: Option, +} + use crate::exec_policy::ExecPolicyUpdateError; use crate::feedback_tags; use crate::file_watcher::FileWatcher; @@ -603,6 +616,7 @@ impl TurnSkillsContext { #[derive(Debug)] pub(crate) struct TurnContext { pub(crate) sub_id: String, + pub(crate) realtime_active: bool, pub(crate) config: Arc, pub(crate) auth_manager: Option>, pub(crate) model_info: ModelInfo, @@ -690,6 +704,7 @@ impl TurnContext { Self { sub_id: self.sub_id.clone(), + realtime_active: self.realtime_active, config: Arc::new(config), auth_manager: self.auth_manager.clone(), model_info: model_info.clone(), @@ -753,6 +768,7 @@ impl TurnContext { model: self.model_info.slug.clone(), personality: self.personality, collaboration_mode: Some(self.collaboration_mode.clone()), + realtime_active: Some(self.realtime_active), effort: self.reasoning_effort, summary: self.reasoning_summary, user_instructions: self.user_instructions.clone(), @@ -1063,6 +1079,7 @@ impl Session { let (current_date, timezone) = local_time_context(); TurnContext { sub_id, + realtime_active: false, config: per_turn_config.clone(), auth_manager: auth_manager_for_context, model_info: model_info.clone(), @@ -1732,13 +1749,13 @@ impl Session { // TODO(ccunningham): Defer initial context insertion until the first real turn // starts so it reflects the actual first-turn settings (permissions, etc.) and // we do not emit model-visible "diff" updates before the first user message. - let items = self.build_initial_context(&turn_context, None).await; + let items = self.build_initial_context(&turn_context).await; self.record_conversation_items(&turn_context, &items).await; { let mut state = self.state.lock().await; state.set_reference_context_item(Some(turn_context.to_turn_context_item())); } - self.set_previous_model(None).await; + self.set_previous_turn_settings(None).await; // Ensure initial items are visible to immediate readers (e.g., tests, forks). self.flush_rollout().await; } @@ -1746,19 +1763,25 @@ impl Session { let rollout_items = resumed_history.history; let restored_tool_selection = Self::extract_mcp_tool_selection_from_rollout(&rollout_items); + let reconstructed_rollout = self .reconstruct_history_from_rollout(&turn_context, &rollout_items) .await; - let previous_model = reconstructed_rollout.previous_model.clone(); - let curr = turn_context.model_info.slug.as_str(); + let previous_turn_settings = reconstructed_rollout.previous_turn_settings.clone(); + self.set_previous_turn_settings(previous_turn_settings.clone()) + .await; { let mut state = self.state.lock().await; state.set_reference_context_item(reconstructed_rollout.reference_context_item); } - self.set_previous_model(previous_model.clone()).await; // If resuming, warn when the last recorded model differs from the current one. - if let Some(prev) = previous_model.as_deref().filter(|p| *p != curr) { + let curr: &str = turn_context.model_info.slug.as_str(); + if let Some(prev) = previous_turn_settings + .as_ref() + .map(|settings| settings.model.as_str()) + .filter(|model| *model != curr) + { warn!("resuming session with different model: previous={prev}, current={curr}"); self.send_event( &turn_context, @@ -1796,11 +1819,20 @@ impl Session { InitialHistory::Forked(rollout_items) => { let restored_tool_selection = Self::extract_mcp_tool_selection_from_rollout(&rollout_items); + let reconstructed_rollout = self .reconstruct_history_from_rollout(&turn_context, &rollout_items) .await; - let previous_model = reconstructed_rollout.previous_model.clone(); - self.set_previous_model(previous_model).await; + self.set_previous_turn_settings( + reconstructed_rollout.previous_turn_settings.clone(), + ) + .await; + { + let mut state = self.state.lock().await; + state.set_reference_context_item( + reconstructed_rollout.reference_context_item.clone(), + ); + } // Always add response items to conversation history let reconstructed_history = reconstructed_rollout.history; @@ -1825,7 +1857,7 @@ impl Session { } // Append the current session's initial context after the reconstructed history. - let initial_context = self.build_initial_context(&turn_context, None).await; + let initial_context = self.build_initial_context(&turn_context).await; self.record_conversation_items(&turn_context, &initial_context) .await; { @@ -1897,14 +1929,17 @@ impl Session { active_selected_tools } - async fn previous_model(&self) -> Option { + async fn previous_turn_settings(&self) -> Option { let state = self.state.lock().await; - state.previous_model() + state.previous_turn_settings() } - pub(crate) async fn set_previous_model(&self, previous_model: Option) { + pub(crate) async fn set_previous_turn_settings( + &self, + previous_turn_settings: Option, + ) { let mut state = self.state.lock().await; - state.set_previous_model(previous_model); + state.set_previous_turn_settings(previous_turn_settings); } fn maybe_refresh_shell_snapshot_for_cwd( @@ -2067,6 +2102,7 @@ impl Session { Arc::clone(&self.js_repl), skills_outcome, ); + turn_context.realtime_active = self.conversation.running_state().await.is_some(); if let Some(final_schema) = final_output_json_schema { turn_context.final_output_json_schema = final_schema; @@ -2226,21 +2262,24 @@ impl Session { .await } - fn build_settings_update_items( + async fn build_settings_update_items( &self, reference_context_item: Option<&TurnContextItem>, - previous_user_turn_model: Option<&str>, current_context: &TurnContext, ) -> Vec { // TODO: Make context updates a pure diff of persisted previous/current TurnContextItem // state so replay/backtracking is deterministic. Runtime inputs that affect model-visible - // context (shell, exec policy, feature gates, previous-model bridge) should be persisted + // context (shell, exec policy, feature gates, previous-turn bridge) should be persisted // state or explicit non-state replay events. + let previous_turn_settings = { + let state = self.state.lock().await; + state.previous_turn_settings() + }; let shell = self.user_shell(); let exec_policy = self.services.exec_policy.current(); crate::context_manager::updates::build_settings_update_items( reference_context_item, - previous_user_turn_model, + previous_turn_settings.as_ref(), current_context, shell.as_ref(), exec_policy.as_ref(), @@ -2879,14 +2918,22 @@ impl Session { pub(crate) async fn build_initial_context( &self, turn_context: &TurnContext, - previous_user_turn_model: Option<&str>, ) -> Vec { let mut developer_sections = Vec::::with_capacity(8); let mut contextual_user_sections = Vec::::with_capacity(2); let shell = self.user_shell(); + let (reference_context_item, previous_turn_settings, collaboration_mode, base_instructions) = { + let state = self.state.lock().await; + ( + state.reference_context_item(), + state.previous_turn_settings(), + state.session_configuration.collaboration_mode.clone(), + state.session_configuration.base_instructions.clone(), + ) + }; if let Some(model_switch_message) = crate::context_manager::updates::build_model_instructions_update_item( - previous_user_turn_model, + previous_turn_settings.as_ref(), turn_context, ) { @@ -2914,18 +2961,18 @@ impl Session { developer_sections.push(memory_prompt); } // Add developer instructions from collaboration_mode if they exist and are non-empty - let (collaboration_mode, base_instructions) = { - let state = self.state.lock().await; - ( - state.session_configuration.collaboration_mode.clone(), - state.session_configuration.base_instructions.clone(), - ) - }; if let Some(collab_instructions) = DeveloperInstructions::from_collaboration_mode(&collaboration_mode) { developer_sections.push(collab_instructions.into_text()); } + if let Some(realtime_update) = crate::context_manager::updates::build_initial_realtime_item( + reference_context_item.as_ref(), + previous_turn_settings.as_ref(), + turn_context, + ) { + developer_sections.push(realtime_update.into_text()); + } if self.features.enabled(Feature::Personality) && let Some(personality) = turn_context.personality { @@ -3027,7 +3074,6 @@ impl Session { pub(crate) async fn record_context_updates_and_set_reference_context_item( &self, turn_context: &TurnContext, - previous_user_turn_model: Option<&str>, ) { let reference_context_item = { let state = self.state.lock().await; @@ -3035,15 +3081,11 @@ impl Session { }; let should_inject_full_context = reference_context_item.is_none(); let context_items = if should_inject_full_context { - self.build_initial_context(turn_context, previous_user_turn_model) - .await + self.build_initial_context(turn_context).await } else { // Steady-state path: append only context diffs to minimize token overhead. - self.build_settings_update_items( - reference_context_item.as_ref(), - previous_user_turn_model, - turn_context, - ) + self.build_settings_update_items(reference_context_item.as_ref(), turn_context) + .await }; let turn_context_item = turn_context.to_turn_context_item(); if !context_items.is_empty() { @@ -4307,8 +4349,8 @@ mod handlers { // TODO(ccunningham): Fix rollback/backtracking baseline handling. // We clear `reference_context_item` here, but should restore the // post-rollback baseline from the surviving history/rollout instead. - // Truncating history should also invalidate/recompute `previous_model` - // so the next regular turn replays any dropped model-switch + // Truncating history should also invalidate/recompute `previous_turn_settings` + // so the next regular turn replays any dropped model/realtime // instructions. history.drop_last_n_user_turns(num_turns); @@ -4552,6 +4594,7 @@ async fn spawn_review_thread( let review_turn_context = TurnContext { sub_id: review_turn_id, + realtime_active: parent_turn_context.realtime_active, config: per_turn_config, auth_manager: auth_manager_for_context, model_info: model_info.clone(), @@ -4711,12 +4754,8 @@ pub(crate) async fn run_turn( let skills_outcome = Some(turn_context.turn_skills.outcome.as_ref()); - let previous_model = sess.previous_model().await; - sess.record_context_updates_and_set_reference_context_item( - turn_context.as_ref(), - previous_model.as_deref(), - ) - .await; + sess.record_context_updates_and_set_reference_context_item(turn_context.as_ref()) + .await; let available_connectors = if turn_context.config.features.enabled(Feature::Apps) { let mcp_tools = match sess @@ -4822,11 +4861,14 @@ pub(crate) async fn run_turn( let response_item: ResponseItem = initial_input_for_turn.clone().into(); sess.record_user_prompt_and_emit_turn_item(turn_context.as_ref(), &input, response_item) .await; - // Track the previous-model baseline from the regular user-turn path only so + // Track the previous-turn baseline from the regular user-turn path only so // standalone tasks (compact/shell/review/undo) cannot suppress future - // `` injections. - sess.set_previous_model(Some(turn_context.model_info.slug.clone())) - .await; + // model/realtime injections. + sess.set_previous_turn_settings(Some(PreviousTurnSettings { + model: turn_context.model_info.slug.clone(), + realtime_active: Some(turn_context.realtime_active), + })) + .await; if !skill_items.is_empty() { sess.record_conversation_items(&turn_context, &skill_items) @@ -4934,7 +4976,6 @@ pub(crate) async fn run_turn( &sess, &turn_context, InitialContextInjection::BeforeLastUserMessage, - previous_model.as_deref(), ) .await .is_err() @@ -5059,13 +5100,7 @@ async fn run_pre_sampling_compact( .unwrap_or(i64::MAX); // Compact if the total usage tokens are greater than the auto compact limit if total_usage_tokens >= auto_compact_limit { - run_auto_compact( - sess, - turn_context, - InitialContextInjection::DoNotInject, - None, - ) - .await?; + run_auto_compact(sess, turn_context, InitialContextInjection::DoNotInject).await?; } Ok(()) } @@ -5081,12 +5116,12 @@ async fn maybe_run_previous_model_inline_compact( turn_context: &Arc, total_usage_tokens: i64, ) -> CodexResult { - let Some(previous_model) = sess.previous_model().await else { + let Some(previous_turn_settings) = sess.previous_turn_settings().await else { return Ok(false); }; let previous_model_turn_context = Arc::new( turn_context - .with_model(previous_model, &sess.services.models_manager) + .with_model(previous_turn_settings.model, &sess.services.models_manager) .await, ); @@ -5108,7 +5143,6 @@ async fn maybe_run_previous_model_inline_compact( sess, &previous_model_turn_context, InitialContextInjection::DoNotInject, - None, ) .await?; return Ok(true); @@ -5120,14 +5154,12 @@ async fn run_auto_compact( sess: &Arc, turn_context: &Arc, initial_context_injection: InitialContextInjection, - previous_user_turn_model: Option<&str>, ) -> CodexResult<()> { if should_use_remote_compact_task(&turn_context.provider) { run_inline_remote_auto_compact_task( Arc::clone(sess), Arc::clone(turn_context), initial_context_injection, - previous_user_turn_model, ) .await?; } else { @@ -5135,7 +5167,6 @@ async fn run_auto_compact( Arc::clone(sess), Arc::clone(turn_context), initial_context_injection, - previous_user_turn_model, ) .await?; } @@ -6506,6 +6537,23 @@ mod tests { } } + fn developer_input_texts(items: &[ResponseItem]) -> Vec<&str> { + items + .iter() + .filter_map(|item| match item { + ResponseItem::Message { role, content, .. } if role == "developer" => { + Some(content.as_slice()) + } + _ => None, + }) + .flat_map(|content| content.iter()) + .filter_map(|item| match item { + ContentItem::InputText { text } => Some(text.as_str()), + _ => None, + }) + .collect() + } + fn make_connector(id: &str, name: &str) -> AppInfo { AppInfo { id: id.to_string(), @@ -7157,14 +7205,14 @@ mod tests { assert_eq!(expected, history_before_seed.raw_items()); session - .record_context_updates_and_set_reference_context_item(&turn_context, None) + .record_context_updates_and_set_reference_context_item(&turn_context) .await; - expected.extend(session.build_initial_context(&turn_context, None).await); + expected.extend(session.build_initial_context(&turn_context).await); let history_after_seed = session.clone_history().await; assert_eq!(expected, history_after_seed.raw_items()); session - .record_context_updates_and_set_reference_context_item(&turn_context, None) + .record_context_updates_and_set_reference_context_item(&turn_context) .await; let history_after_second_seed = session.clone_history().await; assert_eq!( @@ -7324,7 +7372,7 @@ mod tests { let reconstruction_turn = session.new_default_turn().await; expected.extend( session - .build_initial_context(reconstruction_turn.as_ref(), None) + .build_initial_context(reconstruction_turn.as_ref()) .await, ); let history = session.state.lock().await.clone_history(); @@ -7332,7 +7380,7 @@ mod tests { } #[tokio::test] - async fn record_initial_history_forked_hydrates_previous_model() { + async fn record_initial_history_forked_hydrates_previous_turn_settings() { let (session, turn_context) = make_session_and_context().await; let previous_model = "forked-rollout-model"; let previous_context_item = TurnContextItem { @@ -7346,6 +7394,7 @@ mod tests { model: previous_model.to_string(), personality: turn_context.personality, collaboration_mode: Some(turn_context.collaboration_mode.clone()), + realtime_active: Some(turn_context.realtime_active), effort: turn_context.reasoning_effort, summary: turn_context.reasoning_summary, user_instructions: None, @@ -7387,8 +7436,11 @@ mod tests { .await; assert_eq!( - session.previous_model().await, - Some(previous_model.to_string()) + session.previous_turn_settings().await, + Some(PreviousTurnSettings { + model: previous_model.to_string(), + realtime_active: Some(turn_context.realtime_active), + }) ); } @@ -7396,7 +7448,7 @@ mod tests { async fn thread_rollback_drops_last_turn_from_history() { let (sess, tc, rx) = make_session_and_context_with_rx().await; - let initial_context = sess.build_initial_context(tc.as_ref(), None).await; + let initial_context = sess.build_initial_context(tc.as_ref()).await; sess.record_into_history(&initial_context, tc.as_ref()) .await; @@ -7443,8 +7495,11 @@ mod tests { }, ]; sess.record_into_history(&turn_2, tc.as_ref()).await; - sess.set_previous_model(Some("previous-regular-model".to_string())) - .await; + sess.set_previous_turn_settings(Some(PreviousTurnSettings { + model: "previous-regular-model".to_string(), + realtime_active: Some(tc.realtime_active), + })) + .await; handlers::thread_rollback(&sess, "sub-1".to_string(), 1).await; @@ -7458,8 +7513,11 @@ mod tests { let history = sess.clone_history().await; assert_eq!(expected, history.raw_items()); assert_eq!( - sess.previous_model().await, - Some("previous-regular-model".to_string()) + sess.previous_turn_settings().await, + Some(PreviousTurnSettings { + model: "previous-regular-model".to_string(), + realtime_active: Some(tc.realtime_active), + }) ); } @@ -7467,7 +7525,7 @@ mod tests { async fn thread_rollback_clears_history_when_num_turns_exceeds_existing_turns() { let (sess, tc, rx) = make_session_and_context_with_rx().await; - let initial_context = sess.build_initial_context(tc.as_ref(), None).await; + let initial_context = sess.build_initial_context(tc.as_ref()).await; sess.record_into_history(&initial_context, tc.as_ref()) .await; @@ -7495,7 +7553,7 @@ mod tests { async fn thread_rollback_fails_when_turn_in_progress() { let (sess, tc, rx) = make_session_and_context_with_rx().await; - let initial_context = sess.build_initial_context(tc.as_ref(), None).await; + let initial_context = sess.build_initial_context(tc.as_ref()).await; sess.record_into_history(&initial_context, tc.as_ref()) .await; @@ -7516,7 +7574,7 @@ mod tests { async fn thread_rollback_fails_when_num_turns_is_zero() { let (sess, tc, rx) = make_session_and_context_with_rx().await; - let initial_context = sess.build_initial_context(tc.as_ref(), None).await; + let initial_context = sess.build_initial_context(tc.as_ref()).await; sess.record_into_history(&initial_context, tc.as_ref()) .await; @@ -8457,9 +8515,9 @@ mod tests { } #[tokio::test] - async fn spawn_task_does_not_update_previous_model_for_non_run_turn_tasks() { + async fn spawn_task_does_not_update_previous_turn_settings_for_non_run_turn_tasks() { let (sess, tc, _rx) = make_session_and_context_with_rx().await; - sess.set_previous_model(None).await; + sess.set_previous_turn_settings(None).await; let input = vec![UserInput::Text { text: "hello".to_string(), text_elements: Vec::new(), @@ -8476,7 +8534,7 @@ mod tests { .await; sess.abort_all_tasks(TurnAbortReason::Interrupted).await; - assert_eq!(sess.previous_model().await, None); + assert_eq!(sess.previous_turn_settings().await, None); } #[tokio::test] @@ -8515,11 +8573,9 @@ mod tests { current_context.config = Arc::new(config); let reference_context_item = previous_context.to_turn_context_item(); - let update_items = session.build_settings_update_items( - Some(&reference_context_item), - None, - ¤t_context, - ); + let update_items = session + .build_settings_update_items(Some(&reference_context_item), ¤t_context) + .await; let environment_update = update_items .iter() @@ -8552,11 +8608,9 @@ mod tests { current_context.timezone = Some("Europe/Berlin".to_string()); let reference_context_item = previous_context.to_turn_context_item(); - let update_items = session.build_settings_update_items( - Some(&reference_context_item), - None, - ¤t_context, - ); + let update_items = session + .build_settings_update_items(Some(&reference_context_item), ¤t_context) + .await; let environment_update = update_items .iter() @@ -8574,15 +8628,176 @@ mod tests { assert!(environment_update.contains("Europe/Berlin")); } + #[tokio::test] + async fn build_settings_update_items_emits_realtime_start_when_session_becomes_live() { + let (session, previous_context) = make_session_and_context().await; + let previous_context = Arc::new(previous_context); + let mut current_context = previous_context + .with_model( + previous_context.model_info.slug.clone(), + &session.services.models_manager, + ) + .await; + current_context.realtime_active = true; + + let update_items = session + .build_settings_update_items( + Some(&previous_context.to_turn_context_item()), + ¤t_context, + ) + .await; + + let developer_texts = developer_input_texts(&update_items); + assert!( + developer_texts + .iter() + .any(|text| text.contains("")), + "expected a realtime start update, got {developer_texts:?}" + ); + } + + #[tokio::test] + async fn build_settings_update_items_emits_realtime_end_when_session_stops_being_live() { + let (session, mut previous_context) = make_session_and_context().await; + previous_context.realtime_active = true; + let mut current_context = previous_context + .with_model( + previous_context.model_info.slug.clone(), + &session.services.models_manager, + ) + .await; + current_context.realtime_active = false; + + let update_items = session + .build_settings_update_items( + Some(&previous_context.to_turn_context_item()), + ¤t_context, + ) + .await; + + let developer_texts = developer_input_texts(&update_items); + assert!( + developer_texts + .iter() + .any(|text| text.contains("Reason: inactive")), + "expected a realtime end update, got {developer_texts:?}" + ); + } + + #[tokio::test] + async fn build_settings_update_items_uses_previous_turn_settings_for_realtime_end() { + let (session, previous_context) = make_session_and_context().await; + let mut previous_context_item = previous_context.to_turn_context_item(); + previous_context_item.realtime_active = None; + let previous_turn_settings = PreviousTurnSettings { + model: previous_context.model_info.slug.clone(), + realtime_active: Some(true), + }; + let mut current_context = previous_context + .with_model( + previous_context.model_info.slug.clone(), + &session.services.models_manager, + ) + .await; + current_context.realtime_active = false; + + session + .set_previous_turn_settings(Some(previous_turn_settings)) + .await; + let update_items = session + .build_settings_update_items(Some(&previous_context_item), ¤t_context) + .await; + + let developer_texts = developer_input_texts(&update_items); + assert!( + developer_texts + .iter() + .any(|text| text.contains("Reason: inactive")), + "expected a realtime end update from previous turn settings, got {developer_texts:?}" + ); + } + + #[tokio::test] + async fn build_initial_context_uses_previous_realtime_state() { + let (session, mut turn_context) = make_session_and_context().await; + turn_context.realtime_active = true; + + let initial_context = session.build_initial_context(&turn_context).await; + let developer_texts = developer_input_texts(&initial_context); + assert!( + developer_texts + .iter() + .any(|text| text.contains("")), + "expected initial context to describe active realtime state, got {developer_texts:?}" + ); + + let previous_context_item = turn_context.to_turn_context_item(); + { + let mut state = session.state.lock().await; + state.set_reference_context_item(Some(previous_context_item)); + } + let resumed_context = session.build_initial_context(&turn_context).await; + let resumed_developer_texts = developer_input_texts(&resumed_context); + assert!( + !resumed_developer_texts + .iter() + .any(|text| text.contains("")), + "did not expect a duplicate realtime update, got {resumed_developer_texts:?}" + ); + } + + #[tokio::test] + async fn build_initial_context_uses_previous_turn_settings_for_realtime_end() { + let (session, turn_context) = make_session_and_context().await; + let previous_turn_settings = PreviousTurnSettings { + model: turn_context.model_info.slug.clone(), + realtime_active: Some(true), + }; + + session + .set_previous_turn_settings(Some(previous_turn_settings)) + .await; + let initial_context = session.build_initial_context(&turn_context).await; + let developer_texts = developer_input_texts(&initial_context); + assert!( + developer_texts + .iter() + .any(|text| text.contains("Reason: inactive")), + "expected initial context to describe an ended realtime session, got {developer_texts:?}" + ); + } + + #[tokio::test] + async fn build_initial_context_restates_realtime_start_when_reference_context_is_missing() { + let (session, mut turn_context) = make_session_and_context().await; + turn_context.realtime_active = true; + let previous_turn_settings = PreviousTurnSettings { + model: turn_context.model_info.slug.clone(), + realtime_active: Some(true), + }; + + session + .set_previous_turn_settings(Some(previous_turn_settings)) + .await; + let initial_context = session.build_initial_context(&turn_context).await; + let developer_texts = developer_input_texts(&initial_context); + assert!( + developer_texts + .iter() + .any(|text| text.contains("")), + "expected initial context to restate active realtime when the reference context is missing, got {developer_texts:?}" + ); + } + #[tokio::test] async fn record_context_updates_and_set_reference_context_item_injects_full_context_when_baseline_missing() { let (session, turn_context) = make_session_and_context().await; session - .record_context_updates_and_set_reference_context_item(&turn_context, None) + .record_context_updates_and_set_reference_context_item(&turn_context) .await; let history = session.clone_history().await; - let initial_context = session.build_initial_context(&turn_context, None).await; + let initial_context = session.build_initial_context(&turn_context).await; assert_eq!(history.raw_items().to_vec(), initial_context); let current_context = session.reference_context_item().await; @@ -8610,7 +8825,7 @@ mod tests { .record_into_history(std::slice::from_ref(&compacted_summary), &turn_context) .await; session - .record_context_updates_and_set_reference_context_item(&turn_context, None) + .record_context_updates_and_set_reference_context_item(&turn_context) .await; { let mut state = session.state.lock().await; @@ -8621,12 +8836,12 @@ mod tests { .await; session - .record_context_updates_and_set_reference_context_item(&turn_context, None) + .record_context_updates_and_set_reference_context_item(&turn_context) .await; let history = session.clone_history().await; let mut expected_history = vec![compacted_summary]; - expected_history.extend(session.build_initial_context(&turn_context, None).await); + expected_history.extend(session.build_initial_context(&turn_context).await); assert_eq!(history.raw_items().to_vec(), expected_history); } @@ -8669,12 +8884,13 @@ mod tests { *rollout = Some(recorder); } - let update_items = - session.build_settings_update_items(Some(&previous_context_item), None, &turn_context); + let update_items = session + .build_settings_update_items(Some(&previous_context_item), &turn_context) + .await; assert_eq!(update_items, Vec::new()); session - .record_context_updates_and_set_reference_context_item(&turn_context, None) + .record_context_updates_and_set_reference_context_item(&turn_context) .await; assert_eq!( @@ -8711,10 +8927,15 @@ mod tests { #[tokio::test] async fn build_initial_context_prepends_model_switch_message() { let (session, turn_context) = make_session_and_context().await; + let previous_turn_settings = PreviousTurnSettings { + model: "previous-regular-model".to_string(), + realtime_active: None, + }; - let initial_context = session - .build_initial_context(&turn_context, Some("previous-regular-model")) + session + .set_previous_turn_settings(Some(previous_turn_settings)) .await; + let initial_context = session.build_initial_context(&turn_context).await; let ResponseItem::Message { role, content, .. } = &initial_context[0] else { panic!("expected developer message"); @@ -8776,10 +8997,13 @@ mod tests { } session - .record_context_updates_and_set_reference_context_item( - &turn_context, - Some(previous_context.model_info.slug.as_str()), - ) + .set_previous_turn_settings(Some(PreviousTurnSettings { + model: previous_context.model_info.slug.clone(), + realtime_active: Some(previous_context.realtime_active), + })) + .await; + session + .record_context_updates_and_set_reference_context_item(&turn_context) .await; session.ensure_rollout_materialized().await; session.flush_rollout().await; @@ -9198,7 +9422,7 @@ mod tests { // personality_spec) matches reconstruction. let reconstruction_turn = session.new_default_turn().await; let mut initial_context = session - .build_initial_context(reconstruction_turn.as_ref(), None) + .build_initial_context(reconstruction_turn.as_ref()) .await; // Ensure personality_spec is present when Personality is enabled, so expected matches // what reconstruction produces (build_initial_context may omit it when baked into model). diff --git a/codex-rs/core/src/codex/rollout_reconstruction.rs b/codex-rs/core/src/codex/rollout_reconstruction.rs index 1cfe5486693..11ddbc19285 100644 --- a/codex-rs/core/src/codex/rollout_reconstruction.rs +++ b/codex-rs/core/src/codex/rollout_reconstruction.rs @@ -5,7 +5,7 @@ use super::*; #[derive(Debug)] pub(super) struct RolloutReconstruction { pub(super) history: Vec, - pub(super) previous_model: Option, + pub(super) previous_turn_settings: Option, pub(super) reference_context_item: Option, } @@ -29,7 +29,7 @@ enum TurnReferenceContextItem { struct ActiveReplaySegment<'a> { turn_id: Option, counts_as_user_turn: bool, - previous_model: Option, + previous_turn_settings: Option, reference_context_item: TurnReferenceContextItem, base_replacement_history: Option<&'a [ResponseItem]>, } @@ -42,7 +42,7 @@ fn turn_ids_are_compatible(active_turn_id: Option<&str>, item_turn_id: Option<&s fn finalize_active_segment<'a>( active_segment: ActiveReplaySegment<'a>, base_replacement_history: &mut Option<&'a [ResponseItem]>, - previous_model: &mut Option, + previous_turn_settings: &mut Option, reference_context_item: &mut TurnReferenceContextItem, pending_rollback_turns: &mut usize, ) { @@ -64,9 +64,9 @@ fn finalize_active_segment<'a>( *base_replacement_history = Some(segment_base_replacement_history); } - // `previous_model` comes from the newest surviving user turn that established one. - if previous_model.is_none() && active_segment.counts_as_user_turn { - *previous_model = active_segment.previous_model; + // `previous_turn_settings` come from the newest surviving user turn that established them. + if previous_turn_settings.is_none() && active_segment.counts_as_user_turn { + *previous_turn_settings = active_segment.previous_turn_settings; } // `reference_context_item` comes from the newest surviving user turn baseline, or @@ -94,7 +94,7 @@ impl Session { // are both known; then replay only the buffered surviving tail forward to preserve exact // history semantics. let mut base_replacement_history: Option<&[ResponseItem]> = None; - let mut previous_model = None; + let mut previous_turn_settings = None; let mut reference_context_item = TurnReferenceContextItem::NeverSet; // Rollback is "drop the newest N user turns". While scanning in reverse, that becomes // "skip the next N user-turn segments we finalize". @@ -170,7 +170,10 @@ impl Session { active_segment.turn_id.as_deref(), ctx.turn_id.as_deref(), ) { - active_segment.previous_model = Some(ctx.model.clone()); + active_segment.previous_turn_settings = Some(PreviousTurnSettings { + model: ctx.model.clone(), + realtime_active: ctx.realtime_active, + }); if matches!( active_segment.reference_context_item, TurnReferenceContextItem::NeverSet @@ -192,7 +195,7 @@ impl Session { finalize_active_segment( active_segment, &mut base_replacement_history, - &mut previous_model, + &mut previous_turn_settings, &mut reference_context_item, &mut pending_rollback_turns, ); @@ -204,7 +207,7 @@ impl Session { } if base_replacement_history.is_some() - && previous_model.is_some() + && previous_turn_settings.is_some() && !matches!(reference_context_item, TurnReferenceContextItem::NeverSet) { // At this point we have both eager resume metadata values and the replacement- @@ -218,7 +221,7 @@ impl Session { finalize_active_segment( active_segment, &mut base_replacement_history, - &mut previous_model, + &mut previous_turn_settings, &mut reference_context_item, &mut pending_rollback_turns, ); @@ -287,7 +290,7 @@ impl Session { RolloutReconstruction { history: history.raw_items().to_vec(), - previous_model, + previous_turn_settings, reference_context_item, } } diff --git a/codex-rs/core/src/codex/rollout_reconstruction_tests.rs b/codex-rs/core/src/codex/rollout_reconstruction_tests.rs index d2cbca79068..97dbcdd9c55 100644 --- a/codex-rs/core/src/codex/rollout_reconstruction_tests.rs +++ b/codex-rs/core/src/codex/rollout_reconstruction_tests.rs @@ -34,7 +34,8 @@ fn assistant_message(text: &str) -> ResponseItem { } #[tokio::test] -async fn record_initial_history_resumed_bare_turn_context_does_not_hydrate_previous_model() { +async fn record_initial_history_resumed_bare_turn_context_does_not_hydrate_previous_turn_settings() +{ let (session, turn_context) = make_session_and_context().await; let previous_model = "previous-rollout-model"; let previous_context_item = TurnContextItem { @@ -48,6 +49,7 @@ async fn record_initial_history_resumed_bare_turn_context_does_not_hydrate_previ model: previous_model.to_string(), personality: turn_context.personality, collaboration_mode: Some(turn_context.collaboration_mode.clone()), + realtime_active: Some(turn_context.realtime_active), effort: turn_context.reasoning_effort, summary: turn_context.reasoning_summary, user_instructions: None, @@ -65,12 +67,12 @@ async fn record_initial_history_resumed_bare_turn_context_does_not_hydrate_previ })) .await; - assert_eq!(session.previous_model().await, None); + assert_eq!(session.previous_turn_settings().await, None); assert!(session.reference_context_item().await.is_none()); } #[tokio::test] -async fn record_initial_history_resumed_hydrates_previous_model_from_lifecycle_turn_with_missing_turn_context_id() +async fn record_initial_history_resumed_hydrates_previous_turn_settings_from_lifecycle_turn_with_missing_turn_context_id() { let (session, turn_context) = make_session_and_context().await; let previous_model = "previous-rollout-model"; @@ -85,6 +87,7 @@ async fn record_initial_history_resumed_hydrates_previous_model_from_lifecycle_t model: previous_model.to_string(), personality: turn_context.personality, collaboration_mode: Some(turn_context.collaboration_mode.clone()), + realtime_active: Some(turn_context.realtime_active), effort: turn_context.reasoning_effort, summary: turn_context.reasoning_summary, user_instructions: None, @@ -132,8 +135,11 @@ async fn record_initial_history_resumed_hydrates_previous_model_from_lifecycle_t .await; assert_eq!( - session.previous_model().await, - Some(previous_model.to_string()) + session.previous_turn_settings().await, + Some(PreviousTurnSettings { + model: previous_model.to_string(), + realtime_active: Some(turn_context.realtime_active), + }) ); } @@ -220,8 +226,11 @@ async fn reconstruct_history_rollback_keeps_history_and_metadata_in_sync_for_com vec![turn_one_user, turn_one_assistant] ); assert_eq!( - reconstructed.previous_model, - Some(turn_context.model_info.slug.clone()) + reconstructed.previous_turn_settings, + Some(PreviousTurnSettings { + model: turn_context.model_info.slug.clone(), + realtime_active: Some(turn_context.realtime_active), + }) ); assert_eq!( serde_json::to_value(reconstructed.reference_context_item) @@ -299,8 +308,11 @@ async fn reconstruct_history_rollback_keeps_history_and_metadata_in_sync_for_inc vec![turn_one_user, turn_one_assistant] ); assert_eq!( - reconstructed.previous_model, - Some(turn_context.model_info.slug.clone()) + reconstructed.previous_turn_settings, + Some(PreviousTurnSettings { + model: turn_context.model_info.slug.clone(), + realtime_active: Some(turn_context.realtime_active), + }) ); assert_eq!( serde_json::to_value(reconstructed.reference_context_item) @@ -402,8 +414,11 @@ async fn reconstruct_history_rollback_skips_non_user_turns_for_history_and_metad vec![turn_one_user, turn_one_assistant] ); assert_eq!( - reconstructed.previous_model, - Some(turn_context.model_info.slug.clone()) + reconstructed.previous_turn_settings, + Some(PreviousTurnSettings { + model: turn_context.model_info.slug.clone(), + realtime_active: Some(turn_context.realtime_active), + }) ); assert_eq!( serde_json::to_value(reconstructed.reference_context_item) @@ -456,7 +471,7 @@ async fn reconstruct_history_rollback_clears_history_and_metadata_when_exceeding .await; assert_eq!(reconstructed.history, Vec::new()); - assert_eq!(reconstructed.previous_model, None); + assert_eq!(reconstructed.previous_turn_settings, None); assert!(reconstructed.reference_context_item.is_none()); } @@ -519,7 +534,7 @@ async fn record_initial_history_resumed_rollback_skips_only_user_turns() { })) .await; - assert_eq!(session.previous_model().await, None); + assert_eq!(session.previous_turn_settings().await, None); assert!(session.reference_context_item().await.is_none()); } @@ -589,8 +604,11 @@ async fn record_initial_history_resumed_rollback_drops_incomplete_user_turn_comp .await; assert_eq!( - session.previous_model().await, - Some(turn_context.model_info.slug.clone()) + session.previous_turn_settings().await, + Some(PreviousTurnSettings { + model: turn_context.model_info.slug.clone(), + realtime_active: Some(turn_context.realtime_active), + }) ); assert_eq!( serde_json::to_value(session.reference_context_item().await) @@ -637,7 +655,7 @@ async fn record_initial_history_resumed_does_not_seed_reference_context_item_aft })) .await; - assert_eq!(session.previous_model().await, None); + assert_eq!(session.previous_turn_settings().await, None); assert!(session.reference_context_item().await.is_none()); } @@ -730,6 +748,7 @@ async fn record_initial_history_resumed_turn_context_after_compaction_reestablis model: previous_model.to_string(), personality: turn_context.personality, collaboration_mode: Some(turn_context.collaboration_mode.clone()), + realtime_active: Some(turn_context.realtime_active), effort: turn_context.reasoning_effort, summary: turn_context.reasoning_summary, user_instructions: None, @@ -780,8 +799,11 @@ async fn record_initial_history_resumed_turn_context_after_compaction_reestablis .await; assert_eq!( - session.previous_model().await, - Some(previous_model.to_string()) + session.previous_turn_settings().await, + Some(PreviousTurnSettings { + model: previous_model.to_string(), + realtime_active: Some(turn_context.realtime_active), + }) ); assert_eq!( serde_json::to_value(session.reference_context_item().await) @@ -797,6 +819,7 @@ async fn record_initial_history_resumed_turn_context_after_compaction_reestablis model: previous_model.to_string(), personality: turn_context.personality, collaboration_mode: Some(turn_context.collaboration_mode.clone()), + realtime_active: Some(turn_context.realtime_active), effort: turn_context.reasoning_effort, summary: turn_context.reasoning_summary, user_instructions: None, @@ -824,6 +847,7 @@ async fn record_initial_history_resumed_aborted_turn_without_id_clears_active_tu model: previous_model.to_string(), personality: turn_context.personality, collaboration_mode: Some(turn_context.collaboration_mode.clone()), + realtime_active: Some(turn_context.realtime_active), effort: turn_context.reasoning_effort, summary: turn_context.reasoning_summary, user_instructions: None, @@ -896,8 +920,11 @@ async fn record_initial_history_resumed_aborted_turn_without_id_clears_active_tu .await; assert_eq!( - session.previous_model().await, - Some(previous_model.to_string()) + session.previous_turn_settings().await, + Some(PreviousTurnSettings { + model: previous_model.to_string(), + realtime_active: Some(turn_context.realtime_active), + }) ); assert!(session.reference_context_item().await.is_none()); } @@ -925,6 +952,7 @@ async fn record_initial_history_resumed_unmatched_abort_preserves_active_turn_fo model: current_model.to_string(), personality: turn_context.personality, collaboration_mode: Some(turn_context.collaboration_mode.clone()), + realtime_active: Some(turn_context.realtime_active), effort: turn_context.reasoning_effort, summary: turn_context.reasoning_summary, user_instructions: None, @@ -995,8 +1023,11 @@ async fn record_initial_history_resumed_unmatched_abort_preserves_active_turn_fo .await; assert_eq!( - session.previous_model().await, - Some(current_model.to_string()) + session.previous_turn_settings().await, + Some(PreviousTurnSettings { + model: current_model.to_string(), + realtime_active: Some(turn_context.realtime_active), + }) ); assert_eq!( serde_json::to_value(session.reference_context_item().await) @@ -1022,6 +1053,7 @@ async fn record_initial_history_resumed_trailing_incomplete_turn_compaction_clea model: previous_model.to_string(), personality: turn_context.personality, collaboration_mode: Some(turn_context.collaboration_mode.clone()), + realtime_active: Some(turn_context.realtime_active), effort: turn_context.reasoning_effort, summary: turn_context.reasoning_summary, user_instructions: None, @@ -1088,8 +1120,11 @@ async fn record_initial_history_resumed_trailing_incomplete_turn_compaction_clea .await; assert_eq!( - session.previous_model().await, - Some(previous_model.to_string()) + session.previous_turn_settings().await, + Some(PreviousTurnSettings { + model: previous_model.to_string(), + realtime_active: Some(turn_context.realtime_active), + }) ); assert!(session.reference_context_item().await.is_none()); } @@ -1131,8 +1166,11 @@ async fn record_initial_history_resumed_trailing_incomplete_turn_preserves_turn_ .await; assert_eq!( - session.previous_model().await, - Some(turn_context.model_info.slug.clone()) + session.previous_turn_settings().await, + Some(PreviousTurnSettings { + model: turn_context.model_info.slug.clone(), + realtime_active: Some(turn_context.realtime_active), + }) ); assert_eq!( serde_json::to_value(session.reference_context_item().await) @@ -1158,6 +1196,7 @@ async fn record_initial_history_resumed_replaced_incomplete_compacted_turn_clear model: previous_model.to_string(), personality: turn_context.personality, collaboration_mode: Some(turn_context.collaboration_mode.clone()), + realtime_active: Some(turn_context.realtime_active), effort: turn_context.reasoning_effort, summary: turn_context.reasoning_summary, user_instructions: None, @@ -1234,8 +1273,11 @@ async fn record_initial_history_resumed_replaced_incomplete_compacted_turn_clear .await; assert_eq!( - session.previous_model().await, - Some(previous_model.to_string()) + session.previous_turn_settings().await, + Some(PreviousTurnSettings { + model: previous_model.to_string(), + realtime_active: Some(turn_context.realtime_active), + }) ); assert!(session.reference_context_item().await.is_none()); } diff --git a/codex-rs/core/src/compact.rs b/codex-rs/core/src/compact.rs index 2885a885fd2..11500e27cfc 100644 --- a/codex-rs/core/src/compact.rs +++ b/codex-rs/core/src/compact.rs @@ -4,6 +4,8 @@ use crate::ModelProviderInfo; use crate::Prompt; use crate::client::ModelClientSession; use crate::client_common::ResponseEvent; +#[cfg(test)] +use crate::codex::PreviousTurnSettings; use crate::codex::Session; use crate::codex::TurnContext; use crate::codex::get_last_assistant_message_from_turn; @@ -53,7 +55,6 @@ pub(crate) async fn run_inline_auto_compact_task( sess: Arc, turn_context: Arc, initial_context_injection: InitialContextInjection, - previous_user_turn_model: Option<&str>, ) -> CodexResult<()> { let prompt = turn_context.compact_prompt().to_string(); let input = vec![UserInput::Text { @@ -62,14 +63,7 @@ pub(crate) async fn run_inline_auto_compact_task( text_elements: Vec::new(), }]; - run_compact_task_inner( - sess, - turn_context, - input, - initial_context_injection, - previous_user_turn_model, - ) - .await?; + run_compact_task_inner(sess, turn_context, input, initial_context_injection).await?; Ok(()) } @@ -89,7 +83,6 @@ pub(crate) async fn run_compact_task( turn_context, input, InitialContextInjection::DoNotInject, - None, ) .await } @@ -99,7 +92,6 @@ async fn run_compact_task_inner( turn_context: Arc, input: Vec, initial_context_injection: InitialContextInjection, - previous_user_turn_model: Option<&str>, ) -> CodexResult<()> { let compaction_item = TurnItem::ContextCompaction(ContextCompactionItem::new()); sess.emit_turn_item_started(&turn_context, &compaction_item) @@ -207,9 +199,7 @@ async fn run_compact_task_inner( initial_context_injection, InitialContextInjection::BeforeLastUserMessage ) { - let initial_context = sess - .build_initial_context(turn_context.as_ref(), previous_user_turn_model) - .await; + let initial_context = sess.build_initial_context(turn_context.as_ref()).await; new_history = insert_initial_context_before_last_real_user_or_summary(new_history, initial_context); } @@ -453,18 +443,18 @@ mod tests { async fn process_compacted_history_with_test_session( compacted_history: Vec, - previous_user_turn_model: Option<&str>, + previous_turn_settings: Option<&PreviousTurnSettings>, ) -> (Vec, Vec) { let (session, turn_context) = crate::codex::make_session_and_context().await; - let initial_context = session - .build_initial_context(&turn_context, previous_user_turn_model) + session + .set_previous_turn_settings(previous_turn_settings.cloned()) .await; + let initial_context = session.build_initial_context(&turn_context).await; let refreshed = crate::compact_remote::process_compacted_history( &session, &turn_context, compacted_history, InitialContextInjection::BeforeLastUserMessage, - previous_user_turn_model, ) .await; (refreshed, initial_context) @@ -859,10 +849,14 @@ keep me updated end_turn: None, phase: None, }]; + let previous_turn_settings = PreviousTurnSettings { + model: "previous-regular-model".to_string(), + realtime_active: None, + }; let (refreshed, initial_context) = process_compacted_history_with_test_session( compacted_history, - Some("previous-regular-model"), + Some(&previous_turn_settings), ) .await; diff --git a/codex-rs/core/src/compact_remote.rs b/codex-rs/core/src/compact_remote.rs index 8029ba1bb7e..6d8368bce45 100644 --- a/codex-rs/core/src/compact_remote.rs +++ b/codex-rs/core/src/compact_remote.rs @@ -26,15 +26,8 @@ pub(crate) async fn run_inline_remote_auto_compact_task( sess: Arc, turn_context: Arc, initial_context_injection: InitialContextInjection, - previous_user_turn_model: Option<&str>, ) -> CodexResult<()> { - run_remote_compact_task_inner( - &sess, - &turn_context, - initial_context_injection, - previous_user_turn_model, - ) - .await?; + run_remote_compact_task_inner(&sess, &turn_context, initial_context_injection).await?; Ok(()) } @@ -49,28 +42,16 @@ pub(crate) async fn run_remote_compact_task( }); sess.send_event(&turn_context, start_event).await; - run_remote_compact_task_inner( - &sess, - &turn_context, - InitialContextInjection::DoNotInject, - None, - ) - .await + run_remote_compact_task_inner(&sess, &turn_context, InitialContextInjection::DoNotInject).await } async fn run_remote_compact_task_inner( sess: &Arc, turn_context: &Arc, initial_context_injection: InitialContextInjection, - previous_user_turn_model: Option<&str>, ) -> CodexResult<()> { - if let Err(err) = run_remote_compact_task_inner_impl( - sess, - turn_context, - initial_context_injection, - previous_user_turn_model, - ) - .await + if let Err(err) = + run_remote_compact_task_inner_impl(sess, turn_context, initial_context_injection).await { let event = EventMsg::Error( err.to_error_event(Some("Error running remote compact task".to_string())), @@ -85,7 +66,6 @@ async fn run_remote_compact_task_inner_impl( sess: &Arc, turn_context: &Arc, initial_context_injection: InitialContextInjection, - previous_user_turn_model: Option<&str>, ) -> CodexResult<()> { let compaction_item = TurnItem::ContextCompaction(ContextCompactionItem::new()); sess.emit_turn_item_started(turn_context, &compaction_item) @@ -147,7 +127,6 @@ async fn run_remote_compact_task_inner_impl( turn_context.as_ref(), new_history, initial_context_injection, - previous_user_turn_model, ) .await; @@ -176,7 +155,6 @@ pub(crate) async fn process_compacted_history( turn_context: &TurnContext, mut compacted_history: Vec, initial_context_injection: InitialContextInjection, - previous_user_turn_model: Option<&str>, ) -> Vec { // Mid-turn compaction is the only path that must inject initial context above the last user // message in the replacement history. Pre-turn compaction instead injects context after the @@ -185,8 +163,7 @@ pub(crate) async fn process_compacted_history( initial_context_injection, InitialContextInjection::BeforeLastUserMessage ) { - sess.build_initial_context(turn_context, previous_user_turn_model) - .await + sess.build_initial_context(turn_context).await } else { Vec::new() }; diff --git a/codex-rs/core/src/context_manager/updates.rs b/codex-rs/core/src/context_manager/updates.rs index b9fc537ec7c..63deb5c8083 100644 --- a/codex-rs/core/src/context_manager/updates.rs +++ b/codex-rs/core/src/context_manager/updates.rs @@ -1,3 +1,4 @@ +use crate::codex::PreviousTurnSettings; use crate::codex::TurnContext; use crate::environment_context::EnvironmentContext; use crate::features::Feature; @@ -64,6 +65,33 @@ fn build_collaboration_mode_update_item( } } +pub(crate) fn build_realtime_update_item( + previous: Option<&TurnContextItem>, + previous_turn_settings: Option<&PreviousTurnSettings>, + next: &TurnContext, +) -> Option { + match ( + previous.and_then(|item| item.realtime_active), + next.realtime_active, + ) { + (Some(true), false) => Some(DeveloperInstructions::realtime_end_message("inactive")), + (Some(false), true) | (None, true) => Some(DeveloperInstructions::realtime_start_message()), + (Some(true), true) | (Some(false), false) => None, + (None, false) => previous_turn_settings + .and_then(|settings| settings.realtime_active) + .filter(|realtime_active| *realtime_active) + .map(|_| DeveloperInstructions::realtime_end_message("inactive")), + } +} + +pub(crate) fn build_initial_realtime_item( + previous: Option<&TurnContextItem>, + previous_turn_settings: Option<&PreviousTurnSettings>, + next: &TurnContext, +) -> Option { + build_realtime_update_item(previous, previous_turn_settings, next) +} + fn build_personality_update_item( previous: Option<&TurnContextItem>, next: &TurnContext, @@ -100,11 +128,11 @@ pub(crate) fn personality_message_for( } pub(crate) fn build_model_instructions_update_item( - previous_user_turn_model: Option<&str>, + previous_turn_settings: Option<&PreviousTurnSettings>, next: &TurnContext, ) -> Option { - let previous_model = previous_user_turn_model?; - if previous_model == next.model_info.slug { + let previous_turn_settings = previous_turn_settings?; + if previous_turn_settings.model == next.model_info.slug { return None; } @@ -147,7 +175,7 @@ fn build_text_message(role: &str, text_sections: Vec) -> Option, - previous_user_turn_model: Option<&str>, + previous_turn_settings: Option<&PreviousTurnSettings>, next: &TurnContext, shell: &Shell, exec_policy: &Policy, @@ -157,9 +185,10 @@ pub(crate) fn build_settings_update_items( let developer_update_sections = [ // Keep model-switch instructions first so model-specific guidance is read before // any other context diffs on this turn. - build_model_instructions_update_item(previous_user_turn_model, next), + build_model_instructions_update_item(previous_turn_settings, next), build_permissions_update_item(previous, next, exec_policy), build_collaboration_mode_update_item(previous, next), + build_realtime_update_item(previous, previous_turn_settings, next), build_personality_update_item(previous, next, personality_feature_enabled), ] .into_iter() diff --git a/codex-rs/core/src/realtime_conversation.rs b/codex-rs/core/src/realtime_conversation.rs index d643d746825..861dd31649a 100644 --- a/codex-rs/core/src/realtime_conversation.rs +++ b/codex-rs/core/src/realtime_conversation.rs @@ -27,6 +27,8 @@ use codex_protocol::protocol::RealtimeConversationStartedEvent; use http::HeaderMap; use serde_json::Value; use std::sync::Arc; +use std::sync::atomic::AtomicBool; +use std::sync::atomic::Ordering; use tokio::sync::Mutex; use tokio::task::JoinHandle; use tracing::debug; @@ -47,6 +49,7 @@ struct ConversationState { audio_tx: Sender, text_tx: Sender, task: JoinHandle<()>, + realtime_active: Arc, } #[allow(dead_code)] @@ -59,7 +62,9 @@ impl RealtimeConversationManager { pub(crate) async fn running_state(&self) -> Option<()> { let state = self.state.lock().await; - state.as_ref().map(|_| ()) + state + .as_ref() + .and_then(|state| state.realtime_active.load(Ordering::Relaxed).then_some(())) } pub(crate) async fn start( @@ -68,12 +73,13 @@ impl RealtimeConversationManager { extra_headers: Option, prompt: String, session_id: Option, - ) -> CodexResult> { + ) -> CodexResult<(Receiver, Arc)> { let previous_state = { let mut guard = self.state.lock().await; guard.take() }; if let Some(state) = previous_state { + state.realtime_active.store(false, Ordering::Relaxed); state.task.abort(); let _ = state.task.await; } @@ -97,6 +103,7 @@ impl RealtimeConversationManager { let (events_tx, events_rx) = async_channel::bounded::(OUTPUT_EVENTS_QUEUE_CAPACITY); + let realtime_active = Arc::new(AtomicBool::new(true)); let task = spawn_realtime_input_task(writer, events, text_rx, audio_rx, events_tx); let mut guard = self.state.lock().await; @@ -104,8 +111,9 @@ impl RealtimeConversationManager { audio_tx, text_tx, task, + realtime_active: Arc::clone(&realtime_active), }); - Ok(events_rx) + Ok((events_rx, realtime_active)) } pub(crate) async fn audio_in(&self, frame: RealtimeAudioFrame) -> CodexResult<()> { @@ -158,6 +166,7 @@ impl RealtimeConversationManager { }; if let Some(state) = state { + state.realtime_active.store(false, Ordering::Relaxed); state.task.abort(); let _ = state.task.await; } @@ -186,7 +195,7 @@ pub(crate) async fn handle_start( .session_id .or_else(|| Some(sess.conversation_id.to_string())); info!("starting realtime conversation"); - let events_rx = match sess + let (events_rx, realtime_active) = match sess .conversation .start(api_provider, None, prompt, requested_session_id.clone()) .await @@ -236,7 +245,7 @@ pub(crate) async fn handle_start( ))) .await; } - if let Some(()) = sess_clone.conversation.running_state().await { + if realtime_active.swap(false, Ordering::Relaxed) { info!("realtime conversation transport closed"); sess_clone .send_event_raw(ev(EventMsg::RealtimeConversationClosed( diff --git a/codex-rs/core/src/rollout/recorder.rs b/codex-rs/core/src/rollout/recorder.rs index d7404fd2c69..30f13c3170e 100644 --- a/codex-rs/core/src/rollout/recorder.rs +++ b/codex-rs/core/src/rollout/recorder.rs @@ -1402,6 +1402,7 @@ mod tests { model: "test-model".to_string(), personality: None, collaboration_mode: None, + realtime_active: None, effort: None, summary: ReasoningSummaryConfig::Auto, user_instructions: None, diff --git a/codex-rs/core/src/rollout/truncation.rs b/codex-rs/core/src/rollout/truncation.rs index 7b316c0bb5f..c50eacc48bd 100644 --- a/codex-rs/core/src/rollout/truncation.rs +++ b/codex-rs/core/src/rollout/truncation.rs @@ -193,7 +193,7 @@ mod tests { #[tokio::test] async fn ignores_session_prefix_messages_when_truncating_rollout_from_start() { let (session, turn_context) = make_session_and_context().await; - let mut items = session.build_initial_context(&turn_context, None).await; + let mut items = session.build_initial_context(&turn_context).await; items.push(user_msg("feature request")); items.push(assistant_msg("ack")); items.push(user_msg("second question")); diff --git a/codex-rs/core/src/state/session.rs b/codex-rs/core/src/state/session.rs index 4523b935666..576db7cf3cb 100644 --- a/codex-rs/core/src/state/session.rs +++ b/codex-rs/core/src/state/session.rs @@ -5,6 +5,7 @@ use std::collections::HashMap; use std::collections::HashSet; use tokio::task::JoinHandle; +use crate::codex::PreviousTurnSettings; use crate::codex::SessionConfiguration; use crate::context_manager::ContextManager; use crate::error::Result as CodexResult; @@ -23,10 +24,10 @@ pub(crate) struct SessionState { pub(crate) server_reasoning_included: bool, pub(crate) dependency_env: HashMap, pub(crate) mcp_dependency_prompted: HashSet, - /// Model used by the latest regular user turn, used for model-switch handling - /// on subsequent regular turns (including full-context reinjection after - /// resume or `/compact`). - previous_model: Option, + /// Settings used by the latest regular user turn, used for turn-to-turn + /// model/realtime handling on subsequent regular turns (including full-context + /// reinjection after resume or `/compact`). + previous_turn_settings: Option, /// Startup regular task pre-created during session initialization. pub(crate) startup_regular_task: Option>>, pub(crate) active_mcp_tool_selection: Option>, @@ -44,7 +45,7 @@ impl SessionState { server_reasoning_included: false, dependency_env: HashMap::new(), mcp_dependency_prompted: HashSet::new(), - previous_model: None, + previous_turn_settings: None, startup_regular_task: None, active_mcp_tool_selection: None, active_connector_selection: HashSet::new(), @@ -60,11 +61,14 @@ impl SessionState { self.history.record_items(items, policy); } - pub(crate) fn previous_model(&self) -> Option { - self.previous_model.clone() + pub(crate) fn previous_turn_settings(&self) -> Option { + self.previous_turn_settings.clone() } - pub(crate) fn set_previous_model(&mut self, previous_model: Option) { - self.previous_model = previous_model; + pub(crate) fn set_previous_turn_settings( + &mut self, + previous_turn_settings: Option, + ) { + self.previous_turn_settings = previous_turn_settings; } pub(crate) fn clone_history(&self) -> ContextManager { diff --git a/codex-rs/core/src/thread_manager.rs b/codex-rs/core/src/thread_manager.rs index 0a56cab1d66..ba4c4d457fa 100644 --- a/codex-rs/core/src/thread_manager.rs +++ b/codex-rs/core/src/thread_manager.rs @@ -715,7 +715,7 @@ mod tests { #[tokio::test] async fn ignores_session_prefix_messages_when_truncating() { let (session, turn_context) = make_session_and_context().await; - let mut items = session.build_initial_context(&turn_context, None).await; + let mut items = session.build_initial_context(&turn_context).await; items.push(user_msg("feature request")); items.push(assistant_msg("ack")); items.push(user_msg("second question")); diff --git a/codex-rs/core/tests/common/context_snapshot.rs b/codex-rs/core/tests/common/context_snapshot.rs index addb1bc31ed..5471fd8913a 100644 --- a/codex-rs/core/tests/common/context_snapshot.rs +++ b/codex-rs/core/tests/common/context_snapshot.rs @@ -214,11 +214,14 @@ pub fn format_labeled_items_snapshot( fn format_snapshot_text(text: &str, options: &ContextSnapshotOptions) -> String { match options.render_mode { ContextSnapshotRenderMode::RedactedText => { - canonicalize_snapshot_text(text).replace('\n', "\\n") + normalize_snapshot_line_endings(&canonicalize_snapshot_text(text)).replace('\n', "\\n") + } + ContextSnapshotRenderMode::FullText => { + normalize_snapshot_line_endings(text).replace('\n', "\\n") } - ContextSnapshotRenderMode::FullText => text.replace('\n', "\\n"), ContextSnapshotRenderMode::KindWithTextPrefix { max_chars } => { - let normalized = canonicalize_snapshot_text(text).replace('\n', "\\n"); + let normalized = normalize_snapshot_line_endings(&canonicalize_snapshot_text(text)) + .replace('\n', "\\n"); if normalized.chars().count() <= max_chars { normalized } else { @@ -230,6 +233,10 @@ fn format_snapshot_text(text: &str, options: &ContextSnapshotOptions) -> String } } +fn normalize_snapshot_line_endings(text: &str) -> String { + text.replace("\r\n", "\n").replace('\r', "\n") +} + fn canonicalize_snapshot_text(text: &str) -> String { if text.starts_with("") { return "".to_string(); @@ -308,6 +315,25 @@ mod tests { ); } + #[test] + fn full_text_mode_normalizes_crlf_line_endings() { + let items = vec![json!({ + "type": "message", + "role": "user", + "content": [{ + "type": "input_text", + "text": "line one\r\n\r\nline two" + }] + })]; + + let rendered = format_response_items_snapshot( + &items, + &ContextSnapshotOptions::default().render_mode(ContextSnapshotRenderMode::FullText), + ); + + assert_eq!(rendered, r"00:message/user:line one\n\nline two"); + } + #[test] fn redacted_text_mode_keeps_canonical_placeholders() { let items = vec![json!({ @@ -349,6 +375,29 @@ mod tests { ); } + #[test] + fn kind_with_text_prefix_mode_normalizes_crlf_line_endings() { + let items = vec![json!({ + "type": "message", + "role": "developer", + "content": [{ + "type": "input_text", + "text": "\r\nRealtime conversation started.\r\n\r\nYou are..." + }] + })]; + + let rendered = format_response_items_snapshot( + &items, + &ContextSnapshotOptions::default() + .render_mode(ContextSnapshotRenderMode::KindWithTextPrefix { max_chars: 64 }), + ); + + assert_eq!( + rendered, + r"00:message/developer:\nRealtime conversation started.\n\nYou a..." + ); + } + #[test] fn image_only_message_is_rendered_as_non_text_span() { let items = vec![json!({ diff --git a/codex-rs/core/tests/suite/compact_remote.rs b/codex-rs/core/tests/suite/compact_remote.rs index 573f38930b6..22329887872 100644 --- a/codex-rs/core/tests/suite/compact_remote.rs +++ b/codex-rs/core/tests/suite/compact_remote.rs @@ -9,10 +9,14 @@ use codex_core::compact::SUMMARY_PREFIX; use codex_protocol::items::TurnItem; use codex_protocol::models::ContentItem; use codex_protocol::models::ResponseItem; +use codex_protocol::protocol::ConversationStartParams; +use codex_protocol::protocol::ErrorEvent; use codex_protocol::protocol::EventMsg; use codex_protocol::protocol::ItemCompletedEvent; use codex_protocol::protocol::ItemStartedEvent; use codex_protocol::protocol::Op; +use codex_protocol::protocol::RealtimeConversationRealtimeEvent; +use codex_protocol::protocol::RealtimeEvent; use codex_protocol::protocol::RolloutItem; use codex_protocol::protocol::RolloutLine; use codex_protocol::user_input::UserInput; @@ -22,12 +26,15 @@ use core_test_support::context_snapshot::ContextSnapshotRenderMode; use core_test_support::responses; use core_test_support::responses::mount_sse_once; use core_test_support::responses::sse; +use core_test_support::responses::start_websocket_server; use core_test_support::skip_if_no_network; +use core_test_support::test_codex::TestCodexBuilder; use core_test_support::test_codex::TestCodexHarness; use core_test_support::test_codex::test_codex; use core_test_support::wait_for_event; use core_test_support::wait_for_event_match; use pretty_assertions::assert_eq; +use serde_json::json; use wiremock::ResponseTemplate; fn approx_token_count(text: &str) -> i64 { @@ -68,6 +75,104 @@ fn format_labeled_requests_snapshot( ) } +fn compacted_summary_only_output(summary: &str) -> Vec { + vec![ResponseItem::Compaction { + encrypted_content: summary_with_prefix(summary), + }] +} + +fn remote_realtime_test_codex_builder( + realtime_server: &responses::WebSocketTestServer, +) -> TestCodexBuilder { + let realtime_base_url = realtime_server.uri().to_string(); + test_codex() + .with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing()) + .with_config(move |config| { + config.experimental_realtime_ws_base_url = Some(realtime_base_url); + }) +} + +async fn start_remote_realtime_server() -> responses::WebSocketTestServer { + start_websocket_server(vec![vec![ + vec![json!({ + "type": "session.created", + "session": { "id": "sess_remote_compact" } + })], + // Keep the websocket open after startup so routed transcript items during the test do not + // exhaust the scripted responses and mark realtime inactive before the assertions run. + vec![], + vec![], + vec![], + vec![], + vec![], + vec![], + vec![], + vec![], + ]]) + .await +} + +async fn start_realtime_conversation(codex: &codex_core::CodexThread) -> Result<()> { + codex + .submit(Op::RealtimeConversationStart(ConversationStartParams { + prompt: "backend prompt".to_string(), + session_id: None, + })) + .await?; + + wait_for_event_match(codex, |msg| match msg { + EventMsg::RealtimeConversationStarted(started) => Some(Ok(started.clone())), + EventMsg::Error(err) => Some(Err(err.clone())), + _ => None, + }) + .await + .unwrap_or_else(|err: ErrorEvent| panic!("conversation start failed: {err:?}")); + + wait_for_event_match(codex, |msg| match msg { + EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent { + payload: RealtimeEvent::SessionCreated { session_id }, + }) => Some(session_id.clone()), + _ => None, + }) + .await; + + Ok(()) +} + +async fn close_realtime_conversation(codex: &codex_core::CodexThread) -> Result<()> { + codex.submit(Op::RealtimeConversationClose).await?; + wait_for_event_match(codex, |msg| match msg { + EventMsg::RealtimeConversationClosed(closed) => Some(closed.clone()), + _ => None, + }) + .await; + Ok(()) +} + +fn assert_request_contains_realtime_start(request: &responses::ResponsesRequest) { + let body = request.body_json().to_string(); + assert!( + body.contains(""), + "expected request to restate realtime instructions" + ); + assert!( + !body.contains("Reason: inactive"), + "expected request to use realtime start instructions" + ); +} + +fn assert_request_contains_realtime_end(request: &responses::ResponsesRequest) { + let body = request.body_json().to_string(); + assert!( + body.contains(""), + "expected request to restate realtime instructions" + ); + assert!( + body.contains("Reason: inactive"), + "expected request to use realtime end instructions" + ); +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn remote_compact_replaces_history_for_followups() -> Result<()> { skip_if_no_network!(Ok(())); @@ -1305,6 +1410,470 @@ async fn remote_compact_refreshes_stale_developer_instructions_without_resume() Ok(()) } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn snapshot_request_shape_remote_pre_turn_compaction_restates_realtime_start() -> Result<()> { + skip_if_no_network!(Ok(())); + + let server = wiremock::MockServer::start().await; + let realtime_server = start_remote_realtime_server().await; + let mut builder = remote_realtime_test_codex_builder(&realtime_server).with_config(|config| { + config.model_auto_compact_token_limit = Some(200); + }); + let test = builder.build(&server).await?; + + let responses_mock = responses::mount_sse_sequence( + &server, + vec![ + responses::sse(vec![ + responses::ev_assistant_message("m1", "REMOTE_FIRST_REPLY"), + responses::ev_completed_with_tokens("r1", 500), + ]), + responses::sse(vec![ + responses::ev_assistant_message("m2", "REMOTE_SECOND_REPLY"), + responses::ev_completed_with_tokens("r2", 80), + ]), + ], + ) + .await; + let compact_mock = responses::mount_compact_json_once( + &server, + serde_json::json!({ + "output": compacted_summary_only_output( + "REMOTE_PRETURN_REALTIME_STILL_ACTIVE_SUMMARY" + ) + }), + ) + .await; + + start_realtime_conversation(test.codex.as_ref()).await?; + + test.codex + .submit(Op::UserInput { + items: vec![UserInput::Text { + text: "USER_ONE".to_string(), + text_elements: Vec::new(), + }], + final_output_json_schema: None, + }) + .await?; + wait_for_event(&test.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await; + + test.codex + .submit(Op::UserInput { + items: vec![UserInput::Text { + text: "USER_TWO".to_string(), + text_elements: Vec::new(), + }], + final_output_json_schema: None, + }) + .await?; + wait_for_event(&test.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await; + + assert_eq!(compact_mock.requests().len(), 1); + let requests = responses_mock.requests(); + assert_eq!(requests.len(), 2, "expected two model requests"); + + let compact_request = compact_mock.single_request(); + let post_compact_request = &requests[1]; + assert_request_contains_realtime_start(post_compact_request); + + insta::assert_snapshot!( + "remote_pre_turn_compaction_restates_realtime_start_shapes", + format_labeled_requests_snapshot( + "Remote pre-turn auto-compaction while realtime remains active: compaction clears the reference baseline, so the follow-up request restates realtime-start instructions.", + &[ + ("Remote Compaction Request", &compact_request), + ( + "Remote Post-Compaction History Layout", + post_compact_request + ), + ] + ) + ); + + close_realtime_conversation(test.codex.as_ref()).await?; + realtime_server.shutdown().await; + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn snapshot_request_shape_remote_pre_turn_compaction_restates_realtime_end() -> Result<()> { + skip_if_no_network!(Ok(())); + + let server = wiremock::MockServer::start().await; + let realtime_server = start_remote_realtime_server().await; + let mut builder = remote_realtime_test_codex_builder(&realtime_server).with_config(|config| { + config.model_auto_compact_token_limit = Some(200); + }); + let test = builder.build(&server).await?; + + let responses_mock = responses::mount_sse_sequence( + &server, + vec![ + responses::sse(vec![ + responses::ev_assistant_message("m1", "REMOTE_FIRST_REPLY"), + responses::ev_completed_with_tokens("r1", 500), + ]), + responses::sse(vec![ + responses::ev_assistant_message("m2", "REMOTE_SECOND_REPLY"), + responses::ev_completed_with_tokens("r2", 80), + ]), + ], + ) + .await; + let compact_mock = responses::mount_compact_json_once( + &server, + serde_json::json!({ + "output": compacted_summary_only_output( + "REMOTE_PRETURN_REALTIME_CLOSED_SUMMARY" + ) + }), + ) + .await; + + start_realtime_conversation(test.codex.as_ref()).await?; + + test.codex + .submit(Op::UserInput { + items: vec![UserInput::Text { + text: "USER_ONE".to_string(), + text_elements: Vec::new(), + }], + final_output_json_schema: None, + }) + .await?; + wait_for_event(&test.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await; + + close_realtime_conversation(test.codex.as_ref()).await?; + + test.codex + .submit(Op::UserInput { + items: vec![UserInput::Text { + text: "USER_TWO".to_string(), + text_elements: Vec::new(), + }], + final_output_json_schema: None, + }) + .await?; + wait_for_event(&test.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await; + + assert_eq!(compact_mock.requests().len(), 1); + let requests = responses_mock.requests(); + assert_eq!(requests.len(), 2, "expected two model requests"); + + let compact_request = compact_mock.single_request(); + let post_compact_request = &requests[1]; + assert_request_contains_realtime_end(post_compact_request); + + insta::assert_snapshot!( + "remote_pre_turn_compaction_restates_realtime_end_shapes", + format_labeled_requests_snapshot( + "Remote pre-turn auto-compaction after realtime was closed between turns: the follow-up request emits realtime-end instructions from previous-turn settings even though compaction cleared the reference baseline.", + &[ + ("Remote Compaction Request", &compact_request), + ( + "Remote Post-Compaction History Layout", + post_compact_request + ), + ] + ) + ); + + realtime_server.shutdown().await; + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn snapshot_request_shape_remote_manual_compact_restates_realtime_start() -> Result<()> { + skip_if_no_network!(Ok(())); + + let server = wiremock::MockServer::start().await; + let realtime_server = start_remote_realtime_server().await; + let mut builder = remote_realtime_test_codex_builder(&realtime_server); + let test = builder.build(&server).await?; + + let responses_mock = responses::mount_sse_sequence( + &server, + vec![ + responses::sse(vec![ + responses::ev_assistant_message("m1", "REMOTE_FIRST_REPLY"), + responses::ev_completed_with_tokens("r1", 60), + ]), + responses::sse(vec![ + responses::ev_assistant_message("m2", "REMOTE_SECOND_REPLY"), + responses::ev_completed_with_tokens("r2", 80), + ]), + ], + ) + .await; + let compact_mock = responses::mount_compact_json_once( + &server, + serde_json::json!({ + "output": compacted_summary_only_output( + "REMOTE_MANUAL_REALTIME_STILL_ACTIVE_SUMMARY" + ) + }), + ) + .await; + + start_realtime_conversation(test.codex.as_ref()).await?; + + test.codex + .submit(Op::UserInput { + items: vec![UserInput::Text { + text: "USER_ONE".to_string(), + text_elements: Vec::new(), + }], + final_output_json_schema: None, + }) + .await?; + wait_for_event(&test.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await; + + test.codex.submit(Op::Compact).await?; + wait_for_event(&test.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await; + + test.codex + .submit(Op::UserInput { + items: vec![UserInput::Text { + text: "USER_TWO".to_string(), + text_elements: Vec::new(), + }], + final_output_json_schema: None, + }) + .await?; + wait_for_event(&test.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await; + + assert_eq!(compact_mock.requests().len(), 1); + let requests = responses_mock.requests(); + assert_eq!(requests.len(), 2, "expected two model requests"); + + let compact_request = compact_mock.single_request(); + let post_compact_request = &requests[1]; + assert_request_contains_realtime_start(post_compact_request); + + insta::assert_snapshot!( + "remote_manual_compact_restates_realtime_start_shapes", + format_labeled_requests_snapshot( + "Remote manual /compact while realtime remains active: the next regular turn restates realtime-start instructions after compaction clears the baseline.", + &[ + ("Remote Compaction Request", &compact_request), + ( + "Remote Post-Compaction History Layout", + post_compact_request + ), + ] + ) + ); + + close_realtime_conversation(test.codex.as_ref()).await?; + realtime_server.shutdown().await; + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn snapshot_request_shape_remote_mid_turn_compaction_does_not_restate_realtime_end() +-> Result<()> { + skip_if_no_network!(Ok(())); + + let server = wiremock::MockServer::start().await; + let realtime_server = start_remote_realtime_server().await; + let mut builder = remote_realtime_test_codex_builder(&realtime_server).with_config(|config| { + config.model_auto_compact_token_limit = Some(200); + }); + let test = builder.build(&server).await?; + + let responses_mock = responses::mount_sse_sequence( + &server, + vec![ + responses::sse(vec![ + responses::ev_assistant_message("setup", "REMOTE_SETUP_REPLY"), + responses::ev_completed_with_tokens("setup-response", 60), + ]), + responses::sse(vec![ + responses::ev_function_call("call-remote-mid-turn", DUMMY_FUNCTION_NAME, "{}"), + responses::ev_completed_with_tokens("r1", 500), + ]), + responses::sse(vec![ + responses::ev_assistant_message("m2", "REMOTE_MID_TURN_FINAL_REPLY"), + responses::ev_completed_with_tokens("r2", 80), + ]), + ], + ) + .await; + let compact_mock = responses::mount_compact_json_once( + &server, + serde_json::json!({ + "output": compacted_summary_only_output( + "REMOTE_MID_TURN_REALTIME_CLOSED_SUMMARY" + ) + }), + ) + .await; + + start_realtime_conversation(test.codex.as_ref()).await?; + + test.codex + .submit(Op::UserInput { + items: vec![UserInput::Text { + text: "SETUP_USER".to_string(), + text_elements: Vec::new(), + }], + final_output_json_schema: None, + }) + .await?; + wait_for_event(&test.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await; + + close_realtime_conversation(test.codex.as_ref()).await?; + + test.codex + .submit(Op::UserInput { + items: vec![UserInput::Text { + text: "USER_TWO".to_string(), + text_elements: Vec::new(), + }], + final_output_json_schema: None, + }) + .await?; + wait_for_event(&test.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await; + + assert_eq!(compact_mock.requests().len(), 1); + let requests = responses_mock.requests(); + assert_eq!(requests.len(), 3, "expected three model requests"); + + let second_turn_request = &requests[1]; + let compact_request = compact_mock.single_request(); + let post_compact_request = &requests[2]; + assert_request_contains_realtime_end(second_turn_request); + assert!( + !post_compact_request + .body_json() + .to_string() + .contains(""), + "did not expect post-compaction history to restate realtime instructions once the current turn had already established an inactive baseline" + ); + + insta::assert_snapshot!( + "remote_mid_turn_compaction_does_not_restate_realtime_end_shapes", + format_labeled_requests_snapshot( + "Remote mid-turn continuation compaction after realtime was closed before the turn: the initial second-turn request emits realtime-end instructions, but the continuation request does not restate them after compaction because the current turn already established the inactive baseline.", + &[ + ("Second Turn Initial Request", second_turn_request), + ("Remote Compaction Request", &compact_request), + ( + "Remote Post-Compaction History Layout", + post_compact_request + ), + ] + ) + ); + + realtime_server.shutdown().await; + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn snapshot_request_shape_remote_compact_resume_restates_realtime_end() -> Result<()> { + skip_if_no_network!(Ok(())); + + let server = wiremock::MockServer::start().await; + let realtime_server = start_remote_realtime_server().await; + let mut builder = remote_realtime_test_codex_builder(&realtime_server); + let initial = builder.build(&server).await?; + let home = initial.home.clone(); + let rollout_path = initial + .session_configured + .rollout_path + .clone() + .expect("rollout path"); + + let responses_mock = responses::mount_sse_sequence( + &server, + vec![ + responses::sse(vec![ + responses::ev_assistant_message("m1", "REMOTE_FIRST_REPLY"), + responses::ev_completed_with_tokens("r1", 60), + ]), + responses::sse(vec![ + responses::ev_assistant_message("m2", "REMOTE_AFTER_RESUME_REPLY"), + responses::ev_completed_with_tokens("r2", 80), + ]), + ], + ) + .await; + let compact_mock = responses::mount_compact_json_once( + &server, + serde_json::json!({ + "output": compacted_summary_only_output( + "REMOTE_RESUME_REALTIME_CLOSED_SUMMARY" + ) + }), + ) + .await; + + start_realtime_conversation(initial.codex.as_ref()).await?; + + initial + .codex + .submit(Op::UserInput { + items: vec![UserInput::Text { + text: "USER_ONE".to_string(), + text_elements: Vec::new(), + }], + final_output_json_schema: None, + }) + .await?; + wait_for_event(&initial.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await; + + close_realtime_conversation(initial.codex.as_ref()).await?; + + initial.codex.submit(Op::Compact).await?; + wait_for_event(&initial.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await; + + initial.codex.submit(Op::Shutdown).await?; + wait_for_event(&initial.codex, |ev| { + matches!(ev, EventMsg::ShutdownComplete) + }) + .await; + + let mut resume_builder = + test_codex().with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing()); + let resumed = resume_builder.resume(&server, home, rollout_path).await?; + + resumed + .codex + .submit(Op::UserInput { + items: vec![UserInput::Text { + text: "USER_TWO".to_string(), + text_elements: Vec::new(), + }], + final_output_json_schema: None, + }) + .await?; + wait_for_event(&resumed.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await; + + assert_eq!(compact_mock.requests().len(), 1); + let requests = responses_mock.requests(); + assert_eq!(requests.len(), 2, "expected two model requests"); + + let compact_request = compact_mock.single_request(); + let after_resume_request = &requests[1]; + assert_request_contains_realtime_end(after_resume_request); + + insta::assert_snapshot!( + "remote_compact_resume_restates_realtime_end_shapes", + format_labeled_requests_snapshot( + "After remote manual /compact and resume, the first resumed turn rebuilds history from the compaction item and restates realtime-end instructions from reconstructed previous-turn settings.", + &[ + ("Remote Compaction Request", &compact_request), + ("Remote Post-Resume History Layout", after_resume_request), + ] + ) + ); + + realtime_server.shutdown().await; + Ok(()) +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] // TODO(ccunningham): Update once remote pre-turn compaction includes incoming user input. async fn snapshot_request_shape_remote_pre_turn_compaction_including_incoming_user_message() diff --git a/codex-rs/core/tests/suite/realtime_conversation.rs b/codex-rs/core/tests/suite/realtime_conversation.rs index 1f7504d41b7..b6191bc88eb 100644 --- a/codex-rs/core/tests/suite/realtime_conversation.rs +++ b/codex-rs/core/tests/suite/realtime_conversation.rs @@ -367,6 +367,7 @@ async fn conversation_second_start_replaces_runtime() -> Result<()> { server.shutdown().await; Ok(()) } + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn conversation_uses_experimental_realtime_ws_base_url_override() -> Result<()> { skip_if_no_network!(Ok(())); diff --git a/codex-rs/core/tests/suite/resume_warning.rs b/codex-rs/core/tests/suite/resume_warning.rs index 4742b417ae8..ca40c0c191a 100644 --- a/codex-rs/core/tests/suite/resume_warning.rs +++ b/codex-rs/core/tests/suite/resume_warning.rs @@ -36,6 +36,7 @@ fn resume_history( model: previous_model.to_string(), personality: None, collaboration_mode: None, + realtime_active: None, effort: config.model_reasoning_effort, summary: config .model_reasoning_summary diff --git a/codex-rs/core/tests/suite/snapshots/all__suite__compact_remote__remote_compact_resume_restates_realtime_end_shapes.snap b/codex-rs/core/tests/suite/snapshots/all__suite__compact_remote__remote_compact_resume_restates_realtime_end_shapes.snap new file mode 100644 index 00000000000..1b95615375d --- /dev/null +++ b/codex-rs/core/tests/suite/snapshots/all__suite__compact_remote__remote_compact_resume_restates_realtime_end_shapes.snap @@ -0,0 +1,24 @@ +--- +source: core/tests/suite/compact_remote.rs +expression: "format_labeled_requests_snapshot(\"After remote manual /compact and resume, the first resumed turn rebuilds history from the compaction item and restates realtime-end instructions from reconstructed previous-turn settings.\",\n&[(\"Remote Compaction Request\", &compact_request),\n(\"Remote Post-Resume History Layout\", after_resume_request),])" +--- +Scenario: After remote manual /compact and resume, the first resumed turn rebuilds history from the compaction item and restates realtime-end instructions from reconstructed previous-turn settings. + +## Remote Compaction Request +00:message/developer: +01:message/user[2]: + [01] + [02] > +02:message/developer:\nRealtime conversation started.\n\nYou a... +03:message/user:USER_ONE +04:message/assistant:REMOTE_FIRST_REPLY + +## Remote Post-Resume History Layout +00:compaction:encrypted=true +01:message/developer[2]: + [01] + [02] \nRealtime conversation ended.\n\nYou are... +02:message/user[2]: + [01] + [02] > +03:message/user:USER_TWO diff --git a/codex-rs/core/tests/suite/snapshots/all__suite__compact_remote__remote_manual_compact_restates_realtime_start_shapes.snap b/codex-rs/core/tests/suite/snapshots/all__suite__compact_remote__remote_manual_compact_restates_realtime_start_shapes.snap new file mode 100644 index 00000000000..cb046308940 --- /dev/null +++ b/codex-rs/core/tests/suite/snapshots/all__suite__compact_remote__remote_manual_compact_restates_realtime_start_shapes.snap @@ -0,0 +1,24 @@ +--- +source: core/tests/suite/compact_remote.rs +expression: "format_labeled_requests_snapshot(\"Remote manual /compact while realtime remains active: the next regular turn restates realtime-start instructions after compaction clears the baseline.\",\n&[(\"Remote Compaction Request\", &compact_request),\n(\"Remote Post-Compaction History Layout\", post_compact_request),])" +--- +Scenario: Remote manual /compact while realtime remains active: the next regular turn restates realtime-start instructions after compaction clears the baseline. + +## Remote Compaction Request +00:message/developer: +01:message/user[2]: + [01] + [02] > +02:message/developer:\nRealtime conversation started.\n\nYou a... +03:message/user:USER_ONE +04:message/assistant:REMOTE_FIRST_REPLY + +## Remote Post-Compaction History Layout +00:compaction:encrypted=true +01:message/developer[2]: + [01] + [02] \nRealtime conversation started.\n\nYou a... +02:message/user[2]: + [01] + [02] > +03:message/user:USER_TWO diff --git a/codex-rs/core/tests/suite/snapshots/all__suite__compact_remote__remote_mid_turn_compaction_does_not_restate_realtime_end_shapes.snap b/codex-rs/core/tests/suite/snapshots/all__suite__compact_remote__remote_mid_turn_compaction_does_not_restate_realtime_end_shapes.snap new file mode 100644 index 00000000000..eb7e583874e --- /dev/null +++ b/codex-rs/core/tests/suite/snapshots/all__suite__compact_remote__remote_mid_turn_compaction_does_not_restate_realtime_end_shapes.snap @@ -0,0 +1,36 @@ +--- +source: core/tests/suite/compact_remote.rs +expression: "format_labeled_requests_snapshot(\"Remote mid-turn continuation compaction after realtime was closed before the turn: the initial second-turn request emits realtime-end instructions, but the continuation request does not restate them after compaction because the current turn already established the inactive baseline.\",\n&[(\"Second Turn Initial Request\", second_turn_request),\n(\"Remote Compaction Request\", &compact_request),\n(\"Remote Post-Compaction History Layout\", post_compact_request),])" +--- +Scenario: Remote mid-turn continuation compaction after realtime was closed before the turn: the initial second-turn request emits realtime-end instructions, but the continuation request does not restate them after compaction because the current turn already established the inactive baseline. + +## Second Turn Initial Request +00:message/developer: +01:message/user[2]: + [01] + [02] > +02:message/developer:\nRealtime conversation started.\n\nYou a... +03:message/user:SETUP_USER +04:message/assistant:REMOTE_SETUP_REPLY +05:message/developer:\nRealtime conversation ended.\n\nYou are... +06:message/user:USER_TWO + +## Remote Compaction Request +00:message/developer: +01:message/user[2]: + [01] + [02] > +02:message/developer:\nRealtime conversation started.\n\nYou a... +03:message/user:SETUP_USER +04:message/assistant:REMOTE_SETUP_REPLY +05:message/developer:\nRealtime conversation ended.\n\nYou are... +06:message/user:USER_TWO +07:function_call/test_tool +08:function_call_output:unsupported call: test_tool + +## Remote Post-Compaction History Layout +00:message/developer: +01:message/user[2]: + [01] + [02] > +02:compaction:encrypted=true diff --git a/codex-rs/core/tests/suite/snapshots/all__suite__compact_remote__remote_pre_turn_compaction_restates_realtime_end_shapes.snap b/codex-rs/core/tests/suite/snapshots/all__suite__compact_remote__remote_pre_turn_compaction_restates_realtime_end_shapes.snap new file mode 100644 index 00000000000..7f14509dd2b --- /dev/null +++ b/codex-rs/core/tests/suite/snapshots/all__suite__compact_remote__remote_pre_turn_compaction_restates_realtime_end_shapes.snap @@ -0,0 +1,24 @@ +--- +source: core/tests/suite/compact_remote.rs +expression: "format_labeled_requests_snapshot(\"Remote pre-turn auto-compaction after realtime was closed between turns: the follow-up request emits realtime-end instructions from previous-turn settings even though compaction cleared the reference baseline.\",\n&[(\"Remote Compaction Request\", &compact_request),\n(\"Remote Post-Compaction History Layout\", post_compact_request),])" +--- +Scenario: Remote pre-turn auto-compaction after realtime was closed between turns: the follow-up request emits realtime-end instructions from previous-turn settings even though compaction cleared the reference baseline. + +## Remote Compaction Request +00:message/developer: +01:message/user[2]: + [01] + [02] > +02:message/developer:\nRealtime conversation started.\n\nYou a... +03:message/user:USER_ONE +04:message/assistant:REMOTE_FIRST_REPLY + +## Remote Post-Compaction History Layout +00:compaction:encrypted=true +01:message/developer[2]: + [01] + [02] \nRealtime conversation ended.\n\nYou are... +02:message/user[2]: + [01] + [02] > +03:message/user:USER_TWO diff --git a/codex-rs/core/tests/suite/snapshots/all__suite__compact_remote__remote_pre_turn_compaction_restates_realtime_start_shapes.snap b/codex-rs/core/tests/suite/snapshots/all__suite__compact_remote__remote_pre_turn_compaction_restates_realtime_start_shapes.snap new file mode 100644 index 00000000000..a72f581bbc8 --- /dev/null +++ b/codex-rs/core/tests/suite/snapshots/all__suite__compact_remote__remote_pre_turn_compaction_restates_realtime_start_shapes.snap @@ -0,0 +1,24 @@ +--- +source: core/tests/suite/compact_remote.rs +expression: "format_labeled_requests_snapshot(\"Remote pre-turn auto-compaction while realtime remains active: compaction clears the reference baseline, so the follow-up request restates realtime-start instructions.\",\n&[(\"Remote Compaction Request\", &compact_request),\n(\"Remote Post-Compaction History Layout\", post_compact_request),])" +--- +Scenario: Remote pre-turn auto-compaction while realtime remains active: compaction clears the reference baseline, so the follow-up request restates realtime-start instructions. + +## Remote Compaction Request +00:message/developer: +01:message/user[2]: + [01] + [02] > +02:message/developer:\nRealtime conversation started.\n\nYou a... +03:message/user:USER_ONE +04:message/assistant:REMOTE_FIRST_REPLY + +## Remote Post-Compaction History Layout +00:compaction:encrypted=true +01:message/developer[2]: + [01] + [02] \nRealtime conversation started.\n\nYou a... +02:message/user[2]: + [01] + [02] > +03:message/user:USER_TWO diff --git a/codex-rs/protocol/src/models.rs b/codex-rs/protocol/src/models.rs index cf06636ad82..8461209823c 100644 --- a/codex-rs/protocol/src/models.rs +++ b/codex-rs/protocol/src/models.rs @@ -14,6 +14,8 @@ use crate::protocol::AskForApproval; use crate::protocol::COLLABORATION_MODE_CLOSE_TAG; use crate::protocol::COLLABORATION_MODE_OPEN_TAG; use crate::protocol::NetworkAccess; +use crate::protocol::REALTIME_CONVERSATION_CLOSE_TAG; +use crate::protocol::REALTIME_CONVERSATION_OPEN_TAG; use crate::protocol::SandboxPolicy; use crate::protocol::WritableRoot; use crate::user_input::UserInput; @@ -340,6 +342,9 @@ const SANDBOX_MODE_WORKSPACE_WRITE: &str = include_str!("prompts/permissions/sandbox_mode/workspace_write.md"); const SANDBOX_MODE_READ_ONLY: &str = include_str!("prompts/permissions/sandbox_mode/read_only.md"); +const REALTIME_START_INSTRUCTIONS: &str = include_str!("prompts/realtime/realtime_start.md"); +const REALTIME_END_INSTRUCTIONS: &str = include_str!("prompts/realtime/realtime_end.md"); + impl DeveloperInstructions { pub fn new>(text: T) -> Self { Self { text: text.into() } @@ -409,6 +414,20 @@ impl DeveloperInstructions { )) } + pub fn realtime_start_message() -> Self { + DeveloperInstructions::new(format!( + "{REALTIME_CONVERSATION_OPEN_TAG}\n{}\n{REALTIME_CONVERSATION_CLOSE_TAG}", + REALTIME_START_INSTRUCTIONS.trim() + )) + } + + pub fn realtime_end_message(reason: &str) -> Self { + DeveloperInstructions::new(format!( + "{REALTIME_CONVERSATION_OPEN_TAG}\n{}\n\nReason: {reason}\n{REALTIME_CONVERSATION_CLOSE_TAG}", + REALTIME_END_INSTRUCTIONS.trim() + )) + } + pub fn personality_spec_message(spec: String) -> Self { let message = format!( " The user has requested a new communication style. Future messages should adhere to the following personality: \n{spec} " diff --git a/codex-rs/protocol/src/prompts/realtime/realtime_end.md b/codex-rs/protocol/src/prompts/realtime/realtime_end.md new file mode 100644 index 00000000000..07ccae52865 --- /dev/null +++ b/codex-rs/protocol/src/prompts/realtime/realtime_end.md @@ -0,0 +1,5 @@ +Realtime conversation ended. + +You are still operating behind an intermediary rather than speaking to the user directly. Use the conversation transcript and current context to decide whether backend help is actually needed, and avoid verbose responses that only add latency. + +Subsequent user input may return to typed text rather than transcript-style text. Do not assume recognition errors or missing punctuation once realtime has ended. Resume normal chat behavior. diff --git a/codex-rs/protocol/src/prompts/realtime/realtime_start.md b/codex-rs/protocol/src/prompts/realtime/realtime_start.md new file mode 100644 index 00000000000..c7d363eff7a --- /dev/null +++ b/codex-rs/protocol/src/prompts/realtime/realtime_start.md @@ -0,0 +1,10 @@ +Realtime conversation started. + +You are operating as a backend executor behind an intermediary. The user does not talk to you directly. Any response you produce will be consumed by the intermediary and may be summarized before the user sees it. + +When invoked, you receive the latest conversation transcript and any relevant mode or metadata. The intermediary may invoke you even when backend help is not actually needed. Use the transcript to decide whether you should do work. If backend help is unnecessary, avoid verbose responses that add user-visible latency. + +When user text is routed from realtime, treat it as a transcript. It may be unpunctuated or contain recognition errors. + +- Ask brief clarification questions when needed. +- Keep responses concise and action-oriented. diff --git a/codex-rs/protocol/src/protocol.rs b/codex-rs/protocol/src/protocol.rs index 880e6a24fe5..bc107ca6c75 100644 --- a/codex-rs/protocol/src/protocol.rs +++ b/codex-rs/protocol/src/protocol.rs @@ -70,6 +70,8 @@ pub const ENVIRONMENT_CONTEXT_OPEN_TAG: &str = ""; pub const ENVIRONMENT_CONTEXT_CLOSE_TAG: &str = ""; pub const COLLABORATION_MODE_OPEN_TAG: &str = ""; pub const COLLABORATION_MODE_CLOSE_TAG: &str = ""; +pub const REALTIME_CONVERSATION_OPEN_TAG: &str = ""; +pub const REALTIME_CONVERSATION_CLOSE_TAG: &str = ""; pub const USER_MESSAGE_BEGIN: &str = "## My request for Codex:"; /// Submission Queue Entry - requests from user @@ -2146,6 +2148,8 @@ pub struct TurnContextItem { pub personality: Option, #[serde(default, skip_serializing_if = "Option::is_none")] pub collaboration_mode: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub realtime_active: Option, #[serde(skip_serializing_if = "Option::is_none")] pub effort: Option, pub summary: ReasoningSummaryConfig, @@ -3376,6 +3380,7 @@ mod tests { model: "gpt-5".to_string(), personality: None, collaboration_mode: None, + realtime_active: None, effort: None, summary: ReasoningSummaryConfig::Auto, user_instructions: None, diff --git a/codex-rs/state/src/extract.rs b/codex-rs/state/src/extract.rs index f9782389790..e62f884254a 100644 --- a/codex-rs/state/src/extract.rs +++ b/codex-rs/state/src/extract.rs @@ -260,6 +260,7 @@ mod tests { model: "gpt-5".to_string(), personality: None, collaboration_mode: None, + realtime_active: None, effort: None, summary: ReasoningSummary::Auto, user_instructions: None, @@ -296,6 +297,7 @@ mod tests { model: "gpt-5".to_string(), personality: None, collaboration_mode: None, + realtime_active: None, effort: None, summary: ReasoningSummary::Auto, user_instructions: None, diff --git a/codex-rs/tui/src/lib.rs b/codex-rs/tui/src/lib.rs index 8dfdbcd8717..5d36ec441de 100644 --- a/codex-rs/tui/src/lib.rs +++ b/codex-rs/tui/src/lib.rs @@ -1280,6 +1280,7 @@ mod tests { model, personality: None, collaboration_mode: None, + realtime_active: Some(false), effort: config.model_reasoning_effort, summary: config .model_reasoning_summary