diff --git a/datafusion/physical-plan/src/values.rs b/datafusion/physical-plan/src/values.rs index b624fb362e656..f82f7ea2f869a 100644 --- a/datafusion/physical-plan/src/values.rs +++ b/datafusion/physical-plan/src/values.rs @@ -27,9 +27,8 @@ use crate::{ PhysicalExpr, }; -use arrow::array::new_null_array; -use arrow::datatypes::SchemaRef; -use arrow::record_batch::RecordBatch; +use arrow::datatypes::{Schema, SchemaRef}; +use arrow::record_batch::{RecordBatch, RecordBatchOptions}; use datafusion_common::{internal_err, plan_err, DataFusionError, Result, ScalarValue}; use datafusion_execution::TaskContext; @@ -53,15 +52,14 @@ impl ValuesExec { } let n_row = data.len(); let n_col = schema.fields().len(); - // we have this single row, null, typed batch as a placeholder to satisfy evaluation argument - let batch = RecordBatch::try_new( - schema.clone(), - schema - .fields() - .iter() - .map(|field| new_null_array(field.data_type(), 1)) - .collect::>(), + // we have this single row batch as a placeholder to satisfy evaluation argument + // and generate a single output row + let batch = RecordBatch::try_new_with_options( + Arc::new(Schema::empty()), + vec![], + &RecordBatchOptions::new().with_row_count(Some(1)), )?; + let arr = (0..n_col) .map(|j| { (0..n_row) @@ -71,7 +69,7 @@ impl ValuesExec { match r { Ok(ColumnarValue::Scalar(scalar)) => Ok(scalar), Ok(ColumnarValue::Array(a)) if a.len() == 1 => { - Ok(ScalarValue::List(a)) + ScalarValue::try_from_array(&a, 0) } Ok(ColumnarValue::Array(a)) => { plan_err!( @@ -174,7 +172,7 @@ impl ExecutionPlan for ValuesExec { partition: usize, _context: Arc, ) -> Result { - // GlobalLimitExec has a single output partition + // ValuesExec has a single output partition if 0 != partition { return internal_err!( "ValuesExec invalid partition {partition} (expected 0)" @@ -201,6 +199,7 @@ impl ExecutionPlan for ValuesExec { #[cfg(test)] mod tests { use super::*; + use crate::expressions::lit; use crate::test::{self, make_partition}; use arrow_schema::{DataType, Field, Schema}; @@ -240,4 +239,18 @@ mod tests { ])); let _ = ValuesExec::try_new_from_batches(invalid_schema, batches).unwrap_err(); } + + // Test issue: https://github.com/apache/arrow-datafusion/issues/8763 + #[test] + fn new_exec_with_non_nullable_schema() { + let schema = Arc::new(Schema::new(vec![Field::new( + "col0", + DataType::UInt32, + false, + )])); + let _ = ValuesExec::try_new(schema.clone(), vec![vec![lit(1u32)]]).unwrap(); + // Test that a null value is rejected + let _ = ValuesExec::try_new(schema, vec![vec![lit(ScalarValue::UInt32(None))]]) + .unwrap_err(); + } } diff --git a/datafusion/sqllogictest/test_files/select.slt b/datafusion/sqllogictest/test_files/select.slt index ea570b99d4dd1..132bcdd246fe1 100644 --- a/datafusion/sqllogictest/test_files/select.slt +++ b/datafusion/sqllogictest/test_files/select.slt @@ -114,6 +114,14 @@ VALUES (1,2,3,4,5,6,7,8,9,10,11,12,13,NULL,'F',3.5) ---- 1 2 3 4 5 6 7 8 9 10 11 12 13 NULL F 3.5 +# Test non-literal expressions in VALUES +query II +VALUES (1, CASE WHEN RANDOM() > 0.5 THEN 1 ELSE 1 END), + (2, CASE WHEN RANDOM() > 0.5 THEN 2 ELSE 2 END); +---- +1 1 +2 2 + query IT SELECT * FROM (VALUES (1,'a'),(2,NULL)) AS t(c1, c2) ----