From 43cca68c5ee758a396600b8c6b4e43172dcd81ab Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Mon, 21 Jul 2025 09:56:14 -0700 Subject: [PATCH 1/3] Make physical now adds parquet field Id This reverts commit b0320b6bbf183cb7a98b727759ab90f42327016b. --- kernel/src/scan/log_replay.rs | 6 +- kernel/src/scan/mod.rs | 73 +++---- kernel/src/schema/mod.rs | 323 +++++++++++++++++++++++++------ kernel/src/table_changes/scan.rs | 3 +- 4 files changed, 306 insertions(+), 99 deletions(-) diff --git a/kernel/src/scan/log_replay.rs b/kernel/src/scan/log_replay.rs index fdccf143f..6f34cac7b 100644 --- a/kernel/src/scan/log_replay.rs +++ b/kernel/src/scan/log_replay.rs @@ -418,7 +418,8 @@ mod tests { add_batch_simple, add_batch_with_partition_col, add_batch_with_remove, run_with_validate_callback, }; - use crate::scan::{get_state_info, Scan}; + use crate::scan::{Scan, StateInfo}; + use crate::table_features::ColumnMappingMode; use crate::Expression as Expr; use crate::{ engine::sync::SyncEngine, @@ -503,7 +504,8 @@ mod tests { StructField::new("date", DataType::DATE, true), ])); let partition_cols = ["date".to_string()]; - let state_info = get_state_info(schema.as_ref(), &partition_cols).unwrap(); + let state_info = + StateInfo::try_new(schema.as_ref(), &partition_cols, ColumnMappingMode::None).unwrap(); let static_transform = Some(Arc::new(Scan::get_static_transform(&state_info.all_fields))); let batch = vec![add_batch_with_partition_col()]; let iter = scan_action_iter( diff --git a/kernel/src/scan/mod.rs b/kernel/src/scan/mod.rs index 06449af86..762df8423 100644 --- a/kernel/src/scan/mod.rs +++ b/kernel/src/scan/mod.rs @@ -112,9 +112,10 @@ impl ScanBuilder { pub fn build(self) -> DeltaResult { // if no schema is provided, use snapshot's entire schema (e.g. 
SELECT *) let logical_schema = self.schema.unwrap_or_else(|| self.snapshot.schema()); - let state_info = get_state_info( + let state_info = StateInfo::try_new( logical_schema.as_ref(), &self.snapshot.metadata().partition_columns, + self.snapshot.table_configuration().column_mapping_mode(), )?; let physical_predicate = match self.predicate { @@ -818,39 +819,45 @@ struct StateInfo { have_partition_cols: bool, } -/// Get the state needed to process a scan, see [`StateInfo`] for details. -fn get_state_info(logical_schema: &Schema, partition_columns: &[String]) -> DeltaResult { - let mut have_partition_cols = false; - let mut read_fields = Vec::with_capacity(logical_schema.fields.len()); - // Loop over all selected fields and note if they are columns that will be read from the - // parquet file ([`ColumnType::Selected`]) or if they are partition columns and will need to - // be filled in by evaluating an expression ([`ColumnType::Partition`]) - let all_fields = logical_schema - .fields() - .enumerate() - .map(|(index, logical_field)| -> DeltaResult<_> { - if partition_columns.contains(logical_field.name()) { - // Store the index into the schema for this field. When we turn it into an - // expression in the inner loop, we will index into the schema and get the name and - // data type, which we need to properly materialize the column. - have_partition_cols = true; - Ok(ColumnType::Partition(index)) - } else { - // Add to read schema, store field so we can build a `Column` expression later - // if needed (i.e. if we have partition columns) - let physical_field = logical_field.make_physical(); - debug!("\n\n{logical_field:#?}\nAfter mapping: {physical_field:#?}\n\n"); - let physical_name = physical_field.name.clone(); - read_fields.push(physical_field); - Ok(ColumnType::Selected(physical_name)) - } +impl StateInfo { + /// Get the state needed to process a scan. 
+ fn try_new( + logical_schema: &Schema, + partition_columns: &[String], + column_mapping_mode: ColumnMappingMode, + ) -> DeltaResult { + let mut have_partition_cols = false; + let mut read_fields = Vec::with_capacity(logical_schema.fields.len()); + // Loop over all selected fields and note if they are columns that will be read from the + // parquet file ([`ColumnType::Selected`]) or if they are partition columns and will need to + // be filled in by evaluating an expression ([`ColumnType::Partition`]) + let all_fields = logical_schema + .fields() + .enumerate() + .map(|(index, logical_field)| -> DeltaResult<_> { + if partition_columns.contains(logical_field.name()) { + // Store the index into the schema for this field. When we turn it into an + // expression in the inner loop, we will index into the schema and get the name and + // data type, which we need to properly materialize the column. + have_partition_cols = true; + Ok(ColumnType::Partition(index)) + } else { + // Add to read schema, store field so we can build a `Column` expression later + // if needed (i.e. 
if we have partition columns) + let physical_field = logical_field.make_physical(column_mapping_mode); + debug!("\n\n{logical_field:#?}\nAfter mapping: {physical_field:#?}\n\n"); + let physical_name = physical_field.name.clone(); + read_fields.push(physical_field); + Ok(ColumnType::Selected(physical_name)) + } + }) + .try_collect()?; + Ok(StateInfo { + all_fields, + read_fields, + have_partition_cols, }) - .try_collect()?; - Ok(StateInfo { - all_fields, - read_fields, - have_partition_cols, - }) + } } pub fn selection_vector( diff --git a/kernel/src/schema/mod.rs b/kernel/src/schema/mod.rs index dbaed4678..761bbeb89 100644 --- a/kernel/src/schema/mod.rs +++ b/kernel/src/schema/mod.rs @@ -8,9 +8,11 @@ use std::sync::Arc; use indexmap::IndexMap; use itertools::Itertools; use serde::{Deserialize, Serialize}; +use tracing::warn; // re-export because many call sites that use schemas do not necessarily use expressions pub(crate) use crate::expressions::{column_name, ColumnName}; +use crate::table_features::ColumnMappingMode; use crate::utils::{require, CowExt as _}; use crate::{DeltaResult, Error}; use delta_kernel_derive::internal_api; @@ -91,6 +93,7 @@ impl From for MetadataValue { pub enum ColumnMetadataKey { ColumnMappingId, ColumnMappingPhysicalName, + ParquetFieldId, GenerationExpression, IdentityStart, IdentityStep, @@ -104,6 +107,7 @@ impl AsRef for ColumnMetadataKey { match self { Self::ColumnMappingId => "delta.columnMapping.id", Self::ColumnMappingPhysicalName => "delta.columnMapping.physicalName", + Self::ParquetFieldId => "parquet.field.id", Self::GenerationExpression => "delta.generationExpression", Self::IdentityAllowExplicitInsert => "delta.identity.allowExplicitInsert", Self::IdentityHighWaterMark => "delta.identity.highWaterMark", @@ -229,34 +233,122 @@ impl StructField { .collect() } - /// Applies physical name mappings to this field + /// Applies physical name and field ID mappings to this field. 
/// - /// NOTE: Caller affirms that the schema was already validated by - /// [`crate::table_features::validate_schema_column_mapping`], to ensure that annotations are - /// always and only present when column mapping mode is enabled. - pub fn make_physical(&self) -> Self { - struct MakePhysical; + /// This function sets the field ID for the physical [`StructField`] only if the + /// `column_mapping_mode` is `Id`. The field ID is specified using the + /// [`ColumnMetadataKey::ParquetFieldId`] metadata field. Readers should use + /// [`ColumnMetadataKey::ParquetFieldId`] to match fields to the Parquet schema. + /// If a physical StructField contains a field ID, the reader must resolve columns + /// with that ID. Otherwise, the physical StructField's name is used. For details, + /// see [`read_parquet_files`]. + /// + /// This function also sets the physical name of a field. If `column_mapping_mode` is + /// `Id` or `Name`, this is specified in [`ColumnMetadataKey::ColumnMappingPhysicalName`]. + /// Otherwise, the field's logical name is used. + /// + /// If the `column_mapping_mode` is `None`, then all column mapping metadata is removed. + /// If the `column_mapping_mode` is `Name`, then all Id mode column mapping metadata is + /// removed. + /// + /// NOTE: The caller must ensure that the schema has been validated by + /// [`crate::table_features::validate_schema_column_mapping`] to ensure that annotations are + /// present only when column mapping mode is enabled. 
+ /// + /// [`read_parquet_files`]: crate::ParquetHandler::read_parquet_files + #[internal_api] + pub(crate) fn make_physical(&self, column_mapping_mode: ColumnMappingMode) -> Self { + struct MakePhysical { + column_mapping_mode: ColumnMappingMode, + } impl<'a> SchemaTransform<'a> for MakePhysical { fn transform_struct_field( &mut self, field: &'a StructField, ) -> Option> { let field = self.recurse_into_struct_field(field)?; - Some(Cow::Owned(field.with_name(field.physical_name()))) + + let metadata = field.logical_to_physical_metadata(self.column_mapping_mode); + let name = match self.column_mapping_mode { + ColumnMappingMode::None => field.name().to_owned(), + ColumnMappingMode::Id | ColumnMappingMode::Name => { + // Assert that the physical name is present + debug_assert!(field + .metadata + .get(ColumnMetadataKey::ColumnMappingPhysicalName.as_ref()) + .is_some_and(|x| matches!(x, MetadataValue::String(_)))); + field.physical_name().to_owned() + } + }; + + Some(Cow::Owned(field.with_name(name).with_metadata(metadata))) } } // NOTE: unwrap is safe because the transformer is incapable of returning None #[allow(clippy::unwrap_used)] - MakePhysical - .transform_struct_field(self) - .unwrap() - .into_owned() + MakePhysical { + column_mapping_mode, + } + .transform_struct_field(self) + .unwrap() + .into_owned() } fn has_invariants(&self) -> bool { self.metadata .contains_key(ColumnMetadataKey::Invariants.as_ref()) } + + /// Converts logical schema StructField metadata to physical schema metadata + /// based on the specified `column_mapping_mode`. + /// + /// NOTE: Caller affirms that the schema was already validated by + /// [`crate::table_features::validate_schema_column_mapping`], to ensure that annotations are + /// always and only present when column mapping mode is enabled. 
+ fn logical_to_physical_metadata( + &self, + column_mapping_mode: ColumnMappingMode, + ) -> HashMap { + let mut base_metadata = self.metadata.clone(); + let physical_name_key = ColumnMetadataKey::ColumnMappingPhysicalName.as_ref(); + let field_id_key = ColumnMetadataKey::ColumnMappingId.as_ref(); + let parquet_field_id_key = ColumnMetadataKey::ParquetFieldId.as_ref(); + let field_id = base_metadata.get(ColumnMetadataKey::ColumnMappingId.as_ref()); + match column_mapping_mode { + ColumnMappingMode::Id => { + let Some(MetadataValue::Number(fid)) = field_id else { + // `validate_schema_column_mapping` should have verified that this has a field Id + warn!("StructField with name {} is missing field id in the Id column mapping mode", self.name()); + debug_assert!(false); + return base_metadata; + }; + // Insert the parquet field id matching the column mapping id + base_metadata.insert( + parquet_field_id_key.to_string(), + MetadataValue::Number(*fid), + ); + // Ensure that physical name is present + debug_assert!(base_metadata.contains_key(physical_name_key)); + } + ColumnMappingMode::Name => { + // Logical metadata should have the column mapping metadata keys + debug_assert!(base_metadata.contains_key(physical_name_key)); + debug_assert!(base_metadata.contains_key(field_id_key)); + + // Remove all id mode related metadata keys + base_metadata.remove(field_id_key); + base_metadata.remove(parquet_field_id_key); + // TODO(#1070): Remove nested column ids when they are supported in kernel + } + ColumnMappingMode::None => { + base_metadata.remove(physical_name_key); + base_metadata.remove(field_id_key); + base_metadata.remove(parquet_field_id_key); + // TODO(#1070): Remove nested column ids when they are supported in kernel + } + } + base_metadata + } } /// A struct is used to represent both the top-level schema of the table @@ -333,12 +425,29 @@ impl StructType { /// /// NOTE: This method only traverses through `StructType` fields; `MapType` and `ArrayType` /// fields are 
considered leaves even if they contain `StructType` entries/elements. + #[allow(unused)] #[internal_api] pub(crate) fn leaves<'s>(&self, own_name: impl Into>) -> ColumnNamesAndTypes { let mut get_leaves = GetSchemaLeaves::new(own_name.into()); let _ = get_leaves.transform_struct(self); (get_leaves.names, get_leaves.types).into() } + + /// Applies physical name mappings to this struct. If the `column_mapping_mode` is + /// [`ColumnMappingMode::Id`], then each StructField will have its parquet field id in the + /// [`ColumnMetadataKey::ParquetFieldId`] metadata field. + /// + /// NOTE: Caller affirms that the schema was already validated by + /// [`crate::table_features::validate_schema_column_mapping`], to ensure that annotations are + /// always and only present when column mapping mode is enabled. + #[allow(unused)] + #[internal_api] + pub(crate) fn make_physical(&self, column_mapping_mode: ColumnMappingMode) -> Self { + let fields = self + .fields() + .map(|field| field.make_physical(column_mapping_mode)); + Self::new(fields) + } } #[derive(Debug, Default)] @@ -1073,6 +1182,37 @@ mod tests { use super::*; use serde_json; + fn example_schema_metadata() -> &'static str { + r#" + { + "name": "e", + "type": { + "type": "array", + "elementType": { + "type": "struct", + "fields": [ + { + "name": "d", + "type": "integer", + "nullable": false, + "metadata": { + "delta.columnMapping.id": 5, + "delta.columnMapping.physicalName": "col-a7f4159c-53be-4cb0-b81a-f7e5240cfc49" + } + } + ] + }, + "containsNull": true + }, + "nullable": true, + "metadata": { + "delta.columnMapping.id": 4, + "delta.columnMapping.physicalName": "col-5f422f40-de70-45b2-88ab-1d5c90e94db1", + "delta.identity.start": 2147483648 + } + }"# + } + #[test] fn test_serde_data_types() { let data = r#" @@ -1209,66 +1349,123 @@ mod tests { } #[test] - fn test_field_metadata() { - let data = r#" - { - "name": "e", - "type": { - "type": "array", - "elementType": { - "type": "struct", - "fields": [ - { - "name":
"d", - "type": "integer", - "nullable": false, - "metadata": { - "delta.columnMapping.id": 5, - "delta.columnMapping.physicalName": "col-a7f4159c-53be-4cb0-b81a-f7e5240cfc49" - } - } - ] - }, - "containsNull": true - }, - "nullable": true, - "metadata": { - "delta.columnMapping.id": 4, - "delta.columnMapping.physicalName": "col-5f422f40-de70-45b2-88ab-1d5c90e94db1", - "delta.identity.start": 2147483648 - } - } - "#; - + fn test_make_physical_no_column_mapping() { + let data = example_schema_metadata(); let field: StructField = serde_json::from_str(data).unwrap(); + let physical_field = field.make_physical(ColumnMappingMode::None); + + let assert_field_metadata_is_wiped = |field: &StructField| { + assert!(field + .get_config_value(&ColumnMetadataKey::ColumnMappingId) + .is_none()); + assert!(field + .get_config_value(&ColumnMetadataKey::ColumnMappingPhysicalName) + .is_none()); + assert!(field + .get_config_value(&ColumnMetadataKey::ParquetFieldId) + .is_none()); + }; + assert_eq!(physical_field.name, "e"); + assert_field_metadata_is_wiped(&physical_field); - let col_id = field - .get_config_value(&ColumnMetadataKey::ColumnMappingId) - .unwrap(); - let id_start = field - .get_config_value(&ColumnMetadataKey::IdentityStart) - .unwrap(); - assert!(matches!(col_id, MetadataValue::Number(num) if *num == 4)); - assert!(matches!(id_start, MetadataValue::Number(num) if *num == 2147483648i64)); - assert_eq!( - field.physical_name(), - "col-5f422f40-de70-45b2-88ab-1d5c90e94db1" - ); - let physical_field = field.make_physical(); - assert_eq!( - physical_field.name, - "col-5f422f40-de70-45b2-88ab-1d5c90e94db1" - ); let DataType::Array(atype) = physical_field.data_type else { panic!("Expected an Array"); }; let DataType::Struct(stype) = atype.element_type else { panic!("Expected a Struct"); }; - assert_eq!( - stype.fields.get_index(0).unwrap().1.name, - "col-a7f4159c-53be-4cb0-b81a-f7e5240cfc49" - ); + let struct_field = stype.fields.get_index(0).unwrap().1; + 
assert_eq!(struct_field.name, "d"); + assert_field_metadata_is_wiped(struct_field); + } + + #[test] + fn test_make_physical_column_mapping() { + [ColumnMappingMode::Name, ColumnMappingMode::Id] + .into_iter() + .for_each(|mode| { + let data = example_schema_metadata(); + + let field: StructField = serde_json::from_str(data).unwrap(); + + let col_id = field + .get_config_value(&ColumnMetadataKey::ColumnMappingId) + .unwrap(); + let id_start = field + .get_config_value(&ColumnMetadataKey::IdentityStart) + .unwrap(); + assert!(matches!(col_id, MetadataValue::Number(num) if *num == 4)); + assert!(matches!(id_start, MetadataValue::Number(num) if *num == 2147483648i64)); + assert_eq!( + field.physical_name(), + "col-5f422f40-de70-45b2-88ab-1d5c90e94db1" + ); + let physical_field = field.make_physical(mode); + + // Parquet field id should only be present in id column mapping mode + match mode { + ColumnMappingMode::Id => { + assert!(matches!( + physical_field.get_config_value(&ColumnMetadataKey::ParquetFieldId), + Some(MetadataValue::Number(4)) + )); + + assert!(matches!( + physical_field.get_config_value(&ColumnMetadataKey::ColumnMappingId), + Some(MetadataValue::Number(4)) + )); + } + ColumnMappingMode::Name => { + assert!(physical_field + .get_config_value(&ColumnMetadataKey::ParquetFieldId) + .is_none()); + assert!(physical_field + .get_config_value(&ColumnMetadataKey::ColumnMappingId) + .is_none(),); + } + ColumnMappingMode::None => panic!("unexpected column mapping mode"), + } + + assert_eq!( + physical_field.name, + "col-5f422f40-de70-45b2-88ab-1d5c90e94db1" + ); + let DataType::Array(atype) = physical_field.data_type else { + panic!("Expected an Array"); + }; + let DataType::Struct(stype) = atype.element_type else { + panic!("Expected a Struct"); + }; + + let struct_field = stype.fields.get_index(0).unwrap().1; + assert_eq!( + struct_field.name, + "col-a7f4159c-53be-4cb0-b81a-f7e5240cfc49" + ); + + // The subfield should also have ParquetFieldId present in column
mapping id mode + match mode { + ColumnMappingMode::Id => { + assert!(matches!( + struct_field.get_config_value(&ColumnMetadataKey::ParquetFieldId), + Some(MetadataValue::Number(5)) + )); + assert!(matches!( + struct_field.get_config_value(&ColumnMetadataKey::ColumnMappingId), + Some(MetadataValue::Number(5)) + )); + } + ColumnMappingMode::Name => { + assert!(struct_field + .get_config_value(&ColumnMetadataKey::ParquetFieldId) + .is_none()); + assert!(struct_field + .get_config_value(&ColumnMetadataKey::ColumnMappingId) + .is_none()); + } + ColumnMappingMode::None => panic!("unexpected column mapping mode"), + } + }); } #[test] diff --git a/kernel/src/table_changes/scan.rs b/kernel/src/table_changes/scan.rs index d1ad9f046..ec956ef0b 100644 --- a/kernel/src/table_changes/scan.rs +++ b/kernel/src/table_changes/scan.rs @@ -154,7 +154,8 @@ impl TableChangesScanBuilder { } else { // Add to read schema, store field so we can build a `Column` expression later // if needed (i.e. if we have partition columns) - let physical_field = logical_field.make_physical(); + let physical_field = logical_field + .make_physical(self.table_changes.end_snapshot.column_mapping_mode()); debug!("\n\n{logical_field:#?}\nAfter mapping: {physical_field:#?}\n\n"); let physical_name = physical_field.name.clone(); read_fields.push(physical_field); From e4e8129070e5dcff08b2c575efd6d5c6d53ea825 Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Tue, 29 Jul 2025 14:29:32 -0700 Subject: [PATCH 2/3] Fix test --- kernel/src/schema/mod.rs | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/kernel/src/schema/mod.rs b/kernel/src/schema/mod.rs index 761bbeb89..06db75c4c 100644 --- a/kernel/src/schema/mod.rs +++ b/kernel/src/schema/mod.rs @@ -272,11 +272,12 @@ impl StructField { let name = match self.column_mapping_mode { ColumnMappingMode::None => field.name().to_owned(), ColumnMappingMode::Id | ColumnMappingMode::Name => { - // Assert that the physical name is present - 
debug_assert!(field - .metadata - .get(ColumnMetadataKey::ColumnMappingPhysicalName.as_ref()) - .is_some_and(|x| matches!(x, MetadataValue::String(_)))); + // TODO(#1128): uncomment this once unshredded variant supports column maping metadata + // // Assert that the physical name is present + // debug_assert!(field + // .metadata + // .get(ColumnMetadataKey::ColumnMappingPhysicalName.as_ref()) + // .is_some_and(|x| matches!(x, MetadataValue::String(_)))); field.physical_name().to_owned() } }; @@ -331,9 +332,11 @@ impl StructField { debug_assert!(base_metadata.contains_key(physical_name_key)); } ColumnMappingMode::Name => { - // Logical metadata should have the column mapping metadata keys - debug_assert!(base_metadata.contains_key(physical_name_key)); - debug_assert!(base_metadata.contains_key(field_id_key)); + // TODO(#1128): Uncomment the debug assertions once unshredded variant can be constructed + // with column mapping metadata. + // // Logical metadata should have the column mapping metadata keys + // debug_assert!(base_metadata.contains_key(physical_name_key)); + // debug_assert!(base_metadata.contains_key(field_id_key)); // Remove all id mode related metadata keys base_metadata.remove(field_id_key); From 8ce715093b7f5460c6baeefc17da70bae09b3090 Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Wed, 30 Jul 2025 09:38:23 -0700 Subject: [PATCH 3/3] fix variant --- kernel/src/schema/mod.rs | 25 ++++++++++++++----------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/kernel/src/schema/mod.rs b/kernel/src/schema/mod.rs index 06db75c4c..001f932f4 100644 --- a/kernel/src/schema/mod.rs +++ b/kernel/src/schema/mod.rs @@ -272,18 +272,23 @@ impl StructField { let name = match self.column_mapping_mode { ColumnMappingMode::None => field.name().to_owned(), ColumnMappingMode::Id | ColumnMappingMode::Name => { - // TODO(#1128): uncomment this once unshredded variant supports column maping metadata - // // Assert that the physical name is present - // 
debug_assert!(field - // .metadata - // .get(ColumnMetadataKey::ColumnMappingPhysicalName.as_ref()) - // .is_some_and(|x| matches!(x, MetadataValue::String(_)))); + // Assert that the physical name is present + debug_assert!(field + .metadata + .get(ColumnMetadataKey::ColumnMappingPhysicalName.as_ref()) + .is_some_and(|x| matches!(x, MetadataValue::String(_)))); field.physical_name().to_owned() } }; Some(Cow::Owned(field.with_name(name).with_metadata(metadata))) } + + fn transform_variant(&mut self, stype: &'a StructType) -> Option> { + // There is no column mapping metadata inside the struct fields of a variant, so + // we do not recurse into the variant fields + Some(Cow::Borrowed(stype)) + } } // NOTE: unwrap is safe because the transformer is incapable of returning None #[allow(clippy::unwrap_used)] @@ -332,11 +337,9 @@ impl StructField { debug_assert!(base_metadata.contains_key(physical_name_key)); } ColumnMappingMode::Name => { - // TODO(#1128): Uncomment the debug assertions once unshredded variant can be constructed - // with column mapping metadata. - // // Logical metadata should have the column mapping metadata keys - // debug_assert!(base_metadata.contains_key(physical_name_key)); - // debug_assert!(base_metadata.contains_key(field_id_key)); + // Logical metadata should have the column mapping metadata keys + debug_assert!(base_metadata.contains_key(physical_name_key)); + debug_assert!(base_metadata.contains_key(field_id_key)); // Remove all id mode related metadata keys base_metadata.remove(field_id_key);