From 1f1e7255fa786367df5a13c933cb17668ef235c4 Mon Sep 17 00:00:00 2001 From: Georgi Krastev Date: Tue, 11 Mar 2025 09:01:30 +0100 Subject: [PATCH 1/7] Saner handling of nulls inside arrays --- datafusion/expr-common/src/signature.rs | 39 ++++- .../expr/src/type_coercion/functions.rs | 156 +++++++---------- .../functions-nested/src/cardinality.rs | 30 ++-- datafusion/functions-nested/src/concat.rs | 162 +++++++++-------- datafusion/functions-nested/src/dimension.rs | 84 ++++----- datafusion/functions-nested/src/distance.rs | 44 +++-- datafusion/functions-nested/src/extract.rs | 23 ++- datafusion/functions-nested/src/make_array.rs | 81 +++------ datafusion/functions-nested/src/set_ops.rs | 163 ++++++++++-------- datafusion/functions-nested/src/sort.rs | 35 ++-- datafusion/sqllogictest/test_files/array.slt | 61 ++++--- datafusion/sqllogictest/test_files/unnest.slt | 16 +- 12 files changed, 464 insertions(+), 430 deletions(-) diff --git a/datafusion/expr-common/src/signature.rs b/datafusion/expr-common/src/signature.rs index 063417a254be..32a094e8e9d3 100644 --- a/datafusion/expr-common/src/signature.rs +++ b/datafusion/expr-common/src/signature.rs @@ -843,6 +843,7 @@ impl Signature { volatility, } } + /// Any one of a list of [TypeSignature]s. pub fn one_of(type_signatures: Vec, volatility: Volatility) -> Self { Signature { @@ -850,6 +851,7 @@ impl Signature { volatility, } } + /// Specialized Signature for ArrayAppend and similar functions pub fn array_and_element(volatility: Volatility) -> Self { Signature { @@ -865,6 +867,39 @@ impl Signature { volatility, } } + + /// Specialized Signature for ArrayPrepend and similar functions + pub fn element_and_array(volatility: Volatility) -> Self { + Signature { + type_signature: TypeSignature::ArraySignature( + ArrayFunctionSignature::Array { + arguments: vec![ + ArrayFunctionArgument::Element, + ArrayFunctionArgument::Array, + ], + array_coercion: Some(ListCoercion::FixedSizedListToList), + }, + ), + volatility, + } + } + + /// Specialized Signature for ArrayUnion and similar functions + pub fn array_and_array(volatility: Volatility) -> Self { + Signature { + type_signature: TypeSignature::ArraySignature( + ArrayFunctionSignature::Array { + arguments: vec![ + ArrayFunctionArgument::Array, + ArrayFunctionArgument::Array, + ], + array_coercion: Some(ListCoercion::FixedSizedListToList), + }, + ), + volatility, + } + } + /// Specialized Signature for Array functions with an optional index pub fn array_and_element_and_optional_index(volatility: Volatility) -> Self { Signature { @@ -898,7 +933,7 @@ impl Signature { ArrayFunctionArgument::Array, ArrayFunctionArgument::Index, ], - array_coercion: None, + array_coercion: Some(ListCoercion::FixedSizedListToList), }, ), volatility, @@ -910,7 +945,7 @@ impl Signature { type_signature: TypeSignature::ArraySignature( ArrayFunctionSignature::Array { arguments: vec![ArrayFunctionArgument::Array], - array_coercion: None, + array_coercion: Some(ListCoercion::FixedSizedListToList), }, ), volatility, diff --git a/datafusion/expr/src/type_coercion/functions.rs b/datafusion/expr/src/type_coercion/functions.rs index 3b34718062eb..64da3d74119f 100644 --- a/datafusion/expr/src/type_coercion/functions.rs +++ b/datafusion/expr/src/type_coercion/functions.rs @@ -15,19 +15,21 @@ // specific language governing permissions and limitations // under the License. -use super::binary::{binary_numeric_coercion, comparison_coercion}; +use super::binary::binary_numeric_coercion; use crate::{AggregateUDF, ScalarUDF, Signature, TypeSignature, WindowUDF}; use arrow::{ compute::can_cast_types, - datatypes::{DataType, Field, TimeUnit}, + datatypes::{DataType, TimeUnit}, }; use datafusion_common::types::LogicalType; -use datafusion_common::utils::{coerced_fixed_size_list_to_list, ListCoercion}; +use datafusion_common::utils::{ + base_type, coerced_fixed_size_list_to_list, ListCoercion, +}; use datafusion_common::{ - exec_err, internal_datafusion_err, internal_err, plan_err, types::NativeType, - utils::list_ndims, Result, + exec_err, internal_err, plan_err, types::NativeType, utils::list_ndims, Result, }; use datafusion_expr_common::signature::ArrayFunctionArgument; +use datafusion_expr_common::type_coercion::binary::type_union_resolution; use datafusion_expr_common::{ signature::{ArrayFunctionSignature, FIXED_SIZE_LIST_WILDCARD, TIMEZONE_WILDCARD}, type_coercion::binary::comparison_coercion_numeric, @@ -364,98 +366,73 @@ fn get_valid_types( return Ok(vec![vec![]]); } - let array_idx = arguments.iter().enumerate().find_map(|(idx, arg)| { - if *arg == ArrayFunctionArgument::Array { - Some(idx) - } else { - None - } - }); - let Some(array_idx) = array_idx else { - return Err(internal_datafusion_err!("Function '{function_name}' expected at least one argument array argument")); - }; - let Some(array_type) = array(¤t_types[array_idx]) else { - return Ok(vec![vec![]]); - }; + let mut fixed_size = None; + let mut large_list = false; + let mut element_types = Vec::with_capacity(arguments.len()); + for (argument, current_type) in arguments.iter().zip(current_types.iter()) { + match argument { + ArrayFunctionArgument::Array => match current_type { + DataType::FixedSizeList(field, size) => { + match array_coercion { + Some(ListCoercion::FixedSizedListToList) => (), + None if fixed_size.is_none() => fixed_size = Some(*size), + None if fixed_size == Some(*size) => (), + None => fixed_size = None, + } - // We need to find the coerced base type, mainly for cases like: - // `array_append(List(null), i64)` -> `List(i64)` - let mut new_base_type = datafusion_common::utils::base_type(&array_type); - for (current_type, argument_type) in current_types.iter().zip(arguments.iter()) { - match argument_type { - ArrayFunctionArgument::Element | ArrayFunctionArgument::Array => { - new_base_type = - coerce_array_types(function_name, current_type, &new_base_type)?; + element_types.push(field.data_type().clone()) + } + DataType::List(field) => { + fixed_size = None; + element_types.push(field.data_type().clone()) + } + DataType::LargeList(field) => { + fixed_size = None; + large_list = true; + element_types.push(field.data_type().clone()) + } + DataType::Null => { + fixed_size = None; + element_types.push(DataType::Null) + } + arg_type => { + return plan_err!( + "{function_name} does not support an argument of type {arg_type}" + ) + } + }, + ArrayFunctionArgument::Element => { + element_types.push(current_type.clone()) } - ArrayFunctionArgument::Index | ArrayFunctionArgument::String => {} + ArrayFunctionArgument::Index | ArrayFunctionArgument::String => (), } } - let new_array_type = datafusion_common::utils::coerced_type_with_base_type_only( - &array_type, - &new_base_type, - array_coercion, - ); - let new_elem_type = match new_array_type { - DataType::List(ref field) - | DataType::LargeList(ref field) - | DataType::FixedSizeList(ref field, _) => field.data_type(), - _ => return Ok(vec![vec![]]), + let Some(element_type) = type_union_resolution(&element_types) else { + return plan_err!( + "Failed to unify argument types of {function_name}: {current_types:?}." + ); }; - let mut valid_types = Vec::with_capacity(arguments.len()); - for (current_type, argument_type) in current_types.iter().zip(arguments.iter()) { - let valid_type = match argument_type { - ArrayFunctionArgument::Element => new_elem_type.clone(), + let array_type = if large_list { + DataType::new_large_list(element_type.clone(), true) + } else if let Some(size) = fixed_size { + DataType::new_fixed_size_list(element_type.clone(), size, true) + } else { + DataType::new_list(element_type.clone(), true) + }; + + let valid_types = arguments.iter().zip(current_types.iter()).map( + |(argument_type, current_type)| match argument_type { + ArrayFunctionArgument::Array if current_type.is_null() => DataType::Null, + ArrayFunctionArgument::Array => array_type.clone(), + ArrayFunctionArgument::Element => element_type.clone(), ArrayFunctionArgument::Index => DataType::Int64, ArrayFunctionArgument::String => DataType::Utf8, - ArrayFunctionArgument::Array => { - let Some(current_type) = array(current_type) else { - return Ok(vec![vec![]]); - }; - let new_type = - datafusion_common::utils::coerced_type_with_base_type_only( - ¤t_type, - &new_base_type, - array_coercion, - ); - // All array arguments must be coercible to the same type - if new_type != new_array_type { - return Ok(vec![vec![]]); - } - new_type - } - }; - valid_types.push(valid_type); - } - - Ok(vec![valid_types]) - } - - fn array(array_type: &DataType) -> Option { - match array_type { - DataType::List(_) | DataType::LargeList(_) => Some(array_type.clone()), - DataType::FixedSizeList(field, _) => Some(DataType::List(Arc::clone(field))), - DataType::Null => Some(DataType::List(Arc::new(Field::new_list_field( - DataType::Int64, - true, - )))), - _ => None, - } - } + }, + ); - fn coerce_array_types( - function_name: &str, - current_type: &DataType, - base_type: &DataType, - ) -> Result { - let current_base_type = datafusion_common::utils::base_type(current_type); - let new_base_type = comparison_coercion(base_type, ¤t_base_type); - new_base_type.ok_or_else(|| { - internal_datafusion_err!( - "Function '{function_name}' does not support coercion from {base_type:?} to {current_base_type:?}" - ) - }) + Ok(vec![valid_types.collect()]) } fn recursive_array(array_type: &DataType) -> Option { @@ -800,7 +777,7 @@ pub fn can_coerce_from(type_into: &DataType, type_from: &DataType) -> bool { /// /// Expect uni-directional coercion, for example, i32 is coerced to i64, but i64 is not coerced to i32. /// -/// Unlike [comparison_coercion], the coerced type is usually `wider` for lossless conversion. +/// Unlike [crate::binary::comparison_coercion], the coerced type is usually `wider` for lossless conversion. fn coerced_from<'a>( type_into: &'a DataType, type_from: &'a DataType, @@ -867,7 +844,7 @@ fn coerced_from<'a>( // Only accept list and largelist with the same number of dimensions unless the type is Null. // List or LargeList with different dimensions should be handled in TypeSignature or other places before this (List(_) | LargeList(_), _) - if datafusion_common::utils::base_type(type_from).eq(&Null) + if base_type(type_from).is_null() || list_ndims(type_from) == list_ndims(type_into) => { Some(type_into.clone()) @@ -906,7 +883,6 @@ fn coerced_from<'a>( #[cfg(test)] mod tests { - use crate::Volatility; use super::*; diff --git a/datafusion/functions-nested/src/cardinality.rs b/datafusion/functions-nested/src/cardinality.rs index f2f23841586c..f76361ec8c1b 100644 --- a/datafusion/functions-nested/src/cardinality.rs +++ b/datafusion/functions-nested/src/cardinality.rs @@ -23,12 +23,12 @@ use arrow::array::{ }; use arrow::datatypes::{ DataType, - DataType::{FixedSizeList, LargeList, List, Map, UInt64}, + DataType::{LargeList, List, Map, Null, UInt64}, }; use datafusion_common::cast::{as_large_list_array, as_list_array, as_map_array}; -use datafusion_common::utils::take_function_args; +use datafusion_common::exec_err; +use datafusion_common::utils::{take_function_args, ListCoercion}; use datafusion_common::Result; -use datafusion_common::{exec_err, plan_err}; use datafusion_expr::{ ArrayFunctionArgument, ArrayFunctionSignature, ColumnarValue, Documentation, ScalarUDFImpl, Signature, TypeSignature, Volatility, @@ -52,7 +52,7 @@ impl Cardinality { vec![ TypeSignature::ArraySignature(ArrayFunctionSignature::Array { arguments: vec![ArrayFunctionArgument::Array], - array_coercion: None, + array_coercion: Some(ListCoercion::FixedSizedListToList), }), TypeSignature::ArraySignature(ArrayFunctionSignature::MapArray), ], @@ -103,13 +103,8 @@ impl ScalarUDFImpl for Cardinality { &self.signature } - fn return_type(&self, arg_types: &[DataType]) -> Result { - Ok(match arg_types[0] { - List(_) | LargeList(_) | FixedSizeList(_, _) | Map(_, _) => UInt64, - _ => { - return plan_err!("The cardinality function can only accept List/LargeList/FixedSizeList/Map."); - } - }) + fn return_type(&self, _arg_types: &[DataType]) -> Result { + Ok(UInt64) } fn invoke_with_args( @@ -131,21 +126,22 @@ impl ScalarUDFImpl for Cardinality { /// Cardinality SQL function pub fn cardinality_inner(args: &[ArrayRef]) -> Result { let [array] = take_function_args("cardinality", args)?; - match &array.data_type() { + match array.data_type() { + Null => Ok(Arc::new(UInt64Array::from_value(0, array.len()))), List(_) => { - let list_array = as_list_array(&array)?; + let list_array = as_list_array(array)?; generic_list_cardinality::(list_array) } LargeList(_) => { - let list_array = as_large_list_array(&array)?; + let list_array = as_large_list_array(array)?; generic_list_cardinality::(list_array) } Map(_, _) => { - let map_array = as_map_array(&array)?; + let map_array = as_map_array(array)?; generic_map_cardinality(map_array) } - other => { - exec_err!("cardinality does not support type '{:?}'", other) + arg_type => { + exec_err!("cardinality does not support an argument of type '{arg_type}'") } } } diff --git a/datafusion/functions-nested/src/concat.rs b/datafusion/functions-nested/src/concat.rs index f4b9208e5c83..d98ed7c0d01f 100644 --- a/datafusion/functions-nested/src/concat.rs +++ b/datafusion/functions-nested/src/concat.rs @@ -17,30 +17,32 @@ //! [`ScalarUDFImpl`] definitions for `array_append`, `array_prepend` and `array_concat` functions. +use std::any::Any; use std::sync::Arc; -use std::{any::Any, cmp::Ordering}; +use crate::make_array::make_array_inner; +use crate::utils::{align_array_dimensions, check_datatypes, make_scalar_function}; use arrow::array::{ - Array, ArrayRef, Capacities, GenericListArray, MutableArrayData, NullBufferBuilder, - OffsetSizeTrait, + Array, ArrayRef, Capacities, GenericListArray, MutableArrayData, NullArray, + NullBufferBuilder, OffsetSizeTrait, }; use arrow::buffer::OffsetBuffer; use arrow::datatypes::{DataType, Field}; -use datafusion_common::utils::ListCoercion; +use datafusion_common::utils::{ + base_type, coerced_type_with_base_type_only, ListCoercion, +}; use datafusion_common::Result; use datafusion_common::{ cast::as_generic_list_array, - exec_err, not_impl_err, plan_err, + exec_err, plan_err, utils::{list_ndims, take_function_args}, }; +use datafusion_expr::binary::type_union_resolution; use datafusion_expr::{ - ArrayFunctionArgument, ArrayFunctionSignature, ColumnarValue, Documentation, - ScalarUDFImpl, Signature, TypeSignature, Volatility, + ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility, }; use datafusion_macros::user_doc; -use crate::utils::{align_array_dimensions, check_datatypes, make_scalar_function}; - make_udf_expr_and_func!( ArrayAppend, array_append, @@ -106,7 +108,12 @@ impl ScalarUDFImpl for ArrayAppend { } fn return_type(&self, arg_types: &[DataType]) -> Result { - Ok(arg_types[0].clone()) + let [array_type, element_type] = take_function_args(self.name(), arg_types)?; + if array_type.is_null() { + Ok(DataType::new_list(element_type.clone(), true)) + } else { + Ok(array_type.clone()) + } } fn invoke_with_args( @@ -166,18 +173,7 @@ impl Default for ArrayPrepend { impl ArrayPrepend { pub fn new() -> Self { Self { - signature: Signature { - type_signature: TypeSignature::ArraySignature( - ArrayFunctionSignature::Array { - arguments: vec![ - ArrayFunctionArgument::Element, - ArrayFunctionArgument::Array, - ], - array_coercion: Some(ListCoercion::FixedSizedListToList), - }, - ), - volatility: Volatility::Immutable, - }, + signature: Signature::element_and_array(Volatility::Immutable), aliases: vec![ String::from("list_prepend"), String::from("array_push_front"), @@ -201,7 +197,12 @@ impl ScalarUDFImpl for ArrayPrepend { } fn return_type(&self, arg_types: &[DataType]) -> Result { - Ok(arg_types[1].clone()) + let [element_type, array_type] = take_function_args(self.name(), arg_types)?; + if array_type.is_null() { + Ok(DataType::new_list(element_type.clone(), true)) + } else { + Ok(array_type.clone()) + } } fn invoke_with_args( @@ -263,7 +264,7 @@ impl Default for ArrayConcat { impl ArrayConcat { pub fn new() -> Self { Self { - signature: Signature::variadic_any(Volatility::Immutable), + signature: Signature::user_defined(Volatility::Immutable), aliases: vec![ String::from("array_cat"), String::from("list_concat"), @@ -287,39 +288,43 @@ impl ScalarUDFImpl for ArrayConcat { } fn return_type(&self, arg_types: &[DataType]) -> Result { - let mut expr_type = DataType::Null; let mut max_dims = 0; + let mut large_list = false; + let mut element_types = Vec::with_capacity(arg_types.len()); for arg_type in arg_types { - let DataType::List(field) = arg_type else { - return plan_err!( - "The array_concat function can only accept list as the args." - ); - }; - if !field.data_type().equals_datatype(&DataType::Null) { - let dims = list_ndims(arg_type); - expr_type = match max_dims.cmp(&dims) { - Ordering::Greater => expr_type, - Ordering::Equal => { - if expr_type == DataType::Null { - arg_type.clone() - } else if !expr_type.equals_datatype(arg_type) { - return plan_err!( - "It is not possible to concatenate arrays of different types. Expected: {}, got: {}", expr_type, arg_type - ); - } else { - expr_type - } - } - - Ordering::Less => { - max_dims = dims; - arg_type.clone() - } - }; + match arg_type { + DataType::Null | DataType::List(_) | DataType::FixedSizeList(..) => (), + DataType::LargeList(_) => large_list = true, + arg_type => { + return plan_err!( + "{} does not support an argument of type {arg_type}", + self.name() + ) + } } + + max_dims = max_dims.max(list_ndims(arg_type)); + element_types.push(base_type(arg_type)) } - Ok(expr_type) + if max_dims == 0 { + Ok(DataType::Null) + } else if let Some(mut return_type) = type_union_resolution(&element_types) { + for _ in 1..max_dims { + return_type = DataType::new_list(return_type, true) + } + + if large_list { + Ok(DataType::new_large_list(return_type, true)) + } else { + Ok(DataType::new_list(return_type, true)) + } + } else { + plan_err!( + "Failed to unify argument types of {}: {arg_types:?}", + self.name() + ) + } } fn invoke_with_args( @@ -333,6 +338,16 @@ impl ScalarUDFImpl for ArrayConcat { &self.aliases } + fn coerce_types(&self, arg_types: &[DataType]) -> Result> { + let base_type = base_type(&self.return_type(arg_types)?); + let coercion = Some(&ListCoercion::FixedSizedListToList); + let arg_types = arg_types.iter().map(|arg_type| { + coerced_type_with_base_type_only(arg_type, &base_type, coercion) + }); + + Ok(arg_types.collect()) + } + fn documentation(&self) -> Option<&Documentation> { self.doc() } @@ -341,24 +356,27 @@ impl ScalarUDFImpl for ArrayConcat { /// Array_concat/Array_cat SQL function pub(crate) fn array_concat_inner(args: &[ArrayRef]) -> Result { if args.is_empty() { - return exec_err!("array_concat expects at least one arguments"); + return exec_err!("array_concat expects at least one argument"); } - let mut new_args = vec![]; + let mut all_null = true; + let mut large_list = false; for arg in args { - let ndim = list_ndims(arg.data_type()); - let base_type = datafusion_common::utils::base_type(arg.data_type()); - if ndim == 0 { - return not_impl_err!("Array is not type '{base_type:?}'."); - } - if !base_type.eq(&DataType::Null) { - new_args.push(Arc::clone(arg)); + match arg.data_type() { + DataType::Null => continue, + DataType::LargeList(_) => large_list = true, + _ => (), } + + all_null = false } - match &args[0].data_type() { - DataType::LargeList(_) => concat_internal::(new_args.as_slice()), - _ => concat_internal::(new_args.as_slice()), + if all_null { + Ok(Arc::new(NullArray::new(args[0].len()))) + } else if large_list { + concat_internal::(args) + } else { + concat_internal::(args) } } @@ -427,21 +445,29 @@ fn concat_internal(args: &[ArrayRef]) -> Result { /// Array_append SQL function pub(crate) fn array_append_inner(args: &[ArrayRef]) -> Result { - let [array, _] = take_function_args("array_append", args)?; + let [array, values] = take_function_args("array_append", args)?; match array.data_type() { + DataType::Null => make_array_inner(&[Arc::clone(values)]), + DataType::List(_) => general_append_and_prepend::(args, true), DataType::LargeList(_) => general_append_and_prepend::(args, true), - _ => general_append_and_prepend::(args, true), + arg_type => { + exec_err!("array_append does not support an argument of type {arg_type}") + } } } /// Array_prepend SQL function pub(crate) fn array_prepend_inner(args: &[ArrayRef]) -> Result { - let [_, array] = take_function_args("array_prepend", args)?; + let [values, array] = take_function_args("array_prepend", args)?; match array.data_type() { + DataType::Null => make_array_inner(&[Arc::clone(values)]), + DataType::List(_) => general_append_and_prepend::(args, false), DataType::LargeList(_) => general_append_and_prepend::(args, false), - _ => general_append_and_prepend::(args, false), + arg_type => { + exec_err!("array_prepend does not support an argument of type {arg_type}") + } } } diff --git a/datafusion/functions-nested/src/dimension.rs b/datafusion/functions-nested/src/dimension.rs index a7d033641413..96da50620a75 100644 --- a/datafusion/functions-nested/src/dimension.rs +++ b/datafusion/functions-nested/src/dimension.rs @@ -19,18 +19,20 @@ use arrow::array::{ Array, ArrayRef, GenericListArray, ListArray, OffsetSizeTrait, UInt64Array, + UInt64Builder, }; use arrow::datatypes::{ DataType, - DataType::{FixedSizeList, LargeList, List, UInt64}, - Field, UInt64Type, + DataType::{LargeList, List, Null, UInt64}, + UInt64Type, }; use std::any::Any; use datafusion_common::cast::{as_large_list_array, as_list_array}; -use datafusion_common::{exec_err, plan_err, utils::take_function_args, Result}; +use datafusion_common::{exec_err, utils::take_function_args, Result}; use crate::utils::{compute_array_dims, make_scalar_function}; +use datafusion_common::utils::list_ndims; use datafusion_expr::{ ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility, }; @@ -95,15 +97,8 @@ impl ScalarUDFImpl for ArrayDims { &self.signature } - fn return_type(&self, arg_types: &[DataType]) -> Result { - Ok(match arg_types[0] { - List(_) | LargeList(_) | FixedSizeList(_, _) => { - List(Arc::new(Field::new_list_field(UInt64, true))) - } - _ => { - return plan_err!("The array_dims function can only accept List/LargeList/FixedSizeList."); - } - }) + fn return_type(&self, _arg_types: &[DataType]) -> Result { + Ok(DataType::new_list(UInt64, true)) } fn invoke_with_args( @@ -174,13 +169,8 @@ impl ScalarUDFImpl for ArrayNdims { &self.signature } - fn return_type(&self, arg_types: &[DataType]) -> Result { - Ok(match arg_types[0] { - List(_) | LargeList(_) | FixedSizeList(_, _) => UInt64, - _ => { - return plan_err!("The array_ndims function can only accept List/LargeList/FixedSizeList."); - } - }) + fn return_type(&self, _arg_types: &[DataType]) -> Result { + Ok(UInt64) } fn invoke_with_args( @@ -204,59 +194,57 @@ pub fn array_dims_inner(args: &[ArrayRef]) -> Result { let [array] = take_function_args("array_dims", args)?; let data = match array.data_type() { - List(_) => { - let array = as_list_array(&array)?; - array - .iter() - .map(compute_array_dims) - .collect::>>()? - } - LargeList(_) => { - let array = as_large_list_array(&array)?; - array - .iter() - .map(compute_array_dims) - .collect::>>()? - } - array_type => { - return exec_err!("array_dims does not support type '{array_type:?}'"); + List(_) => as_list_array(&array)? + .iter() + .map(compute_array_dims) + .collect::>>()?, + LargeList(_) => as_large_list_array(&array)? + .iter() + .map(compute_array_dims) + .collect::>>()?, + arg_type => { + return exec_err!( + "array_dims does not support an argument of type {arg_type}" + ); } }; let result = ListArray::from_iter_primitive::(data); - - Ok(Arc::new(result) as ArrayRef) + Ok(Arc::new(result)) } /// Array_ndims SQL function pub fn array_ndims_inner(args: &[ArrayRef]) -> Result { - let [array_dim] = take_function_args("array_ndims", args)?; + let [array] = take_function_args("array_ndims", args)?; fn general_list_ndims( array: &GenericListArray, ) -> Result { - let mut data = Vec::new(); - let ndims = datafusion_common::utils::list_ndims(array.data_type()); - + let mut builder = UInt64Builder::with_capacity(array.len()); + let ndims = list_ndims(array.data_type()); for arr in array.iter() { if arr.is_some() { - data.push(Some(ndims)) + builder.append_value(ndims) } else { - data.push(None) + builder.append_null() } } - Ok(Arc::new(UInt64Array::from(data)) as ArrayRef) + Ok(Arc::new(builder.finish())) } - match array_dim.data_type() { + + match array.data_type() { + Null => Ok(Arc::new(UInt64Array::new_null(array.len()))), List(_) => { - let array = as_list_array(&array_dim)?; + let array = as_list_array(array)?; general_list_ndims::(array) } LargeList(_) => { - let array = as_large_list_array(&array_dim)?; + let array = as_large_list_array(array)?; general_list_ndims::(array) } - array_type => exec_err!("array_ndims does not support type {array_type:?}"), + arg_type => { + exec_err!("array_ndims does not support an argument of type type {arg_type}") + } } } diff --git a/datafusion/functions-nested/src/distance.rs b/datafusion/functions-nested/src/distance.rs index cfc7fccdd70c..0190f2b886d3 100644 --- a/datafusion/functions-nested/src/distance.rs +++ b/datafusion/functions-nested/src/distance.rs @@ -23,21 +23,22 @@ use arrow::array::{ }; use arrow::datatypes::{ DataType, - DataType::{FixedSizeList, Float64, LargeList, List}, + DataType::{FixedSizeList, LargeList, List, Null}, }; use datafusion_common::cast::{ as_float32_array, as_float64_array, as_generic_list_array, as_int32_array, as_int64_array, }; -use datafusion_common::utils::coerced_fixed_size_list_to_list; +use datafusion_common::utils::{coerced_type_with_base_type_only, ListCoercion}; use datafusion_common::{ - exec_err, internal_datafusion_err, utils::take_function_args, Result, + exec_err, internal_datafusion_err, plan_err, utils::take_function_args, Result, }; use datafusion_expr::{ ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility, }; use datafusion_functions::{downcast_arg, downcast_named_arg}; use datafusion_macros::user_doc; +use itertools::Itertools; use std::any::Any; use std::sync::Arc; @@ -104,24 +105,29 @@ impl ScalarUDFImpl for ArrayDistance { &self.signature } - fn return_type(&self, arg_types: &[DataType]) -> Result { - match arg_types[0] { - List(_) | LargeList(_) | FixedSizeList(_, _) => Ok(Float64), - _ => exec_err!("The array_distance function can only accept List/LargeList/FixedSizeList."), - } + fn return_type(&self, _arg_types: &[DataType]) -> Result { + Ok(DataType::Float64) } fn coerce_types(&self, arg_types: &[DataType]) -> Result> { let [_, _] = take_function_args(self.name(), arg_types)?; - let mut result = Vec::new(); - for arg_type in arg_types { - match arg_type { - List(_) | LargeList(_) | FixedSizeList(_, _) => result.push(coerced_fixed_size_list_to_list(arg_type)), - _ => return exec_err!("The array_distance function can only accept List/LargeList/FixedSizeList."), + let coercion = Some(&ListCoercion::FixedSizedListToList); + let arg_types = arg_types.iter().map(|arg_type| { + if matches!(arg_type, Null | List(_) | LargeList(_) | FixedSizeList(..)) { + Ok(coerced_type_with_base_type_only( + arg_type, + &DataType::Float64, + coercion, + )) + } else { + plan_err!( + "{} does not support an argument of type {arg_type}", + self.name() + ) } - } + }); - Ok(result) + arg_types.try_collect() } fn invoke_with_args( @@ -143,11 +149,11 @@ impl ScalarUDFImpl for ArrayDistance { pub fn array_distance_inner(args: &[ArrayRef]) -> Result { let [array1, array2] = take_function_args("array_distance", args)?; - match (&array1.data_type(), &array2.data_type()) { + match (array1.data_type(), array2.data_type()) { (List(_), List(_)) => general_array_distance::(args), (LargeList(_), LargeList(_)) => general_array_distance::(args), - (array_type1, array_type2) => { - exec_err!("array_distance does not support types '{array_type1:?}' and '{array_type2:?}'") + (arg_type1, arg_type2) => { + exec_err!("array_distance does not support arguments of type {arg_type1} and {arg_type2:?}") } } } @@ -243,7 +249,7 @@ fn compute_array_distance( /// Converts an array of any numeric type to a Float64Array. fn convert_to_f64_array(array: &ArrayRef) -> Result { match array.data_type() { - Float64 => Ok(as_float64_array(array)?.clone()), + DataType::Float64 => Ok(as_float64_array(array)?.clone()), DataType::Float32 => { let array = as_float32_array(array)?; let converted: Float64Array = diff --git a/datafusion/functions-nested/src/extract.rs b/datafusion/functions-nested/src/extract.rs index 321dda55ce09..6078cd36dae6 100644 --- a/datafusion/functions-nested/src/extract.rs +++ b/datafusion/functions-nested/src/extract.rs @@ -19,12 +19,12 @@ use arrow::array::{ Array, ArrayRef, ArrowNativeTypeOp, Capacities, GenericListArray, Int64Array, - MutableArrayData, NullBufferBuilder, OffsetSizeTrait, + MutableArrayData, NullArray, NullBufferBuilder, OffsetSizeTrait, }; use arrow::buffer::OffsetBuffer; use arrow::datatypes::DataType; use arrow::datatypes::{ - DataType::{FixedSizeList, LargeList, List}, + DataType::{FixedSizeList, LargeList, List, Null}, Field, }; use datafusion_common::cast::as_int64_array; @@ -163,12 +163,11 @@ impl ScalarUDFImpl for ArrayElement { fn return_type(&self, arg_types: &[DataType]) -> Result { match &arg_types[0] { - List(field) - | LargeList(field) - | FixedSizeList(field, _) => Ok(field.data_type().clone()), - DataType::Null => Ok(List(Arc::new(Field::new_list_field(DataType::Int64, true)))), - _ => plan_err!( - "ArrayElement can only accept List, LargeList or FixedSizeList as the first argument" + Null => Ok(Null), + List(field) | LargeList(field) => Ok(field.data_type().clone()), + arg_type => plan_err!( + "{} does not support an argument of type {arg_type}", + self.name() ), } } @@ -200,6 +199,7 @@ fn array_element_inner(args: &[ArrayRef]) -> Result { let [array, indexes] = take_function_args("array_element", args)?; match &array.data_type() { + Null => Ok(Arc::new(NullArray::new(array.len()))), List(_) => { let array = as_list_array(&array)?; let indexes = as_int64_array(&indexes)?; @@ -210,10 +210,9 @@ fn array_element_inner(args: &[ArrayRef]) -> Result { let indexes = as_int64_array(&indexes)?; general_array_element::(array, indexes) } - _ => exec_err!( - "array_element does not support type: {:?}", - array.data_type() - ), + arg_type => { + exec_err!("array_element does not support an argument of type {arg_type}") + } } } diff --git a/datafusion/functions-nested/src/make_array.rs b/datafusion/functions-nested/src/make_array.rs index 4daaafc5a888..babb03919157 100644 --- a/datafusion/functions-nested/src/make_array.rs +++ b/datafusion/functions-nested/src/make_array.rs @@ -28,10 +28,7 @@ use arrow::array::{ }; use arrow::buffer::OffsetBuffer; use arrow::datatypes::DataType; -use arrow::datatypes::{ - DataType::{List, Null}, - Field, -}; +use arrow::datatypes::{DataType::Null, Field}; use datafusion_common::utils::SingleRowListArrayBuilder; use datafusion_common::{plan_err, Result}; use datafusion_expr::binary::{ @@ -105,16 +102,14 @@ impl ScalarUDFImpl for MakeArray { } fn return_type(&self, arg_types: &[DataType]) -> Result { - match arg_types.len() { - 0 => Ok(empty_array_type()), - _ => { - // At this point, all the type in array should be coerced to the same one - Ok(List(Arc::new(Field::new_list_field( - arg_types[0].to_owned(), - true, - )))) - } - } + let element_type = if arg_types.is_empty() { + Null + } else { + // At this point, all the type in array should be coerced to the same one. + arg_types[0].to_owned() + }; + + Ok(DataType::new_list(element_type, true)) } fn invoke_with_args( @@ -129,26 +124,16 @@ impl ScalarUDFImpl for MakeArray { } fn coerce_types(&self, arg_types: &[DataType]) -> Result> { - let mut errors = vec![]; - match try_type_union_resolution_with_struct(arg_types) { - Ok(r) => return Ok(r), - Err(e) => { - errors.push(e); - } + if let Ok(unified) = try_type_union_resolution_with_struct(arg_types) { + return Ok(unified); } - if let Some(new_type) = type_union_resolution(arg_types) { - if new_type.is_null() { - Ok(vec![DataType::Int64; arg_types.len()]) - } else { - Ok(vec![new_type; arg_types.len()]) - } + if let Some(unified) = type_union_resolution(arg_types) { + Ok(vec![unified; arg_types.len()]) } else { plan_err!( - "Fail to find the valid type between {:?} for {}, errors are {:?}", - arg_types, - self.name(), - errors + "Failed to unify argument types of {}: {arg_types:?}", + self.name() ) } } @@ -158,35 +143,25 @@ impl ScalarUDFImpl for MakeArray { } } -// Empty array is a special case that is useful for many other array functions -pub(super) fn empty_array_type() -> DataType { - List(Arc::new(Field::new_list_field(DataType::Int64, true))) -} - /// `make_array_inner` is the implementation of the `make_array` function. /// Constructs an array using the input `data` as `ArrayRef`. /// Returns a reference-counted `Array` instance result. pub(crate) fn make_array_inner(arrays: &[ArrayRef]) -> Result { - let mut data_type = Null; - for arg in arrays { - let arg_data_type = arg.data_type(); - if !arg_data_type.equals_datatype(&Null) { - data_type = arg_data_type.clone(); - break; - } - } + let data_type = arrays.iter().find_map(|arg| { + let arg_type = arg.data_type(); + (!arg_type.is_null()).then_some(arg_type) + }); - match data_type { + let data_type = data_type.unwrap_or(&Null); + if data_type.is_null() { // Either an empty array or all nulls: - Null => { - let length = arrays.iter().map(|a| a.len()).sum(); - // By default Int64 - let array = new_null_array(&DataType::Int64, length); - Ok(Arc::new( - SingleRowListArrayBuilder::new(array).build_list_array(), - )) - } - _ => array_array::(arrays, data_type), + let length = arrays.iter().map(|a| a.len()).sum(); + let array = new_null_array(&Null, length); + Ok(Arc::new( + SingleRowListArrayBuilder::new(array).build_list_array(), + )) + } else { + array_array::(arrays, data_type.clone()) } } diff --git a/datafusion/functions-nested/src/set_ops.rs b/datafusion/functions-nested/src/set_ops.rs index a67945b1f1e1..9a5428633f8d 100644 --- a/datafusion/functions-nested/src/set_ops.rs +++ b/datafusion/functions-nested/src/set_ops.rs @@ -17,9 +17,11 @@ //! [`ScalarUDFImpl`] definitions for array_union, array_intersect and array_distinct functions. -use crate::make_array::{empty_array_type, make_array_inner}; use crate::utils::make_scalar_function; -use arrow::array::{new_empty_array, Array, ArrayRef, GenericListArray, OffsetSizeTrait}; +use arrow::array::{ + new_null_array, Array, ArrayRef, GenericListArray, LargeListArray, ListArray, + OffsetSizeTrait, +}; use arrow::buffer::OffsetBuffer; use arrow::compute; use arrow::datatypes::DataType::{FixedSizeList, LargeList, List, Null}; @@ -104,7 +106,7 @@ impl Default for ArrayUnion { impl ArrayUnion { pub fn new() -> Self { Self { - signature: Signature::any(2, Volatility::Immutable), + signature: Signature::array_and_array(Volatility::Immutable), aliases: vec![String::from("list_union")], } } @@ -124,8 +126,10 @@ impl ScalarUDFImpl for ArrayUnion { } fn return_type(&self, arg_types: &[DataType]) -> Result { - match (&arg_types[0], &arg_types[1]) { - (&Null, dt) => Ok(dt.clone()), + let [array1, array2] = take_function_args(self.name(), arg_types)?; + match (array1, array2) { + (Null, Null) => Ok(DataType::new_list(Null, true)), + (Null, dt) => Ok(dt.clone()), (dt, Null) => Ok(dt.clone()), (dt, _) => Ok(dt.clone()), } @@ -183,7 +187,7 @@ pub(super) struct ArrayIntersect { impl ArrayIntersect { pub fn new() -> Self { Self { - signature: Signature::any(2, Volatility::Immutable), + signature: Signature::array_and_array(Volatility::Immutable), aliases: vec![String::from("list_intersect")], } } @@ -203,10 +207,12 @@ impl ScalarUDFImpl for ArrayIntersect { } fn return_type(&self, arg_types: &[DataType]) -> Result { - match (arg_types[0].clone(), arg_types[1].clone()) { - (Null, Null) | (Null, _) => Ok(Null), - (_, Null) => Ok(empty_array_type()), - (dt, _) => Ok(dt), + let [array1, array2] = take_function_args(self.name(), arg_types)?; + match (array1, array2) { + (Null, Null) => Ok(DataType::new_list(Null, true)), + (Null, dt) => Ok(dt.clone()), + (dt, Null) => Ok(dt.clone()), + (dt, _) => Ok(dt.clone()), } } @@ -347,80 +353,76 @@ fn generic_set_lists( field: Arc, set_op: SetOp, ) -> Result { - if matches!(l.value_type(), Null) { + if l.is_empty() || l.value_type().is_null() { let field = Arc::new(Field::new_list_field(r.value_type(), true)); return general_array_distinct::(r, &field); - } else if matches!(r.value_type(), Null) { + } else if r.is_empty() || r.value_type().is_null() { let field = Arc::new(Field::new_list_field(l.value_type(), true)); return general_array_distinct::(l, &field); } - // Handle empty array at rhs case - // array_union(arr, []) -> arr; - // array_intersect(arr, []) -> []; - if r.value_length(0).is_zero() { - if set_op == SetOp::Union { - return Ok(Arc::new(l.clone()) as ArrayRef); - } else { - return Ok(Arc::new(r.clone()) as ArrayRef); - } - } - if l.value_type() != r.value_type() { return internal_err!("{set_op:?} is not implemented for '{l:?}' and '{r:?}'"); } - let dt = l.value_type(); - let mut offsets = vec![OffsetSize::usize_as(0)]; let mut new_arrays = vec![]; - - let converter = RowConverter::new(vec![SortField::new(dt)])?; + let converter = RowConverter::new(vec![SortField::new(l.value_type())])?; for (first_arr, second_arr) in l.iter().zip(r.iter()) { - if let (Some(first_arr), Some(second_arr)) = (first_arr, second_arr) { - let l_values = converter.convert_columns(&[first_arr])?; - let r_values = converter.convert_columns(&[second_arr])?; - - let l_iter = l_values.iter().sorted().dedup(); - let values_set: HashSet<_> = l_iter.clone().collect(); - let mut rows = if set_op == SetOp::Union { - l_iter.collect::>() - } else { - vec![] - }; - for r_val in r_values.iter().sorted().dedup() { - match set_op { - SetOp::Union => { - if !values_set.contains(&r_val) { - rows.push(r_val); - } + let l_values = if let Some(first_arr) = first_arr { + converter.convert_columns(&[first_arr])? + } else { + converter.convert_columns(&[])? + }; + + let r_values = if let Some(second_arr) = second_arr { + converter.convert_columns(&[second_arr])? + } else { + converter.convert_columns(&[])? + }; + + let l_iter = l_values.iter().sorted().dedup(); + let values_set: HashSet<_> = l_iter.clone().collect(); + let mut rows = if set_op == SetOp::Union { + l_iter.collect() + } else { + vec![] + }; + + for r_val in r_values.iter().sorted().dedup() { + match set_op { + SetOp::Union => { + if !values_set.contains(&r_val) { + rows.push(r_val); } - SetOp::Intersect => { - if values_set.contains(&r_val) { - rows.push(r_val); - } + } + SetOp::Intersect => { + if values_set.contains(&r_val) { + rows.push(r_val); } } } - - let last_offset = match offsets.last().copied() { - Some(offset) => offset, - None => return internal_err!("offsets should not be empty"), - }; - offsets.push(last_offset + OffsetSize::usize_as(rows.len())); - let arrays = converter.convert_rows(rows)?; - let array = match arrays.first() { - Some(array) => Arc::clone(array), - None => { - return internal_err!("{set_op}: failed to get array from rows"); - } - }; - new_arrays.push(array); } + + let last_offset = match offsets.last() { + Some(offset) => *offset, + None => return internal_err!("offsets should not be empty"), + }; + + offsets.push(last_offset + OffsetSize::usize_as(rows.len())); + let arrays = converter.convert_rows(rows)?; + let array = match arrays.first() { + Some(array) => Arc::clone(array), + None => { + return internal_err!("{set_op}: failed to get array from rows"); + } + }; + + new_arrays.push(array); } let offsets = OffsetBuffer::new(offsets.into()); - let new_arrays_ref = new_arrays.iter().map(|v| v.as_ref()).collect::>(); + let new_arrays_ref: Vec<_> = new_arrays.iter().map(|v| v.as_ref()).collect(); let values = compute::concat(&new_arrays_ref)?; let arr = GenericListArray::::try_new(field, offsets, values, None)?; Ok(Arc::new(arr)) @@ -431,38 +433,59 @@ fn general_set_op( array2: &ArrayRef, set_op: SetOp, ) -> Result { + fn empty_array(data_type: &DataType, len: usize, large: bool) -> Result { + let field = Arc::new(Field::new_list_field(data_type.clone(), true)); + let values = new_null_array(data_type, len); + if large { + Ok(Arc::new(LargeListArray::try_new( + field, + OffsetBuffer::new_zeroed(len), + values, + None, + )?)) + } else { + Ok(Arc::new(ListArray::try_new( + field, + OffsetBuffer::new_zeroed(len), + values, + None, + )?)) + } + } + match (array1.data_type(), array2.data_type()) { + (Null, Null) => Ok(Arc::new(ListArray::new_null( + Arc::new(Field::new_list_field(Null, true)), + array1.len(), + ))), (Null, List(field)) => { if set_op == SetOp::Intersect { - return Ok(new_empty_array(&Null)); + return empty_array(field.data_type(), array1.len(), false); } let array = as_list_array(&array2)?; general_array_distinct::(array, field) } - (List(field), Null) => { if set_op == SetOp::Intersect { - return make_array_inner(&[]); + return empty_array(field.data_type(), array1.len(), false); } let array = as_list_array(&array1)?; general_array_distinct::(array, field) } (Null, LargeList(field)) => { if set_op == SetOp::Intersect { - return Ok(new_empty_array(&Null)); + return empty_array(field.data_type(), array1.len(), true); } let array = as_large_list_array(&array2)?; general_array_distinct::(array, field) } (LargeList(field), Null) => { if set_op == SetOp::Intersect { - return make_array_inner(&[]); + return empty_array(field.data_type(), array1.len(), true); } let array = as_large_list_array(&array1)?; general_array_distinct::(array, field) } - (Null, Null) => Ok(new_empty_array(&Null)), - (List(field), List(_)) => { let array1 = as_list_array(&array1)?; let array2 = as_list_array(&array2)?; diff --git a/datafusion/functions-nested/src/sort.rs b/datafusion/functions-nested/src/sort.rs index 85737ef135bc..f2d6df837dc3 100644 --- a/datafusion/functions-nested/src/sort.rs +++ b/datafusion/functions-nested/src/sort.rs @@ -21,11 +21,11 @@ use crate::utils::make_scalar_function; use arrow::array::{new_null_array, Array, ArrayRef, ListArray, NullBufferBuilder}; use arrow::buffer::OffsetBuffer; use arrow::compute::SortColumn; -use arrow::datatypes::DataType::{FixedSizeList, LargeList, List}; use arrow::datatypes::{DataType, Field}; use arrow::{compute, compute::SortOptions}; use datafusion_common::cast::{as_list_array, as_string_array}; -use datafusion_common::{exec_err, Result}; +use datafusion_common::utils::ListCoercion; +use datafusion_common::{exec_err, plan_err, Result}; use datafusion_expr::{ ArrayFunctionArgument, ArrayFunctionSignature, ColumnarValue, Documentation, ScalarUDFImpl, Signature, TypeSignature, Volatility, @@ -93,14 +93,14 @@ impl ArraySort { vec![ TypeSignature::ArraySignature(ArrayFunctionSignature::Array { arguments: vec![ArrayFunctionArgument::Array], - array_coercion: None, + array_coercion: Some(ListCoercion::FixedSizedListToList), }), TypeSignature::ArraySignature(ArrayFunctionSignature::Array { arguments: vec![ ArrayFunctionArgument::Array, ArrayFunctionArgument::String, ], - array_coercion: None, + array_coercion: Some(ListCoercion::FixedSizedListToList), }), TypeSignature::ArraySignature(ArrayFunctionSignature::Array { arguments: vec![ @@ -108,7 +108,7 @@ impl ArraySort { ArrayFunctionArgument::String, ArrayFunctionArgument::String, ], - array_coercion: None, + array_coercion: Some(ListCoercion::FixedSizedListToList), }), ], Volatility::Immutable, @@ -133,17 +133,16 @@ impl ScalarUDFImpl for ArraySort { fn return_type(&self, arg_types: &[DataType]) -> Result { match &arg_types[0] { - List(field) | FixedSizeList(field, _) => Ok(List(Arc::new( - Field::new_list_field(field.data_type().clone(), true), - ))), - LargeList(field) => Ok(LargeList(Arc::new(Field::new_list_field( - field.data_type().clone(), - true, - )))), DataType::Null => Ok(DataType::Null), - _ => exec_err!( - "Not reachable, data_type should be List, LargeList or FixedSizeList" - ), + DataType::List(field) => { + Ok(DataType::new_list(field.data_type().clone(), true)) + } + arg_type => { + plan_err!( + "{} does not support an argument of type {arg_type}", + self.name() + ) + } } } @@ -169,6 +168,10 @@ pub fn array_sort_inner(args: &[ArrayRef]) -> Result { return exec_err!("array_sort expects one to three arguments"); } + if args[0].data_type().is_null() { + return Ok(Arc::clone(&args[0])); + } + if args[1..].iter().any(|array| array.is_null(0)) { return Ok(new_null_array(args[0].data_type(), args[0].len())); } @@ -195,7 +198,7 @@ pub fn array_sort_inner(args: &[ArrayRef]) -> Result { let list_array = as_list_array(&args[0])?; let row_count = list_array.len(); - if row_count == 0 { + if row_count == 0 || list_array.value_type().is_null() { return Ok(Arc::clone(&args[0])); } diff --git a/datafusion/sqllogictest/test_files/array.slt b/datafusion/sqllogictest/test_files/array.slt index f9bbcedff5ee..0c1a827ba326 100644 --- a/datafusion/sqllogictest/test_files/array.slt +++ b/datafusion/sqllogictest/test_files/array.slt @@ -1204,7 +1204,7 @@ select array_element([1, 2], NULL); ---- NULL -query I +query ? select array_element(NULL, 2); ---- NULL @@ -1448,7 +1448,7 @@ select array_max(make_array(5, 3, 4, NULL, 6, NULL)); ---- 6 -query I +query ? select array_max(make_array(NULL, NULL)); ---- NULL @@ -1512,7 +1512,7 @@ select array_max(arrow_cast(make_array(1, 2, 3), 'FixedSizeList(3, Int64)')), ar ---- 3 1 -query I +query ? select array_max(make_array()); ---- NULL @@ -2177,7 +2177,7 @@ select array_any_value(1), array_any_value('a'), array_any_value(NULL); # array_any_value scalar function #1 (with null and non-null elements) -query ITII +query IT?I select array_any_value(make_array(NULL, 1, 2, 3, 4, 5)), array_any_value(make_array(NULL, 'h', 'e', 'l', 'l', 'o')), array_any_value(make_array(NULL, NULL)), array_any_value(make_array(NULL, NULL, 1, 2, 3)); ---- 1 h NULL 1 @@ -2435,11 +2435,15 @@ select array_append(null, 1); ---- [1] -query error +query ? select array_append(null, [2, 3]); +---- +[[2, 3]] -query error +query ? select array_append(null, [[4]]); +---- +[[[4]]] query ???? select @@ -2716,8 +2720,10 @@ select array_prepend(null, [[1,2,3]]); # DuckDB: [[]] # ClickHouse: [[]] # TODO: We may also return [[]] -query error +query ? select array_prepend([], []); +---- +[[]] query ? select array_prepend(null, null); @@ -3080,22 +3086,26 @@ select array_concat( ---- [1, 2, 3] -# Concatenating Mixed types (doesn't work) -query error DataFusion error: Error during planning: It is not possible to concatenate arrays of different types\. Expected: List\(Field \{ name: "item", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: \{\} \}\), got: List\(Field \{ name: "item", data_type: LargeUtf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: \{\} \}\) +# Concatenating Mixed types +query ? select array_concat( [arrow_cast('1', 'Utf8'), arrow_cast('2', 'Utf8')], [arrow_cast('3', 'LargeUtf8')] ); +---- +[1, 2, 3] -# Concatenating Mixed types (doesn't work) -query error DataFusion error: Error during planning: It is not possible to concatenate arrays of different types\. Expected: List\(Field \{ name: "item", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: \{\} \}\), got: List\(Field \{ name: "item", data_type: Utf8View, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: \{\} \}\) +# Concatenating Mixed types +query ? select array_concat( [arrow_cast('1', 'Utf8'), arrow_cast('2', 'Utf8')], [arrow_cast('3', 'Utf8View')] ); +---- +[1, 2, 3] # array_concat error -query error DataFusion error: Error during planning: The array_concat function can only accept list as the args\. +query error DataFusion error: Error during planning: Execution error: Function 'array_concat' user-defined coercion failed with "Error during planning: array_concat does not support an argument of type Int64" select array_concat(1, 2); # array_concat scalar function #1 @@ -3406,15 +3416,11 @@ SELECT array_position(arrow_cast([1, 1, 100, 1, 1], 'LargeList(Int32)'), 100) ---- 3 -query I +query error DataFusion error: Error during planning: Failed to coerce arguments to satisfy a call to 'array_position' function: coercion from SELECT array_position([1, 2, 3], 'foo') ----- -NULL -query I +query error DataFusion error: Error during planning: Failed to coerce arguments to satisfy a call to 'array_position' function: coercion from SELECT array_position([1, 2, 3], 'foo', 2) ----- -NULL # list_position scalar function #5 (function alias `array_position`) query III @@ -4376,7 +4382,8 @@ select array_union(arrow_cast([1, 2, 3, 4], 'LargeList(Int64)'), arrow_cast([5, statement ok CREATE TABLE arrays_with_repeating_elements_for_union AS VALUES - ([1], [2]), + ([0, 1, 1], []), + ([1, 1], [2]), ([2, 3], [3]), ([3], [3, 4]) ; @@ -4384,6 +4391,7 @@ AS VALUES query ? select array_union(column1, column2) from arrays_with_repeating_elements_for_union; ---- +[0, 1] [1, 2] [2, 3] [3, 4] @@ -4391,6 +4399,7 @@ select array_union(column1, column2) from arrays_with_repeating_elements_for_uni query ? select array_union(arrow_cast(column1, 'LargeList(Int64)'), arrow_cast(column2, 'LargeList(Int64)')) from arrays_with_repeating_elements_for_union; ---- +[0, 1] [1, 2] [2, 3] [3, 4] @@ -4413,12 +4422,10 @@ select array_union(arrow_cast([], 'LargeList(Int64)'), arrow_cast([], 'LargeList query ? select array_union([[null]], []); ---- -[[NULL]] +[[]] -query ? +query error DataFusion error: Error during planning: Failed to unify argument types of array_union: select array_union(arrow_cast([[null]], 'LargeList(List(Int64))'), arrow_cast([], 'LargeList(Int64)')); ----- -[[NULL]] # array_union scalar function #8 query ? @@ -6428,12 +6435,12 @@ select array_intersect(arrow_cast([1, 1, 2, 2, 3, 3], 'LargeList(Int64)'), null) query ? select array_intersect(null, [1, 1, 2, 2, 3, 3]); ---- -NULL +[] query ? select array_intersect(null, arrow_cast([1, 1, 2, 2, 3, 3], 'LargeList(Int64)')); ---- -NULL +[] query ? select array_intersect([], null); @@ -6458,12 +6465,12 @@ select array_intersect(arrow_cast([], 'LargeList(Int64)'), null); query ? select array_intersect(null, []); ---- -NULL +[] query ? select array_intersect(null, arrow_cast([], 'LargeList(Int64)')); ---- -NULL +[] query ? select array_intersect(null, null); diff --git a/datafusion/sqllogictest/test_files/unnest.slt b/datafusion/sqllogictest/test_files/unnest.slt index b9c13582952a..92e6f9995ae3 100644 --- a/datafusion/sqllogictest/test_files/unnest.slt +++ b/datafusion/sqllogictest/test_files/unnest.slt @@ -91,12 +91,12 @@ select * from unnest(null); ## Unnest empty array in select list -query I +query ? select unnest([]); ---- ## Unnest empty array in from clause -query I +query ? select * from unnest([]); ---- @@ -243,7 +243,7 @@ query error DataFusion error: This feature is not implemented: unnest\(\) does n select unnest(null) from unnest_table; ## Multiple unnest functions in selection -query II +query ?I select unnest([]), unnest(NULL::int[]); ---- @@ -263,10 +263,10 @@ NULL 10 NULL NULL NULL 17 NULL NULL 18 -query IIIT -select - unnest(column1), unnest(column2) + 2, - column3 * 10, unnest(array_remove(column1, '4')) +query IIII +select + unnest(column1), unnest(column2) + 2, + column3 * 10, unnest(array_remove(column1, 4)) from unnest_table; ---- 1 9 10 1 @@ -316,7 +316,7 @@ select * from unnest( 2 b NULL NULL NULL c NULL NULL -query II +query ?I select * from unnest([], NULL::int[]); ---- From 7a0323550c1a995e42f7a0017ac775128f8f6d88 Mon Sep 17 00:00:00 2001 From: Georgi Krastev Date: Thu, 13 Mar 2025 17:02:07 +0100 Subject: [PATCH 2/7] Fix array_sort for empty record batch --- datafusion/functions-nested/src/sort.rs | 12 ++++++------ datafusion/sqllogictest/test_files/array.slt | 5 +++++ 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/datafusion/functions-nested/src/sort.rs b/datafusion/functions-nested/src/sort.rs index f2d6df837dc3..f9bfe71554ae 100644 --- a/datafusion/functions-nested/src/sort.rs +++ b/datafusion/functions-nested/src/sort.rs @@ -172,6 +172,12 @@ pub fn array_sort_inner(args: &[ArrayRef]) -> Result { return Ok(Arc::clone(&args[0])); } + let list_array = as_list_array(&args[0])?; + let row_count = list_array.len(); + if row_count == 0 || list_array.value_type().is_null() { + return Ok(Arc::clone(&args[0])); + } + if args[1..].iter().any(|array| array.is_null(0)) { return Ok(new_null_array(args[0].data_type(), args[0].len())); } @@ -196,12 +202,6 @@ pub fn array_sort_inner(args: &[ArrayRef]) -> Result { _ => return exec_err!("array_sort expects 1 to 3 arguments"), }; - let list_array = as_list_array(&args[0])?; - let row_count = list_array.len(); - if row_count == 0 || list_array.value_type().is_null() { - return Ok(Arc::clone(&args[0])); - } - let mut array_lengths = vec![]; let mut arrays = vec![]; let mut valid = NullBufferBuilder::new(row_count); diff --git a/datafusion/sqllogictest/test_files/array.slt b/datafusion/sqllogictest/test_files/array.slt index 0c1a827ba326..c4f86f9e0129 100644 --- a/datafusion/sqllogictest/test_files/array.slt +++ b/datafusion/sqllogictest/test_files/array.slt @@ -2348,6 +2348,11 @@ NULL [NULL, 51, 52, 54, 55, 56, 57, 58, 59, 60] [61, 62, 63, 64, 65, 66, 67, 68, 69, 70] +# test with empty table +query ? +select array_sort(column1, 'DESC', 'NULLS FIRST') from arrays_values where false; +---- + # test with empty array query ? select array_sort([]); From c2241266d922196fb82041a70edb724401b35844 Mon Sep 17 00:00:00 2001 From: Georgi Krastev Date: Sun, 16 Mar 2025 08:38:27 +0100 Subject: [PATCH 3/7] Fix get_valid_types for FixedSizeLists --- datafusion/expr-common/src/signature.rs | 30 +-- .../expr/src/type_coercion/functions.rs | 222 +++++++++++++++--- datafusion/functions-nested/src/set_ops.rs | 4 +- datafusion/sqllogictest/test_files/array.slt | 2 +- 4 files changed, 196 insertions(+), 62 deletions(-) diff --git a/datafusion/expr-common/src/signature.rs b/datafusion/expr-common/src/signature.rs index 32a094e8e9d3..f8b2ba1d05ab 100644 --- a/datafusion/expr-common/src/signature.rs +++ b/datafusion/expr-common/src/signature.rs @@ -852,7 +852,7 @@ impl Signature { } } - /// Specialized Signature for ArrayAppend and similar functions + /// Specialized [Signature] for ArrayAppend and similar functions. pub fn array_and_element(volatility: Volatility) -> Self { Signature { type_signature: TypeSignature::ArraySignature( @@ -868,7 +868,7 @@ impl Signature { } } - /// Specialized Signature for ArrayPrepend and similar functions + /// Specialized [Signature] for ArrayPrepend and similar functions. pub fn element_and_array(volatility: Volatility) -> Self { Signature { type_signature: TypeSignature::ArraySignature( @@ -884,15 +884,12 @@ impl Signature { } } - /// Specialized Signature for ArrayUnion and similar functions - pub fn array_and_array(volatility: Volatility) -> Self { + /// Specialized [Signature] for functions that take a fixed number of arrays. + pub fn arrays(n: usize, volatility: Volatility) -> Self { Signature { type_signature: TypeSignature::ArraySignature( ArrayFunctionSignature::Array { - arguments: vec![ - ArrayFunctionArgument::Array, - ArrayFunctionArgument::Array, - ], + arguments: vec![ArrayFunctionArgument::Array; n], array_coercion: Some(ListCoercion::FixedSizedListToList), }, ), @@ -900,7 +897,7 @@ impl Signature { } } - /// Specialized Signature for Array functions with an optional index + /// Specialized [Signature] for Array functions with an optional index. pub fn array_and_element_and_optional_index(volatility: Volatility) -> Self { Signature { type_signature: TypeSignature::OneOf(vec![ @@ -924,7 +921,7 @@ impl Signature { } } - /// Specialized Signature for ArrayElement and similar functions + /// Specialized [Signature] for ArrayElement and similar functions. pub fn array_and_index(volatility: Volatility) -> Self { Signature { type_signature: TypeSignature::ArraySignature( @@ -939,17 +936,10 @@ impl Signature { volatility, } } - /// Specialized Signature for ArrayEmpty and similar functions + + /// Specialized [Signature] for ArrayEmpty and similar functions. pub fn array(volatility: Volatility) -> Self { - Signature { - type_signature: TypeSignature::ArraySignature( - ArrayFunctionSignature::Array { - arguments: vec![ArrayFunctionArgument::Array], - array_coercion: Some(ListCoercion::FixedSizedListToList), - }, - ), - volatility, - } + Signature::arrays(1, volatility) } } diff --git a/datafusion/expr/src/type_coercion/functions.rs b/datafusion/expr/src/type_coercion/functions.rs index 64da3d74119f..485ce0302fa6 100644 --- a/datafusion/expr/src/type_coercion/functions.rs +++ b/datafusion/expr/src/type_coercion/functions.rs @@ -366,69 +366,63 @@ fn get_valid_types( return Ok(vec![vec![]]); } - let mut fixed_size = None; let mut large_list = false; + let mut fixed_size = array_coercion != Some(&ListCoercion::FixedSizedListToList); + let mut list_sizes = Vec::with_capacity(arguments.len()); let mut element_types = Vec::with_capacity(arguments.len()); for (argument, current_type) in arguments.iter().zip(current_types.iter()) { match argument { + ArrayFunctionArgument::Index | ArrayFunctionArgument::String => (), + ArrayFunctionArgument::Element => { + element_types.push(current_type.clone()) + } ArrayFunctionArgument::Array => match current_type { - DataType::FixedSizeList(field, size) => { - match array_coercion { - Some(ListCoercion::FixedSizedListToList) => (), - None if fixed_size.is_none() => fixed_size = Some(*size), - None if fixed_size == Some(*size) => (), - None => fixed_size = None, - } - - element_types.push(field.data_type().clone()) - } + DataType::Null => element_types.push(DataType::Null), DataType::List(field) => { - fixed_size = None; - element_types.push(field.data_type().clone()) + element_types.push(field.data_type().clone()); + fixed_size = false; } DataType::LargeList(field) => { - fixed_size = None; + element_types.push(field.data_type().clone()); large_list = true; - element_types.push(field.data_type().clone()) + fixed_size = false; } - DataType::Null => { - fixed_size = None; - element_types.push(DataType::Null) + DataType::FixedSizeList(field, size) => { + element_types.push(field.data_type().clone()); + list_sizes.push(*size) } - arg_type => { - return plan_err!( + arg_type => plan_err!( "{function_name} does not support an argument of type {arg_type}" - ) - } + )?, }, - ArrayFunctionArgument::Element => { - element_types.push(current_type.clone()) - } - ArrayFunctionArgument::Index | ArrayFunctionArgument::String => (), } } let Some(element_type) = type_union_resolution(&element_types) else { - return plan_err!( - "Failed to unify argument types of {function_name}: {current_types:?}." - ); + return Ok(vec![vec![]]); }; - let array_type = if large_list { - DataType::new_large_list(element_type.clone(), true) - } else if let Some(size) = fixed_size { - DataType::new_fixed_size_list(element_type.clone(), size, true) - } else { - DataType::new_list(element_type.clone(), true) - }; + if !fixed_size { + list_sizes.clear() + } + let mut list_sizes = list_sizes.into_iter(); let valid_types = arguments.iter().zip(current_types.iter()).map( |(argument_type, current_type)| match argument_type { - ArrayFunctionArgument::Array if current_type.is_null() => DataType::Null, - ArrayFunctionArgument::Array => array_type.clone(), - ArrayFunctionArgument::Element => element_type.clone(), ArrayFunctionArgument::Index => DataType::Int64, ArrayFunctionArgument::String => DataType::Utf8, + ArrayFunctionArgument::Element => element_type.clone(), + ArrayFunctionArgument::Array => { + if current_type.is_null() { + DataType::Null + } else if large_list { + DataType::new_large_list(element_type.clone(), true) + } else if let Some(size) = list_sizes.next() { + DataType::new_fixed_size_list(element_type.clone(), size, true) + } else { + DataType::new_list(element_type.clone(), true) + } + } }, ); @@ -1169,4 +1163,154 @@ mod tests { Some(type_into.clone()) ); } + + #[test] + fn test_get_valid_types_array_and_array() -> Result<()> { + let function = "array_and_array"; + let signature = Signature::arrays(2, Volatility::Immutable); + + let data_types = vec![ + DataType::new_list(DataType::Int32, true), + DataType::new_large_list(DataType::Float64, true), + ]; + assert_eq!( + get_valid_types(function, &signature.type_signature, &data_types)?, + vec![vec![ + DataType::new_large_list(DataType::Float64, true), + DataType::new_large_list(DataType::Float64, true), + ]] + ); + + let data_types = vec![ + DataType::new_fixed_size_list(DataType::Int64, 3, true), + DataType::new_fixed_size_list(DataType::Int32, 5, true), + ]; + assert_eq!( + get_valid_types(function, &signature.type_signature, &data_types)?, + vec![vec![ + DataType::new_list(DataType::Int64, true), + DataType::new_list(DataType::Int64, true), + ]] + ); + + let data_types = vec![ + DataType::new_fixed_size_list(DataType::Null, 3, true), + DataType::new_large_list(DataType::Utf8, true), + ]; + assert_eq!( + get_valid_types(function, &signature.type_signature, &data_types)?, + vec![vec![ + DataType::new_large_list(DataType::Utf8, true), + DataType::new_large_list(DataType::Utf8, true), + ]] + ); + + Ok(()) + } + + #[test] + fn test_get_valid_types_array_and_element() -> Result<()> { + let function = "array_and_element"; + let signature = Signature::array_and_element(Volatility::Immutable); + + let data_types = + vec![DataType::new_list(DataType::Int32, true), DataType::Float64]; + assert_eq!( + get_valid_types(function, &signature.type_signature, &data_types)?, + vec![vec![ + DataType::new_list(DataType::Float64, true), + DataType::Float64, + ]] + ); + + let data_types = vec![ + DataType::new_large_list(DataType::Int32, true), + DataType::Null, + ]; + assert_eq!( + get_valid_types(function, &signature.type_signature, &data_types)?, + vec![vec![ + DataType::new_large_list(DataType::Int32, true), + DataType::Int32, + ]] + ); + + let data_types = vec![ + DataType::new_fixed_size_list(DataType::Null, 3, true), + DataType::Utf8, + ]; + assert_eq!( + get_valid_types(function, &signature.type_signature, &data_types)?, + vec![vec![ + DataType::new_list(DataType::Utf8, true), + DataType::Utf8, + ]] + ); + + Ok(()) + } + + #[test] + fn test_get_valid_types_element_and_array() -> Result<()> { + let function = "element_and_array"; + let signature = Signature::element_and_array(Volatility::Immutable); + + let data_types = vec![ + DataType::new_large_list(DataType::Null, false), + DataType::new_list(DataType::new_list(DataType::Int64, true), true), + ]; + assert_eq!( + get_valid_types(function, &signature.type_signature, &data_types)?, + vec![vec![ + DataType::new_large_list(DataType::Int64, true), + DataType::new_list(DataType::new_large_list(DataType::Int64, true), true), + ]] + ); + + Ok(()) + } + + #[test] + fn test_get_valid_types_fixed_size_arrays() -> Result<()> { + let function = "fixed_size_arrays"; + let signature = TypeSignature::ArraySignature(ArrayFunctionSignature::Array { + arguments: vec![ArrayFunctionArgument::Array; 2], + array_coercion: None, + }); + + let data_types = vec![ + DataType::new_fixed_size_list(DataType::Int64, 3, true), + DataType::new_fixed_size_list(DataType::Int32, 5, true), + ]; + assert_eq!( + get_valid_types(function, &signature, &data_types)?, + vec![vec![ + DataType::new_fixed_size_list(DataType::Int64, 3, true), + DataType::new_fixed_size_list(DataType::Int64, 5, true), + ]] + ); + + let data_types = vec![ + DataType::new_fixed_size_list(DataType::Int64, 3, true), + DataType::new_list(DataType::Int32, true), + ]; + assert_eq!( + get_valid_types(function, &signature, &data_types)?, + vec![vec![ + DataType::new_list(DataType::Int64, true), + DataType::new_list(DataType::Int64, true), + ]] + ); + + let data_types = vec![ + DataType::new_fixed_size_list(DataType::Utf8, 3, true), + DataType::new_list(DataType::new_list(DataType::Int32, true), true), + ]; + assert_eq!( + get_valid_types(function, &signature, &data_types)?, + vec![vec![]] + ); + + Ok(()) + } } diff --git a/datafusion/functions-nested/src/set_ops.rs b/datafusion/functions-nested/src/set_ops.rs index 9a5428633f8d..fe3ca9eab3f8 100644 --- a/datafusion/functions-nested/src/set_ops.rs +++ b/datafusion/functions-nested/src/set_ops.rs @@ -106,7 +106,7 @@ impl Default for ArrayUnion { impl ArrayUnion { pub fn new() -> Self { Self { - signature: Signature::array_and_array(Volatility::Immutable), + signature: Signature::arrays(2, Volatility::Immutable), aliases: vec![String::from("list_union")], } } @@ -187,7 +187,7 @@ pub(super) struct ArrayIntersect { impl ArrayIntersect { pub fn new() -> Self { Self { - signature: Signature::array_and_array(Volatility::Immutable), + signature: Signature::arrays(2, Volatility::Immutable), aliases: vec![String::from("list_intersect")], } } diff --git a/datafusion/sqllogictest/test_files/array.slt b/datafusion/sqllogictest/test_files/array.slt index c4f86f9e0129..7d393a9d47f0 100644 --- a/datafusion/sqllogictest/test_files/array.slt +++ b/datafusion/sqllogictest/test_files/array.slt @@ -4429,7 +4429,7 @@ select array_union([[null]], []); ---- [[]] -query error DataFusion error: Error during planning: Failed to unify argument types of array_union: +query error DataFusion error: Error during planning: Failed to coerce arguments to satisfy a call to 'array_union' function: select array_union(arrow_cast([[null]], 'LargeList(List(Int64))'), arrow_cast([], 'LargeList(Int64)')); # array_union scalar function #8 From 1f518c66d9e28068586cf0f496211329eed0b967 Mon Sep 17 00:00:00 2001 From: Georgi Krastev Date: Sun, 16 Mar 2025 08:50:55 +0100 Subject: [PATCH 4/7] Optimize array_ndims --- datafusion/functions-nested/src/dimension.rs | 14 +++----------- 1 file changed, 3 insertions(+), 11 deletions(-) diff --git a/datafusion/functions-nested/src/dimension.rs b/datafusion/functions-nested/src/dimension.rs index 96da50620a75..835323b46cf6 100644 --- a/datafusion/functions-nested/src/dimension.rs +++ b/datafusion/functions-nested/src/dimension.rs @@ -19,7 +19,6 @@ use arrow::array::{ Array, ArrayRef, GenericListArray, ListArray, OffsetSizeTrait, UInt64Array, - UInt64Builder, }; use arrow::datatypes::{ DataType, @@ -220,17 +219,10 @@ pub fn array_ndims_inner(args: &[ArrayRef]) -> Result { fn general_list_ndims( array: &GenericListArray, ) -> Result { - let mut builder = UInt64Builder::with_capacity(array.len()); let ndims = list_ndims(array.data_type()); - for arr in array.iter() { - if arr.is_some() { - builder.append_value(ndims) - } else { - builder.append_null() - } - } - - Ok(Arc::new(builder.finish())) + let data = vec![ndims; array.len()]; + let result = UInt64Array::new(data.into(), array.nulls().cloned()); + Ok(Arc::new(result)) } match array.data_type() { From 86d716744f95768778b232ecaaabf2d915b1d149 Mon Sep 17 00:00:00 2001 From: Georgi Krastev Date: Sun, 16 Mar 2025 08:58:25 +0100 Subject: [PATCH 5/7] Add a test for result type of Concatenating Mixed types --- datafusion/sqllogictest/test_files/array.slt | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/datafusion/sqllogictest/test_files/array.slt b/datafusion/sqllogictest/test_files/array.slt index 7d393a9d47f0..1ad727db6bb5 100644 --- a/datafusion/sqllogictest/test_files/array.slt +++ b/datafusion/sqllogictest/test_files/array.slt @@ -3101,13 +3101,12 @@ select array_concat( [1, 2, 3] # Concatenating Mixed types -query ? -select array_concat( - [arrow_cast('1', 'Utf8'), arrow_cast('2', 'Utf8')], - [arrow_cast('3', 'Utf8View')] -); +query ?T +select + array_concat([arrow_cast('1', 'Utf8'), arrow_cast('2', 'Utf8')], [arrow_cast('3', 'Utf8View')]), + arrow_typeof(array_concat([arrow_cast('1', 'Utf8'), arrow_cast('2', 'Utf8')], [arrow_cast('3', 'Utf8View')])); ---- -[1, 2, 3] +[1, 2, 3] List(Field { name: "item", data_type: Utf8View, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }) # array_concat error query error DataFusion error: Error during planning: Execution error: Function 'array_concat' user-defined coercion failed with "Error during planning: array_concat does not support an argument of type Int64" From d3a881881e5f5f45025648acc050ab66c6c0bd73 Mon Sep 17 00:00:00 2001 From: Georgi Krastev Date: Sun, 16 Mar 2025 09:08:56 +0100 Subject: [PATCH 6/7] Fix array_element of empty array --- datafusion/functions-nested/src/extract.rs | 4 ++++ datafusion/sqllogictest/test_files/array.slt | 6 ++++++ 2 files changed, 10 insertions(+) diff --git a/datafusion/functions-nested/src/extract.rs b/datafusion/functions-nested/src/extract.rs index 6078cd36dae6..b463ecd0bac7 100644 --- a/datafusion/functions-nested/src/extract.rs +++ b/datafusion/functions-nested/src/extract.rs @@ -224,6 +224,10 @@ where i64: TryInto, { let values = array.values(); + if values.data_type().is_null() { + return Ok(Arc::new(NullArray::new(array.len()))); + } + let original_data = values.to_data(); let capacity = Capacities::Array(original_data.len()); diff --git a/datafusion/sqllogictest/test_files/array.slt b/datafusion/sqllogictest/test_files/array.slt index 1ad727db6bb5..7acd169f97ea 100644 --- a/datafusion/sqllogictest/test_files/array.slt +++ b/datafusion/sqllogictest/test_files/array.slt @@ -1435,6 +1435,12 @@ NULL 23 NULL 43 5 NULL +# array_element of empty array +query T +select coalesce(array_element([], 1), array_element(NULL, 1), 'ok'); +---- +ok + ## array_max # array_max scalar function #1 (with positive index) From 3e8f005270d59896a28b331411b458a38bd776ac Mon Sep 17 00:00:00 2001 From: Georgi Krastev Date: Sun, 16 Mar 2025 10:17:53 +0100 Subject: [PATCH 7/7] Handle more FixedSizeLists --- datafusion/expr-common/src/signature.rs | 10 +++- .../expr/src/type_coercion/functions.rs | 23 ++++---- .../functions-nested/src/cardinality.rs | 2 +- datafusion/functions-nested/src/concat.rs | 15 +---- datafusion/functions-nested/src/dimension.rs | 47 +++++++--------- datafusion/functions-nested/src/distance.rs | 8 +-- datafusion/functions-nested/src/empty.rs | 36 ++++++------ datafusion/functions-nested/src/extract.rs | 7 +-- datafusion/functions-nested/src/max.rs | 56 ++++++++++--------- datafusion/functions-nested/src/set_ops.rs | 52 ++++++++--------- datafusion/functions-nested/src/sort.rs | 5 +- datafusion/functions-nested/src/utils.rs | 23 +++++--- datafusion/sqllogictest/test_files/array.slt | 2 +- 13 files changed, 138 insertions(+), 148 deletions(-) diff --git a/datafusion/expr-common/src/signature.rs b/datafusion/expr-common/src/signature.rs index f8b2ba1d05ab..c0961d42ae03 100644 --- a/datafusion/expr-common/src/signature.rs +++ b/datafusion/expr-common/src/signature.rs @@ -885,12 +885,16 @@ impl Signature { } /// Specialized [Signature] for functions that take a fixed number of arrays. - pub fn arrays(n: usize, volatility: Volatility) -> Self { + pub fn arrays( + n: usize, + coercion: Option, + volatility: Volatility, + ) -> Self { Signature { type_signature: TypeSignature::ArraySignature( ArrayFunctionSignature::Array { arguments: vec![ArrayFunctionArgument::Array; n], - array_coercion: Some(ListCoercion::FixedSizedListToList), + array_coercion: coercion, }, ), volatility, @@ -939,7 +943,7 @@ impl Signature { /// Specialized [Signature] for ArrayEmpty and similar functions. pub fn array(volatility: Volatility) -> Self { - Signature::arrays(1, volatility) + Signature::arrays(1, Some(ListCoercion::FixedSizedListToList), volatility) } } diff --git a/datafusion/expr/src/type_coercion/functions.rs b/datafusion/expr/src/type_coercion/functions.rs index 485ce0302fa6..57be9e3debe6 100644 --- a/datafusion/expr/src/type_coercion/functions.rs +++ b/datafusion/expr/src/type_coercion/functions.rs @@ -391,9 +391,9 @@ fn get_valid_types( element_types.push(field.data_type().clone()); list_sizes.push(*size) } - arg_type => plan_err!( - "{function_name} does not support an argument of type {arg_type}" - )?, + arg_type => { + plan_err!("{function_name} does not support type {arg_type}")? + } }, } } @@ -1167,7 +1167,11 @@ mod tests { #[test] fn test_get_valid_types_array_and_array() -> Result<()> { let function = "array_and_array"; - let signature = Signature::arrays(2, Volatility::Immutable); + let signature = Signature::arrays( + 2, + Some(ListCoercion::FixedSizedListToList), + Volatility::Immutable, + ); let data_types = vec![ DataType::new_list(DataType::Int32, true), @@ -1273,17 +1277,14 @@ mod tests { #[test] fn test_get_valid_types_fixed_size_arrays() -> Result<()> { let function = "fixed_size_arrays"; - let signature = TypeSignature::ArraySignature(ArrayFunctionSignature::Array { - arguments: vec![ArrayFunctionArgument::Array; 2], - array_coercion: None, - }); + let signature = Signature::arrays(2, None, Volatility::Immutable); let data_types = vec![ DataType::new_fixed_size_list(DataType::Int64, 3, true), DataType::new_fixed_size_list(DataType::Int32, 5, true), ]; assert_eq!( - get_valid_types(function, &signature, &data_types)?, + get_valid_types(function, &signature.type_signature, &data_types)?, vec![vec![ DataType::new_fixed_size_list(DataType::Int64, 3, true), DataType::new_fixed_size_list(DataType::Int64, 5, true), @@ -1295,7 +1296,7 @@ mod tests { DataType::new_list(DataType::Int32, true), ]; assert_eq!( - get_valid_types(function, &signature, &data_types)?, + get_valid_types(function, &signature.type_signature, &data_types)?, vec![vec![ DataType::new_list(DataType::Int64, true), DataType::new_list(DataType::Int64, true), @@ -1307,7 +1308,7 @@ mod tests { DataType::new_list(DataType::new_list(DataType::Int32, true), true), ]; assert_eq!( - get_valid_types(function, &signature, &data_types)?, + get_valid_types(function, &signature.type_signature, &data_types)?, vec![vec![]] ); diff --git a/datafusion/functions-nested/src/cardinality.rs b/datafusion/functions-nested/src/cardinality.rs index f76361ec8c1b..98bda81ef25f 100644 --- a/datafusion/functions-nested/src/cardinality.rs +++ b/datafusion/functions-nested/src/cardinality.rs @@ -141,7 +141,7 @@ pub fn cardinality_inner(args: &[ArrayRef]) -> Result { generic_map_cardinality(map_array) } arg_type => { - exec_err!("cardinality does not support an argument of type '{arg_type}'") + exec_err!("cardinality does not support type {arg_type}") } } } diff --git a/datafusion/functions-nested/src/concat.rs b/datafusion/functions-nested/src/concat.rs index d98ed7c0d01f..dd8784d36c48 100644 --- a/datafusion/functions-nested/src/concat.rs +++ b/datafusion/functions-nested/src/concat.rs @@ -296,10 +296,7 @@ impl ScalarUDFImpl for ArrayConcat { DataType::Null | DataType::List(_) | DataType::FixedSizeList(..) => (), DataType::LargeList(_) => large_list = true, arg_type => { - return plan_err!( - "{} does not support an argument of type {arg_type}", - self.name() - ) + return plan_err!("{} does not support type {arg_type}", self.name()) } } @@ -446,28 +443,22 @@ fn concat_internal(args: &[ArrayRef]) -> Result { /// Array_append SQL function pub(crate) fn array_append_inner(args: &[ArrayRef]) -> Result { let [array, values] = take_function_args("array_append", args)?; - match array.data_type() { DataType::Null => make_array_inner(&[Arc::clone(values)]), DataType::List(_) => general_append_and_prepend::(args, true), DataType::LargeList(_) => general_append_and_prepend::(args, true), - arg_type => { - exec_err!("array_append does not support an argument of type {arg_type}") - } + arg_type => exec_err!("array_append does not support type {arg_type}"), } } /// Array_prepend SQL function pub(crate) fn array_prepend_inner(args: &[ArrayRef]) -> Result { let [values, array] = take_function_args("array_prepend", args)?; - match array.data_type() { DataType::Null => make_array_inner(&[Arc::clone(values)]), DataType::List(_) => general_append_and_prepend::(args, false), DataType::LargeList(_) => general_append_and_prepend::(args, false), - arg_type => { - exec_err!("array_prepend does not support an argument of type {arg_type}") - } + arg_type => exec_err!("array_prepend does not support type {arg_type}"), } } diff --git a/datafusion/functions-nested/src/dimension.rs b/datafusion/functions-nested/src/dimension.rs index 835323b46cf6..d1e6b1be4cfa 100644 --- a/datafusion/functions-nested/src/dimension.rs +++ b/datafusion/functions-nested/src/dimension.rs @@ -17,17 +17,17 @@ //! [`ScalarUDFImpl`] definitions for array_dims and array_ndims functions. -use arrow::array::{ - Array, ArrayRef, GenericListArray, ListArray, OffsetSizeTrait, UInt64Array, -}; +use arrow::array::{Array, ArrayRef, ListArray, UInt64Array}; use arrow::datatypes::{ DataType, - DataType::{LargeList, List, Null, UInt64}, + DataType::{FixedSizeList, LargeList, List, Null, UInt64}, UInt64Type, }; use std::any::Any; -use datafusion_common::cast::{as_large_list_array, as_list_array}; +use datafusion_common::cast::{ + as_fixed_size_list_array, as_large_list_array, as_list_array, +}; use datafusion_common::{exec_err, utils::take_function_args, Result}; use crate::utils::{compute_array_dims, make_scalar_function}; @@ -36,6 +36,7 @@ use datafusion_expr::{ ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility, }; use datafusion_macros::user_doc; +use itertools::Itertools; use std::sync::Arc; make_udf_expr_and_func!( @@ -78,7 +79,7 @@ impl Default for ArrayDims { impl ArrayDims { pub fn new() -> Self { Self { - signature: Signature::array(Volatility::Immutable), + signature: Signature::arrays(1, None, Volatility::Immutable), aliases: vec!["list_dims".to_string()], } } @@ -150,7 +151,7 @@ pub(super) struct ArrayNdims { impl ArrayNdims { pub fn new() -> Self { Self { - signature: Signature::array(Volatility::Immutable), + signature: Signature::arrays(1, None, Volatility::Immutable), aliases: vec![String::from("list_ndims")], } } @@ -191,20 +192,21 @@ impl ScalarUDFImpl for ArrayNdims { /// Array_dims SQL function pub fn array_dims_inner(args: &[ArrayRef]) -> Result { let [array] = take_function_args("array_dims", args)?; - - let data = match array.data_type() { + let data: Vec<_> = match array.data_type() { List(_) => as_list_array(&array)? .iter() .map(compute_array_dims) - .collect::>>()?, + .try_collect()?, LargeList(_) => as_large_list_array(&array)? .iter() .map(compute_array_dims) - .collect::>>()?, + .try_collect()?, + FixedSizeList(..) => as_fixed_size_list_array(&array)? + .iter() + .map(compute_array_dims) + .try_collect()?, arg_type => { - return exec_err!( - "array_dims does not support an argument of type {arg_type}" - ); + return exec_err!("array_dims does not support type {arg_type}"); } }; @@ -216,9 +218,7 @@ pub fn array_dims_inner(args: &[ArrayRef]) -> Result { pub fn array_ndims_inner(args: &[ArrayRef]) -> Result { let [array] = take_function_args("array_ndims", args)?; - fn general_list_ndims( - array: &GenericListArray, - ) -> Result { + fn general_list_ndims(array: &ArrayRef) -> Result { let ndims = list_ndims(array.data_type()); let data = vec![ndims; array.len()]; let result = UInt64Array::new(data.into(), array.nulls().cloned()); @@ -227,16 +227,7 @@ pub fn array_ndims_inner(args: &[ArrayRef]) -> Result { match array.data_type() { Null => Ok(Arc::new(UInt64Array::new_null(array.len()))), - List(_) => { - let array = as_list_array(array)?; - general_list_ndims::(array) - } - LargeList(_) => { - let array = as_large_list_array(array)?; - general_list_ndims::(array) - } - arg_type => { - exec_err!("array_ndims does not support an argument of type type {arg_type}") - } + List(_) | LargeList(_) | FixedSizeList(..) => general_list_ndims(array), + arg_type => exec_err!("array_ndims does not support type {arg_type}"), } } diff --git a/datafusion/functions-nested/src/distance.rs b/datafusion/functions-nested/src/distance.rs index 0190f2b886d3..3392e194b176 100644 --- a/datafusion/functions-nested/src/distance.rs +++ b/datafusion/functions-nested/src/distance.rs @@ -120,10 +120,7 @@ impl ScalarUDFImpl for ArrayDistance { coercion, )) } else { - plan_err!( - "{} does not support an argument of type {arg_type}", - self.name() - ) + plan_err!("{} does not support type {arg_type}", self.name()) } }); @@ -148,12 +145,11 @@ impl ScalarUDFImpl for ArrayDistance { pub fn array_distance_inner(args: &[ArrayRef]) -> Result { let [array1, array2] = take_function_args("array_distance", args)?; - match (array1.data_type(), array2.data_type()) { (List(_), List(_)) => general_array_distance::(args), (LargeList(_), LargeList(_)) => general_array_distance::(args), (arg_type1, arg_type2) => { - exec_err!("array_distance does not support arguments of type {arg_type1} and {arg_type2:?}") + exec_err!("array_distance does not support types {arg_type1} and {arg_type2}") } } } diff --git a/datafusion/functions-nested/src/empty.rs b/datafusion/functions-nested/src/empty.rs index dcefd583e937..67c795886bde 100644 --- a/datafusion/functions-nested/src/empty.rs +++ b/datafusion/functions-nested/src/empty.rs @@ -18,13 +18,14 @@ //! [`ScalarUDFImpl`] definitions for array_empty function. use crate::utils::make_scalar_function; -use arrow::array::{ArrayRef, BooleanArray, OffsetSizeTrait}; +use arrow::array::{Array, ArrayRef, BooleanArray, OffsetSizeTrait}; +use arrow::buffer::BooleanBuffer; use arrow::datatypes::{ DataType, DataType::{Boolean, FixedSizeList, LargeList, List}, }; use datafusion_common::cast::as_generic_list_array; -use datafusion_common::{exec_err, plan_err, utils::take_function_args, Result}; +use datafusion_common::{exec_err, utils::take_function_args, Result}; use datafusion_expr::{ ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility, }; @@ -71,7 +72,7 @@ impl Default for ArrayEmpty { impl ArrayEmpty { pub fn new() -> Self { Self { - signature: Signature::array(Volatility::Immutable), + signature: Signature::arrays(1, None, Volatility::Immutable), aliases: vec!["array_empty".to_string(), "list_empty".to_string()], } } @@ -89,13 +90,8 @@ impl ScalarUDFImpl for ArrayEmpty { &self.signature } - fn return_type(&self, arg_types: &[DataType]) -> Result { - Ok(match arg_types[0] { - List(_) | LargeList(_) | FixedSizeList(_, _) => Boolean, - _ => { - return plan_err!("The array_empty function can only accept List/LargeList/FixedSizeList."); - } - }) + fn return_type(&self, _arg_types: &[DataType]) -> Result { + Ok(Boolean) } fn invoke_with_args( @@ -117,21 +113,25 @@ impl ScalarUDFImpl for ArrayEmpty { /// Array_empty SQL function pub fn array_empty_inner(args: &[ArrayRef]) -> Result { let [array] = take_function_args("array_empty", args)?; - - let array_type = array.data_type(); - match array_type { + match array.data_type() { List(_) => general_array_empty::(array), LargeList(_) => general_array_empty::(array), - _ => exec_err!("array_empty does not support type '{array_type:?}'."), + FixedSizeList(_, size) => { + let values = if *size == 0 { + BooleanBuffer::new_set(array.len()) + } else { + BooleanBuffer::new_unset(array.len()) + }; + Ok(Arc::new(BooleanArray::new(values, array.nulls().cloned()))) + } + arg_type => exec_err!("array_empty does not support type {arg_type}"), } } fn general_array_empty(array: &ArrayRef) -> Result { - let array = as_generic_list_array::(array)?; - - let builder = array + let result = as_generic_list_array::(array)? .iter() .map(|arr| arr.map(|arr| arr.is_empty())) .collect::(); - Ok(Arc::new(builder)) + Ok(Arc::new(result)) } diff --git a/datafusion/functions-nested/src/extract.rs b/datafusion/functions-nested/src/extract.rs index b463ecd0bac7..bb84801f9862 100644 --- a/datafusion/functions-nested/src/extract.rs +++ b/datafusion/functions-nested/src/extract.rs @@ -165,10 +165,7 @@ impl ScalarUDFImpl for ArrayElement { match &arg_types[0] { Null => Ok(Null), List(field) | LargeList(field) => Ok(field.data_type().clone()), - arg_type => plan_err!( - "{} does not support an argument of type {arg_type}", - self.name() - ), + arg_type => plan_err!("{} does not support type {arg_type}", self.name()), } } @@ -211,7 +208,7 @@ fn array_element_inner(args: &[ArrayRef]) -> Result { general_array_element::(array, indexes) } arg_type => { - exec_err!("array_element does not support an argument of type {arg_type}") + exec_err!("array_element does not support type {arg_type}") } } } diff --git a/datafusion/functions-nested/src/max.rs b/datafusion/functions-nested/src/max.rs index 32957edc62b5..b667a7b42650 100644 --- a/datafusion/functions-nested/src/max.rs +++ b/datafusion/functions-nested/src/max.rs @@ -17,12 +17,13 @@ //! [`ScalarUDFImpl`] definitions for array_max function. use crate::utils::make_scalar_function; -use arrow::array::ArrayRef; +use arrow::array::{ArrayRef, GenericListArray, OffsetSizeTrait}; use arrow::datatypes::DataType; -use arrow::datatypes::DataType::List; -use datafusion_common::cast::as_list_array; +use arrow::datatypes::DataType::{LargeList, List}; +use datafusion_common::cast::{as_large_list_array, as_list_array}; use datafusion_common::utils::take_function_args; -use datafusion_common::{exec_err, ScalarValue}; +use datafusion_common::Result; +use datafusion_common::{exec_err, plan_err, ScalarValue}; use datafusion_doc::Documentation; use datafusion_expr::{ ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility, @@ -91,17 +92,15 @@ impl ScalarUDFImpl for ArrayMax { &self.signature } - fn return_type(&self, arg_types: &[DataType]) -> datafusion_common::Result { - match &arg_types[0] { - List(field) => Ok(field.data_type().clone()), - _ => exec_err!("Not reachable, data_type should be List"), + fn return_type(&self, arg_types: &[DataType]) -> Result { + let [array] = take_function_args(self.name(), arg_types)?; + match array { + List(field) | LargeList(field) => Ok(field.data_type().clone()), + arg_type => plan_err!("{} does not support type {arg_type}", self.name()), } } - fn invoke_with_args( - &self, - args: ScalarFunctionArgs, - ) -> datafusion_common::Result { + fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { make_scalar_function(array_max_inner)(&args.args) } @@ -121,18 +120,25 @@ impl ScalarUDFImpl for ArrayMax { /// /// For example: /// > array_max(\[1, 3, 2]) -> 3 -pub fn array_max_inner(args: &[ArrayRef]) -> datafusion_common::Result { - let [arg1] = take_function_args("array_max", args)?; - - match arg1.data_type() { - List(_) => { - let input_list_array = as_list_array(&arg1)?; - let result_vec = input_list_array - .iter() - .flat_map(|arr| min_max::max_batch(&arr.unwrap())) - .collect_vec(); - ScalarValue::iter_to_array(result_vec) - } - _ => exec_err!("array_max does not support type: {:?}", arg1.data_type()), +pub fn array_max_inner(args: &[ArrayRef]) -> Result { + let [array] = take_function_args("array_max", args)?; + match array.data_type() { + List(_) => general_array_max(as_list_array(array)?), + LargeList(_) => general_array_max(as_large_list_array(array)?), + arg_type => exec_err!("array_max does not support type: {arg_type}"), } } + +fn general_array_max( + array: &GenericListArray, +) -> Result { + let null_value = ScalarValue::try_from(array.value_type())?; + let result_vec: Vec = array + .iter() + .map(|arr| { + arr.as_ref() + .map_or_else(|| Ok(null_value.clone()), min_max::max_batch) + }) + .try_collect()?; + ScalarValue::iter_to_array(result_vec) +} diff --git a/datafusion/functions-nested/src/set_ops.rs b/datafusion/functions-nested/src/set_ops.rs index fe3ca9eab3f8..4f9457aa59c6 100644 --- a/datafusion/functions-nested/src/set_ops.rs +++ b/datafusion/functions-nested/src/set_ops.rs @@ -24,11 +24,14 @@ use arrow::array::{ }; use arrow::buffer::OffsetBuffer; use arrow::compute; -use arrow::datatypes::DataType::{FixedSizeList, LargeList, List, Null}; +use arrow::datatypes::DataType::{LargeList, List, Null}; use arrow::datatypes::{DataType, Field, FieldRef}; use arrow::row::{RowConverter, SortField}; use datafusion_common::cast::{as_large_list_array, as_list_array}; -use datafusion_common::{exec_err, internal_err, utils::take_function_args, Result}; +use datafusion_common::utils::ListCoercion; +use datafusion_common::{ + exec_err, internal_err, plan_err, utils::take_function_args, Result, +}; use datafusion_expr::{ ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility, }; @@ -106,7 +109,11 @@ impl Default for ArrayUnion { impl ArrayUnion { pub fn new() -> Self { Self { - signature: Signature::arrays(2, Volatility::Immutable), + signature: Signature::arrays( + 2, + Some(ListCoercion::FixedSizedListToList), + Volatility::Immutable, + ), aliases: vec![String::from("list_union")], } } @@ -187,7 +194,11 @@ pub(super) struct ArrayIntersect { impl ArrayIntersect { pub fn new() -> Self { Self { - signature: Signature::arrays(2, Volatility::Immutable), + signature: Signature::arrays( + 2, + Some(ListCoercion::FixedSizedListToList), + Volatility::Immutable, + ), aliases: vec![String::from("list_intersect")], } } @@ -279,16 +290,11 @@ impl ScalarUDFImpl for ArrayDistinct { fn return_type(&self, arg_types: &[DataType]) -> Result { match &arg_types[0] { - List(field) | FixedSizeList(field, _) => Ok(List(Arc::new( - Field::new_list_field(field.data_type().clone(), true), - ))), - LargeList(field) => Ok(LargeList(Arc::new(Field::new_list_field( - field.data_type().clone(), - true, - )))), - _ => exec_err!( - "Not reachable, data_type should be List, LargeList or FixedSizeList" - ), + List(field) => Ok(DataType::new_list(field.data_type().clone(), true)), + LargeList(field) => { + Ok(DataType::new_large_list(field.data_type().clone(), true)) + } + arg_type => plan_err!("{} does not support type {arg_type}", self.name()), } } @@ -311,24 +317,18 @@ impl ScalarUDFImpl for ArrayDistinct { /// array_distinct SQL function /// example: from list [1, 3, 2, 3, 1, 2, 4] to [1, 2, 3, 4] fn array_distinct_inner(args: &[ArrayRef]) -> Result { - let [input_array] = take_function_args("array_distinct", args)?; - - // handle null - if input_array.data_type() == &Null { - return Ok(Arc::clone(input_array)); - } - - // handle for list & largelist - match input_array.data_type() { + let [array] = take_function_args("array_distinct", args)?; + match array.data_type() { + Null => Ok(Arc::clone(array)), List(field) => { - let array = as_list_array(&input_array)?; + let array = as_list_array(&array)?; general_array_distinct(array, field) } LargeList(field) => { - let array = as_large_list_array(&input_array)?; + let array = as_large_list_array(&array)?; general_array_distinct(array, field) } - array_type => exec_err!("array_distinct does not support type '{array_type:?}'"), + arg_type => exec_err!("array_distinct does not support type {arg_type}"), } } diff --git a/datafusion/functions-nested/src/sort.rs b/datafusion/functions-nested/src/sort.rs index f9bfe71554ae..7b2f41c0541c 100644 --- a/datafusion/functions-nested/src/sort.rs +++ b/datafusion/functions-nested/src/sort.rs @@ -138,10 +138,7 @@ impl ScalarUDFImpl for ArraySort { Ok(DataType::new_list(field.data_type().clone(), true)) } arg_type => { - plan_err!( - "{} does not support an argument of type {arg_type}", - self.name() - ) + plan_err!("{} does not support type {arg_type}", self.name()) } } } diff --git a/datafusion/functions-nested/src/utils.rs b/datafusion/functions-nested/src/utils.rs index 74b21a3ceb47..ed08a8235874 100644 --- a/datafusion/functions-nested/src/utils.rs +++ b/datafusion/functions-nested/src/utils.rs @@ -22,17 +22,15 @@ use std::sync::Arc; use arrow::datatypes::{DataType, Field, Fields}; use arrow::array::{ - Array, ArrayRef, BooleanArray, GenericListArray, ListArray, OffsetSizeTrait, Scalar, - UInt32Array, + Array, ArrayRef, BooleanArray, GenericListArray, OffsetSizeTrait, Scalar, UInt32Array, }; use arrow::buffer::OffsetBuffer; -use datafusion_common::cast::{as_large_list_array, as_list_array}; -use datafusion_common::{ - exec_err, internal_datafusion_err, internal_err, plan_err, Result, ScalarValue, +use datafusion_common::cast::{ + as_fixed_size_list_array, as_large_list_array, as_list_array, }; +use datafusion_common::{exec_err, internal_err, plan_err, Result, ScalarValue}; use datafusion_expr::ColumnarValue; -use datafusion_functions::{downcast_arg, downcast_named_arg}; pub(crate) fn check_datatypes(name: &str, args: &[&ArrayRef]) -> Result<()> { let data_type = args[0].data_type(); @@ -234,8 +232,16 @@ pub(crate) fn compute_array_dims( loop { match value.data_type() { - DataType::List(..) => { - value = downcast_arg!(value, ListArray).value(0); + DataType::List(_) => { + value = as_list_array(&value)?.value(0); + res.push(Some(value.len() as u64)); + } + DataType::LargeList(_) => { + value = as_large_list_array(&value)?.value(0); + res.push(Some(value.len() as u64)); + } + DataType::FixedSizeList(..) => { + value = as_fixed_size_list_array(&value)?.value(0); res.push(Some(value.len() as u64)); } _ => return Ok(Some(res)), @@ -261,6 +267,7 @@ pub(crate) fn get_map_entry_field(data_type: &DataType) -> Result<&Fields> { #[cfg(test)] mod tests { use super::*; + use arrow::array::ListArray; use arrow::datatypes::Int64Type; use datafusion_common::utils::SingleRowListArrayBuilder; diff --git a/datafusion/sqllogictest/test_files/array.slt b/datafusion/sqllogictest/test_files/array.slt index 7acd169f97ea..601316b18ab6 100644 --- a/datafusion/sqllogictest/test_files/array.slt +++ b/datafusion/sqllogictest/test_files/array.slt @@ -3115,7 +3115,7 @@ select [1, 2, 3] List(Field { name: "item", data_type: Utf8View, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }) # array_concat error -query error DataFusion error: Error during planning: Execution error: Function 'array_concat' user-defined coercion failed with "Error during planning: array_concat does not support an argument of type Int64" +query error DataFusion error: Error during planning: Execution error: Function 'array_concat' user-defined coercion failed with "Error during planning: array_concat does not support type Int64" select array_concat(1, 2); # array_concat scalar function #1