Skip to content
This repository was archived by the owner on Feb 27, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ members = [
"mutable_batch",
"object_store_mem_cache",
"object_store_metrics",
"observability_deps",
"panic_logging",
"parquet_file",
"predicate",
Expand Down
2 changes: 1 addition & 1 deletion arrow_util/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ comfy-table = { version = "7.1", default-features = false }
hashbrown = { workspace = true }
num-traits = "0.2"
parquet = { workspace = true }
regex = "1.11.1"
regex = "1.11.2"
snafu = "0.8"
uuid = "1"
workspace-hack = { version = "0.1", path = "../workspace-hack" }
Expand Down
2 changes: 1 addition & 1 deletion authz/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ assert_matches = "1.5.0"
parking_lot = "0.12.4"
paste = "1.0.15"
test_helpers_authz = { path = "../test_helpers_authz" }
tokio = "1.47.0"
tokio = "1.47.1"

[features]
http = ["dep:http"]
15 changes: 0 additions & 15 deletions backoff/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,21 +126,6 @@ impl Backoff {
}
}

/// Fade this backoff over to a different backoff config.
pub fn fade_to(&mut self, config: &BackoffConfig) {
// Note: `new` won't have the same RNG, but this doesn't matter
let new = Self::new(config);

*self = Self {
init_backoff: new.init_backoff,
next_backoff_secs: self.next_backoff_secs,
max_backoff_secs: new.max_backoff_secs,
base: new.base,
deadline: new.deadline,
rng: self.rng.take(),
};
}

/// Perform an async operation that retries with a backoff
pub async fn retry_with_backoff<F, F1, B, E>(
&mut self,
Expand Down
1 change: 1 addition & 0 deletions catalog_cache/benches/list_encode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ fn encode_partition_snapshot(i: usize) -> Bytes {
Default::default(),
Default::default(),
Default::default(),
None, // max_time
);
// Create associated Parquet files:
let parquet_files = vec![
Expand Down
6 changes: 3 additions & 3 deletions client_util/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ iox_http_util = { path = "../iox_http_util" }
reqwest = { workspace = true, features = ["stream", "rustls-tls-native-roots"] }
# This direct dependency on rustls can probably be removed when tonic is upgraded to 0.13+.
# See <https://github.com/influxdata/influxdb_iox/issues/14683> for more details.
rustls = { version = "0.23", default-features = false, features = ["ring", "std", "logging", "tls12"] }
thiserror = "2.0.12"
tonic = { version = "0.12", features = ["tls", "tls-native-roots"] }
rustls = { version = "0.23", default-features = false }
thiserror = "2.0.16"
tonic = { version = "0.12", features = ["gzip", "tls", "tls-native-roots", "zstd"] }
tower = { workspace = true }
workspace-hack = { version = "0.1", path = "../workspace-hack" }

Expand Down
4 changes: 2 additions & 2 deletions data_types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ generated_types = { path = "../generated_types" }
murmur3 = "0.5.2"
tracing = { workspace = true }
ordered-float = "5"
percent-encoding = "2.3.1"
percent-encoding = "2.3.2"
prost = { workspace = true }
schema = { path = "../schema" }
serde_json = "1.0"
Expand All @@ -33,7 +33,7 @@ sqlx = { workspace = true, features = [
"postgres",
"uuid",
] }
thiserror = "2.0.12"
thiserror = "2.0.16"
uuid = { version = "1", features = ["v4"] }
workspace-hack = { version = "0.1", path = "../workspace-hack" }

Expand Down
65 changes: 62 additions & 3 deletions data_types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ pub use service_limits::*;

use generated_types::google::protobuf as google;
use generated_types::influxdata::iox::{
Target, catalog::v1 as catalog_proto, catalog_storage::v1 as catalog_storage_proto,
schema::v1 as schema_proto, skipped_compaction::v1 as skipped_compaction_proto,
table::v1 as table_proto,
Target, catalog::v1 as catalog_proto, catalog::v2 as catalog_v2_proto,
catalog_storage::v1 as catalog_storage_proto, schema::v1 as schema_proto,
skipped_compaction::v1 as skipped_compaction_proto, table::v1 as table_proto,
};
use schema::TIME_COLUMN_NAME;
use snafu::Snafu;
Expand Down Expand Up @@ -383,6 +383,13 @@ impl NamespaceVersion {
}
}

impl Add<i64> for NamespaceVersion {
type Output = Self;
fn add(self, rhs: i64) -> Self {
Self(self.0 + rhs)
}
}

/// Data object for a namespace
#[derive(Debug, Clone, PartialEq, sqlx::FromRow)]
pub struct Namespace {
Expand Down Expand Up @@ -747,6 +754,8 @@ pub struct TableWithStorage {
pub deleted_at: Option<Timestamp>,
/// The total number of active columns in this table.
pub column_count: i64,
/// Whether this table is enabled for an iceberg export.
pub iceberg_enabled: bool,
}

/// Serialise a [`TableWithStorage`] object into its protobuf representation.
Expand All @@ -760,6 +769,7 @@ impl From<TableWithStorage> for catalog_storage_proto::TableWithStorage {
size_bytes: value.size_bytes,
deleted_at: value.deleted_at.map(google::Timestamp::from),
column_count: value.column_count,
iceberg_enabled: value.iceberg_enabled,
}
}
}
Expand All @@ -784,6 +794,7 @@ impl TryFrom<catalog_storage_proto::TableWithStorage> for TableWithStorage {
size_bytes: value.size_bytes,
deleted_at: value.deleted_at.map(Timestamp::from),
column_count: value.column_count,
iceberg_enabled: value.iceberg_enabled,
})
}
}
Expand Down Expand Up @@ -961,6 +972,52 @@ impl From<skipped_compaction_proto::SkippedCompaction> for SkippedCompaction {
}
}

