Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion influxdb3/src/commands/serve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -550,7 +550,7 @@ pub async fn command(config: Config) -> Result<()> {
.register_node(
&config.node_identifier_prefix,
num_cpus as u64,
influxdb3_catalog::log::NodeMode::Core,
vec![influxdb3_catalog::log::NodeMode::Core],
)
.await?;
let node_def = catalog
Expand Down
27 changes: 7 additions & 20 deletions influxdb3_catalog/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use indexmap::IndexMap;
use influxdb3_id::{ColumnId, DbId, SerdeVecMap, TableId};
use iox_time::{Time, TimeProvider};
use object_store::ObjectStore;
use observability_deps::tracing::{debug, info, warn};
use observability_deps::tracing::{debug, info, trace, warn};
use parking_lot::RwLock;
use schema::{Schema, SchemaBuilder};
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -174,7 +174,7 @@ impl Catalog {
// will be the sequence number that the catalog is updated to.
let mut permit = CATALOG_WRITE_PERMIT.lock().await;
*permit = self.sequence_number().next();
debug!(
trace!(
next_sequence = permit.get(),
"got permit to write to catalog"
);
Expand All @@ -198,10 +198,6 @@ impl Catalog {
) -> CatalogBatch {
let batch_sequence = batch.sequence_number().get();
let current_sequence = self.sequence_number().get();
debug!(
batch_sequence,
current_sequence, "apply ordered catalog batch"
);
assert_eq!(
batch_sequence,
current_sequence + 1,
Expand Down Expand Up @@ -421,7 +417,7 @@ impl InnerCatalog {
sequence: CatalogSequenceNumber,
) -> Result<Option<OrderedCatalogBatch>> {
debug!(
?catalog_batch,
n_ops = catalog_batch.n_ops(),
current_sequence = self.sequence_number().get(),
verified_sequence = sequence.get(),
"verify catalog batch"
Expand All @@ -437,7 +433,7 @@ impl InnerCatalog {
sequence: CatalogSequenceNumber,
) -> Result<Option<OrderedCatalogBatch>> {
debug!(
?catalog_batch,
n_ops = catalog_batch.n_ops(),
current_sequence = self.sequence_number().get(),
applied_sequence = sequence.get(),
"apply catalog batch"
Expand All @@ -464,16 +460,7 @@ impl InnerCatalog {

Ok(updated.then(|| {
if apply_changes {
debug!(
sequence = sequence.get(),
"catalog batch applied, updating sequence"
);
self.sequence = sequence;
} else {
debug!(
sequence = sequence.get(),
"catalog batch verified, will update sequence"
);
}
OrderedCatalogBatch::new(catalog_batch.clone(), sequence)
}))
Expand All @@ -497,7 +484,7 @@ impl InnerCatalog {
let new_node = Arc::new(NodeDefinition {
node_id: Arc::clone(node_id),
instance_id: Arc::clone(instance_id),
mode: *mode,
mode: mode.clone(),
core_count: *core_count,
state: NodeState::Running {
registered_time_ns: *registered_time_ns,
Expand Down Expand Up @@ -585,7 +572,7 @@ fn check_overall_table_count(
pub struct NodeDefinition {
pub(crate) node_id: Arc<str>,
pub(crate) instance_id: Arc<str>,
pub(crate) mode: NodeMode,
pub(crate) mode: Vec<NodeMode>,
pub(crate) core_count: u64,
pub(crate) state: NodeState,
}
Expand Down Expand Up @@ -641,7 +628,7 @@ impl DatabaseSchema {
db_schema: &DatabaseSchema,
database_batch: &DatabaseBatch,
) -> Result<Option<Self>> {
debug!(
trace!(
name = ?db_schema.name,
deleted = ?db_schema.deleted,
full_batch = ?database_batch,
Expand Down
30 changes: 18 additions & 12 deletions influxdb3_catalog/src/catalog/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::sync::Arc;

use hashbrown::HashMap;
use influxdb3_id::ColumnId;
use observability_deps::tracing::{debug, error, info, warn};
use observability_deps::tracing::{debug, error, info, trace, warn};
use schema::{InfluxColumnType, InfluxFieldType};
use uuid::Uuid;

Expand Down Expand Up @@ -91,7 +91,7 @@ impl Catalog {
&self,
node_id: &str,
core_count: u64,
mode: NodeMode,
mode: Vec<NodeMode>,
) -> Result<Option<OrderedCatalogBatch>> {
self.catalog_update_with_retry(|| {
let instance_id = if let Some(node) = self.node(node_id) {
Expand Down Expand Up @@ -129,7 +129,7 @@ impl Catalog {
instance_id,
registered_time_ns: time_ns,
core_count,
mode,
mode: mode.clone(),
})],
))
})
Expand Down Expand Up @@ -607,25 +607,22 @@ impl Catalog {
if let Some((ordered_batch, permit)) =
self.get_permit_and_verify_catalog_batch(&batch).await?
{
debug!(?ordered_batch, "applied batch and got permit");
match self
.persist_ordered_batch_to_object_store(&ordered_batch, &permit)
.await?
{
UpdatePrompt::Retry => {
debug!("retry after catalog persist attempt");
continue;
}
UpdatePrompt::Applied => {
debug!("catalog persist attempt was applied");
self.apply_ordered_catalog_batch(&ordered_batch, &permit);
self.background_checkpoint(&ordered_batch);
self.broadcast_update(ordered_batch.clone().into_batch());
return Ok(Some(ordered_batch));
}
}
} else {
debug!("applying batch did nothing");
debug!("verified batch but it does not change the catalog");
return Ok(None);
}
}
Expand All @@ -649,7 +646,7 @@ impl Catalog {
ordered_batch: &OrderedCatalogBatch,
permit: &CatalogWritePermit,
) -> Result<UpdatePrompt> {
debug!(?ordered_batch, "persisting ordered batch to store");
trace!(?ordered_batch, "persisting ordered batch to store");
// TODO: maybe just an error?
assert_eq!(
ordered_batch.sequence_number(),
Expand Down Expand Up @@ -704,14 +701,14 @@ impl Catalog {
/// Broadcast a `CatalogUpdate` to all subscribed components in the system.
fn broadcast_update(&self, update: impl Into<CatalogUpdate>) {
if let Err(send_error) = self.channel.send(Arc::new(update.into())) {
warn!(?send_error, "nothing listening for catalog updates");
info!("nothing listening for catalog updates");
trace!(?send_error, "nothing listening for catalog updates");
}
}

/// Persist the catalog as a checkpoint in the background if we are at the _n_th sequence
/// number.
fn background_checkpoint(&self, ordered_batch: &OrderedCatalogBatch) {
debug!("background checkpoint");
if ordered_batch.sequence_number().get() % self.store.checkpoint_interval != 0 {
return;
}
Expand Down Expand Up @@ -791,11 +788,15 @@ impl DatabaseCatalogTransaction {
match self.database_schema.table_definition(table_name) {
Some(def) => Ok(def),
None => {
debug!(table_name, "create new table");
let database_id = self.database_schema.id;
let database_name = Arc::clone(&self.database_schema.name);
let db_schema = Arc::make_mut(&mut self.database_schema);
let table_def = db_schema.create_new_empty_table(table_name)?;
debug!(
table_name,
table_id = table_def.table_id.get(),
"create new table"
);
self.ops
.push(DatabaseCatalogOp::CreateTable(CreateTableLog {
database_id,
Expand Down Expand Up @@ -834,7 +835,12 @@ impl DatabaseCatalogTransaction {
let table_name = Arc::clone(&table_def.table_name);
let mut table_def = table_def.as_ref().clone();
let new_col_id = table_def.add_column(column_name.into(), column_type.into())?;
debug!(next_col_id = table_def.next_column_id.get(), "next col id");
debug!(
table_name = table_name.as_ref(),
column_name,
column_id = new_col_id.get(),
"create new column"
);
self.ops.push(DatabaseCatalogOp::AddFields(AddFieldsLog {
database_name,
database_id,
Expand Down
9 changes: 8 additions & 1 deletion influxdb3_catalog/src/log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,13 @@ impl CatalogBatch {
})
}

pub fn n_ops(&self) -> usize {
match self {
CatalogBatch::Node(node_batch) => node_batch.ops.len(),
CatalogBatch::Database(database_batch) => database_batch.ops.len(),
}
}

pub fn as_database(&self) -> Option<&DatabaseBatch> {
match self {
CatalogBatch::Node(_) => None,
Expand Down Expand Up @@ -158,7 +165,7 @@ pub struct RegisterNodeLog {
pub instance_id: Arc<str>,
pub registered_time_ns: i64,
pub core_count: u64,
pub mode: NodeMode,
pub mode: Vec<NodeMode>,
}

#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
Expand Down
11 changes: 8 additions & 3 deletions influxdb3_catalog/src/object_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use bytes::Bytes;
use futures::{StreamExt, stream::FuturesOrdered};
use object_store::ObjectStore;
use object_store::{PutOptions, path::Path as ObjPath};
use observability_deps::tracing::{debug, error, info, warn};
use observability_deps::tracing::{debug, error, info, trace, warn};
use uuid::Uuid;

use crate::catalog::InnerCatalog;
Expand Down Expand Up @@ -150,15 +150,20 @@ impl ObjectStoreCatalog {
);

for ordered_catalog_batch in catalog_files {
debug!(?ordered_catalog_batch, "processing catalog file");
debug!(
sequence = ordered_catalog_batch.sequence_number().get(),
"processing catalog file"
);
trace!(?ordered_catalog_batch, "processing catalog file");
inner_catalog
.apply_catalog_batch(
ordered_catalog_batch.batch(),
ordered_catalog_batch.sequence_number(),
)
.context("failed to apply persisted catalog batch")?;
}
debug!(loaded_catalog = ?inner_catalog, "loaded the catalog");
debug!("loaded the catalog");
trace!(loaded_catalog = ?inner_catalog, "loaded the catalog");
Ok(Some(inner_catalog))
}

Expand Down
4 changes: 2 additions & 2 deletions influxdb3_catalog/src/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ impl From<CatalogSnapshot> for InnerCatalog {
struct NodeSnapshot {
node_id: Arc<str>,
instance_id: Arc<str>,
mode: NodeMode,
mode: Vec<NodeMode>,
state: NodeState,
core_count: u64,
}
Expand All @@ -126,7 +126,7 @@ impl From<&NodeDefinition> for NodeSnapshot {
Self {
node_id: Arc::clone(&node.node_id),
instance_id: Arc::clone(&node.instance_id),
mode: node.mode,
mode: node.mode.clone(),
state: node.state,
core_count: node.core_count,
}
Expand Down
26 changes: 10 additions & 16 deletions influxdb3_wal/src/object_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ impl WalObjectStore {
last_wal_sequence_number: Option<WalFileSequenceNumber>,
all_wal_file_paths: &[Path],
) -> crate::Result<()> {
debug!(">>> replaying");
debug!("replaying");
let paths = self.load_existing_wal_file_paths(last_wal_sequence_number, all_wal_file_paths);

let last_snapshot_sequence_number = {
Expand Down Expand Up @@ -378,7 +378,7 @@ impl WalObjectStore {
debug!(
?path,
?last_wal_path,
">>> path and last_wal_path check when replaying"
"path and last_wal_path check when replaying"
);
// last_wal_sequence_number that comes from persisted snapshot is
// holds the last wal number (inclusive) that has been snapshotted
Expand Down Expand Up @@ -409,7 +409,7 @@ impl WalObjectStore {
?last,
?curr_num_files,
?self.snapshotted_wal_files_to_keep,
">>> checking num wal files to delete"
"checking num wal files to delete"
);

if curr_num_files > self.snapshotted_wal_files_to_keep {
Expand All @@ -421,15 +421,15 @@ impl WalObjectStore {
?curr_num_files,
?num_files_to_delete,
?last_to_delete,
">>> more wal files than num files to keep around"
"more wal files than num files to keep around"
);

for idx in oldest..last_to_delete {
let path = wal_path(
&self.node_identifier_prefix,
WalFileSequenceNumber::new(idx),
);
debug!(?path, ">>> deleting wal file");
debug!(?path, "deleting wal file");

loop {
// if there are errors in between we are changing oldest to
Expand Down Expand Up @@ -475,7 +475,7 @@ fn oldest_wal_file_num(all_wal_file_paths: &[Path]) -> Option<WalFileSequenceNum
debug!(
?file_name_with_path,
?wal_file_name,
">>> file name path and wal file name"
"file name path and wal file name"
);
WalFileSequenceNumber::from_str(wal_file_name).ok()
}
Expand Down Expand Up @@ -1535,7 +1535,7 @@ mod tests {
.await
.unwrap();

debug!(?all_paths, ">>> test: all paths in object store");
debug!(?all_paths, "test: all paths in object store");

let wal = WalObjectStore::new_without_replay(
Arc::clone(&time_provider),
Expand Down Expand Up @@ -1593,10 +1593,7 @@ mod tests {
.await
.unwrap();

debug!(
?all_paths,
">>> test: all paths in object store after removal"
);
debug!(?all_paths, "test: all paths in object store after removal");

assert!(object_store.get(&path1).await.ok().is_some());
assert!(object_store.get(&path2).await.ok().is_some());
Expand Down Expand Up @@ -1625,10 +1622,7 @@ mod tests {
.await
.unwrap();

debug!(
?all_paths,
">>> test: all paths in object store after removal"
);
debug!(?all_paths, "test: all paths in object store after removal");

let err = object_store.get(&path1).await.err().unwrap();

Expand Down Expand Up @@ -1683,7 +1677,7 @@ mod tests {
.await
.unwrap();

debug!(?all_paths, ">>> test: all paths in object store");
debug!(?all_paths, "test: all paths in object store");

let wal = WalObjectStore::new_without_replay(
Arc::clone(&time_provider),
Expand Down
Loading