Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
205 changes: 202 additions & 3 deletions codex-rs/app-server-test-client/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::collections::VecDeque;
use std::ffi::OsString;
use std::fs;
use std::fs::OpenOptions;
use std::io::BufRead;
Expand Down Expand Up @@ -29,6 +30,7 @@ use codex_app_server_protocol::ClientRequest;
use codex_app_server_protocol::CommandExecutionApprovalDecision;
use codex_app_server_protocol::CommandExecutionRequestApprovalParams;
use codex_app_server_protocol::CommandExecutionRequestApprovalResponse;
use codex_app_server_protocol::CommandExecutionStatus;
use codex_app_server_protocol::DynamicToolSpec;
use codex_app_server_protocol::FileChangeApprovalDecision;
use codex_app_server_protocol::FileChangeRequestApprovalParams;
Expand All @@ -55,6 +57,7 @@ use codex_app_server_protocol::SendUserMessageParams;
use codex_app_server_protocol::SendUserMessageResponse;
use codex_app_server_protocol::ServerNotification;
use codex_app_server_protocol::ServerRequest;
use codex_app_server_protocol::ThreadItem;
use codex_app_server_protocol::ThreadListParams;
use codex_app_server_protocol::ThreadListResponse;
use codex_app_server_protocol::ThreadResumeParams;
Expand All @@ -78,6 +81,30 @@ use tungstenite::stream::MaybeTlsStream;
use url::Url;
use uuid::Uuid;

const NOTIFICATIONS_TO_OPT_OUT: &[&str] = &[
// Legacy codex/event (v1-style) deltas.
"codex/event/agent_message_content_delta",
"codex/event/agent_message_delta",
"codex/event/agent_reasoning_delta",
"codex/event/reasoning_content_delta",
"codex/event/reasoning_raw_content_delta",
"codex/event/exec_command_output_delta",
// Other legacy events.
"codex/event/exec_approval_request",
"codex/event/exec_command_begin",
"codex/event/exec_command_end",
"codex/event/exec_output",
"codex/event/item_started",
"codex/event/item_completed",
// v2 item deltas.
"item/agentMessage/delta",
"item/plan/delta",
"item/commandExecution/outputDelta",
"item/fileChange/outputDelta",
"item/reasoning/summaryTextDelta",
"item/reasoning/textDelta",
];

/// Minimal launcher that initializes the Codex app-server and logs the handshake.
#[derive(Parser)]
#[command(author = "Codex", version, about = "Bootstrap Codex app-server", long_about = None)]
Expand Down Expand Up @@ -180,6 +207,18 @@ enum CliCommand {
/// Follow-up user message for the second turn.
follow_up_message: String,
},
/// Trigger zsh-fork multi-subcommand approvals and assert expected approval behavior.
#[command(name = "trigger-zsh-fork-multi-cmd-approval")]
TriggerZshForkMultiCmdApproval {
/// Optional prompt; defaults to an explicit `/usr/bin/true && /usr/bin/true` command.
user_message: Option<String>,
/// Minimum number of command-approval callbacks expected in the turn.
#[arg(long, default_value_t = 2)]
min_approvals: usize,
/// One-based approval index to abort (e.g. --abort-on 2 aborts the second approval).
#[arg(long)]
abort_on: Option<usize>,
},
/// Trigger the ChatGPT login flow and wait for completion.
TestLogin,
/// Fetch the current account rate limits from the Codex app-server.
Expand Down Expand Up @@ -265,6 +304,21 @@ pub fn run() -> Result<()> {
&dynamic_tools,
)
}
CliCommand::TriggerZshForkMultiCmdApproval {
user_message,
min_approvals,
abort_on,
} => {
let endpoint = resolve_endpoint(codex_bin, url)?;
trigger_zsh_fork_multi_cmd_approval(
&endpoint,
&config_overrides,
user_message,
min_approvals,
abort_on,
&dynamic_tools,
)
}
CliCommand::TestLogin => {
ensure_dynamic_tools_unused(&dynamic_tools, "test-login")?;
let endpoint = resolve_endpoint(codex_bin, url)?;
Expand Down Expand Up @@ -470,6 +524,101 @@ fn send_message_v2_endpoint(
)
}