/// Pre-computed retention info for efficient retention queries on a partition.
/// Stores the computed retention boundaries for both parquet files and partitions
/// to eliminate O(namespaces × partitions) retention query complexity.
#[derive(Debug, Clone, Copy, PartialEq, Eq, sqlx::FromRow)]
pub struct PartitionRetention {
/// The partition this retention info applies to
pub partition_id: PartitionId,
/// The namespace this partition belongs to
pub namespace_id: NamespaceId,
/// Cached namespace retention period to avoid joins.
/// NULL when we don't want to slow down partition creation.
pub retention_period_ns: Option<i64>,
/// When files in this partition start becoming eligible for deletion.
/// NULL indicates this needs to be computed.
pub retention_start_time_ns: Option<i64>,
/// When files in this partition finish becoming eligible for deletion.
/// Currently always NULL for parquet file retention.
pub retention_end_time_ns: Option<i64>,
}

impl From<PartitionRetention> for catalog_v2_proto::PartitionRetention {
fn from(retention: PartitionRetention) -> Self {
Self {
partition_id: retention.partition_id.get(),
namespace_id: retention.namespace_id.get(),
retention_period_ns: retention.retention_period_ns,
retention_start_time_ns: retention.retention_start_time_ns,
retention_end_time_ns: retention.retention_end_time_ns,
}
}
}

impl TryFrom<catalog_v2_proto::PartitionRetention> for PartitionRetention {
type Error = &'static str;

fn try_from(retention: catalog_v2_proto::PartitionRetention) -> Result<Self, Self::Error> {
Ok(Self {
partition_id: PartitionId::new(retention.partition_id),
namespace_id: NamespaceId::new(retention.namespace_id),
retention_period_ns: retention.retention_period_ns,
retention_start_time_ns: retention.retention_start_time_ns,
retention_end_time_ns: retention.retention_end_time_ns,
})
}
}

/// Whether the file was created via bulk ingest or not (For now. This may be expanded to
/// distinguish between ingester and compactor in the future).
///
Expand Down Expand Up @@ -3440,6 +3497,7 @@ mod tests {
size_bytes: 1,
deleted_at: None,
column_count: 1,
iceberg_enabled: false,
};
let catalog_proto_table =
catalog_storage_proto::TableWithStorage::from(table_active.clone());
Expand All @@ -3455,6 +3513,7 @@ mod tests {
size_bytes: 2,
deleted_at: Some(Timestamp::new(1_000_000_001)),
column_count: 2,
iceberg_enabled: false,
};
let catalog_proto_table =
catalog_storage_proto::TableWithStorage::from(table_deleted.clone());
Expand Down
6 changes: 6 additions & 0 deletions data_types/src/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -610,6 +610,10 @@ pub struct Partition {
/// The time at which this partition was created, or `None` if this partition was created before
/// this field existed.
created_at: Option<Timestamp>,

/// The maximum time of data that can exist in this partition based on the partition template.
/// This is calculated when the partition is created and used for efficient retention queries.
pub max_time: Option<i64>,
}

impl Partition {
Expand All @@ -627,6 +631,7 @@ impl Partition {
new_file_at: Option<Timestamp>,
cold_compact_at: Option<Timestamp>,
created_at: Option<Timestamp>,
max_time: Option<i64>,
) -> Self {
Self {
id,
Expand All @@ -637,6 +642,7 @@ impl Partition {
new_file_at,
cold_compact_at,
created_at,
max_time,
}
}

Expand Down
4 changes: 2 additions & 2 deletions data_types/src/partition_template.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ use std::{
cmp::min,
collections::{HashMap, HashSet},
fmt::{Display, Formatter, Write},
ops::{Add, Range},
ops::Range,
sync::{Arc, LazyLock},
};

Expand Down Expand Up @@ -1470,7 +1470,7 @@ impl PartitionDuration {
}
}

impl<Tz: chrono::TimeZone> Add<PartitionDuration> for chrono::DateTime<Tz> {
impl<Tz: chrono::TimeZone> std::ops::Add<PartitionDuration> for chrono::DateTime<Tz> {
type Output = Self;
/// Add a [`PartitionDuration`] to a [`chrono::DateTime`].
fn add(self, rhs: PartitionDuration) -> Self::Output {
Expand Down
Loading