diff --git a/datafusion/core/tests/fuzz_cases/spilling_fuzz_in_memory_constrained_env.rs b/datafusion/core/tests/fuzz_cases/spilling_fuzz_in_memory_constrained_env.rs index 6c1bd316cdd3..9fd75714f15f 100644 --- a/datafusion/core/tests/fuzz_cases/spilling_fuzz_in_memory_constrained_env.rs +++ b/datafusion/core/tests/fuzz_cases/spilling_fuzz_in_memory_constrained_env.rs @@ -468,7 +468,7 @@ async fn test_aggregate_with_high_cardinality_with_limited_memory_and_different_ #[tokio::test] async fn test_aggregate_with_high_cardinality_with_limited_memory_and_large_record_batch( ) -> Result<()> { - let record_batch_size = 8192; + let record_batch_size = 1024; let pool_size = 2 * MB as usize; let task_ctx = { let memory_pool = Arc::new(FairSpillPool::new(pool_size)); @@ -530,7 +530,7 @@ async fn run_test_aggregate_with_high_cardinality( futures::stream::iter((0..args.number_of_record_batches as u64).map( move |index| { let mut record_batch_memory_size = - get_size_of_record_batch_to_generate(index as usize); + get_size_of_record_batch_to_generate(index as usize); // 333 record_batch_memory_size = record_batch_memory_size .saturating_sub(size_of::() * record_batch_size as usize); diff --git a/datafusion/functions-aggregate-common/src/aggregate.rs b/datafusion/functions-aggregate-common/src/aggregate.rs index aadce907e7cc..09f71232cc37 100644 --- a/datafusion/functions-aggregate-common/src/aggregate.rs +++ b/datafusion/functions-aggregate-common/src/aggregate.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +pub mod array_agg; pub mod avg_distinct; pub mod count_distinct; pub mod groups_accumulator; diff --git a/datafusion/functions-aggregate-common/src/aggregate/array_agg.rs b/datafusion/functions-aggregate-common/src/aggregate/array_agg.rs new file mode 100644 index 000000000000..e8e49cdfa082 --- /dev/null +++ b/datafusion/functions-aggregate-common/src/aggregate/array_agg.rs @@ -0,0 +1,583 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Dedicated implementation of `GroupsAccumulator` for `array_agg` + +use std::iter::repeat_n; +use std::marker::PhantomData; +use std::mem; +use std::sync::Arc; + +use arrow::array::{new_empty_array, Array, GenericListArray, OffsetSizeTrait}; +use arrow::array::{ArrayRef, AsArray, BooleanArray}; +use arrow::buffer::OffsetBuffer; +use arrow::compute::kernels; +use arrow::datatypes::{ArrowNativeType, Field, FieldRef}; +use datafusion_common::{internal_datafusion_err, Result}; +use datafusion_expr_common::groups_accumulator::{EmitTo, GroupsAccumulator}; + +#[derive(Clone)] +pub struct AggGroupAccumulator { + _virtual: PhantomData, + + inner_field: FieldRef, + // invoke of update_batch and [1,2,3] [4,5,6] + stacked_batches: Vec, + // address items of each group within the stacked_batches + // this is maintained to perform kernel::interleave + stacked_group_indices: Vec<( + /*group_number*/ usize, + /*array_number*/ usize, + /*offset_in_array*/ usize, + )>, + stacked_batches_size: usize, + indice_sorted: bool, + // max group seen so far, 1 based offset + // after the call to `evaluate` this needs to be offsetted by the number of + // group consumed + // zero means there is no state accumulated + total_groups_in_state: usize, +} + +impl AggGroupAccumulator { + pub fn new(inner_field: FieldRef) -> Self { + Self { + _virtual: PhantomData, + inner_field, + stacked_batches: vec![], + stacked_batches_size: 0, + stacked_group_indices: vec![], + indice_sorted: false, + total_groups_in_state: 0, + } + } + fn consume_stacked_batches( + &mut self, + emit_to: EmitTo, + ) -> Result> { + // this is inclusive, zero-based + let stop_at_group = match emit_to { + EmitTo::All => self.total_groups_in_state - 1, + EmitTo::First(groups_taken) => groups_taken - 1, + }; + // this can still happen, if all the groups have not been consumed + // but internally they are filtered out (by filter option) + // because stacked_group_indices only contains valid entry + if self.stacked_group_indices.is_empty() { + let offsets = OffsetBuffer::from_lengths(vec![0; stop_at_group + 1]); + return Ok(GenericListArray::::new( + Arc::clone(&self.inner_field), + offsets, + new_empty_array(self.inner_field.data_type()), + None, + )); + } + + // in the case of continuous calls to function `evaluate` happen, + // (without any interleaving calls to `merge_batch` or `update_batch`) + // the first call will basically sort everything beforehand + // so the second one does not need to + if !self.indice_sorted { + self.indice_sorted = true; + self.stacked_group_indices.sort_by_key(|a| { + // TODO: array_agg with distinct and custom order can be implemented here + a.0 + }); + } + + let mut current_group = 0; + + let mut group_windows = Vec::::with_capacity(stop_at_group + 1); + group_windows.push(T::zero()); + let mut split_offset = None; + self.total_groups_in_state -= stop_at_group + 1; + + // TODO: init with a good cap if possible via some stats during accumulation phase + let mut interleave_offsets = vec![]; + for (offset, (group_index, array_number, offset_in_array)) in + self.stacked_group_indices.iter().enumerate() + { + // stop consuming from this offset + if *group_index > stop_at_group { + split_offset = Some(offset); + break; + } + if *group_index > current_group { + // there can be interleaving empty group indices + // i.e if the indices are like [0 1 1 4] + // then the group_windows should be [0 1 3 3 3 4] + group_windows.push(T::usize_as(offset)); + for _ in current_group + 1..*group_index { + group_windows.push(T::usize_as(offset)); + } + current_group = *group_index; + } + interleave_offsets.push((*array_number, *offset_in_array)); + } + + if let Some(split_offset) = split_offset { + let mut tail_part = self.stacked_group_indices.split_off(split_offset); + mem::swap(&mut self.stacked_group_indices, &mut tail_part); + for item in self.stacked_group_indices.iter_mut() { + // shift down the number of group being taken + item.0 -= stop_at_group + 1 + } + // i.e given this offset arrays + // [1 1 1 2 7] + // and if stop_at_group = 5 the loop will break + // at current_group = 2 + // we have to backfill the group_windows with + // 5 items + // [0 3 4 4 4 4] + for _ in current_group..=stop_at_group { + group_windows.push(T::usize_as(split_offset)); + } + } else { + let end_offset = T::usize_as(self.stacked_group_indices.len()); + for _ in current_group..=stop_at_group { + group_windows.push(end_offset); + } + + mem::take(&mut self.stacked_group_indices); + }; + + let stacked_batches = self + .stacked_batches + .iter() + .map(|a| a.as_ref()) + .collect::>(); + + let offsets_buffer = OffsetBuffer::new(group_windows.into()); + + // group indices like [1,1,1,2,2,2] + // backend_array like [a,b,c,d,e,f] + // offsets is like: [0,3,6] + // then result should be [a,b,c], [d,e,f] + + // backend_array is a flatten list of individual values before aggregation + let backend_array = + kernels::interleave::interleave(&stacked_batches, &interleave_offsets)?; + let dt = backend_array.data_type(); + let field = Arc::new(Field::new_list_field(dt.clone(), true)); + + let arr = GenericListArray::::new(field, offsets_buffer, backend_array, None); + // Only when this happen, we know that the stacked_batches are no longer needed + if self.stacked_group_indices.is_empty() { + mem::take(&mut self.stacked_batches); + self.stacked_batches_size = 0; + } + Ok(arr) + } +} + +impl GroupsAccumulator for AggGroupAccumulator { + // given the stacked_batch as: + // - batch1 [1,4,5,6,7] + // - batch2 [5,1,1,1,1] + + // and group_indices as + // indices g1: [(0,0), (1,1), (1,2) ...] + // indices g2: [] + // indices g3: [] + // indices g4: [(0,1)] + // each tuple represents (batch_index, and offset within the batch index) + // for example + // - (0,0) means the 0th item inside batch1, which is `1` + // - (1,1) means the 1th item inside batch2, which is `1` + fn update_batch( + &mut self, + values: &[ArrayRef], + group_indices: &[usize], + opt_filter: Option<&BooleanArray>, + total_num_groups: usize, + ) -> Result<()> { + let singular_col = values + .first() + .ok_or(internal_datafusion_err!("invalid agg input"))?; + + self.stacked_batches.push(Arc::clone(singular_col)); + self.stacked_batches_size += singular_col.get_array_memory_size(); + let batch_index = self.stacked_batches.len() - 1; + if let Some(filter) = opt_filter { + for (array_offset, (group_index, filter_value)) in + group_indices.iter().zip(filter.iter()).enumerate() + { + if let Some(true) = filter_value { + self.stacked_group_indices.push(( + *group_index, + batch_index, + array_offset, + )); + } + } + } else { + for (array_offset, group_index) in group_indices.iter().enumerate() { + self.stacked_group_indices.push(( + *group_index, + batch_index, + array_offset, + )); + } + } + self.indice_sorted = false; + self.total_groups_in_state = total_num_groups; + Ok(()) + } + + fn evaluate(&mut self, emit_to: EmitTo) -> Result { + let arr = self.consume_stacked_batches(emit_to)?; + Ok(Arc::new(arr) as ArrayRef) + } + + // filtered_null_mask(opt_filter, &values); + fn state(&mut self, emit_to: EmitTo) -> Result> { + let arr = self.consume_stacked_batches(emit_to)?; + Ok(vec![Arc::new(arr) as ArrayRef]) + } + + fn merge_batch( + &mut self, + values: &[ArrayRef], + group_indices: &[usize], + // for merge_batch which happens at final stage + // opt_filter will always be none + _opt_filter: Option<&BooleanArray>, + total_num_groups: usize, + ) -> Result<()> { + let singular_col = values + .first() + .ok_or(internal_datafusion_err!("invalid agg input"))?; + let list_arr = singular_col.as_list::(); + let new_array_number = self.stacked_batches.len(); + // TODO: the backed_arr contains redundant data + // make sure that flatten_group_index has the same length with backed_arr + let flatten_group_index = + group_indices + .iter() + .enumerate() + .flat_map(|(row, group_index)| { + let end = list_arr.value_offsets()[row + 1].as_usize(); + assert!(!list_arr.is_null(row)); + let start = list_arr.value_offsets()[row].as_usize(); + (start..end).map(|offset| (*group_index, new_array_number, offset)) + }); + self.stacked_group_indices.extend(flatten_group_index); + + let backed_arr = list_arr.values(); + self.stacked_batches.push(Arc::clone(backed_arr)); + self.stacked_batches_size += backed_arr.get_array_memory_size(); + self.indice_sorted = false; + self.total_groups_in_state = total_num_groups; + Ok(()) + } + + fn size(&self) -> usize { + size_of_val(self) + + self.stacked_group_indices.capacity() * size_of::<(usize, usize, usize)>() + + self.stacked_batches.capacity() * size_of::>() + + self.stacked_batches_size + } + + fn convert_to_state( + &self, + values: &[ArrayRef], + opt_filter: Option<&BooleanArray>, + ) -> Result> { + assert!(opt_filter.is_none()); + assert!(values.len() == 1); + let col_array = values + .first() + .ok_or(internal_datafusion_err!("invalid state for array agg"))?; + + let num_rows = col_array.len(); + // If there are no rows, return empty arrays + if num_rows == 0 { + return Ok(vec![new_empty_array(col_array.data_type())]); + } + let dt = col_array.data_type(); + + let offsets = OffsetBuffer::from_lengths(repeat_n(1, num_rows)); + let field = Arc::new(Field::new_list_field(dt.clone(), true)); + + let arr = GenericListArray::::new( + field, + OffsetBuffer::new(offsets.into()), + Arc::clone(col_array), + None, + ); + Ok(vec![Arc::new(arr) as Arc]) + } + + fn supports_convert_to_state(&self) -> bool { + true + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use arrow::{ + array::{Array, AsArray, BooleanArray, GenericListArray, StringArray}, + buffer::{NullBuffer, OffsetBuffer}, + datatypes::{DataType, Field}, + }; + use datafusion_common::Result; + use datafusion_expr_common::groups_accumulator::{EmitTo, GroupsAccumulator}; + + use crate::aggregate::array_agg::AggGroupAccumulator; + + fn build_list_arr( + values: Vec>, + offsets: Vec, + nulls: Option>, + ) -> Arc { + let field = Arc::new(Field::new_list_field(DataType::Utf8, true)); + let backed_arr = Arc::new(StringArray::from_iter(values)) as Arc; + + let arr = GenericListArray::::new( + field, + OffsetBuffer::new(offsets.into()), + backed_arr, + nulls.map(NullBuffer::from_iter), + ); + Arc::new(arr) as Arc + } + + #[test] + fn test_agg_group_accumulator() -> Result<()> { + // backed_arr: ["a","b","c", null, "d"] + // partial_state: ["b","c"],[null], ["d"]] + // group_indices: [2,1,1] + // total num_group = 3 + let partial_state = build_list_arr( + vec![Some("a"), Some("b"), Some("c"), None, Some("d")], + vec![1, 3, 4, 5], + Some(vec![true, true, true]), + ); + let group_indices = vec![2, 1, 1]; + + let mut acc = AggGroupAccumulator::::new(Arc::new(Field::new( + "item", + DataType::Utf8, + true, + ))); + acc.merge_batch(&[Arc::clone(&partial_state)], &group_indices, None, 3)?; + assert_eq!( + &vec![ + (2, 0, 1), // b + (2, 0, 2), // c + (1, 0, 3), // null + (1, 0, 4), // d + ], + &acc.stacked_group_indices + ); + let backed_arr = partial_state.as_list::().values(); + assert_eq!(vec![Arc::clone(backed_arr)], acc.stacked_batches); + + // backed_arr: ["a","b","c", null, "d"] + // group_indices: [2,4,1,1,1] + // filter_opt as [true,true,false,true,false] meaning group 1 and 3 will result + // into empty array + let opt_filter = Some(BooleanArray::from_iter(vec![ + Some(true), + Some(true), + None, + Some(true), + None, + ])); + let group_indices = vec![2, 5, 1, 1, 1]; + acc.update_batch( + &[Arc::clone(backed_arr)], + &group_indices, + opt_filter.as_ref(), + 6, + )?; + assert_eq!(6, acc.total_groups_in_state); + assert_eq!( + &vec![ + // from the prev merge_batch call + (2, 0, 1), // b + (2, 0, 2), // c + (1, 0, 3), // null + (1, 0, 4), // d + // from the update_batch call + (2, 1, 0), // a + (5, 1, 1), // b + // (1, 1, 2) c but filtered out + (1, 1, 3), // null + // (1, 1, 4) d but filtered out + ], + &acc.stacked_group_indices + ); + assert_eq!( + vec![Arc::clone(backed_arr), Arc::clone(backed_arr),], + acc.stacked_batches + ); + { + let mut acc2 = acc.clone(); + let final_state = acc2.state(EmitTo::All)?; + + let expected_final_state = build_list_arr( + vec![ + // group0 is empty + // group1 + None, + Some("d"), + None, + // group2 + Some("b"), + Some("c"), + Some("a"), + // group3,group4 is empty + // group5 + Some("b"), + ], + vec![0, 0, 3, 6, 6, 6, 7], + None, + ); + + assert_eq!(vec![expected_final_state], final_state); + assert_eq!(0, acc2.total_groups_in_state); + } + { + let mut acc2 = acc.clone(); + let final_state = acc2.state(EmitTo::First(1))?; + + let expected_final_state = build_list_arr( + vec![ + // group0 + ], + vec![0, 0], + None, + ); + + assert_eq!(vec![expected_final_state], final_state); + assert_eq!(5, acc2.total_groups_in_state); + } + { + let mut acc2 = acc.clone(); + let final_state = acc2.state(EmitTo::First(2))?; + + let expected_final_state = build_list_arr( + vec![ + // group0 is empty + // group1 + None, + Some("d"), + None, + ], + vec![0, 0, 3], + None, + ); + + assert_eq!(vec![expected_final_state], final_state); + assert_eq!(4, acc2.total_groups_in_state); + } + { + let mut acc2 = acc.clone(); + let final_state = acc2.state(EmitTo::First(3))?; + + let expected_final_state = build_list_arr( + vec![ + // group0 is empty + // group1 + None, + Some("d"), + None, + // group2 + Some("b"), + Some("c"), + Some("a"), + ], + vec![0, 0, 3, 6], + None, + ); + + assert_eq!(3, acc2.total_groups_in_state); + assert_eq!(vec![expected_final_state], final_state); + + assert_eq!( + &vec![ + (2, 1, 1), // shift downward from (5,1,1) representing b + ], + &acc2.stacked_group_indices + ); + } + { + let mut acc2 = acc.clone(); + let final_state = acc2.state(EmitTo::First(4))?; + + let expected_final_state = build_list_arr( + vec![ + // group0 is empty + // group1 + None, + Some("d"), + None, + // group2 + Some("b"), + Some("c"), + Some("a"), + // group3 is empty + ], + vec![0, 0, 3, 6, 6], + None, + ); + + assert_eq!(2, acc2.total_groups_in_state); + assert_eq!(vec![expected_final_state], final_state); + assert_eq!( + &vec![ + (1, 1, 1), // shift downward from (5,1,1) representing b + ], + &acc2.stacked_group_indices + ); + } + { + let mut acc2 = acc.clone(); + let final_state = acc2.state(EmitTo::First(5))?; + + let expected_final_state = build_list_arr( + vec![ + // group0 is empty + // group1 + None, + Some("d"), + None, + // group2 + Some("b"), + Some("c"), + Some("a"), + // group3,group4 is empty + ], + vec![0, 0, 3, 6, 6, 6], + None, + ); + + assert_eq!(1, acc2.total_groups_in_state); + assert_eq!(vec![expected_final_state], final_state); + assert_eq!( + &vec![ + (0, 1, 1), // shift downward from (5,1,1) representing b + ], + &acc2.stacked_group_indices + ); + } + Ok(()) + } +} diff --git a/datafusion/functions-aggregate/src/array_agg.rs b/datafusion/functions-aggregate/src/array_agg.rs index b830588d404b..e03319f1a4dd 100644 --- a/datafusion/functions-aggregate/src/array_agg.rs +++ b/datafusion/functions-aggregate/src/array_agg.rs @@ -38,6 +38,7 @@ use datafusion_expr::utils::format_state_name; use datafusion_expr::{ Accumulator, AggregateUDFImpl, Documentation, Signature, Volatility, }; +use datafusion_functions_aggregate_common::aggregate::array_agg::AggGroupAccumulator; use datafusion_functions_aggregate_common::merge_arrays::merge_ordered_arrays; use datafusion_functions_aggregate_common::order::AggregateOrderSensitivity; use datafusion_functions_aggregate_common::utils::ordering_fields; @@ -99,6 +100,30 @@ impl AggregateUDFImpl for ArrayAgg { fn name(&self) -> &str { "array_agg" } + // use groups accumulator only when no order and no distinct required + // because current groups_acc impl produce indeterministic output + fn groups_accumulator_supported(&self, acc_args: AccumulatorArgs) -> bool { + acc_args.order_bys.is_empty() && (!acc_args.is_distinct) + } + + fn create_groups_accumulator( + &self, + acc_args: AccumulatorArgs, + ) -> Result> { + match acc_args.return_field.data_type() { + DataType::List(field) => { + Ok(Box::new(AggGroupAccumulator::::new(Arc::clone(field))) + as Box) + } + DataType::LargeList(field) => { + Ok(Box::new(AggGroupAccumulator::::new(Arc::clone(field))) + as Box) + } + _ => { + internal_err!("expects list field") + } + } + } fn signature(&self) -> &Signature { &self.signature @@ -229,6 +254,7 @@ impl AggregateUDFImpl for ArrayAgg { } } +/// Note that this is order insensitive #[derive(Debug)] pub struct ArrayAggAccumulator { values: Vec, diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt index a5973afc0a93..5d4f2871c5e0 100644 --- a/datafusion/sqllogictest/test_files/aggregate.slt +++ b/datafusion/sqllogictest/test_files/aggregate.slt @@ -7931,3 +7931,66 @@ NULL NULL NULL NULL statement ok drop table distinct_avg; + + +statement ok +CREATE TABLE test_array_agg_table AS +SELECT + arrow_cast(r, 'UInt32') AS c1, + arrow_cast( + case when r % 2 == 0 then r/2 + else r + end + , 'UInt64') AS c2 +FROM range(1, 10001) AS t(r); + +# This test is to ensure that partial aggregation is enabled +# for the query +query TT +explain select c2, array_agg(c1) filter (where c1%2 = 0) from test_array_agg_table group by c2 order by c2 asc limit 10; +---- +logical_plan +01)Sort: test_array_agg_table.c2 ASC NULLS LAST, fetch=10 +02)--Aggregate: groupBy=[[test_array_agg_table.c2]], aggr=[[array_agg(test_array_agg_table.c1) FILTER (WHERE CAST(test_array_agg_table.c1 AS Int64) % Int64(2) = Int64(0)) AS array_agg(test_array_agg_table.c1) FILTER (WHERE test_array_agg_table.c1 % Int64(2) = Int64(0))]] +03)----TableScan: test_array_agg_table projection=[c1, c2] +physical_plan +01)SortPreservingMergeExec: [c2@0 ASC NULLS LAST], fetch=10 +02)--SortExec: TopK(fetch=10), expr=[c2@0 ASC NULLS LAST], preserve_partitioning=[true] +03)----AggregateExec: mode=FinalPartitioned, gby=[c2@0 as c2], aggr=[array_agg(test_array_agg_table.c1) FILTER (WHERE test_array_agg_table.c1 % Int64(2) = Int64(0))] +04)------CoalesceBatchesExec: target_batch_size=8192 +05)--------RepartitionExec: partitioning=Hash([c2@0], 4), input_partitions=4 +06)----------AggregateExec: mode=Partial, gby=[c2@1 as c2], aggr=[array_agg(test_array_agg_table.c1) FILTER (WHERE test_array_agg_table.c1 % Int64(2) = Int64(0))] +07)------------DataSourceExec: partitions=4, partition_sizes=[1, 1, 0, 0] + + + + +query I? +select c2, array_agg(c1) filter (where c1%2 = 0) from test_array_agg_table group by c2 order by c2 asc limit 10; +---- +1 [2] +2 [4] +3 [6] +4 [8] +5 [10] +6 [12] +7 [14] +8 [16] +9 [18] +10 [20] + + + +query I?? +select c2, array_agg(c1),array_agg(c1) filter (where c1%2 = 0) from ( + select * from test_array_agg_table order by c2 asc limit 10 +) group by c2 order by c2; +---- +1 [2, 1] [2] +2 [4] [4] +3 [6, 3] [6] +4 [8] [8] +5 [10, 5] [10] +6 [12] [12] +7 [7] [] +