/// Drives a single turn that should raise several command-approval callbacks
/// and verifies the resulting approval counts and final statuses.
///
/// * `user_message` - prompt for the turn; when `None`, a built-in prompt asking
///   for `/usr/bin/true && /usr/bin/true` is used.
/// * `min_approvals` - lower bound checked against both the total number of
///   approval callbacks and the highest count seen for any single item id.
/// * `abort_on` - one-based index of the approval to answer with a cancel
///   decision; `None` accepts every approval.
/// * `dynamic_tools` - forwarded unchanged to `thread/start`.
///
/// # Errors
///
/// Fails when `abort_on` is `Some(0)`, when any client call fails, or when the
/// observed approvals / command status / turn status do not match expectations.
fn trigger_zsh_fork_multi_cmd_approval(
    endpoint: &Endpoint,
    config_overrides: &[String],
    user_message: Option<String>,
    min_approvals: usize,
    abort_on: Option<usize>,
    dynamic_tools: &Option<Vec<DynamicToolSpec>>,
) -> Result<()> {
    // Approval indices are one-based, so zero could never match a callback.
    if abort_on == Some(0) {
        bail!("--abort-on must be >= 1 when provided");
    }

    let message = match user_message {
        Some(text) => text,
        None => String::from(
            "Run this exact command using shell command execution without rewriting or splitting it: /usr/bin/true && /usr/bin/true",
        ),
    };

    let mut client = CodexClient::connect(endpoint, config_overrides)?;
    let initialize = client.initialize()?;
    println!("< initialize response: {initialize:?}");

    let thread_start_params = ThreadStartParams {
        dynamic_tools: dynamic_tools.clone(),
        ..Default::default()
    };
    let thread_response = client.thread_start(thread_start_params)?;
    println!("< thread/start response: {thread_response:?}");

    // Start the turn from clean bookkeeping so the assertions below only see
    // what this turn produced.
    client.command_approval_count = 0;
    client.command_approval_item_ids.clear();
    client.command_execution_statuses.clear();
    client.last_turn_status = None;
    client.command_approval_behavior = abort_on.map_or(
        CommandApprovalBehavior::AlwaysAccept,
        CommandApprovalBehavior::AbortOn,
    );

    let turn_params = TurnStartParams {
        thread_id: thread_response.thread.id.clone(),
        input: vec![V2UserInput::Text {
            text: message,
            text_elements: Vec::new(),
        }],
        approval_policy: Some(AskForApproval::OnRequest),
        sandbox_policy: Some(SandboxPolicy::ReadOnly {
            access: ReadOnlyAccess::FullAccess,
        }),
        ..Default::default()
    };

    let turn_response = client.turn_start(turn_params)?;
    println!("< turn/start response: {turn_response:?}");
    client.stream_turn(&thread_response.thread.id, &turn_response.turn.id)?;

    // First check: enough approval callbacks overall.
    if client.command_approval_count < min_approvals {
        bail!(
            "expected at least {min_approvals} command approvals, got {}",
            client.command_approval_count
        );
    }

    // Second check: at least one command item received that many approvals on its own.
    let approvals_per_item = client.command_approval_item_ids.iter().fold(
        std::collections::BTreeMap::<String, usize>::new(),
        |mut tally, item_id| {
            *tally.entry(item_id.clone()).or_default() += 1;
            tally
        },
    );
    let max_approvals_for_one_item = approvals_per_item.values().max().copied().unwrap_or(0);
    if max_approvals_for_one_item < min_approvals {
        bail!(
            "expected at least {min_approvals} approvals for one command item, got max {max_approvals_for_one_item} with map {approvals_per_item:?}"
        );
    }

    // Third check: the final command/turn status must match the chosen approval flow.
    let last_command_status = client.command_execution_statuses.last();
    let command_completed = last_command_status == Some(&CommandExecutionStatus::Completed);
    match abort_on {
        None => {
            if !command_completed {
                bail!("expected completed command execution, got {last_command_status:?}");
            }
            if client.last_turn_status != Some(TurnStatus::Completed) {
                bail!(
                    "expected completed turn in all-accept flow, got {:?}",
                    client.last_turn_status
                );
            }
        }
        Some(_) => {
            if command_completed {
                bail!(
                    "expected non-completed command execution in mixed approval/decline flow, got {last_command_status:?}"
                );
            }
        }
    }

    println!(
        "[zsh-fork multi-approval summary] approvals={}, approvals_per_item={approvals_per_item:?}, command_statuses={:?}, turn_status={:?}",
        client.command_approval_count, client.command_execution_statuses, client.last_turn_status
    );

    Ok(())
}

