Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions codex-rs/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 24 additions & 0 deletions codex-rs/app-server-protocol/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,13 @@ client_request_definitions! {
response: GetAccountRateLimitsResponse,
},

#[serde(rename = "feedback/upload")]
#[ts(rename = "feedback/upload")]
UploadFeedback {
params: UploadFeedbackParams,
response: UploadFeedbackResponse,
},

#[serde(rename = "account/read")]
#[ts(rename = "account/read")]
GetAccount {
Expand Down Expand Up @@ -378,6 +385,23 @@ pub struct ListModelsResponse {
pub next_cursor: Option<String>,
}

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
pub struct UploadFeedbackParams {
pub classification: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub reason: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub conversation_id: Option<ConversationId>,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thread_id?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We still call it ConversationId tho

pub include_logs: bool,
}

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
pub struct UploadFeedbackResponse {
pub thread_id: String,
}

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(tag = "type")]
#[ts(tag = "type")]
Expand Down
1 change: 1 addition & 0 deletions codex-rs/app-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ codex-file-search = { workspace = true }
codex-login = { workspace = true }
codex-protocol = { workspace = true }
codex-app-server-protocol = { workspace = true }
codex-feedback = { workspace = true }
codex-utils-json-to-toml = { workspace = true }
chrono = { workspace = true }
serde = { workspace = true, features = ["derive"] }
Expand Down
82 changes: 82 additions & 0 deletions codex-rs/app-server/src/codex_message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ use codex_app_server_protocol::ServerRequestPayload;
use codex_app_server_protocol::SessionConfiguredNotification;
use codex_app_server_protocol::SetDefaultModelParams;
use codex_app_server_protocol::SetDefaultModelResponse;
use codex_app_server_protocol::UploadFeedbackParams;
use codex_app_server_protocol::UploadFeedbackResponse;
use codex_app_server_protocol::UserInfoResponse;
use codex_app_server_protocol::UserSavedConfig;
use codex_backend_client::Client as BackendClient;
Expand Down Expand Up @@ -85,6 +87,7 @@ use codex_core::protocol::EventMsg;
use codex_core::protocol::ExecApprovalRequestEvent;
use codex_core::protocol::Op;
use codex_core::protocol::ReviewDecision;
use codex_feedback::CodexFeedback;
use codex_login::ServerOptions as LoginServerOptions;
use codex_login::ShutdownHandle;
use codex_login::run_login_server;
Expand Down Expand Up @@ -136,6 +139,7 @@ pub(crate) struct CodexMessageProcessor {
// Queue of pending interrupt requests per conversation. We reply when TurnAborted arrives.
pending_interrupts: Arc<Mutex<HashMap<ConversationId, Vec<RequestId>>>>,
pending_fuzzy_searches: Arc<Mutex<HashMap<String, Arc<AtomicBool>>>>,
feedback: CodexFeedback,
}

impl CodexMessageProcessor {
Expand All @@ -145,6 +149,7 @@ impl CodexMessageProcessor {
outgoing: Arc<OutgoingMessageSender>,
codex_linux_sandbox_exe: Option<PathBuf>,
config: Arc<Config>,
feedback: CodexFeedback,
) -> Self {
Self {
auth_manager,
Expand All @@ -156,6 +161,7 @@ impl CodexMessageProcessor {
active_login: Arc::new(Mutex::new(None)),
pending_interrupts: Arc::new(Mutex::new(HashMap::new())),
pending_fuzzy_searches: Arc::new(Mutex::new(HashMap::new())),
feedback,
}
}

Expand Down Expand Up @@ -275,6 +281,9 @@ impl CodexMessageProcessor {
} => {
self.get_account_rate_limits(request_id).await;
}
ClientRequest::UploadFeedback { request_id, params } => {
self.upload_feedback(request_id, params).await;
}
}
}

