Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion codex-rs/state/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ pub use runtime::state_db_path;
pub const SQLITE_HOME_ENV: &str = "CODEX_SQLITE_HOME";

pub const LOGS_DB_FILENAME: &str = "logs";
pub const LOGS_DB_VERSION: u32 = 1;
pub const LOGS_DB_VERSION: u32 = 2;
pub const STATE_DB_FILENAME: &str = "state";
pub const STATE_DB_VERSION: u32 = 5;

Expand Down
13 changes: 0 additions & 13 deletions codex-rs/state/src/log_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
//! # }
//! ```

use chrono::Duration as ChronoDuration;
use chrono::Utc;
use std::sync::OnceLock;
use std::time::Duration;
use std::time::SystemTime;
Expand Down Expand Up @@ -47,8 +45,6 @@ use crate::StateRuntime;
const LOG_QUEUE_CAPACITY: usize = 512;
const LOG_BATCH_SIZE: usize = 128;
const LOG_FLUSH_INTERVAL: Duration = Duration::from_secs(2);
const LOG_RETENTION_DAYS: i64 = 10;

pub struct LogDbLayer {
sender: mpsc::Sender<LogDbCommand>,
process_uuid: String,
Expand All @@ -58,7 +54,6 @@ pub fn start(state_db: std::sync::Arc<StateRuntime>) -> LogDbLayer {
let process_uuid = current_process_log_uuid().to_string();
let (sender, receiver) = mpsc::channel(LOG_QUEUE_CAPACITY);
tokio::spawn(run_inserter(std::sync::Arc::clone(&state_db), receiver));
tokio::spawn(run_retention_cleanup(state_db));

LogDbLayer {
sender,
Expand Down Expand Up @@ -337,14 +332,6 @@ async fn flush(state_db: &std::sync::Arc<StateRuntime>, buffer: &mut Vec<LogEntr
let _ = state_db.insert_logs(entries.as_slice()).await;
}

async fn run_retention_cleanup(state_db: std::sync::Arc<StateRuntime>) {
let Some(cutoff) = Utc::now().checked_sub_signed(ChronoDuration::days(LOG_RETENTION_DAYS))
else {
return;
};
let _ = state_db.delete_logs_before(cutoff.timestamp()).await;
}

#[derive(Default)]
struct MessageVisitor {
message: Option<String>,
Expand Down
31 changes: 26 additions & 5 deletions codex-rs/state/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use sqlx::Sqlite;
use sqlx::SqliteConnection;
use sqlx::SqlitePool;
use sqlx::migrate::Migrator;
use sqlx::sqlite::SqliteAutoVacuum;
use sqlx::sqlite::SqliteConnectOptions;
use sqlx::sqlite::SqliteJournalMode;
use sqlx::sqlite::SqlitePoolOptions;
Expand Down Expand Up @@ -100,14 +101,14 @@ impl StateRuntime {
.await;
let state_path = state_db_path(codex_home.as_path());
let logs_path = logs_db_path(codex_home.as_path());
let pool = match open_sqlite(&state_path, &STATE_MIGRATOR).await {
let pool = match open_state_sqlite(&state_path, &STATE_MIGRATOR).await {
Ok(db) => Arc::new(db),
Err(err) => {
warn!("failed to open state db at {}: {err}", state_path.display());
return Err(err);
}
};
let logs_pool = match open_sqlite(&logs_path, &LOGS_MIGRATOR).await {
let logs_pool = match open_logs_sqlite(&logs_path, &LOGS_MIGRATOR).await {
Ok(db) => Arc::new(db),
Err(err) => {
warn!("failed to open logs db at {}: {err}", logs_path.display());
Expand All @@ -120,6 +121,12 @@ impl StateRuntime {
codex_home,
default_provider,
});
if let Err(err) = runtime.run_logs_startup_maintenance().await {
warn!(
"failed to run startup maintenance for logs db at {}: {err}",
logs_path.display(),
);
}
Ok(runtime)
}

Expand All @@ -129,14 +136,28 @@ impl StateRuntime {
}
}

async fn open_sqlite(path: &Path, migrator: &'static Migrator) -> anyhow::Result<SqlitePool> {
let options = SqliteConnectOptions::new()
fn base_sqlite_options(path: &Path) -> SqliteConnectOptions {
SqliteConnectOptions::new()
.filename(path)
.create_if_missing(true)
.journal_mode(SqliteJournalMode::Wal)
.synchronous(SqliteSynchronous::Normal)
.busy_timeout(Duration::from_secs(5))
.log_statements(LevelFilter::Off);
.log_statements(LevelFilter::Off)
}

async fn open_state_sqlite(path: &Path, migrator: &'static Migrator) -> anyhow::Result<SqlitePool> {
let options = base_sqlite_options(path);
let pool = SqlitePoolOptions::new()
.max_connections(5)
.connect_with(options)
.await?;
migrator.run(&pool).await?;
Ok(pool)
}

async fn open_logs_sqlite(path: &Path, migrator: &'static Migrator) -> anyhow::Result<SqlitePool> {
let options = base_sqlite_options(path).auto_vacuum(SqliteAutoVacuum::Incremental);
let pool = SqlitePoolOptions::new()
.max_connections(5)
.connect_with(options)
Expand Down
99 changes: 98 additions & 1 deletion codex-rs/state/src/runtime/logs.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use super::*;

const LOG_RETENTION_DAYS: i64 = 10;

impl StateRuntime {
pub async fn insert_log(&self, entry: &LogEntry) -> anyhow::Result<()> {
self.insert_logs(std::slice::from_ref(entry)).await
Expand Down Expand Up @@ -291,6 +293,22 @@ WHERE id IN (
Ok(result.rows_affected())
}

pub(crate) async fn run_logs_startup_maintenance(&self) -> anyhow::Result<()> {
let Some(cutoff) =
Utc::now().checked_sub_signed(chrono::Duration::days(LOG_RETENTION_DAYS))
else {
return Ok(());
};
self.delete_logs_before(cutoff.timestamp()).await?;
sqlx::query("PRAGMA wal_checkpoint(TRUNCATE)")
.execute(self.logs_pool.as_ref())
.await?;
sqlx::query("PRAGMA incremental_vacuum")
.execute(self.logs_pool.as_ref())
.await?;
Ok(())
}

/// Query logs with optional filters.
pub async fn query_logs(&self, query: &LogQuery) -> anyhow::Result<Vec<LogRow>> {
let mut builder = QueryBuilder::<Sqlite>::new(
Expand Down Expand Up @@ -520,6 +538,7 @@ mod tests {
use crate::logs_db_path;
use crate::migrations::LOGS_MIGRATOR;
use crate::state_db_path;
use chrono::Utc;
use pretty_assertions::assert_eq;
use sqlx::SqlitePool;
use sqlx::migrate::Migrator;
Expand Down Expand Up @@ -607,7 +626,7 @@ mod tests {
sqlx::query(
"INSERT INTO logs (ts, ts_nanos, level, target, message, module_path, file, line, thread_id, process_uuid, estimated_bytes) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
)
.bind(1_i64)
.bind(Utc::now().timestamp())
.bind(0_i64)
.bind("INFO")
.bind("cli")
Expand Down Expand Up @@ -676,6 +695,84 @@ mod tests {
let _ = tokio::fs::remove_dir_all(codex_home).await;
}

#[tokio::test]
async fn init_recreates_legacy_logs_db_when_log_version_changes() {
let codex_home = unique_temp_dir();
tokio::fs::create_dir_all(&codex_home)
.await
.expect("create codex home");
let legacy_logs_path = codex_home.join("logs_1.sqlite");
let pool = SqlitePool::connect_with(
SqliteConnectOptions::new()
.filename(&legacy_logs_path)
.create_if_missing(true),
)
.await
.expect("open legacy logs db");
LOGS_MIGRATOR
.run(&pool)
.await
.expect("apply legacy logs schema");
sqlx::query(
"INSERT INTO logs (ts, ts_nanos, level, target, feedback_log_body, module_path, file, line, thread_id, process_uuid, estimated_bytes) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
)
.bind(1_i64)
.bind(0_i64)
.bind("INFO")
.bind("cli")
.bind("legacy-log-row")
.bind("mod")
.bind("main.rs")
.bind(7_i64)
.bind("thread-1")
.bind("proc-1")
.bind(16_i64)
.execute(&pool)
.await
.expect("insert legacy log row");
pool.close().await;

let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string())
.await
.expect("initialize runtime");

assert!(
!legacy_logs_path.exists(),
"legacy logs db should be removed when the version changes"
);
assert!(
logs_db_path(codex_home.as_path()).exists(),
"current logs db should be recreated during init"
);
assert!(
runtime
.query_logs(&LogQuery::default())
.await
.expect("query recreated logs db")
.is_empty()
);

let _ = tokio::fs::remove_dir_all(codex_home).await;
}

#[tokio::test]
async fn init_configures_logs_db_with_incremental_auto_vacuum() {
let codex_home = unique_temp_dir();
let _runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string())
.await
.expect("initialize runtime");

let pool = open_db_pool(logs_db_path(codex_home.as_path()).as_path()).await;
let auto_vacuum = sqlx::query_scalar::<_, i64>("PRAGMA auto_vacuum")
.fetch_one(&pool)
.await
.expect("read auto_vacuum pragma");
assert_eq!(auto_vacuum, 2);
pool.close().await;

let _ = tokio::fs::remove_dir_all(codex_home).await;
}

#[test]
fn format_feedback_log_line_matches_feedback_formatter_shape() {
assert_eq!(
Expand Down
Loading