Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 90 additions & 15 deletions crates/openfang-runtime/src/drivers/claude_code.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,13 @@ impl ClaudeCodeDriver {
///
/// The CLI may return the response text in different fields depending on
/// version: `result`, `content`, or `text`. We try all three.
/// All fields use `#[serde(default)]` so deserialization never fails on
/// missing keys — older and newer CLI versions differ in which fields are emitted.
#[derive(Debug, Deserialize)]
struct ClaudeJsonOutput {
// `#[serde(default)]` makes the missing-key behaviour explicit: `result` is
// `None` when the CLI emits the response in `content` or `text` instead.
#[serde(default)]
result: Option<String>,
#[serde(default)]
content: Option<String>,
Expand All @@ -215,15 +220,42 @@ struct ClaudeUsage {
output_tokens: u64,
}

/// A single content block inside an `assistant` stream-json event.
///
/// The CLI emits `{"type":"text","text":"..."}` blocks inside `message.content`.
/// Both fields are `#[serde(default)]`, so a block that omits either key still
/// deserializes, with the missing field left as an empty string.
#[derive(Debug, Deserialize, Default)]
struct ClaudeMessageBlock {
/// Block discriminator, read from the JSON key `type` (e.g. `"text"`).
#[serde(default, rename = "type")]
block_type: String,
/// Text payload of the block; empty when the key is absent.
#[serde(default)]
text: String,
}

/// Nested `message` object carried by `type=assistant` stream-json events.
///
/// Only the `content` array is modelled by this struct.
#[derive(Debug, Deserialize, Default)]
struct ClaudeAssistantMessage {
/// Content blocks of the message; an empty list when the key is absent.
#[serde(default)]
content: Vec<ClaudeMessageBlock>,
}

/// Stream JSON event from `claude -p --output-format stream-json --verbose`.
///
/// Newer CLI versions (≥2.x) carry the response text inside the nested
/// `message.content[].text` of `type=assistant` events rather than a
/// flat `content` string. Both layouts are handled here so that real-time
/// token streaming works across CLI versions.
///
/// Every field is `#[serde(default)]`, so an event only needs to carry the
/// keys relevant to its own type.
#[derive(Debug, Deserialize)]
struct ClaudeStreamEvent {
/// Event discriminator (JSON key `type`); an empty string when absent.
#[serde(default)]
r#type: String,
/// Flat content string — used by older CLI versions and some event types.
#[serde(default)]
content: Option<String>,
/// Final result text carried by `type=result` events.
#[serde(default)]
result: Option<String>,
/// Nested assistant message — used by newer CLI `type=assistant` events.
#[serde(default)]
message: Option<ClaudeAssistantMessage>,
/// Token usage attached to the event, if any.
/// NOTE(review): presumably only present on the final `result` event — confirm.
#[serde(default)]
usage: Option<ClaudeUsage>,
}
Expand All @@ -250,6 +282,13 @@ impl LlmDriver for ClaudeCodeDriver {

Self::apply_env_filter(&mut cmd);

// Inject HOME so the CLI can find its credentials (~/.claude/) when
// OpenFang runs as a service without a login shell.
if let Some(home) = home_dir() {
cmd.env("HOME", &home);
}
// Detach stdin so the CLI does not block waiting for interactive input.
cmd.stdin(std::process::Stdio::null());
cmd.stdout(std::process::Stdio::piped());
cmd.stderr(std::process::Stdio::piped());

Expand All @@ -271,14 +310,36 @@ impl LlmDriver for ClaudeCodeDriver {
debug!(pid = pid, model = %pid_label, "Claude Code CLI subprocess started");
}

// Read stdout/stderr before waiting (take ownership of pipes)
// Drain stdout and stderr concurrently while waiting for the process.
// Sequential drain (wait → read) deadlocks when the subprocess writes
// more than the OS pipe buffer (~64 KB): the child blocks on write,
// child.wait() never returns, the timeout fires, and output is lost.
let child_stdout = child.stdout.take();
let child_stderr = child.stderr.take();

let stdout_task = tokio::spawn(async move {
let mut buf = Vec::new();
if let Some(mut out) = child_stdout {
let _ = out.read_to_end(&mut buf).await;
}
buf
});
let stderr_task = tokio::spawn(async move {
let mut buf = Vec::new();
if let Some(mut err) = child_stderr {
let _ = err.read_to_end(&mut buf).await;
}
buf
});

// Wait with timeout
let timeout_duration = std::time::Duration::from_secs(self.message_timeout_secs);
let wait_result = tokio::time::timeout(timeout_duration, child.wait()).await;

// Collect pipe output — tasks complete once the process closes its end
let stdout_bytes = stdout_task.await.unwrap_or_default();
let stderr_bytes = stderr_task.await.unwrap_or_default();

// Clear PID tracking regardless of outcome
self.active_pids.remove(&pid_label);

Expand All @@ -305,16 +366,6 @@ impl LlmDriver for ClaudeCodeDriver {
}
};

// Read captured output from pipes
let mut stdout_bytes = Vec::new();
let mut stderr_bytes = Vec::new();
if let Some(mut out) = child_stdout {
let _ = out.read_to_end(&mut stdout_bytes).await;
}
if let Some(mut err) = child_stderr {
let _ = err.read_to_end(&mut stderr_bytes).await;
};

if !status.success() {
let stderr = String::from_utf8_lossy(&stderr_bytes).trim().to_string();
let stdout_str = String::from_utf8_lossy(&stdout_bytes).trim().to_string();
Expand Down Expand Up @@ -423,6 +474,11 @@ impl LlmDriver for ClaudeCodeDriver {

Self::apply_env_filter(&mut cmd);

// Same HOME and stdin hygiene as the non-streaming path.
if let Some(home) = home_dir() {
cmd.env("HOME", &home);
}
cmd.stdin(std::process::Stdio::null());
cmd.stdout(std::process::Stdio::piped());
cmd.stderr(std::process::Stdio::piped());

Expand Down Expand Up @@ -468,11 +524,30 @@ impl LlmDriver for ClaudeCodeDriver {
Ok(event) => {
match event.r#type.as_str() {
"content" | "text" | "assistant" | "content_block_delta" => {
if let Some(ref content) = event.content {
full_text.push_str(content);
// Older CLI: flat `content` string.
// CLI ≥2.x (type=assistant): text is nested in
// `message.content[].text`; the flat `content`
// field is absent or null.
let chunk = event.content.clone().unwrap_or_default();
let nested: String = event
.message
.as_ref()
.map(|msg| {
msg.content
.iter()
.filter(|b| b.block_type == "text")
.map(|b| b.text.as_str())
.collect::<Vec<_>>()
.join("")
})
.unwrap_or_default();
let text_chunk =
if !chunk.is_empty() { chunk } else { nested };
if !text_chunk.is_empty() {
full_text.push_str(&text_chunk);
let _ = tx
.send(StreamEvent::TextDelta {
text: content.clone(),
text: text_chunk,
})
.await;
}
Expand Down