Expand Down Expand Up @@ -1418,6 +1427,79 @@ impl CodexMessageProcessor {
let response = FuzzyFileSearchResponse { files: results };
self.outgoing.send_response(request_id, response).await;
}

async fn upload_feedback(&self, request_id: RequestId, params: UploadFeedbackParams) {
let UploadFeedbackParams {
classification,
reason,
conversation_id,
include_logs,
} = params;

let snapshot = self.feedback.snapshot(conversation_id);
let thread_id = snapshot.thread_id.clone();

let validated_rollout_path = if include_logs {
match conversation_id {
Some(conv_id) => self.resolve_rollout_path(conv_id).await,
None => None,
}
} else {
None
};

let cli_version = env!("CARGO_PKG_VERSION").to_string();
let upload_result = tokio::task::spawn_blocking(move || {
let rollout_path_ref = validated_rollout_path.as_deref();
snapshot.upload_feedback(
&classification,
reason.as_deref(),
cli_version.as_str(),
include_logs,
rollout_path_ref,
)
})
.await;

let upload_result = match upload_result {
Ok(result) => result,
Err(join_err) => {
let error = JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("failed to upload feedback: {join_err}"),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
}
};

match upload_result {
Ok(()) => {
let response = UploadFeedbackResponse { thread_id };
self.outgoing.send_response(request_id, response).await;
}
Err(err) => {
let error = JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("failed to upload feedback: {err}"),
data: None,
};
self.outgoing.send_error(request_id, error).await;
}
}
}

async fn resolve_rollout_path(&self, conversation_id: ConversationId) -> Option<PathBuf> {
match self
.conversation_manager
.get_conversation(conversation_id)
.await
{
Ok(conv) => Some(conv.rollout_path()),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Didn't we have an Op to get rollout path? Let's stick to it even though I don't like it.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Op::GetPath

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We cannot receive the events here because we are doing it in a loop

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use this new method in other callsites?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Different PR?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All callers of codex.submit(Op::GetPath) can be find-replaced to codex.rollout_path()

Err(_) => None,
}
}
}

async fn apply_bespoke_event_handling(
Expand Down
13 changes: 13 additions & 0 deletions codex-rs/app-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,19 @@ use crate::message_processor::MessageProcessor;
use crate::outgoing_message::OutgoingMessage;
use crate::outgoing_message::OutgoingMessageSender;
use codex_app_server_protocol::JSONRPCMessage;
use codex_feedback::CodexFeedback;
use tokio::io::AsyncBufReadExt;
use tokio::io::AsyncWriteExt;
use tokio::io::BufReader;
use tokio::io::{self};
use tokio::sync::mpsc;
use tracing::Level;
use tracing::debug;
use tracing::error;
use tracing::info;
use tracing_subscriber::EnvFilter;
use tracing_subscriber::Layer;
use tracing_subscriber::filter::Targets;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;

Expand Down Expand Up @@ -82,6 +85,8 @@ pub async fn run_main(
std::io::Error::new(ErrorKind::InvalidData, format!("error loading config: {e}"))
})?;

let feedback = CodexFeedback::new();

