diff --git a/Cargo.lock b/Cargo.lock index 5cc401f5c3e05..e24a3937e52cb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1846,6 +1846,7 @@ dependencies = [ "datafusion-macros", "datafusion-optimizer", "datafusion-physical-expr", + "datafusion-physical-expr-adapter", "datafusion-physical-expr-common", "datafusion-physical-optimizer", "datafusion-physical-plan", @@ -2038,6 +2039,7 @@ dependencies = [ "datafusion-execution", "datafusion-expr", "datafusion-physical-expr", + "datafusion-physical-expr-adapter", "datafusion-physical-expr-common", "datafusion-physical-plan", "datafusion-session", @@ -2144,6 +2146,7 @@ dependencies = [ "datafusion-expr", "datafusion-functions-aggregate", "datafusion-physical-expr", + "datafusion-physical-expr-adapter", "datafusion-physical-expr-common", "datafusion-physical-optimizer", "datafusion-physical-plan", @@ -2177,6 +2180,7 @@ dependencies = [ "dashmap", "datafusion", "datafusion-ffi", + "datafusion-physical-expr-adapter", "datafusion-proto", "env_logger", "futures", @@ -2456,6 +2460,21 @@ dependencies = [ "rstest", ] +[[package]] +name = "datafusion-physical-expr-adapter" +version = "49.0.0" +dependencies = [ + "arrow", + "datafusion-common", + "datafusion-expr", + "datafusion-functions", + "datafusion-physical-expr", + "datafusion-physical-expr-common", + "insta", + "itertools 0.14.0", + "rstest", +] + [[package]] name = "datafusion-physical-expr-common" version = "49.0.0" diff --git a/Cargo.toml b/Cargo.toml index 1335361708788..35662fdad0827 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -40,6 +40,7 @@ members = [ "datafusion/functions-window-common", "datafusion/optimizer", "datafusion/physical-expr", + "datafusion/physical-expr-adapter", "datafusion/physical-expr-common", "datafusion/physical-optimizer", "datafusion/pruning", @@ -134,6 +135,7 @@ datafusion-functions-window-common = { path = "datafusion/functions-window-commo datafusion-macros = { path = "datafusion/macros", version = "49.0.0" } datafusion-optimizer = { path = "datafusion/optimizer", version = "49.0.0", default-features = false } datafusion-physical-expr = { path = "datafusion/physical-expr", version = "49.0.0", default-features = false } +datafusion-physical-expr-adapter = { path = "datafusion/physical-expr-adapter", version = "49.0.0", default-features = false } datafusion-physical-expr-common = { path = "datafusion/physical-expr-common", version = "49.0.0", default-features = false } datafusion-physical-optimizer = { path = "datafusion/physical-optimizer", version = "49.0.0" } datafusion-physical-plan = { path = "datafusion/physical-plan", version = "49.0.0" } diff --git a/datafusion-examples/Cargo.toml b/datafusion-examples/Cargo.toml index 409fc12bcbc5b..f12bd9202ed84 100644 --- a/datafusion-examples/Cargo.toml +++ b/datafusion-examples/Cargo.toml @@ -68,6 +68,7 @@ dashmap = { workspace = true } base64 = "0.22.1" datafusion = { workspace = true, default-features = true } datafusion-ffi = { workspace = true } +datafusion-physical-expr-adapter = { workspace = true } datafusion-proto = { workspace = true } env_logger = { workspace = true } futures = { workspace = true } diff --git a/datafusion-examples/examples/custom_file_casts.rs b/datafusion-examples/examples/custom_file_casts.rs index 847aa8ad7f52a..a787c07c2b5b5 100644 --- a/datafusion-examples/examples/custom_file_casts.rs +++ b/datafusion-examples/examples/custom_file_casts.rs @@ -31,11 +31,11 @@ use datafusion::execution::context::SessionContext; use datafusion::execution::object_store::ObjectStoreUrl; use datafusion::parquet::arrow::ArrowWriter; use datafusion::physical_expr::expressions::CastExpr; -use datafusion::physical_expr::schema_rewriter::{ - DefaultPhysicalExprAdapterFactory, PhysicalExprAdapter, PhysicalExprAdapterFactory, -}; use datafusion::physical_expr::PhysicalExpr; use datafusion::prelude::SessionConfig; +use datafusion_physical_expr_adapter::{ + DefaultPhysicalExprAdapterFactory, PhysicalExprAdapter, PhysicalExprAdapterFactory, +}; use object_store::memory::InMemory; use object_store::path::Path; use object_store::{ObjectStore, PutPayload}; diff --git a/datafusion-examples/examples/default_column_values.rs b/datafusion-examples/examples/default_column_values.rs index b504ef3aad6f1..43e2d4ca09884 100644 --- a/datafusion-examples/examples/default_column_values.rs +++ b/datafusion-examples/examples/default_column_values.rs @@ -38,12 +38,12 @@ use datafusion::logical_expr::{Expr, TableProviderFilterPushDown, TableType}; use datafusion::parquet::arrow::ArrowWriter; use datafusion::parquet::file::properties::WriterProperties; use datafusion::physical_expr::expressions::{CastExpr, Column, Literal}; -use datafusion::physical_expr::schema_rewriter::{ - DefaultPhysicalExprAdapterFactory, PhysicalExprAdapter, PhysicalExprAdapterFactory, -}; use datafusion::physical_expr::PhysicalExpr; use datafusion::physical_plan::ExecutionPlan; use datafusion::prelude::{lit, SessionConfig}; +use datafusion_physical_expr_adapter::{ + DefaultPhysicalExprAdapterFactory, PhysicalExprAdapter, PhysicalExprAdapterFactory, +}; use futures::StreamExt; use object_store::memory::InMemory; use object_store::path::Path; diff --git a/datafusion-examples/examples/json_shredding.rs b/datafusion-examples/examples/json_shredding.rs index 866e4a8a152c3..9b57749490fd3 100644 --- a/datafusion-examples/examples/json_shredding.rs +++ b/datafusion-examples/examples/json_shredding.rs @@ -40,14 +40,14 @@ use datafusion::logical_expr::{ }; use datafusion::parquet::arrow::ArrowWriter; use datafusion::parquet::file::properties::WriterProperties; -use datafusion::physical_expr::schema_rewriter::{ - DefaultPhysicalExprAdapterFactory, PhysicalExprAdapter, PhysicalExprAdapterFactory, -}; use datafusion::physical_expr::PhysicalExpr; use datafusion::physical_expr::{expressions, ScalarFunctionExpr}; use datafusion::physical_plan::ExecutionPlan; use datafusion::prelude::{lit, SessionConfig}; use datafusion::scalar::ScalarValue; +use datafusion_physical_expr_adapter::{ + DefaultPhysicalExprAdapterFactory, PhysicalExprAdapter, PhysicalExprAdapterFactory, +}; use futures::StreamExt; use object_store::memory::InMemory; use object_store::path::Path; diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index 3a0259ec64bbf..c83112e90ed11 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -129,6 +129,7 @@ datafusion-functions-table = { workspace = true } datafusion-functions-window = { workspace = true } datafusion-optimizer = { workspace = true } datafusion-physical-expr = { workspace = true } +datafusion-physical-expr-adapter = { workspace = true } datafusion-physical-expr-common = { workspace = true } datafusion-physical-optimizer = { workspace = true } datafusion-physical-plan = { workspace = true } diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 121ab46730b5b..5741060bf2351 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -48,7 +48,7 @@ use datafusion_execution::{ use datafusion_expr::{ dml::InsertOp, Expr, SortExpr, TableProviderFilterPushDown, TableType, }; -use datafusion_physical_expr::schema_rewriter::PhysicalExprAdapterFactory; +use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory; use datafusion_physical_expr_common::sort_expr::LexOrdering; use datafusion_physical_plan::{empty::EmptyExec, ExecutionPlan, Statistics}; use futures::{future, stream, Stream, StreamExt, TryStreamExt}; diff --git a/datafusion/core/tests/parquet/schema_adapter.rs b/datafusion/core/tests/parquet/schema_adapter.rs index f9a46f2e240fb..f685ccdc9fb9e 100644 --- a/datafusion/core/tests/parquet/schema_adapter.rs +++ b/datafusion/core/tests/parquet/schema_adapter.rs @@ -34,10 +34,11 @@ use datafusion_datasource::schema_adapter::{ use datafusion_datasource::ListingTableUrl; use datafusion_execution::object_store::ObjectStoreUrl; use datafusion_physical_expr::expressions::{self, Column}; -use datafusion_physical_expr::schema_rewriter::{ - DefaultPhysicalExprAdapterFactory, PhysicalExprAdapter, PhysicalExprAdapterFactory, +use datafusion_physical_expr::PhysicalExpr; +use datafusion_physical_expr_adapter::{ + DefaultPhysicalExprAdapter, DefaultPhysicalExprAdapterFactory, PhysicalExprAdapter, + PhysicalExprAdapterFactory, }; -use datafusion_physical_expr::{DefaultPhysicalExprAdapter, PhysicalExpr}; use itertools::Itertools; use object_store::{memory::InMemory, path::Path, ObjectStore}; use parquet::arrow::ArrowWriter; diff --git a/datafusion/datasource-parquet/Cargo.toml b/datafusion/datasource-parquet/Cargo.toml index 6bccd76b60fc7..ae67f9118486a 100644 --- a/datafusion/datasource-parquet/Cargo.toml +++ b/datafusion/datasource-parquet/Cargo.toml @@ -42,6 +42,7 @@ datafusion-execution = { workspace = true } datafusion-expr = { workspace = true } datafusion-functions-aggregate = { workspace = true } datafusion-physical-expr = { workspace = true } +datafusion-physical-expr-adapter = { workspace = true } datafusion-physical-expr-common = { workspace = true } datafusion-physical-optimizer = { workspace = true } datafusion-physical-plan = { workspace = true } diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index 62dc0fccc21a1..734e25a8aead9 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -35,8 +35,8 @@ use datafusion_common::encryption::FileDecryptionProperties; use datafusion_common::{exec_err, DataFusionError, Result}; use datafusion_datasource::PartitionedFile; -use datafusion_physical_expr::schema_rewriter::PhysicalExprAdapterFactory; use datafusion_physical_expr::simplifier::PhysicalExprSimplifier; +use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory; use datafusion_physical_expr_common::physical_expr::{ is_dynamic_physical_expr, PhysicalExpr, }; @@ -585,9 +585,9 @@ mod test { }; use datafusion_expr::{col, lit}; use datafusion_physical_expr::{ - expressions::DynamicFilterPhysicalExpr, planner::logical2physical, - schema_rewriter::DefaultPhysicalExprAdapterFactory, PhysicalExpr, + expressions::DynamicFilterPhysicalExpr, planner::logical2physical, PhysicalExpr, }; + use datafusion_physical_expr_adapter::DefaultPhysicalExprAdapterFactory; use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet}; use futures::{Stream, StreamExt}; use object_store::{memory::InMemory, path::Path, ObjectMeta, ObjectStore}; diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index 366d42700fcfd..caec7db0ce0ba 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -41,7 +41,7 @@ use datafusion_common::{DataFusionError, Statistics}; use datafusion_datasource::file::FileSource; use datafusion_datasource::file_scan_config::FileScanConfig; use datafusion_physical_expr::conjunction; -use datafusion_physical_expr::schema_rewriter::DefaultPhysicalExprAdapterFactory; +use datafusion_physical_expr_adapter::DefaultPhysicalExprAdapterFactory; use datafusion_physical_expr_common::physical_expr::fmt_sql; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; use datafusion_physical_plan::filter_pushdown::PushedDown; diff --git a/datafusion/datasource/Cargo.toml b/datafusion/datasource/Cargo.toml index f43894de37160..6b2c6cbd405cd 100644 --- a/datafusion/datasource/Cargo.toml +++ b/datafusion/datasource/Cargo.toml @@ -53,6 +53,7 @@ datafusion-common-runtime = { workspace = true } datafusion-execution = { workspace = true } datafusion-expr = { workspace = true } datafusion-physical-expr = { workspace = true } +datafusion-physical-expr-adapter = { workspace = true } datafusion-physical-expr-common = { workspace = true } datafusion-physical-plan = { workspace = true } datafusion-session = { workspace = true } diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index 95cc9e24b6451..f1938add09524 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -53,8 +53,8 @@ use datafusion_execution::{ object_store::ObjectStoreUrl, SendableRecordBatchStream, TaskContext, }; use datafusion_physical_expr::expressions::Column; -use datafusion_physical_expr::schema_rewriter::PhysicalExprAdapterFactory; use datafusion_physical_expr::{EquivalenceProperties, Partitioning}; +use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr}; use datafusion_physical_plan::filter_pushdown::FilterPushdownPropagation; diff --git a/datafusion/physical-expr-adapter/Cargo.toml b/datafusion/physical-expr-adapter/Cargo.toml new file mode 100644 index 0000000000000..c076024b45ee0 --- /dev/null +++ b/datafusion/physical-expr-adapter/Cargo.toml @@ -0,0 +1,29 @@ +[package] +name = "datafusion-physical-expr-adapter" +description = "Physical expression schema adaptation utilities for DataFusion" +keywords = ["datafusion", "query", "sql"] +readme = "README.md" +version = { workspace = true } +edition = { workspace = true } +homepage = { workspace = true } +repository = { workspace = true } +license = { workspace = true } +authors = { workspace = true } +rust-version = { workspace = true } + +[lib] +name = "datafusion_physical_expr_adapter" +path = "src/lib.rs" + +[dependencies] +arrow = { workspace = true } +datafusion-common = { workspace = true } +datafusion-expr = { workspace = true } +datafusion-functions = { workspace = true } +datafusion-physical-expr = { workspace = true } +datafusion-physical-expr-common = { workspace = true } +itertools = { workspace = true } + +[dev-dependencies] +insta = { workspace = true } +rstest = { workspace = true } diff --git a/datafusion/physical-expr-adapter/README.md b/datafusion/physical-expr-adapter/README.md new file mode 100644 index 0000000000000..beecd53875f93 --- /dev/null +++ b/datafusion/physical-expr-adapter/README.md @@ -0,0 +1,8 @@ +# DataFusion Physical Expression Adapter + +This crate provides utilities for adapting physical expressions to different schemas in DataFusion. + +It handles schema differences in file scans by rewriting expressions to match the physical schema, +including type casting, missing columns, and partition values. + +For detailed documentation, see the [`PhysicalExprAdapter`] trait documentation. diff --git a/datafusion/physical-expr-adapter/src/lib.rs b/datafusion/physical-expr-adapter/src/lib.rs new file mode 100644 index 0000000000000..025f1b4b6385a --- /dev/null +++ b/datafusion/physical-expr-adapter/src/lib.rs @@ -0,0 +1,31 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#![doc( + html_logo_url = "https://raw.githubusercontent.com/apache/datafusion/19fe44cf2f30cbdd63d4a4f52c74055163c6cc38/docs/logos/standalone_logo/logo_original.svg", + html_favicon_url = "https://raw.githubusercontent.com/apache/datafusion/19fe44cf2f30cbdd63d4a4f52c74055163c6cc38/docs/logos/standalone_logo/logo_original.svg" +)] +#![cfg_attr(docsrs, feature(doc_auto_cfg))] + +//! Physical expression schema adaptation utilities for DataFusion + +pub mod schema_rewriter; + +pub use schema_rewriter::{ + DefaultPhysicalExprAdapter, DefaultPhysicalExprAdapterFactory, PhysicalExprAdapter, + PhysicalExprAdapterFactory, +}; diff --git a/datafusion/physical-expr/src/schema_rewriter.rs b/datafusion/physical-expr-adapter/src/schema_rewriter.rs similarity index 68% rename from datafusion/physical-expr/src/schema_rewriter.rs rename to datafusion/physical-expr-adapter/src/schema_rewriter.rs index d622ce4bc01eb..3bdff1bdfb2e4 100644 --- a/datafusion/physical-expr/src/schema_rewriter.rs +++ b/datafusion/physical-expr-adapter/src/schema_rewriter.rs @@ -20,27 +20,50 @@ use std::sync::Arc; use arrow::compute::can_cast_types; -use arrow::datatypes::{FieldRef, Schema, SchemaRef}; +use arrow::datatypes::{DataType, FieldRef, Schema, SchemaRef}; use datafusion_common::{ exec_err, tree_node::{Transformed, TransformedResult, TreeNode}, Result, ScalarValue, }; +use datafusion_functions::core::getfield::GetFieldFunc; +use datafusion_physical_expr::{ + expressions::{self, CastExpr, Column}, + ScalarFunctionExpr, +}; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; -use crate::expressions::{self, CastExpr, Column}; - /// Trait for adapting physical expressions to match a target schema. /// /// This is used in file scans to rewrite expressions so that they can be evaluated /// against the physical schema of the file being scanned. It allows for handling /// differences between logical and physical schemas, such as type mismatches or missing columns. /// -/// You can create a custom implemention of this trait to handle specific rewriting logic. +/// ## Overview +/// +/// The `PhysicalExprAdapter` allows rewriting physical expressions to match different schemas, including: +/// +/// - **Type casting**: When logical and physical schemas have different types, expressions are +/// automatically wrapped with cast operations. For example, `lit(ScalarValue::Int32(123)) = int64_column` +/// gets rewritten to `lit(ScalarValue::Int32(123)) = cast(int64_column, 'Int32')`. +/// Note that this does not attempt to simplify such expressions - that is done by shared simplifiers. +/// +/// - **Missing columns**: When a column exists in the logical schema but not in the physical schema, +/// references to it are replaced with null literals. +/// +/// - **Struct field access**: Expressions like `struct_column.field_that_is_missing_in_schema` are +/// rewritten to `null` when the field doesn't exist in the physical schema. +/// +/// - **Partition columns**: Partition column references can be replaced with their literal values +/// when scanning specific partitions. +/// +/// ## Custom Implementations +/// +/// You can create a custom implementation of this trait to handle specific rewriting logic. /// For example, to fill in missing columns with default values instead of nulls: /// /// ```rust -/// use datafusion_physical_expr::schema_rewriter::{PhysicalExprAdapter, PhysicalExprAdapterFactory}; +/// use datafusion_physical_expr_adapter::{PhysicalExprAdapter, PhysicalExprAdapterFactory}; /// use arrow::datatypes::{Schema, Field, DataType, FieldRef, SchemaRef}; /// use datafusion_physical_expr_common::physical_expr::PhysicalExpr; /// use datafusion_common::{Result, ScalarValue, tree_node::{Transformed, TransformedResult, TreeNode}}; @@ -151,7 +174,7 @@ impl PhysicalExprAdapterFactory for DefaultPhysicalExprAdapterFactory { /// # Example /// /// ```rust -/// use datafusion_physical_expr::schema_rewriter::{DefaultPhysicalExprAdapterFactory, PhysicalExprAdapterFactory}; +/// use datafusion_physical_expr_adapter::{DefaultPhysicalExprAdapterFactory, PhysicalExprAdapterFactory}; /// use arrow::datatypes::Schema; /// use std::sync::Arc; /// @@ -220,6 +243,10 @@ impl<'a> DefaultPhysicalExprAdapterRewriter<'a> { &self, expr: Arc, ) -> Result>> { + if let Some(transformed) = self.try_rewrite_struct_field_access(&expr)? { + return Ok(Transformed::yes(transformed)); + } + if let Some(column) = expr.as_any().downcast_ref::() { return self.rewrite_column(Arc::clone(&expr), column); } @@ -227,6 +254,88 @@ impl<'a> DefaultPhysicalExprAdapterRewriter<'a> { Ok(Transformed::no(expr)) } + /// Attempt to rewrite struct field access expressions to return null if the field does not exist in the physical schema. + /// Note that this does *not* handle nested struct fields, only top-level struct field access. + /// See for more details. + fn try_rewrite_struct_field_access( + &self, + expr: &Arc, + ) -> Result>> { + let get_field_expr = + match ScalarFunctionExpr::try_downcast_func::(expr.as_ref()) { + Some(expr) => expr, + None => return Ok(None), + }; + + let source_expr = match get_field_expr.args().first() { + Some(expr) => expr, + None => return Ok(None), + }; + + let field_name_expr = match get_field_expr.args().get(1) { + Some(expr) => expr, + None => return Ok(None), + }; + + let lit = match field_name_expr + .as_any() + .downcast_ref::() + { + Some(lit) => lit, + None => return Ok(None), + }; + + let field_name = match lit.value().try_as_str().flatten() { + Some(name) => name, + None => return Ok(None), + }; + + let column = match source_expr.as_any().downcast_ref::() { + Some(column) => column, + None => return Ok(None), + }; + + let physical_field = + match self.physical_file_schema.field_with_name(column.name()) { + Ok(field) => field, + Err(_) => return Ok(None), + }; + + let physical_struct_fields = match physical_field.data_type() { + DataType::Struct(fields) => fields, + _ => return Ok(None), + }; + + if physical_struct_fields + .iter() + .any(|f| f.name() == field_name) + { + return Ok(None); + } + + let logical_field = match self.logical_file_schema.field_with_name(column.name()) + { + Ok(field) => field, + Err(_) => return Ok(None), + }; + + let logical_struct_fields = match logical_field.data_type() { + DataType::Struct(fields) => fields, + _ => return Ok(None), + }; + + let logical_struct_field = match logical_struct_fields + .iter() + .find(|f| f.name() == field_name) + { + Some(field) => field, + None => return Ok(None), + }; + + let null_value = ScalarValue::Null.cast_to(logical_struct_field.data_type())?; + Ok(Some(expressions::lit(null_value))) + } + fn rewrite_column( &self, expr: Arc, @@ -304,7 +413,9 @@ impl<'a> DefaultPhysicalExprAdapterRewriter<'a> { // TODO: add optimization to move the cast from the column to literal expressions in the case of `col = 123` // since that's much cheaper to evalaute. // See https://github.com/apache/datafusion/issues/15780#issuecomment-2824716928 - if !can_cast_types(physical_field.data_type(), logical_field.data_type()) { + let is_compatible = + can_cast_types(physical_field.data_type(), logical_field.data_type()); + if !is_compatible { return exec_err!( "Cannot cast column '{}' from '{}' (physical data type) to '{}' (logical data type)", column.name(), @@ -332,15 +443,13 @@ impl<'a> DefaultPhysicalExprAdapterRewriter<'a> { #[cfg(test)] mod tests { - use crate::expressions::{col, lit}; - use super::*; - use arrow::{ - array::{RecordBatch, RecordBatchOptions}, - datatypes::{DataType, Field, Schema, SchemaRef}, - }; - use datafusion_common::{record_batch, ScalarValue}; + use arrow::array::{RecordBatch, RecordBatchOptions}; + use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; + use datafusion_common::{assert_contains, record_batch, Result, ScalarValue}; use datafusion_expr::Operator; + use datafusion_physical_expr::expressions::{col, lit, CastExpr, Column, Literal}; + use datafusion_physical_expr_common::physical_expr::PhysicalExpr; use itertools::Itertools; use std::sync::Arc; @@ -413,7 +522,7 @@ mod tests { Arc::new(expected), Operator::Or, Arc::new(expressions::BinaryExpr::new( - lit(ScalarValue::Null), + lit(ScalarValue::Float64(None)), // c is missing, so it becomes null Operator::Gt, Arc::new(expressions::Literal::new(ScalarValue::Float64(Some(0.0)))), )), @@ -426,6 +535,75 @@ mod tests { ); } + #[test] + fn test_rewrite_struct_column_incompatible() { + let physical_schema = Schema::new(vec![Field::new( + "data", + DataType::Struct(vec![Field::new("field1", DataType::Binary, true)].into()), + true, + )]); + + let logical_schema = Schema::new(vec![Field::new( + "data", + DataType::Struct(vec![Field::new("field1", DataType::Int32, true)].into()), + true, + )]); + + let factory = DefaultPhysicalExprAdapterFactory; + let adapter = factory.create(Arc::new(logical_schema), Arc::new(physical_schema)); + let column_expr = Arc::new(Column::new("data", 0)); + + let error_msg = adapter.rewrite(column_expr).unwrap_err().to_string(); + assert_contains!(error_msg, "Cannot cast column 'data'"); + } + + #[test] + fn test_rewrite_struct_compatible_cast() { + let physical_schema = Schema::new(vec![Field::new( + "data", + DataType::Struct( + vec![ + Field::new("id", DataType::Int32, false), + Field::new("name", DataType::Utf8, true), + ] + .into(), + ), + false, + )]); + + let logical_schema = Schema::new(vec![Field::new( + "data", + DataType::Struct( + vec![ + Field::new("id", DataType::Int64, false), + Field::new("name", DataType::Utf8View, true), + ] + .into(), + ), + false, + )]); + + let factory = DefaultPhysicalExprAdapterFactory; + let adapter = factory.create(Arc::new(logical_schema), Arc::new(physical_schema)); + let column_expr = Arc::new(Column::new("data", 0)); + + let result = adapter.rewrite(column_expr).unwrap(); + + let expected = Arc::new(CastExpr::new( + Arc::new(Column::new("data", 0)), + DataType::Struct( + vec![ + Field::new("id", DataType::Int64, false), + Field::new("name", DataType::Utf8View, true), + ] + .into(), + ), + None, + )) as Arc; + + assert_eq!(result.to_string(), expected.to_string()); + } + #[test] fn test_rewrite_missing_column() -> Result<()> { let (physical_schema, logical_schema) = create_test_schema(); @@ -446,6 +624,42 @@ mod tests { Ok(()) } + #[test] + fn test_rewrite_missing_column_non_nullable_error() { + let physical_schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]); + let logical_schema = Schema::new(vec![ + Field::new("a", DataType::Int64, false), + Field::new("b", DataType::Utf8, false), // Missing and non-nullable + ]); + + let factory = DefaultPhysicalExprAdapterFactory; + let adapter = factory.create(Arc::new(logical_schema), Arc::new(physical_schema)); + let column_expr = Arc::new(Column::new("b", 1)); + + let error_msg = adapter.rewrite(column_expr).unwrap_err().to_string(); + assert_contains!(error_msg, "Non-nullable column 'b' is missing"); + } + + #[test] + fn test_rewrite_missing_column_nullable() { + let physical_schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]); + let logical_schema = Schema::new(vec![ + Field::new("a", DataType::Int64, false), + Field::new("b", DataType::Utf8, true), // Missing but nullable + ]); + + let factory = DefaultPhysicalExprAdapterFactory; + let adapter = factory.create(Arc::new(logical_schema), Arc::new(physical_schema)); + let column_expr = Arc::new(Column::new("b", 1)); + + let result = adapter.rewrite(column_expr).unwrap(); + + let expected = + Arc::new(Literal::new(ScalarValue::Utf8(None))) as Arc; + + assert_eq!(result.to_string(), expected.to_string()); + } + #[test] fn test_rewrite_partition_column() -> Result<()> { let (physical_schema, logical_schema) = create_test_schema(); @@ -509,13 +723,13 @@ mod tests { let result = adapter.rewrite(column_expr); assert!(result.is_err()); - assert!(result - .unwrap_err() - .to_string() - .contains("Non-nullable column 'b' is missing")); + assert_contains!( + result.unwrap_err().to_string(), + "Non-nullable column 'b' is missing from the physical schema" + ); } - /// Roughly stolen from ProjectionExec + /// Helper function to project expressions onto a RecordBatch fn batch_project( expr: Vec>, batch: &RecordBatch, @@ -606,4 +820,43 @@ mod tests { vec![Some(1), None, Some(3)] ); } + + #[test] + fn test_try_rewrite_struct_field_access() { + // Test the core logic of try_rewrite_struct_field_access + let physical_schema = Schema::new(vec![Field::new( + "struct_col", + DataType::Struct( + vec![Field::new("existing_field", DataType::Int32, true)].into(), + ), + true, + )]); + + let logical_schema = Schema::new(vec![Field::new( + "struct_col", + DataType::Struct( + vec![ + Field::new("existing_field", DataType::Int32, true), + Field::new("missing_field", DataType::Utf8, true), + ] + .into(), + ), + true, + )]); + + let rewriter = DefaultPhysicalExprAdapterRewriter { + logical_file_schema: &logical_schema, + physical_file_schema: &physical_schema, + partition_fields: &[], + }; + + // Test that when a field exists in physical schema, it returns None + let column = Arc::new(Column::new("struct_col", 0)) as Arc; + let result = rewriter.try_rewrite_struct_field_access(&column).unwrap(); + assert!(result.is_none()); + + // The actual test for the get_field expression would require creating a proper ScalarFunctionExpr + // with ScalarUDF, which is complex to set up in a unit test. The integration tests in + // datafusion/core/tests/parquet/schema_adapter.rs provide better coverage for this functionality. + } } diff --git a/datafusion/physical-expr/src/lib.rs b/datafusion/physical-expr/src/lib.rs index 845c358d7e58b..46f7b30d01aad 100644 --- a/datafusion/physical-expr/src/lib.rs +++ b/datafusion/physical-expr/src/lib.rs @@ -38,7 +38,6 @@ mod partitioning; mod physical_expr; pub mod planner; mod scalar_function; -pub mod schema_rewriter; pub mod simplifier; pub mod statistics; pub mod utils; @@ -70,7 +69,7 @@ pub use datafusion_physical_expr_common::sort_expr::{ pub use planner::{create_physical_expr, create_physical_exprs}; pub use scalar_function::ScalarFunctionExpr; -pub use schema_rewriter::DefaultPhysicalExprAdapter; +pub use simplifier::PhysicalExprSimplifier; pub use utils::{conjunction, conjunction_opt, split_conjunction}; // For backwards compatibility diff --git a/datafusion/physical-expr/src/scalar_function.rs b/datafusion/physical-expr/src/scalar_function.rs index e185cca75269c..7a3c463f2510c 100644 --- a/datafusion/physical-expr/src/scalar_function.rs +++ b/datafusion/physical-expr/src/scalar_function.rs @@ -165,6 +165,29 @@ impl ScalarFunctionExpr { pub fn config_options(&self) -> &ConfigOptions { &self.config_options } + + /// Given an arbitrary PhysicalExpr attempt to downcast it to a ScalarFunctionExpr + /// and verify that its inner function is of type T. + /// If the downcast fails, or the function is not of type T, returns `None`. + /// Otherwise returns `Some(ScalarFunctionExpr)`. + pub fn try_downcast_func(expr: &dyn PhysicalExpr) -> Option<&ScalarFunctionExpr> + where + T: 'static, + { + match expr.as_any().downcast_ref::() { + Some(scalar_expr) + if scalar_expr + .fun() + .inner() + .as_any() + .downcast_ref::() + .is_some() => + { + Some(scalar_expr) + } + _ => None, + } + } } impl fmt::Display for ScalarFunctionExpr { diff --git a/docs/source/library-user-guide/upgrading.md b/docs/source/library-user-guide/upgrading.md index 58167238fe2aa..25f014f4cb653 100644 --- a/docs/source/library-user-guide/upgrading.md +++ b/docs/source/library-user-guide/upgrading.md @@ -136,6 +136,24 @@ impl AsyncScalarUDFImpl for AskLLM { # */ ``` +### Schema Rewriter Module Moved to New Crate + +The `schema_rewriter` module and its associated symbols have been moved from `datafusion_physical_expr` to a new crate `datafusion_physical_expr_adapter`. This affects the following symbols: + +- `DefaultPhysicalExprAdapter` +- `DefaultPhysicalExprAdapterFactory` +- `PhysicalExprAdapter` +- `PhysicalExprAdapterFactory` + +To upgrade, change your imports to: + +```rust +use datafusion_physical_expr_adapter::{ + DefaultPhysicalExprAdapter, DefaultPhysicalExprAdapterFactory, + PhysicalExprAdapter, PhysicalExprAdapterFactory +}; +``` + ### Upgrade to arrow `56.0.0` and parquet `56.0.0` This version of DataFusion upgrades the underlying Apache Arrow implementation