diff --git a/crates/openfang-runtime/src/drivers/gemini.rs b/crates/openfang-runtime/src/drivers/gemini.rs index cd37cc1b3..95a5ca2d0 100644 --- a/crates/openfang-runtime/src/drivers/gemini.rs +++ b/crates/openfang-runtime/src/drivers/gemini.rs @@ -336,9 +336,131 @@ fn convert_messages( } } + // Post-process: enforce Gemini turn-ordering invariant. + // A "model" turn containing functionCall parts MUST be immediately followed + // by a "user" turn containing functionResponse parts. Intervening turns + // (e.g. "[no response]" / "Please continue" injected by the agent loop or + // session-repair) violate this and cause INVALID_ARGUMENT 400 errors. + let contents = enforce_function_call_ordering(contents); + (contents, system_instruction) } + /// Enforce Gemini's strict function-call turn ordering. +/// +/// Rules: +/// 1. A `model` turn with `functionCall` parts must be immediately followed by +/// a `user` turn with `functionResponse` parts. +/// 2. Any intervening text-only turns between a function-call model turn and +/// its function-response user turn are removed. +/// 3. A `model` turn with `functionCall` that has no matching `functionResponse` +/// anywhere after it gets the functionCall parts stripped (keeping any text). +fn enforce_function_call_ordering(contents: Vec<GeminiContent>) -> Vec<GeminiContent> { + if contents.is_empty() { + return contents; + } + + // First pass: identify which model turns contain functionCall parts. + let has_function_call = |c: &GeminiContent| -> bool { + c.role.as_deref() == Some("model") + && c.parts + .iter() + .any(|p| matches!(p, GeminiPart::FunctionCall { .. })) + }; + + let has_function_response = |c: &GeminiContent| -> bool { + c.role.as_deref() == Some("user") + && c.parts + .iter() + .any(|p| matches!(p, GeminiPart::FunctionResponse { .. })) + }; + + let mut result: Vec<GeminiContent> = Vec::with_capacity(contents.len()); + + let mut i = 0; + while i < contents.len() { + if has_function_call(&contents[i]) { + // Found a model turn with functionCall. 
+ // Look ahead for the matching user turn with functionResponse, + // skipping any intervening text-only turns. + result.push(contents[i].clone()); + i += 1; + + // Collect and skip intervening turns until we find functionResponse + let mut skipped = Vec::new(); + while i < contents.len() && !has_function_response(&contents[i]) { + skipped.push(i); + i += 1; + } + + if i < contents.len() && has_function_response(&contents[i]) { + // Found the matching functionResponse — drop skipped turns + if !skipped.is_empty() { + warn!( + skipped_turns = skipped.len(), + "Gemini: removed intervening turns between functionCall and functionResponse" + ); + } + result.push(contents[i].clone()); + i += 1; + } else { + // No functionResponse found — strip functionCall parts from the + // model turn we already pushed, keeping any text parts. + if let Some(last) = result.last_mut() { + last.parts + .retain(|p| !matches!(p, GeminiPart::FunctionCall { .. })); + if last.parts.is_empty() { + last.parts.push(GeminiPart::Text { + text: "[tool call removed — no response received]".to_string(), + thought_signature: None, + }); + } + warn!("Gemini: stripped orphaned functionCall with no matching functionResponse"); + } + // Re-add the skipped turns since there's no functionResponse to pair with + for &idx in &skipped { + result.push(contents[idx].clone()); + } + } + } else { + result.push(contents[i].clone()); + i += 1; + } + } + + // Final pass: merge consecutive same-role turns (Gemini also rejects these) + let mut merged: Vec<GeminiContent> = Vec::with_capacity(result.len()); + for content in result { + if let Some(last) = merged.last_mut() { + if last.role == content.role { + last.parts.extend(content.parts); + continue; + } + } + merged.push(content); + } + + // Gemini requires the conversation to start with a "user" turn. + // If the first turn is "model", prepend a synthetic user turn. 
+ if let Some(first) = merged.first() { + if first.role.as_deref() == Some("model") { + warn!("Gemini: conversation starts with model turn — prepending synthetic user turn"); + merged.insert( + 0, + GeminiContent { + role: Some("user".to_string()), + parts: vec![GeminiPart::Text { + text: "Continue.".to_string(), + thought_signature: None, + }], + }, + ); + } + } + + merged +} + /// Extract system prompt from messages or the explicit system field. fn extract_system(messages: &[Message], system: &Option<String>) -> Option<String> { let text = system.clone().or_else(|| { @@ -496,12 +618,53 @@ fn convert_response(resp: GeminiResponse) -> Result<CompletionResponse> { +/// Log a compact per-turn summary (role + part kinds) of a Gemini request, for debugging. +fn log_gemini_turn_structure(contents: &[GeminiContent]) { + let mut summary = String::new(); + for (i, c) in contents.iter().enumerate() { + let role = c.role.as_deref().unwrap_or("none"); + let part_types: Vec<&str> = c + .parts + .iter() + .map(|p| match p { + GeminiPart::Text { .. } => "text", + GeminiPart::FunctionCall { .. } => "functionCall", + GeminiPart::FunctionResponse { .. } => "functionResponse", + GeminiPart::InlineData { .. } => "inlineData", + GeminiPart::Thought { .. } => "thought", + }) + .collect(); + if !summary.is_empty() { + summary.push_str(" → "); + } + summary.push_str(&format!("[{}] {}:{}", i, role, part_types.join("+"))); + } + debug!(turns = contents.len(), structure = %summary, "Gemini request turn structure"); +} + +/// Truncate a string for logging, appending "…" if truncated. 
+fn truncate_for_log(s: &str, max_len: usize) -> String { + if s.len() <= max_len { + s.to_string() + } else { + let mut boundary = max_len; + while boundary > 0 && !s.is_char_boundary(boundary) { + boundary -= 1; + } + format!("{}…[truncated, {} total bytes]", &s[..boundary], s.len()) + } +} + #[async_trait] impl LlmDriver for GeminiDriver { async fn complete(&self, request: CompletionRequest) -> Result<CompletionResponse> { let (contents, system_instruction) = convert_messages(&request.messages, &request.system); let tools = convert_tools(&request); + // Log the turn structure being sent to Gemini for debugging + log_gemini_turn_structure(&contents); + let gemini_request = GeminiRequest { contents, system_instruction, @@ -555,6 +718,13 @@ impl LlmDriver for GeminiDriver { if !resp.status().is_success() { let body = resp.text().await.unwrap_or_default(); let message = parse_gemini_error(&body); + warn!( + status, + error_message = %message, + raw_body_len = body.len(), + raw_body_preview = %truncate_for_log(&body, 1000), + "Gemini API error response" + ); if status == 401 || status == 403 { return Err(LlmError::AuthenticationFailed(message)); } @@ -568,6 +738,11 @@ impl LlmDriver for GeminiDriver { .text() .await .map_err(|e| LlmError::Http(e.to_string()))?; + debug!( + body_len = body.len(), + body_preview = %truncate_for_log(&body, 500), + "Gemini API success response" + ); let gemini_response: GeminiResponse = serde_json::from_str(&body).map_err(|e| LlmError::Parse(e.to_string()))?; @@ -588,6 +763,9 @@ impl LlmDriver for GeminiDriver { let (contents, system_instruction) = convert_messages(&request.messages, &request.system); let tools = convert_tools(&request); + // Log the turn structure being sent to Gemini for debugging + log_gemini_turn_structure(&contents); + let gemini_request = GeminiRequest { contents, system_instruction, @@ -662,13 +840,23 @@ impl LlmDriver for GeminiDriver { let mut fn_calls: Vec<(String, serde_json::Value, Option<String>)> = Vec::new(); let mut finish_reason: 
Option<String> = None; let mut usage = TokenUsage::default(); + let mut events_parsed: usize = 0; let mut byte_stream = resp.bytes_stream(); while let Some(chunk_result) = byte_stream.next().await { let chunk = chunk_result.map_err(|e| LlmError::Http(e.to_string()))?; - buffer.push_str(&String::from_utf8_lossy(&chunk)); + let chunk_str = String::from_utf8_lossy(&chunk); + debug!( + chunk_len = chunk.len(), + chunk_preview = %truncate_for_log(&chunk_str, 200), + "Gemini SSE chunk received" + ); + // Normalize \r\n to \n so the SSE delimiter \n\n works for + // both Unix (\n\n) and HTTP-standard (\r\n\r\n) line endings. + // Normalize per chunk to avoid O(n*m) re-scanning of the full buffer. + buffer.push_str(&chunk_str.replace("\r\n", "\n")); - // Process complete SSE events (delimited by \n\n or \r\n\r\n) + // Process complete SSE events (delimited by \n\n) while let Some(pos) = buffer.find("\n\n") { let event_text = buffer[..pos].to_string(); buffer = buffer[pos + 2..].to_string(); @@ -685,8 +873,16 @@ impl LlmDriver for GeminiDriver { let json: GeminiResponse = match serde_json::from_str(data) { Ok(v) => v, - Err(_) => continue, + Err(e) => { + warn!( + error = %e, + data_preview = %truncate_for_log(data, 200), + "Gemini SSE: failed to parse event JSON" + ); + continue; + } }; + events_parsed += 1; // Extract usage from each chunk (last one wins) if let Some(ref u) = json.usage_metadata { @@ -768,6 +964,37 @@ impl LlmDriver for GeminiDriver { } } + // Log stream completion summary + debug!( + events_parsed, + text_len = text_content.len(), + fn_call_count = fn_calls.len(), + finish_reason = ?finish_reason, + input_tokens = usage.input_tokens, + output_tokens = usage.output_tokens, + remaining_buffer_len = buffer.len(), + remaining_buffer_preview = %truncate_for_log(&buffer, 200), + "Gemini SSE stream completed" + ); + + // If no events were parsed but there's data in the buffer, + // try to parse it as a single JSON response (non-SSE fallback). 
+ if events_parsed == 0 && !buffer.trim().is_empty() { + warn!( + buffer_len = buffer.len(), + buffer_preview = %truncate_for_log(&buffer, 300), + "Gemini SSE: no events parsed, attempting fallback parse of buffer" + ); + // Try stripping "data:" prefix if present + let fallback_data = buffer + .lines() + .find_map(|line| line.strip_prefix("data:").map(|d| d.trim_start().to_string())) + .unwrap_or_else(|| buffer.trim().to_string()); + if let Ok(json) = serde_json::from_str::<GeminiResponse>(&fallback_data) { + return convert_response(json); + } + } + // Build final response let mut content = Vec::new(); let mut tool_calls = Vec::new(); @@ -1679,6 +1906,139 @@ mod tests { } } + // ── enforce_function_call_ordering tests ───────────────────────────── + + fn make_text_content(role: &str, text: &str) -> GeminiContent { + GeminiContent { + role: Some(role.to_string()), + parts: vec![GeminiPart::Text { + text: text.to_string(), + thought_signature: None, + }], + } + } + + fn make_function_call_content(name: &str) -> GeminiContent { + GeminiContent { + role: Some("model".to_string()), + parts: vec![GeminiPart::FunctionCall { + function_call: GeminiFunctionCallData { + name: name.to_string(), + args: serde_json::json!({}), + }, + thought_signature: None, + }], + } + } + + fn make_function_response_content(name: &str) -> GeminiContent { + GeminiContent { + role: Some("user".to_string()), + parts: vec![GeminiPart::FunctionResponse { + function_response: GeminiFunctionResponseData { + name: name.to_string(), + response: serde_json::json!({"result": "ok"}), + }, + }], + } + } + + #[test] + fn test_enforce_ordering_removes_intervening_text_between_fc_and_fr() { + let contents = vec![ + make_text_content("user", "Hello"), + make_function_call_content("get_weather"), + make_text_content("model", "[no response]"), + make_function_response_content("get_weather"), + make_text_content("model", "The weather is sunny."), + ]; + let result = enforce_function_call_ordering(contents); + // The 
intervening "[no response]" model turn should be removed. + // Expected: user("Hello"), model(functionCall), user(functionResponse), model("The weather is sunny.") + assert_eq!(result.len(), 4); + assert!(result[1] + .parts + .iter() + .any(|p| matches!(p, GeminiPart::FunctionCall { .. }))); + assert!(result[2] + .parts + .iter() + .any(|p| matches!(p, GeminiPart::FunctionResponse { .. }))); + } + + #[test] + fn test_enforce_ordering_strips_orphaned_function_call() { + let contents = vec![ + make_text_content("user", "Hello"), + make_function_call_content("get_weather"), + make_text_content("model", "I could not call that tool."), + ]; + let result = enforce_function_call_ordering(contents); + // The orphaned functionCall should be stripped, replaced with placeholder text. + // No FunctionCall parts should remain. + for content in &result { + for part in &content.parts { + assert!( + !matches!(part, GeminiPart::FunctionCall { .. }), + "Orphaned FunctionCall should have been stripped" + ); + } + } + } + + #[test] + fn test_enforce_ordering_prepends_user_turn_when_starts_with_model() { + let contents = vec![ + make_function_call_content("search"), + make_function_response_content("search"), + ]; + let result = enforce_function_call_ordering(contents); + // First turn must be "user" + assert_eq!(result[0].role.as_deref(), Some("user")); + // And it should be a synthetic text turn, not the functionResponse + assert!(result[0] + .parts + .iter() + .any(|p| matches!(p, GeminiPart::Text { .. 
}))); + } + + #[test] + fn test_enforce_ordering_merges_consecutive_same_role_turns() { + let contents = vec![ + make_text_content("user", "Hello"), + make_text_content("user", "How are you?"), + make_text_content("model", "I'm fine."), + ]; + let result = enforce_function_call_ordering(contents); + // Two consecutive user turns should be merged into one + assert_eq!(result.len(), 2); + assert_eq!(result[0].role.as_deref(), Some("user")); + assert_eq!(result[0].parts.len(), 2); + } + + #[test] + fn test_enforce_ordering_passthrough_when_already_valid() { + let contents = vec![ + make_text_content("user", "Hello"), + make_function_call_content("search"), + make_function_response_content("search"), + make_text_content("model", "Done."), + ]; + let result = enforce_function_call_ordering(contents); + assert_eq!(result.len(), 4); + } + + #[test] + fn test_truncate_for_log_handles_multibyte_utf8() { + // Japanese text: each char is 3 bytes in UTF-8 + let s = "こんにちは世界"; // 7 chars, 21 bytes + // Truncating at 5 bytes should not panic — lands in middle of 2nd char + let result = truncate_for_log(&s, 5); + assert!(result.contains("…[truncated")); + // Should contain only the first complete character (3 bytes) + assert!(result.starts_with("こ")); + } + #[test] fn test_thought_part_in_response_produces_thinking_block() { let resp = GeminiResponse {