From b0c412355ac9dec820ef8d6f13604f4388948598 Mon Sep 17 00:00:00 2001 From: jif-oai Date: Tue, 31 Mar 2026 14:52:48 +0200 Subject: [PATCH] feat: log db better maintenance --- codex-rs/state/src/lib.rs | 2 +- codex-rs/state/src/log_db.rs | 13 ---- codex-rs/state/src/runtime.rs | 31 ++++++++-- codex-rs/state/src/runtime/logs.rs | 99 +++++++++++++++++++++++++++++- 4 files changed, 125 insertions(+), 20 deletions(-) diff --git a/codex-rs/state/src/lib.rs b/codex-rs/state/src/lib.rs index 5929dad947d..ffaa1637e26 100644 --- a/codex-rs/state/src/lib.rs +++ b/codex-rs/state/src/lib.rs @@ -55,7 +55,7 @@ pub use runtime::state_db_path; pub const SQLITE_HOME_ENV: &str = "CODEX_SQLITE_HOME"; pub const LOGS_DB_FILENAME: &str = "logs"; -pub const LOGS_DB_VERSION: u32 = 1; +pub const LOGS_DB_VERSION: u32 = 2; pub const STATE_DB_FILENAME: &str = "state"; pub const STATE_DB_VERSION: u32 = 5; diff --git a/codex-rs/state/src/log_db.rs b/codex-rs/state/src/log_db.rs index 8ec4216659a..e33baae5737 100644 --- a/codex-rs/state/src/log_db.rs +++ b/codex-rs/state/src/log_db.rs @@ -18,8 +18,6 @@ //! # } //! ``` -use chrono::Duration as ChronoDuration; -use chrono::Utc; use std::sync::OnceLock; use std::time::Duration; use std::time::SystemTime; @@ -47,8 +45,6 @@ use crate::StateRuntime; const LOG_QUEUE_CAPACITY: usize = 512; const LOG_BATCH_SIZE: usize = 128; const LOG_FLUSH_INTERVAL: Duration = Duration::from_secs(2); -const LOG_RETENTION_DAYS: i64 = 10; - pub struct LogDbLayer { sender: mpsc::Sender, process_uuid: String, @@ -58,7 +54,6 @@ pub fn start(state_db: std::sync::Arc) -> LogDbLayer { let process_uuid = current_process_log_uuid().to_string(); let (sender, receiver) = mpsc::channel(LOG_QUEUE_CAPACITY); tokio::spawn(run_inserter(std::sync::Arc::clone(&state_db), receiver)); - tokio::spawn(run_retention_cleanup(state_db)); LogDbLayer { sender, @@ -337,14 +332,6 @@ async fn flush(state_db: &std::sync::Arc, buffer: &mut Vec) { - let Some(cutoff) = Utc::now().checked_sub_signed(ChronoDuration::days(LOG_RETENTION_DAYS)) - else { - return; - }; - let _ = state_db.delete_logs_before(cutoff.timestamp()).await; -} - #[derive(Default)] struct MessageVisitor { message: Option, diff --git a/codex-rs/state/src/runtime.rs b/codex-rs/state/src/runtime.rs index 645aa426956..3fc524461c4 100644 --- a/codex-rs/state/src/runtime.rs +++ b/codex-rs/state/src/runtime.rs @@ -38,6 +38,7 @@ use sqlx::Sqlite; use sqlx::SqliteConnection; use sqlx::SqlitePool; use sqlx::migrate::Migrator; +use sqlx::sqlite::SqliteAutoVacuum; use sqlx::sqlite::SqliteConnectOptions; use sqlx::sqlite::SqliteJournalMode; use sqlx::sqlite::SqlitePoolOptions; @@ -100,14 +101,14 @@ impl StateRuntime { .await; let state_path = state_db_path(codex_home.as_path()); let logs_path = logs_db_path(codex_home.as_path()); - let pool = match open_sqlite(&state_path, &STATE_MIGRATOR).await { + let pool = match open_state_sqlite(&state_path, &STATE_MIGRATOR).await { Ok(db) => Arc::new(db), Err(err) => { warn!("failed to open state db at {}: {err}", state_path.display()); return Err(err); } }; - let logs_pool = match open_sqlite(&logs_path, &LOGS_MIGRATOR).await { + let logs_pool = match open_logs_sqlite(&logs_path, &LOGS_MIGRATOR).await { Ok(db) => Arc::new(db), Err(err) => { warn!("failed to open logs db at {}: {err}", logs_path.display()); @@ -120,6 +121,12 @@ impl StateRuntime { codex_home, default_provider, }); + if let Err(err) = runtime.run_logs_startup_maintenance().await { + warn!( + "failed to run startup maintenance for logs db at {}: {err}", + logs_path.display(), + ); + } Ok(runtime) } @@ -129,14 +136,28 @@ impl StateRuntime { } } -async fn open_sqlite(path: &Path, migrator: &'static Migrator) -> anyhow::Result { - let options = SqliteConnectOptions::new() +fn base_sqlite_options(path: &Path) -> SqliteConnectOptions { + SqliteConnectOptions::new() .filename(path) .create_if_missing(true) .journal_mode(SqliteJournalMode::Wal) .synchronous(SqliteSynchronous::Normal) .busy_timeout(Duration::from_secs(5)) - .log_statements(LevelFilter::Off); + .log_statements(LevelFilter::Off) +} + +async fn open_state_sqlite(path: &Path, migrator: &'static Migrator) -> anyhow::Result { + let options = base_sqlite_options(path); + let pool = SqlitePoolOptions::new() + .max_connections(5) + .connect_with(options) + .await?; + migrator.run(&pool).await?; + Ok(pool) +} + +async fn open_logs_sqlite(path: &Path, migrator: &'static Migrator) -> anyhow::Result { + let options = base_sqlite_options(path).auto_vacuum(SqliteAutoVacuum::Incremental); let pool = SqlitePoolOptions::new() .max_connections(5) .connect_with(options) diff --git a/codex-rs/state/src/runtime/logs.rs b/codex-rs/state/src/runtime/logs.rs index 568795ba125..8cf3fe16df0 100644 --- a/codex-rs/state/src/runtime/logs.rs +++ b/codex-rs/state/src/runtime/logs.rs @@ -1,5 +1,7 @@ use super::*; +const LOG_RETENTION_DAYS: i64 = 10; + impl StateRuntime { pub async fn insert_log(&self, entry: &LogEntry) -> anyhow::Result<()> { self.insert_logs(std::slice::from_ref(entry)).await @@ -291,6 +293,22 @@ WHERE id IN ( Ok(result.rows_affected()) } + pub(crate) async fn run_logs_startup_maintenance(&self) -> anyhow::Result<()> { + let Some(cutoff) = + Utc::now().checked_sub_signed(chrono::Duration::days(LOG_RETENTION_DAYS)) + else { + return Ok(()); + }; + self.delete_logs_before(cutoff.timestamp()).await?; + sqlx::query("PRAGMA wal_checkpoint(TRUNCATE)") + .execute(self.logs_pool.as_ref()) + .await?; + sqlx::query("PRAGMA incremental_vacuum") + .execute(self.logs_pool.as_ref()) + .await?; + Ok(()) + } + /// Query logs with optional filters. pub async fn query_logs(&self, query: &LogQuery) -> anyhow::Result> { let mut builder = QueryBuilder::::new( @@ -520,6 +538,7 @@ mod tests { use crate::logs_db_path; use crate::migrations::LOGS_MIGRATOR; use crate::state_db_path; + use chrono::Utc; use pretty_assertions::assert_eq; use sqlx::SqlitePool; use sqlx::migrate::Migrator; @@ -607,7 +626,7 @@ mod tests { sqlx::query( "INSERT INTO logs (ts, ts_nanos, level, target, message, module_path, file, line, thread_id, process_uuid, estimated_bytes) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", ) - .bind(1_i64) + .bind(Utc::now().timestamp()) .bind(0_i64) .bind("INFO") .bind("cli") @@ -676,6 +695,84 @@ mod tests { let _ = tokio::fs::remove_dir_all(codex_home).await; } + #[tokio::test] + async fn init_recreates_legacy_logs_db_when_log_version_changes() { + let codex_home = unique_temp_dir(); + tokio::fs::create_dir_all(&codex_home) + .await + .expect("create codex home"); + let legacy_logs_path = codex_home.join("logs_1.sqlite"); + let pool = SqlitePool::connect_with( + SqliteConnectOptions::new() + .filename(&legacy_logs_path) + .create_if_missing(true), + ) + .await + .expect("open legacy logs db"); + LOGS_MIGRATOR + .run(&pool) + .await + .expect("apply legacy logs schema"); + sqlx::query( + "INSERT INTO logs (ts, ts_nanos, level, target, feedback_log_body, module_path, file, line, thread_id, process_uuid, estimated_bytes) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + ) + .bind(1_i64) + .bind(0_i64) + .bind("INFO") + .bind("cli") + .bind("legacy-log-row") + .bind("mod") + .bind("main.rs") + .bind(7_i64) + .bind("thread-1") + .bind("proc-1") + .bind(16_i64) + .execute(&pool) + .await + .expect("insert legacy log row"); + pool.close().await; + + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string()) + .await + .expect("initialize runtime"); + + assert!( + !legacy_logs_path.exists(), + "legacy logs db should be removed when the version changes" + ); + assert!( + logs_db_path(codex_home.as_path()).exists(), + "current logs db should be recreated during init" + ); + assert!( + runtime + .query_logs(&LogQuery::default()) + .await + .expect("query recreated logs db") + .is_empty() + ); + + let _ = tokio::fs::remove_dir_all(codex_home).await; + } + + #[tokio::test] + async fn init_configures_logs_db_with_incremental_auto_vacuum() { + let codex_home = unique_temp_dir(); + let _runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string()) + .await + .expect("initialize runtime"); + + let pool = open_db_pool(logs_db_path(codex_home.as_path()).as_path()).await; + let auto_vacuum = sqlx::query_scalar::<_, i64>("PRAGMA auto_vacuum") + .fetch_one(&pool) + .await + .expect("read auto_vacuum pragma"); + assert_eq!(auto_vacuum, 2); + pool.close().await; + + let _ = tokio::fs::remove_dir_all(codex_home).await; + } + #[test] fn format_feedback_log_line_matches_feedback_formatter_shape() { assert_eq!(