From 0bdebc8edb328e10b1877cab06d82be626ba68ec Mon Sep 17 00:00:00 2001
From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com>
Date: Thu, 20 Mar 2025 12:19:46 -0500
Subject: [PATCH 1/3] Add hooks to SchemaAdapter to add custom column generators

---
 datafusion-testing                               |   2 +-
 datafusion/core/src/datasource/mod.rs            |  10 +-
 .../core/src/datasource/physical_plan/mod.rs     | 370 +++++++++++++++++-
 datafusion/datasource-parquet/src/source.rs      |   2 +-
 datafusion/datasource/src/schema_adapter.rs      | 217 ++++++++--
 5 files changed, 559 insertions(+), 42 deletions(-)

diff --git a/datafusion-testing b/datafusion-testing
index 3462eaa787459..5b424aefd7f6b 160000
--- a/datafusion-testing
+++ b/datafusion-testing
@@ -1 +1 @@
-Subproject commit 3462eaa787459957e38df267a4a21f5bea605807
+Subproject commit 5b424aefd7f6bf198220c37f59d39dbb25b47695
diff --git a/datafusion/core/src/datasource/mod.rs b/datafusion/core/src/datasource/mod.rs
index 3602a603cd984..f5cd43ce45f74 100644
--- a/datafusion/core/src/datasource/mod.rs
+++ b/datafusion/core/src/datasource/mod.rs
@@ -191,14 +191,8 @@ mod tests {
         ]);
 
         let adapter = DefaultSchemaAdapterFactory::from_schema(Arc::new(table_schema));
-        let (mapper, indices) = adapter.map_schema(&file_schema).unwrap();
-        assert_eq!(indices, vec![0]);
-
-        let file_batch = record_batch!(("b", Float64, vec![1.0, 2.0])).unwrap();
-
-        // Mapping fails because it tries to fill in a non-nullable column with nulls
-        let err = mapper.map_batch(file_batch).unwrap_err().to_string();
-        assert!(err.contains("Invalid argument error: Column 'a' is declared as non-nullable but contains null values"), "{err}");
+        let err = adapter.map_schema(&file_schema).unwrap_err().to_string();
+        assert!(err.contains("Error during planning: Column a is missing from the file schema, cannot be generated, and is non-nullable"));
     }
 
     #[derive(Debug)]
diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs
index cae04e5ee6b82..834bd522789ba 100644
--- a/datafusion/core/src/datasource/physical_plan/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/mod.rs
@@ -89,7 +89,7 @@ mod tests {
             Field::new("c3", DataType::Float64, true),
         ]));
 
-        let adapter = DefaultSchemaAdapterFactory
+        let adapter = DefaultSchemaAdapterFactory::default()
             .create(table_schema.clone(), table_schema.clone());
 
         let file_schema = Schema::new(vec![
@@ -147,7 +147,8 @@ mod tests {
         let indices = vec![1, 2, 4];
         let schema = SchemaRef::from(table_schema.project(&indices).unwrap());
-        let adapter = DefaultSchemaAdapterFactory.create(schema, table_schema.clone());
+        let adapter =
+            DefaultSchemaAdapterFactory::default().create(schema, table_schema.clone());
         let (mapping, projection) = adapter.map_schema(&file_schema).unwrap();
 
         let id = Int32Array::from(vec![Some(1), Some(2), Some(3)]);
@@ -197,4 +198,369 @@ mod tests {
         assert_eq!(c4.value(1), 2.0_f32);
         assert_eq!(c4.value(2), 3.0_f32);
     }
+
+    #[test]
+    fn schema_adapter_with_column_generator() {
+        use crate::datasource::schema_adapter::{
+            DefaultSchemaAdapterFactory, MissingColumnGenerator,
+            MissingColumnGeneratorFactory,
+        };
+        use arrow::array::{ArrayRef, Int32Array};
+        use arrow::datatypes::Int32Type;
+        use std::fmt;
+
+        // A simple generator that produces a constant value
+        #[derive(Debug)]
+        struct ConstantGenerator(Int32Array);
+
+        impl MissingColumnGenerator for ConstantGenerator {
+            fn generate(
+                &self,
+                _batch: RecordBatch,
+            ) -> datafusion_common::Result<ArrayRef> {
+                Ok(Arc::new(self.0.clone()))
+            }
+
+            fn dependencies(&self) -> Vec<String> {
+                vec![]
+            }
+        }
+
+        // A factory that produces a constant generator for a specific field
+        #[derive(Debug)]
+        struct ConstantGeneratorFactory {
+            field_name: String,
+            value: i32,
+        }
+
+        impl fmt::Display for ConstantGeneratorFactory {
+            fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+                write!(
+                    f,
+                    "ConstantGeneratorFactory({}={})",
+                    self.field_name, self.value
+                )
+            }
+        }
+
+        impl MissingColumnGeneratorFactory for ConstantGeneratorFactory {
+            fn create(
+                &self,
+                field: &Field,
+                _file_schema: &Schema,
+            ) -> Option<Arc<dyn MissingColumnGenerator>> {
+                if field.name() == &self.field_name
+                    && field.data_type() == &DataType::Int32
+                {
+                    let array = Int32Array::from(vec![self.value; 3]);
+                    Some(Arc::new(ConstantGenerator(array)))
+                } else {
+                    None
+                }
+            }
+        }
+
+        // A generator that depends on another column
+        #[derive(Debug)]
+        struct MultiplyByTwoGenerator {
+            dependency: String,
+        }
+
+        impl MissingColumnGenerator for MultiplyByTwoGenerator {
+            fn generate(
+                &self,
+                batch: RecordBatch,
+            ) -> datafusion_common::Result<ArrayRef> {
+                let idx = batch
+                    .schema()
+                    .index_of(&self.dependency)
+                    .expect("dependency should exist");
+                let col = batch.column(idx);
+                let col = col.as_primitive::<Int32Type>();
+
+                let result: Int32Array = col.iter().map(|v| v.map(|x| x * 2)).collect();
+                Ok(Arc::new(result))
+            }
+
+            fn dependencies(&self) -> Vec<String> {
+                vec![self.dependency.clone()]
+            }
+        }
+
+        #[derive(Debug)]
+        struct MultiplyByTwoGeneratorFactory;
+
+        impl fmt::Display for MultiplyByTwoGeneratorFactory {
+            fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+                write!(f, "MultiplyByTwoGeneratorFactory")
+            }
+        }
+
+        impl MissingColumnGeneratorFactory for MultiplyByTwoGeneratorFactory {
+            fn create(
+                &self,
+                field: &Field,
+                file_schema: &Schema,
+            ) -> Option<Arc<dyn MissingColumnGenerator>> {
+                if field.name() == "doubled_id" && field.data_type() == &DataType::Int32 {
+                    // Look for id column to use as our dependency
+                    if file_schema.column_with_name("id").is_some() {
+                        Some(Arc::new(MultiplyByTwoGenerator {
+                            dependency: "id".to_string(),
+                        }))
+                    } else {
+                        None
+                    }
+                } else {
+                    None
+                }
+            }
+        }
+
+        // Test with a constant generator
+        let table_schema = Arc::new(Schema::new(vec![
+            Field::new("id", DataType::Int32, true),
+            Field::new("c1", DataType::Boolean, true),
+            Field::new("missing_column", DataType::Int32, true),
+        ]));
+
+        let file_schema = Schema::new(vec![
+            Field::new("id", DataType::Int32, true),
+            Field::new("c1", DataType::Boolean, true),
+        ]);
+
+        // Create a factory that will generate a constant value for the missing column
+        let generator_factory = Arc::new(ConstantGeneratorFactory {
+            field_name: "missing_column".to_string(),
+            value: 42,
+        });
+
+        let adapter = DefaultSchemaAdapterFactory::default()
+            .with_column_generator(generator_factory)
+            .create(table_schema.clone(), table_schema.clone());
+
+        let (mapping, projection) = adapter.map_schema(&file_schema).unwrap();
+
+        // Create a batch to test
+        let id = Int32Array::from(vec![Some(1), Some(2), Some(3)]);
+        let c1 = BooleanArray::from(vec![Some(true), Some(false), Some(true)]);
+        let batch = RecordBatch::try_new(
+            Arc::new(file_schema.clone()),
+            vec![Arc::new(id), Arc::new(c1)],
+        )
+        .unwrap();
+
+        let projected = batch.project(&projection).unwrap();
+        let mapped_batch = mapping.map_batch(projected).unwrap();
+
+        // Verify the result
+        assert_eq!(mapped_batch.schema(), table_schema);
+        assert_eq!(mapped_batch.num_columns(), 3);
+        assert_eq!(mapped_batch.num_rows(), 3);
+
+        // Check the missing column was generated with constant value
+        let missing_col = mapped_batch.column(2).as_primitive::<Int32Type>();
+        assert_eq!(missing_col.value(0), 42);
+        assert_eq!(missing_col.value(1), 42);
+        assert_eq!(missing_col.value(2), 42);
+
+        // Test with a generator that depends on another column
+        let table_schema = Arc::new(Schema::new(vec![
+            Field::new("id", DataType::Int32, true),
+            Field::new("doubled_id", DataType::Int32, true),
+        ]));
+
+        let file_schema = Schema::new(vec![Field::new("id", DataType::Int32, true)]);
+
+        // Set up the generator factory
+        let adapter = DefaultSchemaAdapterFactory::default()
+            .with_column_generator(Arc::new(MultiplyByTwoGeneratorFactory))
+            .create(table_schema.clone(), table_schema.clone());
+
+        let (mapping, projection) = adapter.map_schema(&file_schema).unwrap();
+
+        // Create a batch with just an id column
+        let id = Int32Array::from(vec![Some(1), Some(2), Some(3)]);
+        let batch =
+            RecordBatch::try_new(Arc::new(file_schema.clone()), vec![Arc::new(id)])
+                .unwrap();
+
+        let projected = batch.project(&projection).unwrap();
+        let mapped_batch = mapping.map_batch(projected).unwrap();
+
+        // Verify the result
+        assert_eq!(mapped_batch.schema(), table_schema);
+        assert_eq!(mapped_batch.num_columns(), 2);
+        assert_eq!(mapped_batch.num_rows(), 3);
+
+        // Check the doubled_id column was generated correctly
+        let id_col = mapped_batch.column(0).as_primitive::<Int32Type>();
+        let doubled_col = mapped_batch.column(1).as_primitive::<Int32Type>();
+
+        assert_eq!(doubled_col.value(0), id_col.value(0) * 2);
+        assert_eq!(doubled_col.value(1), id_col.value(1) * 2);
+        assert_eq!(doubled_col.value(2), id_col.value(2) * 2);
+    }
+
+    #[test]
+    fn schema_adapter_with_multiple_generators() {
+        use crate::datasource::schema_adapter::{
+            DefaultSchemaAdapterFactory, MissingColumnGenerator,
+            MissingColumnGeneratorFactory,
+        };
+        use arrow::array::{ArrayRef, Int32Array, StringArray};
+        use arrow::datatypes::Int32Type;
+        use std::fmt;
+
+        // A generator for creating a description from an id
+        #[derive(Debug)]
+        struct IdToDescriptionGenerator;
+
+        impl MissingColumnGenerator for IdToDescriptionGenerator {
+            fn generate(
+                &self,
+                batch: RecordBatch,
+            ) -> datafusion_common::Result<ArrayRef> {
+                let idx = batch.schema().index_of("id").expect("id should exist");
+                let col = batch.column(idx);
+                let col = col.as_primitive::<Int32Type>();
+
+                let result: StringArray = col
+                    .iter()
+                    .map(|v| {
+                        v.map(|id| match id {
+                            1 => "Product One",
+                            2 => "Product Two",
+                            3 => "Product Three",
+                            _ => "Unknown Product",
+                        })
+                    })
+                    .collect();
+                Ok(Arc::new(result))
+            }
+
+            fn dependencies(&self) -> Vec<String> {
+                vec!["id".to_string()]
+            }
+        }
+
+        #[derive(Debug)]
+        struct IdToDescriptionGeneratorFactory;
+
+        impl fmt::Display for IdToDescriptionGeneratorFactory {
+            fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+                write!(f, "IdToDescriptionGeneratorFactory")
+            }
+        }
+
+        impl MissingColumnGeneratorFactory for IdToDescriptionGeneratorFactory {
+            fn create(
+                &self,
+                field: &Field,
+                file_schema: &Schema,
+            ) -> Option<Arc<dyn MissingColumnGenerator>> {
+                if field.name() == "description" && field.data_type() == &DataType::Utf8 {
+                    if file_schema.column_with_name("id").is_some() {
+                        Some(Arc::new(IdToDescriptionGenerator))
+                    } else {
+                        None
+                    }
+                } else {
+                    None
+                }
+            }
+        }
+
+        // A generator for creating a score column with constant value
+        #[derive(Debug)]
+        struct ScoreGenerator(i32);
+
+        impl MissingColumnGenerator for ScoreGenerator {
+            fn generate(
+                &self,
+                batch: RecordBatch,
+            ) -> datafusion_common::Result<ArrayRef> {
+                let len = batch.num_rows();
+                Ok(Arc::new(Int32Array::from(vec![self.0; len])))
+            }
+
+            fn dependencies(&self) -> Vec<String> {
+                vec![]
+            }
+        }
+
+        #[derive(Debug)]
+        struct ScoreGeneratorFactory;
+
+        impl fmt::Display for ScoreGeneratorFactory {
+            fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+                write!(f, "ScoreGeneratorFactory")
+            }
+        }
+
+        impl MissingColumnGeneratorFactory for ScoreGeneratorFactory {
+            fn create(
+                &self,
+                field: &Field,
+                _file_schema: &Schema,
+            ) -> Option<Arc<dyn MissingColumnGenerator>> {
+                if field.name() == "score" && field.data_type() == &DataType::Int32 {
+                    Some(Arc::new(ScoreGenerator(100)))
+                } else {
+                    None
+                }
+            }
+        }
+
+        // Set up the schema with multiple missing columns
+        let table_schema = Arc::new(Schema::new(vec![
+            Field::new("id", DataType::Int32, true),
+            Field::new("description", DataType::Utf8, true),
+            Field::new("score", DataType::Int32, true),
+        ]));
+
+        let file_schema = Schema::new(vec![Field::new("id", DataType::Int32, true)]);
+
+        // Create factory that will generate multiple missing columns
+        let adapter = DefaultSchemaAdapterFactory::default()
+            .with_column_generator(Arc::new(IdToDescriptionGeneratorFactory))
+            .with_column_generator(Arc::new(ScoreGeneratorFactory))
+            .create(table_schema.clone(), table_schema.clone());
+
+        let (mapping, projection) = adapter.map_schema(&file_schema).unwrap();
+
+        // Create a batch to test
+        let id = Int32Array::from(vec![Some(1), Some(2), Some(3)]);
+        let batch =
+            RecordBatch::try_new(Arc::new(file_schema.clone()), vec![Arc::new(id)])
+                .unwrap();
+
+        let projected = batch.project(&projection).unwrap();
+        let mapped_batch = mapping.map_batch(projected).unwrap();
+
+        // Verify the result
+        assert_eq!(mapped_batch.schema(), table_schema);
+        assert_eq!(mapped_batch.num_columns(), 3);
+        assert_eq!(mapped_batch.num_rows(), 3);
+
+        // Check both missing columns were generated correctly
+        let id_col = mapped_batch.column(0).as_primitive::<Int32Type>();
+        let description_col = mapped_batch.column(1).as_string::<i32>();
+        let score_col = mapped_batch.column(2).as_primitive::<Int32Type>();
+
+        // Verify id column
+        assert_eq!(id_col.value(0), 1);
+        assert_eq!(id_col.value(1), 2);
+        assert_eq!(id_col.value(2), 3);
+
+        // Verify description column generated from id
+        assert_eq!(description_col.value(0), "Product One");
+        assert_eq!(description_col.value(1), "Product Two");
+        assert_eq!(description_col.value(2), "Product Three");
+
+        // Verify score column has constant value
+        assert_eq!(score_col.value(0), 100);
+        assert_eq!(score_col.value(1), 100);
+        assert_eq!(score_col.value(2), 100);
+    }
 }
diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs
index 47e692cb966dc..c47b3be80396c 100644
--- a/datafusion/datasource-parquet/src/source.rs
+++ b/datafusion/datasource-parquet/src/source.rs
@@ -474,7 +474,7 @@ impl FileSource for ParquetSource {
         let schema_adapter_factory = self
             .schema_adapter_factory
             .clone()
-            .unwrap_or_else(|| Arc::new(DefaultSchemaAdapterFactory));
+            .unwrap_or_else(|| Arc::new(DefaultSchemaAdapterFactory::default()));
 
         let parquet_file_reader_factory =
             self.parquet_file_reader_factory.clone().unwrap_or_else(|| {
diff --git a/datafusion/datasource/src/schema_adapter.rs b/datafusion/datasource/src/schema_adapter.rs
index 4164cda8cba11..64362ac43ddfb 100644
--- a/datafusion/datasource/src/schema_adapter.rs
+++ b/datafusion/datasource/src/schema_adapter.rs
@@ -21,10 +21,12 @@
 //! physical format into how they should be used by DataFusion. For instance, a schema
 //! can be stored external to a parquet file that maps parquet logical types to arrow types.
 
-use arrow::array::{new_null_array, RecordBatch, RecordBatchOptions};
+use arrow::array::{new_null_array, ArrayRef, RecordBatch, RecordBatchOptions};
 use arrow::compute::{can_cast_types, cast};
-use arrow::datatypes::{Schema, SchemaRef};
+use arrow::datatypes::{Field, Schema, SchemaRef};
 use datafusion_common::plan_err;
+use itertools::Itertools;
+use std::collections::HashMap;
 use std::fmt::Debug;
 use std::sync::Arc;
 
@@ -98,6 +100,46 @@ pub trait SchemaMapper: Debug + Send + Sync {
     fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result<RecordBatch>;
 }
 
+pub trait MissingColumnGeneratorFactory: Debug + Send + Sync {
+    /// Create a [`MissingColumnGenerator`] for the given `field` and `file_schema`.
+    /// Returns `None` if the column cannot be generated by this generator.
+    /// Otherwise, returns a [`MissingColumnGenerator`] that can generate the missing column.
+    fn create(
+        &self,
+        field: &Field,
+        file_schema: &Schema,
+    ) -> Option<Arc<dyn MissingColumnGenerator>>;
+}
+
+pub trait MissingColumnGenerator: Debug + Send + Sync {
+    /// Generate a missing column for the given `field` from the provided `batch`.
+    /// When this method is called `batch` will contain all of the columns declared as dependencies in `dependencies`.
+    /// If the column cannot be generated, this method should return an error.
+    /// Otherwise, it should return the generated column as an `ArrayRef`.
+    /// No casting or post processing is done by this method, so the generated column should match the data type
+    /// of the `field` it is being generated for.
+    /// The order of columns in `batch` is not guaranteed, so implementations should look up dependencies by name.
+    fn generate(&self, batch: RecordBatch) -> datafusion_common::Result<ArrayRef>;
+
+    /// Returns a list of column names that this generator depends on to generate the missing column.
+    /// This is used when creating the `RecordBatch` to ensure that all dependencies are present before calling `generate`.
+    /// The dependencies do not need to be declared in any particular order.
+    fn dependencies(&self) -> Vec<String>;
+}
+
+pub type ColumnGeneratorFactories = Vec<Arc<dyn MissingColumnGeneratorFactory>>;
+
+#[derive(Debug)]
+enum FieldSource {
+    /// The field is present in the (projected) file schema at the given index
+    Table(usize),
+    /// The field is generated by the given generator
+    Generated(Arc<dyn MissingColumnGenerator>),
+    /// The field will be populated with nulls
+    Null,
+}
+
 /// Default [`SchemaAdapterFactory`] for mapping schemas.
 ///
 /// This can be used to adapt file-level record batches to a table schema and
@@ -184,7 +226,13 @@
 /// assert_eq!(mapped_batch, expected_batch);
 /// ```
 #[derive(Clone, Debug, Default)]
-pub struct DefaultSchemaAdapterFactory;
+pub struct DefaultSchemaAdapterFactory {
+    /// Factories for generating missing columns
+    ///
+    /// These are used to fill in missing columns with a value other than null.
+    /// If this is empty, missing columns will be filled with nulls.
+    column_generators: ColumnGeneratorFactories,
+}
 
 impl DefaultSchemaAdapterFactory {
     /// Create a new factory for mapping batches from a file schema to a table
@@ -194,7 +242,27 @@ impl DefaultSchemaAdapterFactory {
     /// the same schema for both the projected table schema and the table
     /// schema.
     pub fn from_schema(table_schema: SchemaRef) -> Box<dyn SchemaAdapter> {
-        Self.create(Arc::clone(&table_schema), table_schema)
+        Self {
+            column_generators: vec![],
+        }
+        .create(Arc::clone(&table_schema), table_schema)
+    }
+
+    pub fn with_column_generator(
+        self,
+        generator: Arc<dyn MissingColumnGeneratorFactory>,
+    ) -> Self {
+        let mut generators = self.column_generators;
+        generators.push(generator);
+        Self {
+            column_generators: generators,
+        }
+    }
+
+    pub fn with_column_generators(self, generators: ColumnGeneratorFactories) -> Self {
+        Self {
+            column_generators: generators,
+        }
+    }
 }
 
@@ -206,6 +274,7 @@ impl SchemaAdapterFactory for DefaultSchemaAdapterFactory {
     ) -> Box<dyn SchemaAdapter> {
         Box::new(DefaultSchemaAdapter {
             projected_table_schema,
+            column_generators: self.column_generators.clone(),
         })
     }
 }
@@ -217,6 +286,8 @@ pub(crate) struct DefaultSchemaAdapter {
     /// The schema for the table, projected to include only the fields being output (projected) by the
     /// associated ParquetSource
     projected_table_schema: SchemaRef,
+    /// The column generators to use when a column is missing
+    column_generators: ColumnGeneratorFactories,
 }
 
 impl SchemaAdapter for DefaultSchemaAdapter {
@@ -242,40 +313,121 @@ impl SchemaAdapter for DefaultSchemaAdapter {
         &self,
         file_schema: &Schema,
     ) -> datafusion_common::Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
+        // Projection is the indexes into the file schema that we need to read
+        // Note that readers will NOT respect the order of the columns in projection
         let mut projection = Vec::with_capacity(file_schema.fields().len());
-        let mut field_mappings = vec![None; self.projected_table_schema.fields().len()];
 
-        for (file_idx, file_field) in file_schema.fields.iter().enumerate() {
-            if let Some((table_idx, table_field)) =
-                self.projected_table_schema.fields().find(file_field.name())
-            {
-                match can_cast_types(file_field.data_type(), table_field.data_type()) {
+        // Additions to the projection which will be needed to generate missing columns
+        let mut dependency_projection = Vec::with_capacity(file_schema.fields().len());
+
+        let mut field_sources =
+            Vec::with_capacity(self.projected_table_schema.fields().len());
+
+        for field in self.projected_table_schema.fields() {
+            if let Some((file_idx, file_field)) = file_schema.fields.find(field.name()) {
+                // If the field exists in the file schema, check if we can cast it to the table schema
+                match can_cast_types(file_field.data_type(), field.data_type()) {
                     true => {
-                        field_mappings[table_idx] = Some(projection.len());
                         projection.push(file_idx);
+                        field_sources.push(FieldSource::Table(projection.len() - 1));
                     }
                     false => {
                         return plan_err!(
                             "Cannot cast file schema field {} of type {:?} to table schema field of type {:?}",
                             file_field.name(),
                             file_field.data_type(),
-                            table_field.data_type()
+                            field.data_type()
                         )
                     }
                 }
+            } else if let Some(generator) = self
+                .column_generators
+                .iter()
+                .find_map(|factory| factory.create(field, file_schema))
+            {
+                let dependencies = generator.dependencies();
+                let mut dependency_indices = Vec::with_capacity(dependencies.len());
+                for dep in &dependencies {
+                    if let Ok(dep_idx) = file_schema.index_of(dep) {
+                        dependency_indices.push(dep_idx);
+                    } else {
+                        return plan_err!(
+                            "Generated column {} depends on column {} but column {} is not present in the file schema, columns present: {:?}",
+                            field.name(),
+                            dep,
+                            dep,
+                            file_schema.fields().iter().map(|f| f.name()).collect::<Vec<_>>()
+                        );
+                    }
+                }
+                dependency_projection.extend(dependency_indices);
+                field_sources.push(FieldSource::Generated(generator));
+            } else if field.is_nullable() {
+                field_sources.push(FieldSource::Null);
+            } else {
+                return plan_err!(
+                    "Column {} is missing from the file schema, cannot be generated, and is non-nullable",
+                    field.name()
+                );
+            }
+        }
+
+        for dep in dependency_projection {
+            if !projection.contains(&dep) {
+                projection.push(dep);
+            }
+        }
+
+        let (sorted_projection, field_sources) =
+            sort_projections_and_sources(&projection, field_sources);
+
+        debug_assert!(sorted_projection.is_sorted());
+        debug_assert!(sorted_projection.iter().all_unique());
+
         Ok((
             Arc::new(SchemaMapping {
                 projected_table_schema: Arc::clone(&self.projected_table_schema),
-                field_mappings,
+                field_sources,
             }),
-            projection,
+            sorted_projection,
         ))
     }
 }
 
+/// The parquet reader needs projections to be sorted (it does not respect the order of the columns in the projection, only the values).
+/// This function adjusts the projections and mappings so that they all reference sorted projections.
+fn sort_projections_and_sources(
+    projection: &[usize],
+    mut field_sources: Vec<FieldSource>,
+) -> (Vec<usize>, Vec<FieldSource>) {
+    // Sort projection and create a mapping from old to new positions
+    let mut sorted_projection = projection.to_vec();
+    sorted_projection.sort_unstable();
+
+    // Create a mapping from old projection values to their new positions
+    let mut new_position_map = HashMap::new();
+    for (new_pos, &proj_val) in sorted_projection.iter().enumerate() {
+        new_position_map.insert(proj_val, new_pos);
+    }
+
+    // Create a mapping from old positions to new positions in the projected schema
+    let mut position_mapping = vec![0; projection.len()];
+    for (old_pos, &proj_val) in projection.iter().enumerate() {
+        position_mapping[old_pos] = *new_position_map
+            .get(&proj_val)
+            .expect("should always be present");
+    }
+
+    // Update field_sources to reflect the new positions
+    for source in &mut field_sources {
+        if let FieldSource::Table(pos) = source {
+            *pos = position_mapping[*pos];
+        }
+    }
+
+    (sorted_projection, field_sources)
+}
+
 /// The SchemaMapping struct holds a mapping from the file schema to the table
 /// schema and any necessary type conversions.
 ///
@@ -290,12 +442,9 @@ pub struct SchemaMapping {
     /// The schema of the table. This is the expected schema after conversion
     /// and it should match the schema of the query result.
     projected_table_schema: SchemaRef,
-    /// Mapping from field index in `projected_table_schema` to index in
-    /// projected file_schema.
-    ///
-    /// They are Options instead of just plain `usize`s because the table could
-    /// have fields that don't exist in the file.
-    field_mappings: Vec<Option<usize>>,
+    /// A mapping from the fields in `projected_table_schema` to the way to materialize
+    /// them from the projected file schema.
+    field_sources: Vec<FieldSource>,
 }
 
 impl SchemaMapper for SchemaMapping {
@@ -306,6 +455,11 @@ impl SchemaMapper for SchemaMapping {
         let batch_rows = batch.num_rows();
         let batch_cols = batch.columns().to_vec();
 
+        debug_assert_eq!(
+            self.projected_table_schema.fields().len(),
+            self.field_sources.len()
+        );
+
         let cols = self
             .projected_table_schema
             // go through each field in the projected schema
             .fields()
             .iter()
             // and zip it with the index that maps fields from the projected table schema to the
             // projected file schema in `batch`
-            .zip(&self.field_mappings)
+            .zip(&self.field_sources)
             // and for each one...
-            .map(|(field, file_idx)| {
-                file_idx.map_or_else(
-                    // If this field only exists in the table, and not in the file, then we know
-                    // that it's null, so just return that.
-                    || Ok(new_null_array(field.data_type(), batch_rows)),
-                    // However, if it does exist in both, then try to cast it to the correct output
-                    // type
-                    |batch_idx| cast(&batch_cols[batch_idx], field.data_type()),
-                )
+            .map(|(field, source)| -> datafusion_common::Result<_> {
+                let column = match source {
+                    FieldSource::Table(file_idx) => batch_cols[*file_idx].clone(),
+                    FieldSource::Generated(generator) => {
+                        generator.generate(batch.clone())?
+                    }
+                    FieldSource::Null => {
+                        debug_assert!(field.is_nullable());
+                        new_null_array(field.data_type(), batch_rows)
+                    }
+                };
+                Ok(cast(&column, field.data_type())?)
             })
             .collect::<Result<Vec<_>, _>>()?;

From 508679ee9b081742d167700b70645722a5cd4a94 Mon Sep 17 00:00:00 2001
From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com>
Date: Thu, 20 Mar 2025 12:29:27 -0500
Subject: [PATCH 2/3] fix merger

---
 datafusion/datasource-parquet/src/row_filter.rs | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/datafusion/datasource-parquet/src/row_filter.rs b/datafusion/datasource-parquet/src/row_filter.rs
index da6bf114d71dd..b1779137a965f 100644
--- a/datafusion/datasource-parquet/src/row_filter.rs
+++ b/datafusion/datasource-parquet/src/row_filter.rs
@@ -544,7 +544,7 @@ mod test {
         let expr = col("int64_list").is_not_null();
         let expr = logical2physical(&expr, &table_schema);
 
-        let schema_adapter_factory = Arc::new(DefaultSchemaAdapterFactory);
+        let schema_adapter_factory = Arc::new(DefaultSchemaAdapterFactory::default());
         let table_schema = Arc::new(table_schema.clone());
 
         let candidate = FilterCandidateBuilder::new(
@@ -583,7 +583,7 @@ mod test {
             ScalarValue::TimestampNanosecond(Some(1), Some(Arc::from("UTC"))),
         ));
         let expr = logical2physical(&expr, &table_schema);
-        let schema_adapter_factory = Arc::new(DefaultSchemaAdapterFactory);
+        let schema_adapter_factory = Arc::new(DefaultSchemaAdapterFactory::default());
         let table_schema = Arc::new(table_schema.clone());
         let candidate = FilterCandidateBuilder::new(
             expr,
@@ -623,7 +623,7 @@ mod test {
             ScalarValue::TimestampNanosecond(Some(0), Some(Arc::from("UTC"))),
         ));
         let expr = logical2physical(&expr, &table_schema);
-        let schema_adapter_factory = Arc::new(DefaultSchemaAdapterFactory);
+        let schema_adapter_factory = Arc::new(DefaultSchemaAdapterFactory::default());
         let candidate = FilterCandidateBuilder::new(
             expr,
             file_schema,

From 862fae0b4dbd47db9ad3d9aba2f8f2e49cc28385 Mon Sep 17 00:00:00 2001
From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com>
Date: Thu, 20 Mar 2025 12:43:45 -0500
Subject: [PATCH 3/3] fix lint

---
 datafusion/datasource/src/schema_adapter.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/datafusion/datasource/src/schema_adapter.rs b/datafusion/datasource/src/schema_adapter.rs
index 64362ac43ddfb..5daf1811bfb5e 100644
--- a/datafusion/datasource/src/schema_adapter.rs
+++ b/datafusion/datasource/src/schema_adapter.rs
@@ -471,7 +471,7 @@ impl SchemaMapper for SchemaMapping {
             // and for each one...
             .map(|(field, source)| -> datafusion_common::Result<_> {
                 let column = match source {
-                    FieldSource::Table(file_idx) => batch_cols[*file_idx].clone(),
+                    FieldSource::Table(file_idx) => Arc::clone(&batch_cols[*file_idx]),
                     FieldSource::Generated(generator) => {
                         generator.generate(batch.clone())?
                     }
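
Usage sketch: a minimal example of plugging a custom generator into the API added by this patch. Only `MissingColumnGenerator`, `MissingColumnGeneratorFactory`, and `DefaultSchemaAdapterFactory::with_column_generator` come from the patch; the `UppercaseNameGenerator` types, the `name`/`name_upper` columns, the `adapt_batch` helper, and the exact import path (`datafusion_datasource::schema_adapter`, inferred from the crate layout above) are assumptions for illustration.

use std::sync::Arc;

use arrow::array::{ArrayRef, AsArray, RecordBatch, StringArray};
use arrow::datatypes::{DataType, Field, Schema};
use datafusion_datasource::schema_adapter::{
    DefaultSchemaAdapterFactory, MissingColumnGenerator, MissingColumnGeneratorFactory,
};

// Hypothetical generator: fills a missing `name_upper` column by upper-casing `name`.
#[derive(Debug)]
struct UppercaseNameGenerator;

impl MissingColumnGenerator for UppercaseNameGenerator {
    fn generate(&self, batch: RecordBatch) -> datafusion_common::Result<ArrayRef> {
        // `batch` is guaranteed to contain every column listed in `dependencies`,
        // but in no particular order, so look the dependency up by name.
        let idx = batch.schema().index_of("name")?;
        let names = batch.column(idx).as_string::<i32>();
        let upper: StringArray = names.iter().map(|v| v.map(str::to_uppercase)).collect();
        Ok(Arc::new(upper))
    }

    fn dependencies(&self) -> Vec<String> {
        vec!["name".to_string()]
    }
}

#[derive(Debug)]
struct UppercaseNameGeneratorFactory;

impl MissingColumnGeneratorFactory for UppercaseNameGeneratorFactory {
    fn create(
        &self,
        field: &Field,
        file_schema: &Schema,
    ) -> Option<Arc<dyn MissingColumnGenerator>> {
        // Offer to generate only `name_upper: Utf8`, and only when the file
        // actually carries the `name` column this generator depends on.
        (field.name() == "name_upper"
            && field.data_type() == &DataType::Utf8
            && file_schema.column_with_name("name").is_some())
            .then(|| Arc::new(UppercaseNameGenerator) as Arc<dyn MissingColumnGenerator>)
    }
}

// Adapt a file-level batch to the table schema, generating `name_upper` on the fly.
fn adapt_batch(
    table_schema: Arc<Schema>,
    file_batch: RecordBatch,
) -> datafusion_common::Result<RecordBatch> {
    let adapter = DefaultSchemaAdapterFactory::default()
        .with_column_generator(Arc::new(UppercaseNameGeneratorFactory))
        .create(Arc::clone(&table_schema), table_schema);
    // `projection` tells a reader which file columns to decode; the sorted,
    // deduplicated indices include any generator dependencies.
    let file_schema = file_batch.schema();
    let (mapper, projection) = adapter.map_schema(&file_schema)?;
    let projected = file_batch.project(&projection)?;
    mapper.map_batch(projected)
}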