diff --git a/crates/openfang-runtime/src/drivers/claude_code.rs b/crates/openfang-runtime/src/drivers/claude_code.rs index 8b93fca49..d3820a046 100644 --- a/crates/openfang-runtime/src/drivers/claude_code.rs +++ b/crates/openfang-runtime/src/drivers/claude_code.rs @@ -192,8 +192,13 @@ impl ClaudeCodeDriver { /// /// The CLI may return the response text in different fields depending on /// version: `result`, `content`, or `text`. We try all three. +/// All fields use `#[serde(default)]` so deserialization never fails on +/// missing keys — older and newer CLI versions differ in which fields are emitted. #[derive(Debug, Deserialize)] struct ClaudeJsonOutput { + // Fix: `result` now has #[serde(default)] so deserialization succeeds + // even when the CLI emits the response in `content` or `text` instead. + #[serde(default)] result: Option, #[serde(default)] content: Option, @@ -215,15 +220,42 @@ struct ClaudeUsage { output_tokens: u64, } -/// Stream JSON event from `claude -p --output-format stream-json`. +/// A single content block inside an `assistant` stream-json event. +/// The CLI emits `{"type":"text","text":"..."}` blocks inside `message.content`. +#[derive(Debug, Deserialize, Default)] +struct ClaudeMessageBlock { + #[serde(default, rename = "type")] + block_type: String, + #[serde(default)] + text: String, +} + +/// Nested `message` object carried by `type=assistant` stream-json events. +#[derive(Debug, Deserialize, Default)] +struct ClaudeAssistantMessage { + #[serde(default)] + content: Vec, +} + +/// Stream JSON event from `claude -p --output-format stream-json --verbose`. +/// +/// Newer CLI versions (≥2.x) carry the response text inside the nested +/// `message.content[].text` of `type=assistant` events rather than a +/// flat `content` string. Both layouts are handled here so that real-time +/// token streaming works across CLI versions. #[derive(Debug, Deserialize)] struct ClaudeStreamEvent { #[serde(default)] r#type: String, + /// Flat content string — used by older CLI versions and some event types. #[serde(default)] content: Option, + /// Final result text carried by `type=result` events. #[serde(default)] result: Option, + /// Nested assistant message — used by newer CLI `type=assistant` events. + #[serde(default)] + message: Option, #[serde(default)] usage: Option, } @@ -250,6 +282,13 @@ impl LlmDriver for ClaudeCodeDriver { Self::apply_env_filter(&mut cmd); + // Inject HOME so the CLI can find its credentials (~/.claude/) when + // OpenFang runs as a service without a login shell. + if let Some(home) = home_dir() { + cmd.env("HOME", &home); + } + // Detach stdin so the CLI does not block waiting for interactive input. + cmd.stdin(std::process::Stdio::null()); cmd.stdout(std::process::Stdio::piped()); cmd.stderr(std::process::Stdio::piped()); @@ -271,14 +310,36 @@ impl LlmDriver for ClaudeCodeDriver { debug!(pid = pid, model = %pid_label, "Claude Code CLI subprocess started"); } - // Read stdout/stderr before waiting (take ownership of pipes) + // Drain stdout and stderr concurrently while waiting for the process. + // Sequential drain (wait → read) deadlocks when the subprocess writes + // more than the OS pipe buffer (~64 KB): the child blocks on write, + // child.wait() never returns, the timeout fires, and output is lost. let child_stdout = child.stdout.take(); let child_stderr = child.stderr.take(); + let stdout_task = tokio::spawn(async move { + let mut buf = Vec::new(); + if let Some(mut out) = child_stdout { + let _ = out.read_to_end(&mut buf).await; + } + buf + }); + let stderr_task = tokio::spawn(async move { + let mut buf = Vec::new(); + if let Some(mut err) = child_stderr { + let _ = err.read_to_end(&mut buf).await; + } + buf + }); + // Wait with timeout let timeout_duration = std::time::Duration::from_secs(self.message_timeout_secs); let wait_result = tokio::time::timeout(timeout_duration, child.wait()).await; + // Collect pipe output — tasks complete once the process closes its end + let stdout_bytes = stdout_task.await.unwrap_or_default(); + let stderr_bytes = stderr_task.await.unwrap_or_default(); + // Clear PID tracking regardless of outcome self.active_pids.remove(&pid_label); @@ -305,16 +366,6 @@ impl LlmDriver for ClaudeCodeDriver { } }; - // Read captured output from pipes - let mut stdout_bytes = Vec::new(); - let mut stderr_bytes = Vec::new(); - if let Some(mut out) = child_stdout { - let _ = out.read_to_end(&mut stdout_bytes).await; - } - if let Some(mut err) = child_stderr { - let _ = err.read_to_end(&mut stderr_bytes).await; - }; - if !status.success() { let stderr = String::from_utf8_lossy(&stderr_bytes).trim().to_string(); let stdout_str = String::from_utf8_lossy(&stdout_bytes).trim().to_string(); @@ -423,6 +474,11 @@ impl LlmDriver for ClaudeCodeDriver { Self::apply_env_filter(&mut cmd); + // Same HOME and stdin hygiene as the non-streaming path. + if let Some(home) = home_dir() { + cmd.env("HOME", &home); + } + cmd.stdin(std::process::Stdio::null()); cmd.stdout(std::process::Stdio::piped()); cmd.stderr(std::process::Stdio::piped()); @@ -468,11 +524,30 @@ impl LlmDriver for ClaudeCodeDriver { Ok(event) => { match event.r#type.as_str() { "content" | "text" | "assistant" | "content_block_delta" => { - if let Some(ref content) = event.content { - full_text.push_str(content); + // Older CLI: flat `content` string. + // CLI ≥2.x (type=assistant): text is nested in + // `message.content[].text`; the flat `content` + // field is absent or null. + let chunk = event.content.clone().unwrap_or_default(); + let nested: String = event + .message + .as_ref() + .map(|msg| { + msg.content + .iter() + .filter(|b| b.block_type == "text") + .map(|b| b.text.as_str()) + .collect::>() + .join("") + }) + .unwrap_or_default(); + let text_chunk = + if !chunk.is_empty() { chunk } else { nested }; + if !text_chunk.is_empty() { + full_text.push_str(&text_chunk); let _ = tx .send(StreamEvent::TextDelta { - text: content.clone(), + text: text_chunk, }) .await; }