let otel =
codex_core::otel_init::build_provider(&config, env!("CARGO_PKG_VERSION")).map_err(|e| {
std::io::Error::new(
Expand All @@ -96,8 +101,15 @@ pub async fn run_main(
.with_writer(std::io::stderr)
.with_filter(EnvFilter::from_default_env());

let feedback_layer = tracing_subscriber::fmt::layer()
.with_writer(feedback.make_writer())
.with_ansi(false)
.with_target(false)
.with_filter(Targets::new().with_default(Level::TRACE));

let _ = tracing_subscriber::registry()
.with(stderr_fmt)
.with(feedback_layer)
.with(otel.as_ref().map(|provider| {
OpenTelemetryTracingBridge::new(&provider.logger).with_filter(
tracing_subscriber::filter::filter_fn(codex_core::otel_init::codex_export_filter),
Expand All @@ -112,6 +124,7 @@ pub async fn run_main(
outgoing_message_sender,
codex_linux_sandbox_exe,
std::sync::Arc::new(config),
feedback.clone(),
);
async move {
while let Some(msg) = incoming_rx.recv().await {
Expand Down
3 changes: 3 additions & 0 deletions codex-rs/app-server/src/message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use codex_core::ConversationManager;
use codex_core::config::Config;
use codex_core::default_client::USER_AGENT_SUFFIX;
use codex_core::default_client::get_codex_user_agent;
use codex_feedback::CodexFeedback;
use codex_protocol::protocol::SessionSource;
use std::sync::Arc;

Expand All @@ -33,6 +34,7 @@ impl MessageProcessor {
outgoing: OutgoingMessageSender,
codex_linux_sandbox_exe: Option<PathBuf>,
config: Arc<Config>,
feedback: CodexFeedback,
) -> Self {
let outgoing = Arc::new(outgoing);
let auth_manager = AuthManager::shared(config.codex_home.clone(), false);
Expand All @@ -46,6 +48,7 @@ impl MessageProcessor {
outgoing.clone(),
codex_linux_sandbox_exe,
config,
feedback,
);

Self {
Expand Down
10 changes: 10 additions & 0 deletions codex-rs/app-server/tests/common/mcp_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use codex_app_server_protocol::SendUserMessageParams;
use codex_app_server_protocol::SendUserTurnParams;
use codex_app_server_protocol::ServerRequest;
use codex_app_server_protocol::SetDefaultModelParams;
use codex_app_server_protocol::UploadFeedbackParams;

use codex_app_server_protocol::JSONRPCError;
use codex_app_server_protocol::JSONRPCMessage;
Expand Down Expand Up @@ -242,6 +243,15 @@ impl McpProcess {
self.send_request("account/rateLimits/read", None).await
}

/// Send a `feedback/upload` JSON-RPC request.
pub async fn send_upload_feedback_request(
&mut self,
params: UploadFeedbackParams,
) -> anyhow::Result<i64> {
let params = Some(serde_json::to_value(params)?);
self.send_request("feedback/upload", params).await
}

/// Send a `userInfo` JSON-RPC request.
pub async fn send_user_info_request(&mut self) -> anyhow::Result<i64> {
self.send_request("userInfo", None).await
Expand Down
1 change: 1 addition & 0 deletions codex-rs/app-server/tests/suite/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ mod auth;
mod codex_message_processor_flow;
mod config;
mod create_conversation;
mod feedback;
mod fuzzy_file_search;
mod interrupt;
mod list_resume;
Expand Down
13 changes: 11 additions & 2 deletions codex-rs/core/src/codex_conversation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,21 @@ use crate::error::Result as CodexResult;
use crate::protocol::Event;
use crate::protocol::Op;
use crate::protocol::Submission;
use std::path::PathBuf;

pub struct CodexConversation {
codex: Codex,
rollout_path: PathBuf,
}

/// Conduit for the bidirectional stream of messages that compose a conversation
/// in Codex.
impl CodexConversation {
pub(crate) fn new(codex: Codex) -> Self {
Self { codex }
pub(crate) fn new(codex: Codex, rollout_path: PathBuf) -> Self {
Self {
codex,
rollout_path,
}
}

pub async fn submit(&self, op: Op) -> CodexResult<String> {
Expand All @@ -27,4 +32,8 @@ impl CodexConversation {
pub async fn next_event(&self) -> CodexResult<Event> {
self.codex.next_event().await
}

pub fn rollout_path(&self) -> PathBuf {
self.rollout_path.clone()
}
}
5 changes: 4 additions & 1 deletion codex-rs/core/src/conversation_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,10 @@ impl ConversationManager {
}
};

let conversation = Arc::new(CodexConversation::new(codex));
let conversation = Arc::new(CodexConversation::new(
codex,
session_configured.rollout_path.clone(),
));
self.conversations
.write()
.await
Expand Down
Loading