From 3006a142f347bf2fb59a385b3151f4102761f5ab Mon Sep 17 00:00:00 2001 From: Gabriel <45515538+gabotechs@users.noreply.github.com> Date: Wed, 11 Jun 2025 15:04:09 +0200 Subject: [PATCH 1/2] Fix array_agg memory over use (#16346) * Fix array_agg memory over accounting * Add comment (cherry picked from commit 8a2d61821a6a9097dfabe2758642fd75dcc73aaa) --- datafusion/common/src/scalar/mod.rs | 6 ++ .../functions-aggregate/src/array_agg.rs | 83 +++++++++++++++++-- 2 files changed, 84 insertions(+), 5 deletions(-) diff --git a/datafusion/common/src/scalar/mod.rs b/datafusion/common/src/scalar/mod.rs index 3d4aa78b6da65..c04a62d194665 100644 --- a/datafusion/common/src/scalar/mod.rs +++ b/datafusion/common/src/scalar/mod.rs @@ -3525,6 +3525,12 @@ impl ScalarValue { } } } + + /// Compacts ([ScalarValue::compact]) the current [ScalarValue] and returns it. + pub fn compacted(mut self) -> Self { + self.compact(); + self + } } pub fn copy_array_data(data: &ArrayData) -> ArrayData { diff --git a/datafusion/functions-aggregate/src/array_agg.rs b/datafusion/functions-aggregate/src/array_agg.rs index 71278767a83fc..b1c332f5473a8 100644 --- a/datafusion/functions-aggregate/src/array_agg.rs +++ b/datafusion/functions-aggregate/src/array_agg.rs @@ -18,12 +18,14 @@ //! `ARRAY_AGG` aggregate implementation: [`ArrayAgg`] use arrow::array::{ - new_empty_array, Array, ArrayRef, AsArray, BooleanArray, ListArray, StructArray, + make_array, new_empty_array, Array, ArrayRef, AsArray, BooleanArray, ListArray, + StructArray, }; use arrow::compute::{filter, SortOptions}; use arrow::datatypes::{DataType, Field, FieldRef, Fields}; use datafusion_common::cast::as_list_array; +use datafusion_common::scalar::copy_array_data; use datafusion_common::utils::{get_row_at_idx, SingleRowListArrayBuilder}; use datafusion_common::{exec_err, ScalarValue}; use datafusion_common::{internal_err, Result}; @@ -319,7 +321,11 @@ impl Accumulator for ArrayAggAccumulator { }; if !val.is_empty() { - self.values.push(val); + // The ArrayRef might be holding a reference to its original input buffer, so + // storing it here directly copied/compacted avoids over accounting memory + // not used here. + self.values + .push(make_array(copy_array_data(&val.to_data()))); } Ok(()) @@ -429,7 +435,8 @@ impl Accumulator for DistinctArrayAggAccumulator { if nulls.is_none_or(|nulls| nulls.null_count() < val.len()) { for i in 0..val.len() { if nulls.is_none_or(|nulls| nulls.is_valid(i)) { - self.values.insert(ScalarValue::try_from_array(val, i)?); + self.values + .insert(ScalarValue::try_from_array(val, i)?.compacted()); } } } @@ -558,8 +565,14 @@ impl Accumulator for OrderSensitiveArrayAggAccumulator { if nulls.is_none_or(|nulls| nulls.null_count() < val.len()) { for i in 0..val.len() { if nulls.is_none_or(|nulls| nulls.is_valid(i)) { - self.values.push(ScalarValue::try_from_array(val, i)?); - self.ordering_values.push(get_row_at_idx(ord, i)?) + self.values + .push(ScalarValue::try_from_array(val, i)?.compacted()); + self.ordering_values.push( + get_row_at_idx(ord, i)? + .into_iter() + .map(|v| v.compacted()) + .collect(), + ) } } } @@ -722,6 +735,7 @@ impl OrderSensitiveArrayAggAccumulator { #[cfg(test)] mod tests { use super::*; + use arrow::array::{ListBuilder, StringBuilder}; use arrow::datatypes::{FieldRef, Schema}; use datafusion_common::cast::as_generic_string_array; use datafusion_common::internal_err; @@ -988,6 +1002,56 @@ mod tests { Ok(()) } + #[test] + fn does_not_over_account_memory() -> Result<()> { + let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string().build_two()?; + + acc1.update_batch(&[data(["a", "c", "b"])])?; + acc2.update_batch(&[data(["b", "c", "a"])])?; + acc1 = merge(acc1, acc2)?; + + // without compaction, the size is 2652. + assert_eq!(acc1.size(), 732); + + Ok(()) + } + #[test] + fn does_not_over_account_memory_distinct() -> Result<()> { + let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string() + .distinct() + .build_two()?; + + acc1.update_batch(&[string_list_data([ + vec!["a", "b", "c"], + vec!["d", "e", "f"], + ])])?; + acc2.update_batch(&[string_list_data([vec!["e", "f", "g"]])])?; + acc1 = merge(acc1, acc2)?; + + // without compaction, the size is 16660 + assert_eq!(acc1.size(), 1660); + + Ok(()) + } + + #[test] + fn does_not_over_account_memory_ordered() -> Result<()> { + let mut acc = ArrayAggAccumulatorBuilder::string() + .order_by_col("col", SortOptions::new(false, false)) + .build()?; + + acc.update_batch(&[string_list_data([ + vec!["a", "b", "c"], + vec!["c", "d", "e"], + vec!["b", "c", "d"], + ])])?; + + // without compaction, the size is 17112 + assert_eq!(acc.size(), 2112); + + Ok(()) + } + struct ArrayAggAccumulatorBuilder { return_field: FieldRef, distinct: bool, @@ -1066,6 +1130,15 @@ mod tests { .collect() } + fn string_list_data<'a>(data: impl IntoIterator>) -> ArrayRef { + let mut builder = ListBuilder::new(StringBuilder::new()); + for string_list in data.into_iter() { + builder.append_value(string_list.iter().map(Some).collect::>()); + } + + Arc::new(builder.finish()) + } + fn data(list: [T; N]) -> ArrayRef where ScalarValue: From, From d7d054ed062346daedac28998a2d485d5b6fa9e5 Mon Sep 17 00:00:00 2001 From: Gabriel Musat Mestre Date: Thu, 12 Jun 2025 10:26:44 +0200 Subject: [PATCH 2/2] Fix test --- datafusion/functions-aggregate/src/array_agg.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/functions-aggregate/src/array_agg.rs b/datafusion/functions-aggregate/src/array_agg.rs index b1c332f5473a8..dd07b5c3aca56 100644 --- a/datafusion/functions-aggregate/src/array_agg.rs +++ b/datafusion/functions-aggregate/src/array_agg.rs @@ -1047,7 +1047,7 @@ mod tests { ])])?; // without compaction, the size is 17112 - assert_eq!(acc.size(), 2112); + assert_eq!(acc.size(), 2080); Ok(()) }