diff --git a/Cargo.toml b/Cargo.toml index b4a3d486..5ad039e5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,7 +27,6 @@ members = [ "mutable_batch", "object_store_mem_cache", "object_store_metrics", - "observability_deps", "panic_logging", "parquet_file", "predicate", diff --git a/arrow_util/Cargo.toml b/arrow_util/Cargo.toml index 8458a19c..aea9d1ff 100644 --- a/arrow_util/Cargo.toml +++ b/arrow_util/Cargo.toml @@ -22,7 +22,7 @@ comfy-table = { version = "7.1", default-features = false } hashbrown = { workspace = true } num-traits = "0.2" parquet = { workspace = true } -regex = "1.11.1" +regex = "1.11.2" snafu = "0.8" uuid = "1" workspace-hack = { version = "0.1", path = "../workspace-hack" } diff --git a/authz/Cargo.toml b/authz/Cargo.toml index 7d377ebe..cc8333fd 100644 --- a/authz/Cargo.toml +++ b/authz/Cargo.toml @@ -28,7 +28,7 @@ assert_matches = "1.5.0" parking_lot = "0.12.4" paste = "1.0.15" test_helpers_authz = { path = "../test_helpers_authz" } -tokio = "1.47.0" +tokio = "1.47.1" [features] http = ["dep:http"] diff --git a/backoff/src/lib.rs b/backoff/src/lib.rs index 41e2aaf3..09321795 100644 --- a/backoff/src/lib.rs +++ b/backoff/src/lib.rs @@ -126,21 +126,6 @@ impl Backoff { } } - /// Fade this backoff over to a different backoff config. 
- pub fn fade_to(&mut self, config: &BackoffConfig) { - // Note: `new` won't have the same RNG, but this doesn't matter - let new = Self::new(config); - - *self = Self { - init_backoff: new.init_backoff, - next_backoff_secs: self.next_backoff_secs, - max_backoff_secs: new.max_backoff_secs, - base: new.base, - deadline: new.deadline, - rng: self.rng.take(), - }; - } - /// Perform an async operation that retries with a backoff pub async fn retry_with_backoff( &mut self, diff --git a/catalog_cache/benches/list_encode.rs b/catalog_cache/benches/list_encode.rs index 7740e5fc..17a4d009 100644 --- a/catalog_cache/benches/list_encode.rs +++ b/catalog_cache/benches/list_encode.rs @@ -131,6 +131,7 @@ fn encode_partition_snapshot(i: usize) -> Bytes { Default::default(), Default::default(), Default::default(), + None, // max_time ); // Create associated Parquet files: let parquet_files = vec![ diff --git a/client_util/Cargo.toml b/client_util/Cargo.toml index 8d5dc7a9..ff2c1341 100644 --- a/client_util/Cargo.toml +++ b/client_util/Cargo.toml @@ -15,9 +15,9 @@ iox_http_util = { path = "../iox_http_util" } reqwest = { workspace = true, features = ["stream", "rustls-tls-native-roots"] } # This direct dependency on rustls can probably be removed when tonic is upgraded to 0.13+. # See for more details. 
-rustls = { version = "0.23", default-features = false, features = ["ring", "std", "logging", "tls12"] } -thiserror = "2.0.12" -tonic = { version = "0.12", features = ["tls", "tls-native-roots"] } +rustls = { version = "0.23", default-features = false } +thiserror = "2.0.16" +tonic = { version = "0.12", features = ["gzip", "tls", "tls-native-roots", "zstd"] } tower = { workspace = true } workspace-hack = { version = "0.1", path = "../workspace-hack" } diff --git a/data_types/Cargo.toml b/data_types/Cargo.toml index e2c9af59..bdfe3750 100644 --- a/data_types/Cargo.toml +++ b/data_types/Cargo.toml @@ -21,7 +21,7 @@ generated_types = { path = "../generated_types" } murmur3 = "0.5.2" tracing = { workspace = true } ordered-float = "5" -percent-encoding = "2.3.1" +percent-encoding = "2.3.2" prost = { workspace = true } schema = { path = "../schema" } serde_json = "1.0" @@ -33,7 +33,7 @@ sqlx = { workspace = true, features = [ "postgres", "uuid", ] } -thiserror = "2.0.12" +thiserror = "2.0.16" uuid = { version = "1", features = ["v4"] } workspace-hack = { version = "0.1", path = "../workspace-hack" } diff --git a/data_types/src/lib.rs b/data_types/src/lib.rs index 7509ac96..e6473dcf 100644 --- a/data_types/src/lib.rs +++ b/data_types/src/lib.rs @@ -25,9 +25,9 @@ pub use service_limits::*; use generated_types::google::protobuf as google; use generated_types::influxdata::iox::{ - Target, catalog::v1 as catalog_proto, catalog_storage::v1 as catalog_storage_proto, - schema::v1 as schema_proto, skipped_compaction::v1 as skipped_compaction_proto, - table::v1 as table_proto, + Target, catalog::v1 as catalog_proto, catalog::v2 as catalog_v2_proto, + catalog_storage::v1 as catalog_storage_proto, schema::v1 as schema_proto, + skipped_compaction::v1 as skipped_compaction_proto, table::v1 as table_proto, }; use schema::TIME_COLUMN_NAME; use snafu::Snafu; @@ -383,6 +383,13 @@ impl NamespaceVersion { } } +impl Add for NamespaceVersion { + type Output = Self; + fn add(self, rhs: i64) -> 
Self { + Self(self.0 + rhs) + } +} + /// Data object for a namespace #[derive(Debug, Clone, PartialEq, sqlx::FromRow)] pub struct Namespace { @@ -747,6 +754,8 @@ pub struct TableWithStorage { pub deleted_at: Option, /// The total number of active columns in this table. pub column_count: i64, + /// Whether this table is enabled for an iceberg export. + pub iceberg_enabled: bool, } /// Serialise a [`TableWithStorage`] object into its protobuf representation. @@ -760,6 +769,7 @@ impl From for catalog_storage_proto::TableWithStorage { size_bytes: value.size_bytes, deleted_at: value.deleted_at.map(google::Timestamp::from), column_count: value.column_count, + iceberg_enabled: value.iceberg_enabled, } } } @@ -784,6 +794,7 @@ impl TryFrom for TableWithStorage { size_bytes: value.size_bytes, deleted_at: value.deleted_at.map(Timestamp::from), column_count: value.column_count, + iceberg_enabled: value.iceberg_enabled, }) } } @@ -961,6 +972,52 @@ impl From for SkippedCompaction { } } +/// Pre-computed retention info for efficient retention queries on a partition. +/// Stores the computed retention boundaries for both parquet files and partitions +/// to eliminate O(namespaces × partitions) retention query complexity. +#[derive(Debug, Clone, Copy, PartialEq, Eq, sqlx::FromRow)] +pub struct PartitionRetention { + /// The partition this retention info applies to + pub partition_id: PartitionId, + /// The namespace this partition belongs to + pub namespace_id: NamespaceId, + /// Cached namespace retention period to avoid joins. + /// NULL when we don't want to slow down partition creation. + pub retention_period_ns: Option, + /// When files in this partition start becoming eligible for deletion. + /// NULL indicates this needs to be computed. + pub retention_start_time_ns: Option, + /// When files in this partition finish becoming eligible for deletion. + /// Currently always NULL for parquet file retention. 
+ pub retention_end_time_ns: Option, +} + +impl From for catalog_v2_proto::PartitionRetention { + fn from(retention: PartitionRetention) -> Self { + Self { + partition_id: retention.partition_id.get(), + namespace_id: retention.namespace_id.get(), + retention_period_ns: retention.retention_period_ns, + retention_start_time_ns: retention.retention_start_time_ns, + retention_end_time_ns: retention.retention_end_time_ns, + } + } +} + +impl TryFrom for PartitionRetention { + type Error = &'static str; + + fn try_from(retention: catalog_v2_proto::PartitionRetention) -> Result { + Ok(Self { + partition_id: PartitionId::new(retention.partition_id), + namespace_id: NamespaceId::new(retention.namespace_id), + retention_period_ns: retention.retention_period_ns, + retention_start_time_ns: retention.retention_start_time_ns, + retention_end_time_ns: retention.retention_end_time_ns, + }) + } +} + /// Whether the file was created via bulk ingest or not (For now. This may be expanded to /// distinguish between ingester and compactor in the future). /// @@ -3440,6 +3497,7 @@ mod tests { size_bytes: 1, deleted_at: None, column_count: 1, + iceberg_enabled: false, }; let catalog_proto_table = catalog_storage_proto::TableWithStorage::from(table_active.clone()); @@ -3455,6 +3513,7 @@ mod tests { size_bytes: 2, deleted_at: Some(Timestamp::new(1_000_000_001)), column_count: 2, + iceberg_enabled: false, }; let catalog_proto_table = catalog_storage_proto::TableWithStorage::from(table_deleted.clone()); diff --git a/data_types/src/partition.rs b/data_types/src/partition.rs index 0e040d5a..5966a756 100644 --- a/data_types/src/partition.rs +++ b/data_types/src/partition.rs @@ -610,6 +610,10 @@ pub struct Partition { /// The time at which this partition was created, or `None` if this partition was created before /// this field existed. created_at: Option, + + /// The maximum time of data that can exist in this partition based on the partition template. 
+ /// This is calculated when the partition is created and used for efficient retention queries. + pub max_time: Option, } impl Partition { @@ -627,6 +631,7 @@ impl Partition { new_file_at: Option, cold_compact_at: Option, created_at: Option, + max_time: Option, ) -> Self { Self { id, @@ -637,6 +642,7 @@ impl Partition { new_file_at, cold_compact_at, created_at, + max_time, } } diff --git a/data_types/src/partition_template.rs b/data_types/src/partition_template.rs index a4eb3ccd..45761892 100644 --- a/data_types/src/partition_template.rs +++ b/data_types/src/partition_template.rs @@ -180,7 +180,7 @@ use std::{ cmp::min, collections::{HashMap, HashSet}, fmt::{Display, Formatter, Write}, - ops::{Add, Range}, + ops::Range, sync::{Arc, LazyLock}, }; @@ -1470,7 +1470,7 @@ impl PartitionDuration { } } -impl Add for chrono::DateTime { +impl std::ops::Add for chrono::DateTime { type Output = Self; /// Add a [`PartitionDuration`] to a [`chrono::DateTime`]. fn add(self, rhs: PartitionDuration) -> Self::Output { diff --git a/data_types/src/snapshot/list.rs b/data_types/src/snapshot/list.rs index c494a26a..c2e59114 100644 --- a/data_types/src/snapshot/list.rs +++ b/data_types/src/snapshot/list.rs @@ -3,8 +3,11 @@ use bytes::Bytes; use prost::Message; use snafu::{Snafu, ensure}; -use std::marker::PhantomData; -use std::ops::Range; +use std::{ + cmp::Ordering, + marker::PhantomData, + ops::{Deref, Range}, +}; use generated_types::influxdata::iox::catalog_cache::v1 as generated; @@ -29,6 +32,51 @@ pub enum Error { }, } +/// How to get the `i64` identifier to enable `MessageList::get_by_id` +pub trait GetId { + /// The ID value for this instance that may be used for lookup by `MessageList::get_by_id` + fn id(&self) -> i64; +} + +/// Ensures a list is sorted by ID, which is necessary for the binary search by ID +#[derive(Clone, Eq, PartialEq, Debug)] +pub struct SortedById { + values: Vec, +} + +impl SortedById { + /// Create a new instance sorted by the ID of the items, which 
`MessageList` relies on for + /// its implementation of `get_by_id`. + pub fn new(mut values: Vec) -> Self { + values.sort_unstable_by_key(|v| v.id()); + + SortedById { values } + } +} + +impl FromIterator for SortedById { + fn from_iter>(iter: I) -> Self { + Self::new(Vec::from_iter(iter)) + } +} + +impl Deref for SortedById { + type Target = [T]; + + fn deref(&self) -> &Self::Target { + self.values.deref() + } +} + +impl IntoIterator for SortedById { + type Item = T; + type IntoIter = std::vec::IntoIter; + + fn into_iter(self) -> Self::IntoIter { + self.values.into_iter() + } +} + /// Error type for [`MessageList`] pub type Result = std::result::Result; @@ -42,22 +90,22 @@ pub type Result = std::result::Result; /// by arrow, that provides this and is designed to be combined with [`prost`]'s support /// for zero-copy decoding of [`Bytes`] #[derive(Clone, Eq, PartialEq)] -pub struct MessageList { +pub struct MessageList { len: usize, offsets: Bytes, values: Bytes, phantom: PhantomData, } -impl MessageList { +impl MessageList { /// Encode `values` to a [`MessageList`] - pub fn encode(values: &[T]) -> Result { + pub fn encode(values: SortedById) -> Result { let cap = (values.len() + 1) * 4; let mut offsets: Vec = Vec::with_capacity(cap); offsets.extend_from_slice(&0_u32.to_le_bytes()); let mut cap = 0; - for x in values { + for x in values.iter() { cap += x.encoded_len(); let offset = u32::try_from(cap).unwrap(); offsets.extend_from_slice(&offset.to_le_bytes()); @@ -99,9 +147,56 @@ impl MessageList { let data = self.values.slice(start..end); Ok(T::decode(data)?) } + + /// Returns the element with ID `id` found via binary search. Implemented by hand rather than + /// calling `std::slice::binary_search_by` to avoid needing to allocate a vector to create a + /// slice to call `binary_search_by` on. This code was largely copied from the stdlib's + /// implementation of `binary_search_by` as of Rust 1.89.0. 
+ pub fn get_by_id(&self, id: i64) -> Result> { + let mut size = self.len(); + if size == 0 { + return Ok(None); + } + let mut base = 0usize; + + // This loop intentionally doesn't have an early exit if the comparison + // returns Equal. We want the number of loop iterations to depend *only* + // on the size of the input slice so that the CPU can reliably predict + // the loop count. + while size > 1 { + let half = size / 2; + let mid = base + half; + + let current_item = self.get(mid)?; + let cmp = current_item.id().cmp(&id); + + // Binary search interacts poorly with branch prediction, so force + // the compiler to use conditional moves if supported by the target + // architecture. + base = std::hint::select_unpredictable(cmp == Ordering::Greater, base, mid); + + // This is imprecise in the case where `size` is odd and the + // comparison returns Greater: the mid element still gets included + // by `size` even though it's known to be larger than the element + // being searched for. + // + // This is fine though: we gain more performance by keeping the + // loop iteration count invariant (and thus predictable) than we + // lose from considering one additional element. 
+ size -= half; + } + + let base_item = self.get(base)?; + let cmp = base_item.id().cmp(&id); + if cmp == Ordering::Equal { + Ok(Some(base_item)) + } else { + Ok(None) + } + } } -impl std::fmt::Debug for MessageList { +impl std::fmt::Debug for MessageList { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let mut l = f.debug_list(); for idx in 0..self.len() { @@ -111,7 +206,7 @@ impl std::fmt::Debug for MessageList { } } -impl From for MessageList { +impl From for MessageList { fn from(proto: generated::MessageList) -> Self { let len = (proto.offsets.len() / 4).saturating_sub(1); Self { @@ -123,7 +218,7 @@ impl From for MessageList { } } -impl From> for generated::MessageList { +impl From> for generated::MessageList { fn from(value: MessageList) -> Self { Self { offsets: value.offsets, @@ -132,7 +227,7 @@ impl From> for generated::MessageList { } } -impl IntoIterator for MessageList { +impl IntoIterator for MessageList { type Item = Result; type IntoIter = MessageListIter; @@ -146,12 +241,12 @@ impl IntoIterator for MessageList { /// [`Iterator`] for [`MessageList`] #[derive(Debug)] -pub struct MessageListIter { +pub struct MessageListIter { iter: Range, list: MessageList, } -impl Iterator for MessageListIter { +impl Iterator for MessageListIter { type Item = Result; fn next(&mut self) -> Option { @@ -167,18 +262,49 @@ impl Iterator for MessageListIter { mod tests { use super::*; + /// As a simple hack for ease of test setup, the ID of a string is its length. 
+ impl GetId for String { + fn id(&self) -> i64 { + self.len() as i64 + } + } + + impl GetId for i32 { + fn id(&self) -> i64 { + *self as i64 + } + } + #[test] fn test_simple() { - let strings = ["", "test", "foo", "abc", "", "skd"]; - let strings: Vec<_> = strings.into_iter().map(ToString::to_string).collect(); + // Note this list is not sorted by ID (`len`) + let strings = ["", "test", "foo", "abcde", "z", "skedaddle"]; + let strings: SortedById<_> = strings.into_iter().map(ToString::to_string).collect(); - let encoded = MessageList::encode(&strings).unwrap(); + let encoded = MessageList::encode(strings.clone()).unwrap(); - assert_eq!(encoded.get(5).unwrap().as_str(), "skd"); - assert_eq!(encoded.get(2).unwrap().as_str(), "foo"); assert_eq!(encoded.get(0).unwrap().as_str(), ""); + assert_eq!(encoded.get_by_id(0).unwrap().unwrap().as_str(), ""); + + assert_eq!(encoded.get(1).unwrap().as_str(), "z"); + assert_eq!(encoded.get_by_id(1).unwrap().unwrap().as_str(), "z"); + + // there is no value with ID 2 + assert!(encoded.get_by_id(2).unwrap().is_none()); + + assert_eq!(encoded.get(2).unwrap().as_str(), "foo"); + assert_eq!(encoded.get_by_id(3).unwrap().unwrap().as_str(), "foo"); + + assert_eq!(encoded.get(3).unwrap().as_str(), "test"); + assert_eq!(encoded.get_by_id(4).unwrap().unwrap().as_str(), "test"); + + assert_eq!(encoded.get(4).unwrap().as_str(), "abcde"); + assert_eq!(encoded.get_by_id(5).unwrap().unwrap().as_str(), "abcde"); + + assert_eq!(encoded.get(5).unwrap().as_str(), "skedaddle"); + assert_eq!(encoded.get_by_id(9).unwrap().unwrap().as_str(), "skedaddle"); - let decoded: Vec<_> = encoded.clone().into_iter().map(Result::unwrap).collect(); + let decoded: SortedById<_> = encoded.clone().into_iter().map(Result::unwrap).collect(); assert_eq!(strings, decoded); let proto = generated::MessageList::from(encoded.clone()); @@ -189,8 +315,8 @@ mod tests { let invalid = MessageList::::from(proto); invalid.get(2).unwrap_err(); - let strings: Vec = vec![]; - let 
encoded = MessageList::encode(&strings).unwrap(); + let strings: SortedById = SortedById::new(vec![]); + let encoded = MessageList::encode(strings).unwrap(); assert_eq!(encoded.len(), 0); assert!(encoded.is_empty()); diff --git a/data_types/src/snapshot/namespace.rs b/data_types/src/snapshot/namespace.rs index 50f36b7c..1496dbcf 100644 --- a/data_types/src/snapshot/namespace.rs +++ b/data_types/src/snapshot/namespace.rs @@ -12,7 +12,7 @@ use crate::{ use super::{ hash::{HashBuckets, HashBucketsEncoder}, - list::MessageList, + list::{GetId, MessageList, SortedById}, }; /// Error for [`NamespaceSnapshot`] @@ -57,6 +57,12 @@ pub enum Error { /// Result for [`NamespaceSnapshot`] pub type Result = std::result::Result; +impl GetId for proto::NamespaceTable { + fn id(&self) -> i64 { + self.id + } +} + /// A snapshot of a namespace #[derive(Debug, Clone)] pub struct NamespaceSnapshot { @@ -80,7 +86,7 @@ impl NamespaceSnapshot { tables: impl IntoIterator, generation: u64, ) -> Result { - let mut tables: Vec<_> = tables + let tables: SortedById<_> = tables .into_iter() .map(|t| proto::NamespaceTable { id: t.id.get(), @@ -88,8 +94,6 @@ impl NamespaceSnapshot { deleted_at: t.deleted_at.map(|ts| ts.get()), }) .collect(); - // TODO(marco): wire up binary search to find table by ID - tables.sort_unstable_by_key(|t| t.id); let mut table_names = HashBucketsEncoder::new(tables.len()); for (index, table) in tables.iter().enumerate() { @@ -108,7 +112,7 @@ impl NamespaceSnapshot { deleted_at: namespace.deleted_at, partition_template: namespace.partition_template.as_proto().cloned(), router_version: namespace.router_version, - tables: MessageList::encode(&tables).context(TableEncodeSnafu)?, + tables: MessageList::encode(tables).context(TableEncodeSnafu)?, table_names: table_names.finish(), generation, }) @@ -170,10 +174,30 @@ impl NamespaceSnapshot { }) } + /// Look up a [`NamespaceSnapshotTable`] by `TableId` using binary search of the list of + /// tables. 
_Does_ include soft-deleted entries. + /// + /// Hard-deleted tables may still appear in the table cache, but should NOT appear in + /// the namespace snapshot's tables, so this method must be used to check actual presence or + /// absence before looking up additional table information in the table cache. + /// + /// # Performance + /// + /// This method decodes each record the binary search needs to check, so may not be appropriate + /// for performance-sensitive use cases. + pub fn lookup_table_by_id(&self, id: TableId) -> Result> { + // This requires that the tables are sorted by ID, which `encode` does. + Ok(self + .tables + .get_by_id(id.get()) + .context(TableDecodeSnafu)? + .map(|t| t.into())) + } + /// Lookup a [`NamespaceSnapshotTable`] by name. Does not include deleted entries. pub fn lookup_table_by_name(&self, name: &str) -> Result> { for idx in self.table_names.lookup(name.as_bytes()) { - let table = self.tables.get(idx).context(TableEncodeSnafu)?; + let table = self.tables.get(idx).context(TableDecodeSnafu)?; if table.name == name.as_bytes() { return Ok(Some(table.into())); } @@ -198,7 +222,7 @@ impl NamespaceSnapshot { .map(|x| Duration::from_nanos(x as _)) } - /// When this file was deleted if any + /// When this namespace was deleted if any pub fn deleted_at(&self) -> Option { self.deleted_at } diff --git a/data_types/src/snapshot/partition.rs b/data_types/src/snapshot/partition.rs index 4644427c..23e30de5 100644 --- a/data_types/src/snapshot/partition.rs +++ b/data_types/src/snapshot/partition.rs @@ -1,11 +1,13 @@ //! 
Snapshot definition for partitions -use crate::snapshot::list::MessageList; -use crate::snapshot::mask::{BitMask, BitMaskBuilder}; use crate::{ ColumnId, ColumnSet, CompactionLevelProtoError, NamespaceId, ObjectStoreId, ParquetFile, ParquetFileId, ParquetFileSource, Partition, PartitionHashId, PartitionHashIdError, PartitionId, PartitionKey, SkippedCompaction, SortKeyIds, TableId, Timestamp, + snapshot::{ + list::{GetId, MessageList, SortedById}, + mask::{BitMask, BitMaskBuilder}, + }, }; use bytes::Bytes; use generated_types::influxdata::iox::{ @@ -43,6 +45,12 @@ pub enum Error { /// Result for [`PartitionSnapshot`] pub type Result = std::result::Result; +impl GetId for proto::PartitionFile { + fn id(&self) -> i64 { + self.id + } +} + /// A snapshot of a partition /// /// # Soft Deletion @@ -94,7 +102,7 @@ impl PartitionSnapshot { acc }); - let files = files + let files: SortedById<_> = files .into_iter() .map(|file| { let mut mask = BitMaskBuilder::new(columns.len()); @@ -117,7 +125,7 @@ impl PartitionSnapshot { use_numeric_partition_id: Some(file.partition_hash_id.is_none()), } }) - .collect::>(); + .collect(); Ok(Self { generation, @@ -126,7 +134,7 @@ impl PartitionSnapshot { partition_id: partition.id, partition_hash_id: partition.hash_id().cloned(), key: partition.partition_key.as_bytes().to_vec().into(), - files: MessageList::encode(&files).context(FileEncodeSnafu)?, + files: MessageList::encode(files).context(FileEncodeSnafu)?, sort_key: partition.sort_key_ids().cloned().unwrap_or_default(), table_id: partition.table_id, new_file_at: partition.new_file_at, @@ -248,6 +256,7 @@ impl PartitionSnapshot { self.new_file_at, self.cold_compact_at, self.created_at, + None, // max_time - not stored in snapshot (can be computed from partition key) )) } @@ -333,6 +342,7 @@ mod tests { Default::default(), Default::default(), Default::default(), + None, // max_time ); // Create associated Parquet files: let parquet_files = vec![ @@ -409,13 +419,13 @@ mod tests { 
use_numeric_partition_id: Some(true), ..parquet_file_missing_new_numeric_id_field_proto.clone() }; - - let files = MessageList::encode(&[ + let parquet_files = SortedById::new(vec![ parquet_file_missing_new_numeric_id_field_proto, parquet_file_new_numeric_id_field_false_proto, parquet_file_new_numeric_id_field_true_proto, - ]) - .unwrap(); + ]); + + let files = MessageList::encode(parquet_files).unwrap(); let files_proto: proto::MessageList = files.into(); // Create cached proto for two different Partitions: diff --git a/data_types/src/snapshot/root.rs b/data_types/src/snapshot/root.rs index e72d8835..fa75874a 100644 --- a/data_types/src/snapshot/root.rs +++ b/data_types/src/snapshot/root.rs @@ -7,7 +7,7 @@ use crate::{Namespace, NamespaceId, Timestamp}; use super::{ hash::{HashBuckets, HashBucketsEncoder}, - list::MessageList, + list::{GetId, MessageList, SortedById}, }; /// Error for [`RootSnapshot`] @@ -33,6 +33,12 @@ pub enum Error { /// Result for [`RootSnapshot`] pub type Result = std::result::Result; +impl GetId for proto::RootNamespace { + fn id(&self) -> i64 { + self.id + } +} + /// A snapshot of root. /// /// # Soft Deletion @@ -52,7 +58,7 @@ impl RootSnapshot { namespaces: impl IntoIterator, generation: u64, ) -> Result { - let mut namespaces: Vec<_> = namespaces + let namespaces: SortedById<_> = namespaces .into_iter() .map(|ns| proto::RootNamespace { id: ns.id.get(), @@ -61,9 +67,6 @@ impl RootSnapshot { }) .collect(); - // This sort is required for the binary search in `lookup_namespace_by_id` to function. 
- namespaces.sort_unstable_by_key(|ns| ns.id); - let mut namespace_names = HashBucketsEncoder::new(namespaces.len()); for (index, ns) in namespaces.iter().enumerate() { // exclude soft-deleted entries from name table @@ -73,7 +76,7 @@ impl RootSnapshot { } Ok(Self { - namespaces: MessageList::encode(&namespaces).context(NamespaceEncodeSnafu)?, + namespaces: MessageList::encode(namespaces).context(NamespaceEncodeSnafu)?, namespace_names: namespace_names.finish(), generation, }) @@ -113,31 +116,11 @@ impl RootSnapshot { /// for performance-sensitive use cases. pub fn lookup_namespace_by_id(&self, id: NamespaceId) -> Result> { // This requires that the namespaces are sorted by ID, which `encode` does. - - // Search through a slice of indices, as there isn't a way to get a slice of `&T` from - // `MessageList` without decoding everything - let indices_to_search: Vec<_> = (0..self.namespaces.len()).collect(); - - let element_idx = indices_to_search.binary_search_by_key(&id.get(), |&idx| { - let namespace_snapshot = self - .namespaces - .get(idx) - // The binary search APIs expect the comparator functions to be infallible. If - // decoding of cache records fails, we have bigger problems than only not being - // able to do a binary search, so go ahead and panic. - .expect("decoding root namespaces for binary search should succeed"); - namespace_snapshot.id - }); - - element_idx - .ok() - .map(|idx| { - self.namespaces - .get(idx) - .context(NamespaceDecodeSnafu) - .map(|t| t.into()) - }) - .transpose() + Ok(self + .namespaces + .get_by_id(id.get()) + .context(NamespaceDecodeSnafu)? + .map(|ns| ns.into())) } /// Lookup a [`RootSnapshotNamespace`] by name. Does not include deleted entries. diff --git a/data_types/src/snapshot/table.rs b/data_types/src/snapshot/table.rs index d69cb4fb..afce4ac2 100644 --- a/data_types/src/snapshot/table.rs +++ b/data_types/src/snapshot/table.rs @@ -1,8 +1,8 @@ //! 
Snapshot definition for tables -use crate::snapshot::list::MessageList; use crate::{ Column, ColumnId, ColumnTypeProtoError, NamespaceId, Partition, PartitionId, PartitionKey, Table, TableId, Timestamp, + snapshot::list::{GetId, MessageList, SortedById}, }; use bytes::Bytes; use generated_types::influxdata::iox::catalog_cache::v1 as proto; @@ -52,6 +52,18 @@ pub enum Error { /// Result for [`TableSnapshot`] pub type Result = std::result::Result; +impl GetId for proto::TablePartition { + fn id(&self) -> i64 { + self.id + } +} + +impl GetId for proto::TableColumn { + fn id(&self) -> i64 { + self.id + } +} + /// A snapshot of a table #[derive(Debug, Clone)] pub struct TableSnapshot { @@ -74,7 +86,7 @@ impl TableSnapshot { columns: Vec, generation: u64, ) -> Result { - let columns: Vec<_> = columns + let columns: SortedById<_> = columns .into_iter() .map(|c| proto::TableColumn { id: c.id.get(), @@ -83,7 +95,7 @@ impl TableSnapshot { }) .collect(); - let partitions: Vec<_> = partitions + let partitions: SortedById<_> = partitions .into_iter() .map(|p| proto::TablePartition { id: p.id.get(), @@ -95,8 +107,8 @@ impl TableSnapshot { table_id: table.id, namespace_id: table.namespace_id, table_name: table.name.into(), - partitions: MessageList::encode(&partitions).context(PartitionEncodeSnafu)?, - columns: MessageList::encode(&columns).context(ColumnEncodeSnafu)?, + partitions: MessageList::encode(partitions).context(PartitionEncodeSnafu)?, + columns: MessageList::encode(columns).context(ColumnEncodeSnafu)?, partition_template: table.partition_template.as_proto().cloned(), iceberg_enabled: table.iceberg_enabled, generation, @@ -193,6 +205,11 @@ impl TableSnapshot { pub fn table_id(&self) -> TableId { self.table_id } + + /// When this table was deleted if any + pub fn deleted_at(&self) -> Option { + self.deleted_at + } } /// Partition information stored within [`TableSnapshot`] diff --git a/flightsql/src/cmd.rs b/flightsql/src/cmd.rs index 2d8987e3..b98f85e9 100644 --- 
a/flightsql/src/cmd.rs +++ b/flightsql/src/cmd.rs @@ -1,6 +1,6 @@ //! IOx FlightSQL Command structures -use std::{collections::HashMap, fmt::Display}; +use std::{collections::HashMap, fmt::Display, slice}; use arrow::{ array::RecordBatch, @@ -143,7 +143,7 @@ impl Display for PreparedStatementHandle { write!( f, ",{}", - pretty_format_batches(&[batch.clone()]).map_err(|_| std::fmt::Error)? + pretty_format_batches(slice::from_ref(batch)).map_err(|_| std::fmt::Error)? )? }; write!(f, ")") diff --git a/generated_types/Cargo.toml b/generated_types/Cargo.toml index d4ae3445..1c96ac30 100644 --- a/generated_types/Cargo.toml +++ b/generated_types/Cargo.toml @@ -14,7 +14,7 @@ pbjson-types = { workspace = true } prost = { workspace = true } prost-types = { workspace = true } serde = { version = "1.0", features = ["derive"] } -tonic = { version = "0.12", features = ["tls", "tls-roots"] } +tonic = { version = "0.12", features = ["gzip", "tls", "tls-roots", "zstd"] } tracing = { workspace = true } uuid = { version = "1" } workspace-hack = { version = "0.1", path = "../workspace-hack" } diff --git a/generated_types/protos/influxdata/iox/catalog/v2/service.proto b/generated_types/protos/influxdata/iox/catalog/v2/service.proto index ddf45513..fdd3e31b 100644 --- a/generated_types/protos/influxdata/iox/catalog/v2/service.proto +++ b/generated_types/protos/influxdata/iox/catalog/v2/service.proto @@ -29,7 +29,12 @@ service CatalogService { rpc NamespaceDeleteOld(NamespaceDeleteOldRequest) returns (NamespaceDeleteOldResponse); rpc TableCreate(TableCreateRequest) returns (TableCreateResponse); - rpc TableGetById(TableGetByIdRequest) returns (TableGetByIdResponse); + rpc TableGetByIdAndNamespace(TableGetByIdAndNamespaceRequest) returns (TableGetByIdAndNamespaceResponse); + // Returns a table record regardless of its soft-deleted status. Essentially deprecated in favor + // of `TableGetByIdAndNamespace`. 
+ rpc TableGetById(TableGetByIdRequest) returns (TableGetByIdResponse) { + option deprecated = true; + }; rpc TableGetByNamespaceAndName(TableGetByNamespaceAndNameRequest) returns (TableGetByNamespaceAndNameResponse); rpc TableListByNamespaceId(TableListByNamespaceIdRequest) returns (stream TableListByNamespaceIdResponse); rpc TableList(TableListRequest) returns (stream TableListResponse); @@ -48,12 +53,14 @@ service CatalogService { rpc ColumnList(ColumnListRequest) returns (stream ColumnListResponse); rpc PartitionCreateOrGet(PartitionCreateOrGetRequest) returns (PartitionCreateOrGetResponse); + rpc PartitionCreateOrGetBatch(PartitionCreateOrGetBatchRequest) returns (stream PartitionCreateOrGetBatchResponse); rpc PartitionGetByIdBatch(PartitionGetByIdBatchRequest) returns (stream PartitionGetByIdBatchResponse); rpc PartitionListByTableId(PartitionListByTableIdRequest) returns (stream PartitionListByTableIdResponse); rpc PartitionListIds(PartitionListIdsRequest) returns (stream PartitionListIdsResponse); rpc PartitionCasSortKey(PartitionCasSortKeyRequest) returns (PartitionCasSortKeyResponse); rpc PartitionRecordSkippedCompaction(PartitionRecordSkippedCompactionRequest) returns (PartitionRecordSkippedCompactionResponse); rpc PartitionGetInSkippedCompactions(PartitionGetInSkippedCompactionsRequest) returns (stream PartitionGetInSkippedCompactionsResponse); + rpc PartitionGetInSkippedCompactionsV2(stream PartitionGetInSkippedCompactionsV2Request) returns (stream PartitionGetInSkippedCompactionsV2Response); rpc PartitionListSkippedCompactions(PartitionListSkippedCompactionsRequest) returns (stream PartitionListSkippedCompactionsResponse); rpc PartitionDeleteSkippedCompactions(PartitionDeleteSkippedCompactionsRequest) returns (PartitionDeleteSkippedCompactionsResponse); rpc PartitionMostRecentN(PartitionMostRecentNRequest) returns (stream PartitionMostRecentNResponse); @@ -65,6 +72,15 @@ service CatalogService { rpc PartitionSnapshot(PartitionSnapshotRequest) returns 
(PartitionSnapshotResponse); rpc PartitionMigrate(PartitionMigrateRequest) returns (PartitionMigrateResponse); + // Partition retention methods + rpc PartitionRetentionUpsert(PartitionRetentionUpsertRequest) returns (PartitionRetentionUpsertResponse); + rpc PartitionRetentionGetByPartitionId(PartitionRetentionGetByPartitionIdRequest) returns (PartitionRetentionGetByPartitionIdResponse); + rpc PartitionRetentionListPartitionsNeedingComputation(PartitionRetentionListPartitionsNeedingComputationRequest) returns (stream PartitionRetentionListPartitionsNeedingComputationResponse); + rpc PartitionRetentionReconcileMissingPartitions(PartitionRetentionReconcileMissingPartitionsRequest) returns (PartitionRetentionReconcileMissingPartitionsResponse); + rpc PartitionRetentionListPartitionsForParquetRetention(PartitionRetentionListPartitionsForParquetRetentionRequest) returns (stream PartitionRetentionListPartitionsForParquetRetentionResponse); + rpc PartitionRetentionListPartitionsForPartitionRetention(PartitionRetentionListPartitionsForPartitionRetentionRequest) returns (stream PartitionRetentionListPartitionsForPartitionRetentionResponse); + rpc PartitionRetentionInvalidateByNamespaceId(PartitionRetentionInvalidateByNamespaceIdRequest) returns (PartitionRetentionInvalidateByNamespaceIdResponse); + rpc ParquetFileFlagForDeleteByRetention(ParquetFileFlagForDeleteByRetentionRequest) returns (stream ParquetFileFlagForDeleteByRetentionResponse); rpc ParquetFileDeleteOldIdsOnly(ParquetFileDeleteOldIdsOnlyRequest) returns (stream ParquetFileDeleteOldIdsOnlyResponse); rpc ParquetFileDeleteOldIdsCount(ParquetFileDeleteOldIdsCountRequest) returns (ParquetFileDeleteOldIdsCountResponse); @@ -265,6 +281,16 @@ message TableCreateResponse { Table table = 1; } +message TableGetByIdAndNamespaceRequest { + int64 table_id = 1; + int64 namespace_id = 2; + SoftDeletedRows deleted = 3; +} + +message TableGetByIdAndNamespaceResponse { + Table table = 1; +} + message TableGetByIdRequest { int64 id 
= 1; } @@ -334,6 +360,7 @@ message TableSoftDeleteRequest { message TableSoftDeleteResponse { Table table = 1; + int64 router_version = 2; } message TableRenameRequest { @@ -343,6 +370,7 @@ message TableRenameRequest { message TableRenameResponse { Table table = 1; + int64 router_version = 2; } message TableUndeleteRequest { @@ -351,6 +379,7 @@ message TableUndeleteRequest { message TableUndeleteResponse { Table table = 1; + int64 router_version = 2; } message ColumnCreateOrGetRequest { @@ -405,6 +434,15 @@ message PartitionCreateOrGetResponse { Partition partition = 1; } +message PartitionCreateOrGetBatchRequest { + repeated string keys = 1; + int64 table_id = 2; +} + +message PartitionCreateOrGetBatchResponse { + Partition partition = 1; +} + message PartitionGetByIdBatchRequest { repeated int64 partition_ids = 1; } @@ -464,10 +502,18 @@ message PartitionGetInSkippedCompactionsRequest { repeated int64 partition_ids = 1; } +message PartitionGetInSkippedCompactionsV2Request { + int64 partition_id = 1; +} + message PartitionGetInSkippedCompactionsResponse { SkippedCompaction skipped_compaction = 1; } +message PartitionGetInSkippedCompactionsV2Response { + SkippedCompaction skipped_compaction = 1; +} + message PartitionListSkippedCompactionsRequest {} message PartitionListSkippedCompactionsResponse { @@ -735,6 +781,7 @@ message Partition { optional int64 new_file_at = 6; optional int64 cold_compact_at = 7; optional int64 created_at = 8; + optional int64 max_time = 9; } message SkippedCompaction { @@ -810,3 +857,74 @@ message ParquetFile { int64 max_l0_created_at = 14; ParquetFileSource source = 17; } + +// Partition retention messages +message PartitionRetention { + int64 partition_id = 1; + int64 namespace_id = 2; + optional int64 retention_period_ns = 3; + optional int64 retention_start_time_ns = 4; + optional int64 retention_end_time_ns = 5; +} + +message PartitionRetentionUpsertRequest { + int64 partition_id = 1; + int64 namespace_id = 2; + optional int64 
retention_period_ns = 3; + optional int64 retention_start_time_ns = 4; + optional int64 retention_end_time_ns = 5; +} + +message PartitionRetentionUpsertResponse { + PartitionRetention partition_retention = 1; +} + +message PartitionRetentionGetByPartitionIdRequest { + int64 partition_id = 1; +} + +message PartitionRetentionGetByPartitionIdResponse { + optional PartitionRetention partition_retention = 1; +} + +message PartitionRetentionListPartitionsNeedingComputationRequest { + int32 limit = 1; +} + +message PartitionRetentionListPartitionsNeedingComputationResponse { + int64 partition_id = 1; +} + +message PartitionRetentionReconcileMissingPartitionsRequest { + // Empty - no parameters needed +} + +message PartitionRetentionReconcileMissingPartitionsResponse { + int32 reconciled_count = 1; +} + +message PartitionRetentionListPartitionsForParquetRetentionRequest { + int64 cutoff_time_ns = 1; + int32 limit = 2; +} + +message PartitionRetentionListPartitionsForParquetRetentionResponse { + int64 partition_id = 1; +} + +message PartitionRetentionListPartitionsForPartitionRetentionRequest { + int64 cutoff_time_ns = 1; + int32 limit = 2; +} + +message PartitionRetentionListPartitionsForPartitionRetentionResponse { + int64 partition_id = 1; +} + +message PartitionRetentionInvalidateByNamespaceIdRequest { + int64 namespace_id = 1; +} + +message PartitionRetentionInvalidateByNamespaceIdResponse { + int32 invalidated_count = 1; +} diff --git a/generated_types/protos/influxdata/iox/catalog_storage/v1/service.proto b/generated_types/protos/influxdata/iox/catalog_storage/v1/service.proto index 8cee3822..66ad9ff7 100644 --- a/generated_types/protos/influxdata/iox/catalog_storage/v1/service.proto +++ b/generated_types/protos/influxdata/iox/catalog_storage/v1/service.proto @@ -219,4 +219,6 @@ message TableWithStorage { // Total number of active columns in this table. 
int64 column_count = 7; + + bool iceberg_enabled = 8; } diff --git a/generated_types/protos/influxdata/iox/table/v1/service.proto b/generated_types/protos/influxdata/iox/table/v1/service.proto index 4bf358a2..842918ef 100644 --- a/generated_types/protos/influxdata/iox/table/v1/service.proto +++ b/generated_types/protos/influxdata/iox/table/v1/service.proto @@ -84,15 +84,54 @@ message Table { optional google.protobuf.Timestamp deleted_at = 6; } +enum TableStatusFilter { + TABLE_STATUS_FILTER_UNSPECIFIED = 0; + TABLE_STATUS_FILTER_ACTIVE = 1; + TABLE_STATUS_FILTER_DELETED = 2; +} + +message TableStatusFilterList { + repeated TableStatusFilter inner = 1; +} + message GetTablesRequest { oneof target { // Name of the namespace to list tables for. string namespace_name = 1 [deprecated = true]; // The unique ID of the namespace to list tables for. int64 namespace_id = 2; - } -} + }; + // How to filter deleted tables in the response of this request + // If there is nothing in this field, the returned tables aren't filtered at all; + // everything is returned. + // If there are items in this field, however, the only tables that are returned + // are those whose statuses are contained in this field. + // + // So, by default, no filtering is done. + // + // But if someone specified [Active, Deleted] in this field, that would return all + // tables which are either active or deleted. At time of writing, that includes all + // tables that can exist. However, at some point in the future, we may want to extend + // the granularity of this endpoint and allow customers to see all tables that are in + // the process of being deleted, or maybe all tables that are in the process of being + // created, or maybe we'll need to add some other status like 'hidden'. Who knows. + // + // With the `SoftDeleted` method of filtering, adding more statuses would cause + // exponential blowup with the number of variants that could occur, since we'd want to + // support any combination of statuses. 
With this, however, that extension becomes way + easier and more backwards-compatible. + // + // We also want this to be, essentially, an optional list because it allows us to + be sure that the client is always explicitly specifying their list of filters. If + they leave a repeated field unspecified, we can't tell whether they just don't know + about the filters or if they explicitly want no filters. But making it an optional + allows us to force people to specify their filters, which is very nice for ensuring + everyone is doing what they want. + // + // It also, for now, allows us to keep backwards-compatibility, which is important. + optional TableStatusFilterList filters = 3; +} message GetTableRequest { oneof namespace_target { // Name of the namespace to get a table from. @@ -183,6 +222,9 @@ message DisableIcebergResponse {} message DeleteTableRequest { int64 table_id = 1; + // ID of the namespace this table is in. If not specified, the request may be slower as it will + // need to do more lookups. + int64 namespace_id = 2; } message DeleteTableResponse { @@ -192,6 +234,9 @@ message DeleteTableResponse { message RenameTableRequest { int64 table_id = 1; string new_name = 2; + // ID of the namespace this table is in. If not specified, the request may be slower as it will + // need to do more lookups.
+ int64 namespace_id = 3; } message RenameTableResponse { diff --git a/generated_types/src/lib.rs b/generated_types/src/lib.rs index 4dfcabb7..19bbafe4 100644 --- a/generated_types/src/lib.rs +++ b/generated_types/src/lib.rs @@ -7,6 +7,7 @@ clippy::large_enum_variant, clippy::use_self, clippy::allow_attributes, + clippy::uninlined_format_args, missing_copy_implementations )] diff --git a/influxdb2_client/Cargo.toml b/influxdb2_client/Cargo.toml index 090e0dc7..ddf07547 100644 --- a/influxdb2_client/Cargo.toml +++ b/influxdb2_client/Cargo.toml @@ -13,7 +13,7 @@ bytes = "1.10" futures = { version = "0.3", default-features = false } reqwest = { workspace = true, features = ["stream", "json", "rustls-tls-native-roots"] } serde = { version = "1.0", features = ["derive"] } -serde_json = "1.0.141" +serde_json = "1.0.143" snafu = "0.8" url = "2.5.3" uuid = { version = "1", features = ["v4"] } diff --git a/influxdb_influxql_parser/Cargo.toml b/influxdb_influxql_parser/Cargo.toml index 5cf47680..cbdd4057 100644 --- a/influxdb_influxql_parser/Cargo.toml +++ b/influxdb_influxql_parser/Cargo.toml @@ -18,7 +18,7 @@ num-integer = { version = "0.1", default-features = false, features = [ "std", ] } num-traits = "0.2" -thiserror = "2.0.12" +thiserror = "2.0.16" workspace-hack = { version = "0.1", path = "../workspace-hack" } [dev-dependencies] # In alphabetical order diff --git a/influxdb_iox_client/Cargo.toml b/influxdb_iox_client/Cargo.toml index 67053ac0..cce819fa 100644 --- a/influxdb_iox_client/Cargo.toml +++ b/influxdb_iox_client/Cargo.toml @@ -42,14 +42,14 @@ prost = { workspace = true, optional = true } rand = { version = "0.9.2", optional = true } reqwest = { workspace = true, features = ["stream", "rustls-tls-native-roots"] } schema = { path = "../schema", optional = true } -serde_json = { version = "1.0.141", optional = true } +serde_json = { version = "1.0.143", optional = true } tokio = { version = "1.47", features = [ "macros", "parking_lot", "rt-multi-thread", ] } 
tokio-stream = "0.1.17" -thiserror = "2.0.12" +thiserror = "2.0.16" tonic-reflection = { version = "0.12" } [dev-dependencies] diff --git a/influxdb_iox_client/src/client/flight/mod.rs b/influxdb_iox_client/src/client/flight/mod.rs index 30248a59..d2a4000b 100644 --- a/influxdb_iox_client/src/client/flight/mod.rs +++ b/influxdb_iox_client/src/client/flight/mod.rs @@ -28,6 +28,8 @@ pub mod generated_types { pub use generated_types::influxdata::iox::querier::v1::*; } +pub use ::generated_types::tonic::codec::CompressionEncoding; + /// Error responses when querying an IOx namespace using the IOx Flight API. #[derive(Debug, Error)] pub enum Error { @@ -238,6 +240,20 @@ impl Client { Ok(self.inner.add_header(key, value)?) } + /// Accept compression. + /// + /// This is a set and the server will pick one option from it (plus the option "no compression"). + pub fn accept_compressed(&mut self, encoding: CompressionEncoding) { + *self.inner.inner_mut() = self.inner.inner_mut().clone().accept_compressed(encoding); + } + + /// Send compression. + /// + /// Only ONE compression can be sent. Ask your server operator which encodings are supported.
+ pub fn send_compressed(&mut self, encoding: CompressionEncoding) { + *self.inner.inner_mut() = self.inner.inner_mut().clone().send_compressed(encoding); + } + /// Create a new [`QueryBuilder`] to construct a query, optionally with parameters, on the /// given namespace pub fn query(&mut self, database: impl Into + Send) -> QueryBuilder<'_, NoQuery> { diff --git a/influxdb_iox_client/src/client/table.rs b/influxdb_iox_client/src/client/table.rs index 2cc543e4..0fef4244 100644 --- a/influxdb_iox_client/src/client/table.rs +++ b/influxdb_iox_client/src/client/table.rs @@ -9,6 +9,7 @@ use ::generated_types::influxdata::iox::Target; /// Re-export generated_types pub mod generated_types { pub use generated_types::influxdata::iox::{ + common::v1::SoftDeleted, partition_template::v1::{template_part::*, *}, table::v1::*, }; @@ -32,11 +33,15 @@ impl Client { pub async fn get_tables( &mut self, namespace: impl Into + Send, + filters: Option>, ) -> Result, Error> { Ok(self .inner .get_tables(GetTablesRequest { target: Some(namespace.into().into()), + filters: filters.map(|filters| TableStatusFilterList { + inner: filters.into_iter().map(|f| f as i32).collect(), + }), }) .await? .into_inner() @@ -81,10 +86,17 @@ impl Client { } /// Soft delete a table - pub async fn soft_delete_table(&mut self, table_id: i64) -> Result { + pub async fn soft_delete_table( + &mut self, + table_id: i64, + namespace_id: i64, + ) -> Result { let response = self .inner - .delete_table(DeleteTableRequest { table_id }) + .delete_table(DeleteTableRequest { + table_id, + namespace_id, + }) .await?; Ok(response.into_inner().table.unwrap_field("table")?) 
diff --git a/influxdb_line_protocol/src/v3/mod.rs b/influxdb_line_protocol/src/v3/mod.rs index 315d3ed4..ee2b077b 100644 --- a/influxdb_line_protocol/src/v3/mod.rs +++ b/influxdb_line_protocol/src/v3/mod.rs @@ -263,10 +263,10 @@ fn field_family_normal_char(i: &str) -> IResult<&str, &str> { if c == ':' { // Peek to see if the next char is also a colon, and if not, // keep consuming. - if let Some((_, next_ch)) = iter.peek() { - if *next_ch != ':' { - continue; - } + if let Some((_, next_ch)) = iter.peek() + && *next_ch != ':' + { + continue; } } diff --git a/iox_query/Cargo.toml b/iox_query/Cargo.toml index 54936c13..ad969a4e 100644 --- a/iox_query/Cargo.toml +++ b/iox_query/Cargo.toml @@ -29,7 +29,7 @@ datafusion_util = { path = "../datafusion_util" } executor = { path = "../executor" } futures = "0.3" hashbrown = { workspace = true } -indexmap = { version = "2.10", features = ["std"] } +indexmap = { version = "2.11", features = ["std"] } influxdb-line-protocol = { path = "../influxdb_line_protocol" } itertools = "0.13.0" iox_query_params = { path = "../iox_query_params" } diff --git a/iox_query/src/exec/context.rs b/iox_query/src/exec/context.rs index 4cd99051..a0f69bcf 100644 --- a/iox_query/src/exec/context.rs +++ b/iox_query/src/exec/context.rs @@ -661,11 +661,6 @@ impl IOxSessionContext { ) } - /// Record an event on the span recorder - pub fn record_event(&mut self, name: &'static str) { - self.recorder.event(SpanEvent::new(name)); - } - /// Record an event on the span recorder pub fn set_metadata(&mut self, name: &'static str, value: impl Into) { self.recorder.set_metadata(name, value); diff --git a/iox_query/src/exec/gapfill/mod.rs b/iox_query/src/exec/gapfill/mod.rs index 6b52ad48..88f846bd 100644 --- a/iox_query/src/exec/gapfill/mod.rs +++ b/iox_query/src/exec/gapfill/mod.rs @@ -35,7 +35,7 @@ use datafusion::{ }; use datafusion_util::ThenWithOpt; pub use gap_expander::{ExpandedValue, GapExpander}; -use std::cmp::Ordering; +use std::{cmp::Ordering, slice}; 
use std::{ convert::Infallible, fmt::{self, Debug}, @@ -550,7 +550,7 @@ impl GapFillExec { let eq_properties = match input.properties().output_ordering() { None => EquivalenceProperties::new(schema), Some(output_ordering) => { - EquivalenceProperties::new_with_orderings(schema, &[output_ordering.clone()]) + EquivalenceProperties::new_with_orderings(schema, slice::from_ref(output_ordering)) } }; diff --git a/iox_query/src/logical_optimizer/influx_regex_to_datafusion_regex.rs b/iox_query/src/logical_optimizer/influx_regex_to_datafusion_regex.rs index 7d90a1ac..6d43a14e 100644 --- a/iox_query/src/logical_optimizer/influx_regex_to_datafusion_regex.rs +++ b/iox_query/src/logical_optimizer/influx_regex_to_datafusion_regex.rs @@ -73,16 +73,15 @@ impl TreeNodeRewriter for InfluxRegexToDataFusionRegex { let name = func.name(); if (args.len() == 2) && ((name == REGEX_MATCH_UDF_NAME) || (name == REGEX_NOT_MATCH_UDF_NAME)) + && let Expr::Literal(ScalarValue::Utf8(Some(s)), _) = &args[1] { - if let Expr::Literal(ScalarValue::Utf8(Some(s)), _) = &args[1] { - let s = clean_non_meta_escapes(s); - let op = match name { - REGEX_MATCH_UDF_NAME => Operator::RegexMatch, - REGEX_NOT_MATCH_UDF_NAME => Operator::RegexNotMatch, - _ => unreachable!(), - }; - return Ok(Transformed::yes(binary_expr(args.remove(0), op, lit(s)))); - } + let s = clean_non_meta_escapes(s); + let op = match name { + REGEX_MATCH_UDF_NAME => Operator::RegexMatch, + REGEX_NOT_MATCH_UDF_NAME => Operator::RegexNotMatch, + _ => unreachable!(), + }; + return Ok(Transformed::yes(binary_expr(args.remove(0), op, lit(s)))); } Ok(Transformed::yes(Expr::ScalarFunction(ScalarFunction { diff --git a/iox_query/src/physical_optimizer/sort/merge_partitions.rs b/iox_query/src/physical_optimizer/sort/merge_partitions.rs index 46bd3b0e..44cb8d4d 100644 --- a/iox_query/src/physical_optimizer/sort/merge_partitions.rs +++ b/iox_query/src/physical_optimizer/sort/merge_partitions.rs @@ -100,17 +100,16 @@ pub fn 
merge_partitions_after_parallelized_sorting( } else { // If all lexical ranges are the same, then the partitions are a result of repartitioning. Insert an SPM above the sort. if let Some(lexical_ranges) = extract_ranges_from_plan(ordering_req, &ctx.plan)? + && lexical_ranges.iter().dedup().collect_vec().len() == 1 { - if lexical_ranges.iter().dedup().collect_vec().len() == 1 { - let plan = add_sort_preserving_merge( - Arc::clone(&ctx.plan), - sort_exec.expr(), - sort_exec.fetch(), - )?; - let mut new_ctx = MergePartitionsContext::new_default(plan); - new_ctx.data.has_merged_parallelized_sort = true; - return Ok(Transformed::yes(new_ctx)); - } + let plan = add_sort_preserving_merge( + Arc::clone(&ctx.plan), + sort_exec.expr(), + sort_exec.fetch(), + )?; + let mut new_ctx = MergePartitionsContext::new_default(plan); + new_ctx.data.has_merged_parallelized_sort = true; + return Ok(Transformed::yes(new_ctx)); }; Ok(Transformed::no(ctx)) diff --git a/iox_query/src/provider.rs b/iox_query/src/provider.rs index 646a735d..17142b7d 100644 --- a/iox_query/src/provider.rs +++ b/iox_query/src/provider.rs @@ -310,6 +310,8 @@ impl TableProvider for ChunkTableProvider { #[cfg(test)] mod test { + use std::slice; + use super::*; use crate::{ exec::IOxSessionContext, @@ -546,7 +548,7 @@ mod test { // simple plan let plan = provider - .scan(&state, None, &[pred.clone()], None) + .scan(&state, None, slice::from_ref(&pred), None) .await .unwrap(); insta::assert_yaml_snapshot!( @@ -563,7 +565,7 @@ mod test { // projection let plan = provider - .scan(&state, Some(&vec![1, 3]), &[pred.clone()], None) + .scan(&state, Some(&vec![1, 3]), slice::from_ref(&pred), None) .await .unwrap(); insta::assert_yaml_snapshot!( diff --git a/iox_query/src/provider/deduplicate.rs b/iox_query/src/provider/deduplicate.rs index c63294a8..ea837c6f 100644 --- a/iox_query/src/provider/deduplicate.rs +++ b/iox_query/src/provider/deduplicate.rs @@ -1,7 +1,7 @@ //! 
Implemention of DeduplicateExec operator (resolves primary key conflicts) plumbing and tests mod algo; -use std::collections::HashMap; +use std::{collections::HashMap, slice}; use std::{collections::HashSet, fmt, sync::Arc}; use arrow::{error::ArrowError, record_batch::RecordBatch}; @@ -189,7 +189,7 @@ impl DeduplicateExec { ) -> PlanProperties { trace!("Deduplicate output ordering: {:?}", sort_keys); let eq_properties = - EquivalenceProperties::new_with_orderings(input.schema(), &[sort_keys.clone()]); + EquivalenceProperties::new_with_orderings(input.schema(), slice::from_ref(sort_keys)); let output_partitioning = Partitioning::UnknownPartitioning(1); @@ -416,7 +416,7 @@ mod test { use arrow::array::{DictionaryArray, Int64Array}; use arrow_util::display::pretty_format_batches; use schema::TIME_DATA_TIMEZONE; - use std::iter::FromIterator; + use std::{iter::FromIterator, slice}; #[tokio::test] async fn test_single_tag() { @@ -921,7 +921,7 @@ mod test { "| | | 1.0 |", "+----+----+-----+", ]; - assert_batches_eq!(&expected_input_batch, &[b1.clone()]); + assert_batches_eq!(&expected_input_batch, slice::from_ref(&b1)); // sort on t1, t2 let sort_keys = vec![ @@ -1084,7 +1084,7 @@ mod test { "| b | | 1.0 |", "+----+----+-----+", ]; - assert_batches_eq!(&expected_input_batch, &[b1.clone()]); + assert_batches_eq!(&expected_input_batch, slice::from_ref(&b1)); // sort on t1, t2 let sort_keys = vec![ @@ -1251,7 +1251,7 @@ mod test { "| b | a | 1.0 |", "+----+----+-----+", ]; - assert_batches_eq!(&expected_input_batch, &[b1.clone()]); + assert_batches_eq!(&expected_input_batch, slice::from_ref(&b1)); // sort on t1, t2 let sort_keys = vec![ diff --git a/iox_query_influxql/Cargo.toml b/iox_query_influxql/Cargo.toml index c5f5f124..e3a01c20 100644 --- a/iox_query_influxql/Cargo.toml +++ b/iox_query_influxql/Cargo.toml @@ -24,7 +24,7 @@ predicate = { path = "../predicate" } query_functions = { path = "../query_functions" } regex = "1" schema = { path = "../schema" } -serde_json = 
"1.0.141" +serde_json = "1.0.143" thiserror = "2.0" workspace-hack = { version = "0.1", path = "../workspace-hack" } diff --git a/iox_query_influxql/src/frontend/planner.rs b/iox_query_influxql/src/frontend/planner.rs index 3808e453..763487dc 100644 --- a/iox_query_influxql/src/frontend/planner.rs +++ b/iox_query_influxql/src/frontend/planner.rs @@ -6,12 +6,12 @@ use influxdb_influxql_parser::show_measurements::ShowMeasurementsStatement; use influxdb_influxql_parser::show_tag_keys::ShowTagKeysStatement; use influxdb_influxql_parser::show_tag_values::ShowTagValuesStatement; use iox_query_params::StatementParams; -use std::any::Any; use std::collections::{HashMap, HashSet}; use std::fmt; use std::fmt::Debug; use std::ops::Deref; use std::sync::Arc; +use std::{any::Any, slice}; use crate::plan::{InfluxQLToLogicalPlan, SchemaProvider, parse_regex}; use datafusion::datasource::provider_as_source; @@ -98,7 +98,7 @@ impl SchemaExec { let eq_properties = match input.properties().output_ordering() { None => EquivalenceProperties::new(schema), Some(output_ordering) => { - EquivalenceProperties::new_with_orderings(schema, &[output_ordering.clone()]) + EquivalenceProperties::new_with_orderings(schema, slice::from_ref(output_ordering)) } }; diff --git a/iox_query_influxql/src/plan/ir.rs b/iox_query_influxql/src/plan/ir.rs index 74279e2a..94f9b557 100644 --- a/iox_query_influxql/src/plan/ir.rs +++ b/iox_query_influxql/src/plan/ir.rs @@ -85,9 +85,6 @@ pub(super) struct Select { /// /// [time_zone_clause]: https://docs.influxdata.com/influxdb/v1.8/query_language/explore-data/#the-time-zone-clause pub(super) timezone: Option, - - /// `true` when the projection contains an `INTEGRAL` function. - pub(super) has_integral: bool, } impl From