Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions codex-rs/codex-api/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

Typed clients for Codex/OpenAI APIs built on top of the generic transport in `codex-client`.

- Hosts the request/response models and prompt helpers for Responses, Chat Completions, and Compact APIs.
- Hosts the request/response models and prompt helpers for Responses and Compact APIs.
- Owns provider configuration (base URLs, headers, query params), auth header injection, retry tuning, and stream idle settings.
- Parses SSE streams into `ResponseEvent`/`ResponseStream`, including rate-limit snapshots and API-specific error mapping.
- Serves as the wire-level layer consumed by `codex-core`; higher layers handle auth refresh and business logic.
Expand All @@ -11,7 +11,7 @@ Typed clients for Codex/OpenAI APIs built on top of the generic transport in `co

The public interface of this crate is intentionally small and uniform:

- **Prompted endpoints (Chat + Responses)**
- **Prompted endpoints (Responses)**
- Input: a single `Prompt` plus endpoint-specific options.
- `Prompt` (re-exported as `codex_api::Prompt`) carries:
- `instructions: String` – the fully-resolved system prompt for this turn.
Expand Down
2 changes: 1 addition & 1 deletion codex-rs/codex-api/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use std::task::Context;
use std::task::Poll;
use tokio::sync::mpsc;

/// Canonical prompt input for Chat and Responses endpoints.
/// Canonical prompt input for Responses endpoints.
#[derive(Debug, Clone)]
pub struct Prompt {
/// Fully-resolved system instructions for this turn.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,111 +1,21 @@
use crate::ChatRequest;
use crate::auth::AuthProvider;
use crate::common::Prompt as ApiPrompt;
use crate::common::ResponseEvent;
use crate::common::ResponseStream;
use crate::endpoint::streaming::StreamingClient;
use crate::error::ApiError;
use crate::provider::Provider;
use crate::provider::WireApi;
use crate::sse::chat::spawn_chat_stream;
use crate::telemetry::SseTelemetry;
use codex_client::HttpTransport;
use codex_client::RequestCompression;
use codex_client::RequestTelemetry;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ReasoningItemContent;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::SessionSource;
use futures::Stream;
use http::HeaderMap;
use serde_json::Value;
use std::collections::VecDeque;
use std::pin::Pin;
use std::sync::Arc;
use std::task::Context;
use std::task::Poll;

pub struct ChatClient<T: HttpTransport, A: AuthProvider> {
streaming: StreamingClient<T, A>,
}

impl<T: HttpTransport, A: AuthProvider> ChatClient<T, A> {
pub fn new(transport: T, provider: Provider, auth: A) -> Self {
Self {
streaming: StreamingClient::new(transport, provider, auth),
}
}

pub fn with_telemetry(
self,
request: Option<Arc<dyn RequestTelemetry>>,
sse: Option<Arc<dyn SseTelemetry>>,
) -> Self {
Self {
streaming: self.streaming.with_telemetry(request, sse),
}
}

pub async fn stream_request(&self, request: ChatRequest) -> Result<ResponseStream, ApiError> {
self.stream(request.body, request.headers).await
}

pub async fn stream_prompt(
&self,
model: &str,
prompt: &ApiPrompt,
conversation_id: Option<String>,
session_source: Option<SessionSource>,
) -> Result<ResponseStream, ApiError> {
use crate::requests::ChatRequestBuilder;

let request =
ChatRequestBuilder::new(model, &prompt.instructions, &prompt.input, &prompt.tools)
.conversation_id(conversation_id)
.session_source(session_source)
.build(self.streaming.provider())?;

self.stream_request(request).await
}

fn path(&self) -> &'static str {
match self.streaming.provider().wire {
WireApi::Chat => "chat/completions",
_ => "responses",
}
}

pub async fn stream(
&self,
body: Value,
extra_headers: HeaderMap,
) -> Result<ResponseStream, ApiError> {
self.streaming
.stream(
self.path(),
body,
extra_headers,
RequestCompression::None,
spawn_chat_stream,
None,
)
.await
}
}

#[derive(Copy, Clone, Eq, PartialEq)]
pub enum AggregateMode {
AggregatedOnly,
Streaming,
}

/// Stream adapter that merges token deltas into a single assistant message per turn.
pub struct AggregatedStream {
inner: ResponseStream,
cumulative: String,
cumulative_reasoning: String,
pending: VecDeque<ResponseEvent>,
mode: AggregateMode,
}

impl Stream for AggregatedStream {
Expand All @@ -122,37 +32,24 @@ impl Stream for AggregatedStream {
match Pin::new(&mut this.inner).poll_next(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(None) => return Poll::Ready(None),
Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(e))),
Poll::Ready(Some(Err(err))) => return Poll::Ready(Some(Err(err))),
Poll::Ready(Some(Ok(ResponseEvent::OutputItemDone(item)))) => {
let is_assistant_message = matches!(
&item,
ResponseItem::Message { role, .. } if role == "assistant"
);

if is_assistant_message {
match this.mode {
AggregateMode::AggregatedOnly => {
if this.cumulative.is_empty()
&& let ResponseItem::Message { content, .. } = &item
&& let Some(text) = content.iter().find_map(|c| match c {
ContentItem::OutputText { text } => Some(text),
_ => None,
})
{
this.cumulative.push_str(text);
}
continue;
}
AggregateMode::Streaming => {
if this.cumulative.is_empty() {
return Poll::Ready(Some(Ok(ResponseEvent::OutputItemDone(
item,
))));
} else {
continue;
}
}
if this.cumulative.is_empty()
&& let ResponseItem::Message { content, .. } = &item
&& let Some(text) = content.iter().find_map(|c| match c {
ContentItem::OutputText { text } => Some(text),
_ => None,
})
{
this.cumulative.push_str(text);
}
continue;
}

return Poll::Ready(Some(Ok(ResponseEvent::OutputItemDone(item))));
Expand Down Expand Up @@ -215,35 +112,20 @@ impl Stream for AggregatedStream {
token_usage,
})));
}
Poll::Ready(Some(Ok(ResponseEvent::Created))) => {
continue;
}
Poll::Ready(Some(Ok(ResponseEvent::Created))) => continue,
Poll::Ready(Some(Ok(ResponseEvent::OutputTextDelta(delta)))) => {
this.cumulative.push_str(&delta);
if matches!(this.mode, AggregateMode::Streaming) {
return Poll::Ready(Some(Ok(ResponseEvent::OutputTextDelta(delta))));
} else {
continue;
}
continue;
}
Poll::Ready(Some(Ok(ResponseEvent::ReasoningContentDelta {
delta,
content_index,
content_index: _,
}))) => {
this.cumulative_reasoning.push_str(&delta);
if matches!(this.mode, AggregateMode::Streaming) {
return Poll::Ready(Some(Ok(ResponseEvent::ReasoningContentDelta {
delta,
content_index,
})));
} else {
continue;
}
}
Poll::Ready(Some(Ok(ResponseEvent::ReasoningSummaryDelta { .. }))) => continue,
Poll::Ready(Some(Ok(ResponseEvent::ReasoningSummaryPartAdded { .. }))) => {
continue;
}
Poll::Ready(Some(Ok(ResponseEvent::ReasoningSummaryDelta { .. }))) => continue,
Poll::Ready(Some(Ok(ResponseEvent::ReasoningSummaryPartAdded { .. }))) => continue,
Poll::Ready(Some(Ok(ResponseEvent::OutputItemAdded(item)))) => {
return Poll::Ready(Some(Ok(ResponseEvent::OutputItemAdded(item))));
}
Expand All @@ -254,28 +136,21 @@ impl Stream for AggregatedStream {

pub trait AggregateStreamExt {
fn aggregate(self) -> AggregatedStream;

fn streaming_mode(self) -> ResponseStream;
}

impl AggregateStreamExt for ResponseStream {
fn aggregate(self) -> AggregatedStream {
AggregatedStream::new(self, AggregateMode::AggregatedOnly)
}

fn streaming_mode(self) -> ResponseStream {
self
AggregatedStream::new(self)
}
}

impl AggregatedStream {
fn new(inner: ResponseStream, mode: AggregateMode) -> Self {
fn new(inner: ResponseStream) -> Self {
AggregatedStream {
inner,
cumulative: String::new(),
cumulative_reasoning: String::new(),
pending: VecDeque::new(),
mode,
}
}
}
37 changes: 12 additions & 25 deletions codex-rs/codex-api/src/endpoint/compact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,9 @@ impl<T: HttpTransport, A: AuthProvider> CompactClient<T, A> {
self
}

fn path(&self) -> Result<&'static str, ApiError> {
fn path(&self) -> &'static str {
match self.provider.wire {
WireApi::Compact | WireApi::Responses => Ok("responses/compact"),
WireApi::Chat => Err(ApiError::Stream(
"compact endpoint requires responses wire api".to_string(),
)),
WireApi::Compact | WireApi::Responses => "responses/compact",
}
}

Expand All @@ -50,7 +47,7 @@ impl<T: HttpTransport, A: AuthProvider> CompactClient<T, A> {
body: serde_json::Value,
extra_headers: HeaderMap,
) -> Result<Vec<ResponseItem>, ApiError> {
let path = self.path()?;
let path = self.path();
let builder = || {
let mut req = self.provider.build_request(Method::POST, path);
req.headers.extend(extra_headers.clone());
Expand Down Expand Up @@ -139,24 +136,14 @@ mod tests {
}
}

#[tokio::test]
async fn errors_when_wire_is_chat() {
let client = CompactClient::new(DummyTransport, provider(WireApi::Chat), DummyAuth);
let input = CompactionInput {
model: "gpt-test",
input: &[],
instructions: "inst",
};
let err = client
.compact_input(&input, HeaderMap::new())
.await
.expect_err("expected wire mismatch to fail");

match err {
ApiError::Stream(msg) => {
assert_eq!(msg, "compact endpoint requires responses wire api");
}
other => panic!("unexpected error: {other:?}"),
}
#[test]
fn path_is_responses_compact_for_supported_wire_apis() {
let responses_client =
CompactClient::new(DummyTransport, provider(WireApi::Responses), DummyAuth);
assert_eq!(responses_client.path(), "responses/compact");

let compact_client =
CompactClient::new(DummyTransport, provider(WireApi::Compact), DummyAuth);
assert_eq!(compact_client.path(), "responses/compact");
}
}
2 changes: 1 addition & 1 deletion codex-rs/codex-api/src/endpoint/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
pub mod chat;
pub mod aggregate;
pub mod compact;
pub mod models;
pub mod responses;
Expand Down
1 change: 0 additions & 1 deletion codex-rs/codex-api/src/endpoint/responses.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,6 @@ impl<T: HttpTransport, A: AuthProvider> ResponsesClient<T, A> {
fn path(&self) -> &'static str {
match self.streaming.provider().wire {
WireApi::Responses | WireApi::Compact => "responses",
WireApi::Chat => "chat/completions",
}
}

Expand Down
5 changes: 1 addition & 4 deletions codex-rs/codex-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ pub use crate::common::ResponseEvent;
pub use crate::common::ResponseStream;
pub use crate::common::ResponsesApiRequest;
pub use crate::common::create_text_param_for_request;
pub use crate::endpoint::chat::AggregateStreamExt;
pub use crate::endpoint::chat::ChatClient;
pub use crate::endpoint::aggregate::AggregateStreamExt;
pub use crate::endpoint::compact::CompactClient;
pub use crate::endpoint::models::ModelsClient;
pub use crate::endpoint::responses::ResponsesClient;
Expand All @@ -33,8 +32,6 @@ pub use crate::endpoint::responses_websocket::ResponsesWebsocketConnection;
pub use crate::error::ApiError;
pub use crate::provider::Provider;
pub use crate::provider::WireApi;
pub use crate::requests::ChatRequest;
pub use crate::requests::ChatRequestBuilder;
pub use crate::requests::ResponsesRequest;
pub use crate::requests::ResponsesRequestBuilder;
pub use crate::sse::stream_from_fixture;
Expand Down
1 change: 0 additions & 1 deletion codex-rs/codex-api/src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use url::Url;
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum WireApi {
Responses,
Chat,
Compact,
}

Expand Down
Loading
Loading