From 7af558e07d5f95bab3c3dbb1ff9e8745bf73a6b8 Mon Sep 17 00:00:00 2001 From: Duong Cong Toai Date: Sat, 27 Sep 2025 09:28:09 +0200 Subject: [PATCH 01/17] first attempt --- .../src/aggregate.rs | 1 + .../src/aggregate/array_agg.rs | 596 ++++++++++++++++++ .../functions-aggregate/src/array_agg.rs | 123 +++- datafusion/sqllogictest/test_files/temp2.slt | 146 +++++ 4 files changed, 865 insertions(+), 1 deletion(-) create mode 100644 datafusion/functions-aggregate-common/src/aggregate/array_agg.rs create mode 100644 datafusion/sqllogictest/test_files/temp2.slt diff --git a/datafusion/functions-aggregate-common/src/aggregate.rs b/datafusion/functions-aggregate-common/src/aggregate.rs index aadce907e7cc..92802a8e5228 100644 --- a/datafusion/functions-aggregate-common/src/aggregate.rs +++ b/datafusion/functions-aggregate-common/src/aggregate.rs @@ -19,3 +19,4 @@ pub mod avg_distinct; pub mod count_distinct; pub mod groups_accumulator; pub mod sum_distinct; +pub mod array_agg; diff --git a/datafusion/functions-aggregate-common/src/aggregate/array_agg.rs b/datafusion/functions-aggregate-common/src/aggregate/array_agg.rs new file mode 100644 index 000000000000..b971a1e8f1f5 --- /dev/null +++ b/datafusion/functions-aggregate-common/src/aggregate/array_agg.rs @@ -0,0 +1,596 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Utilities for implementing GroupsAccumulator +//! Adapter that makes [`GroupsAccumulator`] out of [`Accumulator`] + +use std::mem::{size_of, size_of_val}; +use std::sync::Arc; + +use arrow::array::{new_empty_array, Array, GenericListArray, ListArray}; +use arrow::buffer::OffsetBuffer; +use arrow::datatypes::{DataType, Field}; +use arrow::{ + array::{ArrayRef, AsArray, BooleanArray, PrimitiveArray}, + compute, + compute::take_arrays, + datatypes::UInt32Type, +}; +use datafusion_common::{arrow_datafusion_err, DataFusionError, Result, ScalarValue}; +use datafusion_expr_common::accumulator::Accumulator; +use datafusion_expr_common::groups_accumulator::{EmitTo, GroupsAccumulator}; + +use crate::accumulator::AccumulatorArgs; + +/// An adapter that implements [`GroupsAccumulator`] for any [`Accumulator`] +/// +/// While [`Accumulator`] are simpler to implement and can support +/// more general calculations (like retractable window functions), +/// they are not as fast as a specialized `GroupsAccumulator`. This +/// interface bridges the gap so the group by operator only operates +/// in terms of [`Accumulator`]. +/// +/// Internally, this adapter creates a new [`Accumulator`] for each group which +/// stores the state for that group. This both requires an allocation for each +/// Accumulator, internal indices, as well as whatever internal allocations the +/// Accumulator itself requires. +/// +/// For example, a `MinAccumulator` that computes the minimum string value with +/// a [`ScalarValue::Utf8`]. That will require at least two allocations per group +/// (one for the `MinAccumulator` and one for the `ScalarValue::Utf8`). +/// +/// ```text +/// ┌─────────────────────────────────┐ +/// │MinAccumulator { │ +/// ┌─────▶│ min: ScalarValue::Utf8("A") │───────┐ +/// │ │} │ │ +/// │ └─────────────────────────────────┘ └───────▶ "A" +/// ┌─────┐ │ ┌─────────────────────────────────┐ +/// │ 0 │─────┘ │MinAccumulator { │ +/// ├─────┤ ┌─────▶│ min: ScalarValue::Utf8("Z") │───────────────▶ "Z" +/// │ 1 │─────┘ │} │ +/// └─────┘ └─────────────────────────────────┘ ... +/// ... ... +/// ┌─────┐ ┌────────────────────────────────┐ +/// │ N-2 │ │MinAccumulator { │ +/// ├─────┤ │ min: ScalarValue::Utf8("A") │────────────────▶ "A" +/// │ N-1 │─────┐ │} │ +/// └─────┘ │ └────────────────────────────────┘ +/// │ ┌────────────────────────────────┐ ┌───────▶ "Q" +/// │ │MinAccumulator { │ │ +/// └─────▶│ min: ScalarValue::Utf8("Q") │────────┘ +/// │} │ +/// └────────────────────────────────┘ +/// +/// +/// Logical group Current Min/Max value for that group stored +/// number as a ScalarValue which points to an +/// individually allocated String +/// +///``` +/// +/// # Optimizations +/// +/// The adapter minimizes the number of calls to [`Accumulator::update_batch`] +/// by first collecting the input rows for each group into a contiguous array +/// using [`compute::take`] +/// +/// +pub struct AggGroupAccumulator { + /// state for each group, stored in group_index order + states: Vec, + + factory: Box Result> + Send>, + + /// Current memory usage, in bytes. + /// + /// Note this is incrementally updated with deltas to avoid the + /// call to size() being a bottleneck. We saw size() being a + /// bottleneck in earlier implementations when there were many + /// distinct groups. + allocation_bytes: usize, +} + +#[derive(Debug)] +struct AccumulatorState { + /// [`Accumulator`] that stores the per-group state + accumulator: Box, + + /// scratch space: indexes in the input array that will be fed to + /// this accumulator. Stores indexes as `u32` to match the arrow + /// `take` kernel input. + indices: Vec, +} + +impl AccumulatorState { + fn new(accumulator: Box) -> Self { + Self { + accumulator, + indices: vec![], + } + } + + /// Returns the amount of memory taken by this structure and its accumulator + fn size(&self) -> usize { + self.accumulator.size() + size_of_val(self) + self.indices.allocated_size() + } +} + +impl AggGroupAccumulator { + /// Create a new adapter that will create a new [`Accumulator`] + /// for each group, using the specified factory function + pub fn new(f: F) -> Self + where + F: Fn() -> Result> + Send + 'static, + { + Self { + factory: Box::new(f), + states: vec![], + allocation_bytes: 0, + } + } + + /// Ensure that self.accumulators has total_num_groups + fn make_accumulators_if_needed(&mut self, total_num_groups: usize) -> Result<()> { + // can't shrink + assert!(total_num_groups >= self.states.len()); + let vec_size_pre = self.states.allocated_size(); + + // instantiate new accumulators + let new_accumulators = total_num_groups - self.states.len(); + for _ in 0..new_accumulators { + let accumulator = (self.factory)()?; + let state = AccumulatorState::new(accumulator); + self.add_allocation(state.size()); + self.states.push(state); + } + + self.adjust_allocation(vec_size_pre, self.states.allocated_size()); + Ok(()) + } + + /// invokes f(accumulator, values) for each group that has values + /// in group_indices. + /// + /// This function first reorders the input and filter so that + /// values for each group_index are contiguous and then invokes f + /// on the contiguous ranges, to minimize per-row overhead + /// + /// ```text + /// ┌─────────┐ ┌─────────┐ ┌ ─ ─ ─ ─ ┐ ┌─────────┐ ┌ ─ ─ ─ ─ ┐ + /// │ ┌─────┐ │ │ ┌─────┐ │ ┌─────┐ ┏━━━━━┓ │ ┌─────┐ │ ┌─────┐ + /// │ │ 2 │ │ │ │ 200 │ │ │ │ t │ │ ┃ 0 ┃ │ │ 200 │ │ │ │ t │ │ + /// │ ├─────┤ │ │ ├─────┤ │ ├─────┤ ┣━━━━━┫ │ ├─────┤ │ ├─────┤ + /// │ │ 2 │ │ │ │ 100 │ │ │ │ f │ │ ┃ 0 ┃ │ │ 300 │ │ │ │ t │ │ + /// │ ├─────┤ │ │ ├─────┤ │ ├─────┤ ┣━━━━━┫ │ ├─────┤ │ ├─────┤ + /// │ │ 0 │ │ │ │ 200 │ │ │ │ t │ │ ┃ 1 ┃ │ │ 200 │ │ │ │NULL │ │ + /// │ ├─────┤ │ │ ├─────┤ │ ├─────┤ ────────▶ ┣━━━━━┫ │ ├─────┤ │ ├─────┤ + /// │ │ 1 │ │ │ │ 200 │ │ │ │NULL │ │ ┃ 2 ┃ │ │ 200 │ │ │ │ t │ │ + /// │ ├─────┤ │ │ ├─────┤ │ ├─────┤ ┣━━━━━┫ │ ├─────┤ │ ├─────┤ + /// │ │ 0 │ │ │ │ 300 │ │ │ │ t │ │ ┃ 2 ┃ │ │ 100 │ │ │ │ f │ │ + /// │ └─────┘ │ │ └─────┘ │ └─────┘ ┗━━━━━┛ │ └─────┘ │ └─────┘ + /// └─────────┘ └─────────┘ └ ─ ─ ─ ─ ┘ └─────────┘ └ ─ ─ ─ ─ ┘ + /// + /// logical group values opt_filter logical group values opt_filter + /// + /// ``` + fn invoke_per_accumulator( + &mut self, + values: &[ArrayRef], + group_indices: &[usize], + opt_filter: Option<&BooleanArray>, + total_num_groups: usize, + f: F, + ) -> Result<()> + where + F: Fn(&mut dyn Accumulator, &[ArrayRef]) -> Result<()>, + { + self.make_accumulators_if_needed(total_num_groups)?; + + assert_eq!(values[0].len(), group_indices.len()); + + // figure out which input rows correspond to which groups. + // Note that self.state.indices starts empty for all groups + // (it is cleared out below) + for (idx, group_index) in group_indices.iter().enumerate() { + self.states[*group_index].indices.push(idx as u32); + } + + // groups_with_rows holds a list of group indexes that have + // any rows that need to be accumulated, stored in order of + // group_index + + let mut groups_with_rows = vec![]; + + // batch_indices holds indices into values, each group is contiguous + let mut batch_indices = vec![]; + + // offsets[i] is index into batch_indices where the rows for + // group_index i starts + let mut offsets = vec![0]; + + let mut offset_so_far = 0; + for (group_index, state) in self.states.iter_mut().enumerate() { + let indices = &state.indices; + if indices.is_empty() { + continue; + } + + groups_with_rows.push(group_index); + batch_indices.extend_from_slice(indices); + offset_so_far += indices.len(); + offsets.push(offset_so_far); + } + let batch_indices = batch_indices.into(); + + // reorder the values and opt_filter by batch_indices so that + // all values for each group are contiguous, then invoke the + // accumulator once per group with values + let values = take_arrays(values, &batch_indices, None)?; + let opt_filter = get_filter_at_indices(opt_filter, &batch_indices)?; + + // invoke each accumulator with the appropriate rows, first + // pulling the input arguments for this group into their own + // RecordBatch(es) + let iter = groups_with_rows.iter().zip(offsets.windows(2)); + + let mut sizes_pre = 0; + let mut sizes_post = 0; + for (&group_idx, offsets) in iter { + let state = &mut self.states[group_idx]; + sizes_pre += state.size(); + + let values_to_accumulate = slice_and_maybe_filter( + &values, + opt_filter.as_ref().map(|f| f.as_boolean()), + offsets, + )?; + f(state.accumulator.as_mut(), &values_to_accumulate)?; + + // clear out the state so they are empty for next + // iteration + state.indices.clear(); + sizes_post += state.size(); + } + + self.adjust_allocation(sizes_pre, sizes_post); + Ok(()) + } + + /// Increment the allocation by `n` + /// + /// See [`Self::allocation_bytes`] for rationale. + fn add_allocation(&mut self, size: usize) { + self.allocation_bytes += size; + } + + /// Decrease the allocation by `n` + /// + /// See [`Self::allocation_bytes`] for rationale. + fn free_allocation(&mut self, size: usize) { + // use saturating sub to avoid errors if the accumulators + // report erroneous sizes + self.allocation_bytes = self.allocation_bytes.saturating_sub(size) + } + + /// Adjusts the allocation for something that started with + /// start_size and now has new_size avoiding overflow + /// + /// See [`Self::allocation_bytes`] for rationale. + fn adjust_allocation(&mut self, old_size: usize, new_size: usize) { + if new_size > old_size { + self.add_allocation(new_size - old_size) + } else { + self.free_allocation(old_size - new_size) + } + } +} + +impl GroupsAccumulator for AggGroupAccumulator { + fn update_batch( + &mut self, + values: &[ArrayRef], + group_indices: &[usize], + opt_filter: Option<&BooleanArray>, + total_num_groups: usize, + ) -> Result<()> { + self.invoke_per_accumulator( + values, + group_indices, + opt_filter, + total_num_groups, + |accumulator, values_to_accumulate| { + accumulator.update_batch(values_to_accumulate) + }, + )?; + Ok(()) + } + + fn evaluate(&mut self, emit_to: EmitTo) -> Result { + let vec_size_pre = self.states.allocated_size(); + + let states = emit_to.take_needed(&mut self.states); + + let results: Vec = states + .into_iter() + .map(|mut state| { + self.free_allocation(state.size()); + state.accumulator.evaluate() + }) + .collect::>()?; + + let result = ScalarValue::iter_to_array(results); + + self.adjust_allocation(vec_size_pre, self.states.allocated_size()); + + result + } + + // filtered_null_mask(opt_filter, &values); + fn state(&mut self, emit_to: EmitTo) -> Result> { + let vec_size_pre = self.states.allocated_size(); + let states = emit_to.take_needed(&mut self.states); + + // each accumulator produces a potential vector of values + // which we need to form into columns + let mut results: Vec> = vec![]; + + for mut state in states { + self.free_allocation(state.size()); + let accumulator_state = state.accumulator.state()?; + results.resize_with(accumulator_state.len(), Vec::new); + for (idx, state_val) in accumulator_state.into_iter().enumerate() { + results[idx].push(state_val); + } + } + + // create an array for each intermediate column + let arrays = results + .into_iter() + .map(ScalarValue::iter_to_array) + .collect::>>()?; + + // double check each array has the same length (aka the + // accumulator was implemented correctly + if let Some(first_col) = arrays.first() { + for arr in &arrays { + assert_eq!(arr.len(), first_col.len()) + } + } + self.adjust_allocation(vec_size_pre, self.states.allocated_size()); + + Ok(arrays) + } + + fn merge_batch( + &mut self, + values: &[ArrayRef], + group_indices: &[usize], + opt_filter: Option<&BooleanArray>, + total_num_groups: usize, + ) -> Result<()> { + self.invoke_per_accumulator( + values, + group_indices, + opt_filter, + total_num_groups, + |accumulator, values_to_accumulate| { + accumulator.merge_batch(values_to_accumulate)?; + Ok(()) + }, + )?; + Ok(()) + } + + fn size(&self) -> usize { + self.allocation_bytes + } + + fn convert_to_state( + &self, + values: &[ArrayRef], + opt_filter: Option<&BooleanArray>, + ) -> Result> { + let num_rows = values[0].len(); + + // If there are no rows, return empty arrays + if num_rows == 0 { + // create empty accumulator to get the state types + let empty_state = (self.factory)()?.state()?; + let empty_arrays = empty_state + .into_iter() + .map(|state_val| new_empty_array(&state_val.data_type())) + .collect::>(); + + return Ok(empty_arrays); + } + + if false { + let mut results = vec![]; + for row_idx in 0..num_rows { + // Create the empty accumulator for converting + let mut converted_accumulator = (self.factory)()?; + + // Convert row to states + let values_to_accumulate = + slice_and_maybe_filter(values, opt_filter, &[row_idx, row_idx + 1])?; + converted_accumulator.update_batch(&values_to_accumulate)?; + let states = converted_accumulator.state()?; + + // Resize results to have enough columns according to the converted states + results.resize_with(states.len(), || Vec::with_capacity(num_rows)); + + // Add the states to results + for (idx, state_val) in states.into_iter().enumerate() { + results[idx].push(state_val); + } + } + // vec> -> vec + + let arrays = results + .into_iter() + .map(ScalarValue::iter_to_array) + .collect::>>()?; + println!("{:?}", arrays[0]); + if arrays[0].len() != num_rows { + panic!( + "state after calling convert_to_state is not the same with numrows" + ) + } + return Ok(arrays); + } else { + // Each row has its respective group + let mut results = vec![]; + + let mut converted_accumulator = (self.factory)()?; + // Convert row to states + // println!("incoming values {:?}", values); + let values_to_accumulate = + slice_and_maybe_filter(values, opt_filter, &[0, num_rows])?; + converted_accumulator.update_batch(&values_to_accumulate)?; + let states = converted_accumulator.state()?; + + // Resize results to have enough columns according to the converted states + results.resize_with(states.len(), || ScalarValue::Null); + + // Add the states to results + for (idx, state_val) in states.into_iter().enumerate() { + results[idx] = state_val; + } + let arr = results + .into_iter() + .enumerate() + .map(|(index, a)| { + let item_type = inner_datatype_from_list(&a.data_type()); + let dt = a.data_type(); + + // let backend = ScalarValue::iter_to_array(a)?; + let backend = try_unnest(&a).unwrap(); + let offsets = backend.offsets(); + // backend. + // let arr = FixedSizeListArray::new(field, 1, backend, None); + // values.extend_from_slice(&[None, Some("F")]); + + let offsets = + OffsetBuffer::from_lengths(std::iter::repeat_n(1, num_rows)); + // let new_dt = inner_datatype_from_list(dt); + // println!("{new_dt}"); + let field = Arc::new(Field::new_list_field(item_type, true)); + + let arr = GenericListArray::::new( + field, + OffsetBuffer::new(offsets.into()), + backend.values().clone(), + None, + ); + return Ok(Arc::new(arr) as Arc); + }) + .collect::>>()?; + return Ok(arr); + } + // println!("{:?}, num rows {num_rows}",results[0]); + // let a = results[0]; + + // let dt = (&results[0][0]).data_type(); + // let first_state = results.first().unwrap(); + // let field = Arc::new(Field::new_list_field(dt, false)); + // // let valid = NullBuffer::from(vec![true, false, true, false, true, true]); + // let a = ScalarValue::iter_to_array(first_state.iter())?; + + // let arr = FixedSizeListArray::new(field, 1, a, None); + // return Ok(vec![Arc::new(arr)]); + + // Ok(arrays) + } + + fn supports_convert_to_state(&self) -> bool { + true + } +} + +/// Extension trait for [`Vec`] to account for allocations. +pub trait VecAllocExt { + /// Item type. + type T; + /// Return the amount of memory allocated by this Vec (not + /// recursively counting any heap allocations contained within the + /// structure). Does not include the size of `self` + fn allocated_size(&self) -> usize; +} + +impl VecAllocExt for Vec { + type T = T; + fn allocated_size(&self) -> usize { + size_of::() * self.capacity() + } +} + +fn get_filter_at_indices( + opt_filter: Option<&BooleanArray>, + indices: &PrimitiveArray, +) -> Result> { + opt_filter + .map(|filter| { + compute::take( + &filter, indices, None, // None: no index check + ) + }) + .transpose() + .map_err(|e| arrow_datafusion_err!(e)) +} + +// Copied from physical-plan +pub(crate) fn slice_and_maybe_filter( + aggr_array: &[ArrayRef], + filter_opt: Option<&BooleanArray>, + offsets: &[usize], +) -> Result> { + let (offset, length) = (offsets[0], offsets[1] - offsets[0]); + let sliced_arrays: Vec = aggr_array + .iter() + .map(|array| array.slice(offset, length)) + .collect(); + + if let Some(f) = filter_opt { + let filter = f.slice(offset, length); + + sliced_arrays + .iter() + .map(|array| { + compute::filter(&array, &filter).map_err(|e| arrow_datafusion_err!(e)) + }) + .collect() + } else { + Ok(sliced_arrays) + } +} + +fn inner_datatype_from_list(dt: &DataType) -> DataType { + match dt { + DataType::List(f) | DataType::FixedSizeList(f, _) | DataType::LargeList(f) => { + f.data_type().clone() + } + _ => dt.clone(), + } +} + +fn try_unnest(a: &ScalarValue) -> Option> { + match a { + ScalarValue::List(l) => Some(l.clone()), + _ => None, + } +} diff --git a/datafusion/functions-aggregate/src/array_agg.rs b/datafusion/functions-aggregate/src/array_agg.rs index 268349ecf1b6..3bb05a8e1ce6 100644 --- a/datafusion/functions-aggregate/src/array_agg.rs +++ b/datafusion/functions-aggregate/src/array_agg.rs @@ -36,12 +36,14 @@ use datafusion_common::{exec_err, internal_err, Result, ScalarValue}; use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs}; use datafusion_expr::utils::format_state_name; use datafusion_expr::{ - Accumulator, AggregateUDFImpl, Documentation, Signature, Volatility, + Accumulator, AggregateUDFImpl, Documentation, Signature, Sort, Volatility, }; +use datafusion_functions_aggregate_common::aggregate::array_agg::AggGroupAccumulator; use datafusion_functions_aggregate_common::merge_arrays::merge_ordered_arrays; use datafusion_functions_aggregate_common::order::AggregateOrderSensitivity; use datafusion_functions_aggregate_common::utils::ordering_fields; use datafusion_macros::user_doc; +use datafusion_physical_expr::aggregate::AggregateFunctionExpr; use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr}; make_udaf_expr_and_func!( @@ -91,6 +93,69 @@ impl Default for ArrayAgg { } } +fn accumulator_independent( + is_input_pre_ordered: bool, + // acc_args: AccumulatorArgs, + data_type: DataType, + ignore_nulls: bool, + is_distinct: bool, + is_reversed: bool, + sort_options: Option, + lex_ordering: Option, + ordering_dtypes: Vec, +) -> Result> { + // let data_type = acc_args.exprs[0].data_type(acc_args.schema)?; + // let ignore_nulls = + // acc_args.ignore_nulls && acc_args.exprs[0].nullable(acc_args.schema)?; + + if is_distinct { + // Limitation similar to Postgres. The aggregation function can only mix + // DISTINCT and ORDER BY if all the expressions in the ORDER BY appear + // also in the arguments of the function. This implies that if the + // aggregation function only accepts one argument, only one argument + // can be used in the ORDER BY, For example: + // + // ARRAY_AGG(DISTINCT col) + // + // can only be mixed with an ORDER BY if the order expression is "col". + // + // ARRAY_AGG(DISTINCT col ORDER BY col) <- Valid + // ARRAY_AGG(DISTINCT concat(col, '') ORDER BY concat(col, '')) <- Valid + // ARRAY_AGG(DISTINCT col ORDER BY other_col) <- Invalid + // ARRAY_AGG(DISTINCT col ORDER BY concat(col, '')) <- Invalid + // let sort_option = match acc_args.order_bys { + // [single] if single.expr.eq(&acc_args.exprs[0]) => Some(single.options), + // [] => None, + // _ => { + // return exec_err!( + // "In an aggregate with DISTINCT, ORDER BY expressions must appear in argument list" + // ); + // } + // }; + return Ok(Box::new(DistinctArrayAggAccumulator::try_new( + &data_type, + sort_options, + ignore_nulls, + )?)); + } + let Some(ordering) = lex_ordering else { + return Ok(Box::new(ArrayAggAccumulator::try_new( + &data_type, + ignore_nulls, + )?)); + }; + + OrderSensitiveArrayAggAccumulator::try_new( + &data_type, + &ordering_dtypes, + ordering, + is_input_pre_ordered, + is_reversed, + ignore_nulls, + ) + .map(|acc| Box::new(acc) as _) +} + impl AggregateUDFImpl for ArrayAgg { fn as_any(&self) -> &dyn std::any::Any { self @@ -99,6 +164,62 @@ impl AggregateUDFImpl for ArrayAgg { fn name(&self) -> &str { "array_agg" } + fn groups_accumulator_supported(&self, _args: AccumulatorArgs) -> bool { + true + } + fn create_groups_accumulator( + &self, + acc_args: AccumulatorArgs, + ) -> Result> { + let is_distinct = acc_args.is_distinct; + let is_reversed = acc_args.is_reversed; + let data_type = acc_args.exprs[0].data_type(acc_args.schema)?; + let ignore_nulls = + acc_args.ignore_nulls && acc_args.exprs[0].nullable(acc_args.schema)?; + + let sort_options = match acc_args.order_bys { + [single] if single.expr.eq(&acc_args.exprs[0]) => Some(single.options), + [] => None, + _ => { + if acc_args.is_distinct { + return exec_err!( + "In an aggregate with DISTINCT, ORDER BY expressions must appear in argument list" + ); + } + None + } + }; + + let lex_ordering = LexOrdering::new(acc_args.order_bys.to_vec()); + let ordering_dtypes = match lex_ordering { + Some(ref ordering) => { + let ordering_dtypes = ordering + .iter() + .map(|e| e.expr.data_type(acc_args.schema)) + .collect::>>()?; + ordering_dtypes + } + None => vec![], + }; + + let is_input_pre_ordered = self.is_input_pre_ordered; + + let factory = { + move || { + accumulator_independent( + is_input_pre_ordered, + data_type.clone(), + ignore_nulls, + is_distinct, + is_reversed, + sort_options.clone(), + lex_ordering.clone(), + ordering_dtypes.clone(), + ) + } + }; + Ok(Box::new(AggGroupAccumulator::new(factory))) + } fn signature(&self) -> &Signature { &self.signature diff --git a/datafusion/sqllogictest/test_files/temp2.slt b/datafusion/sqllogictest/test_files/temp2.slt new file mode 100644 index 000000000000..523a4c5ec5e8 --- /dev/null +++ b/datafusion/sqllogictest/test_files/temp2.slt @@ -0,0 +1,146 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# make sure to a batch size smaller than row number of the table. +statement ok +set datafusion.execution.batch_size = 2; + +############# +## Subquery Tests +############# + + +############# +## Setup test data table +############# +# there tables for subquery + + +statement ok +CREATE TABLE t1(t1_id INT, t1_name TEXT, t1_int INT) AS VALUES +(11, 'a', 1), +(22, 'a', 1), +(33, 'a', 1), +(33, 'a', 1), +(33, 'a', 1), +(33, 'a', 1), +(44, 'a', 1), +(44, 'a', 1), +(55, 'a', 1), +(44, 'a', 1), +(44, 'a', 1), +(44, 'a', 1), +(55, 'a', 1), +(55, 'a', 1), +(55, 'a', 1), +(55, 'a', 1), +(66, 'a', 1), +(66, 'a', 1), +(66, 'a', 1), +(66, 'a', 1), +(66, 'a', 1), +(33, 'a', 1), +(33, 'a', 1); + +statement ok +CREATE TABLE t2 AS VALUES +(11, 'z', struct(1,'hello',3)), +(22, 'y',NULL), +(11, 'x', struct(1,'hola',6)), +(22, 'w', NULL); + +# Prepare settings to skip partial aggregation from the beginning +statement ok +set datafusion.execution.skip_partial_aggregation_probe_rows_threshold = 0; + +statement ok +set datafusion.execution.skip_partial_aggregation_probe_ratio_threshold = 0.0; + +statement ok +set datafusion.execution.target_partitions = 2; + +statement ok +CREATE EXTERNAL TABLE aggregate_test_100 ( + c1 VARCHAR NOT NULL, + c2 TINYINT NOT NULL, + c3 SMALLINT NOT NULL, + c4 SMALLINT, + c5 INT, + c6 BIGINT NOT NULL, + c7 SMALLINT NOT NULL, + c8 INT NOT NULL, + c9 INT UNSIGNED NOT NULL, + c10 BIGINT UNSIGNED NOT NULL, + c11 FLOAT NOT NULL, + c12 DOUBLE NOT NULL, + c13 VARCHAR NOT NULL +) +STORED AS CSV +LOCATION '../../testing/data/csv/aggregate_test_100.csv' +OPTIONS ('format.has_header' 'true'); + + + + + +statement ok +set datafusion.sql_parser.dialect = 'Postgres'; + +statement ok +set datafusion.execution.skip_partial_aggregation_probe_rows_threshold = 10; + +statement ok +set datafusion.execution.skip_partial_aggregation_probe_ratio_threshold = 0.0; + +statement ok +set datafusion.execution.target_partitions = 2; + +statement ok +set datafusion.execution.batch_size = 2; + +# query IIR +# SELECT c2, approx_median(c5), approx_median(c11) FROM aggregate_test_100 GROUP BY c2 ORDER BY c2; +# ---- +# 1 191655437 0.59926736 +# 2 -587831330 0.43230486 +# 3 240273900 0.40199697 +# 4 762932956 0.48515016 +# 5 593204320 0.5156586 + +query I? +SELECT t1_id, array_agg(t1_id) FROM t1 GROUP BY t1_id ORDER BY t1_id; +---- +1 191655437 +2 -587831330 +3 240273900 +4 762932956 +5 593204320 + + + + + +# query TT +# explain select t1.t1_int from t1 left join ( +# select * from t1 left join ( +# select count(*) cnt ,t1_int from t1 left join t2 +# on t2.t2_int is not distinct from t1_int group by t1_int +# ) temp on t1.t1_int is not distinct from temp.t1_int +# ) a on t1.t1_int=a.t1_int where a.cnt=0 +# --- + + From d1e694c8d764f9a4a3a7af21ea73df593a4f91a2 Mon Sep 17 00:00:00 2001 From: Duong Cong Toai Date: Fri, 3 Oct 2025 14:11:54 +0200 Subject: [PATCH 02/17] feat: try interleave kernel --- .../src/aggregate/array_agg.rs | 620 ++++++------------ .../groups_accumulator/accumulate.rs | 5 +- .../functions-aggregate/src/array_agg.rs | 1 + 3 files changed, 212 insertions(+), 414 deletions(-) diff --git a/datafusion/functions-aggregate-common/src/aggregate/array_agg.rs b/datafusion/functions-aggregate-common/src/aggregate/array_agg.rs index b971a1e8f1f5..ffc6be5bc451 100644 --- a/datafusion/functions-aggregate-common/src/aggregate/array_agg.rs +++ b/datafusion/functions-aggregate-common/src/aggregate/array_agg.rs @@ -18,11 +18,13 @@ //! Utilities for implementing GroupsAccumulator //! Adapter that makes [`GroupsAccumulator`] out of [`Accumulator`] -use std::mem::{size_of, size_of_val}; +use std::iter::repeat; +use std::mem::{size_of, size_of_val, take}; use std::sync::Arc; -use arrow::array::{new_empty_array, Array, GenericListArray, ListArray}; -use arrow::buffer::OffsetBuffer; +use arrow::array::{new_empty_array, Array, GenericListArray, ListArray, StructArray}; +use arrow::buffer::{OffsetBuffer, ScalarBuffer}; +use arrow::compute::kernels::{self, concat}; use arrow::datatypes::{DataType, Field}; use arrow::{ array::{ArrayRef, AsArray, BooleanArray, PrimitiveArray}, @@ -30,72 +32,33 @@ use arrow::{ compute::take_arrays, datatypes::UInt32Type, }; -use datafusion_common::{arrow_datafusion_err, DataFusionError, Result, ScalarValue}; +use datafusion_common::utils::SingleRowListArrayBuilder; +use datafusion_common::{ + arrow_datafusion_err, internal_datafusion_err, DataFusionError, Result, ScalarValue, +}; use datafusion_expr_common::accumulator::Accumulator; use datafusion_expr_common::groups_accumulator::{EmitTo, GroupsAccumulator}; use crate::accumulator::AccumulatorArgs; +use crate::aggregate::groups_accumulator::accumulate::NullState; -/// An adapter that implements [`GroupsAccumulator`] for any [`Accumulator`] -/// -/// While [`Accumulator`] are simpler to implement and can support -/// more general calculations (like retractable window functions), -/// they are not as fast as a specialized `GroupsAccumulator`. This -/// interface bridges the gap so the group by operator only operates -/// in terms of [`Accumulator`]. -/// -/// Internally, this adapter creates a new [`Accumulator`] for each group which -/// stores the state for that group. This both requires an allocation for each -/// Accumulator, internal indices, as well as whatever internal allocations the -/// Accumulator itself requires. -/// -/// For example, a `MinAccumulator` that computes the minimum string value with -/// a [`ScalarValue::Utf8`]. That will require at least two allocations per group -/// (one for the `MinAccumulator` and one for the `ScalarValue::Utf8`). -/// -/// ```text -/// ┌─────────────────────────────────┐ -/// │MinAccumulator { │ -/// ┌─────▶│ min: ScalarValue::Utf8("A") │───────┐ -/// │ │} │ │ -/// │ └─────────────────────────────────┘ └───────▶ "A" -/// ┌─────┐ │ ┌─────────────────────────────────┐ -/// │ 0 │─────┘ │MinAccumulator { │ -/// ├─────┤ ┌─────▶│ min: ScalarValue::Utf8("Z") │───────────────▶ "Z" -/// │ 1 │─────┘ │} │ -/// └─────┘ └─────────────────────────────────┘ ... -/// ... ... -/// ┌─────┐ ┌────────────────────────────────┐ -/// │ N-2 │ │MinAccumulator { │ -/// ├─────┤ │ min: ScalarValue::Utf8("A") │────────────────▶ "A" -/// │ N-1 │─────┐ │} │ -/// └─────┘ │ └────────────────────────────────┘ -/// │ ┌────────────────────────────────┐ ┌───────▶ "Q" -/// │ │MinAccumulator { │ │ -/// └─────▶│ min: ScalarValue::Utf8("Q") │────────┘ -/// │} │ -/// └────────────────────────────────┘ -/// -/// -/// Logical group Current Min/Max value for that group stored -/// number as a ScalarValue which points to an -/// individually allocated String -/// -///``` -/// -/// # Optimizations -/// -/// The adapter minimizes the number of calls to [`Accumulator::update_batch`] -/// by first collecting the input rows for each group into a contiguous array -/// using [`compute::take`] -/// -/// pub struct AggGroupAccumulator { - /// state for each group, stored in group_index order - states: Vec, - - factory: Box Result> + Send>, - + // [1,2,3] [4,5,6] + stacked_batches: Vec, + // address items of each group within the stacked_batches + // this is maintained to perform kernel::interleave + stacked_group_indices: Vec>, + + // similar to the previous two fields, but these for states merging + stacked_states: Vec, + stacked_states_group_indices: Vec>, + // TODO: document me + // for each group index, total accumulated length + stacked_states_group_length: Vec, + // merged_states: Vec< + ns: NullState, + + // factory: Box Result> + Send>, /// Current memory usage, in bytes. /// /// Note this is incrementally updated with deltas to avoid the @@ -105,31 +68,6 @@ pub struct AggGroupAccumulator { allocation_bytes: usize, } -#[derive(Debug)] -struct AccumulatorState { - /// [`Accumulator`] that stores the per-group state - accumulator: Box, - - /// scratch space: indexes in the input array that will be fed to - /// this accumulator. Stores indexes as `u32` to match the arrow - /// `take` kernel input. - indices: Vec, -} - -impl AccumulatorState { - fn new(accumulator: Box) -> Self { - Self { - accumulator, - indices: vec![], - } - } - - /// Returns the amount of memory taken by this structure and its accumulator - fn size(&self) -> usize { - self.accumulator.size() + size_of_val(self) + self.indices.allocated_size() - } -} - impl AggGroupAccumulator { /// Create a new adapter that will create a new [`Accumulator`] /// for each group, using the specified factory function @@ -138,169 +76,99 @@ impl AggGroupAccumulator { F: Fn() -> Result> + Send + 'static, { Self { - factory: Box::new(f), - states: vec![], + stacked_batches: vec![], + stacked_group_indices: vec![], + stacked_states: vec![], + stacked_states_group_indices: vec![], + stacked_states_group_length: vec![], + ns: NullState::new(), allocation_bytes: 0, } } + fn consume_stacked_batches(&mut self) -> Result> { + let stacked = take(&mut self.stacked_batches); + let stack2 = stacked.iter().map(|arr| arr.as_ref()).collect::>(); + + let group_indices = take(&mut self.stacked_group_indices); + let offsets = group_indices.iter().map(|v| v.len()).scan(0, |state, len| { + *state += len; + Some(*state as i32) + }); + + let offsets_buffer = OffsetBuffer::new(ScalarBuffer::from_iter(offsets)); + // group indices like [1,1,1,2,2,2] + // backend_array like [a,b,c,d,e,f] + // offsets should be: [0,3,6] + // then result should be [a,b,c], [d,e,f] + + // backend_array is a flatten list of individual values before aggregation + let backend_array = kernels::interleave::interleave( + &stack2, + group_indices + .into_iter() + .flatten() + .collect::>() + .as_slice(), + )?; + let dt = backend_array.data_type(); + let field = Arc::new(Field::new_list_field(dt.clone(), false)); - /// Ensure that self.accumulators has total_num_groups - fn make_accumulators_if_needed(&mut self, total_num_groups: usize) -> Result<()> { - // can't shrink - assert!(total_num_groups >= self.states.len()); - let vec_size_pre = self.states.allocated_size(); - - // instantiate new accumulators - let new_accumulators = total_num_groups - self.states.len(); - for _ in 0..new_accumulators { - let accumulator = (self.factory)()?; - let state = AccumulatorState::new(accumulator); - self.add_allocation(state.size()); - self.states.push(state); - } - - self.adjust_allocation(vec_size_pre, self.states.allocated_size()); - Ok(()) + let arr = + GenericListArray::::new(field, offsets_buffer, backend_array, None); + return Ok(arr); } - /// invokes f(accumulator, values) for each group that has values - /// in group_indices. - /// - /// This function first reorders the input and filter so that - /// values for each group_index are contiguous and then invokes f - /// on the contiguous ranges, to minimize per-row overhead - /// - /// ```text - /// ┌─────────┐ ┌─────────┐ ┌ ─ ─ ─ ─ ┐ ┌─────────┐ ┌ ─ ─ ─ ─ ┐ - /// │ ┌─────┐ │ │ ┌─────┐ │ ┌─────┐ ┏━━━━━┓ │ ┌─────┐ │ ┌─────┐ - /// │ │ 2 │ │ │ │ 200 │ │ │ │ t │ │ ┃ 0 ┃ │ │ 200 │ │ │ │ t │ │ - /// │ ├─────┤ │ │ ├─────┤ │ ├─────┤ ┣━━━━━┫ │ ├─────┤ │ ├─────┤ - /// │ │ 2 │ │ │ │ 100 │ │ │ │ f │ │ ┃ 0 ┃ │ │ 300 │ │ │ │ t │ │ - /// │ ├─────┤ │ │ ├─────┤ │ ├─────┤ ┣━━━━━┫ │ ├─────┤ │ ├─────┤ - /// │ │ 0 │ │ │ │ 200 │ │ │ │ t │ │ ┃ 1 ┃ │ │ 200 │ │ │ │NULL │ │ - /// │ ├─────┤ │ │ ├─────┤ │ ├─────┤ ────────▶ ┣━━━━━┫ │ ├─────┤ │ ├─────┤ - /// │ │ 1 │ │ │ │ 200 │ │ │ │NULL │ │ ┃ 2 ┃ │ │ 200 │ │ │ │ t │ │ - /// │ ├─────┤ │ │ ├─────┤ │ ├─────┤ ┣━━━━━┫ │ ├─────┤ │ ├─────┤ - /// │ │ 0 │ │ │ │ 300 │ │ │ │ t │ │ ┃ 2 ┃ │ │ 100 │ │ │ │ f │ │ - /// │ └─────┘ │ │ └─────┘ │ └─────┘ ┗━━━━━┛ │ └─────┘ │ └─────┘ - /// └─────────┘ └─────────┘ └ ─ ─ ─ ─ ┘ └─────────┘ └ ─ ─ ─ ─ ┘ - /// - /// logical group values opt_filter logical group values opt_filter - /// - /// ``` - fn invoke_per_accumulator( - &mut self, - values: &[ArrayRef], - group_indices: &[usize], - opt_filter: Option<&BooleanArray>, - total_num_groups: usize, - f: F, - ) -> Result<()> - where - F: Fn(&mut dyn Accumulator, &[ArrayRef]) -> Result<()>, - { - self.make_accumulators_if_needed(total_num_groups)?; - - assert_eq!(values[0].len(), group_indices.len()); - - // figure out which input rows correspond to which groups. - // Note that self.state.indices starts empty for all groups - // (it is cleared out below) - for (idx, group_index) in group_indices.iter().enumerate() { - self.states[*group_index].indices.push(idx as u32); - } - - // groups_with_rows holds a list of group indexes that have - // any rows that need to be accumulated, stored in order of - // group_index - - let mut groups_with_rows = vec![]; - - // batch_indices holds indices into values, each group is contiguous - let mut batch_indices = vec![]; + fn consume_stacked_states(&mut self) -> Result> { + let stacked = take(&mut self.stacked_states); + let stacked2 = stacked.iter().map(|arr| arr.as_ref()).collect::>(); - // offsets[i] is index into batch_indices where the rows for - // group_index i starts - let mut offsets = vec![0]; + let group_indices = take(&mut self.stacked_states_group_indices); + let group_length = take(&mut self.stacked_states_group_length); - let mut offset_so_far = 0; - for (group_index, state) in self.states.iter_mut().enumerate() { - let indices = &state.indices; - if indices.is_empty() { - continue; - } - - groups_with_rows.push(group_index); - batch_indices.extend_from_slice(indices); - offset_so_far += indices.len(); - offsets.push(offset_so_far); - } - let batch_indices = batch_indices.into(); - - // reorder the values and opt_filter by batch_indices so that - // all values for each group are contiguous, then invoke the - // accumulator once per group with values - let values = take_arrays(values, &batch_indices, None)?; - let opt_filter = get_filter_at_indices(opt_filter, &batch_indices)?; - - // invoke each accumulator with the appropriate rows, first - // pulling the input arguments for this group into their own - // RecordBatch(es) - let iter = groups_with_rows.iter().zip(offsets.windows(2)); - - let mut sizes_pre = 0; - let mut sizes_post = 0; - for (&group_idx, offsets) in iter { - let state = &mut self.states[group_idx]; - sizes_pre += state.size(); - - let values_to_accumulate = slice_and_maybe_filter( - &values, - opt_filter.as_ref().map(|f| f.as_boolean()), - offsets, - )?; - f(state.accumulator.as_mut(), &values_to_accumulate)?; - - // clear out the state so they are empty for next - // iteration - state.indices.clear(); - sizes_post += state.size(); - } - - self.adjust_allocation(sizes_pre, sizes_post); - Ok(()) - } - - /// Increment the allocation by `n` - /// - /// See [`Self::allocation_bytes`] for rationale. - fn add_allocation(&mut self, size: usize) { - self.allocation_bytes += size; - } - - /// Decrease the allocation by `n` - /// - /// See [`Self::allocation_bytes`] for rationale. - fn free_allocation(&mut self, size: usize) { - // use saturating sub to avoid errors if the accumulators - // report erroneous sizes - self.allocation_bytes = self.allocation_bytes.saturating_sub(size) - } - - /// Adjusts the allocation for something that started with - /// start_size and now has new_size avoiding overflow - /// - /// See [`Self::allocation_bytes`] for rationale. - fn adjust_allocation(&mut self, old_size: usize, new_size: usize) { - if new_size > old_size { - self.add_allocation(new_size - old_size) - } else { - self.free_allocation(old_size - new_size) - } + let offsets: Vec = group_length + .iter() + .scan(0, |state, len| { + *state += len; + Some(*state) + }) + .collect(); + + let offsets_buffer = OffsetBuffer::new(ScalarBuffer::from_iter(offsets)); + // group indices like [1,1,2,2] + // interleave result like [[a,b],[c,d],[e,f], [g]] + // backend array like [a,b,c,d,e,f,g] + // offsets should be: [0,4,7] + // then result should be [a,b,c,d], [e,f, g] + let list_arr = kernels::interleave::interleave( + &stacked2, + group_indices + .into_iter() + .flatten() + .collect::>() + .as_slice(), + )?; + let backend_array = list_arr.as_list::().values(); + let dt = backend_array.data_type(); + let field = Arc::new(Field::new_list_field(dt.clone(), false)); + + let arr = GenericListArray::::new( + field, + offsets_buffer, + backend_array.clone(), + None, + ); + return Ok(arr); } } impl GroupsAccumulator for AggGroupAccumulator { + // batch1 [1,4,5,6,7] + // batch2 [5,1,1,1,1] + + // indices g1: [(0,0), (1,1), (1,2) ...] + // indices g2: [] + // indices g3: [] + // indices g4: [(0,1)] fn update_batch( &mut self, values: &[ArrayRef], @@ -308,72 +176,58 @@ impl GroupsAccumulator for AggGroupAccumulator { opt_filter: Option<&BooleanArray>, total_num_groups: usize, ) -> Result<()> { - self.invoke_per_accumulator( - values, - group_indices, - opt_filter, - total_num_groups, - |accumulator, values_to_accumulate| { - accumulator.update_batch(values_to_accumulate) - }, - )?; - Ok(()) - } - - fn evaluate(&mut self, emit_to: EmitTo) -> Result { - let vec_size_pre = self.states.allocated_size(); - - let states = emit_to.take_needed(&mut self.states); + if opt_filter.is_some() { + panic!("not implemented"); + } - let results: Vec = states - .into_iter() - .map(|mut state| { - self.free_allocation(state.size()); - state.accumulator.evaluate() - }) - .collect::>()?; + let singular_col = values + .get(0) + .ok_or(internal_datafusion_err!("invalid agg input"))?; + if self.stacked_group_indices.len() < total_num_groups { + self.stacked_group_indices + .resize(total_num_groups, Vec::new()); + } + // null value is handled - let result = ScalarValue::iter_to_array(results); + self.stacked_batches.push(Arc::clone(singular_col)); + let batch_index = self.stacked_batches.len() - 1; + for (array_offset, group_index) in group_indices.iter().enumerate() { + self.stacked_group_indices[*group_index].push((batch_index, array_offset)); + } - self.adjust_allocation(vec_size_pre, self.states.allocated_size()); + Ok(()) + } - result + fn evaluate(&mut self, emit_to: EmitTo) -> Result { + if matches!(emit_to, EmitTo::First(_)) { + return Err(internal_datafusion_err!("unimpl eimit to first")); + } + let arr = self.consume_stacked_batches()?; + return Ok(Arc::new(arr) as ArrayRef); + // only batch stacked no states interleaved + // if !self.stacked_batches.is_empty() + // && self.stacked_states_group_indices.is_empty() + // { + // let arr = self.consume_stacked_batches()?; + // return Ok(Arc::new(arr) as ArrayRef); + // } + // // only stacked states, no stacked batches interleave + // if self.stacked_batches.is_empty() + // && !self.stacked_states_group_indices.is_empty() + // { + // let arr = self.consume_stacked_states()?; + // return Ok(Arc::new(arr) as ArrayRef); + // } + // let stacked = take(&mut self.stacked_batches); + // let stacked_indices = take(&mut self.stacked_group_indices); + + // let stacked_states = take(&mut self.stacked_states); + // let staced_state_indices = take(&mut self.stacked_states_group_indices); } // filtered_null_mask(opt_filter, &values); fn state(&mut self, emit_to: EmitTo) -> Result> { - let vec_size_pre = self.states.allocated_size(); - let states = emit_to.take_needed(&mut self.states); - - // each accumulator produces a potential vector of values - // which we need to form into columns - let mut results: Vec> = vec![]; - - for mut state in states { - self.free_allocation(state.size()); - let accumulator_state = state.accumulator.state()?; - results.resize_with(accumulator_state.len(), Vec::new); - for (idx, state_val) in accumulator_state.into_iter().enumerate() { - results[idx].push(state_val); - } - } - - // create an array for each intermediate column - let arrays = results - .into_iter() - .map(ScalarValue::iter_to_array) - .collect::>>()?; - - // double check each array has the same length (aka the - // accumulator was implemented correctly - if let Some(first_col) = arrays.first() { - for arr in &arrays { - assert_eq!(arr.len(), first_col.len()) - } - } - self.adjust_allocation(vec_size_pre, self.states.allocated_size()); - - Ok(arrays) + Ok(vec![self.evaluate(emit_to)?]) } fn merge_batch( @@ -383,21 +237,49 @@ impl GroupsAccumulator for AggGroupAccumulator { opt_filter: Option<&BooleanArray>, total_num_groups: usize, ) -> Result<()> { - self.invoke_per_accumulator( - values, - group_indices, - opt_filter, + // TODO: all the reference to this function always result into this opt_filter as none + assert!(opt_filter.is_none()); + let singular_col = values + .get(0) + .ok_or(internal_datafusion_err!("invalid agg input"))?; + let list_arr = singular_col.as_list::(); + let backed_arr = list_arr.values(); + let flatten_group_index = group_indices + .iter() + .enumerate() + .map(|(row, group_index)| { + let row_length = list_arr.value_length(row); + repeat(*group_index).take(row_length as usize) + }) + .flatten() + .collect::>(); + return self.update_batch( + &[backed_arr.clone()], + &flatten_group_index, + None, total_num_groups, - |accumulator, values_to_accumulate| { - accumulator.merge_batch(values_to_accumulate)?; - Ok(()) - }, - )?; + ); + // if self.stacked_states.len() < total_num_groups { + // self.stacked_states_group_indices + // .resize(total_num_groups, Vec::new()); + // self.stacked_states_group_length.resize(total_num_groups, 0); + // } + + // let batch_index = self.stacked_states.len(); + // for (array_offset, group_index) in group_indices.iter().enumerate() { + // self.stacked_states_group_indices[*group_index] + // .push((batch_index, array_offset)); + // self.stacked_states_group_length[*group_index] += + // singular_col.as_list::().value_length(array_offset) + // } + + // self.stacked_states.push(Arc::clone(singular_col)); Ok(()) } fn size(&self) -> usize { - self.allocation_bytes + 1000 + // self.allocation_bytes } fn convert_to_state( @@ -405,116 +287,28 @@ impl GroupsAccumulator for AggGroupAccumulator { values: &[ArrayRef], opt_filter: Option<&BooleanArray>, ) -> Result> { - let num_rows = values[0].len(); + assert!(opt_filter.is_none()); + let col_array = values + .get(0) + .ok_or(internal_datafusion_err!("invalid state for array agg"))?; + let num_rows = col_array.len(); // If there are no rows, return empty arrays if num_rows == 0 { - // create empty accumulator to get the state types - let empty_state = (self.factory)()?.state()?; - let empty_arrays = empty_state - .into_iter() - .map(|state_val| new_empty_array(&state_val.data_type())) - .collect::>(); - - return Ok(empty_arrays); - } - - if false { - let mut results = vec![]; - for row_idx in 0..num_rows { - // Create the empty accumulator for converting - let mut converted_accumulator = (self.factory)()?; - - // Convert row to states - let values_to_accumulate = - slice_and_maybe_filter(values, opt_filter, &[row_idx, row_idx + 1])?; - converted_accumulator.update_batch(&values_to_accumulate)?; - let states = converted_accumulator.state()?; - - // Resize results to have enough columns according to the converted states - results.resize_with(states.len(), || Vec::with_capacity(num_rows)); - - // Add the states to results - for (idx, state_val) in states.into_iter().enumerate() { - results[idx].push(state_val); - } - } - // vec> -> vec - - let arrays = results - .into_iter() - .map(ScalarValue::iter_to_array) - .collect::>>()?; - println!("{:?}", arrays[0]); - if arrays[0].len() != num_rows { - panic!( - "state after calling convert_to_state is not the same with numrows" - ) - } - return Ok(arrays); - } else { - // Each row has its respective group - let mut results = vec![]; - - let mut converted_accumulator = (self.factory)()?; - // Convert row to states - // println!("incoming values {:?}", values); - let values_to_accumulate = - slice_and_maybe_filter(values, opt_filter, &[0, num_rows])?; - converted_accumulator.update_batch(&values_to_accumulate)?; - let states = converted_accumulator.state()?; - - // Resize results to have enough columns according to the converted states - results.resize_with(states.len(), || ScalarValue::Null); - - // Add the states to results - for (idx, state_val) in states.into_iter().enumerate() { - results[idx] = state_val; - } - let arr = results - .into_iter() - .enumerate() - .map(|(index, a)| { - let item_type = inner_datatype_from_list(&a.data_type()); - let dt = a.data_type(); - - // let backend = ScalarValue::iter_to_array(a)?; - let backend = try_unnest(&a).unwrap(); - let offsets = backend.offsets(); - // backend. - // let arr = FixedSizeListArray::new(field, 1, backend, None); - // values.extend_from_slice(&[None, Some("F")]); - - let offsets = - OffsetBuffer::from_lengths(std::iter::repeat_n(1, num_rows)); - // let new_dt = inner_datatype_from_list(dt); - // println!("{new_dt}"); - let field = Arc::new(Field::new_list_field(item_type, true)); - - let arr = GenericListArray::::new( - field, - OffsetBuffer::new(offsets.into()), - backend.values().clone(), - None, - ); - return Ok(Arc::new(arr) as Arc); - }) - .collect::>>()?; - return Ok(arr); + return Ok(vec![new_empty_array(col_array.data_type())]); } - // println!("{:?}, num rows {num_rows}",results[0]); - // let a = results[0]; - - // let dt = (&results[0][0]).data_type(); - // let first_state = results.first().unwrap(); - // let field = Arc::new(Field::new_list_field(dt, false)); - // // let valid = NullBuffer::from(vec![true, false, true, false, true, true]); - // let a = ScalarValue::iter_to_array(first_state.iter())?; - - // let arr = FixedSizeListArray::new(field, 1, a, None); - // return Ok(vec![Arc::new(arr)]); - - // Ok(arrays) + let dt = col_array.data_type(); + + let offsets = OffsetBuffer::from_lengths(std::iter::repeat_n(1, num_rows)); + let field = Arc::new(Field::new_list_field(dt.clone(), false)); + + let arr = GenericListArray::::new( + field, + OffsetBuffer::new(offsets.into()), + col_array.clone(), + None, + ); + return Ok(vec![Arc::new(arr) as Arc]); } fn supports_convert_to_state(&self) -> bool { diff --git a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs index 987ba57f7719..ff1ac99b72e6 100644 --- a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs +++ b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs @@ -19,8 +19,9 @@ //! //! [`GroupsAccumulator`]: datafusion_expr_common::groups_accumulator::GroupsAccumulator -use arrow::array::{Array, BooleanArray, BooleanBufferBuilder, PrimitiveArray}; +use arrow::array::{Array, BooleanArray, BooleanBufferBuilder, ListArray, PrimitiveArray, StructArray}; use arrow::buffer::{BooleanBuffer, NullBuffer}; +use arrow::compute::kernels; use arrow::datatypes::ArrowPrimitiveType; use datafusion_expr_common::groups_accumulator::EmitTo; @@ -117,6 +118,7 @@ impl NullState { }); } + /// Invokes `value_fn(group_index, value)` for each non null, non /// filtered value in `values`, while tracking which groups have /// seen null inputs and which groups have seen any inputs, for @@ -371,6 +373,7 @@ pub fn accumulate( } } + /// Accumulates with multiple accumulate(value) columns. (e.g. `corr(c1, c2)`) /// /// This method assumes that for any input record index, if any of the value column diff --git a/datafusion/functions-aggregate/src/array_agg.rs b/datafusion/functions-aggregate/src/array_agg.rs index 3bb05a8e1ce6..1884ff5ec6a3 100644 --- a/datafusion/functions-aggregate/src/array_agg.rs +++ b/datafusion/functions-aggregate/src/array_agg.rs @@ -350,6 +350,7 @@ impl AggregateUDFImpl for ArrayAgg { } } +/// Note that this is order insensitive #[derive(Debug)] pub struct ArrayAggAccumulator { values: Vec, From 7106918aab12b7ee7ccff43d5bd70d2d899efd93 Mon Sep 17 00:00:00 2001 From: Duong Cong Toai Date: Sat, 4 Oct 2025 07:31:10 +0200 Subject: [PATCH 03/17] feat: account mem --- .../src/aggregate/array_agg.rs | 285 ++++-------------- .../functions-aggregate/src/array_agg.rs | 123 +------- 2 files changed, 68 insertions(+), 340 deletions(-) diff --git a/datafusion/functions-aggregate-common/src/aggregate/array_agg.rs b/datafusion/functions-aggregate-common/src/aggregate/array_agg.rs index ffc6be5bc451..d94496006fbb 100644 --- a/datafusion/functions-aggregate-common/src/aggregate/array_agg.rs +++ b/datafusion/functions-aggregate-common/src/aggregate/array_agg.rs @@ -18,30 +18,19 @@ //! Utilities for implementing GroupsAccumulator //! Adapter that makes [`GroupsAccumulator`] out of [`Accumulator`] -use std::iter::repeat; -use std::mem::{size_of, size_of_val, take}; +use std::iter::repeat_n; use std::sync::Arc; -use arrow::array::{new_empty_array, Array, GenericListArray, ListArray, StructArray}; -use arrow::buffer::{OffsetBuffer, ScalarBuffer}; -use arrow::compute::kernels::{self, concat}; -use arrow::datatypes::{DataType, Field}; -use arrow::{ - array::{ArrayRef, AsArray, BooleanArray, PrimitiveArray}, - compute, - compute::take_arrays, - datatypes::UInt32Type, -}; -use datafusion_common::utils::SingleRowListArrayBuilder; -use datafusion_common::{ - arrow_datafusion_err, internal_datafusion_err, DataFusionError, Result, ScalarValue, -}; -use datafusion_expr_common::accumulator::Accumulator; +use arrow::array::{new_empty_array, Array, GenericListArray}; +use arrow::array::{ArrayRef, AsArray, BooleanArray}; +use arrow::buffer::OffsetBuffer; +use arrow::compute::kernels::{self}; +use arrow::datatypes::Field; +use datafusion_common::utils::proxy::VecAllocExt; +use datafusion_common::{internal_datafusion_err, Result}; use datafusion_expr_common::groups_accumulator::{EmitTo, GroupsAccumulator}; -use crate::accumulator::AccumulatorArgs; -use crate::aggregate::groups_accumulator::accumulate::NullState; - +#[derive(Default)] pub struct AggGroupAccumulator { // [1,2,3] [4,5,6] stacked_batches: Vec, @@ -49,53 +38,41 @@ pub struct AggGroupAccumulator { // this is maintained to perform kernel::interleave stacked_group_indices: Vec>, - // similar to the previous two fields, but these for states merging - stacked_states: Vec, - stacked_states_group_indices: Vec>, - // TODO: document me - // for each group index, total accumulated length - stacked_states_group_length: Vec, - // merged_states: Vec< - ns: NullState, - - // factory: Box Result> + Send>, /// Current memory usage, in bytes. - /// - /// Note this is incrementally updated with deltas to avoid the - /// call to size() being a bottleneck. We saw size() being a - /// bottleneck in earlier implementations when there were many - /// distinct groups. allocation_bytes: usize, } impl AggGroupAccumulator { /// Create a new adapter that will create a new [`Accumulator`] /// for each group, using the specified factory function - pub fn new(f: F) -> Self - where - F: Fn() -> Result> + Send + 'static, - { + pub fn new() -> Self { Self { stacked_batches: vec![], stacked_group_indices: vec![], - stacked_states: vec![], - stacked_states_group_indices: vec![], - stacked_states_group_length: vec![], - ns: NullState::new(), allocation_bytes: 0, } } - fn consume_stacked_batches(&mut self) -> Result> { - let stacked = take(&mut self.stacked_batches); - let stack2 = stacked.iter().map(|arr| arr.as_ref()).collect::>(); - - let group_indices = take(&mut self.stacked_group_indices); - let offsets = group_indices.iter().map(|v| v.len()).scan(0, |state, len| { - *state += len; - Some(*state as i32) + fn consume_stacked_batches( + &mut self, + emit_to: EmitTo, + ) -> Result> { + let stacked_batches = self + .stacked_batches + .iter() + .map(|arr| arr.as_ref()) + .collect::>(); + + let group_indices = emit_to.take_needed(&mut self.stacked_group_indices); + let mut reduced_size = 0; + let lengths = group_indices.iter().map(|v| { + reduced_size += v.len() * size_of::() * 2; + v.len() }); - let offsets_buffer = OffsetBuffer::new(ScalarBuffer::from_iter(offsets)); + let offsets_buffer = OffsetBuffer::from_lengths(lengths); + + self.allocation_bytes += reduced_size; + // group indices like [1,1,1,2,2,2] // backend_array like [a,b,c,d,e,f] // offsets should be: [0,3,6] @@ -103,7 +80,7 @@ impl AggGroupAccumulator { // backend_array is a flatten list of individual values before aggregation let backend_array = kernels::interleave::interleave( - &stack2, + &stacked_batches, group_indices .into_iter() .flatten() @@ -111,64 +88,28 @@ impl AggGroupAccumulator { .as_slice(), )?; let dt = backend_array.data_type(); - let field = Arc::new(Field::new_list_field(dt.clone(), false)); + let field = Arc::new(Field::new_list_field(dt.clone(), true)); let arr = GenericListArray::::new(field, offsets_buffer, backend_array, None); - return Ok(arr); - } - - fn consume_stacked_states(&mut self) -> Result> { - let stacked = take(&mut self.stacked_states); - let stacked2 = stacked.iter().map(|arr| arr.as_ref()).collect::>(); - - let group_indices = take(&mut self.stacked_states_group_indices); - let group_length = take(&mut self.stacked_states_group_length); - - let offsets: Vec = group_length - .iter() - .scan(0, |state, len| { - *state += len; - Some(*state) - }) - .collect(); - - let offsets_buffer = OffsetBuffer::new(ScalarBuffer::from_iter(offsets)); - // group indices like [1,1,2,2] - // interleave result like [[a,b],[c,d],[e,f], [g]] - // backend array like [a,b,c,d,e,f,g] - // offsets should be: [0,4,7] - // then result should be [a,b,c,d], [e,f, g] - let list_arr = kernels::interleave::interleave( - &stacked2, - group_indices - .into_iter() - .flatten() - .collect::>() - .as_slice(), - )?; - let backend_array = list_arr.as_list::().values(); - let dt = backend_array.data_type(); - let field = Arc::new(Field::new_list_field(dt.clone(), false)); - - let arr = GenericListArray::::new( - field, - offsets_buffer, - backend_array.clone(), - None, - ); - return Ok(arr); + Ok(arr) } } impl GroupsAccumulator for AggGroupAccumulator { - // batch1 [1,4,5,6,7] - // batch2 [5,1,1,1,1] + // given the stacked_batch as: + // - batch1 [1,4,5,6,7] + // - batch2 [5,1,1,1,1] + // and group_indices as // indices g1: [(0,0), (1,1), (1,2) ...] // indices g2: [] // indices g3: [] // indices g4: [(0,1)] + // each tuple represents (batch_index, and offset within the batch index) + // for example + // - (0,0) means the 0th item inside batch1, which is `1` + // - (1,1) means the 1th item inside batch2, which is `1` fn update_batch( &mut self, values: &[ArrayRef], @@ -181,7 +122,7 @@ impl GroupsAccumulator for AggGroupAccumulator { } let singular_col = values - .get(0) + .first() .ok_or(internal_datafusion_err!("invalid agg input"))?; if self.stacked_group_indices.len() < total_num_groups { self.stacked_group_indices @@ -195,34 +136,14 @@ impl GroupsAccumulator for AggGroupAccumulator { self.stacked_group_indices[*group_index].push((batch_index, array_offset)); } + self.allocation_bytes += size_of::() * 2 * group_indices.len(); + Ok(()) } fn evaluate(&mut self, emit_to: EmitTo) -> Result { - if matches!(emit_to, EmitTo::First(_)) { - return Err(internal_datafusion_err!("unimpl eimit to first")); - } - let arr = self.consume_stacked_batches()?; - return Ok(Arc::new(arr) as ArrayRef); - // only batch stacked no states interleaved - // if !self.stacked_batches.is_empty() - // && self.stacked_states_group_indices.is_empty() - // { - // let arr = self.consume_stacked_batches()?; - // return Ok(Arc::new(arr) as ArrayRef); - // } - // // only stacked states, no stacked batches interleave - // if self.stacked_batches.is_empty() - // && !self.stacked_states_group_indices.is_empty() - // { - // let arr = self.consume_stacked_states()?; - // return Ok(Arc::new(arr) as ArrayRef); - // } - // let stacked = take(&mut self.stacked_batches); - // let stacked_indices = take(&mut self.stacked_group_indices); - - // let stacked_states = take(&mut self.stacked_states); - // let staced_state_indices = take(&mut self.stacked_states_group_indices); + let arr = self.consume_stacked_batches(emit_to)?; + Ok(Arc::new(arr) as ArrayRef) } // filtered_null_mask(opt_filter, &values); @@ -240,45 +161,29 @@ impl GroupsAccumulator for AggGroupAccumulator { // TODO: all the reference to this function always result into this opt_filter as none assert!(opt_filter.is_none()); let singular_col = values - .get(0) + .first() .ok_or(internal_datafusion_err!("invalid agg input"))?; let list_arr = singular_col.as_list::(); let backed_arr = list_arr.values(); let flatten_group_index = group_indices .iter() .enumerate() - .map(|(row, group_index)| { + .flat_map(|(row, group_index)| { let row_length = list_arr.value_length(row); - repeat(*group_index).take(row_length as usize) + repeat_n(*group_index, row_length as usize) }) - .flatten() .collect::>(); - return self.update_batch( - &[backed_arr.clone()], + self.update_batch( + std::slice::from_ref(backed_arr), &flatten_group_index, None, total_num_groups, - ); - // if self.stacked_states.len() < total_num_groups { - // self.stacked_states_group_indices - // .resize(total_num_groups, Vec::new()); - // self.stacked_states_group_length.resize(total_num_groups, 0); - // } - - // let batch_index = self.stacked_states.len(); - // for (array_offset, group_index) in group_indices.iter().enumerate() { - // self.stacked_states_group_indices[*group_index] - // .push((batch_index, array_offset)); - // self.stacked_states_group_length[*group_index] += - // singular_col.as_list::().value_length(array_offset) - // } - - // self.stacked_states.push(Arc::clone(singular_col)); - Ok(()) + ) } fn size(&self) -> usize { - 1000 + // all batched array's underlying memory is borrowed, and thus not counted + self.stacked_batches.allocated_size() + self.allocation_bytes // self.allocation_bytes } @@ -288,8 +193,9 @@ impl GroupsAccumulator for AggGroupAccumulator { opt_filter: Option<&BooleanArray>, ) -> Result> { assert!(opt_filter.is_none()); + assert!(values.len() == 1); let col_array = values - .get(0) + .first() .ok_or(internal_datafusion_err!("invalid state for array agg"))?; let num_rows = col_array.len(); @@ -299,92 +205,19 @@ impl GroupsAccumulator for AggGroupAccumulator { } let dt = col_array.data_type(); - let offsets = OffsetBuffer::from_lengths(std::iter::repeat_n(1, num_rows)); - let field = Arc::new(Field::new_list_field(dt.clone(), false)); + let offsets = OffsetBuffer::from_lengths(repeat_n(1, num_rows)); + let field = Arc::new(Field::new_list_field(dt.clone(), true)); let arr = GenericListArray::::new( field, OffsetBuffer::new(offsets.into()), - col_array.clone(), + Arc::clone(col_array), None, ); - return Ok(vec![Arc::new(arr) as Arc]); + Ok(vec![Arc::new(arr) as Arc]) } fn supports_convert_to_state(&self) -> bool { true } } - -/// Extension trait for [`Vec`] to account for allocations. -pub trait VecAllocExt { - /// Item type. - type T; - /// Return the amount of memory allocated by this Vec (not - /// recursively counting any heap allocations contained within the - /// structure). Does not include the size of `self` - fn allocated_size(&self) -> usize; -} - -impl VecAllocExt for Vec { - type T = T; - fn allocated_size(&self) -> usize { - size_of::() * self.capacity() - } -} - -fn get_filter_at_indices( - opt_filter: Option<&BooleanArray>, - indices: &PrimitiveArray, -) -> Result> { - opt_filter - .map(|filter| { - compute::take( - &filter, indices, None, // None: no index check - ) - }) - .transpose() - .map_err(|e| arrow_datafusion_err!(e)) -} - -// Copied from physical-plan -pub(crate) fn slice_and_maybe_filter( - aggr_array: &[ArrayRef], - filter_opt: Option<&BooleanArray>, - offsets: &[usize], -) -> Result> { - let (offset, length) = (offsets[0], offsets[1] - offsets[0]); - let sliced_arrays: Vec = aggr_array - .iter() - .map(|array| array.slice(offset, length)) - .collect(); - - if let Some(f) = filter_opt { - let filter = f.slice(offset, length); - - sliced_arrays - .iter() - .map(|array| { - compute::filter(&array, &filter).map_err(|e| arrow_datafusion_err!(e)) - }) - .collect() - } else { - Ok(sliced_arrays) - } -} - -fn inner_datatype_from_list(dt: &DataType) -> DataType { - match dt { - DataType::List(f) | DataType::FixedSizeList(f, _) | DataType::LargeList(f) => { - f.data_type().clone() - } - _ => dt.clone(), - } -} - -fn try_unnest(a: &ScalarValue) -> Option> { - match a { - ScalarValue::List(l) => Some(l.clone()), - _ => None, - } -} diff --git a/datafusion/functions-aggregate/src/array_agg.rs b/datafusion/functions-aggregate/src/array_agg.rs index 1884ff5ec6a3..cced1bdd4320 100644 --- a/datafusion/functions-aggregate/src/array_agg.rs +++ b/datafusion/functions-aggregate/src/array_agg.rs @@ -36,14 +36,13 @@ use datafusion_common::{exec_err, internal_err, Result, ScalarValue}; use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs}; use datafusion_expr::utils::format_state_name; use datafusion_expr::{ - Accumulator, AggregateUDFImpl, Documentation, Signature, Sort, Volatility, + Accumulator, AggregateUDFImpl, Documentation, Signature, Volatility, }; use datafusion_functions_aggregate_common::aggregate::array_agg::AggGroupAccumulator; use datafusion_functions_aggregate_common::merge_arrays::merge_ordered_arrays; use datafusion_functions_aggregate_common::order::AggregateOrderSensitivity; use datafusion_functions_aggregate_common::utils::ordering_fields; use datafusion_macros::user_doc; -use datafusion_physical_expr::aggregate::AggregateFunctionExpr; use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr}; make_udaf_expr_and_func!( @@ -93,68 +92,7 @@ impl Default for ArrayAgg { } } -fn accumulator_independent( - is_input_pre_ordered: bool, - // acc_args: AccumulatorArgs, - data_type: DataType, - ignore_nulls: bool, - is_distinct: bool, - is_reversed: bool, - sort_options: Option, - lex_ordering: Option, - ordering_dtypes: Vec, -) -> Result> { - // let data_type = acc_args.exprs[0].data_type(acc_args.schema)?; - // let ignore_nulls = - // acc_args.ignore_nulls && acc_args.exprs[0].nullable(acc_args.schema)?; - - if is_distinct { - // Limitation similar to Postgres. The aggregation function can only mix - // DISTINCT and ORDER BY if all the expressions in the ORDER BY appear - // also in the arguments of the function. This implies that if the - // aggregation function only accepts one argument, only one argument - // can be used in the ORDER BY, For example: - // - // ARRAY_AGG(DISTINCT col) - // - // can only be mixed with an ORDER BY if the order expression is "col". - // - // ARRAY_AGG(DISTINCT col ORDER BY col) <- Valid - // ARRAY_AGG(DISTINCT concat(col, '') ORDER BY concat(col, '')) <- Valid - // ARRAY_AGG(DISTINCT col ORDER BY other_col) <- Invalid - // ARRAY_AGG(DISTINCT col ORDER BY concat(col, '')) <- Invalid - // let sort_option = match acc_args.order_bys { - // [single] if single.expr.eq(&acc_args.exprs[0]) => Some(single.options), - // [] => None, - // _ => { - // return exec_err!( - // "In an aggregate with DISTINCT, ORDER BY expressions must appear in argument list" - // ); - // } - // }; - return Ok(Box::new(DistinctArrayAggAccumulator::try_new( - &data_type, - sort_options, - ignore_nulls, - )?)); - } - let Some(ordering) = lex_ordering else { - return Ok(Box::new(ArrayAggAccumulator::try_new( - &data_type, - ignore_nulls, - )?)); - }; - - OrderSensitiveArrayAggAccumulator::try_new( - &data_type, - &ordering_dtypes, - ordering, - is_input_pre_ordered, - is_reversed, - ignore_nulls, - ) - .map(|acc| Box::new(acc) as _) -} + impl AggregateUDFImpl for ArrayAgg { fn as_any(&self) -> &dyn std::any::Any { @@ -164,61 +102,18 @@ impl AggregateUDFImpl for ArrayAgg { fn name(&self) -> &str { "array_agg" } - fn groups_accumulator_supported(&self, _args: AccumulatorArgs) -> bool { - true + // use groups accumulator only when no order and no distinct required + // because current groups_acc impl produce undeterministic output + fn groups_accumulator_supported(&self, acc_args: AccumulatorArgs) -> bool { + let no_order_no_distinct = + acc_args.order_bys.is_empty() && (!acc_args.is_distinct); + no_order_no_distinct } fn create_groups_accumulator( &self, acc_args: AccumulatorArgs, ) -> Result> { - let is_distinct = acc_args.is_distinct; - let is_reversed = acc_args.is_reversed; - let data_type = acc_args.exprs[0].data_type(acc_args.schema)?; - let ignore_nulls = - acc_args.ignore_nulls && acc_args.exprs[0].nullable(acc_args.schema)?; - - let sort_options = match acc_args.order_bys { - [single] if single.expr.eq(&acc_args.exprs[0]) => Some(single.options), - [] => None, - _ => { - if acc_args.is_distinct { - return exec_err!( - "In an aggregate with DISTINCT, ORDER BY expressions must appear in argument list" - ); - } - None - } - }; - - let lex_ordering = LexOrdering::new(acc_args.order_bys.to_vec()); - let ordering_dtypes = match lex_ordering { - Some(ref ordering) => { - let ordering_dtypes = ordering - .iter() - .map(|e| e.expr.data_type(acc_args.schema)) - .collect::>>()?; - ordering_dtypes - } - None => vec![], - }; - - let is_input_pre_ordered = self.is_input_pre_ordered; - - let factory = { - move || { - accumulator_independent( - is_input_pre_ordered, - data_type.clone(), - ignore_nulls, - is_distinct, - is_reversed, - sort_options.clone(), - lex_ordering.clone(), - ordering_dtypes.clone(), - ) - } - }; - Ok(Box::new(AggGroupAccumulator::new(factory))) + Ok(Box::new(AggGroupAccumulator::new())) } fn signature(&self) -> &Signature { From f209d27c5335749d8c806717ffac56589b5e2b63 Mon Sep 17 00:00:00 2001 From: Duong Cong Toai Date: Sat, 4 Oct 2025 09:02:55 +0200 Subject: [PATCH 04/17] fix: rm temp2 --- datafusion/sqllogictest/test_files/temp2.slt | 146 ------------------- 1 file changed, 146 deletions(-) delete mode 100644 datafusion/sqllogictest/test_files/temp2.slt diff --git a/datafusion/sqllogictest/test_files/temp2.slt b/datafusion/sqllogictest/test_files/temp2.slt deleted file mode 100644 index 523a4c5ec5e8..000000000000 --- a/datafusion/sqllogictest/test_files/temp2.slt +++ /dev/null @@ -1,146 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at - -# http://www.apache.org/licenses/LICENSE-2.0 - -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -# make sure to a batch size smaller than row number of the table. -statement ok -set datafusion.execution.batch_size = 2; - -############# -## Subquery Tests -############# - - -############# -## Setup test data table -############# -# there tables for subquery - - -statement ok -CREATE TABLE t1(t1_id INT, t1_name TEXT, t1_int INT) AS VALUES -(11, 'a', 1), -(22, 'a', 1), -(33, 'a', 1), -(33, 'a', 1), -(33, 'a', 1), -(33, 'a', 1), -(44, 'a', 1), -(44, 'a', 1), -(55, 'a', 1), -(44, 'a', 1), -(44, 'a', 1), -(44, 'a', 1), -(55, 'a', 1), -(55, 'a', 1), -(55, 'a', 1), -(55, 'a', 1), -(66, 'a', 1), -(66, 'a', 1), -(66, 'a', 1), -(66, 'a', 1), -(66, 'a', 1), -(33, 'a', 1), -(33, 'a', 1); - -statement ok -CREATE TABLE t2 AS VALUES -(11, 'z', struct(1,'hello',3)), -(22, 'y',NULL), -(11, 'x', struct(1,'hola',6)), -(22, 'w', NULL); - -# Prepare settings to skip partial aggregation from the beginning -statement ok -set datafusion.execution.skip_partial_aggregation_probe_rows_threshold = 0; - -statement ok -set datafusion.execution.skip_partial_aggregation_probe_ratio_threshold = 0.0; - -statement ok -set datafusion.execution.target_partitions = 2; - -statement ok -CREATE EXTERNAL TABLE aggregate_test_100 ( - c1 VARCHAR NOT NULL, - c2 TINYINT NOT NULL, - c3 SMALLINT NOT NULL, - c4 SMALLINT, - c5 INT, - c6 BIGINT NOT NULL, - c7 SMALLINT NOT NULL, - c8 INT NOT NULL, - c9 INT UNSIGNED NOT NULL, - c10 BIGINT UNSIGNED NOT NULL, - c11 FLOAT NOT NULL, - c12 DOUBLE NOT NULL, - c13 VARCHAR NOT NULL -) -STORED AS CSV -LOCATION '../../testing/data/csv/aggregate_test_100.csv' -OPTIONS ('format.has_header' 'true'); - - - - - -statement ok -set datafusion.sql_parser.dialect = 'Postgres'; - -statement ok -set datafusion.execution.skip_partial_aggregation_probe_rows_threshold = 10; - -statement ok -set datafusion.execution.skip_partial_aggregation_probe_ratio_threshold = 0.0; - -statement ok -set datafusion.execution.target_partitions = 2; - -statement ok -set datafusion.execution.batch_size = 2; - -# query IIR -# SELECT c2, approx_median(c5), approx_median(c11) FROM aggregate_test_100 GROUP BY c2 ORDER BY c2; -# ---- -# 1 191655437 0.59926736 -# 2 -587831330 0.43230486 -# 3 240273900 0.40199697 -# 4 762932956 0.48515016 -# 5 593204320 0.5156586 - -query I? -SELECT t1_id, array_agg(t1_id) FROM t1 GROUP BY t1_id ORDER BY t1_id; ----- -1 191655437 -2 -587831330 -3 240273900 -4 762932956 -5 593204320 - - - - - -# query TT -# explain select t1.t1_int from t1 left join ( -# select * from t1 left join ( -# select count(*) cnt ,t1_int from t1 left join t2 -# on t2.t2_int is not distinct from t1_int group by t1_int -# ) temp on t1.t1_int is not distinct from temp.t1_int -# ) a on t1.t1_int=a.t1_int where a.cnt=0 -# --- - - From d072eefe15076ca9441b23f5d79888e6ce21844e Mon Sep 17 00:00:00 2001 From: Duong Cong Toai Date: Sat, 4 Oct 2025 11:12:25 +0200 Subject: [PATCH 05/17] fix: correct mem accounting --- .../src/aggregate/array_agg.rs | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/datafusion/functions-aggregate-common/src/aggregate/array_agg.rs b/datafusion/functions-aggregate-common/src/aggregate/array_agg.rs index d94496006fbb..f35ef031c3ec 100644 --- a/datafusion/functions-aggregate-common/src/aggregate/array_agg.rs +++ b/datafusion/functions-aggregate-common/src/aggregate/array_agg.rs @@ -15,8 +15,7 @@ // specific language governing permissions and limitations // under the License. -//! Utilities for implementing GroupsAccumulator -//! Adapter that makes [`GroupsAccumulator`] out of [`Accumulator`] +//! Dedicated implementation of `GroupsAccumulator` for `array_agg` use std::iter::repeat_n; use std::sync::Arc; @@ -129,6 +128,7 @@ impl GroupsAccumulator for AggGroupAccumulator { .resize(total_num_groups, Vec::new()); } // null value is handled + self.allocation_bytes += singular_col.get_array_memory_size(); self.stacked_batches.push(Arc::clone(singular_col)); let batch_index = self.stacked_batches.len() - 1; @@ -182,9 +182,14 @@ impl GroupsAccumulator for AggGroupAccumulator { } fn size(&self) -> usize { - // all batched array's underlying memory is borrowed, and thus not counted - self.stacked_batches.allocated_size() + self.allocation_bytes - // self.allocation_bytes + size_of_val(self) + + self.stacked_group_indices.capacity() * size_of::>() + + self + .stacked_group_indices + .iter() + .map(|v| v.capacity() * size_of::()) + .sum::() + + self.stacked_batches.capacity() * size_of::>() } fn convert_to_state( From c25ab7514756a04593b0f09ce1688e312c37f348 Mon Sep 17 00:00:00 2001 From: Duong Cong Toai Date: Sat, 4 Oct 2025 11:33:53 +0200 Subject: [PATCH 06/17] fix: lint --- .../src/aggregate/array_agg.rs | 36 +++++++++---------- .../functions-aggregate/src/array_agg.rs | 11 +++--- 2 files changed, 21 insertions(+), 26 deletions(-) diff --git a/datafusion/functions-aggregate-common/src/aggregate/array_agg.rs b/datafusion/functions-aggregate-common/src/aggregate/array_agg.rs index f35ef031c3ec..eda4dd71e33e 100644 --- a/datafusion/functions-aggregate-common/src/aggregate/array_agg.rs +++ b/datafusion/functions-aggregate-common/src/aggregate/array_agg.rs @@ -23,9 +23,8 @@ use std::sync::Arc; use arrow::array::{new_empty_array, Array, GenericListArray}; use arrow::array::{ArrayRef, AsArray, BooleanArray}; use arrow::buffer::OffsetBuffer; -use arrow::compute::kernels::{self}; +use arrow::compute::kernels; use arrow::datatypes::Field; -use datafusion_common::utils::proxy::VecAllocExt; use datafusion_common::{internal_datafusion_err, Result}; use datafusion_expr_common::groups_accumulator::{EmitTo, GroupsAccumulator}; @@ -36,9 +35,6 @@ pub struct AggGroupAccumulator { // address items of each group within the stacked_batches // this is maintained to perform kernel::interleave stacked_group_indices: Vec>, - - /// Current memory usage, in bytes. - allocation_bytes: usize, } impl AggGroupAccumulator { @@ -48,7 +44,6 @@ impl AggGroupAccumulator { Self { stacked_batches: vec![], stacked_group_indices: vec![], - allocation_bytes: 0, } } fn consume_stacked_batches( @@ -62,16 +57,10 @@ impl AggGroupAccumulator { .collect::>(); let group_indices = emit_to.take_needed(&mut self.stacked_group_indices); - let mut reduced_size = 0; - let lengths = group_indices.iter().map(|v| { - reduced_size += v.len() * size_of::() * 2; - v.len() - }); + let lengths = group_indices.iter().map(|v| v.len()); let offsets_buffer = OffsetBuffer::from_lengths(lengths); - self.allocation_bytes += reduced_size; - // group indices like [1,1,1,2,2,2] // backend_array like [a,b,c,d,e,f] // offsets should be: [0,3,6] @@ -127,16 +116,25 @@ impl GroupsAccumulator for AggGroupAccumulator { self.stacked_group_indices .resize(total_num_groups, Vec::new()); } - // null value is handled - self.allocation_bytes += singular_col.get_array_memory_size(); self.stacked_batches.push(Arc::clone(singular_col)); let batch_index = self.stacked_batches.len() - 1; - for (array_offset, group_index) in group_indices.iter().enumerate() { - self.stacked_group_indices[*group_index].push((batch_index, array_offset)); - } - self.allocation_bytes += size_of::() * 2 * group_indices.len(); + if let Some(filter) = opt_filter { + for (array_offset, (group_index, filter_value)) in + group_indices.iter().zip(filter.iter()).enumerate() + { + if let Some(true) = filter_value { + self.stacked_group_indices[*group_index] + .push((batch_index, array_offset)); + } + } + } else { + for (array_offset, group_index) in group_indices.iter().enumerate() { + self.stacked_group_indices[*group_index] + .push((batch_index, array_offset)); + } + } Ok(()) } diff --git a/datafusion/functions-aggregate/src/array_agg.rs b/datafusion/functions-aggregate/src/array_agg.rs index cced1bdd4320..af0edd25a147 100644 --- a/datafusion/functions-aggregate/src/array_agg.rs +++ b/datafusion/functions-aggregate/src/array_agg.rs @@ -92,8 +92,6 @@ impl Default for ArrayAgg { } } - - impl AggregateUDFImpl for ArrayAgg { fn as_any(&self) -> &dyn std::any::Any { self @@ -103,15 +101,14 @@ impl AggregateUDFImpl for ArrayAgg { "array_agg" } // use groups accumulator only when no order and no distinct required - // because current groups_acc impl produce undeterministic output + // because current groups_acc impl produce indeterministic output fn groups_accumulator_supported(&self, acc_args: AccumulatorArgs) -> bool { - let no_order_no_distinct = - acc_args.order_bys.is_empty() && (!acc_args.is_distinct); - no_order_no_distinct + acc_args.order_bys.is_empty() && (!acc_args.is_distinct) } + fn create_groups_accumulator( &self, - acc_args: AccumulatorArgs, + _acc_args: AccumulatorArgs, ) -> Result> { Ok(Box::new(AggGroupAccumulator::new())) } From ab74e83d34409e7d413418ef820ff8c358e9d7ed Mon Sep 17 00:00:00 2001 From: Duong Cong Toai Date: Sat, 4 Oct 2025 11:49:18 +0200 Subject: [PATCH 07/17] fix: lint --- datafusion/functions-aggregate-common/src/aggregate.rs | 2 +- .../functions-aggregate-common/src/aggregate/array_agg.rs | 2 -- .../src/aggregate/groups_accumulator/accumulate.rs | 5 +---- 3 files changed, 2 insertions(+), 7 deletions(-) diff --git a/datafusion/functions-aggregate-common/src/aggregate.rs b/datafusion/functions-aggregate-common/src/aggregate.rs index 92802a8e5228..09f71232cc37 100644 --- a/datafusion/functions-aggregate-common/src/aggregate.rs +++ b/datafusion/functions-aggregate-common/src/aggregate.rs @@ -15,8 +15,8 @@ // specific language governing permissions and limitations // under the License. +pub mod array_agg; pub mod avg_distinct; pub mod count_distinct; pub mod groups_accumulator; pub mod sum_distinct; -pub mod array_agg; diff --git a/datafusion/functions-aggregate-common/src/aggregate/array_agg.rs b/datafusion/functions-aggregate-common/src/aggregate/array_agg.rs index eda4dd71e33e..45c98531a91a 100644 --- a/datafusion/functions-aggregate-common/src/aggregate/array_agg.rs +++ b/datafusion/functions-aggregate-common/src/aggregate/array_agg.rs @@ -38,8 +38,6 @@ pub struct AggGroupAccumulator { } impl AggGroupAccumulator { - /// Create a new adapter that will create a new [`Accumulator`] - /// for each group, using the specified factory function pub fn new() -> Self { Self { stacked_batches: vec![], diff --git a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs index ff1ac99b72e6..987ba57f7719 100644 --- a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs +++ b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs @@ -19,9 +19,8 @@ //! //! [`GroupsAccumulator`]: datafusion_expr_common::groups_accumulator::GroupsAccumulator -use arrow::array::{Array, BooleanArray, BooleanBufferBuilder, ListArray, PrimitiveArray, StructArray}; +use arrow::array::{Array, BooleanArray, BooleanBufferBuilder, PrimitiveArray}; use arrow::buffer::{BooleanBuffer, NullBuffer}; -use arrow::compute::kernels; use arrow::datatypes::ArrowPrimitiveType; use datafusion_expr_common::groups_accumulator::EmitTo; @@ -118,7 +117,6 @@ impl NullState { }); } - /// Invokes `value_fn(group_index, value)` for each non null, non /// filtered value in `values`, while tracking which groups have /// seen null inputs and which groups have seen any inputs, for @@ -373,7 +371,6 @@ pub fn accumulate( } } - /// Accumulates with multiple accumulate(value) columns. (e.g. `corr(c1, c2)`) /// /// This method assumes that for any input record index, if any of the value column From 34c47f20db6a198c6922da4c772272b0e9606b49 Mon Sep 17 00:00:00 2001 From: Duong Cong Toai Date: Sun, 5 Oct 2025 22:36:12 +0200 Subject: [PATCH 08/17] refactor: avoid vec of vec --- .../src/aggregate/array_agg.rs | 371 +++++++++++++++--- 1 file changed, 319 insertions(+), 52 deletions(-) diff --git a/datafusion/functions-aggregate-common/src/aggregate/array_agg.rs b/datafusion/functions-aggregate-common/src/aggregate/array_agg.rs index 45c98531a91a..e51092a5dea1 100644 --- a/datafusion/functions-aggregate-common/src/aggregate/array_agg.rs +++ b/datafusion/functions-aggregate-common/src/aggregate/array_agg.rs @@ -18,23 +18,30 @@ //! Dedicated implementation of `GroupsAccumulator` for `array_agg` use std::iter::repeat_n; +use std::mem; use std::sync::Arc; use arrow::array::{new_empty_array, Array, GenericListArray}; use arrow::array::{ArrayRef, AsArray, BooleanArray}; use arrow::buffer::OffsetBuffer; use arrow::compute::kernels; -use arrow::datatypes::Field; +use arrow::datatypes::{ArrowNativeType, Field}; use datafusion_common::{internal_datafusion_err, Result}; use datafusion_expr_common::groups_accumulator::{EmitTo, GroupsAccumulator}; -#[derive(Default)] +#[derive(Default, Clone)] pub struct AggGroupAccumulator { // [1,2,3] [4,5,6] stacked_batches: Vec, // address items of each group within the stacked_batches // this is maintained to perform kernel::interleave - stacked_group_indices: Vec>, + stacked_group_indices: Vec<( + /*group_number*/ usize, + /*array_number*/ usize, + /*offset_in_array*/ usize, + )>, + indice_sorted: bool, + max_group: usize, } impl AggGroupAccumulator { @@ -42,22 +49,74 @@ impl AggGroupAccumulator { Self { stacked_batches: vec![], stacked_group_indices: vec![], + indice_sorted: false, + max_group: 0, } } fn consume_stacked_batches( &mut self, emit_to: EmitTo, ) -> Result> { + // in the case of continous calls to function `evaluate` happen, + // (without any interleaving calls to `merge_batch` or `update_batch`) + // the first call will basically sort everything beforehand + // so the second one does not need to + if !self.indice_sorted { + self.indice_sorted = true; + self.stacked_group_indices.sort_by_key(|a| { + // TODO: array_agg with distinct and custom order can be implemented here + a.0 + }); + } + + let mut current_group = self.stacked_group_indices[0].0; + + // this is inclusive, zero-based + let stop_at_group = match emit_to { + EmitTo::All => self.max_group-1, + EmitTo::First(groups_taken) => groups_taken-1, + }; + let mut group_windows = + Vec::::with_capacity(self.max_group.min(stop_at_group) + 1); + group_windows.push(0); + let mut split_offset = None; + + // TODO: init with a good cap if possible via some stats during accumulation phase + let mut interleave_offsets = vec![]; + for (offset, (group_index, array_number, offset_in_array)) in + self.stacked_group_indices.iter().enumerate() + { + if *group_index > stop_at_group { + split_offset = Some(offset); + break; + } + if *group_index > current_group { + current_group = *group_index; + group_windows.push(offset as i32); + } + interleave_offsets.push((*array_number, *offset_in_array)); + } + if let Some(split_offset) = split_offset { + let mut tail_part = self.stacked_group_indices.split_off(split_offset); + mem::swap(&mut self.stacked_group_indices, &mut tail_part); + for item in self.stacked_group_indices.iter_mut() { + // shift down the number of group being taken + item.0 -= (stop_at_group+1) + } + + group_windows.push(split_offset as i32); + } else { + group_windows.push(self.stacked_group_indices.len() as i32); + mem::take(&mut self.stacked_group_indices); + }; + let stacked_batches = self .stacked_batches .iter() - .map(|arr| arr.as_ref()) + .map(|a| a.as_ref()) .collect::>(); - let group_indices = emit_to.take_needed(&mut self.stacked_group_indices); - let lengths = group_indices.iter().map(|v| v.len()); - - let offsets_buffer = OffsetBuffer::from_lengths(lengths); + let offsets_buffer = OffsetBuffer::new(group_windows.into()); // group indices like [1,1,1,2,2,2] // backend_array like [a,b,c,d,e,f] @@ -65,14 +124,8 @@ impl AggGroupAccumulator { // then result should be [a,b,c], [d,e,f] // backend_array is a flatten list of individual values before aggregation - let backend_array = kernels::interleave::interleave( - &stacked_batches, - group_indices - .into_iter() - .flatten() - .collect::>() - .as_slice(), - )?; + let backend_array = + kernels::interleave::interleave(&stacked_batches, &interleave_offsets)?; let dt = backend_array.data_type(); let field = Arc::new(Field::new_list_field(dt.clone(), true)); @@ -103,17 +156,9 @@ impl GroupsAccumulator for AggGroupAccumulator { opt_filter: Option<&BooleanArray>, total_num_groups: usize, ) -> Result<()> { - if opt_filter.is_some() { - panic!("not implemented"); - } - let singular_col = values .first() .ok_or(internal_datafusion_err!("invalid agg input"))?; - if self.stacked_group_indices.len() < total_num_groups { - self.stacked_group_indices - .resize(total_num_groups, Vec::new()); - } self.stacked_batches.push(Arc::clone(singular_col)); let batch_index = self.stacked_batches.len() - 1; @@ -123,17 +168,24 @@ impl GroupsAccumulator for AggGroupAccumulator { group_indices.iter().zip(filter.iter()).enumerate() { if let Some(true) = filter_value { - self.stacked_group_indices[*group_index] - .push((batch_index, array_offset)); + self.stacked_group_indices.push(( + *group_index, + batch_index, + array_offset, + )); } } } else { for (array_offset, group_index) in group_indices.iter().enumerate() { - self.stacked_group_indices[*group_index] - .push((batch_index, array_offset)); + self.stacked_group_indices.push(( + *group_index, + batch_index, + array_offset, + )); } } - + self.indice_sorted = false; + self.max_group = self.max_group.max(total_num_groups); Ok(()) } @@ -151,40 +203,39 @@ impl GroupsAccumulator for AggGroupAccumulator { &mut self, values: &[ArrayRef], group_indices: &[usize], - opt_filter: Option<&BooleanArray>, + // for merge_batch which happens at final stage + // opt_filter will always be none + _opt_filter: Option<&BooleanArray>, total_num_groups: usize, ) -> Result<()> { - // TODO: all the reference to this function always result into this opt_filter as none - assert!(opt_filter.is_none()); let singular_col = values .first() .ok_or(internal_datafusion_err!("invalid agg input"))?; let list_arr = singular_col.as_list::(); + let new_array_number = self.stacked_batches.len(); + // TODO: the backed_arr contains redundant data + // make sure that flatten_group_index has the same length with backed_arr + let flatten_group_index = + group_indices + .iter() + .enumerate() + .flat_map(|(row, group_index)| { + let end = list_arr.value_offsets()[row + 1].as_usize(); + let start = list_arr.value_offsets()[row].as_usize(); + (start..end).map(|offset| (*group_index, new_array_number, offset)) + }); + self.stacked_group_indices.extend(flatten_group_index); + let backed_arr = list_arr.values(); - let flatten_group_index = group_indices - .iter() - .enumerate() - .flat_map(|(row, group_index)| { - let row_length = list_arr.value_length(row); - repeat_n(*group_index, row_length as usize) - }) - .collect::>(); - self.update_batch( - std::slice::from_ref(backed_arr), - &flatten_group_index, - None, - total_num_groups, - ) + self.stacked_batches.push(Arc::clone(backed_arr)); + self.indice_sorted = false; + self.max_group = self.max_group.max(total_num_groups); + Ok(()) } fn size(&self) -> usize { size_of_val(self) - + self.stacked_group_indices.capacity() * size_of::>() - + self - .stacked_group_indices - .iter() - .map(|v| v.capacity() * size_of::()) - .sum::() + + self.stacked_group_indices.capacity() * size_of::<(usize, usize, usize)>() + self.stacked_batches.capacity() * size_of::>() } @@ -222,3 +273,219 @@ impl GroupsAccumulator for AggGroupAccumulator { true } } + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use arrow::{ + array::{Array, AsArray, BooleanArray, GenericListArray, ListArray, StringArray}, + buffer::{NullBuffer, OffsetBuffer}, + datatypes::{DataType, Field}, + }; + use datafusion_common::Result; + use datafusion_expr_common::groups_accumulator::{EmitTo, GroupsAccumulator}; + + use crate::aggregate::array_agg::AggGroupAccumulator; + + fn build_list_arr( + values: Vec>, + offsets: Vec, + nulls: Option>, + ) -> Arc { + let field = Arc::new(Field::new_list_field(DataType::Utf8, true)); + let backed_arr = Arc::new(StringArray::from_iter(values)) as Arc; + let nulls = match nulls { + Some(nulls) => Some(NullBuffer::from_iter(nulls.into_iter())), + None => None, + }; + let arr = GenericListArray::::new( + field, + OffsetBuffer::new(offsets.into()), + backed_arr, + nulls, + ); + return Arc::new(arr) as Arc; + } + + #[test] + fn test_agg_group_accumulator() -> Result<()> { + let mut acc = AggGroupAccumulator::new(); + // backed_arr: ["a","b","c", null, "d"] + // partial_state: ["b","c"],[null], ["d"]] + // group_indices: [2,0,1] + // total num_group = 3 + let partial_state = build_list_arr( + vec![Some("a"), Some("b"), Some("c"), None, Some("d")], + vec![1, 3, 4, 5], + Some(vec![true, true, true]), + ); + let group_indices = vec![2, 0, 1]; + + acc.merge_batch(&[Arc::clone(&partial_state)], &group_indices, None, 3); + assert_eq!( + &vec![ + (2, 0, 1), // b + (2, 0, 2), // c + (0, 0, 3), // null + (1, 0, 4), // d + ], + &acc.stacked_group_indices + ); + let backed_arr = partial_state.as_list::().values(); + assert_eq!(vec![Arc::clone(backed_arr)], acc.stacked_batches); + + // backed_arr: ["a","b","c", null, "d"] + // group_indices: [2,4,0,0,0] + // filter_opt as [true,true,false,true,false] + let opt_filter = Some(BooleanArray::from_iter(vec![ + Some(true), + Some(true), + None, + Some(true), + None, + ])); + let group_indices = vec![2, 4, 0, 0, 0]; + acc.update_batch( + &[Arc::clone(backed_arr)], + &group_indices, + opt_filter.as_ref(), + 5, + ); + + assert_eq!( + &vec![ + // from the prev merge_batch call + (2, 0, 1), // b + (2, 0, 2), // c + (0, 0, 3), // null + (1, 0, 4), // d + // from the update_batch call + (2, 1, 0), // a + (4, 1, 1), // b + (0, 1, 3), // null + ], + &acc.stacked_group_indices + ); + assert_eq!( + vec![Arc::clone(&backed_arr), Arc::clone(&backed_arr),], + acc.stacked_batches + ); + { + let mut acc2 = acc.clone(); + let final_state = acc2.state(EmitTo::All)?; + + let expected_final_state = build_list_arr( + vec![ + // group0 + None, + None, + // group1 + Some("d"), + // group2 + Some("b"), + Some("c"), + Some("a"), + // group3 not exist + // group4 + Some("b"), + ], + vec![0, 2, 3, 6, 7], + None, + ); + + assert_eq!(vec![expected_final_state], final_state); + } + { + let mut acc2 = acc.clone(); + let final_state = acc2.state(EmitTo::First(1))?; + + let expected_final_state = build_list_arr( + vec![ + // group0 + None, None, + ], + vec![0, 2], + None, + ); + + assert_eq!(vec![expected_final_state], final_state); + } + { + let mut acc2 = acc.clone(); + let final_state = acc2.state(EmitTo::First(2))?; + + let expected_final_state = build_list_arr( + vec![ + // group0 + None, + None, + // group1 + Some("d"), + ], + vec![0, 2, 3], + None, + ); + + assert_eq!(vec![expected_final_state], final_state); + } + { + let mut acc2 = acc.clone(); + let final_state = acc2.state(EmitTo::First(3))?; + + let expected_final_state = build_list_arr( + vec![ + // group0 + None, + None, + // group1 + Some("d"), + // group2 + Some("b"), + Some("c"), + Some("a"), + ], + vec![0, 2, 3, 6], + None, + ); + + assert_eq!(vec![expected_final_state], final_state); + + assert_eq!( + &vec![ + (1, 1, 1), // shift downward from (4,1,1) representing b + ], + &acc2.stacked_group_indices + ); + } + { + let mut acc2 = acc.clone(); + let final_state = acc2.state(EmitTo::First(4))?; + + let expected_final_state = build_list_arr( + vec![ + // group0 + None, + None, + // group1 + Some("d"), + // group2 + Some("b"), + Some("c"), + Some("a"), + ], + vec![0, 2, 3, 6], + None, + ); + + assert_eq!(vec![expected_final_state], final_state); + assert_eq!( + &vec![ + (0, 1, 1), // shift downward from (4,1,1) representing b + ], + &acc2.stacked_group_indices + ); + } + Ok(()) + } +} From b9a68717e9abf5da58399ddfedc7c5823b0a310d Mon Sep 17 00:00:00 2001 From: Duong Cong Toai Date: Wed, 8 Oct 2025 20:17:00 +0200 Subject: [PATCH 09/17] fix: reduce record batch size in fuzz test --- ...spilling_fuzz_in_memory_constrained_env.rs | 4 ++-- .../src/aggregate/array_agg.rs | 20 ++++++++++++++++--- 2 files changed, 19 insertions(+), 5 deletions(-) diff --git a/datafusion/core/tests/fuzz_cases/spilling_fuzz_in_memory_constrained_env.rs b/datafusion/core/tests/fuzz_cases/spilling_fuzz_in_memory_constrained_env.rs index 6c1bd316cdd3..9fd75714f15f 100644 --- a/datafusion/core/tests/fuzz_cases/spilling_fuzz_in_memory_constrained_env.rs +++ b/datafusion/core/tests/fuzz_cases/spilling_fuzz_in_memory_constrained_env.rs @@ -468,7 +468,7 @@ async fn test_aggregate_with_high_cardinality_with_limited_memory_and_different_ #[tokio::test] async fn test_aggregate_with_high_cardinality_with_limited_memory_and_large_record_batch( ) -> Result<()> { - let record_batch_size = 8192; + let record_batch_size = 1024; let pool_size = 2 * MB as usize; let task_ctx = { let memory_pool = Arc::new(FairSpillPool::new(pool_size)); @@ -530,7 +530,7 @@ async fn run_test_aggregate_with_high_cardinality( futures::stream::iter((0..args.number_of_record_batches as u64).map( move |index| { let mut record_batch_memory_size = - get_size_of_record_batch_to_generate(index as usize); + get_size_of_record_batch_to_generate(index as usize); // 333 record_batch_memory_size = record_batch_memory_size .saturating_sub(size_of::() * record_batch_size as usize); diff --git a/datafusion/functions-aggregate-common/src/aggregate/array_agg.rs b/datafusion/functions-aggregate-common/src/aggregate/array_agg.rs index e51092a5dea1..5e1e5163d46a 100644 --- a/datafusion/functions-aggregate-common/src/aggregate/array_agg.rs +++ b/datafusion/functions-aggregate-common/src/aggregate/array_agg.rs @@ -40,6 +40,7 @@ pub struct AggGroupAccumulator { /*array_number*/ usize, /*offset_in_array*/ usize, )>, + stacked_batches_size: usize, indice_sorted: bool, max_group: usize, } @@ -48,6 +49,7 @@ impl AggGroupAccumulator { pub fn new() -> Self { Self { stacked_batches: vec![], + stacked_batches_size: 0, stacked_group_indices: vec![], indice_sorted: false, max_group: 0, @@ -73,8 +75,8 @@ impl AggGroupAccumulator { // this is inclusive, zero-based let stop_at_group = match emit_to { - EmitTo::All => self.max_group-1, - EmitTo::First(groups_taken) => groups_taken-1, + EmitTo::All => self.max_group - 1, + EmitTo::First(groups_taken) => groups_taken - 1, }; let mut group_windows = Vec::::with_capacity(self.max_group.min(stop_at_group) + 1); @@ -101,7 +103,7 @@ impl AggGroupAccumulator { mem::swap(&mut self.stacked_group_indices, &mut tail_part); for item in self.stacked_group_indices.iter_mut() { // shift down the number of group being taken - item.0 -= (stop_at_group+1) + item.0 -= stop_at_group + 1 } group_windows.push(split_offset as i32); @@ -131,6 +133,11 @@ impl AggGroupAccumulator { let arr = GenericListArray::::new(field, offsets_buffer, backend_array, None); + // Only when this happen, we know that the stacked_batches are no longer neeeded + if self.stacked_group_indices.len() == 0 { + mem::take(&mut self.stacked_batches); + self.stacked_batches_size = 0; + } Ok(arr) } } @@ -161,6 +168,7 @@ impl GroupsAccumulator for AggGroupAccumulator { .ok_or(internal_datafusion_err!("invalid agg input"))?; self.stacked_batches.push(Arc::clone(singular_col)); + self.stacked_batches_size += singular_col.get_array_memory_size(); let batch_index = self.stacked_batches.len() - 1; if let Some(filter) = opt_filter { @@ -228,15 +236,21 @@ impl GroupsAccumulator for AggGroupAccumulator { let backed_arr = list_arr.values(); self.stacked_batches.push(Arc::clone(backed_arr)); + self.stacked_batches_size += backed_arr.get_array_memory_size(); self.indice_sorted = false; self.max_group = self.max_group.max(total_num_groups); Ok(()) } fn size(&self) -> usize { + let val = size_of_val(self) + + self.stacked_group_indices.capacity() * size_of::<(usize, usize, usize)>() + + self.stacked_batches.capacity() * size_of::>() + + self.stacked_batches_size; size_of_val(self) + self.stacked_group_indices.capacity() * size_of::<(usize, usize, usize)>() + self.stacked_batches.capacity() * size_of::>() + + self.stacked_batches_size } fn convert_to_state( From b2f420af49978cc9f8680235ac344534c8a95a88 Mon Sep 17 00:00:00 2001 From: Duong Cong Toai Date: Wed, 8 Oct 2025 20:53:50 +0200 Subject: [PATCH 10/17] fix: lint --- .../functions-aggregate-common/src/aggregate/array_agg.rs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/datafusion/functions-aggregate-common/src/aggregate/array_agg.rs b/datafusion/functions-aggregate-common/src/aggregate/array_agg.rs index 5e1e5163d46a..b59f4c829924 100644 --- a/datafusion/functions-aggregate-common/src/aggregate/array_agg.rs +++ b/datafusion/functions-aggregate-common/src/aggregate/array_agg.rs @@ -134,7 +134,7 @@ impl AggGroupAccumulator { let arr = GenericListArray::::new(field, offsets_buffer, backend_array, None); // Only when this happen, we know that the stacked_batches are no longer neeeded - if self.stacked_group_indices.len() == 0 { + if self.stacked_group_indices.is_empty() { mem::take(&mut self.stacked_batches); self.stacked_batches_size = 0; } @@ -243,10 +243,6 @@ impl GroupsAccumulator for AggGroupAccumulator { } fn size(&self) -> usize { - let val = size_of_val(self) - + self.stacked_group_indices.capacity() * size_of::<(usize, usize, usize)>() - + self.stacked_batches.capacity() * size_of::>() - + self.stacked_batches_size; size_of_val(self) + self.stacked_group_indices.capacity() * size_of::<(usize, usize, usize)>() + self.stacked_batches.capacity() * size_of::>() From f0353a7e4d1f5f0c8d230eadebac0ded18c6a243 Mon Sep 17 00:00:00 2001 From: Duong Cong Toai Date: Wed, 8 Oct 2025 20:58:09 +0200 Subject: [PATCH 11/17] fix: typo --- .../functions-aggregate-common/src/aggregate/array_agg.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/functions-aggregate-common/src/aggregate/array_agg.rs b/datafusion/functions-aggregate-common/src/aggregate/array_agg.rs index b59f4c829924..ec893f602808 100644 --- a/datafusion/functions-aggregate-common/src/aggregate/array_agg.rs +++ b/datafusion/functions-aggregate-common/src/aggregate/array_agg.rs @@ -59,7 +59,7 @@ impl AggGroupAccumulator { &mut self, emit_to: EmitTo, ) -> Result> { - // in the case of continous calls to function `evaluate` happen, + // in the case of continuous calls to function `evaluate` happen, // (without any interleaving calls to `merge_batch` or `update_batch`) // the first call will basically sort everything beforehand // so the second one does not need to @@ -133,7 +133,7 @@ impl AggGroupAccumulator { let arr = GenericListArray::::new(field, offsets_buffer, backend_array, None); - // Only when this happen, we know that the stacked_batches are no longer neeeded + // Only when this happen, we know that the stacked_batches are no longer needed if self.stacked_group_indices.is_empty() { mem::take(&mut self.stacked_batches); self.stacked_batches_size = 0; From 8a5719761c9f0941a27b5a390f41ddb7fe880e74 Mon Sep 17 00:00:00 2001 From: Duong Cong Toai Date: Mon, 27 Oct 2025 14:19:44 +0100 Subject: [PATCH 12/17] fix: failure if filter option is applied --- .../src/aggregate/array_agg.rs | 268 +++++++++++++----- .../functions-aggregate/src/array_agg.rs | 18 +- .../sqllogictest/test_files/aggregate.slt | 47 +++ 3 files changed, 257 insertions(+), 76 deletions(-) diff --git a/datafusion/functions-aggregate-common/src/aggregate/array_agg.rs b/datafusion/functions-aggregate-common/src/aggregate/array_agg.rs index ec893f602808..828b561aca37 100644 --- a/datafusion/functions-aggregate-common/src/aggregate/array_agg.rs +++ b/datafusion/functions-aggregate-common/src/aggregate/array_agg.rs @@ -18,20 +18,24 @@ //! Dedicated implementation of `GroupsAccumulator` for `array_agg` use std::iter::repeat_n; +use std::marker::PhantomData; use std::mem; use std::sync::Arc; -use arrow::array::{new_empty_array, Array, GenericListArray}; +use arrow::array::{new_empty_array, Array, GenericListArray, OffsetSizeTrait}; use arrow::array::{ArrayRef, AsArray, BooleanArray}; use arrow::buffer::OffsetBuffer; use arrow::compute::kernels; -use arrow::datatypes::{ArrowNativeType, Field}; +use arrow::datatypes::{ArrowNativeType, Field, FieldRef}; use datafusion_common::{internal_datafusion_err, Result}; use datafusion_expr_common::groups_accumulator::{EmitTo, GroupsAccumulator}; -#[derive(Default, Clone)] -pub struct AggGroupAccumulator { - // [1,2,3] [4,5,6] +#[derive(Clone)] +pub struct AggGroupAccumulator { + _virtual: PhantomData, + + inner_field: FieldRef, + // invoke of update_batch and [1,2,3] [4,5,6] stacked_batches: Vec, // address items of each group within the stacked_batches // this is maintained to perform kernel::interleave @@ -42,23 +46,49 @@ pub struct AggGroupAccumulator { )>, stacked_batches_size: usize, indice_sorted: bool, - max_group: usize, + // max group seen so far, 1 based offset + // after the call to `evaluate` this needs to be offseted by the number of + // group consumed + // zero means there is no state accumulated + max_seen_group: usize, + groups_consumed: usize, } -impl AggGroupAccumulator { - pub fn new() -> Self { +impl AggGroupAccumulator { + pub fn new(inner_field: FieldRef) -> Self { Self { + _virtual: PhantomData, + inner_field, stacked_batches: vec![], stacked_batches_size: 0, stacked_group_indices: vec![], indice_sorted: false, - max_group: 0, + max_seen_group: 0, + groups_consumed: 0, } } fn consume_stacked_batches( &mut self, emit_to: EmitTo, - ) -> Result> { + ) -> Result> { + // this is inclusive, zero-based + let stop_at_group = match emit_to { + EmitTo::All => self.max_seen_group - self.groups_consumed - 1, + EmitTo::First(groups_taken) => groups_taken - 1, + }; + // this can still happen, if all the groups have not been consumed + // but internally they are filtered out (by filter option) + // because stacked_group_indices only contains valid entry + if self.stacked_group_indices.is_empty() { + let offsets = OffsetBuffer::from_lengths(vec![0; stop_at_group + 1]); + return Ok(GenericListArray::::new( + self.inner_field.clone(), + offsets, + new_empty_array(self.inner_field.data_type()), + None, + )); + } + // in the case of continuous calls to function `evaluate` happen, // (without any interleaving calls to `merge_batch` or `update_batch`) // the first call will basically sort everything beforehand @@ -71,33 +101,36 @@ impl AggGroupAccumulator { }); } - let mut current_group = self.stacked_group_indices[0].0; + let mut current_group = 0; - // this is inclusive, zero-based - let stop_at_group = match emit_to { - EmitTo::All => self.max_group - 1, - EmitTo::First(groups_taken) => groups_taken - 1, - }; - let mut group_windows = - Vec::::with_capacity(self.max_group.min(stop_at_group) + 1); - group_windows.push(0); + let mut group_windows = Vec::::with_capacity(stop_at_group + 1); + group_windows.push(T::zero()); let mut split_offset = None; + self.groups_consumed += stop_at_group + 1; // TODO: init with a good cap if possible via some stats during accumulation phase let mut interleave_offsets = vec![]; for (offset, (group_index, array_number, offset_in_array)) in self.stacked_group_indices.iter().enumerate() { + // stop consuming from this offset if *group_index > stop_at_group { split_offset = Some(offset); break; } if *group_index > current_group { + // there can be interleaving empty group indices + // i.e if the indices are like [0 1 1 4] + // then the group_windows should be [0 1 3 3 3 4] + group_windows.push(T::usize_as(offset)); + for _ in current_group + 1..*group_index { + group_windows.push(T::usize_as(offset)); + } current_group = *group_index; - group_windows.push(offset as i32); } interleave_offsets.push((*array_number, *offset_in_array)); } + if let Some(split_offset) = split_offset { let mut tail_part = self.stacked_group_indices.split_off(split_offset); mem::swap(&mut self.stacked_group_indices, &mut tail_part); @@ -105,10 +138,35 @@ impl AggGroupAccumulator { // shift down the number of group being taken item.0 -= stop_at_group + 1 } - - group_windows.push(split_offset as i32); + // i.e given this offset arrays + // [1 1 1 2 7] + // and if stop_at_group = 5 the loop will break + // at current_group = 2 + // we have to backfill the group_windows with + // 5 items + // [0 3 4 4 4 4] + for _ in current_group..=stop_at_group { + group_windows.push(T::usize_as(split_offset)); + } } else { - group_windows.push(self.stacked_group_indices.len() as i32); + let end_offset = T::usize_as(self.stacked_group_indices.len()); + for _ in current_group..=stop_at_group { + group_windows.push(end_offset); + } + + if self.stacked_group_indices.len() > 11 { + println!("debug first items {:?}", &self.stacked_group_indices[..10]); + println!("debug first group windows {:?}", &group_windows[..10]); + println!( + "debug last items {:?}", + &self.stacked_group_indices[self.stacked_group_indices.len() - 10..] + ); + println!( + "debug group windows {:?}", + &group_windows[group_windows.len() - 10..] + ); + println!("group windows len {}", group_windows.len()); + } mem::take(&mut self.stacked_group_indices); }; @@ -122,7 +180,7 @@ impl AggGroupAccumulator { // group indices like [1,1,1,2,2,2] // backend_array like [a,b,c,d,e,f] - // offsets should be: [0,3,6] + // offsets is like: [0,3,6] // then result should be [a,b,c], [d,e,f] // backend_array is a flatten list of individual values before aggregation @@ -131,8 +189,7 @@ impl AggGroupAccumulator { let dt = backend_array.data_type(); let field = Arc::new(Field::new_list_field(dt.clone(), true)); - let arr = - GenericListArray::::new(field, offsets_buffer, backend_array, None); + let arr = GenericListArray::::new(field, offsets_buffer, backend_array, None); // Only when this happen, we know that the stacked_batches are no longer needed if self.stacked_group_indices.is_empty() { mem::take(&mut self.stacked_batches); @@ -142,7 +199,7 @@ impl AggGroupAccumulator { } } -impl GroupsAccumulator for AggGroupAccumulator { +impl GroupsAccumulator for AggGroupAccumulator { // given the stacked_batch as: // - batch1 [1,4,5,6,7] // - batch2 [5,1,1,1,1] @@ -170,7 +227,6 @@ impl GroupsAccumulator for AggGroupAccumulator { self.stacked_batches.push(Arc::clone(singular_col)); self.stacked_batches_size += singular_col.get_array_memory_size(); let batch_index = self.stacked_batches.len() - 1; - if let Some(filter) = opt_filter { for (array_offset, (group_index, filter_value)) in group_indices.iter().zip(filter.iter()).enumerate() @@ -193,18 +249,35 @@ impl GroupsAccumulator for AggGroupAccumulator { } } self.indice_sorted = false; - self.max_group = self.max_group.max(total_num_groups); + self.max_seen_group = total_num_groups; Ok(()) } fn evaluate(&mut self, emit_to: EmitTo) -> Result { + let emit_to_str = match emit_to { + EmitTo::All => "all".to_string(), + EmitTo::First(n) => format!("first {n}"), + }; let arr = self.consume_stacked_batches(emit_to)?; + println!("finalize {} {}", arr.len(), emit_to_str); Ok(Arc::new(arr) as ArrayRef) } // filtered_null_mask(opt_filter, &values); fn state(&mut self, emit_to: EmitTo) -> Result> { - Ok(vec![self.evaluate(emit_to)?]) + let emit_to_str = match emit_to { + EmitTo::All => "all".to_string(), + EmitTo::First(n) => format!("first {n}"), + }; + let arr = self.consume_stacked_batches(emit_to)?; + println!( + "evaluate state {} {} {} {}", + arr.len(), + emit_to_str, + self.max_seen_group, + self.groups_consumed + ); + Ok(vec![Arc::new(arr) as ArrayRef]) } fn merge_batch( @@ -229,6 +302,7 @@ impl GroupsAccumulator for AggGroupAccumulator { .enumerate() .flat_map(|(row, group_index)| { let end = list_arr.value_offsets()[row + 1].as_usize(); + assert!(!list_arr.is_null(row)); let start = list_arr.value_offsets()[row].as_usize(); (start..end).map(|offset| (*group_index, new_array_number, offset)) }); @@ -238,7 +312,7 @@ impl GroupsAccumulator for AggGroupAccumulator { self.stacked_batches.push(Arc::clone(backed_arr)); self.stacked_batches_size += backed_arr.get_array_memory_size(); self.indice_sorted = false; - self.max_group = self.max_group.max(total_num_groups); + self.max_seen_group = total_num_groups; Ok(()) } @@ -289,7 +363,7 @@ mod tests { use std::sync::Arc; use arrow::{ - array::{Array, AsArray, BooleanArray, GenericListArray, ListArray, StringArray}, + array::{Array, AsArray, BooleanArray, GenericListArray, StringArray}, buffer::{NullBuffer, OffsetBuffer}, datatypes::{DataType, Field}, }; @@ -305,39 +379,40 @@ mod tests { ) -> Arc { let field = Arc::new(Field::new_list_field(DataType::Utf8, true)); let backed_arr = Arc::new(StringArray::from_iter(values)) as Arc; - let nulls = match nulls { - Some(nulls) => Some(NullBuffer::from_iter(nulls.into_iter())), - None => None, - }; + let arr = GenericListArray::::new( field, OffsetBuffer::new(offsets.into()), backed_arr, - nulls, + nulls.map(NullBuffer::from_iter), ); - return Arc::new(arr) as Arc; + Arc::new(arr) as Arc } #[test] fn test_agg_group_accumulator() -> Result<()> { - let mut acc = AggGroupAccumulator::new(); // backed_arr: ["a","b","c", null, "d"] // partial_state: ["b","c"],[null], ["d"]] - // group_indices: [2,0,1] + // group_indices: [2,1,1] // total num_group = 3 let partial_state = build_list_arr( vec![Some("a"), Some("b"), Some("c"), None, Some("d")], vec![1, 3, 4, 5], Some(vec![true, true, true]), ); - let group_indices = vec![2, 0, 1]; - - acc.merge_batch(&[Arc::clone(&partial_state)], &group_indices, None, 3); + let group_indices = vec![2, 1, 1]; + + let mut acc = AggGroupAccumulator::::new(Arc::new(Field::new( + "item", + DataType::Utf8, + true, + ))); + acc.merge_batch(&[Arc::clone(&partial_state)], &group_indices, None, 3)?; assert_eq!( &vec![ (2, 0, 1), // b (2, 0, 2), // c - (0, 0, 3), // null + (1, 0, 3), // null (1, 0, 4), // d ], &acc.stacked_group_indices @@ -346,8 +421,9 @@ mod tests { assert_eq!(vec![Arc::clone(backed_arr)], acc.stacked_batches); // backed_arr: ["a","b","c", null, "d"] - // group_indices: [2,4,0,0,0] - // filter_opt as [true,true,false,true,false] + // group_indices: [2,4,1,1,1] + // filter_opt as [true,true,false,true,false] meaning group 1 and 3 will result + // into empty array let opt_filter = Some(BooleanArray::from_iter(vec![ Some(true), Some(true), @@ -355,30 +431,32 @@ mod tests { Some(true), None, ])); - let group_indices = vec![2, 4, 0, 0, 0]; + let group_indices = vec![2, 5, 1, 1, 1]; acc.update_batch( &[Arc::clone(backed_arr)], &group_indices, opt_filter.as_ref(), - 5, - ); - + 6, + )?; + assert_eq!(6, acc.max_seen_group); assert_eq!( &vec![ // from the prev merge_batch call (2, 0, 1), // b (2, 0, 2), // c - (0, 0, 3), // null + (1, 0, 3), // null (1, 0, 4), // d // from the update_batch call (2, 1, 0), // a - (4, 1, 1), // b - (0, 1, 3), // null + (5, 1, 1), // b + // (1, 1, 2) c but filtered out + (1, 1, 3), // null + // (1, 1, 4) // d but filterd out ], &acc.stacked_group_indices ); assert_eq!( - vec![Arc::clone(&backed_arr), Arc::clone(&backed_arr),], + vec![Arc::clone(backed_arr), Arc::clone(backed_arr),], acc.stacked_batches ); { @@ -387,24 +465,25 @@ mod tests { let expected_final_state = build_list_arr( vec![ - // group0 - None, - None, + // group0 is empty // group1 + None, Some("d"), + None, // group2 Some("b"), Some("c"), Some("a"), - // group3 not exist - // group4 + // group3,group4 is empty + // group5 Some("b"), ], - vec![0, 2, 3, 6, 7], + vec![0, 0, 3, 6, 6, 6, 7], None, ); assert_eq!(vec![expected_final_state], final_state); + assert_eq!(6, acc2.groups_consumed); } { let mut acc2 = acc.clone(); @@ -413,13 +492,14 @@ mod tests { let expected_final_state = build_list_arr( vec![ // group0 - None, None, ], - vec![0, 2], + vec![0, 0], None, ); assert_eq!(vec![expected_final_state], final_state); + assert_eq!(6, acc2.max_seen_group); + assert_eq!(1, acc2.groups_consumed); } { let mut acc2 = acc.clone(); @@ -427,17 +507,19 @@ mod tests { let expected_final_state = build_list_arr( vec![ - // group0 - None, - None, + // group0 is empty // group1 + None, Some("d"), + None, ], - vec![0, 2, 3], + vec![0, 0, 3], None, ); assert_eq!(vec![expected_final_state], final_state); + assert_eq!(6, acc2.max_seen_group); + assert_eq!(2, acc2.groups_consumed); } { let mut acc2 = acc.clone(); @@ -445,25 +527,28 @@ mod tests { let expected_final_state = build_list_arr( vec![ - // group0 - None, - None, + // group0 is empty // group1 + None, Some("d"), + None, // group2 Some("b"), Some("c"), Some("a"), ], - vec![0, 2, 3, 6], + vec![0, 0, 3, 6], None, ); + + assert_eq!(6, acc2.max_seen_group); + assert_eq!(3, acc2.groups_consumed); assert_eq!(vec![expected_final_state], final_state); assert_eq!( &vec![ - (1, 1, 1), // shift downward from (4,1,1) representing b + (2, 1, 1), // shift downward from (5,1,1) representing b ], &acc2.stacked_group_indices ); @@ -474,24 +559,59 @@ mod tests { let expected_final_state = build_list_arr( vec![ - // group0 + // group0 is empty + // group1 None, + Some("d"), None, + // group2 + Some("b"), + Some("c"), + Some("a"), + // group3 is empty + ], + vec![0, 0, 3, 6, 6], + None, + ); + + assert_eq!(6, acc2.max_seen_group); + assert_eq!(4, acc2.groups_consumed); + assert_eq!(vec![expected_final_state], final_state); + assert_eq!( + &vec![ + (1, 1, 1), // shift downward from (5,1,1) representing b + ], + &acc2.stacked_group_indices + ); + } + { + let mut acc2 = acc.clone(); + let final_state = acc2.state(EmitTo::First(5))?; + + let expected_final_state = build_list_arr( + vec![ + // group0 is empty // group1 + None, Some("d"), + None, // group2 Some("b"), Some("c"), Some("a"), + // group3,group4 is empty ], - vec![0, 2, 3, 6], + vec![0, 0, 3, 6, 6, 6], None, ); + + assert_eq!(6, acc2.max_seen_group); + assert_eq!(5, acc2.groups_consumed); assert_eq!(vec![expected_final_state], final_state); assert_eq!( &vec![ - (0, 1, 1), // shift downward from (4,1,1) representing b + (0, 1, 1), // shift downward from (5,1,1) representing b ], &acc2.stacked_group_indices ); diff --git a/datafusion/functions-aggregate/src/array_agg.rs b/datafusion/functions-aggregate/src/array_agg.rs index af0edd25a147..4e321c6cf466 100644 --- a/datafusion/functions-aggregate/src/array_agg.rs +++ b/datafusion/functions-aggregate/src/array_agg.rs @@ -108,9 +108,23 @@ impl AggregateUDFImpl for ArrayAgg { fn create_groups_accumulator( &self, - _acc_args: AccumulatorArgs, + acc_args: AccumulatorArgs, ) -> Result> { - Ok(Box::new(AggGroupAccumulator::new())) + match acc_args.return_field.data_type() { + DataType::List(field) => { + return Ok(Box::new(AggGroupAccumulator::::new( + field.clone(), + ))); + } + DataType::LargeList(field) => { + return Ok(Box::new(AggGroupAccumulator::::new( + field.clone(), + ))); + } + _ => { + return internal_err!("expects list field"); + } + }; } fn signature(&self) -> &Signature { diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt index 9d6c7b11add6..430743dea56e 100644 --- a/datafusion/sqllogictest/test_files/aggregate.slt +++ b/datafusion/sqllogictest/test_files/aggregate.slt @@ -7595,3 +7595,50 @@ NULL NULL NULL NULL statement ok drop table distinct_avg; + + +statement ok +CREATE TABLE test_array_agg_table AS +SELECT + arrow_cast(r, 'UInt32') AS c1, + arrow_cast( + case when r % 2 == 0 then r/2 + else r + end + , 'UInt64') AS c2 +FROM range(1, 10001) AS t(r); + +# TODO: run explain and ensure partial aggregation is enabled + + + + +query I? +select c2, array_agg(c1) filter (where c1%2 = 0) from test_array_agg_table group by c2 order by c2 asc limit 10; +---- +1 [2] +2 [4] +3 [6] +4 [8] +5 [10] +6 [12] +7 [14] +8 [16] +9 [18] +10 [20] + + + +query I?? +select c2, array_agg(c1),array_agg(c1) filter (where c1%2 = 0) from ( + select * from test_array_agg_table order by c2 asc limit 10 +) group by c2 order by c2; +---- +1 [2, 1] [2] +2 [4] [4] +3 [6, 3] [6] +4 [8] [8] +5 [10, 5] [10] +6 [12] [12] +7 [7] [] + From 5d7a78f1f2910eca0d924f2fa161998ccb3322b9 Mon Sep 17 00:00:00 2001 From: Duong Cong Toai Date: Mon, 27 Oct 2025 14:31:21 +0100 Subject: [PATCH 13/17] fix: lint --- .../src/aggregate/array_agg.rs | 4 +--- datafusion/functions-aggregate/src/array_agg.rs | 14 ++++++-------- 2 files changed, 7 insertions(+), 11 deletions(-) diff --git a/datafusion/functions-aggregate-common/src/aggregate/array_agg.rs b/datafusion/functions-aggregate-common/src/aggregate/array_agg.rs index 828b561aca37..0527c150cba5 100644 --- a/datafusion/functions-aggregate-common/src/aggregate/array_agg.rs +++ b/datafusion/functions-aggregate-common/src/aggregate/array_agg.rs @@ -82,7 +82,7 @@ impl AggGroupAccumulator { if self.stacked_group_indices.is_empty() { let offsets = OffsetBuffer::from_lengths(vec![0; stop_at_group + 1]); return Ok(GenericListArray::::new( - self.inner_field.clone(), + Arc::clone(&self.inner_field), offsets, new_empty_array(self.inner_field.data_type()), None, @@ -541,7 +541,6 @@ mod tests { None, ); - assert_eq!(6, acc2.max_seen_group); assert_eq!(3, acc2.groups_consumed); assert_eq!(vec![expected_final_state], final_state); @@ -605,7 +604,6 @@ mod tests { None, ); - assert_eq!(6, acc2.max_seen_group); assert_eq!(5, acc2.groups_consumed); assert_eq!(vec![expected_final_state], final_state); diff --git a/datafusion/functions-aggregate/src/array_agg.rs b/datafusion/functions-aggregate/src/array_agg.rs index 4e321c6cf466..ca302b61aad2 100644 --- a/datafusion/functions-aggregate/src/array_agg.rs +++ b/datafusion/functions-aggregate/src/array_agg.rs @@ -112,19 +112,17 @@ impl AggregateUDFImpl for ArrayAgg { ) -> Result> { match acc_args.return_field.data_type() { DataType::List(field) => { - return Ok(Box::new(AggGroupAccumulator::::new( - field.clone(), - ))); + Ok(Box::new(AggGroupAccumulator::::new(Arc::clone(field))) + as Box) } DataType::LargeList(field) => { - return Ok(Box::new(AggGroupAccumulator::::new( - field.clone(), - ))); + Ok(Box::new(AggGroupAccumulator::::new(Arc::clone(field))) + as Box) } _ => { - return internal_err!("expects list field"); + internal_err!("expects list field") } - }; + } } fn signature(&self) -> &Signature { From 982e51b50c6c5ff672b826837e0eb74bf837142b Mon Sep 17 00:00:00 2001 From: Duong Cong Toai Date: Mon, 27 Oct 2025 14:44:39 +0100 Subject: [PATCH 14/17] test: add explain test --- .../sqllogictest/test_files/aggregate.slt | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt index 430743dea56e..4be79fd4c85e 100644 --- a/datafusion/sqllogictest/test_files/aggregate.slt +++ b/datafusion/sqllogictest/test_files/aggregate.slt @@ -7608,7 +7608,22 @@ SELECT , 'UInt64') AS c2 FROM range(1, 10001) AS t(r); -# TODO: run explain and ensure partial aggregation is enabled +# This test is to ensure that partial aggregation is enabled +# for the query +query TT +explain select c2, array_agg(c1) filter (where c1%2 = 0) from test_array_agg_table group by c2 order by c2 asc limit 10; +logical_plan +01)Sort: test_array_agg_table.c2 ASC NULLS LAST, fetch=10 +02)--Aggregate: groupBy=[[test_array_agg_table.c2]], aggr=[[array_agg(test_array_agg_table.c1) FILTER (WHERE CAST(test_array_agg_table.c1 AS Int64) % Int64(2) = Int64(0)) AS array_agg(test_array_agg_table.c1) FILTER (WHERE test_array_agg_table.c1 % Int64(2) = Int64(0))]] +03)----TableScan: test_array_agg_table projection=[c1, c2] +physical_plan +01)SortPreservingMergeExec: [c2@0 ASC NULLS LAST], fetch=10 +02)--SortExec: TopK(fetch=10), expr=[c2@0 ASC NULLS LAST], preserve_partitioning=[true] +03)----AggregateExec: mode=FinalPartitioned, gby=[c2@0 as c2], aggr=[array_agg(test_array_agg_table.c1) FILTER (WHERE test_array_agg_table.c1 % Int64(2) = Int64(0))] +04)------CoalesceBatchesExec: target_batch_size=8192 +05)--------RepartitionExec: partitioning=Hash([c2@0], 4), input_partitions=4 +06)----------AggregateExec: mode=Partial, gby=[c2@1 as c2], aggr=[array_agg(test_array_agg_table.c1) FILTER (WHERE test_array_agg_table.c1 % Int64(2) = Int64(0))] +07)------------DataSourceExec: partitions=4, partition_sizes=[1, 1, 0, 0] From 35726a4e33014db2add00eb256ce93b6bad97ef7 Mon Sep 17 00:00:00 2001 From: Duong Cong Toai Date: Tue, 28 Oct 2025 13:16:58 +0100 Subject: [PATCH 15/17] chore: rm debug --- .../src/aggregate/array_agg.rs | 29 ------------------- 1 file changed, 29 deletions(-) diff --git a/datafusion/functions-aggregate-common/src/aggregate/array_agg.rs b/datafusion/functions-aggregate-common/src/aggregate/array_agg.rs index 0527c150cba5..039455b76ce9 100644 --- a/datafusion/functions-aggregate-common/src/aggregate/array_agg.rs +++ b/datafusion/functions-aggregate-common/src/aggregate/array_agg.rs @@ -154,19 +154,6 @@ impl AggGroupAccumulator { group_windows.push(end_offset); } - if self.stacked_group_indices.len() > 11 { - println!("debug first items {:?}", &self.stacked_group_indices[..10]); - println!("debug first group windows {:?}", &group_windows[..10]); - println!( - "debug last items {:?}", - &self.stacked_group_indices[self.stacked_group_indices.len() - 10..] - ); - println!( - "debug group windows {:?}", - &group_windows[group_windows.len() - 10..] - ); - println!("group windows len {}", group_windows.len()); - } mem::take(&mut self.stacked_group_indices); }; @@ -254,29 +241,13 @@ impl GroupsAccumulator for AggGroupAccumulator { } fn evaluate(&mut self, emit_to: EmitTo) -> Result { - let emit_to_str = match emit_to { - EmitTo::All => "all".to_string(), - EmitTo::First(n) => format!("first {n}"), - }; let arr = self.consume_stacked_batches(emit_to)?; - println!("finalize {} {}", arr.len(), emit_to_str); Ok(Arc::new(arr) as ArrayRef) } // filtered_null_mask(opt_filter, &values); fn state(&mut self, emit_to: EmitTo) -> Result> { - let emit_to_str = match emit_to { - EmitTo::All => "all".to_string(), - EmitTo::First(n) => format!("first {n}"), - }; let arr = self.consume_stacked_batches(emit_to)?; - println!( - "evaluate state {} {} {} {}", - arr.len(), - emit_to_str, - self.max_seen_group, - self.groups_consumed - ); Ok(vec![Arc::new(arr) as ArrayRef]) } From 79607638514aa20e1b484e9fa62adc17bc0d64d1 Mon Sep 17 00:00:00 2001 From: Duong Cong Toai Date: Tue, 28 Oct 2025 14:55:59 +0100 Subject: [PATCH 16/17] fix: correctly reset length --- .../src/aggregate/array_agg.rs | 34 ++++++++----------- 1 file changed, 14 insertions(+), 20 deletions(-) diff --git a/datafusion/functions-aggregate-common/src/aggregate/array_agg.rs b/datafusion/functions-aggregate-common/src/aggregate/array_agg.rs index 039455b76ce9..6f19108528c0 100644 --- a/datafusion/functions-aggregate-common/src/aggregate/array_agg.rs +++ b/datafusion/functions-aggregate-common/src/aggregate/array_agg.rs @@ -30,6 +30,7 @@ use arrow::datatypes::{ArrowNativeType, Field, FieldRef}; use datafusion_common::{internal_datafusion_err, Result}; use datafusion_expr_common::groups_accumulator::{EmitTo, GroupsAccumulator}; + #[derive(Clone)] pub struct AggGroupAccumulator { _virtual: PhantomData, @@ -50,8 +51,7 @@ pub struct AggGroupAccumulator { // after the call to `evaluate` this needs to be offseted by the number of // group consumed // zero means there is no state accumulated - max_seen_group: usize, - groups_consumed: usize, + total_groups_in_state: usize, } impl AggGroupAccumulator { @@ -63,8 +63,7 @@ impl AggGroupAccumulator { stacked_batches_size: 0, stacked_group_indices: vec![], indice_sorted: false, - max_seen_group: 0, - groups_consumed: 0, + total_groups_in_state: 0, } } fn consume_stacked_batches( @@ -73,7 +72,7 @@ impl AggGroupAccumulator { ) -> Result> { // this is inclusive, zero-based let stop_at_group = match emit_to { - EmitTo::All => self.max_seen_group - self.groups_consumed - 1, + EmitTo::All => self.total_groups_in_state - 1, EmitTo::First(groups_taken) => groups_taken - 1, }; // this can still happen, if all the groups have not been consumed @@ -106,7 +105,7 @@ impl AggGroupAccumulator { let mut group_windows = Vec::::with_capacity(stop_at_group + 1); group_windows.push(T::zero()); let mut split_offset = None; - self.groups_consumed += stop_at_group + 1; + self.total_groups_in_state -= stop_at_group + 1; // TODO: init with a good cap if possible via some stats during accumulation phase let mut interleave_offsets = vec![]; @@ -236,7 +235,7 @@ impl GroupsAccumulator for AggGroupAccumulator { } } self.indice_sorted = false; - self.max_seen_group = total_num_groups; + self.total_groups_in_state = total_num_groups; Ok(()) } @@ -283,7 +282,7 @@ impl GroupsAccumulator for AggGroupAccumulator { self.stacked_batches.push(Arc::clone(backed_arr)); self.stacked_batches_size += backed_arr.get_array_memory_size(); self.indice_sorted = false; - self.max_seen_group = total_num_groups; + self.total_groups_in_state = total_num_groups; Ok(()) } @@ -409,7 +408,7 @@ mod tests { opt_filter.as_ref(), 6, )?; - assert_eq!(6, acc.max_seen_group); + assert_eq!(6, acc.total_groups_in_state); assert_eq!( &vec![ // from the prev merge_batch call @@ -454,7 +453,7 @@ mod tests { ); assert_eq!(vec![expected_final_state], final_state); - assert_eq!(6, acc2.groups_consumed); + assert_eq!(0, acc2.total_groups_in_state); } { let mut acc2 = acc.clone(); @@ -469,8 +468,7 @@ mod tests { ); assert_eq!(vec![expected_final_state], final_state); - assert_eq!(6, acc2.max_seen_group); - assert_eq!(1, acc2.groups_consumed); + assert_eq!(5, acc2.total_groups_in_state); } { let mut acc2 = acc.clone(); @@ -489,8 +487,7 @@ mod tests { ); assert_eq!(vec![expected_final_state], final_state); - assert_eq!(6, acc2.max_seen_group); - assert_eq!(2, acc2.groups_consumed); + assert_eq!(4, acc2.total_groups_in_state); } { let mut acc2 = acc.clone(); @@ -512,8 +509,7 @@ mod tests { None, ); - assert_eq!(6, acc2.max_seen_group); - assert_eq!(3, acc2.groups_consumed); + assert_eq!(3, acc2.total_groups_in_state); assert_eq!(vec![expected_final_state], final_state); assert_eq!( @@ -544,8 +540,7 @@ mod tests { None, ); - assert_eq!(6, acc2.max_seen_group); - assert_eq!(4, acc2.groups_consumed); + assert_eq!(2, acc2.total_groups_in_state); assert_eq!(vec![expected_final_state], final_state); assert_eq!( &vec![ @@ -575,8 +570,7 @@ mod tests { None, ); - assert_eq!(6, acc2.max_seen_group); - assert_eq!(5, acc2.groups_consumed); + assert_eq!(1, acc2.total_groups_in_state); assert_eq!(vec![expected_final_state], final_state); assert_eq!( &vec![ From b837fc38451dc19338b193c0371a8edc714ffe95 Mon Sep 17 00:00:00 2001 From: Duong Cong Toai Date: Tue, 28 Oct 2025 19:10:28 +0100 Subject: [PATCH 17/17] fix: sqllogictest --- .../functions-aggregate-common/src/aggregate/array_agg.rs | 5 ++--- datafusion/sqllogictest/test_files/aggregate.slt | 1 + 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/datafusion/functions-aggregate-common/src/aggregate/array_agg.rs b/datafusion/functions-aggregate-common/src/aggregate/array_agg.rs index 6f19108528c0..e8e49cdfa082 100644 --- a/datafusion/functions-aggregate-common/src/aggregate/array_agg.rs +++ b/datafusion/functions-aggregate-common/src/aggregate/array_agg.rs @@ -30,7 +30,6 @@ use arrow::datatypes::{ArrowNativeType, Field, FieldRef}; use datafusion_common::{internal_datafusion_err, Result}; use datafusion_expr_common::groups_accumulator::{EmitTo, GroupsAccumulator}; - #[derive(Clone)] pub struct AggGroupAccumulator { _virtual: PhantomData, @@ -48,7 +47,7 @@ pub struct AggGroupAccumulator { stacked_batches_size: usize, indice_sorted: bool, // max group seen so far, 1 based offset - // after the call to `evaluate` this needs to be offseted by the number of + // after the call to `evaluate` this needs to be offsetted by the number of // group consumed // zero means there is no state accumulated total_groups_in_state: usize, @@ -421,7 +420,7 @@ mod tests { (5, 1, 1), // b // (1, 1, 2) c but filtered out (1, 1, 3), // null - // (1, 1, 4) // d but filterd out + // (1, 1, 4) d but filtered out ], &acc.stacked_group_indices ); diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt index 4be79fd4c85e..558ada49fe70 100644 --- a/datafusion/sqllogictest/test_files/aggregate.slt +++ b/datafusion/sqllogictest/test_files/aggregate.slt @@ -7612,6 +7612,7 @@ FROM range(1, 10001) AS t(r); # for the query query TT explain select c2, array_agg(c1) filter (where c1%2 = 0) from test_array_agg_table group by c2 order by c2 asc limit 10; +---- logical_plan 01)Sort: test_array_agg_table.c2 ASC NULLS LAST, fetch=10 02)--Aggregate: groupBy=[[test_array_agg_table.c2]], aggr=[[array_agg(test_array_agg_table.c1) FILTER (WHERE CAST(test_array_agg_table.c1 AS Int64) % Int64(2) = Int64(0)) AS array_agg(test_array_agg_table.c1) FILTER (WHERE test_array_agg_table.c1 % Int64(2) = Int64(0))]]