Skip to content
153 changes: 140 additions & 13 deletions codex-rs/core/src/codex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -603,6 +603,7 @@ impl TurnSkillsContext {
#[derive(Debug)]
pub(crate) struct TurnContext {
pub(crate) sub_id: String,
pub(crate) realtime_active: bool,
pub(crate) config: Arc<Config>,
pub(crate) auth_manager: Option<Arc<AuthManager>>,
pub(crate) model_info: ModelInfo,
Expand Down Expand Up @@ -690,6 +691,7 @@ impl TurnContext {

Self {
sub_id: self.sub_id.clone(),
realtime_active: self.realtime_active,
config: Arc::new(config),
auth_manager: self.auth_manager.clone(),
model_info: model_info.clone(),
Expand Down Expand Up @@ -753,6 +755,7 @@ impl TurnContext {
model: self.model_info.slug.clone(),
personality: self.personality,
collaboration_mode: Some(self.collaboration_mode.clone()),
realtime_active: Some(self.realtime_active),
effort: self.reasoning_effort,
summary: self.reasoning_summary,
user_instructions: self.user_instructions.clone(),
Expand Down Expand Up @@ -1063,6 +1066,7 @@ impl Session {
let (current_date, timezone) = local_time_context();
TurnContext {
sub_id,
realtime_active: false,
config: per_turn_config.clone(),
auth_manager: auth_manager_for_context,
model_info: model_info.clone(),
Expand Down Expand Up @@ -1732,7 +1736,7 @@ impl Session {
// TODO(ccunningham): Defer initial context insertion until the first real turn
// starts so it reflects the actual first-turn settings (permissions, etc.) and
// we do not emit model-visible "diff" updates before the first user message.
let items = self.build_initial_context(&turn_context, None).await;
let items = self.build_initial_context(&turn_context, None, None).await;
self.record_conversation_items(&turn_context, &items).await;
{
let mut state = self.state.lock().await;
Expand Down Expand Up @@ -1800,6 +1804,7 @@ impl Session {
.reconstruct_history_from_rollout(&turn_context, &rollout_items)
.await;
let previous_model = reconstructed_rollout.previous_model.clone();
let previous_context_item = reconstructed_rollout.reference_context_item.clone();
self.set_previous_model(previous_model).await;

// Always add response items to conversation history
Expand All @@ -1825,7 +1830,9 @@ impl Session {
}

// Append the current session's initial context after the reconstructed history.
let initial_context = self.build_initial_context(&turn_context, None).await;
let initial_context = self
.build_initial_context(&turn_context, previous_context_item.as_ref(), None)
.await;
self.record_conversation_items(&turn_context, &initial_context)
.await;
{
Expand Down Expand Up @@ -2067,6 +2074,7 @@ impl Session {
Arc::clone(&self.js_repl),
skills_outcome,
);
turn_context.realtime_active = self.conversation.running_state().await.is_some();

if let Some(final_schema) = final_output_json_schema {
turn_context.final_output_json_schema = final_schema;
Expand Down Expand Up @@ -2879,6 +2887,7 @@ impl Session {
pub(crate) async fn build_initial_context(
&self,
turn_context: &TurnContext,
previous_context_item: Option<&TurnContextItem>,
previous_user_turn_model: Option<&str>,
) -> Vec<ResponseItem> {
let mut developer_sections = Vec::<String>::with_capacity(8);
Expand Down Expand Up @@ -2926,6 +2935,12 @@ impl Session {
{
developer_sections.push(collab_instructions.into_text());
}
if let Some(realtime_update) = crate::context_manager::updates::build_realtime_update_item(
previous_context_item,
turn_context,
) {
developer_sections.push(realtime_update.into_text());
}
if self.features.enabled(Feature::Personality)
&& let Some(personality) = turn_context.personality
{
Expand Down Expand Up @@ -3035,7 +3050,7 @@ impl Session {
};
let should_inject_full_context = reference_context_item.is_none();
let context_items = if should_inject_full_context {
self.build_initial_context(turn_context, previous_user_turn_model)
self.build_initial_context(turn_context, None, previous_user_turn_model)
.await
} else {
// Steady-state path: append only context diffs to minimize token overhead.
Expand Down Expand Up @@ -4552,6 +4567,7 @@ async fn spawn_review_thread(

let review_turn_context = TurnContext {
sub_id: review_turn_id,
realtime_active: parent_turn_context.realtime_active,
config: per_turn_config,
auth_manager: auth_manager_for_context,
model_info: model_info.clone(),
Expand Down Expand Up @@ -6506,6 +6522,23 @@ mod tests {
}
}

fn developer_input_texts(items: &[ResponseItem]) -> Vec<&str> {
items
.iter()
.filter_map(|item| match item {
ResponseItem::Message { role, content, .. } if role == "developer" => {
Some(content.as_slice())
}
_ => None,
})
.flat_map(|content| content.iter())
.filter_map(|item| match item {
ContentItem::InputText { text } => Some(text.as_str()),
_ => None,
})
.collect()
}

fn make_connector(id: &str, name: &str) -> AppInfo {
AppInfo {
id: id.to_string(),
Expand Down Expand Up @@ -7159,7 +7192,11 @@ mod tests {
session
.record_context_updates_and_set_reference_context_item(&turn_context, None)
.await;
expected.extend(session.build_initial_context(&turn_context, None).await);
expected.extend(
session
.build_initial_context(&turn_context, None, None)
.await,
);
let history_after_seed = session.clone_history().await;
assert_eq!(expected, history_after_seed.raw_items());

Expand Down Expand Up @@ -7324,7 +7361,7 @@ mod tests {
let reconstruction_turn = session.new_default_turn().await;
expected.extend(
session
.build_initial_context(reconstruction_turn.as_ref(), None)
.build_initial_context(reconstruction_turn.as_ref(), None, None)
.await,
);
let history = session.state.lock().await.clone_history();
Expand All @@ -7346,6 +7383,7 @@ mod tests {
model: previous_model.to_string(),
personality: turn_context.personality,
collaboration_mode: Some(turn_context.collaboration_mode.clone()),
realtime_active: Some(turn_context.realtime_active),
effort: turn_context.reasoning_effort,
summary: turn_context.reasoning_summary,
user_instructions: None,
Expand Down Expand Up @@ -7396,7 +7434,7 @@ mod tests {
async fn thread_rollback_drops_last_turn_from_history() {
let (sess, tc, rx) = make_session_and_context_with_rx().await;

let initial_context = sess.build_initial_context(tc.as_ref(), None).await;
let initial_context = sess.build_initial_context(tc.as_ref(), None, None).await;
sess.record_into_history(&initial_context, tc.as_ref())
.await;

Expand Down Expand Up @@ -7467,7 +7505,7 @@ mod tests {
async fn thread_rollback_clears_history_when_num_turns_exceeds_existing_turns() {
let (sess, tc, rx) = make_session_and_context_with_rx().await;

let initial_context = sess.build_initial_context(tc.as_ref(), None).await;
let initial_context = sess.build_initial_context(tc.as_ref(), None, None).await;
sess.record_into_history(&initial_context, tc.as_ref())
.await;

Expand Down Expand Up @@ -7495,7 +7533,7 @@ mod tests {
async fn thread_rollback_fails_when_turn_in_progress() {
let (sess, tc, rx) = make_session_and_context_with_rx().await;

let initial_context = sess.build_initial_context(tc.as_ref(), None).await;
let initial_context = sess.build_initial_context(tc.as_ref(), None, None).await;
sess.record_into_history(&initial_context, tc.as_ref())
.await;

Expand All @@ -7516,7 +7554,7 @@ mod tests {
async fn thread_rollback_fails_when_num_turns_is_zero() {
let (sess, tc, rx) = make_session_and_context_with_rx().await;

let initial_context = sess.build_initial_context(tc.as_ref(), None).await;
let initial_context = sess.build_initial_context(tc.as_ref(), None, None).await;
sess.record_into_history(&initial_context, tc.as_ref())
.await;

Expand Down Expand Up @@ -8574,6 +8612,89 @@ mod tests {
assert!(environment_update.contains("<timezone>Europe/Berlin</timezone>"));
}

#[tokio::test]
async fn build_settings_update_items_emits_realtime_start_when_session_becomes_live() {
let (session, previous_context) = make_session_and_context().await;
let previous_context = Arc::new(previous_context);
let mut current_context = previous_context
.with_model(
previous_context.model_info.slug.clone(),
&session.services.models_manager,
)
.await;
current_context.realtime_active = true;

let update_items = session.build_settings_update_items(
Some(&previous_context.to_turn_context_item()),
None,
&current_context,
);

let developer_texts = developer_input_texts(&update_items);
assert!(
developer_texts
.iter()
.any(|text| text.contains("<realtime_conversation>")),
"expected a realtime start update, got {developer_texts:?}"
);
}

#[tokio::test]
async fn build_settings_update_items_emits_realtime_end_when_session_stops_being_live() {
let (session, mut previous_context) = make_session_and_context().await;
previous_context.realtime_active = true;
let mut current_context = previous_context
.with_model(
previous_context.model_info.slug.clone(),
&session.services.models_manager,
)
.await;
current_context.realtime_active = false;

let update_items = session.build_settings_update_items(
Some(&previous_context.to_turn_context_item()),
None,
&current_context,
);

let developer_texts = developer_input_texts(&update_items);
assert!(
developer_texts
.iter()
.any(|text| text.contains("Reason: inactive")),
"expected a realtime end update, got {developer_texts:?}"
);
}

#[tokio::test]
async fn build_initial_context_uses_previous_realtime_state() {
let (session, mut turn_context) = make_session_and_context().await;
turn_context.realtime_active = true;

let initial_context = session
.build_initial_context(&turn_context, None, None)
.await;
let developer_texts = developer_input_texts(&initial_context);
assert!(
developer_texts
.iter()
.any(|text| text.contains("<realtime_conversation>")),
"expected initial context to describe active realtime state, got {developer_texts:?}"
);

let previous_context_item = turn_context.to_turn_context_item();
let resumed_context = session
.build_initial_context(&turn_context, Some(&previous_context_item), None)
.await;
let resumed_developer_texts = developer_input_texts(&resumed_context);
assert!(
!resumed_developer_texts
.iter()
.any(|text| text.contains("<realtime_conversation>")),
"did not expect a duplicate realtime update, got {resumed_developer_texts:?}"
);
}

#[tokio::test]
async fn record_context_updates_and_set_reference_context_item_injects_full_context_when_baseline_missing()
{
Expand All @@ -8582,7 +8703,9 @@ mod tests {
.record_context_updates_and_set_reference_context_item(&turn_context, None)
.await;
let history = session.clone_history().await;
let initial_context = session.build_initial_context(&turn_context, None).await;
let initial_context = session
.build_initial_context(&turn_context, None, None)
.await;
assert_eq!(history.raw_items().to_vec(), initial_context);

let current_context = session.reference_context_item().await;
Expand Down Expand Up @@ -8626,7 +8749,11 @@ mod tests {

let history = session.clone_history().await;
let mut expected_history = vec![compacted_summary];
expected_history.extend(session.build_initial_context(&turn_context, None).await);
expected_history.extend(
session
.build_initial_context(&turn_context, None, None)
.await,
);
assert_eq!(history.raw_items().to_vec(), expected_history);
}

Expand Down Expand Up @@ -8713,7 +8840,7 @@ mod tests {
let (session, turn_context) = make_session_and_context().await;

let initial_context = session
.build_initial_context(&turn_context, Some("previous-regular-model"))
.build_initial_context(&turn_context, None, Some("previous-regular-model"))
.await;

let ResponseItem::Message { role, content, .. } = &initial_context[0] else {
Expand Down Expand Up @@ -9198,7 +9325,7 @@ mod tests {
// personality_spec) matches reconstruction.
let reconstruction_turn = session.new_default_turn().await;
let mut initial_context = session
.build_initial_context(reconstruction_turn.as_ref(), None)
.build_initial_context(reconstruction_turn.as_ref(), None, None)
.await;
// Ensure personality_spec is present when Personality is enabled, so expected matches
// what reconstruction produces (build_initial_context may omit it when baked into model).
Expand Down
8 changes: 8 additions & 0 deletions codex-rs/core/src/codex/rollout_reconstruction_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ async fn record_initial_history_resumed_bare_turn_context_does_not_hydrate_previ
model: previous_model.to_string(),
personality: turn_context.personality,
collaboration_mode: Some(turn_context.collaboration_mode.clone()),
realtime_active: Some(turn_context.realtime_active),
effort: turn_context.reasoning_effort,
summary: turn_context.reasoning_summary,
user_instructions: None,
Expand Down Expand Up @@ -85,6 +86,7 @@ async fn record_initial_history_resumed_hydrates_previous_model_from_lifecycle_t
model: previous_model.to_string(),
personality: turn_context.personality,
collaboration_mode: Some(turn_context.collaboration_mode.clone()),
realtime_active: Some(turn_context.realtime_active),
effort: turn_context.reasoning_effort,
summary: turn_context.reasoning_summary,
user_instructions: None,
Expand Down Expand Up @@ -730,6 +732,7 @@ async fn record_initial_history_resumed_turn_context_after_compaction_reestablis
model: previous_model.to_string(),
personality: turn_context.personality,
collaboration_mode: Some(turn_context.collaboration_mode.clone()),
realtime_active: Some(turn_context.realtime_active),
effort: turn_context.reasoning_effort,
summary: turn_context.reasoning_summary,
user_instructions: None,
Expand Down Expand Up @@ -797,6 +800,7 @@ async fn record_initial_history_resumed_turn_context_after_compaction_reestablis
model: previous_model.to_string(),
personality: turn_context.personality,
collaboration_mode: Some(turn_context.collaboration_mode.clone()),
realtime_active: Some(turn_context.realtime_active),
effort: turn_context.reasoning_effort,
summary: turn_context.reasoning_summary,
user_instructions: None,
Expand Down Expand Up @@ -824,6 +828,7 @@ async fn record_initial_history_resumed_aborted_turn_without_id_clears_active_tu
model: previous_model.to_string(),
personality: turn_context.personality,
collaboration_mode: Some(turn_context.collaboration_mode.clone()),
realtime_active: Some(turn_context.realtime_active),
effort: turn_context.reasoning_effort,
summary: turn_context.reasoning_summary,
user_instructions: None,
Expand Down Expand Up @@ -925,6 +930,7 @@ async fn record_initial_history_resumed_unmatched_abort_preserves_active_turn_fo
model: current_model.to_string(),
personality: turn_context.personality,
collaboration_mode: Some(turn_context.collaboration_mode.clone()),
realtime_active: Some(turn_context.realtime_active),
effort: turn_context.reasoning_effort,
summary: turn_context.reasoning_summary,
user_instructions: None,
Expand Down Expand Up @@ -1022,6 +1028,7 @@ async fn record_initial_history_resumed_trailing_incomplete_turn_compaction_clea
model: previous_model.to_string(),
personality: turn_context.personality,
collaboration_mode: Some(turn_context.collaboration_mode.clone()),
realtime_active: Some(turn_context.realtime_active),
effort: turn_context.reasoning_effort,
summary: turn_context.reasoning_summary,
user_instructions: None,
Expand Down Expand Up @@ -1158,6 +1165,7 @@ async fn record_initial_history_resumed_replaced_incomplete_compacted_turn_clear
model: previous_model.to_string(),
personality: turn_context.personality,
collaboration_mode: Some(turn_context.collaboration_mode.clone()),
realtime_active: Some(turn_context.realtime_active),
effort: turn_context.reasoning_effort,
summary: turn_context.reasoning_summary,
user_instructions: None,
Expand Down
Loading
Loading