diff --git a/influxdb3/src/commands/serve.rs b/influxdb3/src/commands/serve.rs index 5923bad7334..c507e8a50b2 100644 --- a/influxdb3/src/commands/serve.rs +++ b/influxdb3/src/commands/serve.rs @@ -550,7 +550,7 @@ pub async fn command(config: Config) -> Result<()> { .register_node( &config.node_identifier_prefix, num_cpus as u64, - influxdb3_catalog::log::NodeMode::Core, + vec![influxdb3_catalog::log::NodeMode::Core], ) .await?; let node_def = catalog diff --git a/influxdb3_catalog/src/catalog.rs b/influxdb3_catalog/src/catalog.rs index 72e40822315..b8fe6f3de35 100644 --- a/influxdb3_catalog/src/catalog.rs +++ b/influxdb3_catalog/src/catalog.rs @@ -6,7 +6,7 @@ use indexmap::IndexMap; use influxdb3_id::{ColumnId, DbId, SerdeVecMap, TableId}; use iox_time::{Time, TimeProvider}; use object_store::ObjectStore; -use observability_deps::tracing::{debug, info, warn}; +use observability_deps::tracing::{debug, info, trace, warn}; use parking_lot::RwLock; use schema::{Schema, SchemaBuilder}; use serde::{Deserialize, Serialize}; @@ -174,7 +174,7 @@ impl Catalog { // will be the sequence number that the catalog is updated to. let mut permit = CATALOG_WRITE_PERMIT.lock().await; *permit = self.sequence_number().next(); - debug!( + trace!( next_sequence = permit.get(), "got permit to write to catalog" ); @@ -198,10 +198,6 @@ impl Catalog { ) -> CatalogBatch { let batch_sequence = batch.sequence_number().get(); let current_sequence = self.sequence_number().get(); - debug!( - batch_sequence, - current_sequence, "apply ordered catalog batch" - ); assert_eq!( batch_sequence, current_sequence + 1, @@ -421,7 +417,7 @@ impl InnerCatalog { sequence: CatalogSequenceNumber, ) -> Result> { debug!( - ?catalog_batch, + n_ops = catalog_batch.n_ops(), current_sequence = self.sequence_number().get(), verified_sequence = sequence.get(), "verify catalog batch" @@ -437,7 +433,7 @@ impl InnerCatalog { sequence: CatalogSequenceNumber, ) -> Result> { debug!( - ?catalog_batch, + n_ops = catalog_batch.n_ops(), current_sequence = self.sequence_number().get(), applied_sequence = sequence.get(), "apply catalog batch" @@ -464,16 +460,7 @@ impl InnerCatalog { Ok(updated.then(|| { if apply_changes { - debug!( - sequence = sequence.get(), - "catalog batch applied, updating sequence" - ); self.sequence = sequence; - } else { - debug!( - sequence = sequence.get(), - "catalog batch verified, will update sequence" - ); } OrderedCatalogBatch::new(catalog_batch.clone(), sequence) })) @@ -497,7 +484,7 @@ impl InnerCatalog { let new_node = Arc::new(NodeDefinition { node_id: Arc::clone(node_id), instance_id: Arc::clone(instance_id), - mode: *mode, + mode: mode.clone(), core_count: *core_count, state: NodeState::Running { registered_time_ns: *registered_time_ns, @@ -585,7 +572,7 @@ fn check_overall_table_count( pub struct NodeDefinition { pub(crate) node_id: Arc, pub(crate) instance_id: Arc, - pub(crate) mode: NodeMode, + pub(crate) mode: Vec, pub(crate) core_count: u64, pub(crate) state: NodeState, } @@ -641,7 +628,7 @@ impl DatabaseSchema { db_schema: &DatabaseSchema, database_batch: &DatabaseBatch, ) -> Result> { - debug!( + trace!( name = ?db_schema.name, deleted = ?db_schema.deleted, full_batch = ?database_batch, diff --git a/influxdb3_catalog/src/catalog/update.rs b/influxdb3_catalog/src/catalog/update.rs index 8928737dd38..708b8f9105d 100644 --- a/influxdb3_catalog/src/catalog/update.rs +++ b/influxdb3_catalog/src/catalog/update.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use hashbrown::HashMap; use influxdb3_id::ColumnId; -use observability_deps::tracing::{debug, error, info, warn}; +use observability_deps::tracing::{debug, error, info, trace, warn}; use schema::{InfluxColumnType, InfluxFieldType}; use uuid::Uuid; @@ -91,7 +91,7 @@ impl Catalog { &self, node_id: &str, core_count: u64, - mode: NodeMode, + mode: Vec, ) -> Result> { self.catalog_update_with_retry(|| { let instance_id = if let Some(node) = self.node(node_id) { @@ -129,7 +129,7 @@ impl Catalog { instance_id, registered_time_ns: time_ns, core_count, - mode, + mode: mode.clone(), })], )) }) @@ -607,17 +607,14 @@ impl Catalog { if let Some((ordered_batch, permit)) = self.get_permit_and_verify_catalog_batch(&batch).await? { - debug!(?ordered_batch, "applied batch and got permit"); match self .persist_ordered_batch_to_object_store(&ordered_batch, &permit) .await? { UpdatePrompt::Retry => { - debug!("retry after catalog persist attempt"); continue; } UpdatePrompt::Applied => { - debug!("catalog persist attempt was applied"); self.apply_ordered_catalog_batch(&ordered_batch, &permit); self.background_checkpoint(&ordered_batch); self.broadcast_update(ordered_batch.clone().into_batch()); @@ -625,7 +622,7 @@ impl Catalog { } } } else { - debug!("applying batch did nothing"); + debug!("verified batch but it does not change the catalog"); return Ok(None); } } @@ -649,7 +646,7 @@ impl Catalog { ordered_batch: &OrderedCatalogBatch, permit: &CatalogWritePermit, ) -> Result { - debug!(?ordered_batch, "persisting ordered batch to store"); + trace!(?ordered_batch, "persisting ordered batch to store"); // TODO: maybe just an error? assert_eq!( ordered_batch.sequence_number(), @@ -704,14 +701,14 @@ impl Catalog { /// Broadcast a `CatalogUpdate` to all subscribed components in the system. fn broadcast_update(&self, update: impl Into) { if let Err(send_error) = self.channel.send(Arc::new(update.into())) { - warn!(?send_error, "nothing listening for catalog updates"); + info!("nothing listening for catalog updates"); + trace!(?send_error, "nothing listening for catalog updates"); } } /// Persist the catalog as a checkpoint in the background if we are at the _n_th sequence /// number. fn background_checkpoint(&self, ordered_batch: &OrderedCatalogBatch) { - debug!("background checkpoint"); if ordered_batch.sequence_number().get() % self.store.checkpoint_interval != 0 { return; } @@ -791,11 +788,15 @@ impl DatabaseCatalogTransaction { match self.database_schema.table_definition(table_name) { Some(def) => Ok(def), None => { - debug!(table_name, "create new table"); let database_id = self.database_schema.id; let database_name = Arc::clone(&self.database_schema.name); let db_schema = Arc::make_mut(&mut self.database_schema); let table_def = db_schema.create_new_empty_table(table_name)?; + debug!( + table_name, + table_id = table_def.table_id.get(), + "create new table" + ); self.ops .push(DatabaseCatalogOp::CreateTable(CreateTableLog { database_id, @@ -834,7 +835,12 @@ impl DatabaseCatalogTransaction { let table_name = Arc::clone(&table_def.table_name); let mut table_def = table_def.as_ref().clone(); let new_col_id = table_def.add_column(column_name.into(), column_type.into())?; - debug!(next_col_id = table_def.next_column_id.get(), "next col id"); + debug!( + table_name = table_name.as_ref(), + column_name, + column_id = new_col_id.get(), + "create new column" + ); self.ops.push(DatabaseCatalogOp::AddFields(AddFieldsLog { database_name, database_id, diff --git a/influxdb3_catalog/src/log.rs b/influxdb3_catalog/src/log.rs index a2530f6b6d5..c8fd462be3f 100644 --- a/influxdb3_catalog/src/log.rs +++ b/influxdb3_catalog/src/log.rs @@ -45,6 +45,13 @@ impl CatalogBatch { }) } + pub fn n_ops(&self) -> usize { + match self { + CatalogBatch::Node(node_batch) => node_batch.ops.len(), + CatalogBatch::Database(database_batch) => database_batch.ops.len(), + } + } + pub fn as_database(&self) -> Option<&DatabaseBatch> { match self { CatalogBatch::Node(_) => None, @@ -158,7 +165,7 @@ pub struct RegisterNodeLog { pub instance_id: Arc, pub registered_time_ns: i64, pub core_count: u64, - pub mode: NodeMode, + pub mode: Vec, } #[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)] diff --git a/influxdb3_catalog/src/object_store.rs b/influxdb3_catalog/src/object_store.rs index 1947e989e91..a06665f0898 100644 --- a/influxdb3_catalog/src/object_store.rs +++ b/influxdb3_catalog/src/object_store.rs @@ -6,7 +6,7 @@ use bytes::Bytes; use futures::{StreamExt, stream::FuturesOrdered}; use object_store::ObjectStore; use object_store::{PutOptions, path::Path as ObjPath}; -use observability_deps::tracing::{debug, error, info, warn}; +use observability_deps::tracing::{debug, error, info, trace, warn}; use uuid::Uuid; use crate::catalog::InnerCatalog; @@ -150,7 +150,11 @@ impl ObjectStoreCatalog { ); for ordered_catalog_batch in catalog_files { - debug!(?ordered_catalog_batch, "processing catalog file"); + debug!( + sequence = ordered_catalog_batch.sequence_number().get(), + "processing catalog file" + ); + trace!(?ordered_catalog_batch, "processing catalog file"); inner_catalog .apply_catalog_batch( ordered_catalog_batch.batch(), @@ -158,7 +162,8 @@ impl ObjectStoreCatalog { ) .context("failed to apply persisted catalog batch")?; } - debug!(loaded_catalog = ?inner_catalog, "loaded the catalog"); + debug!("loaded the catalog"); + trace!(loaded_catalog = ?inner_catalog, "loaded the catalog"); Ok(Some(inner_catalog)) } diff --git a/influxdb3_catalog/src/snapshot.rs b/influxdb3_catalog/src/snapshot.rs index 366cf021db1..dd8a2238edc 100644 --- a/influxdb3_catalog/src/snapshot.rs +++ b/influxdb3_catalog/src/snapshot.rs @@ -116,7 +116,7 @@ impl From for InnerCatalog { struct NodeSnapshot { node_id: Arc, instance_id: Arc, - mode: NodeMode, + mode: Vec, state: NodeState, core_count: u64, } @@ -126,7 +126,7 @@ impl From<&NodeDefinition> for NodeSnapshot { Self { node_id: Arc::clone(&node.node_id), instance_id: Arc::clone(&node.instance_id), - mode: node.mode, + mode: node.mode.clone(), state: node.state, core_count: node.core_count, } diff --git a/influxdb3_wal/src/object_store.rs b/influxdb3_wal/src/object_store.rs index 731ebaa693f..005a3c89888 100644 --- a/influxdb3_wal/src/object_store.rs +++ b/influxdb3_wal/src/object_store.rs @@ -123,7 +123,7 @@ impl WalObjectStore { last_wal_sequence_number: Option, all_wal_file_paths: &[Path], ) -> crate::Result<()> { - debug!(">>> replaying"); + debug!("replaying"); let paths = self.load_existing_wal_file_paths(last_wal_sequence_number, all_wal_file_paths); let last_snapshot_sequence_number = { @@ -378,7 +378,7 @@ impl WalObjectStore { debug!( ?path, ?last_wal_path, - ">>> path and last_wal_path check when replaying" + "path and last_wal_path check when replaying" ); // last_wal_sequence_number that comes from persisted snapshot is // holds the last wal number (inclusive) that has been snapshotted @@ -409,7 +409,7 @@ impl WalObjectStore { ?last, ?curr_num_files, ?self.snapshotted_wal_files_to_keep, - ">>> checking num wal files to delete" + "checking num wal files to delete" ); if curr_num_files > self.snapshotted_wal_files_to_keep { @@ -421,7 +421,7 @@ impl WalObjectStore { ?curr_num_files, ?num_files_to_delete, ?last_to_delete, - ">>> more wal files than num files to keep around" + "more wal files than num files to keep around" ); for idx in oldest..last_to_delete { @@ -429,7 +429,7 @@ impl WalObjectStore { &self.node_identifier_prefix, WalFileSequenceNumber::new(idx), ); - debug!(?path, ">>> deleting wal file"); + debug!(?path, "deleting wal file"); loop { // if there are errors in between we are changing oldest to @@ -475,7 +475,7 @@ fn oldest_wal_file_num(all_wal_file_paths: &[Path]) -> Option>> file name path and wal file name" + "file name path and wal file name" ); WalFileSequenceNumber::from_str(wal_file_name).ok() } @@ -1535,7 +1535,7 @@ mod tests { .await .unwrap(); - debug!(?all_paths, ">>> test: all paths in object store"); + debug!(?all_paths, "test: all paths in object store"); let wal = WalObjectStore::new_without_replay( Arc::clone(&time_provider), @@ -1593,10 +1593,7 @@ mod tests { .await .unwrap(); - debug!( - ?all_paths, - ">>> test: all paths in object store after removal" - ); + debug!(?all_paths, "test: all paths in object store after removal"); assert!(object_store.get(&path1).await.ok().is_some()); assert!(object_store.get(&path2).await.ok().is_some()); @@ -1625,10 +1622,7 @@ mod tests { .await .unwrap(); - debug!( - ?all_paths, - ">>> test: all paths in object store after removal" - ); + debug!(?all_paths, "test: all paths in object store after removal"); let err = object_store.get(&path1).await.err().unwrap(); @@ -1683,7 +1677,7 @@ mod tests { .await .unwrap(); - debug!(?all_paths, ">>> test: all paths in object store"); + debug!(?all_paths, "test: all paths in object store"); let wal = WalObjectStore::new_without_replay( Arc::clone(&time_provider), diff --git a/influxdb3_wal/src/snapshot_tracker.rs b/influxdb3_wal/src/snapshot_tracker.rs index 30f2cb4ff36..c675094e92e 100644 --- a/influxdb3_wal/src/snapshot_tracker.rs +++ b/influxdb3_wal/src/snapshot_tracker.rs @@ -61,7 +61,7 @@ impl SnapshotTracker { debug!( wal_periods_len = ?self.wal_periods.len(), num_snapshots_after = ?self.number_of_periods_to_snapshot_after(), - ">>> wal periods and snapshots" + "wal periods and snapshots" ); if !self.should_run_snapshot(force_snapshot) { @@ -113,7 +113,7 @@ impl SnapshotTracker { let t = self.wal_periods.last()?.max_time; // round the last timestamp down to the gen1_duration let t = t - (t.get() % self.gen1_duration.as_nanos()); - debug!(timestamp_ns = ?t, gen1_duration_ns = ?self.gen1_duration.as_nanos(), ">>> last timestamp"); + debug!(timestamp_ns = ?t, gen1_duration_ns = ?self.gen1_duration.as_nanos(), "last timestamp"); // any wal period that has data before this time can be snapshot let periods_to_snapshot = self @@ -122,7 +122,7 @@ impl SnapshotTracker { .take_while(|period| period.max_time < t) .cloned() .collect::>(); - debug!(?periods_to_snapshot, ">>> periods to snapshot"); + debug!(?periods_to_snapshot, "periods to snapshot"); let first_wal_file_number = periods_to_snapshot .iter() .peekable() diff --git a/influxdb3_write/src/lib.rs b/influxdb3_write/src/lib.rs index 4af28cc2eb2..fc51ca3cddc 100644 --- a/influxdb3_write/src/lib.rs +++ b/influxdb3_write/src/lib.rs @@ -336,7 +336,7 @@ impl<'a> ChunkFilter<'a> { /// This method analyzes the incoming `exprs` to determine if there are any filters on the /// `time` column and attempt to derive the boundaries on `time` from the query. pub fn new(table_def: &Arc, exprs: &'a [Expr]) -> Result { - debug!(input = ?exprs, ">>> creating chunk filter"); + debug!(input = ?exprs, "creating chunk filter"); let mut time_interval: Option = None; let arrow_schema = table_def.schema.as_arrow(); let time_col_index = arrow_schema diff --git a/influxdb3_write/src/write_buffer/validator.rs b/influxdb3_write/src/write_buffer/validator.rs index aea9892f89c..79fbdeff2ee 100644 --- a/influxdb3_write/src/write_buffer/validator.rs +++ b/influxdb3_write/src/write_buffer/validator.rs @@ -12,7 +12,7 @@ use influxdb3_id::{DbId, TableId}; use influxdb3_types::http::FieldDataType; use influxdb3_wal::{Field, FieldData, Gen1Duration, Row, TableChunks, WriteBatch}; use iox_time::Time; -use observability_deps::tracing::debug; +use observability_deps::tracing::trace; use schema::TIME_COLUMN_NAME; use super::Error; @@ -97,7 +97,7 @@ impl WriteValidator { /// with name `db_name`. This initializes the database if it does not already exist. pub fn initialize(db_name: NamespaceName<'static>, catalog: Arc) -> Result { let txn = catalog.begin(db_name.as_str())?; - debug!(transaction = ?txn, ">>> initialize write validator"); + trace!(transaction = ?txn, "initialize write validator"); Ok(WriteValidator { state: Initialized { catalog, txn }, })