From 33df7e2fab8f1f60e3118a6565e64d8c0b72abc9 Mon Sep 17 00:00:00 2001 From: Gabriel Musat Mestre Date: Mon, 9 Jun 2025 14:39:33 +0200 Subject: [PATCH 1/2] Fix array_agg memory over accounting --- datafusion/common/src/scalar/mod.rs | 6 ++ .../functions-aggregate/src/array_agg.rs | 80 +++++++++++++++++-- 2 files changed, 81 insertions(+), 5 deletions(-) diff --git a/datafusion/common/src/scalar/mod.rs b/datafusion/common/src/scalar/mod.rs index 3d4aa78b6da6..c04a62d19466 100644 --- a/datafusion/common/src/scalar/mod.rs +++ b/datafusion/common/src/scalar/mod.rs @@ -3525,6 +3525,12 @@ impl ScalarValue { } } } + + /// Compacts ([ScalarValue::compact]) the current [ScalarValue] and returns it. + pub fn compacted(mut self) -> Self { + self.compact(); + self + } } pub fn copy_array_data(data: &ArrayData) -> ArrayData { diff --git a/datafusion/functions-aggregate/src/array_agg.rs b/datafusion/functions-aggregate/src/array_agg.rs index 8c426d1f884a..62a895a3fb89 100644 --- a/datafusion/functions-aggregate/src/array_agg.rs +++ b/datafusion/functions-aggregate/src/array_agg.rs @@ -23,12 +23,14 @@ use std::mem::{size_of, size_of_val}; use std::sync::Arc; use arrow::array::{ - new_empty_array, Array, ArrayRef, AsArray, BooleanArray, ListArray, StructArray, + make_array, new_empty_array, Array, ArrayRef, AsArray, BooleanArray, ListArray, + StructArray, }; use arrow::compute::{filter, SortOptions}; use arrow::datatypes::{DataType, Field, FieldRef, Fields}; use datafusion_common::cast::as_list_array; +use datafusion_common::scalar::copy_array_data; use datafusion_common::utils::{get_row_at_idx, SingleRowListArrayBuilder}; use datafusion_common::{exec_err, internal_err, Result, ScalarValue}; use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs}; @@ -313,7 +315,8 @@ impl Accumulator for ArrayAggAccumulator { }; if !val.is_empty() { - self.values.push(val); + self.values + .push(make_array(copy_array_data(&val.to_data()))); } Ok(()) @@ -423,7 +426,8 @@ impl Accumulator 
for DistinctArrayAggAccumulator { if nulls.is_none_or(|nulls| nulls.null_count() < val.len()) { for i in 0..val.len() { if nulls.is_none_or(|nulls| nulls.is_valid(i)) { - self.values.insert(ScalarValue::try_from_array(val, i)?); + self.values + .insert(ScalarValue::try_from_array(val, i)?.compacted()); } } } @@ -577,8 +581,14 @@ impl Accumulator for OrderSensitiveArrayAggAccumulator { if nulls.is_none_or(|nulls| nulls.null_count() < val.len()) { for i in 0..val.len() { if nulls.is_none_or(|nulls| nulls.is_valid(i)) { - self.values.push(ScalarValue::try_from_array(val, i)?); - self.ordering_values.push(get_row_at_idx(ord, i)?) + self.values + .push(ScalarValue::try_from_array(val, i)?.compacted()); + self.ordering_values.push( + get_row_at_idx(ord, i)? + .into_iter() + .map(|v| v.compacted()) + .collect(), + ) } } } @@ -714,6 +724,7 @@ impl Accumulator for OrderSensitiveArrayAggAccumulator { #[cfg(test)] mod tests { use super::*; + use arrow::array::{ListBuilder, StringBuilder}; use arrow::datatypes::{FieldRef, Schema}; use datafusion_common::cast::as_generic_string_array; use datafusion_common::internal_err; @@ -980,6 +991,56 @@ mod tests { Ok(()) } + #[test] + fn does_not_over_account_memory() -> Result<()> { + let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string().build_two()?; + + acc1.update_batch(&[data(["a", "c", "b"])])?; + acc2.update_batch(&[data(["b", "c", "a"])])?; + acc1 = merge(acc1, acc2)?; + + // without compaction, the size is 2652. 
+ assert_eq!(acc1.size(), 732); + + Ok(()) + } + #[test] + fn does_not_over_account_memory_distinct() -> Result<()> { + let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string() + .distinct() + .build_two()?; + + acc1.update_batch(&[string_list_data([ + vec!["a", "b", "c"], + vec!["d", "e", "f"], + ])])?; + acc2.update_batch(&[string_list_data([vec!["e", "f", "g"]])])?; + acc1 = merge(acc1, acc2)?; + + // without compaction, the size is 16660 + assert_eq!(acc1.size(), 1660); + + Ok(()) + } + + #[test] + fn does_not_over_account_memory_ordered() -> Result<()> { + let mut acc = ArrayAggAccumulatorBuilder::string() + .order_by_col("col", SortOptions::new(false, false)) + .build()?; + + acc.update_batch(&[string_list_data([ + vec!["a", "b", "c"], + vec!["c", "d", "e"], + vec!["b", "c", "d"], + ])])?; + + // without compaction, the size is 17112 + assert_eq!(acc.size(), 2112); + + Ok(()) + } + struct ArrayAggAccumulatorBuilder { return_field: FieldRef, distinct: bool, @@ -1059,6 +1120,15 @@ mod tests { .collect() } + fn string_list_data<'a>(data: impl IntoIterator<Item = Vec<&'a str>>) -> ArrayRef { + let mut builder = ListBuilder::new(StringBuilder::new()); + for string_list in data.into_iter() { + builder.append_value(string_list.iter().map(Some).collect::<Vec<_>>()); + } + + Arc::new(builder.finish()) + } + fn data<T, const N: usize>(list: [T; N]) -> ArrayRef where ScalarValue: From<T>, From f5eba19dffcdec4ab112b464336363b66e2a0189 Mon Sep 17 00:00:00 2001 From: Gabriel Musat Mestre Date: Mon, 9 Jun 2025 16:38:55 +0200 Subject: [PATCH 2/2] Add comment --- datafusion/functions-aggregate/src/array_agg.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/datafusion/functions-aggregate/src/array_agg.rs b/datafusion/functions-aggregate/src/array_agg.rs index 62a895a3fb89..4ec73e306e0f 100644 --- a/datafusion/functions-aggregate/src/array_agg.rs +++ b/datafusion/functions-aggregate/src/array_agg.rs @@ -315,6 +315,9 @@ impl Accumulator for ArrayAggAccumulator { }; if !val.is_empty() { + // The ArrayRef might be 
holding a reference to its original input buffer, so + // storing a compacted copy of it here avoids over-accounting memory that is + // not used here. self.values .push(make_array(copy_array_data(&val.to_data()))); }