diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock index 2edc559c1da..967b020d18b 100644 --- a/codex-rs/Cargo.lock +++ b/codex-rs/Cargo.lock @@ -1342,6 +1342,7 @@ dependencies = [ "codex-login", "codex-plugin", "codex-protocol", + "os_info", "pretty_assertions", "serde", "serde_json", diff --git a/codex-rs/analytics/Cargo.toml b/codex-rs/analytics/Cargo.toml index 63bf63c9112..2239c4bc36d 100644 --- a/codex-rs/analytics/Cargo.toml +++ b/codex-rs/analytics/Cargo.toml @@ -18,6 +18,7 @@ codex-git-utils = { workspace = true } codex-login = { workspace = true } codex-plugin = { workspace = true } codex-protocol = { workspace = true } +os_info = { workspace = true } serde = { workspace = true, features = ["derive"] } sha1 = { workspace = true } tokio = { workspace = true, features = [ diff --git a/codex-rs/analytics/src/analytics_client.rs b/codex-rs/analytics/src/analytics_client.rs deleted file mode 100644 index 24618168cc1..00000000000 --- a/codex-rs/analytics/src/analytics_client.rs +++ /dev/null @@ -1,671 +0,0 @@ -use codex_app_server_protocol::ClientRequest; -use codex_app_server_protocol::ClientResponse; -use codex_app_server_protocol::InitializeParams; -use codex_app_server_protocol::RequestId; -use codex_app_server_protocol::ServerNotification; -use codex_git_utils::collect_git_info; -use codex_git_utils::get_git_repo_root; -use codex_login::AuthManager; -use codex_login::default_client::create_client; -use codex_login::default_client::originator; -use codex_plugin::PluginTelemetryMetadata; -use codex_protocol::protocol::SkillScope; -use serde::Serialize; -use sha1::Digest; -use sha1::Sha1; -use std::collections::HashSet; -use std::path::Path; -use std::path::PathBuf; -use std::sync::Arc; -use std::sync::Mutex; -use std::time::Duration; -use tokio::sync::mpsc; - -#[derive(Clone)] -pub struct TrackEventsContext { - pub model_slug: String, - pub thread_id: String, - pub turn_id: String, -} - -pub fn build_track_events_context( - model_slug: String, - thread_id: String, - turn_id: String, -) -> TrackEventsContext { - TrackEventsContext { - model_slug, - thread_id, - turn_id, - } -} - -#[derive(Clone, Debug)] -pub struct SkillInvocation { - pub skill_name: String, - pub skill_scope: SkillScope, - pub skill_path: PathBuf, - pub invocation_type: InvocationType, -} - -#[derive(Clone, Copy, Debug, Serialize)] -#[serde(rename_all = "lowercase")] -pub enum InvocationType { - Explicit, - Implicit, -} - -pub struct AppInvocation { - pub connector_id: Option, - pub app_name: Option, - pub invocation_type: Option, -} - -pub enum AnalyticsFact { - Initialize { - connection_id: u64, - params: InitializeParams, - }, - Request { - connection_id: u64, - request_id: RequestId, - request: Box, - }, - Response { - connection_id: u64, - response: Box, - }, - Notification(Box), - // Facts that do not naturally exist on the app-server protocol surface, or - // would require non-trivial protocol reshaping on this branch. - Custom(CustomAnalyticsFact), -} - -pub enum CustomAnalyticsFact { - SkillInvoked(SkillInvokedInput), - AppMentioned(AppMentionedInput), - AppUsed(AppUsedInput), - PluginUsed(PluginUsedInput), - PluginStateChanged(PluginStateChangedInput), -} - -pub struct SkillInvokedInput { - pub tracking: TrackEventsContext, - pub invocations: Vec, -} - -pub struct AppMentionedInput { - pub tracking: TrackEventsContext, - pub mentions: Vec, -} - -pub struct AppUsedInput { - pub tracking: TrackEventsContext, - pub app: AppInvocation, -} - -pub struct PluginUsedInput { - pub tracking: TrackEventsContext, - pub plugin: PluginTelemetryMetadata, -} - -pub struct PluginStateChangedInput { - pub plugin: PluginTelemetryMetadata, - pub state: PluginState, -} - -#[derive(Clone, Copy)] -pub enum PluginState { - Installed, - Uninstalled, - Enabled, - Disabled, -} - -#[derive(Default)] -pub struct AnalyticsReducer; - -#[derive(Clone)] -pub(crate) struct AnalyticsEventsQueue { - sender: mpsc::Sender, - app_used_emitted_keys: Arc>>, - plugin_used_emitted_keys: Arc>>, -} - -#[derive(Clone)] -pub struct AnalyticsEventsClient { - queue: AnalyticsEventsQueue, - analytics_enabled: Option, -} - -impl AnalyticsEventsQueue { - pub(crate) fn new(auth_manager: Arc, base_url: String) -> Self { - let (sender, mut receiver) = mpsc::channel(ANALYTICS_EVENTS_QUEUE_SIZE); - tokio::spawn(async move { - let mut reducer = AnalyticsReducer; - while let Some(input) = receiver.recv().await { - let mut events = Vec::new(); - reducer.ingest(input, &mut events).await; - send_track_events(&auth_manager, &base_url, events).await; - } - }); - Self { - sender, - app_used_emitted_keys: Arc::new(Mutex::new(HashSet::new())), - plugin_used_emitted_keys: Arc::new(Mutex::new(HashSet::new())), - } - } - - fn try_send(&self, input: AnalyticsFact) { - if self.sender.try_send(input).is_err() { - //TODO: add a metric for this - tracing::warn!("dropping analytics events: queue is full"); - } - } - - fn should_enqueue_app_used(&self, tracking: &TrackEventsContext, app: &AppInvocation) -> bool { - let Some(connector_id) = app.connector_id.as_ref() else { - return true; - }; - let mut emitted = self - .app_used_emitted_keys - .lock() - .unwrap_or_else(std::sync::PoisonError::into_inner); - if emitted.len() >= ANALYTICS_EVENT_DEDUPE_MAX_KEYS { - emitted.clear(); - } - emitted.insert((tracking.turn_id.clone(), connector_id.clone())) - } - - fn should_enqueue_plugin_used( - &self, - tracking: &TrackEventsContext, - plugin: &PluginTelemetryMetadata, - ) -> bool { - let mut emitted = self - .plugin_used_emitted_keys - .lock() - .unwrap_or_else(std::sync::PoisonError::into_inner); - if emitted.len() >= ANALYTICS_EVENT_DEDUPE_MAX_KEYS { - emitted.clear(); - } - emitted.insert((tracking.turn_id.clone(), plugin.plugin_id.as_key())) - } -} - -impl AnalyticsEventsClient { - pub fn new( - auth_manager: Arc, - base_url: String, - analytics_enabled: Option, - ) -> Self { - Self { - queue: AnalyticsEventsQueue::new(Arc::clone(&auth_manager), base_url), - analytics_enabled, - } - } - - pub fn track_skill_invocations( - &self, - tracking: TrackEventsContext, - invocations: Vec, - ) { - if invocations.is_empty() { - return; - } - self.record_fact(AnalyticsFact::Custom(CustomAnalyticsFact::SkillInvoked( - SkillInvokedInput { - tracking, - invocations, - }, - ))); - } - - pub fn track_app_mentioned(&self, tracking: TrackEventsContext, mentions: Vec) { - if mentions.is_empty() { - return; - } - self.record_fact(AnalyticsFact::Custom(CustomAnalyticsFact::AppMentioned( - AppMentionedInput { tracking, mentions }, - ))); - } - - pub fn track_app_used(&self, tracking: TrackEventsContext, app: AppInvocation) { - if !self.queue.should_enqueue_app_used(&tracking, &app) { - return; - } - self.record_fact(AnalyticsFact::Custom(CustomAnalyticsFact::AppUsed( - AppUsedInput { tracking, app }, - ))); - } - - pub fn track_plugin_used(&self, tracking: TrackEventsContext, plugin: PluginTelemetryMetadata) { - if !self.queue.should_enqueue_plugin_used(&tracking, &plugin) { - return; - } - self.record_fact(AnalyticsFact::Custom(CustomAnalyticsFact::PluginUsed( - PluginUsedInput { tracking, plugin }, - ))); - } - - pub fn track_plugin_installed(&self, plugin: PluginTelemetryMetadata) { - self.record_fact(AnalyticsFact::Custom( - CustomAnalyticsFact::PluginStateChanged(PluginStateChangedInput { - plugin, - state: PluginState::Installed, - }), - )); - } - - pub fn track_plugin_uninstalled(&self, plugin: PluginTelemetryMetadata) { - self.record_fact(AnalyticsFact::Custom( - CustomAnalyticsFact::PluginStateChanged(PluginStateChangedInput { - plugin, - state: PluginState::Uninstalled, - }), - )); - } - - pub fn track_plugin_enabled(&self, plugin: PluginTelemetryMetadata) { - self.record_fact(AnalyticsFact::Custom( - CustomAnalyticsFact::PluginStateChanged(PluginStateChangedInput { - plugin, - state: PluginState::Enabled, - }), - )); - } - - pub fn track_plugin_disabled(&self, plugin: PluginTelemetryMetadata) { - self.record_fact(AnalyticsFact::Custom( - CustomAnalyticsFact::PluginStateChanged(PluginStateChangedInput { - plugin, - state: PluginState::Disabled, - }), - )); - } - - fn record_fact(&self, input: AnalyticsFact) { - if self.analytics_enabled == Some(false) { - return; - } - self.queue.try_send(input); - } -} - -const ANALYTICS_EVENTS_QUEUE_SIZE: usize = 256; -const ANALYTICS_EVENTS_TIMEOUT: Duration = Duration::from_secs(10); -const ANALYTICS_EVENT_DEDUPE_MAX_KEYS: usize = 4096; - -#[derive(Serialize)] -struct TrackEventsRequest { - events: Vec, -} - -#[derive(Serialize)] -#[serde(untagged)] -enum TrackEventRequest { - SkillInvocation(SkillInvocationEventRequest), - AppMentioned(CodexAppMentionedEventRequest), - AppUsed(CodexAppUsedEventRequest), - PluginUsed(CodexPluginUsedEventRequest), - PluginInstalled(CodexPluginEventRequest), - PluginUninstalled(CodexPluginEventRequest), - PluginEnabled(CodexPluginEventRequest), - PluginDisabled(CodexPluginEventRequest), -} - -#[derive(Serialize)] -struct SkillInvocationEventRequest { - event_type: &'static str, - skill_id: String, - skill_name: String, - event_params: SkillInvocationEventParams, -} - -#[derive(Serialize)] -struct SkillInvocationEventParams { - product_client_id: Option, - skill_scope: Option, - repo_url: Option, - thread_id: Option, - invoke_type: Option, - model_slug: Option, -} - -#[derive(Serialize)] -struct CodexAppMetadata { - connector_id: Option, - thread_id: Option, - turn_id: Option, - app_name: Option, - product_client_id: Option, - invoke_type: Option, - model_slug: Option, -} - -#[derive(Serialize)] -struct CodexAppMentionedEventRequest { - event_type: &'static str, - event_params: CodexAppMetadata, -} - -#[derive(Serialize)] -struct CodexAppUsedEventRequest { - event_type: &'static str, - event_params: CodexAppMetadata, -} - -#[derive(Serialize)] -struct CodexPluginMetadata { - plugin_id: Option, - plugin_name: Option, - marketplace_name: Option, - has_skills: Option, - mcp_server_count: Option, - connector_ids: Option>, - product_client_id: Option, -} - -#[derive(Serialize)] -struct CodexPluginUsedMetadata { - #[serde(flatten)] - plugin: CodexPluginMetadata, - thread_id: Option, - turn_id: Option, - model_slug: Option, -} - -#[derive(Serialize)] -struct CodexPluginEventRequest { - event_type: &'static str, - event_params: CodexPluginMetadata, -} - -#[derive(Serialize)] -struct CodexPluginUsedEventRequest { - event_type: &'static str, - event_params: CodexPluginUsedMetadata, -} - -impl AnalyticsReducer { - async fn ingest(&mut self, input: AnalyticsFact, out: &mut Vec) { - match input { - AnalyticsFact::Initialize { - connection_id: _connection_id, - params: _params, - } => {} - AnalyticsFact::Request { - connection_id: _connection_id, - request_id: _request_id, - request: _request, - } => {} - AnalyticsFact::Response { - connection_id: _connection_id, - response: _response, - } => {} - AnalyticsFact::Notification(_notification) => {} - AnalyticsFact::Custom(input) => match input { - CustomAnalyticsFact::SkillInvoked(input) => { - self.ingest_skill_invoked(input, out).await; - } - CustomAnalyticsFact::AppMentioned(input) => { - self.ingest_app_mentioned(input, out); - } - CustomAnalyticsFact::AppUsed(input) => { - self.ingest_app_used(input, out); - } - CustomAnalyticsFact::PluginUsed(input) => { - self.ingest_plugin_used(input, out); - } - CustomAnalyticsFact::PluginStateChanged(input) => { - self.ingest_plugin_state_changed(input, out); - } - }, - } - } - - async fn ingest_skill_invoked( - &mut self, - input: SkillInvokedInput, - out: &mut Vec, - ) { - let SkillInvokedInput { - tracking, - invocations, - } = input; - for invocation in invocations { - let skill_scope = match invocation.skill_scope { - SkillScope::User => "user", - SkillScope::Repo => "repo", - SkillScope::System => "system", - SkillScope::Admin => "admin", - }; - let repo_root = get_git_repo_root(invocation.skill_path.as_path()); - let repo_url = if let Some(root) = repo_root.as_ref() { - collect_git_info(root) - .await - .and_then(|info| info.repository_url) - } else { - None - }; - let skill_id = skill_id_for_local_skill( - repo_url.as_deref(), - repo_root.as_deref(), - invocation.skill_path.as_path(), - invocation.skill_name.as_str(), - ); - out.push(TrackEventRequest::SkillInvocation( - SkillInvocationEventRequest { - event_type: "skill_invocation", - skill_id, - skill_name: invocation.skill_name.clone(), - event_params: SkillInvocationEventParams { - thread_id: Some(tracking.thread_id.clone()), - invoke_type: Some(invocation.invocation_type), - model_slug: Some(tracking.model_slug.clone()), - product_client_id: Some(originator().value), - repo_url, - skill_scope: Some(skill_scope.to_string()), - }, - }, - )); - } - } - - fn ingest_app_mentioned(&mut self, input: AppMentionedInput, out: &mut Vec) { - let AppMentionedInput { tracking, mentions } = input; - out.extend(mentions.into_iter().map(|mention| { - let event_params = codex_app_metadata(&tracking, mention); - TrackEventRequest::AppMentioned(CodexAppMentionedEventRequest { - event_type: "codex_app_mentioned", - event_params, - }) - })); - } - - fn ingest_app_used(&mut self, input: AppUsedInput, out: &mut Vec) { - let AppUsedInput { tracking, app } = input; - let event_params = codex_app_metadata(&tracking, app); - out.push(TrackEventRequest::AppUsed(CodexAppUsedEventRequest { - event_type: "codex_app_used", - event_params, - })); - } - - fn ingest_plugin_used(&mut self, input: PluginUsedInput, out: &mut Vec) { - let PluginUsedInput { tracking, plugin } = input; - out.push(TrackEventRequest::PluginUsed(CodexPluginUsedEventRequest { - event_type: "codex_plugin_used", - event_params: codex_plugin_used_metadata(&tracking, plugin), - })); - } - - fn ingest_plugin_state_changed( - &mut self, - input: PluginStateChangedInput, - out: &mut Vec, - ) { - let PluginStateChangedInput { plugin, state } = input; - let event = CodexPluginEventRequest { - event_type: plugin_state_event_type(state), - event_params: codex_plugin_metadata(plugin), - }; - out.push(match state { - PluginState::Installed => TrackEventRequest::PluginInstalled(event), - PluginState::Uninstalled => TrackEventRequest::PluginUninstalled(event), - PluginState::Enabled => TrackEventRequest::PluginEnabled(event), - PluginState::Disabled => TrackEventRequest::PluginDisabled(event), - }); - } -} - -fn plugin_state_event_type(state: PluginState) -> &'static str { - match state { - PluginState::Installed => "codex_plugin_installed", - PluginState::Uninstalled => "codex_plugin_uninstalled", - PluginState::Enabled => "codex_plugin_enabled", - PluginState::Disabled => "codex_plugin_disabled", - } -} - -fn codex_app_metadata(tracking: &TrackEventsContext, app: AppInvocation) -> CodexAppMetadata { - CodexAppMetadata { - connector_id: app.connector_id, - thread_id: Some(tracking.thread_id.clone()), - turn_id: Some(tracking.turn_id.clone()), - app_name: app.app_name, - product_client_id: Some(originator().value), - invoke_type: app.invocation_type, - model_slug: Some(tracking.model_slug.clone()), - } -} - -fn codex_plugin_metadata(plugin: PluginTelemetryMetadata) -> CodexPluginMetadata { - let capability_summary = plugin.capability_summary; - CodexPluginMetadata { - plugin_id: Some(plugin.plugin_id.as_key()), - plugin_name: Some(plugin.plugin_id.plugin_name), - marketplace_name: Some(plugin.plugin_id.marketplace_name), - has_skills: capability_summary - .as_ref() - .map(|summary| summary.has_skills), - mcp_server_count: capability_summary - .as_ref() - .map(|summary| summary.mcp_server_names.len()), - connector_ids: capability_summary.map(|summary| { - summary - .app_connector_ids - .into_iter() - .map(|connector_id| connector_id.0) - .collect() - }), - product_client_id: Some(originator().value), - } -} - -fn codex_plugin_used_metadata( - tracking: &TrackEventsContext, - plugin: PluginTelemetryMetadata, -) -> CodexPluginUsedMetadata { - CodexPluginUsedMetadata { - plugin: codex_plugin_metadata(plugin), - thread_id: Some(tracking.thread_id.clone()), - turn_id: Some(tracking.turn_id.clone()), - model_slug: Some(tracking.model_slug.clone()), - } -} - -async fn send_track_events( - auth_manager: &AuthManager, - base_url: &str, - events: Vec, -) { - if events.is_empty() { - return; - } - let Some(auth) = auth_manager.auth().await else { - return; - }; - if !auth.is_chatgpt_auth() { - return; - } - let access_token = match auth.get_token() { - Ok(token) => token, - Err(_) => return, - }; - let Some(account_id) = auth.get_account_id() else { - return; - }; - - let base_url = base_url.trim_end_matches('/'); - let url = format!("{base_url}/codex/analytics-events/events"); - let payload = TrackEventsRequest { events }; - - let response = create_client() - .post(&url) - .timeout(ANALYTICS_EVENTS_TIMEOUT) - .bearer_auth(&access_token) - .header("chatgpt-account-id", &account_id) - .header("Content-Type", "application/json") - .json(&payload) - .send() - .await; - - match response { - Ok(response) if response.status().is_success() => {} - Ok(response) => { - let status = response.status(); - let body = response.text().await.unwrap_or_default(); - tracing::warn!("events failed with status {status}: {body}"); - } - Err(err) => { - tracing::warn!("failed to send events request: {err}"); - } - } -} - -pub(crate) fn skill_id_for_local_skill( - repo_url: Option<&str>, - repo_root: Option<&Path>, - skill_path: &Path, - skill_name: &str, -) -> String { - let path = normalize_path_for_skill_id(repo_url, repo_root, skill_path); - let prefix = if let Some(url) = repo_url { - format!("repo_{url}") - } else { - "personal".to_string() - }; - let raw_id = format!("{prefix}_{path}_{skill_name}"); - let mut hasher = Sha1::new(); - hasher.update(raw_id.as_bytes()); - format!("{:x}", hasher.finalize()) -} - -/// Returns a normalized path for skill ID construction. -/// -/// - Repo-scoped skills use a path relative to the repo root. -/// - User/admin/system skills use an absolute path. -fn normalize_path_for_skill_id( - repo_url: Option<&str>, - repo_root: Option<&Path>, - skill_path: &Path, -) -> String { - let resolved_path = - std::fs::canonicalize(skill_path).unwrap_or_else(|_| skill_path.to_path_buf()); - match (repo_url, repo_root) { - (Some(_), Some(root)) => { - let resolved_root = std::fs::canonicalize(root).unwrap_or_else(|_| root.to_path_buf()); - resolved_path - .strip_prefix(&resolved_root) - .unwrap_or(resolved_path.as_path()) - .to_string_lossy() - .replace('\\', "/") - } - _ => resolved_path.to_string_lossy().replace('\\', "/"), - } -} - -#[cfg(test)] -#[path = "analytics_client_tests.rs"] -mod tests; diff --git a/codex-rs/analytics/src/analytics_client_tests.rs b/codex-rs/analytics/src/analytics_client_tests.rs index 991f9771601..3a98fbb8376 100644 --- a/codex-rs/analytics/src/analytics_client_tests.rs +++ b/codex-rs/analytics/src/analytics_client_tests.rs @@ -1,26 +1,47 @@ -use super::AnalyticsEventsQueue; -use super::AnalyticsFact; -use super::AnalyticsReducer; -use super::AppInvocation; -use super::AppMentionedInput; -use super::AppUsedInput; -use super::CodexAppMentionedEventRequest; -use super::CodexAppUsedEventRequest; -use super::CodexPluginEventRequest; -use super::CodexPluginUsedEventRequest; -use super::CustomAnalyticsFact; -use super::InvocationType; -use super::PluginState; -use super::PluginStateChangedInput; -use super::PluginUsedInput; -use super::SkillInvocation; -use super::SkillInvokedInput; -use super::TrackEventRequest; -use super::TrackEventsContext; -use super::codex_app_metadata; -use super::codex_plugin_metadata; -use super::codex_plugin_used_metadata; -use super::normalize_path_for_skill_id; +use crate::client::AnalyticsEventsQueue; +use crate::events::AppServerRpcTransport; +use crate::events::CodexAppMentionedEventRequest; +use crate::events::CodexAppServerClientMetadata; +use crate::events::CodexAppUsedEventRequest; +use crate::events::CodexPluginEventRequest; +use crate::events::CodexPluginUsedEventRequest; +use crate::events::CodexRuntimeMetadata; +use crate::events::ThreadInitializationMode; +use crate::events::ThreadInitializedEvent; +use crate::events::ThreadInitializedEventParams; +use crate::events::TrackEventRequest; +use crate::events::codex_app_metadata; +use crate::events::codex_plugin_metadata; +use crate::events::codex_plugin_used_metadata; +use crate::facts::AnalyticsFact; +use crate::facts::AppInvocation; +use crate::facts::AppMentionedInput; +use crate::facts::AppUsedInput; +use crate::facts::CustomAnalyticsFact; +use crate::facts::InvocationType; +use crate::facts::PluginState; +use crate::facts::PluginStateChangedInput; +use crate::facts::PluginUsedInput; +use crate::facts::SkillInvocation; +use crate::facts::SkillInvokedInput; +use crate::facts::TrackEventsContext; +use crate::reducer::AnalyticsReducer; +use crate::reducer::normalize_path_for_skill_id; +use crate::reducer::skill_id_for_local_skill; +use codex_app_server_protocol::ApprovalsReviewer as AppServerApprovalsReviewer; +use codex_app_server_protocol::AskForApproval as AppServerAskForApproval; +use codex_app_server_protocol::ClientInfo; +use codex_app_server_protocol::ClientResponse; +use codex_app_server_protocol::InitializeCapabilities; +use codex_app_server_protocol::InitializeParams; +use codex_app_server_protocol::RequestId; +use codex_app_server_protocol::SandboxPolicy as AppServerSandboxPolicy; +use codex_app_server_protocol::SessionSource as AppServerSessionSource; +use codex_app_server_protocol::Thread; +use codex_app_server_protocol::ThreadResumeResponse; +use codex_app_server_protocol::ThreadStartResponse; +use codex_app_server_protocol::ThreadStatus as AppServerThreadStatus; +use codex_login::default_client::DEFAULT_ORIGINATOR; use codex_login::default_client::originator; use codex_plugin::AppConnectorId; use codex_plugin::PluginCapabilitySummary; @@ -34,6 +55,61 @@ use std::sync::Arc; use std::sync::Mutex; use tokio::sync::mpsc; +fn sample_thread(thread_id: &str, ephemeral: bool) -> Thread { + Thread { + id: thread_id.to_string(), + preview: "first prompt".to_string(), + ephemeral, + model_provider: "openai".to_string(), + created_at: 1, + updated_at: 2, + status: AppServerThreadStatus::Idle, + path: None, + cwd: PathBuf::from("/tmp"), + cli_version: "0.0.0".to_string(), + source: AppServerSessionSource::Exec, + agent_nickname: None, + agent_role: None, + git_info: None, + name: None, + turns: Vec::new(), + } +} + +fn sample_thread_start_response(thread_id: &str, ephemeral: bool, model: &str) -> ClientResponse { + ClientResponse::ThreadStart { + request_id: RequestId::Integer(1), + response: ThreadStartResponse { + thread: sample_thread(thread_id, ephemeral), + model: model.to_string(), + model_provider: "openai".to_string(), + service_tier: None, + cwd: PathBuf::from("/tmp"), + approval_policy: AppServerAskForApproval::OnFailure, + approvals_reviewer: AppServerApprovalsReviewer::User, + sandbox: AppServerSandboxPolicy::DangerFullAccess, + reasoning_effort: None, + }, + } +} + +fn sample_thread_resume_response(thread_id: &str, ephemeral: bool, model: &str) -> ClientResponse { + ClientResponse::ThreadResume { + request_id: RequestId::Integer(2), + response: ThreadResumeResponse { + thread: sample_thread(thread_id, ephemeral), + model: model.to_string(), + model_provider: "openai".to_string(), + service_tier: None, + cwd: PathBuf::from("/tmp"), + approval_policy: AppServerAskForApproval::OnFailure, + approvals_reviewer: AppServerApprovalsReviewer::User, + sandbox: AppServerSandboxPolicy::DangerFullAccess, + reasoning_effort: None, + }, + } +} + fn expected_absolute_path(path: &PathBuf) -> String { std::fs::canonicalize(path) .unwrap_or_else(|_| path.to_path_buf()) @@ -204,6 +280,171 @@ fn app_used_dedupe_is_keyed_by_turn_and_connector() { assert_eq!(queue.should_enqueue_app_used(&turn_2, &app), true); } +#[test] +fn thread_initialized_event_serializes_expected_shape() { + let event = TrackEventRequest::ThreadInitialized(ThreadInitializedEvent { + event_type: "codex_thread_initialized", + event_params: ThreadInitializedEventParams { + thread_id: "thread-0".to_string(), + app_server_client: CodexAppServerClientMetadata { + product_client_id: DEFAULT_ORIGINATOR.to_string(), + client_name: Some("codex-tui".to_string()), + client_version: Some("1.0.0".to_string()), + rpc_transport: AppServerRpcTransport::Stdio, + experimental_api_enabled: Some(true), + }, + runtime: CodexRuntimeMetadata { + codex_rs_version: "0.1.0".to_string(), + runtime_os: "macos".to_string(), + runtime_os_version: "15.3.1".to_string(), + runtime_arch: "aarch64".to_string(), + }, + model: "gpt-5".to_string(), + ephemeral: true, + thread_source: Some("user"), + initialization_mode: ThreadInitializationMode::New, + subagent_source: None, + parent_thread_id: None, + created_at: 1, + }, + }); + + let payload = serde_json::to_value(&event).expect("serialize thread initialized event"); + + assert_eq!( + payload, + json!({ + "event_type": "codex_thread_initialized", + "event_params": { + "thread_id": "thread-0", + "app_server_client": { + "product_client_id": DEFAULT_ORIGINATOR, + "client_name": "codex-tui", + "client_version": "1.0.0", + "rpc_transport": "stdio", + "experimental_api_enabled": true + }, + "runtime": { + "codex_rs_version": "0.1.0", + "runtime_os": "macos", + "runtime_os_version": "15.3.1", + "runtime_arch": "aarch64" + }, + "model": "gpt-5", + "ephemeral": true, + "thread_source": "user", + "initialization_mode": "new", + "subagent_source": null, + "parent_thread_id": null, + "created_at": 1 + } + }) + ); +} + +#[tokio::test] +async fn initialize_caches_client_and_thread_lifecycle_publishes_once_initialized() { + let mut reducer = AnalyticsReducer::default(); + let mut events = Vec::new(); + + reducer + .ingest( + AnalyticsFact::Response { + connection_id: 7, + response: Box::new(sample_thread_start_response( + "thread-no-client", + /*ephemeral*/ false, + "gpt-5", + )), + }, + &mut events, + ) + .await; + assert!(events.is_empty(), "thread events should require initialize"); + + reducer + .ingest( + AnalyticsFact::Initialize { + connection_id: 7, + params: InitializeParams { + client_info: ClientInfo { + name: "codex-tui".to_string(), + title: None, + version: "1.0.0".to_string(), + }, + capabilities: Some(InitializeCapabilities { + experimental_api: false, + opt_out_notification_methods: None, + }), + }, + product_client_id: DEFAULT_ORIGINATOR.to_string(), + runtime: CodexRuntimeMetadata { + codex_rs_version: "0.99.0".to_string(), + runtime_os: "linux".to_string(), + runtime_os_version: "24.04".to_string(), + runtime_arch: "x86_64".to_string(), + }, + rpc_transport: AppServerRpcTransport::Websocket, + }, + &mut events, + ) + .await; + assert!(events.is_empty(), "initialize should not publish by itself"); + + reducer + .ingest( + AnalyticsFact::Response { + connection_id: 7, + response: Box::new(sample_thread_resume_response( + "thread-1", /*ephemeral*/ true, "gpt-5", + )), + }, + &mut events, + ) + .await; + + let payload = serde_json::to_value(&events).expect("serialize events"); + assert_eq!(payload.as_array().expect("events array").len(), 1); + assert_eq!(payload[0]["event_type"], "codex_thread_initialized"); + assert_eq!( + payload[0]["event_params"]["app_server_client"]["product_client_id"], + DEFAULT_ORIGINATOR + ); + assert_eq!( + payload[0]["event_params"]["app_server_client"]["client_name"], + "codex-tui" + ); + assert_eq!( + payload[0]["event_params"]["app_server_client"]["client_version"], + "1.0.0" + ); + assert_eq!( + payload[0]["event_params"]["app_server_client"]["rpc_transport"], + "websocket" + ); + assert_eq!( + payload[0]["event_params"]["app_server_client"]["experimental_api_enabled"], + false + ); + assert_eq!( + payload[0]["event_params"]["runtime"]["codex_rs_version"], + "0.99.0" + ); + assert_eq!(payload[0]["event_params"]["runtime"]["runtime_os"], "linux"); + assert_eq!( + payload[0]["event_params"]["runtime"]["runtime_os_version"], + "24.04" + ); + assert_eq!( + payload[0]["event_params"]["runtime"]["runtime_arch"], + "x86_64" + ); + assert_eq!(payload[0]["event_params"]["initialization_mode"], "resumed"); + assert_eq!(payload[0]["event_params"]["thread_source"], "user"); + assert_eq!(payload[0]["event_params"]["subagent_source"], json!(null)); + assert_eq!(payload[0]["event_params"]["parent_thread_id"], json!(null)); +} + #[test] fn plugin_used_event_serializes_expected_shape() { let tracking = TrackEventsContext { @@ -292,7 +533,7 @@ fn plugin_used_dedupe_is_keyed_by_turn_and_plugin() { #[tokio::test] async fn reducer_ingests_skill_invoked_fact() { - let mut reducer = AnalyticsReducer; + let mut reducer = AnalyticsReducer::default(); let mut events = Vec::new(); let tracking = TrackEventsContext { model_slug: "gpt-5".to_string(), @@ -300,7 +541,7 @@ async fn reducer_ingests_skill_invoked_fact() { turn_id: "turn-1".to_string(), }; let skill_path = PathBuf::from("/Users/abc/.codex/skills/doc/SKILL.md"); - let expected_skill_id = super::skill_id_for_local_skill( + let expected_skill_id = skill_id_for_local_skill( /*repo_url*/ None, /*repo_root*/ None, skill_path.as_path(), @@ -343,7 +584,7 @@ async fn reducer_ingests_skill_invoked_fact() { #[tokio::test] async fn reducer_ingests_app_and_plugin_facts() { - let mut reducer = AnalyticsReducer; + let mut reducer = AnalyticsReducer::default(); let mut events = Vec::new(); let tracking = TrackEventsContext { model_slug: "gpt-5".to_string(), @@ -396,7 +637,7 @@ async fn reducer_ingests_app_and_plugin_facts() { #[tokio::test] async fn reducer_ingests_plugin_state_changed_fact() { - let mut reducer = AnalyticsReducer; + let mut reducer = AnalyticsReducer::default(); let mut events = Vec::new(); reducer diff --git a/codex-rs/analytics/src/client.rs b/codex-rs/analytics/src/client.rs new file mode 100644 index 00000000000..b8a543a3b2e --- /dev/null +++ b/codex-rs/analytics/src/client.rs @@ -0,0 +1,272 @@ +use crate::events::AppServerRpcTransport; +use crate::events::TrackEventRequest; +use crate::events::TrackEventsRequest; +use crate::events::current_runtime_metadata; +use crate::facts::AnalyticsFact; +use crate::facts::AppInvocation; +use crate::facts::AppMentionedInput; +use crate::facts::AppUsedInput; +use crate::facts::CustomAnalyticsFact; +use crate::facts::PluginState; +use crate::facts::PluginStateChangedInput; +use crate::facts::SkillInvocation; +use crate::facts::SkillInvokedInput; +use crate::facts::TrackEventsContext; +use crate::reducer::AnalyticsReducer; +use codex_app_server_protocol::ClientResponse; +use codex_app_server_protocol::InitializeParams; +use codex_login::AuthManager; +use codex_login::default_client::create_client; +use codex_plugin::PluginTelemetryMetadata; +use std::collections::HashSet; +use std::sync::Arc; +use std::sync::Mutex; +use std::time::Duration; +use tokio::sync::mpsc; + +const ANALYTICS_EVENTS_QUEUE_SIZE: usize = 256; +const ANALYTICS_EVENTS_TIMEOUT: Duration = Duration::from_secs(10); +const ANALYTICS_EVENT_DEDUPE_MAX_KEYS: usize = 4096; + +#[derive(Clone)] +pub(crate) struct AnalyticsEventsQueue { + pub(crate) sender: mpsc::Sender, + pub(crate) app_used_emitted_keys: Arc>>, + pub(crate) plugin_used_emitted_keys: Arc>>, +} + +#[derive(Clone)] +pub struct AnalyticsEventsClient { + queue: AnalyticsEventsQueue, + analytics_enabled: Option, +} + +impl AnalyticsEventsQueue { + pub(crate) fn new(auth_manager: Arc, base_url: String) -> Self { + let (sender, mut receiver) = mpsc::channel(ANALYTICS_EVENTS_QUEUE_SIZE); + tokio::spawn(async move { + let mut reducer = AnalyticsReducer::default(); + while let Some(input) = receiver.recv().await { + let mut events = Vec::new(); + reducer.ingest(input, &mut events).await; + send_track_events(&auth_manager, &base_url, events).await; + } + }); + Self { + sender, + app_used_emitted_keys: Arc::new(Mutex::new(HashSet::new())), + plugin_used_emitted_keys: Arc::new(Mutex::new(HashSet::new())), + } + } + + fn try_send(&self, input: AnalyticsFact) { + if self.sender.try_send(input).is_err() { + //TODO: add a metric for this + tracing::warn!("dropping analytics events: queue is full"); + } + } + + pub(crate) fn should_enqueue_app_used( + &self, + tracking: &TrackEventsContext, + app: &AppInvocation, + ) -> bool { + let Some(connector_id) = app.connector_id.as_ref() else { + return true; + }; + let mut emitted = self + .app_used_emitted_keys + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner); + if emitted.len() >= ANALYTICS_EVENT_DEDUPE_MAX_KEYS { + emitted.clear(); + } + emitted.insert((tracking.turn_id.clone(), connector_id.clone())) + } + + pub(crate) fn should_enqueue_plugin_used( + &self, + tracking: &TrackEventsContext, + plugin: &PluginTelemetryMetadata, + ) -> bool { + let mut emitted = self + .plugin_used_emitted_keys + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner); + if emitted.len() >= ANALYTICS_EVENT_DEDUPE_MAX_KEYS { + emitted.clear(); + } + emitted.insert((tracking.turn_id.clone(), plugin.plugin_id.as_key())) + } +} + +impl AnalyticsEventsClient { + pub fn new( + auth_manager: Arc, + base_url: String, + analytics_enabled: Option, + ) -> Self { + Self { + queue: AnalyticsEventsQueue::new(Arc::clone(&auth_manager), base_url), + analytics_enabled, + } + } + + pub fn track_skill_invocations( + &self, + tracking: TrackEventsContext, + invocations: Vec, + ) { + if invocations.is_empty() { + return; + } + self.record_fact(AnalyticsFact::Custom(CustomAnalyticsFact::SkillInvoked( + SkillInvokedInput { + tracking, + invocations, + }, + ))); + } + + pub fn track_initialize( + &self, + connection_id: u64, + params: InitializeParams, + product_client_id: String, + rpc_transport: AppServerRpcTransport, + ) { + self.record_fact(AnalyticsFact::Initialize { + connection_id, + params, + product_client_id, + runtime: current_runtime_metadata(), + rpc_transport, + }); + } + + pub fn track_app_mentioned(&self, tracking: TrackEventsContext, mentions: Vec) { + if mentions.is_empty() { + return; + } + self.record_fact(AnalyticsFact::Custom(CustomAnalyticsFact::AppMentioned( + AppMentionedInput { tracking, mentions }, + ))); + } + + pub fn track_app_used(&self, tracking: TrackEventsContext, app: AppInvocation) { + if !self.queue.should_enqueue_app_used(&tracking, &app) { + return; + } + self.record_fact(AnalyticsFact::Custom(CustomAnalyticsFact::AppUsed( + AppUsedInput { tracking, app }, + ))); + } + + pub fn track_plugin_used(&self, tracking: TrackEventsContext, plugin: PluginTelemetryMetadata) { + if !self.queue.should_enqueue_plugin_used(&tracking, &plugin) { + return; + } + self.record_fact(AnalyticsFact::Custom(CustomAnalyticsFact::PluginUsed( + crate::facts::PluginUsedInput { tracking, plugin }, + ))); + } + + pub fn track_plugin_installed(&self, plugin: PluginTelemetryMetadata) { + self.record_fact(AnalyticsFact::Custom( + CustomAnalyticsFact::PluginStateChanged(PluginStateChangedInput { + plugin, + state: PluginState::Installed, + }), + )); + } + + pub fn track_plugin_uninstalled(&self, plugin: PluginTelemetryMetadata) { + self.record_fact(AnalyticsFact::Custom( + CustomAnalyticsFact::PluginStateChanged(PluginStateChangedInput { + plugin, + state: PluginState::Uninstalled, + }), + )); + } + + pub fn track_plugin_enabled(&self, plugin: PluginTelemetryMetadata) { + self.record_fact(AnalyticsFact::Custom( + CustomAnalyticsFact::PluginStateChanged(PluginStateChangedInput { + plugin, + state: PluginState::Enabled, + }), + )); + } + + pub fn track_plugin_disabled(&self, plugin: PluginTelemetryMetadata) { + self.record_fact(AnalyticsFact::Custom( + CustomAnalyticsFact::PluginStateChanged(PluginStateChangedInput { + plugin, + state: PluginState::Disabled, + }), + )); + } + + pub(crate) fn record_fact(&self, input: AnalyticsFact) { + if self.analytics_enabled == Some(false) { + return; + } + self.queue.try_send(input); + } + + pub fn track_response(&self, connection_id: u64, response: ClientResponse) { + self.record_fact(AnalyticsFact::Response { + connection_id, + response: Box::new(response), + }); + } +} + +async fn send_track_events( + auth_manager: &AuthManager, + base_url: &str, + events: Vec, +) { + if events.is_empty() { + return; + } + let Some(auth) = auth_manager.auth().await else { + return; + }; + if !auth.is_chatgpt_auth() { + return; + } + let access_token = match auth.get_token() { + Ok(token) => token, + Err(_) => return, + }; + let Some(account_id) = auth.get_account_id() else { + return; + }; + + let base_url = base_url.trim_end_matches('/'); + let url = format!("{base_url}/codex/analytics-events/events"); + let payload = TrackEventsRequest { events }; + + let response = create_client() + .post(&url) + .timeout(ANALYTICS_EVENTS_TIMEOUT) + .bearer_auth(&access_token) + .header("chatgpt-account-id", &account_id) + .header("Content-Type", "application/json") + .json(&payload) + .send() + .await; + + match response { + Ok(response) if response.status().is_success() => {} + Ok(response) => { + let status = response.status(); + let body = response.text().await.unwrap_or_default(); + tracing::warn!("events failed with status {status}: {body}"); + } + Err(err) => { + tracing::warn!("failed to send events request: {err}"); + } + } +} diff --git a/codex-rs/analytics/src/events.rs b/codex-rs/analytics/src/events.rs new file mode 100644 index 00000000000..36efb01a695 --- /dev/null +++ b/codex-rs/analytics/src/events.rs @@ -0,0 +1,230 @@ +use crate::facts::AppInvocation; +use crate::facts::InvocationType; +use crate::facts::PluginState; +use crate::facts::TrackEventsContext; +use codex_login::default_client::originator; +use codex_plugin::PluginTelemetryMetadata; +use codex_protocol::protocol::SessionSource; +use serde::Serialize; + +#[derive(Clone, Copy, Debug, Serialize)] +#[serde(rename_all = "snake_case")] +pub enum AppServerRpcTransport { + Stdio, + Websocket, + InProcess, +} + +#[derive(Clone, Copy, Debug, Serialize)] +#[serde(rename_all = "snake_case")] +pub(crate) enum ThreadInitializationMode { + New, + Forked, + Resumed, +} + +#[derive(Serialize)] +pub(crate) struct TrackEventsRequest { + pub(crate) events: Vec, +} + +#[derive(Serialize)] +#[serde(untagged)] +pub(crate) enum TrackEventRequest { + SkillInvocation(SkillInvocationEventRequest), + ThreadInitialized(ThreadInitializedEvent), + AppMentioned(CodexAppMentionedEventRequest), + AppUsed(CodexAppUsedEventRequest), + PluginUsed(CodexPluginUsedEventRequest), + PluginInstalled(CodexPluginEventRequest), + PluginUninstalled(CodexPluginEventRequest), + PluginEnabled(CodexPluginEventRequest), + PluginDisabled(CodexPluginEventRequest), +} + +#[derive(Serialize)] +pub(crate) struct SkillInvocationEventRequest { + pub(crate) event_type: &'static str, + pub(crate) skill_id: String, + pub(crate) skill_name: String, + pub(crate) event_params: SkillInvocationEventParams, +} + +#[derive(Serialize)] +pub(crate) struct SkillInvocationEventParams { + pub(crate) product_client_id: Option, + pub(crate) skill_scope: Option, + pub(crate) repo_url: Option, + pub(crate) thread_id: Option, + pub(crate) invoke_type: Option, + pub(crate) model_slug: Option, +} + +#[derive(Clone, Serialize)] +pub(crate) struct CodexAppServerClientMetadata { + pub(crate) product_client_id: String, + pub(crate) client_name: Option, + pub(crate) client_version: Option, + pub(crate) rpc_transport: AppServerRpcTransport, + pub(crate) experimental_api_enabled: Option, +} + +#[derive(Clone, Serialize)] +pub(crate) struct CodexRuntimeMetadata { + pub(crate) codex_rs_version: String, + pub(crate) runtime_os: String, + pub(crate) runtime_os_version: String, + pub(crate) runtime_arch: String, +} + +#[derive(Serialize)] +pub(crate) struct ThreadInitializedEventParams { + pub(crate) thread_id: String, + pub(crate) app_server_client: CodexAppServerClientMetadata, + pub(crate) runtime: CodexRuntimeMetadata, + pub(crate) model: String, + pub(crate) ephemeral: bool, + pub(crate) thread_source: Option<&'static str>, + pub(crate) initialization_mode: ThreadInitializationMode, + pub(crate) subagent_source: Option, + pub(crate) parent_thread_id: Option, + pub(crate) created_at: u64, +} + +#[derive(Serialize)] +pub(crate) struct ThreadInitializedEvent { + pub(crate) event_type: &'static str, + pub(crate) event_params: ThreadInitializedEventParams, +} + +#[derive(Serialize)] +pub(crate) struct CodexAppMetadata { + pub(crate) connector_id: Option, + pub(crate) thread_id: Option, + pub(crate) turn_id: Option, + pub(crate) app_name: Option, + pub(crate) product_client_id: Option, + pub(crate) invoke_type: Option, + pub(crate) model_slug: Option, +} + +#[derive(Serialize)] +pub(crate) struct CodexAppMentionedEventRequest { + pub(crate) event_type: &'static str, + pub(crate) event_params: CodexAppMetadata, +} + +#[derive(Serialize)] +pub(crate) struct CodexAppUsedEventRequest { + pub(crate) event_type: &'static str, + pub(crate) event_params: CodexAppMetadata, +} + +#[derive(Serialize)] +pub(crate) struct CodexPluginMetadata { + pub(crate) plugin_id: Option, + pub(crate) plugin_name: Option, + pub(crate) marketplace_name: Option, + pub(crate) has_skills: Option, + pub(crate) mcp_server_count: Option, + pub(crate) connector_ids: Option>, + pub(crate) product_client_id: Option, +} + +#[derive(Serialize)] +pub(crate) struct CodexPluginUsedMetadata { + #[serde(flatten)] + pub(crate) plugin: CodexPluginMetadata, + pub(crate) thread_id: Option, + pub(crate) turn_id: Option, + pub(crate) model_slug: Option, +} + +#[derive(Serialize)] +pub(crate) struct CodexPluginEventRequest { + pub(crate) event_type: &'static str, + pub(crate) event_params: CodexPluginMetadata, +} + +#[derive(Serialize)] +pub(crate) struct CodexPluginUsedEventRequest { + pub(crate) event_type: &'static str, + pub(crate) event_params: CodexPluginUsedMetadata, +} + +pub(crate) fn plugin_state_event_type(state: PluginState) -> &'static str { + match state { + PluginState::Installed => "codex_plugin_installed", + PluginState::Uninstalled => "codex_plugin_uninstalled", + PluginState::Enabled => "codex_plugin_enabled", + PluginState::Disabled => "codex_plugin_disabled", + } +} + +pub(crate) fn codex_app_metadata( + tracking: &TrackEventsContext, + app: AppInvocation, +) -> CodexAppMetadata { + CodexAppMetadata { + connector_id: app.connector_id, + thread_id: Some(tracking.thread_id.clone()), + turn_id: Some(tracking.turn_id.clone()), + app_name: app.app_name, + product_client_id: Some(originator().value), + invoke_type: app.invocation_type, + model_slug: Some(tracking.model_slug.clone()), + } +} + +pub(crate) fn codex_plugin_metadata(plugin: PluginTelemetryMetadata) -> CodexPluginMetadata { + let capability_summary = plugin.capability_summary; + CodexPluginMetadata { + plugin_id: Some(plugin.plugin_id.as_key()), + plugin_name: Some(plugin.plugin_id.plugin_name), + marketplace_name: Some(plugin.plugin_id.marketplace_name), + has_skills: capability_summary + .as_ref() + .map(|summary| summary.has_skills), + mcp_server_count: capability_summary + .as_ref() + .map(|summary| summary.mcp_server_names.len()), + connector_ids: capability_summary.map(|summary| { + summary + .app_connector_ids + .into_iter() + .map(|connector_id| connector_id.0) + .collect() + }), + product_client_id: Some(originator().value), + } +} + +pub(crate) fn codex_plugin_used_metadata( + tracking: &TrackEventsContext, + plugin: PluginTelemetryMetadata, +) -> CodexPluginUsedMetadata { + CodexPluginUsedMetadata { + plugin: codex_plugin_metadata(plugin), + thread_id: Some(tracking.thread_id.clone()), + turn_id: Some(tracking.turn_id.clone()), + model_slug: Some(tracking.model_slug.clone()), + } +} + +pub(crate) fn thread_source_name(thread_source: &SessionSource) -> Option<&'static str> { + match thread_source { + SessionSource::Cli | SessionSource::VSCode | SessionSource::Exec => Some("user"), + SessionSource::SubAgent(_) => Some("subagent"), + SessionSource::Mcp | SessionSource::Custom(_) | SessionSource::Unknown => None, + } +} + +pub(crate) fn current_runtime_metadata() -> CodexRuntimeMetadata { + let os_info = os_info::get(); + CodexRuntimeMetadata { + codex_rs_version: env!("CARGO_PKG_VERSION").to_string(), + runtime_os: std::env::consts::OS.to_string(), + runtime_os_version: os_info.version().to_string(), + runtime_arch: std::env::consts::ARCH.to_string(), + } +} diff --git a/codex-rs/analytics/src/facts.rs b/codex-rs/analytics/src/facts.rs new file mode 100644 index 00000000000..31b8516e858 --- /dev/null +++ b/codex-rs/analytics/src/facts.rs @@ -0,0 +1,116 @@ +use crate::events::AppServerRpcTransport; +use crate::events::CodexRuntimeMetadata; +use codex_app_server_protocol::ClientRequest; +use codex_app_server_protocol::ClientResponse; +use codex_app_server_protocol::InitializeParams; +use codex_app_server_protocol::RequestId; +use codex_app_server_protocol::ServerNotification; +use codex_plugin::PluginTelemetryMetadata; +use codex_protocol::protocol::SkillScope; +use serde::Serialize; +use std::path::PathBuf; + +#[derive(Clone)] +pub struct TrackEventsContext { + pub model_slug: String, + pub thread_id: String, + pub turn_id: String, +} + +pub fn build_track_events_context( + model_slug: String, + thread_id: String, + turn_id: String, +) -> TrackEventsContext { + TrackEventsContext { + model_slug, + thread_id, + turn_id, + } +} + +#[derive(Clone, Debug)] +pub struct SkillInvocation { + pub skill_name: String, + pub skill_scope: SkillScope, + pub skill_path: PathBuf, + pub invocation_type: InvocationType, +} + +#[derive(Clone, Copy, Debug, Serialize)] +#[serde(rename_all = "lowercase")] +pub enum InvocationType { + Explicit, + Implicit, +} + +pub struct AppInvocation { + pub connector_id: Option, + pub app_name: Option, + pub invocation_type: Option, +} + +#[allow(dead_code)] +pub(crate) enum AnalyticsFact { + Initialize { + connection_id: u64, + params: InitializeParams, + product_client_id: String, + runtime: CodexRuntimeMetadata, + rpc_transport: AppServerRpcTransport, + }, + Request { + connection_id: u64, + request_id: RequestId, + request: Box, + }, + Response { + connection_id: u64, + response: Box, + }, + Notification(Box), + // Facts that do not naturally exist on the app-server protocol surface, or + // would require non-trivial protocol reshaping on this branch. + Custom(CustomAnalyticsFact), +} + +pub(crate) enum CustomAnalyticsFact { + SkillInvoked(SkillInvokedInput), + AppMentioned(AppMentionedInput), + AppUsed(AppUsedInput), + PluginUsed(PluginUsedInput), + PluginStateChanged(PluginStateChangedInput), +} + +pub(crate) struct SkillInvokedInput { + pub tracking: TrackEventsContext, + pub invocations: Vec, +} + +pub(crate) struct AppMentionedInput { + pub tracking: TrackEventsContext, + pub mentions: Vec, +} + +pub(crate) struct AppUsedInput { + pub tracking: TrackEventsContext, + pub app: AppInvocation, +} + +pub(crate) struct PluginUsedInput { + pub tracking: TrackEventsContext, + pub plugin: PluginTelemetryMetadata, +} + +pub(crate) struct PluginStateChangedInput { + pub plugin: PluginTelemetryMetadata, + pub state: PluginState, +} + +#[derive(Clone, Copy)] +pub(crate) enum PluginState { + Installed, + Uninstalled, + Enabled, + Disabled, +} diff --git a/codex-rs/analytics/src/lib.rs b/codex-rs/analytics/src/lib.rs index 5b15d2830a6..6f927d09c01 100644 --- a/codex-rs/analytics/src/lib.rs +++ b/codex-rs/analytics/src/lib.rs @@ -1,17 +1,15 @@ -mod analytics_client; +mod client; +mod events; +mod facts; +mod reducer; -pub use analytics_client::AnalyticsEventsClient; -pub use analytics_client::AnalyticsFact; -pub use analytics_client::AnalyticsReducer; -pub use analytics_client::AppInvocation; -pub use analytics_client::AppMentionedInput; -pub use analytics_client::AppUsedInput; -pub use analytics_client::CustomAnalyticsFact; -pub use analytics_client::InvocationType; -pub use analytics_client::PluginState; -pub use analytics_client::PluginStateChangedInput; -pub use analytics_client::PluginUsedInput; -pub use analytics_client::SkillInvocation; -pub use analytics_client::SkillInvokedInput; -pub use analytics_client::TrackEventsContext; -pub use analytics_client::build_track_events_context; +pub use client::AnalyticsEventsClient; +pub use events::AppServerRpcTransport; +pub use facts::AppInvocation; +pub use facts::InvocationType; +pub use facts::SkillInvocation; +pub use facts::TrackEventsContext; +pub use facts::build_track_events_context; + +#[cfg(test)] +mod analytics_client_tests; diff --git a/codex-rs/analytics/src/reducer.rs b/codex-rs/analytics/src/reducer.rs new file mode 100644 index 00000000000..d83cddcbbc9 --- /dev/null +++ b/codex-rs/analytics/src/reducer.rs @@ -0,0 +1,305 @@ +use crate::events::AppServerRpcTransport; +use crate::events::CodexAppMentionedEventRequest; +use crate::events::CodexAppServerClientMetadata; +use crate::events::CodexAppUsedEventRequest; +use crate::events::CodexPluginEventRequest; +use crate::events::CodexPluginUsedEventRequest; +use crate::events::CodexRuntimeMetadata; +use crate::events::SkillInvocationEventParams; +use crate::events::SkillInvocationEventRequest; +use crate::events::ThreadInitializationMode; +use crate::events::ThreadInitializedEvent; +use crate::events::ThreadInitializedEventParams; +use crate::events::TrackEventRequest; +use crate::events::codex_app_metadata; +use crate::events::codex_plugin_metadata; +use crate::events::codex_plugin_used_metadata; +use crate::events::plugin_state_event_type; +use crate::events::thread_source_name; +use crate::facts::AnalyticsFact; +use crate::facts::AppMentionedInput; +use crate::facts::AppUsedInput; +use crate::facts::CustomAnalyticsFact; +use crate::facts::PluginState; +use crate::facts::PluginStateChangedInput; +use crate::facts::PluginUsedInput; +use crate::facts::SkillInvokedInput; +use codex_app_server_protocol::ClientResponse; +use codex_app_server_protocol::InitializeParams; +use codex_git_utils::collect_git_info; +use codex_git_utils::get_git_repo_root; +use codex_login::default_client::originator; +use codex_protocol::protocol::SessionSource; +use codex_protocol::protocol::SkillScope; +use sha1::Digest; +use std::collections::HashMap; +use std::path::Path; + +#[derive(Default)] +pub(crate) struct AnalyticsReducer { + connections: HashMap, +} + +struct ConnectionState { + app_server_client: CodexAppServerClientMetadata, + runtime: CodexRuntimeMetadata, +} + +impl AnalyticsReducer { + pub(crate) async fn ingest(&mut self, input: AnalyticsFact, out: &mut Vec) { + match input { + AnalyticsFact::Initialize { + connection_id, + params, + product_client_id, + runtime, + rpc_transport, + } => { + self.ingest_initialize( + connection_id, + params, + product_client_id, + runtime, + rpc_transport, + ); + } + AnalyticsFact::Request { + connection_id: _connection_id, + request_id: _request_id, + request: _request, + } => {} + AnalyticsFact::Response { + connection_id, + response, + } => { + self.ingest_response(connection_id, *response, out); + } + AnalyticsFact::Notification(_notification) => {} + AnalyticsFact::Custom(input) => match input { + CustomAnalyticsFact::SkillInvoked(input) => { + self.ingest_skill_invoked(input, out).await; + } + CustomAnalyticsFact::AppMentioned(input) => { + self.ingest_app_mentioned(input, out); + } + CustomAnalyticsFact::AppUsed(input) => { + self.ingest_app_used(input, out); + } + CustomAnalyticsFact::PluginUsed(input) => { + self.ingest_plugin_used(input, out); + } + CustomAnalyticsFact::PluginStateChanged(input) => { + self.ingest_plugin_state_changed(input, out); + } + }, + } + } + + fn ingest_initialize( + &mut self, + connection_id: u64, + params: InitializeParams, + product_client_id: String, + runtime: CodexRuntimeMetadata, + rpc_transport: AppServerRpcTransport, + ) { + self.connections.insert( + connection_id, + ConnectionState { + app_server_client: CodexAppServerClientMetadata { + product_client_id, + client_name: Some(params.client_info.name), + client_version: Some(params.client_info.version), + rpc_transport, + experimental_api_enabled: params + .capabilities + .map(|capabilities| capabilities.experimental_api), + }, + runtime, + }, + ); + } + + async fn ingest_skill_invoked( + &mut self, + input: SkillInvokedInput, + out: &mut Vec, + ) { + let SkillInvokedInput { + tracking, + invocations, + } = input; + for invocation in invocations { + let skill_scope = match invocation.skill_scope { + SkillScope::User => "user", + SkillScope::Repo => "repo", + SkillScope::System => "system", + SkillScope::Admin => "admin", + }; + let repo_root = get_git_repo_root(invocation.skill_path.as_path()); + let repo_url = if let Some(root) = repo_root.as_ref() { + collect_git_info(root) + .await + .and_then(|info| info.repository_url) + } else { + None + }; + let skill_id = skill_id_for_local_skill( + repo_url.as_deref(), + repo_root.as_deref(), + invocation.skill_path.as_path(), + invocation.skill_name.as_str(), + ); + out.push(TrackEventRequest::SkillInvocation( + SkillInvocationEventRequest { + event_type: "skill_invocation", + skill_id, + skill_name: invocation.skill_name.clone(), + event_params: SkillInvocationEventParams { + thread_id: Some(tracking.thread_id.clone()), + invoke_type: Some(invocation.invocation_type), + model_slug: Some(tracking.model_slug.clone()), + product_client_id: Some(originator().value), + repo_url, + skill_scope: Some(skill_scope.to_string()), + }, + }, + )); + } + } + + fn ingest_app_mentioned(&mut self, input: AppMentionedInput, out: &mut Vec) { + let AppMentionedInput { tracking, mentions } = input; + out.extend(mentions.into_iter().map(|mention| { + let event_params = codex_app_metadata(&tracking, mention); + TrackEventRequest::AppMentioned(CodexAppMentionedEventRequest { + event_type: "codex_app_mentioned", + event_params, + }) + })); + } + + fn ingest_app_used(&mut self, input: AppUsedInput, out: &mut Vec) { + let AppUsedInput { tracking, app } = input; + let event_params = codex_app_metadata(&tracking, app); + out.push(TrackEventRequest::AppUsed(CodexAppUsedEventRequest { + event_type: "codex_app_used", + event_params, + })); + } + + fn ingest_plugin_used(&mut self, input: PluginUsedInput, out: &mut Vec) { + let PluginUsedInput { tracking, plugin } = input; + out.push(TrackEventRequest::PluginUsed(CodexPluginUsedEventRequest { + event_type: "codex_plugin_used", + event_params: codex_plugin_used_metadata(&tracking, plugin), + })); + } + + fn ingest_plugin_state_changed( + &mut self, + input: PluginStateChangedInput, + out: &mut Vec, + ) { + let PluginStateChangedInput { plugin, state } = input; + let event = CodexPluginEventRequest { + event_type: plugin_state_event_type(state), + event_params: codex_plugin_metadata(plugin), + }; + out.push(match state { + PluginState::Installed => TrackEventRequest::PluginInstalled(event), + PluginState::Uninstalled => TrackEventRequest::PluginUninstalled(event), + PluginState::Enabled => TrackEventRequest::PluginEnabled(event), + PluginState::Disabled => TrackEventRequest::PluginDisabled(event), + }); + } + + fn ingest_response( + &mut self, + connection_id: u64, + response: ClientResponse, + out: &mut Vec, + ) { + let (thread, model, initialization_mode) = match response { + ClientResponse::ThreadStart { response, .. } => ( + response.thread, + response.model, + ThreadInitializationMode::New, + ), + ClientResponse::ThreadResume { response, .. } => ( + response.thread, + response.model, + ThreadInitializationMode::Resumed, + ), + ClientResponse::ThreadFork { response, .. } => ( + response.thread, + response.model, + ThreadInitializationMode::Forked, + ), + _ => return, + }; + let thread_source: SessionSource = thread.source.into(); + let Some(connection_state) = self.connections.get(&connection_id) else { + return; + }; + out.push(TrackEventRequest::ThreadInitialized( + ThreadInitializedEvent { + event_type: "codex_thread_initialized", + event_params: ThreadInitializedEventParams { + thread_id: thread.id, + app_server_client: connection_state.app_server_client.clone(), + runtime: connection_state.runtime.clone(), + model, + ephemeral: thread.ephemeral, + thread_source: thread_source_name(&thread_source), + initialization_mode, + subagent_source: None, + parent_thread_id: None, + created_at: u64::try_from(thread.created_at).unwrap_or_default(), + }, + }, + )); + } +} + +pub(crate) fn skill_id_for_local_skill( + repo_url: Option<&str>, + repo_root: Option<&Path>, + skill_path: &Path, + skill_name: &str, +) -> String { + let path = normalize_path_for_skill_id(repo_url, repo_root, skill_path); + let prefix = if let Some(url) = repo_url { + format!("repo_{url}") + } else { + "personal".to_string() + }; + let raw_id = format!("{prefix}_{path}_{skill_name}"); + let mut hasher = sha1::Sha1::new(); + sha1::Digest::update(&mut hasher, raw_id.as_bytes()); + format!("{:x}", sha1::Digest::finalize(hasher)) +} + +/// Returns a normalized path for skill ID construction. +/// +/// - Repo-scoped skills use a path relative to the repo root. +/// - User/admin/system skills use an absolute path. +pub(crate) fn normalize_path_for_skill_id( + repo_url: Option<&str>, + repo_root: Option<&Path>, + skill_path: &Path, +) -> String { + let resolved_path = + std::fs::canonicalize(skill_path).unwrap_or_else(|_| skill_path.to_path_buf()); + match (repo_url, repo_root) { + (Some(_), Some(root)) => { + let resolved_root = std::fs::canonicalize(root).unwrap_or_else(|_| root.to_path_buf()); + resolved_path + .strip_prefix(&resolved_root) + .unwrap_or(resolved_path.as_path()) + .to_string_lossy() + .replace('\\', "/") + } + _ => resolved_path.to_string_lossy().replace('\\', "/"), + } +} diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs index 2798dcb92e4..e2ee8e7eedc 100644 --- a/codex-rs/app-server/src/codex_message_processor.rs +++ b/codex-rs/app-server/src/codex_message_processor.rs @@ -32,6 +32,7 @@ use codex_app_server_protocol::CancelLoginAccountParams; use codex_app_server_protocol::CancelLoginAccountResponse; use codex_app_server_protocol::CancelLoginAccountStatus; use codex_app_server_protocol::ClientRequest; +use codex_app_server_protocol::ClientResponse; use codex_app_server_protocol::CodexErrorInfo as AppServerCodexErrorInfo; use codex_app_server_protocol::CollaborationModeListParams; use codex_app_server_protocol::CollaborationModeListResponse; @@ -179,6 +180,7 @@ use codex_arg0::Arg0DispatchPaths; use codex_backend_client::Client as BackendClient; use codex_chatgpt::connectors; use codex_cloud_requirements::cloud_requirements_loader; +use codex_core::AnalyticsEventsClient; use codex_core::AuthManager; use codex_core::CodexAuth; use codex_core::CodexThread; @@ -403,6 +405,7 @@ pub(crate) struct CodexMessageProcessor { auth_manager: Arc, thread_manager: Arc, outgoing: Arc, + analytics_events_client: AnalyticsEventsClient, arg0_paths: Arg0DispatchPaths, config: Arc, cli_overrides: Arc>>, @@ -433,6 +436,8 @@ struct ListenerTaskContext { thread_manager: Arc, thread_state_manager: ThreadStateManager, outgoing: Arc, + analytics_events_client: AnalyticsEventsClient, + general_analytics_enabled: bool, thread_watch_manager: ThreadWatchManager, fallback_model_provider: String, codex_home: PathBuf, @@ -455,6 +460,7 @@ pub(crate) struct CodexMessageProcessorArgs { pub(crate) auth_manager: Arc, pub(crate) thread_manager: Arc, pub(crate) outgoing: Arc, + pub(crate) analytics_events_client: AnalyticsEventsClient, pub(crate) arg0_paths: Arg0DispatchPaths, pub(crate) config: Arc, pub(crate) cli_overrides: Arc>>, @@ -519,6 +525,7 @@ impl CodexMessageProcessor { auth_manager, thread_manager, outgoing, + analytics_events_client, arg0_paths, config, cli_overrides, @@ -531,6 +538,7 @@ impl CodexMessageProcessor { auth_manager, thread_manager, outgoing: outgoing.clone(), + analytics_events_client, arg0_paths, config, cli_overrides, @@ -2086,6 +2094,8 @@ impl CodexMessageProcessor { thread_manager: Arc::clone(&self.thread_manager), thread_state_manager: self.thread_state_manager.clone(), outgoing: Arc::clone(&self.outgoing), + analytics_events_client: self.analytics_events_client.clone(), + general_analytics_enabled: self.config.features.enabled(Feature::GeneralAnalytics), thread_watch_manager: self.thread_watch_manager.clone(), fallback_model_provider: self.config.model_provider_id.clone(), codex_home: self.config.codex_home.clone(), @@ -2318,6 +2328,17 @@ impl CodexMessageProcessor { sandbox: config_snapshot.sandbox_policy.into(), reasoning_effort: config_snapshot.reasoning_effort, }; + if listener_task_context.general_analytics_enabled { + listener_task_context + .analytics_events_client + .track_response( + request_id.connection_id.0, + ClientResponse::ThreadStart { + request_id: request_id.request_id.clone(), + response: response.clone(), + }, + ); + } listener_task_context .outgoing @@ -3796,6 +3817,15 @@ impl CodexMessageProcessor { sandbox: session_configured.sandbox_policy.into(), reasoning_effort: session_configured.reasoning_effort, }; + if self.config.features.enabled(Feature::GeneralAnalytics) { + self.analytics_events_client.track_response( + request_id.connection_id.0, + ClientResponse::ThreadResume { + request_id: request_id.request_id.clone(), + response: response.clone(), + }, + ); + } self.outgoing.send_response(request_id, response).await; } @@ -4403,6 +4433,15 @@ impl CodexMessageProcessor { sandbox: session_configured.sandbox_policy.into(), reasoning_effort: session_configured.reasoning_effort, }; + if self.config.features.enabled(Feature::GeneralAnalytics) { + self.analytics_events_client.track_response( + request_id.connection_id.0, + ClientResponse::ThreadFork { + request_id: request_id.request_id.clone(), + response: response.clone(), + }, + ); + } self.outgoing.send_response(request_id, response).await; @@ -6984,6 +7023,8 @@ impl CodexMessageProcessor { thread_manager: Arc::clone(&self.thread_manager), thread_state_manager: self.thread_state_manager.clone(), outgoing: Arc::clone(&self.outgoing), + analytics_events_client: self.analytics_events_client.clone(), + general_analytics_enabled: self.config.features.enabled(Feature::GeneralAnalytics), thread_watch_manager: self.thread_watch_manager.clone(), fallback_model_provider: self.config.model_provider_id.clone(), codex_home: self.config.codex_home.clone(), @@ -7071,6 +7112,8 @@ impl CodexMessageProcessor { thread_manager: Arc::clone(&self.thread_manager), thread_state_manager: self.thread_state_manager.clone(), outgoing: Arc::clone(&self.outgoing), + analytics_events_client: self.analytics_events_client.clone(), + general_analytics_enabled: self.config.features.enabled(Feature::GeneralAnalytics), thread_watch_manager: self.thread_watch_manager.clone(), fallback_model_provider: self.config.model_provider_id.clone(), codex_home: self.config.codex_home.clone(), @@ -7102,6 +7145,8 @@ impl CodexMessageProcessor { outgoing, thread_manager, thread_state_manager, + analytics_events_client: _, + general_analytics_enabled: _, thread_watch_manager, fallback_model_provider, codex_home, diff --git a/codex-rs/app-server/src/in_process.rs b/codex-rs/app-server/src/in_process.rs index 6b96bb4e38f..4f79b1445ae 100644 --- a/codex-rs/app-server/src/in_process.rs +++ b/codex-rs/app-server/src/in_process.rs @@ -74,6 +74,7 @@ use codex_app_server_protocol::Result; use codex_app_server_protocol::ServerNotification; use codex_app_server_protocol::ServerRequest; use codex_arg0::Arg0DispatchPaths; +use codex_core::AppServerRpcTransport; use codex_core::config::Config; use codex_core::config_loader::CloudRequirementsLoader; use codex_core::config_loader::LoaderOverrides; @@ -393,6 +394,7 @@ fn start_uninitialized(args: InProcessStartArgs) -> InProcessClientHandle { config_warnings: args.config_warnings, session_source: args.session_source, enable_codex_api_key_env: args.enable_codex_api_key_env, + rpc_transport: AppServerRpcTransport::InProcess, }); let mut thread_created_rx = processor.thread_created_receiver(); let mut session = ConnectionSessionState::default(); diff --git a/codex-rs/app-server/src/lib.rs b/codex-rs/app-server/src/lib.rs index 3ecefa83fa7..d2bae7ccd71 100644 --- a/codex-rs/app-server/src/lib.rs +++ b/codex-rs/app-server/src/lib.rs @@ -36,6 +36,7 @@ use codex_app_server_protocol::ConfigWarningNotification; use codex_app_server_protocol::JSONRPCMessage; use codex_app_server_protocol::TextPosition as AppTextPosition; use codex_app_server_protocol::TextRange as AppTextRange; +use codex_core::AppServerRpcTransport; use codex_core::ExecPolicyError; use codex_core::check_execpolicy_for_warnings; use codex_core::config_loader::ConfigLoadError; @@ -623,6 +624,7 @@ pub async fn run_main_with_transport( config_warnings, session_source, enable_codex_api_key_env: false, + rpc_transport: analytics_rpc_transport(transport), }); let mut thread_created_rx = processor.thread_created_receiver(); let mut running_turn_count_rx = processor.subscribe_running_assistant_turn_count(); @@ -846,6 +848,13 @@ pub async fn run_main_with_transport( Ok(()) } +fn analytics_rpc_transport(transport: AppServerTransport) -> AppServerRpcTransport { + match transport { + AppServerTransport::Stdio => AppServerRpcTransport::Stdio, + AppServerTransport::WebSocket { .. } => AppServerRpcTransport::Websocket, + } +} + #[cfg(test)] mod tests { use super::LogFormat; diff --git a/codex-rs/app-server/src/message_processor.rs b/codex-rs/app-server/src/message_processor.rs index 7f0e1d18f88..33e127dc037 100644 --- a/codex-rs/app-server/src/message_processor.rs +++ b/codex-rs/app-server/src/message_processor.rs @@ -56,6 +56,7 @@ use codex_app_server_protocol::experimental_required_message; use codex_arg0::Arg0DispatchPaths; use codex_chatgpt::connectors; use codex_core::AnalyticsEventsClient; +use codex_core::AppServerRpcTransport; use codex_core::AuthManager; use codex_core::ThreadManager; use codex_core::config::Config; @@ -159,9 +160,11 @@ pub(crate) struct MessageProcessor { external_agent_config_api: ExternalAgentConfigApi, fs_api: FsApi, auth_manager: Arc, + analytics_events_client: AnalyticsEventsClient, fs_watch_manager: FsWatchManager, config: Arc, config_warnings: Arc>, + rpc_transport: AppServerRpcTransport, } #[derive(Clone, Debug, Default)] @@ -186,6 +189,7 @@ pub(crate) struct MessageProcessorArgs { pub(crate) config_warnings: Vec, pub(crate) session_source: SessionSource, pub(crate) enable_codex_api_key_env: bool, + pub(crate) rpc_transport: AppServerRpcTransport, } impl MessageProcessor { @@ -205,6 +209,7 @@ impl MessageProcessor { config_warnings, session_source, enable_codex_api_key_env, + rpc_transport, } = args; let auth_manager = AuthManager::shared( config.codex_home.clone(), @@ -242,6 +247,7 @@ impl MessageProcessor { auth_manager: auth_manager.clone(), thread_manager: Arc::clone(&thread_manager), outgoing: outgoing.clone(), + analytics_events_client: analytics_events_client.clone(), arg0_paths, config: Arc::clone(&config), cli_overrides: cli_overrides.clone(), @@ -262,7 +268,7 @@ impl MessageProcessor { loader_overrides, cloud_requirements, thread_manager, - analytics_events_client, + analytics_events_client.clone(), ); let external_agent_config_api = ExternalAgentConfigApi::new(config.codex_home.clone()); let fs_api = FsApi::default(); @@ -275,9 +281,11 @@ impl MessageProcessor { external_agent_config_api, fs_api, auth_manager, + analytics_events_client, fs_watch_manager, config, config_warnings: Arc::new(config_warnings), + rpc_transport, } } @@ -547,6 +555,7 @@ impl MessageProcessor { // shared thread when another connected client did not opt into // experimental API). Proposed direction is instance-global first-write-wins // with initialize-time mismatch rejection. + let analytics_initialize_params = params.clone(); let (experimental_api_enabled, opt_out_notification_methods) = match params.capabilities { Some(capabilities) => ( @@ -568,7 +577,7 @@ impl MessageProcessor { session.app_server_client_name = Some(name.clone()); session.client_version = Some(version.clone()); let originator = name.clone(); - if let Err(error) = set_default_originator(originator) { + if let Err(error) = set_default_originator(originator.clone()) { match error { SetOriginatorError::InvalidHeaderValue => { let error = JSONRPCErrorError { @@ -591,6 +600,14 @@ impl MessageProcessor { } } } + if self.config.features.enabled(Feature::GeneralAnalytics) { + self.analytics_events_client.track_initialize( + connection_id.0, + analytics_initialize_params, + originator, + self.rpc_transport, + ); + } set_default_client_residency_requirement(self.config.enforce_residency.value()); let user_agent_suffix = format!("{name}; {version}"); if let Ok(mut suffix) = USER_AGENT_SUFFIX.lock() { diff --git a/codex-rs/app-server/src/message_processor/tracing_tests.rs b/codex-rs/app-server/src/message_processor/tracing_tests.rs index f52b6affe60..44cbe21b60f 100644 --- a/codex-rs/app-server/src/message_processor/tracing_tests.rs +++ b/codex-rs/app-server/src/message_processor/tracing_tests.rs @@ -20,6 +20,7 @@ use codex_app_server_protocol::TurnStartParams; use codex_app_server_protocol::TurnStartResponse; use codex_app_server_protocol::UserInput; use codex_arg0::Arg0DispatchPaths; +use codex_core::AppServerRpcTransport; use codex_core::config::Config; use codex_core::config::ConfigBuilder; use codex_core::config_loader::CloudRequirementsLoader; @@ -246,6 +247,7 @@ fn build_test_processor( config_warnings: Vec::new(), session_source: SessionSource::VSCode, enable_codex_api_key_env: false, + rpc_transport: AppServerRpcTransport::Stdio, }); (processor, outgoing_rx) } diff --git a/codex-rs/app-server/tests/suite/v2/analytics.rs b/codex-rs/app-server/tests/suite/v2/analytics.rs index a389ffbe1a3..bd9276ddcd4 100644 --- a/codex-rs/app-server/tests/suite/v2/analytics.rs +++ b/codex-rs/app-server/tests/suite/v2/analytics.rs @@ -1,10 +1,23 @@ use anyhow::Result; +use app_test_support::ChatGptAuthFixture; +use app_test_support::DEFAULT_CLIENT_NAME; +use app_test_support::write_chatgpt_auth; +use codex_core::auth::AuthCredentialsStoreMode; use codex_core::config::ConfigBuilder; use codex_core::config::types::OtelExporterKind; use codex_core::config::types::OtelHttpProtocol; use pretty_assertions::assert_eq; +use serde_json::Value; use std::collections::HashMap; +use std::path::Path; +use std::time::Duration; use tempfile::TempDir; +use tokio::time::timeout; +use wiremock::Mock; +use wiremock::MockServer; +use wiremock::ResponseTemplate; +use wiremock::matchers::method; +use wiremock::matchers::path; const SERVICE_VERSION: &str = "0.0.0-test"; @@ -65,3 +78,91 @@ async fn app_server_default_analytics_enabled_with_flag() -> Result<()> { assert_eq!(has_metrics, true); Ok(()) } + +pub(crate) async fn enable_analytics_capture(server: &MockServer, codex_home: &Path) -> Result<()> { + Mock::given(method("POST")) + .and(path("/codex/analytics-events/events")) + .respond_with(ResponseTemplate::new(200)) + .mount(server) + .await; + + write_chatgpt_auth( + codex_home, + ChatGptAuthFixture::new("chatgpt-token") + .account_id("account-123") + .chatgpt_user_id("user-123") + .chatgpt_account_id("account-123"), + AuthCredentialsStoreMode::File, + )?; + + Ok(()) +} + +pub(crate) async fn wait_for_analytics_payload( + server: &MockServer, + read_timeout: Duration, +) -> Result { + let body = timeout(read_timeout, async { + loop { + let Some(requests) = server.received_requests().await else { + tokio::time::sleep(Duration::from_millis(25)).await; + continue; + }; + if let Some(request) = requests.iter().find(|request| { + request.method == "POST" && request.url.path() == "/codex/analytics-events/events" + }) { + break request.body.clone(); + } + tokio::time::sleep(Duration::from_millis(25)).await; + } + }) + .await?; + serde_json::from_slice(&body).map_err(|err| anyhow::anyhow!("invalid analytics payload: {err}")) +} + +pub(crate) fn thread_initialized_event(payload: &Value) -> Result<&Value> { + let events = payload["events"] + .as_array() + .ok_or_else(|| anyhow::anyhow!("analytics payload missing events array"))?; + events + .iter() + .find(|event| event["event_type"] == "codex_thread_initialized") + .ok_or_else(|| anyhow::anyhow!("codex_thread_initialized event should be present")) +} + +pub(crate) fn assert_basic_thread_initialized_event( + event: &Value, + thread_id: &str, + expected_model: &str, + initialization_mode: &str, +) { + assert_eq!(event["event_params"]["thread_id"], thread_id); + assert_eq!( + event["event_params"]["app_server_client"]["product_client_id"], + DEFAULT_CLIENT_NAME + ); + assert_eq!( + event["event_params"]["app_server_client"]["client_name"], + DEFAULT_CLIENT_NAME + ); + assert_eq!( + event["event_params"]["app_server_client"]["rpc_transport"], + "stdio" + ); + assert_eq!(event["event_params"]["model"], expected_model); + assert_eq!(event["event_params"]["ephemeral"], false); + assert_eq!(event["event_params"]["thread_source"], "user"); + assert_eq!( + event["event_params"]["subagent_source"], + serde_json::Value::Null + ); + assert_eq!( + event["event_params"]["parent_thread_id"], + serde_json::Value::Null + ); + assert_eq!( + event["event_params"]["initialization_mode"], + initialization_mode + ); + assert!(event["event_params"]["created_at"].as_u64().is_some()); +} diff --git a/codex-rs/app-server/tests/suite/v2/thread_fork.rs b/codex-rs/app-server/tests/suite/v2/thread_fork.rs index e6be4e5a7cf..dc68992fe4d 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_fork.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_fork.rs @@ -38,6 +38,11 @@ use wiremock::ResponseTemplate; use wiremock::matchers::method; use wiremock::matchers::path; +use super::analytics::assert_basic_thread_initialized_event; +use super::analytics::enable_analytics_capture; +use super::analytics::thread_initialized_event; +use super::analytics::wait_for_analytics_payload; + const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10); #[tokio::test] @@ -177,6 +182,50 @@ async fn thread_fork_creates_new_thread_and_emits_started() -> Result<()> { Ok(()) } +#[tokio::test] +async fn thread_fork_tracks_thread_initialized_analytics() -> Result<()> { + let server = create_mock_responses_server_repeating_assistant("Done").await; + + let codex_home = TempDir::new()?; + create_config_toml_with_chatgpt_base_url( + codex_home.path(), + &server.uri(), + &server.uri(), + /*general_analytics_enabled*/ true, + )?; + enable_analytics_capture(&server, codex_home.path()).await?; + + let conversation_id = create_fake_rollout( + codex_home.path(), + "2025-01-05T12-00-00", + "2025-01-05T12:00:00Z", + "Saved user message", + Some("mock_provider"), + /*git_info*/ None, + )?; + + let mut mcp = McpProcess::new(codex_home.path()).await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + + let fork_id = mcp + .send_thread_fork_request(ThreadForkParams { + thread_id: conversation_id, + ..Default::default() + }) + .await?; + let fork_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(fork_id)), + ) + .await??; + let ThreadForkResponse { thread, .. } = to_response::(fork_resp)?; + + let payload = wait_for_analytics_payload(&server, DEFAULT_READ_TIMEOUT).await?; + let event = thread_initialized_event(&payload)?; + assert_basic_thread_initialized_event(event, &thread.id, "mock-model", "forked"); + Ok(()) +} + #[tokio::test] async fn thread_fork_rejects_unmaterialized_thread() -> Result<()> { let server = create_mock_responses_server_repeating_assistant("Done").await; @@ -249,6 +298,7 @@ async fn thread_fork_surfaces_cloud_requirements_load_errors() -> Result<()> { codex_home.path(), &model_server.uri(), &chatgpt_base_url, + /*general_analytics_enabled*/ false, )?; write_chatgpt_auth( codex_home.path(), @@ -509,7 +559,13 @@ fn create_config_toml_with_chatgpt_base_url( codex_home: &Path, server_uri: &str, chatgpt_base_url: &str, + general_analytics_enabled: bool, ) -> std::io::Result<()> { + let general_analytics_toml = if general_analytics_enabled { + "\n[features]\ngeneral_analytics = true\n".to_string() + } else { + String::new() + }; let config_toml = codex_home.join("config.toml"); std::fs::write( config_toml, @@ -521,6 +577,7 @@ sandbox_mode = "read-only" chatgpt_base_url = "{chatgpt_base_url}" model_provider = "mock_provider" +{general_analytics_toml} [model_providers.mock_provider] name = "Mock provider for test" diff --git a/codex-rs/app-server/tests/suite/v2/thread_resume.rs b/codex-rs/app-server/tests/suite/v2/thread_resume.rs index 564de50d3ae..3a934513268 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_resume.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_resume.rs @@ -70,6 +70,11 @@ use wiremock::ResponseTemplate; use wiremock::matchers::method; use wiremock::matchers::path; +use super::analytics::assert_basic_thread_initialized_event; +use super::analytics::enable_analytics_capture; +use super::analytics::thread_initialized_event; +use super::analytics::wait_for_analytics_payload; + const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10); const CODEX_5_2_INSTRUCTIONS_TEMPLATE_DEFAULT: &str = "You are Codex, a coding agent based on GPT-5. You and the user share the same workspace and collaborate to achieve the user's goals."; @@ -150,6 +155,51 @@ async fn thread_resume_rejects_unmaterialized_thread() -> Result<()> { Ok(()) } +#[tokio::test] +async fn thread_resume_tracks_thread_initialized_analytics() -> Result<()> { + let server = create_mock_responses_server_repeating_assistant("Done").await; + + let codex_home = TempDir::new()?; + create_config_toml_with_chatgpt_base_url( + codex_home.path(), + &server.uri(), + &server.uri(), + /*general_analytics_enabled*/ true, + )?; + enable_analytics_capture(&server, codex_home.path()).await?; + + let conversation_id = create_fake_rollout_with_text_elements( + codex_home.path(), + "2025-01-05T12-00-00", + "2025-01-05T12:00:00Z", + "Saved user message", + Vec::new(), + Some("mock_provider"), + /*git_info*/ None, + )?; + + let mut mcp = McpProcess::new(codex_home.path()).await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + + let resume_id = mcp + .send_thread_resume_request(ThreadResumeParams { + thread_id: conversation_id, + ..Default::default() + }) + .await?; + let resume_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(resume_id)), + ) + .await??; + let ThreadResumeResponse { thread, .. } = to_response::(resume_resp)?; + + let payload = wait_for_analytics_payload(&server, DEFAULT_READ_TIMEOUT).await?; + let event = thread_initialized_event(&payload)?; + assert_basic_thread_initialized_event(event, &thread.id, "gpt-5.2-codex", "resumed"); + Ok(()) +} + #[tokio::test] async fn thread_resume_returns_rollout_history() -> Result<()> { let server = create_mock_responses_server_repeating_assistant("Done").await; @@ -1449,6 +1499,7 @@ async fn thread_resume_surfaces_cloud_requirements_load_errors() -> Result<()> { codex_home.path(), &model_server.uri(), &chatgpt_base_url, + /*general_analytics_enabled*/ false, )?; write_chatgpt_auth( codex_home.path(), @@ -1827,6 +1878,7 @@ model_provider = "mock_provider" [features] personality = true +general_analytics = true [model_providers.mock_provider] name = "Mock provider for test" @@ -1843,7 +1895,13 @@ fn create_config_toml_with_chatgpt_base_url( codex_home: &std::path::Path, server_uri: &str, chatgpt_base_url: &str, + general_analytics_enabled: bool, ) -> std::io::Result<()> { + let general_analytics_toml = if general_analytics_enabled { + "\ngeneral_analytics = true".to_string() + } else { + String::new() + }; let config_toml = codex_home.join("config.toml"); std::fs::write( config_toml, @@ -1858,6 +1916,7 @@ model_provider = "mock_provider" [features] personality = true +{general_analytics_toml} [model_providers.mock_provider] name = "Mock provider for test" diff --git a/codex-rs/app-server/tests/suite/v2/thread_start.rs b/codex-rs/app-server/tests/suite/v2/thread_start.rs index 1a73444bda5..c684bd23361 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_start.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_start.rs @@ -26,6 +26,7 @@ use pretty_assertions::assert_eq; use serde_json::Value; use serde_json::json; use std::path::Path; +use std::time::Duration; use tempfile::TempDir; use tokio::time::timeout; use wiremock::Mock; @@ -34,6 +35,11 @@ use wiremock::ResponseTemplate; use wiremock::matchers::method; use wiremock::matchers::path; +use super::analytics::assert_basic_thread_initialized_event; +use super::analytics::enable_analytics_capture; +use super::analytics::thread_initialized_event; +use super::analytics::wait_for_analytics_payload; + const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10); #[tokio::test] @@ -153,6 +159,73 @@ async fn thread_start_creates_thread_and_emits_started() -> Result<()> { Ok(()) } +#[tokio::test] +async fn thread_start_tracks_thread_initialized_analytics() -> Result<()> { + let server = create_mock_responses_server_repeating_assistant("Done").await; + + let codex_home = TempDir::new()?; + create_config_toml_with_chatgpt_base_url( + codex_home.path(), + &server.uri(), + &server.uri(), + /*general_analytics_enabled*/ true, + )?; + enable_analytics_capture(&server, codex_home.path()).await?; + + let mut mcp = McpProcess::new(codex_home.path()).await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + + let req_id = mcp + .send_thread_start_request(ThreadStartParams::default()) + .await?; + let resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(req_id)), + ) + .await??; + let ThreadStartResponse { thread, .. } = to_response::(resp)?; + + let payload = wait_for_analytics_payload(&server, DEFAULT_READ_TIMEOUT).await?; + assert_eq!(payload["events"].as_array().expect("events array").len(), 1); + let event = thread_initialized_event(&payload)?; + assert_basic_thread_initialized_event(event, &thread.id, "mock-model", "new"); + Ok(()) +} + +#[tokio::test] +async fn thread_start_does_not_track_thread_initialized_analytics_without_feature() -> Result<()> { + let server = create_mock_responses_server_repeating_assistant("Done").await; + + let codex_home = TempDir::new()?; + create_config_toml_with_chatgpt_base_url( + codex_home.path(), + &server.uri(), + &server.uri(), + /*general_analytics_enabled*/ false, + )?; + enable_analytics_capture(&server, codex_home.path()).await?; + + let mut mcp = McpProcess::new(codex_home.path()).await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + + let req_id = mcp + .send_thread_start_request(ThreadStartParams::default()) + .await?; + let resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(req_id)), + ) + .await??; + let _ = to_response::(resp)?; + + let payload = wait_for_analytics_payload(&server, Duration::from_millis(250)).await; + assert!( + payload.is_err(), + "thread analytics should be gated off when general_analytics is disabled" + ); + Ok(()) +} + #[tokio::test] async fn thread_start_respects_project_config_from_cwd() -> Result<()> { let server = create_mock_responses_server_repeating_assistant("Done").await; @@ -455,6 +528,7 @@ async fn thread_start_surfaces_cloud_requirements_load_errors() -> Result<()> { codex_home.path(), &model_server.uri(), &chatgpt_base_url, + /*general_analytics_enabled*/ false, )?; write_chatgpt_auth( codex_home.path(), @@ -538,7 +612,13 @@ fn create_config_toml_with_chatgpt_base_url( codex_home: &Path, server_uri: &str, chatgpt_base_url: &str, + general_analytics_enabled: bool, ) -> std::io::Result<()> { + let general_analytics_toml = if general_analytics_enabled { + "\n[features]\ngeneral_analytics = true\n".to_string() + } else { + String::new() + }; let config_toml = codex_home.join("config.toml"); std::fs::write( config_toml, @@ -550,6 +630,7 @@ sandbox_mode = "read-only" chatgpt_base_url = "{chatgpt_base_url}" model_provider = "mock_provider" +{general_analytics_toml} [model_providers.mock_provider] name = "Mock provider for test" diff --git a/codex-rs/core/config.schema.json b/codex-rs/core/config.schema.json index e36f495a1b0..5fee36e1b81 100644 --- a/codex-rs/core/config.schema.json +++ b/codex-rs/core/config.schema.json @@ -392,6 +392,9 @@ "fast_mode": { "type": "boolean" }, + "general_analytics": { + "type": "boolean" + }, "guardian_approval": { "type": "boolean" }, @@ -2034,6 +2037,9 @@ "fast_mode": { "type": "boolean" }, + "general_analytics": { + "type": "boolean" + }, "guardian_approval": { "type": "boolean" }, diff --git a/codex-rs/core/src/lib.rs b/codex-rs/core/src/lib.rs index 5276b09de37..f43989e84ab 100644 --- a/codex-rs/core/src/lib.rs +++ b/codex-rs/core/src/lib.rs @@ -133,6 +133,7 @@ pub type CodexConversation = CodexThread; pub use auth::AuthManager; pub use auth::CodexAuth; pub use codex_analytics::AnalyticsEventsClient; +pub use codex_analytics::AppServerRpcTransport; mod default_client_forwarding; /// Default Codex HTTP client headers and reqwest construction. diff --git a/codex-rs/core/tests/suite/plugins.rs b/codex-rs/core/tests/suite/plugins.rs index 78df34652ac..5448e49161f 100644 --- a/codex-rs/core/tests/suite/plugins.rs +++ b/codex-rs/core/tests/suite/plugins.rs @@ -365,13 +365,22 @@ async fn explicit_plugin_mentions_track_plugin_used_analytics() -> Result<()> { wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await; let deadline = Instant::now() + Duration::from_secs(10); - let analytics_request = loop { + let plugin_event = loop { let requests = server.received_requests().await.unwrap_or_default(); - if let Some(request) = requests + if let Some(event) = requests .into_iter() - .find(|request| request.url.path() == "/codex/analytics-events/events") + .filter(|request| request.url.path() == "/codex/analytics-events/events") + .find_map(|request| { + let payload: serde_json::Value = serde_json::from_slice(&request.body).ok()?; + payload["events"].as_array().and_then(|events| { + events + .iter() + .find(|event| event["event_type"] == "codex_plugin_used") + .cloned() + }) + }) { - break request; + break event; } if Instant::now() >= deadline { panic!("timed out waiting for plugin analytics request"); @@ -379,10 +388,7 @@ async fn explicit_plugin_mentions_track_plugin_used_analytics() -> Result<()> { tokio::time::sleep(Duration::from_millis(50)).await; }; - let payload: serde_json::Value = - serde_json::from_slice(&analytics_request.body).expect("analytics payload"); - let event = &payload["events"][0]; - assert_eq!(event["event_type"], "codex_plugin_used"); + let event = plugin_event; assert_eq!(event["event_params"]["plugin_id"], "sample@test"); assert_eq!(event["event_params"]["plugin_name"], "sample"); assert_eq!(event["event_params"]["marketplace_name"], "test"); diff --git a/codex-rs/exec/src/lib.rs b/codex-rs/exec/src/lib.rs index 71a37cc6aa1..ae1285be4d4 100644 --- a/codex-rs/exec/src/lib.rs +++ b/codex-rs/exec/src/lib.rs @@ -440,7 +440,7 @@ pub async fn run_main(cli: Cli, arg0_paths: Arg0DispatchPaths) -> anyhow::Result config_warnings, session_source: SessionSource::Exec, enable_codex_api_key_env: true, - client_name: "codex-exec".to_string(), + client_name: "codex_exec".to_string(), client_version: env!("CARGO_PKG_VERSION").to_string(), experimental_api: true, opt_out_notification_methods: Vec::new(), diff --git a/codex-rs/features/src/lib.rs b/codex-rs/features/src/lib.rs index 7f0d326a068..c87b6dc9ff3 100644 --- a/codex-rs/features/src/lib.rs +++ b/codex-rs/features/src/lib.rs @@ -124,6 +124,8 @@ pub enum Feature { CodexGitCommit, /// Enable runtime metrics snapshots via a manual reader. RuntimeMetrics, + /// Enable thread lifecycle analytics emitted via the app-server analytics pipeline. + GeneralAnalytics, /// Persist rollout metadata to a local SQLite database. Sqlite, /// Enable startup memory extraction and file-backed memory consolidation. @@ -609,6 +611,12 @@ pub const FEATURES: &[FeatureSpec] = &[ stage: Stage::UnderDevelopment, default_enabled: false, }, + FeatureSpec { + id: Feature::GeneralAnalytics, + key: "general_analytics", + stage: Stage::UnderDevelopment, + default_enabled: false, + }, FeatureSpec { id: Feature::Sqlite, key: "sqlite", diff --git a/codex-rs/features/src/tests.rs b/codex-rs/features/src/tests.rs index 5fdb03a7725..0777262065d 100644 --- a/codex-rs/features/src/tests.rs +++ b/codex-rs/features/src/tests.rs @@ -126,6 +126,12 @@ fn tool_search_is_under_development_and_disabled_by_default() { assert_eq!(Feature::ToolSearch.default_enabled(), false); } +#[test] +fn general_analytics_is_under_development_and_disabled_by_default() { + assert_eq!(Feature::GeneralAnalytics.stage(), Stage::UnderDevelopment); + assert_eq!(Feature::GeneralAnalytics.default_enabled(), false); +} + #[test] fn use_linux_sandbox_bwrap_is_a_removed_feature_key() { assert_eq!(