fn resume_message_v2(
endpoint: &Endpoint,
config_overrides: &[String],
Expand Down Expand Up @@ -791,6 +940,17 @@ enum ClientTransport {
/// Test client for the app-server, holding the connection plus the bookkeeping
/// recorded while approvals and turn notifications are handled.
struct CodexClient {
/// Connection to the app-server.
// NOTE(review): appears to be either child-process stdio or a socket — confirm against `ClientTransport`.
transport: ClientTransport,
/// Queue of JSON-RPC notifications; created empty by the constructors.
// NOTE(review): presumably buffers notifications that arrive while awaiting a response — confirm.
pending_notifications: VecDeque<JSONRPCNotification>,
/// Policy used to pick the decision sent back for each command-execution approval request.
command_approval_behavior: CommandApprovalBehavior,
/// Number of command-execution approval requests seen; incremented once per request.
command_approval_count: usize,
/// Item id of every command-execution approval request, in arrival order (duplicates kept).
command_approval_item_ids: Vec<String>,
/// Status of each completed `ThreadItem::CommandExecution` item, in completion order.
command_execution_statuses: Vec<CommandExecutionStatus>,
/// Status from the most recent `turn/completed` notification for the turn being streamed.
last_turn_status: Option<TurnStatus>,
}

/// How the client answers command-execution approval requests.
#[derive(Debug, Clone, Copy)]
enum CommandApprovalBehavior {
/// Reply `Accept` to every approval request.
AlwaysAccept,
/// Reply `Cancel` to the approval whose one-based sequence number equals the
/// contained index, and `Accept` to all others.
AbortOn(usize),
}

impl CodexClient {
Expand All @@ -804,6 +964,14 @@ impl CodexClient {
fn spawn_stdio(codex_bin: &Path, config_overrides: &[String]) -> Result<Self> {
let codex_bin_display = codex_bin.display();
let mut cmd = Command::new(codex_bin);
if let Some(codex_bin_parent) = codex_bin.parent() {
let mut path = OsString::from(codex_bin_parent.as_os_str());
if let Some(existing_path) = std::env::var_os("PATH") {
path.push(":");
path.push(existing_path);
}
cmd.env("PATH", path);
}
for override_kv in config_overrides {
cmd.arg("--config").arg(override_kv);
}
Expand Down Expand Up @@ -831,6 +999,11 @@ impl CodexClient {
stdout: BufReader::new(stdout),
},
pending_notifications: VecDeque::new(),
command_approval_behavior: CommandApprovalBehavior::AlwaysAccept,
command_approval_count: 0,
command_approval_item_ids: Vec::new(),
command_execution_statuses: Vec::new(),
last_turn_status: None,
})
}

Expand All @@ -847,6 +1020,11 @@ impl CodexClient {
socket: Box::new(socket),
},
pending_notifications: VecDeque::new(),
command_approval_behavior: CommandApprovalBehavior::AlwaysAccept,
command_approval_count: 0,
command_approval_item_ids: Vec::new(),
command_execution_statuses: Vec::new(),
last_turn_status: None,
})
}

Expand All @@ -862,7 +1040,12 @@ impl CodexClient {
},
capabilities: Some(InitializeCapabilities {
experimental_api: true,
opt_out_notification_methods: None,
opt_out_notification_methods: Some(
NOTIFICATIONS_TO_OPT_OUT
.iter()
.map(|method| (*method).to_string())
.collect(),
),
}),
},
};
Expand Down Expand Up @@ -1121,10 +1304,14 @@ impl CodexClient {
println!("\n< item started: {:?}", payload.item);
}
ServerNotification::ItemCompleted(payload) => {
if let ThreadItem::CommandExecution { status, .. } = payload.item.clone() {
self.command_execution_statuses.push(status);
}
println!("< item completed: {:?}", payload.item);
}
ServerNotification::TurnCompleted(payload) => {
if payload.turn.id == turn_id {
self.last_turn_status = Some(payload.turn.status.clone());
println!("\n< turn/completed notification: {:?}", payload.turn.status);
if payload.turn.status == TurnStatus::Failed
&& let Some(error) = payload.turn.error
Expand Down Expand Up @@ -1314,6 +1501,8 @@ impl CodexClient {
"\n< commandExecution approval requested for thread {thread_id}, turn {turn_id}, item {item_id}, approval {}",
approval_id.as_deref().unwrap_or("<none>")
);
self.command_approval_count += 1;
self.command_approval_item_ids.push(item_id.clone());
if let Some(reason) = reason.as_deref() {
println!("< reason: {reason}");
}
Expand All @@ -1332,11 +1521,21 @@ impl CodexClient {
println!("< proposed execpolicy amendment: {execpolicy_amendment:?}");
}

let decision = match self.command_approval_behavior {
CommandApprovalBehavior::AlwaysAccept => CommandExecutionApprovalDecision::Accept,
CommandApprovalBehavior::AbortOn(index) if self.command_approval_count == index => {
CommandExecutionApprovalDecision::Cancel
}
CommandApprovalBehavior::AbortOn(_) => CommandExecutionApprovalDecision::Accept,
};
let response = CommandExecutionRequestApprovalResponse {
decision: CommandExecutionApprovalDecision::Accept,
decision: decision.clone(),
};
self.send_server_request_response(request_id, &response)?;
println!("< approved commandExecution request for item {item_id}");
println!(
"< commandExecution decision for approval #{} on item {item_id}: {:?}",
self.command_approval_count, decision
);
Ok(())
}

Expand Down
3 changes: 3 additions & 0 deletions codex-rs/app-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ struct AppServerArgs {
}

fn main() -> anyhow::Result<()> {
if codex_core::maybe_run_zsh_exec_wrapper_mode()? {
return Ok(());
}
arg0_dispatch_or_else(|codex_linux_sandbox_exe| async move {
let args = AppServerArgs::parse();
let managed_config_path = managed_config_path_from_debug_env();
Expand Down
1 change: 1 addition & 0 deletions codex-rs/app-server/tests/suite/v2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,5 @@ mod thread_start;
mod thread_unarchive;
mod turn_interrupt;
mod turn_start;
mod turn_start_zsh_fork;
mod turn_steer;
Loading
Loading