diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 47b31d2f4e2d3..6a4f379e4d6d3 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -54,7 +54,6 @@ use crate::physical_plan::repartition::RepartitionExec; use crate::physical_plan::sorts::sort::SortExec; use crate::physical_plan::union::UnionExec; use crate::physical_plan::unnest::UnnestExec; -use crate::physical_plan::values::ValuesExec; use crate::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec}; use crate::physical_plan::{ displayable, windows, ExecutionPlan, ExecutionPlanProperties, InputOrderMode, @@ -466,7 +465,8 @@ impl DefaultPhysicalPlanner { .collect::>>>() }) .collect::>>()?; - let value_exec = ValuesExec::try_new(SchemaRef::new(exec_schema), exprs)?; + let value_exec = + MemoryExec::try_new_as_values(SchemaRef::new(exec_schema), exprs)?; Arc::new(value_exec) } LogicalPlan::EmptyRelation(EmptyRelation { diff --git a/datafusion/physical-plan/src/memory.rs b/datafusion/physical-plan/src/memory.rs index 67ecd44ff3176..5e8ee713703bf 100644 --- a/datafusion/physical-plan/src/memory.rs +++ b/datafusion/physical-plan/src/memory.rs @@ -24,14 +24,17 @@ use std::sync::Arc; use std::task::{Context, Poll}; use super::{ - common, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, - RecordBatchStream, SendableRecordBatchStream, Statistics, + common, ColumnarValue, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, + PhysicalExpr, PlanProperties, RecordBatchStream, SendableRecordBatchStream, + Statistics, }; use crate::execution_plan::{Boundedness, EmissionType}; use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; -use datafusion_common::{internal_err, project_schema, Result}; +use arrow_array::RecordBatchOptions; +use arrow_schema::Schema; +use datafusion_common::{internal_err, plan_err, project_schema, Result, ScalarValue}; use datafusion_execution::memory_pool::MemoryReservation; use datafusion_execution::TaskContext; use datafusion_physical_expr::equivalence::ProjectionMapping; @@ -174,6 +177,96 @@ impl MemoryExec { }) } + /// Create a new execution plan from a list of constant values (`ValuesExec`) + pub fn try_new_as_values( + schema: SchemaRef, + data: Vec>>, + ) -> Result { + if data.is_empty() { + return plan_err!("Values list cannot be empty"); + } + + let n_row = data.len(); + let n_col = schema.fields().len(); + + // We have this single row batch as a placeholder to satisfy evaluation argument + // and generate a single output row + let placeholder_schema = Arc::new(Schema::empty()); + let placeholder_batch = RecordBatch::try_new_with_options( + Arc::clone(&placeholder_schema), + vec![], + &RecordBatchOptions::new().with_row_count(Some(1)), + )?; + + // Evaluate each column + let arrays = (0..n_col) + .map(|j| { + (0..n_row) + .map(|i| { + let expr = &data[i][j]; + let result = expr.evaluate(&placeholder_batch)?; + + match result { + ColumnarValue::Scalar(scalar) => Ok(scalar), + ColumnarValue::Array(array) if array.len() == 1 => { + ScalarValue::try_from_array(&array, 0) + } + ColumnarValue::Array(_) => { + plan_err!("Cannot have array values in a values list") + } + } + }) + .collect::>>() + .and_then(ScalarValue::iter_to_array) + }) + .collect::>>()?; + + let batch = RecordBatch::try_new_with_options( + Arc::clone(&schema), + arrays, + &RecordBatchOptions::new().with_row_count(Some(n_row)), + )?; + + let partitions = vec![batch]; + Self::try_new_from_batches(Arc::clone(&schema), partitions) + } + + /// Create a new plan using the provided schema and batches. + /// + /// Errors if any of the batches don't match the provided schema, or if no + /// batches are provided. + pub fn try_new_from_batches( + schema: SchemaRef, + batches: Vec, + ) -> Result { + if batches.is_empty() { + return plan_err!("Values list cannot be empty"); + } + + for batch in &batches { + let batch_schema = batch.schema(); + if batch_schema != schema { + return plan_err!( + "Batch has invalid schema. Expected: {}, got: {}", + schema, + batch_schema + ); + } + } + + let partitions = vec![batches]; + let cache = Self::compute_properties(Arc::clone(&schema), &[], &partitions); + Ok(Self { + partitions, + schema: Arc::clone(&schema), + projected_schema: Arc::clone(&schema), + projection: None, + sort_information: vec![], + cache, + show_sizes: true, + }) + } + /// Set `show_sizes` to determine whether to display partition sizes pub fn with_show_sizes(mut self, show_sizes: bool) -> Self { self.show_sizes = show_sizes; @@ -696,3 +789,96 @@ mod lazy_memory_tests { Ok(()) } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::expressions::lit; + use crate::test::{self, make_partition}; + + use arrow_schema::{DataType, Field}; + use datafusion_common::stats::{ColumnStatistics, Precision}; + + #[tokio::test] + async fn values_empty_case() -> Result<()> { + let schema = test::aggr_test_schema(); + let empty = MemoryExec::try_new_as_values(schema, vec![]); + assert!(empty.is_err()); + Ok(()) + } + + #[test] + fn new_exec_with_batches() { + let batch = make_partition(7); + let schema = batch.schema(); + let batches = vec![batch.clone(), batch]; + let _exec = MemoryExec::try_new_from_batches(schema, batches).unwrap(); + } + + #[test] + fn new_exec_with_batches_empty() { + let batch = make_partition(7); + let schema = batch.schema(); + let _ = MemoryExec::try_new_from_batches(schema, Vec::new()).unwrap_err(); + } + + #[test] + fn new_exec_with_batches_invalid_schema() { + let batch = make_partition(7); + let batches = vec![batch.clone(), batch]; + + let invalid_schema = Arc::new(Schema::new(vec![ + Field::new("col0", DataType::UInt32, false), + Field::new("col1", DataType::Utf8, false), + ])); + let _ = MemoryExec::try_new_from_batches(invalid_schema, batches).unwrap_err(); + } + + // Test issue: https://github.com/apache/datafusion/issues/8763 + #[test] + fn new_exec_with_non_nullable_schema() { + let schema = Arc::new(Schema::new(vec![Field::new( + "col0", + DataType::UInt32, + false, + )])); + let _ = MemoryExec::try_new_as_values(Arc::clone(&schema), vec![vec![lit(1u32)]]) + .unwrap(); + // Test that a null value is rejected + let _ = MemoryExec::try_new_as_values( + schema, + vec![vec![lit(ScalarValue::UInt32(None))]], + ) + .unwrap_err(); + } + + #[test] + fn values_stats_with_nulls_only() -> Result<()> { + let data = vec![ + vec![lit(ScalarValue::Null)], + vec![lit(ScalarValue::Null)], + vec![lit(ScalarValue::Null)], + ]; + let rows = data.len(); + let values = MemoryExec::try_new_as_values( + Arc::new(Schema::new(vec![Field::new("col0", DataType::Null, true)])), + data, + )?; + + assert_eq!( + values.statistics()?, + Statistics { + num_rows: Precision::Exact(rows), + total_byte_size: Precision::Exact(8), // not important + column_statistics: vec![ColumnStatistics { + null_count: Precision::Exact(rows), // there are only nulls + distinct_count: Precision::Absent, + max_value: Precision::Absent, + min_value: Precision::Absent, + },], + } + ); + + Ok(()) + } +} diff --git a/datafusion/physical-plan/src/values.rs b/datafusion/physical-plan/src/values.rs index 5089b1e626d48..a30b8981fdd8a 100644 --- a/datafusion/physical-plan/src/values.rs +++ b/datafusion/physical-plan/src/values.rs @@ -34,6 +34,7 @@ use datafusion_execution::TaskContext; use datafusion_physical_expr::EquivalenceProperties; /// Execution plan for values list based relation (produces constant rows) +#[deprecated(since = "45.0.0", note = "Use `MemoryExec::try_new_as_values` instead")] #[derive(Debug, Clone)] pub struct ValuesExec { /// The schema @@ -44,6 +45,7 @@ pub struct ValuesExec { cache: PlanProperties, } +#[allow(deprecated)] impl ValuesExec { /// Create a new values exec from data as expr pub fn try_new( @@ -117,6 +119,7 @@ impl ValuesExec { } let cache = Self::compute_properties(Arc::clone(&schema)); + #[allow(deprecated)] Ok(ValuesExec { schema, data: batches, @@ -126,6 +129,7 @@ impl ValuesExec { /// Provides the data pub fn data(&self) -> Vec { + #[allow(deprecated)] self.data.clone() } @@ -140,6 +144,7 @@ impl ValuesExec { } } +#[allow(deprecated)] impl DisplayAs for ValuesExec { fn fmt_as( &self, @@ -154,6 +159,7 @@ impl DisplayAs for ValuesExec { } } +#[allow(deprecated)] impl ExecutionPlan for ValuesExec { fn name(&self) -> &'static str { "ValuesExec" @@ -165,6 +171,7 @@ impl ExecutionPlan for ValuesExec { } fn properties(&self) -> &PlanProperties { + #[allow(deprecated)] &self.cache } @@ -176,6 +183,7 @@ impl ExecutionPlan for ValuesExec { self: Arc, _: Vec>, ) -> Result> { + #[allow(deprecated)] ValuesExec::try_new_from_batches(Arc::clone(&self.schema), self.data.clone()) .map(|e| Arc::new(e) as _) } @@ -194,6 +202,7 @@ impl ExecutionPlan for ValuesExec { Ok(Box::pin(MemoryStream::try_new( self.data(), + #[allow(deprecated)] Arc::clone(&self.schema), None, )?)) @@ -203,6 +212,7 @@ impl ExecutionPlan for ValuesExec { let batch = self.data(); Ok(common::compute_record_batch_statistics( &[batch], + #[allow(deprecated)] &self.schema, None, )) @@ -221,6 +231,7 @@ mod tests { #[tokio::test] async fn values_empty_case() -> Result<()> { let schema = test::aggr_test_schema(); + #[allow(deprecated)] let empty = ValuesExec::try_new(schema, vec![]); assert!(empty.is_err()); Ok(()) @@ -231,7 +242,7 @@ mod tests { let batch = make_partition(7); let schema = batch.schema(); let batches = vec![batch.clone(), batch]; - + #[allow(deprecated)] let _exec = ValuesExec::try_new_from_batches(schema, batches).unwrap(); } @@ -239,6 +250,7 @@ mod tests { fn new_exec_with_batches_empty() { let batch = make_partition(7); let schema = batch.schema(); + #[allow(deprecated)] let _ = ValuesExec::try_new_from_batches(schema, Vec::new()).unwrap_err(); } @@ -251,6 +263,7 @@ mod tests { Field::new("col0", DataType::UInt32, false), Field::new("col1", DataType::Utf8, false), ])); + #[allow(deprecated)] let _ = ValuesExec::try_new_from_batches(invalid_schema, batches).unwrap_err(); } @@ -262,8 +275,10 @@ mod tests { DataType::UInt32, false, )])); + #[allow(deprecated)] let _ = ValuesExec::try_new(Arc::clone(&schema), vec![vec![lit(1u32)]]).unwrap(); // Test that a null value is rejected + #[allow(deprecated)] let _ = ValuesExec::try_new(schema, vec![vec![lit(ScalarValue::UInt32(None))]]) .unwrap_err(); } @@ -276,6 +291,7 @@ mod tests { vec![lit(ScalarValue::Null)], ]; let rows = data.len(); + #[allow(deprecated)] let values = ValuesExec::try_new( Arc::new(Schema::new(vec![Field::new("col0", DataType::Null, true)])), data, diff --git a/datafusion/sqllogictest/test_files/insert_to_external.slt b/datafusion/sqllogictest/test_files/insert_to_external.slt index edfc2ee75bd75..2a2aecf2f2cc3 100644 --- a/datafusion/sqllogictest/test_files/insert_to_external.slt +++ b/datafusion/sqllogictest/test_files/insert_to_external.slt @@ -128,7 +128,7 @@ physical_plan 01)DataSinkExec: sink=CsvSink(file_groups=[]) 02)--SortExec: expr=[a@0 ASC NULLS LAST, b@1 DESC], preserve_partitioning=[false] 03)----ProjectionExec: expr=[column1@0 as a, column2@1 as b] -04)------ValuesExec +04)------MemoryExec: partitions=1, partition_sizes=[1] query I INSERT INTO ordered_insert_test values (5, 1), (4, 2), (7,7), (7,8), (7,9), (7,10), (3, 3), (2, 4), (1, 5); diff --git a/datafusion/sqllogictest/test_files/order.slt b/datafusion/sqllogictest/test_files/order.slt index a46040aa532ed..8d96fe47f6b3d 100644 --- a/datafusion/sqllogictest/test_files/order.slt +++ b/datafusion/sqllogictest/test_files/order.slt @@ -786,7 +786,7 @@ physical_plan 08)--------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 09)----------------AggregateExec: mode=Partial, gby=[t@0 as t], aggr=[] 10)------------------ProjectionExec: expr=[column1@0 as t] -11)--------------------ValuesExec +11)--------------------MemoryExec: partitions=1, partition_sizes=[1] 12)------ProjectionExec: expr=[1 as m, t@0 as t] 13)--------AggregateExec: mode=FinalPartitioned, gby=[t@0 as t], aggr=[] 14)----------CoalesceBatchesExec: target_batch_size=8192 @@ -794,7 +794,7 @@ physical_plan 16)--------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 17)----------------AggregateExec: mode=Partial, gby=[t@0 as t], aggr=[] 18)------------------ProjectionExec: expr=[column1@0 as t] -19)--------------------ValuesExec +19)--------------------MemoryExec: partitions=1, partition_sizes=[1] ##### # Multi column sorting with lists diff --git a/datafusion/sqllogictest/test_files/select.slt b/datafusion/sqllogictest/test_files/select.slt index c687429ae6ecf..a127463c2b274 100644 --- a/datafusion/sqllogictest/test_files/select.slt +++ b/datafusion/sqllogictest/test_files/select.slt @@ -424,19 +424,19 @@ query TT EXPLAIN VALUES (1, 'a', -1, 1.1),(NULL, 'b', -3, 0.5) ---- logical_plan Values: (Int64(1), Utf8("a"), Int64(-1), Float64(1.1)), (Int64(NULL), Utf8("b"), Int64(-3), Float64(0.5)) -physical_plan ValuesExec +physical_plan MemoryExec: partitions=1, partition_sizes=[1] query TT EXPLAIN VALUES ('1'::float) ---- logical_plan Values: (Float32(1) AS Utf8("1")) -physical_plan ValuesExec +physical_plan MemoryExec: partitions=1, partition_sizes=[1] query TT EXPLAIN VALUES (('1'||'2')::int unsigned) ---- logical_plan Values: (UInt32(12) AS Utf8("1") || Utf8("2")) -physical_plan ValuesExec +physical_plan MemoryExec: partitions=1, partition_sizes=[1] # all where empty