From 47f2a927b2ae3850ed8c0c00a3e967762593f27a Mon Sep 17 00:00:00 2001 From: wiedld Date: Tue, 15 Oct 2024 13:33:41 -0700 Subject: [PATCH 1/4] test: reproducer of error with aliased now field --- datafusion/sqllogictest/src/test_context.rs | 19 +++++++++++++++---- .../sqllogictest/test_files/metadata.slt | 14 ++++++++++++++ 2 files changed, 29 insertions(+), 4 deletions(-) diff --git a/datafusion/sqllogictest/src/test_context.rs b/datafusion/sqllogictest/src/test_context.rs index 9a0db1c41c71b..2143b3089ee5a 100644 --- a/datafusion/sqllogictest/src/test_context.rs +++ b/datafusion/sqllogictest/src/test_context.rs @@ -319,11 +319,17 @@ pub async fn register_metadata_tables(ctx: &SessionContext) { String::from("metadata_key"), String::from("the l_name field"), )])); + let ts = Field::new("ts", DataType::Timestamp(TimeUnit::Nanosecond, None), false) + .with_metadata(HashMap::from([( + String::from("metadata_key"), + String::from("ts non-nullable field"), + )])); - let schema = Schema::new(vec![id, name, l_name]).with_metadata(HashMap::from([( - String::from("metadata_key"), - String::from("the entire schema"), - )])); + let schema = + Schema::new(vec![id, name, l_name, ts]).with_metadata(HashMap::from([( + String::from("metadata_key"), + String::from("the entire schema"), + )])); let batch = RecordBatch::try_new( Arc::new(schema), @@ -331,6 +337,11 @@ pub async fn register_metadata_tables(ctx: &SessionContext) { Arc::new(Int32Array::from(vec![Some(1), None, Some(3)])) as _, Arc::new(StringArray::from(vec![None, Some("bar"), Some("baz")])) as _, Arc::new(StringArray::from(vec![None, Some("l_bar"), Some("l_baz")])) as _, + Arc::new(TimestampNanosecondArray::from(vec![ + 1599572549190855123, + 1599572549190855123, + 1599572549190855123, + ])) as _, ], ) .unwrap(); diff --git a/datafusion/sqllogictest/test_files/metadata.slt b/datafusion/sqllogictest/test_files/metadata.slt index d0853b9e4983b..0cf8bf8361403 100644 --- a/datafusion/sqllogictest/test_files/metadata.slt +++ b/datafusion/sqllogictest/test_files/metadata.slt @@ -125,5 +125,19 @@ NULL NULL l_bar +statement error DataFusion error: Internal error: Physical input schema should be the same as the one converted from logical input schema.. +SELECT ts +FROM (( + SELECT now() AS ts + FROM table_with_metadata +) UNION ALL ( + SELECT ts + FROM table_with_metadata +)) +GROUP BY ts +ORDER BY ts +LIMIT 1; + + statement ok drop table table_with_metadata; From 9aef3fc9237d30281b4f759bd2a7c4bda0b6d9c1 Mon Sep 17 00:00:00 2001 From: wiedld Date: Tue, 15 Oct 2024 10:19:41 -0700 Subject: [PATCH 2/4] fix: now() UDF is not nullable --- datafusion/functions/src/datetime/now.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/datafusion/functions/src/datetime/now.rs b/datafusion/functions/src/datetime/now.rs index b2221215b94b7..74eb5aea4255c 100644 --- a/datafusion/functions/src/datetime/now.rs +++ b/datafusion/functions/src/datetime/now.rs @@ -21,7 +21,7 @@ use arrow::datatypes::DataType; use arrow::datatypes::DataType::Timestamp; use arrow::datatypes::TimeUnit::Nanosecond; -use datafusion_common::{internal_err, Result, ScalarValue}; +use datafusion_common::{internal_err, ExprSchema, Result, ScalarValue}; use datafusion_expr::simplify::{ExprSimplifyResult, SimplifyInfo}; use datafusion_expr::{ColumnarValue, Expr, ScalarUDFImpl, Signature, Volatility}; @@ -84,4 +84,8 @@ impl ScalarUDFImpl for NowFunc { ScalarValue::TimestampNanosecond(now_ts, Some("+00:00".into())), ))) } + + fn is_nullable(&self, _args: &[Expr], _schema: &dyn ExprSchema) -> bool { + false + } } From 8c59264f86619d20010692fdfba752fd15b76221 Mon Sep 17 00:00:00 2001 From: wiedld Date: Fri, 11 Oct 2024 12:30:18 -0700 Subject: [PATCH 3/4] fix: when extracting metadata from expr, handle CastExpr --- datafusion/physical-plan/src/projection.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/projection.rs b/datafusion/physical-plan/src/projection.rs index 49bf059642263..a28328fb5d439 100644 --- a/datafusion/physical-plan/src/projection.rs +++ b/datafusion/physical-plan/src/projection.rs @@ -40,7 +40,7 @@ use datafusion_common::stats::Precision; use datafusion_common::Result; use datafusion_execution::TaskContext; use datafusion_physical_expr::equivalence::ProjectionMapping; -use datafusion_physical_expr::expressions::Literal; +use datafusion_physical_expr::expressions::{CastExpr, Literal}; use crate::execution_plan::CardinalityEffect; use futures::stream::{Stream, StreamExt}; @@ -246,6 +246,10 @@ pub(crate) fn get_field_metadata( e: &Arc, input_schema: &Schema, ) -> Option> { + if let Some(cast) = e.as_any().downcast_ref::() { + return get_field_metadata(cast.expr(), input_schema); + } + // Look up field by index in schema (not NAME as there can be more than one // column with the same name) e.as_any() From e45782c58ab13686d1acd66b29f0868204acbac8 Mon Sep 17 00:00:00 2001 From: wiedld Date: Tue, 15 Oct 2024 13:44:48 -0700 Subject: [PATCH 4/4] test: update tests now that the fixes are working --- datafusion/sqllogictest/test_files/metadata.slt | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/datafusion/sqllogictest/test_files/metadata.slt b/datafusion/sqllogictest/test_files/metadata.slt index 0cf8bf8361403..588a36e3d515a 100644 --- a/datafusion/sqllogictest/test_files/metadata.slt +++ b/datafusion/sqllogictest/test_files/metadata.slt @@ -125,7 +125,7 @@ NULL NULL l_bar -statement error DataFusion error: Internal error: Physical input schema should be the same as the one converted from logical input schema.. +query P rowsort SELECT ts FROM (( SELECT now() AS ts @@ -137,6 +137,8 @@ FROM (( GROUP BY ts ORDER BY ts LIMIT 1; +---- +2020-09-08T13:42:29.190855123Z statement ok