From 3472b513fc071de0dc3bb9bafb8f6cbd111dcca6 Mon Sep 17 00:00:00 2001 From: jayzhan211 Date: Sat, 9 Dec 2023 19:36:58 +0800 Subject: [PATCH 1/7] list/non-list conversion Signed-off-by: jayzhan211 --- datafusion/common/src/scalar.rs | 149 ++++++++++++++---- datafusion/core/tests/sql/aggregates.rs | 2 +- .../src/aggregate/array_agg_distinct.rs | 27 ++-- .../src/aggregate/array_agg_ordered.rs | 4 +- .../src/aggregate/count_distinct.rs | 2 +- 5 files changed, 136 insertions(+), 48 deletions(-) diff --git a/datafusion/common/src/scalar.rs b/datafusion/common/src/scalar.rs index d730fbf89b723..b2589f46bb8fd 100644 --- a/datafusion/common/src/scalar.rs +++ b/datafusion/common/src/scalar.rs @@ -2039,7 +2039,11 @@ impl ScalarValue { } } - /// Retrieve ScalarValue for each row in `array` + /// Retrieve `ScalarValue` for each row in `array` + /// + /// Convert `ListArray` to `Vec>`, first `Vec` is for rows, second `Vec` is for elements in the list + /// + /// Return `Err` if `array` is not `ListArray` /// /// Example /// ``` @@ -2053,7 +2057,7 @@ impl ScalarValue { /// Some(vec![Some(4), Some(5)]) /// ]); /// - /// let scalar_vec = ScalarValue::convert_array_to_scalar_vec(&list_arr).unwrap(); + /// let scalar_vec = ScalarValue::convert_list_array_to_scalar_vec(&list_arr).unwrap(); /// /// let expected = vec![ /// vec![ @@ -2067,32 +2071,79 @@ impl ScalarValue { /// /// assert_eq!(scalar_vec, expected); /// ``` - pub fn convert_array_to_scalar_vec(array: &dyn Array) -> Result>> { - let mut scalars = Vec::with_capacity(array.len()); - - for index in 0..array.len() { - let scalar_values = match array.data_type() { - DataType::List(_) => { - let list_array = as_list_array(array); - match list_array.is_null(index) { - true => Vec::new(), - false => { - let nested_array = list_array.value(index); - ScalarValue::convert_array_to_scalar_vec(&nested_array)? - .into_iter() - .flatten() - .collect() - } + pub fn convert_list_array_to_scalar_vec(array: &dyn Array) -> Result>> { + + + if as_list_array(array).is_ok() { + Self::convert_list_array_to_scalar_vec_internal(array) + } else { + _internal_err!("Expected ListArray but found: {array:?}") + } + } + + fn convert_list_array_to_scalar_vec_internal( + array: &dyn Array, + ) -> Result>> { + let mut scalars_vec = Vec::with_capacity(array.len()); + + let list_arr = as_generic_list_array::(array); + + if let Ok(list_arr) = as_list_array(array) { + for index in 0..list_arr.len() { + let scalars = match list_arr.is_null(index) { + true => Vec::new(), + false => { + let nested_array = list_arr.value(index); + Self::convert_list_array_to_scalar_vec_internal(&nested_array)? + .into_iter() + .flatten() + .collect() } - } - _ => { - let scalar = ScalarValue::try_from_array(array, index)?; - vec![scalar] - } - }; - scalars.push(scalar_values); + }; + scalars_vec.push(scalars); + } + } else { + let scalars = ScalarValue::convert_non_list_array_to_scalars(array)?; + scalars_vec.push(scalars); + } + Ok(scalars_vec) + } + + /// Convert non-ListArray to `Vec` + /// + /// Return Err if `array` is ListArray + /// + /// Example + /// ``` + /// use datafusion_common::ScalarValue; + /// use arrow::array::Int32Array; + /// + /// let list_arr = Int32Array::from(vec![Some(1), Some(2), Some(3), None, Some(4), Some(5)]); + /// + /// let scalar_vec = ScalarValue::convert_non_list_array_to_scalars(&list_arr).unwrap(); + /// + /// let expected = vec![ + /// ScalarValue::Int32(Some(1)), + /// ScalarValue::Int32(Some(2)), + /// ScalarValue::Int32(Some(3)), + /// ScalarValue::Int32(None), + /// ScalarValue::Int32(Some(4)), + /// ScalarValue::Int32(Some(5)), + /// ]; + /// + /// assert_eq!(scalar_vec, expected); + /// ``` + pub fn convert_non_list_array_to_scalars(array: &dyn Array) -> Result> { + if as_list_array(array).is_ok() { + _internal_err!("Expected non-ListArray but found: {array:?}") + } else { + let mut scalars = Vec::with_capacity(array.len()); + for index in 0..array.len() { + let scalar = ScalarValue::try_from_array(array, index)?; + scalars.push(scalar); + } + Ok(scalars) } - Ok(scalars) } // TODO: Support more types after other ScalarValue is wrapped with ArrayRef @@ -2143,7 +2194,7 @@ impl ScalarValue { typed_cast!(array, index, LargeStringArray, LargeUtf8)? } DataType::List(_) => { - let list_array = as_list_array(array); + let list_array = as_list_array(array)?; let nested_array = list_array.value(index); // Produces a single element `ListArray` with the value at `index`. let arr = Arc::new(array_into_list_array(nested_array)); @@ -3112,6 +3163,7 @@ impl ScalarType for TimestampNanosecondType { } #[cfg(test)] +#[cfg(feature = "parquet")] mod tests { use super::*; @@ -3129,6 +3181,44 @@ mod tests { use arrow_array::ArrowNumericType; use crate::cast::{as_string_array, as_uint32_array, as_uint64_array}; + use crate::utils::arrays_into_list_array; + + #[test] + fn convert_list_array_to_scalar_vec_nested() { + let l1 = ListArray::from_iter_primitive::(vec![ + Some(vec![Some(1), Some(2), Some(3)]), + Some(vec![Some(4), Some(5)]), + ]); + + let l2 = ListArray::from_iter_primitive::(vec![Some(vec![ + Some(6), + Some(7), + Some(8), + ])]); + + let l1 = Arc::new(l1) as ArrayRef; + let l2 = Arc::new(l2) as ArrayRef; + + let l12 = arrays_into_list_array([l1, l2]).unwrap(); + let arr = Arc::new(l12) as ArrayRef; + + let actual = ScalarValue::convert_list_array_to_scalar_vec(&arr).unwrap(); + let expected = vec![ + vec![ + ScalarValue::Int32(Some(1)), + ScalarValue::Int32(Some(2)), + ScalarValue::Int32(Some(3)), + ScalarValue::Int32(Some(4)), + ScalarValue::Int32(Some(5)), + ], + vec![ + ScalarValue::Int32(Some(6)), + ScalarValue::Int32(Some(7)), + ScalarValue::Int32(Some(8)), + ], + ]; + assert_eq!(actual, expected); + } #[test] fn test_to_array_of_size_for_list() { @@ -3142,7 +3232,7 @@ mod tests { let actual_arr = sv .to_array_of_size(2) .expect("Failed to convert to array of size"); - let actual_list_arr = as_list_array(&actual_arr); + let actual_list_arr = as_list_array(&actual_arr).unwrap(); let arr = ListArray::from_iter_primitive::(vec![ Some(vec![Some(1), None, Some(2)]), @@ -3182,13 +3272,14 @@ mod tests { ]; let array = ScalarValue::new_list(scalars.as_slice(), &DataType::Utf8); + let result = as_list_array(&array).unwrap(); let expected = array_into_list_array(Arc::new(StringArray::from(vec![ "rust", "arrow", "data-fusion", ]))); - let result = as_list_array(&array); + assert_eq!(result, &expected); } diff --git a/datafusion/core/tests/sql/aggregates.rs b/datafusion/core/tests/sql/aggregates.rs index af6d0d5f4e245..5773792488456 100644 --- a/datafusion/core/tests/sql/aggregates.rs +++ b/datafusion/core/tests/sql/aggregates.rs @@ -45,7 +45,7 @@ async fn csv_query_array_agg_distinct() -> Result<()> { let column = actual[0].column(0); assert_eq!(column.len(), 1); - let scalar_vec = ScalarValue::convert_array_to_scalar_vec(&column)?; + let scalar_vec = ScalarValue::convert_list_array_to_scalar_vec(&column)?; let mut scalars = scalar_vec[0].clone(); // workaround lack of Ord of ScalarValue let cmp = |a: &ScalarValue, b: &ScalarValue| { diff --git a/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs b/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs index 1efae424cc699..f905e63728a04 100644 --- a/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs +++ b/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs @@ -137,9 +137,17 @@ impl Accumulator for DistinctArrayAggAccumulator { assert_eq!(values.len(), 1, "batch input should only include 1 column!"); let array = &values[0]; - let scalars = ScalarValue::convert_array_to_scalar_vec(array)?; - for scalar in scalars { - self.values.extend(scalar) + match array.data_type() { + DataType::List(_) => { + let scalar_vec = ScalarValue::convert_list_array_to_scalar_vec(array)?; + for scalars in scalar_vec { + self.values.extend(scalars); + } + } + _ => { + let scalars = ScalarValue::convert_non_list_array_to_scalars(array)?; + self.values.extend(scalars); + } } Ok(()) } @@ -149,18 +157,7 @@ impl Accumulator for DistinctArrayAggAccumulator { return Ok(()); } - assert_eq!( - states.len(), - 1, - "array_agg_distinct states must contain single array" - ); - - let scalar_vec = ScalarValue::convert_array_to_scalar_vec(&states[0])?; - for scalars in scalar_vec { - self.values.extend(scalars) - } - - Ok(()) + self.update_batch(states) } fn evaluate(&self) -> Result { diff --git a/datafusion/physical-expr/src/aggregate/array_agg_ordered.rs b/datafusion/physical-expr/src/aggregate/array_agg_ordered.rs index eb5ae8b0b0c3f..c5783f07f21d6 100644 --- a/datafusion/physical-expr/src/aggregate/array_agg_ordered.rs +++ b/datafusion/physical-expr/src/aggregate/array_agg_ordered.rs @@ -225,13 +225,13 @@ impl Accumulator for OrderSensitiveArrayAggAccumulator { partition_ordering_values.push(self.ordering_values.clone()); let array_agg_res = - ScalarValue::convert_array_to_scalar_vec(array_agg_values)?; + ScalarValue::convert_list_array_to_scalar_vec(array_agg_values)?; for v in array_agg_res.into_iter() { partition_values.push(v); } - let orderings = ScalarValue::convert_array_to_scalar_vec(agg_orderings)?; + let orderings = ScalarValue::convert_list_array_to_scalar_vec(agg_orderings)?; for partition_ordering_rows in orderings.into_iter() { // Extract value from struct to ordering_rows for each group/partition diff --git a/datafusion/physical-expr/src/aggregate/count_distinct.rs b/datafusion/physical-expr/src/aggregate/count_distinct.rs index f5242d983d4cf..1f38a8875aa30 100644 --- a/datafusion/physical-expr/src/aggregate/count_distinct.rs +++ b/datafusion/physical-expr/src/aggregate/count_distinct.rs @@ -167,7 +167,7 @@ impl Accumulator for DistinctCountAccumulator { return Ok(()); } assert_eq!(states.len(), 1, "array_agg states must be singleton!"); - let scalar_vec = ScalarValue::convert_array_to_scalar_vec(&states[0])?; + let scalar_vec = ScalarValue::convert_list_array_to_scalar_vec(&states[0])?; for scalars in scalar_vec.into_iter() { self.values.extend(scalars) } From d704c8321932794da4ba5c7bcb3846763a1da843 Mon Sep 17 00:00:00 2001 From: jayzhan211 Date: Sat, 9 Dec 2023 20:01:56 +0800 Subject: [PATCH 2/7] fix offset trait Signed-off-by: jayzhan211 --- datafusion/common/src/scalar.rs | 54 +++++++++---------- datafusion/core/tests/sql/aggregates.rs | 8 ++- .../src/aggregate/array_agg_distinct.rs | 2 +- .../src/aggregate/array_agg_ordered.rs | 4 +- .../src/aggregate/count_distinct.rs | 2 +- 5 files changed, 37 insertions(+), 33 deletions(-) diff --git a/datafusion/common/src/scalar.rs b/datafusion/common/src/scalar.rs index b2589f46bb8fd..4c08538843cb2 100644 --- a/datafusion/common/src/scalar.rs +++ b/datafusion/common/src/scalar.rs @@ -2071,13 +2071,13 @@ impl ScalarValue { /// /// assert_eq!(scalar_vec, expected); /// ``` - pub fn convert_list_array_to_scalar_vec(array: &dyn Array) -> Result>> { - - - if as_list_array(array).is_ok() { - Self::convert_list_array_to_scalar_vec_internal(array) + pub fn convert_list_array_to_scalar_vec( + array: &dyn Array, + ) -> Result>> { + if array.as_list_opt::().is_some() { + Self::convert_list_array_to_scalar_vec_internal::(array) } else { - _internal_err!("Expected ListArray but found: {array:?}") + _internal_err!("Expected GenericListArray but found: {array:?}") } } @@ -2086,18 +2086,18 @@ impl ScalarValue { ) -> Result>> { let mut scalars_vec = Vec::with_capacity(array.len()); - let list_arr = as_generic_list_array::(array); - - if let Ok(list_arr) = as_list_array(array) { + if let Some(list_arr) = array.as_list_opt::() { for index in 0..list_arr.len() { let scalars = match list_arr.is_null(index) { true => Vec::new(), false => { let nested_array = list_arr.value(index); - Self::convert_list_array_to_scalar_vec_internal(&nested_array)? - .into_iter() - .flatten() - .collect() + Self::convert_list_array_to_scalar_vec_internal::( + &nested_array, + )? + .into_iter() + .flatten() + .collect() } }; scalars_vec.push(scalars); @@ -2106,6 +2106,7 @@ impl ScalarValue { let scalars = ScalarValue::convert_non_list_array_to_scalars(array)?; scalars_vec.push(scalars); } + Ok(scalars_vec) } @@ -2134,16 +2135,16 @@ impl ScalarValue { /// assert_eq!(scalar_vec, expected); /// ``` pub fn convert_non_list_array_to_scalars(array: &dyn Array) -> Result> { - if as_list_array(array).is_ok() { - _internal_err!("Expected non-ListArray but found: {array:?}") - } else { - let mut scalars = Vec::with_capacity(array.len()); - for index in 0..array.len() { - let scalar = ScalarValue::try_from_array(array, index)?; - scalars.push(scalar); - } - Ok(scalars) + if array.as_list_opt::().is_some() || array.as_list_opt::().is_some() { + return _internal_err!("Expected non ListArray but found: {array:?}"); } + + let mut scalars = Vec::with_capacity(array.len()); + for index in 0..array.len() { + let scalar = ScalarValue::try_from_array(array, index)?; + scalars.push(scalar); + } + Ok(scalars) } // TODO: Support more types after other ScalarValue is wrapped with ArrayRef @@ -2194,7 +2195,7 @@ impl ScalarValue { typed_cast!(array, index, LargeStringArray, LargeUtf8)? } DataType::List(_) => { - let list_array = as_list_array(array)?; + let list_array = as_list_array(array); let nested_array = list_array.value(index); // Produces a single element `ListArray` with the value at `index`. let arr = Arc::new(array_into_list_array(nested_array)); @@ -3163,7 +3164,6 @@ impl ScalarType for TimestampNanosecondType { } #[cfg(test)] -#[cfg(feature = "parquet")] mod tests { use super::*; @@ -3202,7 +3202,7 @@ mod tests { let l12 = arrays_into_list_array([l1, l2]).unwrap(); let arr = Arc::new(l12) as ArrayRef; - let actual = ScalarValue::convert_list_array_to_scalar_vec(&arr).unwrap(); + let actual = ScalarValue::convert_list_array_to_scalar_vec::(&arr).unwrap(); let expected = vec![ vec![ ScalarValue::Int32(Some(1)), @@ -3232,7 +3232,7 @@ mod tests { let actual_arr = sv .to_array_of_size(2) .expect("Failed to convert to array of size"); - let actual_list_arr = as_list_array(&actual_arr).unwrap(); + let actual_list_arr = as_list_array(&actual_arr); let arr = ListArray::from_iter_primitive::(vec![ Some(vec![Some(1), None, Some(2)]), @@ -3272,7 +3272,7 @@ mod tests { ]; let array = ScalarValue::new_list(scalars.as_slice(), &DataType::Utf8); - let result = as_list_array(&array).unwrap(); + let result = as_list_array(&array); let expected = array_into_list_array(Arc::new(StringArray::from(vec![ "rust", diff --git a/datafusion/core/tests/sql/aggregates.rs b/datafusion/core/tests/sql/aggregates.rs index 5773792488456..68c58df41cb6c 100644 --- a/datafusion/core/tests/sql/aggregates.rs +++ b/datafusion/core/tests/sql/aggregates.rs @@ -45,13 +45,17 @@ async fn csv_query_array_agg_distinct() -> Result<()> { let column = actual[0].column(0); assert_eq!(column.len(), 1); - let scalar_vec = ScalarValue::convert_list_array_to_scalar_vec(&column)?; - let mut scalars = scalar_vec[0].clone(); + // 1 row + let scalar_vec = ScalarValue::convert_list_array_to_scalar_vec::(&column)?; + // workaround lack of Ord of ScalarValue let cmp = |a: &ScalarValue, b: &ScalarValue| { a.partial_cmp(b).expect("Can compare ScalarValues") }; + + let mut scalars = scalar_vec.first().unwrap().to_owned(); scalars.sort_by(cmp); + assert_eq!( scalars, vec![ diff --git a/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs b/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs index f905e63728a04..f6eaee58f50e0 100644 --- a/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs +++ b/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs @@ -139,7 +139,7 @@ impl Accumulator for DistinctArrayAggAccumulator { let array = &values[0]; match array.data_type() { DataType::List(_) => { - let scalar_vec = ScalarValue::convert_list_array_to_scalar_vec(array)?; + let scalar_vec = ScalarValue::convert_list_array_to_scalar_vec::(array)?; for scalars in scalar_vec { self.values.extend(scalars); } diff --git a/datafusion/physical-expr/src/aggregate/array_agg_ordered.rs b/datafusion/physical-expr/src/aggregate/array_agg_ordered.rs index c5783f07f21d6..9644bbba3b5cd 100644 --- a/datafusion/physical-expr/src/aggregate/array_agg_ordered.rs +++ b/datafusion/physical-expr/src/aggregate/array_agg_ordered.rs @@ -225,13 +225,13 @@ impl Accumulator for OrderSensitiveArrayAggAccumulator { partition_ordering_values.push(self.ordering_values.clone()); let array_agg_res = - ScalarValue::convert_list_array_to_scalar_vec(array_agg_values)?; + ScalarValue::convert_list_array_to_scalar_vec::(array_agg_values)?; for v in array_agg_res.into_iter() { partition_values.push(v); } - let orderings = ScalarValue::convert_list_array_to_scalar_vec(agg_orderings)?; + let orderings = ScalarValue::convert_list_array_to_scalar_vec::(agg_orderings)?; for partition_ordering_rows in orderings.into_iter() { // Extract value from struct to ordering_rows for each group/partition diff --git a/datafusion/physical-expr/src/aggregate/count_distinct.rs b/datafusion/physical-expr/src/aggregate/count_distinct.rs index 1f38a8875aa30..332e1b690b49e 100644 --- a/datafusion/physical-expr/src/aggregate/count_distinct.rs +++ b/datafusion/physical-expr/src/aggregate/count_distinct.rs @@ -167,7 +167,7 @@ impl Accumulator for DistinctCountAccumulator { return Ok(()); } assert_eq!(states.len(), 1, "array_agg states must be singleton!"); - let scalar_vec = ScalarValue::convert_list_array_to_scalar_vec(&states[0])?; + let scalar_vec = ScalarValue::convert_list_array_to_scalar_vec::(&states[0])?; for scalars in scalar_vec.into_iter() { self.values.extend(scalars) } From aee4eff6e6c1d3de659ba3b049bc7156cf6d1e4c Mon Sep 17 00:00:00 2001 From: jayzhan211 Date: Sat, 9 Dec 2023 20:07:21 +0800 Subject: [PATCH 3/7] fmt Signed-off-by: jayzhan211 --- datafusion/common/src/scalar.rs | 2 +- datafusion/core/tests/sql/aggregates.rs | 2 +- datafusion/physical-expr/src/aggregate/array_agg_distinct.rs | 3 ++- datafusion/physical-expr/src/aggregate/count_distinct.rs | 3 ++- 4 files changed, 6 insertions(+), 4 deletions(-) diff --git a/datafusion/common/src/scalar.rs b/datafusion/common/src/scalar.rs index 4c08538843cb2..97c0a6f7a30b8 100644 --- a/datafusion/common/src/scalar.rs +++ b/datafusion/common/src/scalar.rs @@ -3272,7 +3272,6 @@ mod tests { ]; let array = ScalarValue::new_list(scalars.as_slice(), &DataType::Utf8); - let result = as_list_array(&array); let expected = array_into_list_array(Arc::new(StringArray::from(vec![ "rust", @@ -3280,6 +3279,7 @@ mod tests { "data-fusion", ]))); + let result = as_list_array(&array); assert_eq!(result, &expected); } diff --git a/datafusion/core/tests/sql/aggregates.rs b/datafusion/core/tests/sql/aggregates.rs index 68c58df41cb6c..b30dffb50f2c1 100644 --- a/datafusion/core/tests/sql/aggregates.rs +++ b/datafusion/core/tests/sql/aggregates.rs @@ -47,13 +47,13 @@ async fn csv_query_array_agg_distinct() -> Result<()> { // 1 row let scalar_vec = ScalarValue::convert_list_array_to_scalar_vec::(&column)?; + let mut scalars = scalar_vec.first().unwrap().to_owned(); // workaround lack of Ord of ScalarValue let cmp = |a: &ScalarValue, b: &ScalarValue| { a.partial_cmp(b).expect("Can compare ScalarValues") }; - let mut scalars = scalar_vec.first().unwrap().to_owned(); scalars.sort_by(cmp); assert_eq!( diff --git a/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs b/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs index f6eaee58f50e0..39f769d0d4ff0 100644 --- a/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs +++ b/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs @@ -139,7 +139,8 @@ impl Accumulator for DistinctArrayAggAccumulator { let array = &values[0]; match array.data_type() { DataType::List(_) => { - let scalar_vec = ScalarValue::convert_list_array_to_scalar_vec::(array)?; + let scalar_vec = + ScalarValue::convert_list_array_to_scalar_vec::(array)?; for scalars in scalar_vec { self.values.extend(scalars); } diff --git a/datafusion/physical-expr/src/aggregate/count_distinct.rs b/datafusion/physical-expr/src/aggregate/count_distinct.rs index 332e1b690b49e..d2a18869ae995 100644 --- a/datafusion/physical-expr/src/aggregate/count_distinct.rs +++ b/datafusion/physical-expr/src/aggregate/count_distinct.rs @@ -167,7 +167,8 @@ impl Accumulator for DistinctCountAccumulator { return Ok(()); } assert_eq!(states.len(), 1, "array_agg states must be singleton!"); - let scalar_vec = ScalarValue::convert_list_array_to_scalar_vec::(&states[0])?; + let scalar_vec = + ScalarValue::convert_list_array_to_scalar_vec::(&states[0])?; for scalars in scalar_vec.into_iter() { self.values.extend(scalars) } From b92d535266fa48bc6a5b5c3546edcd34654919ae Mon Sep 17 00:00:00 2001 From: jayzhan211 Date: Wed, 13 Dec 2023 22:24:00 +0800 Subject: [PATCH 4/7] fmt Signed-off-by: jayzhan211 --- datafusion/physical-expr/src/aggregate/array_agg_ordered.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/datafusion/physical-expr/src/aggregate/array_agg_ordered.rs b/datafusion/physical-expr/src/aggregate/array_agg_ordered.rs index 9644bbba3b5cd..4956c4901da67 100644 --- a/datafusion/physical-expr/src/aggregate/array_agg_ordered.rs +++ b/datafusion/physical-expr/src/aggregate/array_agg_ordered.rs @@ -231,7 +231,8 @@ impl Accumulator for OrderSensitiveArrayAggAccumulator { partition_values.push(v); } - let orderings = ScalarValue::convert_list_array_to_scalar_vec::(agg_orderings)?; + let orderings = + ScalarValue::convert_list_array_to_scalar_vec::(agg_orderings)?; for partition_ordering_rows in orderings.into_iter() { // Extract value from struct to ordering_rows for each group/partition From 3b6b53eb52a07e2f90957a054bcd723d13fe06bd Mon Sep 17 00:00:00 2001 From: jayzhan211 Date: Wed, 13 Dec 2023 22:59:04 +0800 Subject: [PATCH 5/7] ci fix Signed-off-by: jayzhan211 --- datafusion/common/src/scalar.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/common/src/scalar.rs b/datafusion/common/src/scalar.rs index 97c0a6f7a30b8..f96d1af9f18e6 100644 --- a/datafusion/common/src/scalar.rs +++ b/datafusion/common/src/scalar.rs @@ -2057,7 +2057,7 @@ impl ScalarValue { /// Some(vec![Some(4), Some(5)]) /// ]); /// - /// let scalar_vec = ScalarValue::convert_list_array_to_scalar_vec(&list_arr).unwrap(); + /// let scalar_vec = ScalarValue::convert_list_array_to_scalar_vec::(&list_arr).unwrap(); /// /// let expected = vec![ /// vec![ From c69f8748b82579a4a55980ec0646b286e6c999ff Mon Sep 17 00:00:00 2001 From: jayzhan211 Date: Wed, 13 Dec 2023 23:07:20 +0800 Subject: [PATCH 6/7] add largetlist Signed-off-by: jayzhan211 --- .../physical-expr/src/aggregate/array_agg_distinct.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs b/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs index 39f769d0d4ff0..cbf2b713074fd 100644 --- a/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs +++ b/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs @@ -145,6 +145,13 @@ impl Accumulator for DistinctArrayAggAccumulator { self.values.extend(scalars); } } + DataType::LargeList(_) => { + let scalar_vec = + ScalarValue::convert_list_array_to_scalar_vec::(array)?; + for scalars in scalar_vec { + self.values.extend(scalars); + } + } _ => { let scalars = ScalarValue::convert_non_list_array_to_scalars(array)?; self.values.extend(scalars); From f7b6ac02d12c5d16728f9afc85e3c60030c39432 Mon Sep 17 00:00:00 2001 From: jayzhan211 Date: Sun, 17 Dec 2023 08:05:32 +0800 Subject: [PATCH 7/7] improve doc Signed-off-by: jayzhan211 --- datafusion/common/src/scalar.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/datafusion/common/src/scalar.rs b/datafusion/common/src/scalar.rs index f96d1af9f18e6..f18db2f31b14c 100644 --- a/datafusion/common/src/scalar.rs +++ b/datafusion/common/src/scalar.rs @@ -2041,7 +2041,13 @@ impl ScalarValue { /// Retrieve `ScalarValue` for each row in `array` /// - /// Convert `ListArray` to `Vec>`, first `Vec` is for rows, second `Vec` is for elements in the list + /// Convert `ListArray` into a 2 dimensional to `Vec>`, first `Vec` is for rows, + /// second `Vec` is for elements in the list. + /// + /// See [`Self::convert_non_list_array_to_scalars`] for converting non Lists + /// + /// This method is an optimization to unwrap nested ListArrays to nested Rust structures without + /// converting them twice /// /// Return `Err` if `array` is not `ListArray` ///