From aa7199e1aab401c816e8089d4a4dab79e6e04855 Mon Sep 17 00:00:00 2001 From: Eduard Karacharov Date: Tue, 19 Dec 2023 22:18:32 +0300 Subject: [PATCH 1/6] DistinctCountGroupsAccumulator --- .../src/aggregate/count_distinct.rs | 258 +++++++++++++++++- .../src/aggregate/sum_distinct.rs | 22 +- .../physical-expr/src/aggregate/utils.rs | 20 +- 3 files changed, 272 insertions(+), 28 deletions(-) diff --git a/datafusion/physical-expr/src/aggregate/count_distinct.rs b/datafusion/physical-expr/src/aggregate/count_distinct.rs index c2fd32a96c4f..ae940a471345 100644 --- a/datafusion/physical-expr/src/aggregate/count_distinct.rs +++ b/datafusion/physical-expr/src/aggregate/count_distinct.rs @@ -15,21 +15,30 @@ // specific language governing permissions and limitations // under the License. -use arrow::datatypes::{DataType, Field}; +use arrow::datatypes::{DataType, Field, TimeUnit}; +use arrow_array::types::{ + ArrowPrimitiveType, Date32Type, Date64Type, Decimal128Type, Decimal256Type, + Float32Type, Float64Type, Int16Type, Int32Type, Int64Type, Int8Type, + Time32MillisecondType, Time32SecondType, Time64MicrosecondType, Time64NanosecondType, + TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType, + TimestampSecondType, UInt16Type, UInt32Type, UInt64Type, UInt8Type, +}; +use arrow_array::{Int64Array, ListArray}; +use itertools::Itertools; use std::any::Any; use std::fmt::Debug; use std::sync::Arc; use ahash::RandomState; -use arrow::array::{Array, ArrayRef}; -use std::collections::HashSet; +use arrow::array::{Array, ArrayRef, AsArray}; +use hashbrown::HashSet; -use crate::aggregate::utils::down_cast_any_ref; +use crate::aggregate::utils::{down_cast_any_ref, Hashable}; use crate::expressions::format_state_name; -use crate::{AggregateExpr, PhysicalExpr}; -use datafusion_common::Result; -use datafusion_common::ScalarValue; +use crate::{AggregateExpr, GroupsAccumulator, PhysicalExpr}; +use datafusion_common::cast::as_list_array; +use datafusion_common::{internal_err, DataFusionError, Result, ScalarValue}; use datafusion_expr::Accumulator; type DistinctScalarValues = ScalarValue; @@ -60,6 +69,12 @@ impl DistinctCount { } } +macro_rules! distinct_count_groups_accumulator { + ($SELF:expr, $TYPE:ident) => {{ + Ok(Box::new(DistinctCountGroupsAccumulator::<$TYPE>::new())) + }}; +} + impl AggregateExpr for DistinctCount { /// Return a reference to Any that can be used for downcasting fn as_any(&self) -> &dyn Any { @@ -82,6 +97,28 @@ impl AggregateExpr for DistinctCount { vec![self.expr.clone()] } + fn groups_accumulator_supported(&self) -> bool { + use DataType::*; + matches!( + self.state_data_type, + Int8 | Int16 + | Int32 + | Int64 + | UInt8 + | UInt16 + | UInt32 + | UInt64 + | Float32 + | Float64 + | Decimal128(_, _) + | Decimal256(_, _) + | Date32 + | Date64 + | Time32(_) + | Time64(_) // | Timestamp(_, _) + ) + } + fn create_accumulator(&self) -> Result> { Ok(Box::new(DistinctCountAccumulator { values: HashSet::default(), @@ -89,6 +126,61 @@ impl AggregateExpr for DistinctCount { })) } + fn create_groups_accumulator(&self) -> Result> { + use DataType::*; + use TimeUnit::*; + + match self.state_data_type { + Int8 => distinct_count_groups_accumulator!(self, Int8Type), + Int16 => distinct_count_groups_accumulator!(self, Int16Type), + Int32 => distinct_count_groups_accumulator!(self, Int32Type), + Int64 => distinct_count_groups_accumulator!(self, Int64Type), + UInt8 => distinct_count_groups_accumulator!(self, UInt8Type), + UInt16 => distinct_count_groups_accumulator!(self, UInt16Type), + UInt32 => distinct_count_groups_accumulator!(self, UInt32Type), + UInt64 => distinct_count_groups_accumulator!(self, UInt64Type), + Float32 => distinct_count_groups_accumulator!(self, Float32Type), + Float64 => distinct_count_groups_accumulator!(self, Float64Type), + Decimal128(_, _) => { + distinct_count_groups_accumulator!(self, Decimal128Type) + } + Decimal256(_, _) => { + distinct_count_groups_accumulator!(self, Decimal256Type) + } + Date32 => distinct_count_groups_accumulator!(self, Date32Type), + Date64 => distinct_count_groups_accumulator!(self, Date64Type), + Time32(Millisecond) => { + distinct_count_groups_accumulator!(self, Time32MillisecondType) + } + Time32(Second) => { + distinct_count_groups_accumulator!(self, Time32SecondType) + } + Time64(Microsecond) => { + distinct_count_groups_accumulator!(self, Time64MicrosecondType) + } + Time64(Nanosecond) => { + distinct_count_groups_accumulator!(self, Time64NanosecondType) + } + Timestamp(Microsecond, _) => { + distinct_count_groups_accumulator!(self, TimestampMicrosecondType) + } + Timestamp(Millisecond, _) => { + distinct_count_groups_accumulator!(self, TimestampMillisecondType) + } + Timestamp(Nanosecond, _) => { + distinct_count_groups_accumulator!(self, TimestampNanosecondType) + } + Timestamp(Second, _) => { + distinct_count_groups_accumulator!(self, TimestampSecondType) + } + + _ => internal_err!( + "DistinctCountGroupsAccumulator is not supported for {}", + self.state_data_type + ), + } + } + fn name(&self) -> &str { &self.name } @@ -192,6 +284,158 @@ impl Accumulator for DistinctCountAccumulator { } } +struct DistinctCountGroupsAccumulator +where + T: ArrowPrimitiveType + Send, +{ + /// Vector for storing unique values sets for each group index + unique_values: Vec>>, +} + +impl DistinctCountGroupsAccumulator +where + T: ArrowPrimitiveType + Send, +{ + fn new() -> Self { + Self { + unique_values: vec![], + } + } +} + +impl GroupsAccumulator for DistinctCountGroupsAccumulator +where + T: ArrowPrimitiveType + Send, +{ + fn update_batch( + &mut self, + values: &[ArrayRef], + group_indices: &[usize], + opt_filter: Option<&arrow_array::BooleanArray>, + total_num_groups: usize, + ) -> Result<()> { + let values = values[0].as_primitive::(); + + self.unique_values.resize(total_num_groups, HashSet::new()); + + // Update current state from incoming values + match opt_filter { + None => group_indices.iter().zip(values.iter()).for_each( + |(group_index, value)| { + if let Some(value) = value { + self.unique_values[*group_index].insert(Hashable(value)); + } + }, + ), + Some(filter) => group_indices + .iter() + .zip(values.iter()) + .zip(filter.iter()) + .for_each(|((group_index, value), filter_value)| { + if let Some(true) = filter_value { + if let Some(value) = value { + self.unique_values[*group_index].insert(Hashable(value)); + } + } + }), + }; + + Ok(()) + } + + fn merge_batch( + &mut self, + values: &[ArrayRef], + group_indices: &[usize], + opt_filter: Option<&arrow_array::BooleanArray>, + total_num_groups: usize, + ) -> Result<()> { + assert_eq!( + values.len(), + 1, + "DistinctCountGroupsAccumulator::merge_batch requires single array as state argument" + ); + + let values = as_list_array(&values[0])?; + + self.unique_values.resize(total_num_groups, HashSet::new()); + + // Update current state with `list.values()` for each list in incoming state. + // It's safe to iterate over values(), as lists in incoming state don't contain + // null values. + match opt_filter { + None => group_indices.iter().zip(values.iter()).for_each( + |(group_idx, maybe_set)| { + if let Some(array) = maybe_set { + self.unique_values[*group_idx].extend( + array + .as_primitive::() + .values() + .iter() + .map(|entry| Hashable(*entry)), + ) + }; + }, + ), + Some(filter) => group_indices + .iter() + .zip(values.iter()) + .zip(filter.iter()) + .for_each(|((group_idx, maybe_set), filter_value)| { + if let Some(true) = filter_value { + if let Some(array) = maybe_set { + self.unique_values[*group_idx].extend( + array + .as_primitive::() + .values() + .iter() + .map(|entry| Hashable(*entry)), + ) + } + } + }), + }; + + Ok(()) + } + + fn evaluate(&mut self, emit_to: crate::EmitTo) -> Result { + let unique_values = emit_to.take_needed(&mut self.unique_values); + + // Final state is represented by Int64 array, where each element is a length of corresponding HashSet from `self.unique_values` + return Ok(Arc::new(Int64Array::from( + unique_values + .into_iter() + .map(|set| set.len() as i64) + .collect_vec(), + ))); + } + + fn state(&mut self, emit_to: crate::EmitTo) -> Result> { + let unique_values = emit_to.take_needed(&mut self.unique_values); + + // Intermediate state is represented as ListArray, where each element (list) is a collected HashSet from `self.unique_values`. + let uniques = ListArray::from_iter_primitive::( + unique_values + .iter() + .map(|set| Some(set.iter().map(|val| Some(val.0)).collect_vec())) + .collect_vec(), + ); + + Ok(vec![Arc::new(uniques) as ArrayRef]) + } + + fn size(&self) -> usize { + return + // Size of vector of HashSets + self.unique_values.capacity() * std::mem::size_of::>() + + // Each HashSet size + self.unique_values.iter() + .map(|set| set.capacity() * std::mem::size_of::()) + .sum::(); + } +} + #[cfg(test)] mod tests { use crate::expressions::NoOp; diff --git a/datafusion/physical-expr/src/aggregate/sum_distinct.rs b/datafusion/physical-expr/src/aggregate/sum_distinct.rs index 0cf4a90ab8cc..6dbb39224629 100644 --- a/datafusion/physical-expr/src/aggregate/sum_distinct.rs +++ b/datafusion/physical-expr/src/aggregate/sum_distinct.rs @@ -25,11 +25,11 @@ use arrow::array::{Array, ArrayRef}; use arrow_array::cast::AsArray; use arrow_array::types::*; use arrow_array::{ArrowNativeTypeOp, ArrowPrimitiveType}; -use arrow_buffer::{ArrowNativeType, ToByteSlice}; +use arrow_buffer::ArrowNativeType; use std::collections::HashSet; use crate::aggregate::sum::downcast_sum; -use crate::aggregate::utils::down_cast_any_ref; +use crate::aggregate::utils::{down_cast_any_ref, Hashable}; use crate::{AggregateExpr, PhysicalExpr}; use datafusion_common::{not_impl_err, DataFusionError, Result, ScalarValue}; use datafusion_expr::type_coercion::aggregates::sum_return_type; @@ -119,24 +119,6 @@ impl PartialEq for DistinctSum { } } -/// A wrapper around a type to provide hash for floats -#[derive(Copy, Clone)] -struct Hashable(T); - -impl std::hash::Hash for Hashable { - fn hash(&self, state: &mut H) { - self.0.to_byte_slice().hash(state) - } -} - -impl PartialEq for Hashable { - fn eq(&self, other: &Self) -> bool { - self.0.is_eq(other.0) - } -} - -impl Eq for Hashable {} - struct DistinctSumAccumulator { values: HashSet, RandomState>, data_type: DataType, diff --git a/datafusion/physical-expr/src/aggregate/utils.rs b/datafusion/physical-expr/src/aggregate/utils.rs index e5421ef5ab7e..1176c7401871 100644 --- a/datafusion/physical-expr/src/aggregate/utils.rs +++ b/datafusion/physical-expr/src/aggregate/utils.rs @@ -25,7 +25,7 @@ use arrow_array::types::{ TimestampNanosecondType, TimestampSecondType, }; use arrow_array::ArrowNativeTypeOp; -use arrow_buffer::ArrowNativeType; +use arrow_buffer::{ArrowNativeType, ToByteSlice}; use arrow_schema::{DataType, Field}; use datafusion_common::{exec_err, DataFusionError, Result}; use datafusion_expr::Accumulator; @@ -205,3 +205,21 @@ pub(crate) fn ordering_fields( }) .collect() } + +/// A wrapper around a type to provide hash for primitive arrow types +#[derive(Copy, Clone)] +pub(crate) struct Hashable(pub T); + +impl std::hash::Hash for Hashable { + fn hash(&self, state: &mut H) { + self.0.to_byte_slice().hash(state) + } +} + +impl PartialEq for Hashable { + fn eq(&self, other: &Self) -> bool { + self.0.is_eq(other.0) + } +} + +impl Eq for Hashable {} From 251fed2fea85e60692870bd6596d5895247dfb30 Mon Sep 17 00:00:00 2001 From: Eduard Karacharov Date: Mon, 1 Jan 2024 21:36:25 +0200 Subject: [PATCH 2/6] test coverage --- .../src/aggregate/count_distinct.rs | 658 +++++++++++++++++- 1 file changed, 657 insertions(+), 1 deletion(-) diff --git a/datafusion/physical-expr/src/aggregate/count_distinct.rs b/datafusion/physical-expr/src/aggregate/count_distinct.rs index ae940a471345..df6b3b1945b9 100644 --- a/datafusion/physical-expr/src/aggregate/count_distinct.rs +++ b/datafusion/physical-expr/src/aggregate/count_distinct.rs @@ -439,17 +439,20 @@ where #[cfg(test)] mod tests { use crate::expressions::NoOp; + use crate::EmitTo; use super::*; use arrow::array::{ ArrayRef, BooleanArray, Float32Array, Float64Array, Int16Array, Int32Array, - Int64Array, Int8Array, UInt16Array, UInt32Array, UInt64Array, UInt8Array, + Int64Array, Int8Array, PrimitiveArray, UInt16Array, UInt32Array, UInt64Array, + UInt8Array, }; use arrow::datatypes::DataType; use arrow::datatypes::{ Float32Type, Float64Type, Int16Type, Int32Type, Int64Type, Int8Type, UInt16Type, UInt32Type, UInt64Type, UInt8Type, }; + use arrow_buffer::i256; use datafusion_common::cast::{as_boolean_array, as_list_array, as_primitive_array}; use datafusion_common::internal_err; use datafusion_common::DataFusionError; @@ -806,4 +809,657 @@ mod tests { assert_eq!(result, ScalarValue::Int64(Some(2))); Ok(()) } + + macro_rules! assert_distinct_count_groups_states_eq { + ($ACTUAL:expr, $EXPECTED:expr, $DATA_TYPE: ident) => { + $ACTUAL + .iter() + .zip($EXPECTED.iter()) + .for_each(|(actual, expected)| { + let mut actual = actual + .unwrap() + .as_primitive::<$DATA_TYPE>() + .values() + .to_vec(); + actual.sort(); + + let mut expected = expected.clone(); + expected.sort(); + + assert_eq!(actual, expected); + }); + }; + } + + macro_rules! assert_float_distinct_count_groups_states_eq { + ($ACTUAL:expr, $EXPECTED:expr, $DATA_TYPE: ident) => { + $ACTUAL + .iter() + .zip($EXPECTED.iter()) + .for_each(|(actual, expected)| { + let mut actual = actual + .unwrap() + .as_primitive::<$DATA_TYPE>() + .values() + .to_vec(); + actual.sort_by(|a, b| a.total_cmp(b)); + + let mut expected = expected.clone(); + expected.sort_by(|a, b| a.total_cmp(b)); + + assert_eq!(actual, expected); + }); + }; + } + + macro_rules! integer_count_distinct_groups_accumulator { + ($DATA_TYPE:ident, $PRIM_TYPE:ty, $ACCUM:ident) => { + let group_indices = vec![0, 1, 1, 2, 0, 3, 3, 3, 4]; + let input_values: Vec> = vec![ + Some(1), + Some(1), + Some(1), + None, + Some(2), + Some(4), + Some(5), + None, + Some(7), + ]; + let values = Arc::new(PrimitiveArray::<$DATA_TYPE>::from(input_values)); + + // Prepare accumulator and update it with input values + let mut $ACCUM = DistinctCountGroupsAccumulator::<$DATA_TYPE>::new(); + $ACCUM.update_batch(&[values], &group_indices, None, 5)?; + }; + } + + macro_rules! test_count_distinct_groups_evaluate { + ($DATA_TYPE:ident, $PRIM_TYPE:ty) => { + test_count_distinct_groups_evaluate!($DATA_TYPE, $PRIM_TYPE, EmitTo::All); + test_count_distinct_groups_evaluate!( + $DATA_TYPE, + $PRIM_TYPE, + EmitTo::First(3) + ); + + return Ok(()) + }; + + ($DATA_TYPE:ident, $PRIM_TYPE:ty, $EMIT_TO:expr) => { + integer_count_distinct_groups_accumulator!( + $DATA_TYPE, + $PRIM_TYPE, + accumulator + ); + + // Get accumulator evaluation result + let evaluated = accumulator.evaluate($EMIT_TO)?; + let actual = evaluated.as_primitive::(); + + let mut expected_values = vec![2, 1, 0, 2, 1]; + let expected = Int64Array::from($EMIT_TO.take_needed(&mut expected_values)); + + assert_eq!(*actual, expected); + }; + } + + #[test] + fn count_distinct_groups_evaluate_i8() -> Result<()> { + test_count_distinct_groups_evaluate!(Int8Type, i8); + } + + #[test] + fn count_distinct_groups_evaluate_i16() -> Result<()> { + test_count_distinct_groups_evaluate!(Int16Type, i16); + } + + #[test] + fn count_distinct_groups_evaluate_i32() -> Result<()> { + test_count_distinct_groups_evaluate!(Int32Type, i32); + } + + #[test] + fn count_distinct_groups_evaluate_i64() -> Result<()> { + test_count_distinct_groups_evaluate!(Int64Type, i64); + } + + #[test] + fn count_distinct_groups_evaluate_u8() -> Result<()> { + test_count_distinct_groups_evaluate!(UInt8Type, u8); + } + + #[test] + fn count_distinct_groups_evaluate_u16() -> Result<()> { + test_count_distinct_groups_evaluate!(UInt16Type, u16); + } + + #[test] + fn count_distinct_groups_evaluate_u32() -> Result<()> { + test_count_distinct_groups_evaluate!(UInt32Type, u32); + } + + #[test] + fn count_distinct_groups_evaluate_u64() -> Result<()> { + test_count_distinct_groups_evaluate!(UInt64Type, u64); + } + + #[test] + fn count_distinct_groups_evaluate_i128() -> Result<()> { + test_count_distinct_groups_evaluate!(Decimal128Type, i128); + } + + macro_rules! test_count_distinct_groups_state { + ($DATA_TYPE:ident, $PRIM_TYPE:ty) => { + test_count_distinct_groups_state!($DATA_TYPE, $PRIM_TYPE, EmitTo::All); + test_count_distinct_groups_state!($DATA_TYPE, $PRIM_TYPE, EmitTo::First(3)); + + return Ok(()) + }; + ($DATA_TYPE:ident, $PRIM_TYPE:ty, $EMIT_TO:expr) => { + // Prepare accumulator and update it with input values + integer_count_distinct_groups_accumulator!( + $DATA_TYPE, + $PRIM_TYPE, + accumulator + ); + + // Get result state from accumulator + let state = accumulator.state($EMIT_TO)?; + let actual = as_list_array(&state[0])?; + + let expected: Vec> = + vec![vec![1, 2], vec![1], vec![], vec![4, 5], vec![7]]; + + assert_distinct_count_groups_states_eq!(actual, expected, $DATA_TYPE); + }; + } + + #[test] + fn count_distinct_groups_state_i8() -> Result<()> { + test_count_distinct_groups_state!(Int8Type, i8); + } + + #[test] + fn count_distinct_groups_state_i16() -> Result<()> { + test_count_distinct_groups_state!(Int16Type, i16); + } + + #[test] + fn count_distinct_groups_state_i32() -> Result<()> { + test_count_distinct_groups_state!(Int32Type, i32); + } + + #[test] + fn count_distinct_groups_state_i64() -> Result<()> { + test_count_distinct_groups_state!(Int64Type, i64); + } + + #[test] + fn count_distinct_groups_state_u8() -> Result<()> { + test_count_distinct_groups_state!(UInt8Type, u8); + } + + #[test] + fn count_distinct_groups_state_u16() -> Result<()> { + test_count_distinct_groups_state!(UInt16Type, u16); + } + + #[test] + fn count_distinct_groups_state_u32() -> Result<()> { + test_count_distinct_groups_state!(UInt32Type, u32); + } + + #[test] + fn count_distinct_groups_state_u64() -> Result<()> { + test_count_distinct_groups_state!(UInt64Type, u64); + } + + #[test] + fn count_distinct_groups_state_i128() -> Result<()> { + test_count_distinct_groups_state!(Decimal128Type, i128); + } + + macro_rules! test_count_distinct_groups_merge { + ($DATA_TYPE:ident, $PRIM_TYPE:ty) => { + test_count_distinct_groups_merge!($DATA_TYPE, $PRIM_TYPE, EmitTo::All); + test_count_distinct_groups_merge!($DATA_TYPE, $PRIM_TYPE, EmitTo::First(3)); + + return Ok(()) + }; + ($DATA_TYPE:ident, $PRIM_TYPE:ty, $EMIT_TO:expr) => { + // Prepare accumulator and update it with input values + integer_count_distinct_groups_accumulator!( + $DATA_TYPE, + $PRIM_TYPE, + accumulator + ); + + // Prepare merge input and merge it into accumulators state + let merge_group_indices = vec![0, 1, 2, 3, 5]; + let merge_input_values: Vec>>> = vec![ + Some(vec![Some(10), Some(12)]), + Some(vec![]), + Some(vec![]), + Some(vec![Some(4)]), + Some(vec![Some(8)]), + ]; + let merge_input = Arc::new( + ListArray::from_iter_primitive::<$DATA_TYPE, _, _>(merge_input_values), + ) as ArrayRef; + + accumulator.merge_batch(&[merge_input], &merge_group_indices, None, 6)?; + + // Get result state from accumulator + let state = accumulator.state($EMIT_TO)?; + let actual = as_list_array(&state[0])?; + + let expected: Vec> = vec![ + vec![1, 2, 10, 12], + vec![1], + vec![], + vec![4, 5], + vec![7], + vec![8], + ]; + + assert_distinct_count_groups_states_eq!(actual, expected, $DATA_TYPE); + }; + } + + #[test] + fn count_distinct_groups_merge_i8() -> Result<()> { + test_count_distinct_groups_merge!(Int8Type, i8); + } + + #[test] + fn count_distinct_groups_merge_i16() -> Result<()> { + test_count_distinct_groups_merge!(Int16Type, i16); + } + + #[test] + fn count_distinct_groups_merge_i32() -> Result<()> { + test_count_distinct_groups_merge!(Int32Type, i32); + } + + #[test] + fn count_distinct_groups_merge_i64() -> Result<()> { + test_count_distinct_groups_merge!(Int64Type, i64); + } + + #[test] + fn count_distinct_groups_merge_u8() -> Result<()> { + test_count_distinct_groups_merge!(UInt8Type, u8); + } + + #[test] + fn count_distinct_groups_merge_u16() -> Result<()> { + test_count_distinct_groups_merge!(UInt16Type, u16); + } + + #[test] + fn count_distinct_groups_merge_u32() -> Result<()> { + test_count_distinct_groups_merge!(UInt32Type, u32); + } + + #[test] + fn count_distinct_groups_merge_u64() -> Result<()> { + test_count_distinct_groups_merge!(UInt64Type, u64); + } + + #[test] + fn count_distinct_groups_merge_i128() -> Result<()> { + test_count_distinct_groups_merge!(Decimal128Type, i128); + } + + macro_rules! float_count_distinct_groups_accumulator { + ($DATA_TYPE:ident, $PRIM_TYPE:ty, $ACCUM:ident) => { + let group_indices = vec![0, 1, 1, 2, 0, 3, 3, 3, 4, 5, 5]; + let input_values: Vec> = vec![ + Some(1.2345), + Some(1.234), + Some(1.234), + None, + Some(2.6543), + Some(4.752399), + Some(5.239493), + None, + Some(7.34058740), + Some(<$PRIM_TYPE>::INFINITY), + Some(<$PRIM_TYPE>::NEG_INFINITY), + ]; + let values = Arc::new(PrimitiveArray::<$DATA_TYPE>::from(input_values)); + + // Prepare accumulator and update it with input values + let mut $ACCUM = DistinctCountGroupsAccumulator::<$DATA_TYPE>::new(); + $ACCUM.update_batch(&[values], &group_indices, None, 6)?; + }; + } + + macro_rules! test_float_count_distinct_groups_evaluate { + ($DATA_TYPE:ident, $PRIM_TYPE:ty) => { + test_float_count_distinct_groups_evaluate!( + $DATA_TYPE, + $PRIM_TYPE, + EmitTo::All + ); + test_float_count_distinct_groups_evaluate!( + $DATA_TYPE, + $PRIM_TYPE, + EmitTo::First(3) + ); + + return Ok(()) + }; + + ($DATA_TYPE:ident, $PRIM_TYPE:ty, $EMIT_TO:expr) => { + // Prepare accumulator and update it with input values + float_count_distinct_groups_accumulator!($DATA_TYPE, $PRIM_TYPE, accumulator); + + // Get accumulator evaluation result + let evaluated = accumulator.evaluate($EMIT_TO)?; + let actual = evaluated.as_primitive::(); + + let mut expected_values = vec![2, 1, 0, 2, 1, 2]; + let expected = Int64Array::from($EMIT_TO.take_needed(&mut expected_values)); + + assert_eq!(*actual, expected); + }; + } + + #[test] + fn count_distinct_groups_evaluate_f32() -> Result<()> { + test_float_count_distinct_groups_evaluate!(Float32Type, f32); + } + + #[test] + fn count_distinct_groups_evaluate_f64() -> Result<()> { + test_float_count_distinct_groups_evaluate!(Float64Type, f64); + } + + macro_rules! test_float_count_distinct_groups_state { + ($DATA_TYPE:ident, $PRIM_TYPE:ty) => { + test_float_count_distinct_groups_state!($DATA_TYPE, $PRIM_TYPE, EmitTo::All); + test_float_count_distinct_groups_state!( + $DATA_TYPE, + $PRIM_TYPE, + EmitTo::First(3) + ); + + return Ok(()) + }; + ($DATA_TYPE:ident, $PRIM_TYPE:ty, $EMIT_TO:expr) => { + float_count_distinct_groups_accumulator!($DATA_TYPE, $PRIM_TYPE, accumulator); + + // Get result state from accumulator + let state = accumulator.state($EMIT_TO)?; + let actual = as_list_array(&state[0])?; + + let expected: Vec> = vec![ + vec![1.2345, 2.6543], + vec![1.234], + vec![], + vec![4.752399, 5.239493], + vec![7.34058740], + vec![<$PRIM_TYPE>::INFINITY, <$PRIM_TYPE>::NEG_INFINITY], + ]; + + assert_float_distinct_count_groups_states_eq!(actual, expected, $DATA_TYPE); + }; + } + + #[test] + fn count_distinct_groups_state_f32() -> Result<()> { + test_float_count_distinct_groups_state!(Float32Type, f32); + } + + #[test] + fn count_distinct_groups_state_f64() -> Result<()> { + test_float_count_distinct_groups_state!(Float64Type, f64); + } + + macro_rules! test_float_count_distinct_groups_merge { + ($DATA_TYPE:ident, $PRIM_TYPE:ty) => { + test_float_count_distinct_groups_merge!($DATA_TYPE, $PRIM_TYPE, EmitTo::All); + test_float_count_distinct_groups_merge!( + $DATA_TYPE, + $PRIM_TYPE, + EmitTo::First(3) + ); + + return Ok(()) + }; + ($DATA_TYPE:ident, $PRIM_TYPE:ty, $EMIT_TO:expr) => { + // Prepare accumulator and update it with input values + float_count_distinct_groups_accumulator!($DATA_TYPE, $PRIM_TYPE, accumulator); + + // Prepare merge input and merge it into accumulators state + let merge_group_indices = vec![0, 1, 2, 3, 6]; + let merge_input_values: Vec>>> = vec![ + Some(vec![Some(10.123), Some(12.633434)]), + Some(vec![]), + Some(vec![]), + Some(vec![Some(4.752399)]), + Some(vec![Some(8.23329)]), + ]; + let merge_input = Arc::new( + ListArray::from_iter_primitive::<$DATA_TYPE, _, _>(merge_input_values), + ) as ArrayRef; + + accumulator.merge_batch(&[merge_input], &merge_group_indices, None, 7)?; + + // Get result state from accumulator + let state = accumulator.state($EMIT_TO)?; + let actual = as_list_array(&state[0])?; + + let expected: Vec> = vec![ + vec![1.2345, 2.6543, 10.123, 12.633434], + vec![1.234], + vec![], + vec![4.752399, 5.239493], + vec![7.34058740], + vec![<$PRIM_TYPE>::INFINITY, <$PRIM_TYPE>::NEG_INFINITY], + vec![8.23329], + ]; + + assert_float_distinct_count_groups_states_eq!(actual, expected, $DATA_TYPE); + }; + } + + #[test] + fn count_distinct_groups_merge_f32() -> Result<()> { + test_float_count_distinct_groups_merge!(Float32Type, f32); + } + + #[test] + fn count_distinct_groups_merge_f64() -> Result<()> { + test_float_count_distinct_groups_merge!(Float64Type, f64); + } + + fn bigint_count_distinct_groups_accumulator( + ) -> Result> { + let group_indices = vec![0, 1, 1, 2, 0, 3, 3, 3, 4]; + let input_values: Vec> = vec![ + Some(i256::from(1)), + Some(i256::from(1)), + Some(i256::from(1)), + None, + Some(i256::from(2)), + Some(i256::from(4)), + Some(i256::from(5)), + None, + Some(i256::from(7)), + ]; + let values = Arc::new(PrimitiveArray::::from(input_values)); + + // Prepare accumulator and update it with input values + let mut accumulator = DistinctCountGroupsAccumulator::::new(); + accumulator.update_batch(&[values], &group_indices, None, 5)?; + + Ok(accumulator) + } + + #[test] + fn count_distinct_groups_evaluate_i256() -> Result<()> { + let mut accumulator = bigint_count_distinct_groups_accumulator()?; + + // Get accumulator evaluation result + let evaluated = accumulator.evaluate(EmitTo::All)?; + let actual = evaluated.as_primitive::(); + + let expected = PrimitiveArray::::from(vec![2, 1, 0, 2, 1]); + + // Assert that evaluation result and expected counts for each group are equal + assert_eq!(*actual, expected); + + Ok(()) + } + + #[test] + fn count_distinct_groups_state_i256() -> Result<()> { + let mut accumulator = bigint_count_distinct_groups_accumulator()?; + + // Get accumulator state + let state = accumulator.state(EmitTo::All)?; + let actual = as_list_array(&state[0])?; + + let expected = vec![ + vec![i256::from(1), i256::from(2)], + vec![i256::from(1)], + vec![], + vec![i256::from(4), i256::from(5)], + vec![i256::from(7)], + ]; + + assert_distinct_count_groups_states_eq!(actual, expected, Decimal256Type); + Ok(()) + } + + #[test] + fn count_distinct_groups_merge_i256() -> Result<()> { + let mut accumulator = bigint_count_distinct_groups_accumulator()?; + + // Prepare merge input and merge it into accumulators state + let merge_group_indices = vec![0, 1, 2, 3, 5]; + let merge_input_values = vec![ + Some(vec![Some(i256::from(10)), Some(i256::from(12))]), + Some(vec![]), + Some(vec![]), + Some(vec![Some(i256::from(4))]), + Some(vec![Some(i256::from(8))]), + ]; + let merge_input = Arc::new( + ListArray::from_iter_primitive::(merge_input_values), + ) as ArrayRef; + + accumulator.merge_batch(&[merge_input], &merge_group_indices, None, 6)?; + + // Get result state from accumulator + let state = accumulator.state(EmitTo::All)?; + let actual = as_list_array(&state[0])?; + + let expected = vec![ + vec![i256::from(1), i256::from(2), i256::from(10), i256::from(12)], + vec![i256::from(1)], + vec![], + vec![i256::from(4), i256::from(5)], + vec![i256::from(7)], + vec![i256::from(8)], + ]; + + assert_distinct_count_groups_states_eq!(actual, expected, Decimal256Type); + + Ok(()) + } + + #[test] + fn count_distinct_groups_update_filtered() -> Result<()> { + let group_indices = vec![0, 1, 1, 2, 0, 3, 3, 3, 4]; + let input_values = vec![ + Some(1), + Some(1), + Some(1), + None, + Some(2), + Some(4), + Some(5), + None, + Some(7), + ]; + let values = Arc::new(PrimitiveArray::::from(input_values)); + let filter: BooleanArray = + vec![true, true, false, true, true, true, false, true, false].into(); + + // Prepare accumulator and update it with input values + let mut accumulator = DistinctCountGroupsAccumulator::::new(); + accumulator.update_batch(&[values], &group_indices, Some(&filter), 5)?; + + // Get accumulator state + let state = accumulator.state(EmitTo::All)?; + let actual = as_list_array(&state[0])?; + + let expected = vec![vec![1, 2], vec![1], vec![], vec![4], vec![]]; + + assert_distinct_count_groups_states_eq!(actual, expected, Int32Type); + + Ok(()) + } + + #[test] + fn count_distinct_groups_merge_filtered() -> Result<()> { + let group_indices = vec![0, 1, 1, 2, 0, 3, 3, 3, 4]; + let input_values = vec![ + Some(1), + Some(1), + Some(1), + None, + Some(2), + Some(4), + Some(5), + None, + Some(7), + ]; + let values = Arc::new(PrimitiveArray::::from(input_values)); + + // Prepare accumulator and update it with input values + let mut accumulator = DistinctCountGroupsAccumulator::::new(); + accumulator.update_batch(&[values], &group_indices, None, 5)?; + + // Prepare merge input and merge it into accumulators state + let merge_group_indices = vec![0, 1, 2, 3, 5]; + let merge_input_values = vec![ + Some(vec![Some(10), Some(12)]), + Some(vec![Some(13)]), + Some(vec![]), + Some(vec![Some(4)]), + Some(vec![Some(18)]), + ]; + let merge_input = Arc::new(ListArray::from_iter_primitive::( + merge_input_values, + )) as ArrayRef; + let merge_filter: BooleanArray = vec![true, false, true, true, false].into(); + + accumulator.merge_batch( + &[merge_input], + &merge_group_indices, + Some(&merge_filter), + 6, + )?; + + // Get result state from accumulator + let state = accumulator.state(EmitTo::All)?; + let actual = as_list_array(&state[0])?; + + let expected = vec![ + vec![1, 2, 10, 12], + vec![1], + vec![], + vec![4, 5], + vec![7], + vec![], + ]; + + assert_distinct_count_groups_states_eq!(actual, expected, Int32Type); + Ok(()) + } } From ac870f92d966fa42e983fe718152a90cdbc661ea Mon Sep 17 00:00:00 2001 From: Eduard Karacharov Date: Tue, 2 Jan 2024 18:12:37 +0200 Subject: [PATCH 3/6] clippy warnings --- .../physical-expr/src/aggregate/count_distinct.rs | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/datafusion/physical-expr/src/aggregate/count_distinct.rs b/datafusion/physical-expr/src/aggregate/count_distinct.rs index df6b3b1945b9..9d5bd9c89cb9 100644 --- a/datafusion/physical-expr/src/aggregate/count_distinct.rs +++ b/datafusion/physical-expr/src/aggregate/count_distinct.rs @@ -115,7 +115,8 @@ impl AggregateExpr for DistinctCount { | Date32 | Date64 | Time32(_) - | Time64(_) // | Timestamp(_, _) + | Time64(_) + | Timestamp(_, _) ) } @@ -403,12 +404,12 @@ where let unique_values = emit_to.take_needed(&mut self.unique_values); // Final state is represented by Int64 array, where each element is a length of corresponding HashSet from `self.unique_values` - return Ok(Arc::new(Int64Array::from( + Ok(Arc::new(Int64Array::from( unique_values .into_iter() .map(|set| set.len() as i64) .collect_vec(), - ))); + ))) } fn state(&mut self, emit_to: crate::EmitTo) -> Result> { @@ -1124,7 +1125,7 @@ mod tests { Some(4.752399), Some(5.239493), None, - Some(7.34058740), + Some(7.3405874), Some(<$PRIM_TYPE>::INFINITY), Some(<$PRIM_TYPE>::NEG_INFINITY), ]; @@ -1200,7 +1201,7 @@ mod tests { vec![1.234], vec![], vec![4.752399, 5.239493], - vec![7.34058740], + vec![7.3405874], vec![<$PRIM_TYPE>::INFINITY, <$PRIM_TYPE>::NEG_INFINITY], ]; @@ -1257,7 +1258,7 @@ mod tests { vec![1.234], vec![], vec![4.752399, 5.239493], - vec![7.34058740], + vec![7.3405874], vec![<$PRIM_TYPE>::INFINITY, <$PRIM_TYPE>::NEG_INFINITY], vec![8.23329], ]; From 5e7dfdbb2af81d94fa76768504315aaaeeab446b Mon Sep 17 00:00:00 2001 From: Eduard Karacharov Date: Thu, 4 Jan 2024 10:45:42 +0200 Subject: [PATCH 4/6] count distinct for primitive types --- .../src/aggregate/count_distinct.rs | 1043 ++++------------- .../physical-expr/src/aggregate/utils.rs | 4 +- 2 files changed, 205 insertions(+), 842 deletions(-) diff --git a/datafusion/physical-expr/src/aggregate/count_distinct.rs b/datafusion/physical-expr/src/aggregate/count_distinct.rs index 9d5bd9c89cb9..e95c3385516b 100644 --- a/datafusion/physical-expr/src/aggregate/count_distinct.rs +++ b/datafusion/physical-expr/src/aggregate/count_distinct.rs @@ -18,27 +18,29 @@ use arrow::datatypes::{DataType, Field, TimeUnit}; use arrow_array::types::{ ArrowPrimitiveType, Date32Type, Date64Type, Decimal128Type, Decimal256Type, - Float32Type, Float64Type, Int16Type, Int32Type, Int64Type, Int8Type, + Float16Type, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type, Int8Type, Time32MillisecondType, Time32SecondType, Time64MicrosecondType, Time64NanosecondType, TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType, UInt16Type, UInt32Type, UInt64Type, UInt8Type, }; -use arrow_array::{Int64Array, ListArray}; -use itertools::Itertools; +use arrow_array::PrimitiveArray; use std::any::Any; +use std::cmp::Eq; use std::fmt::Debug; +use std::hash::Hash; use std::sync::Arc; use ahash::RandomState; -use arrow::array::{Array, ArrayRef, AsArray}; +use arrow::array::{Array, ArrayRef}; use hashbrown::HashSet; use crate::aggregate::utils::{down_cast_any_ref, Hashable}; use crate::expressions::format_state_name; -use crate::{AggregateExpr, GroupsAccumulator, PhysicalExpr}; -use datafusion_common::cast::as_list_array; -use datafusion_common::{internal_err, DataFusionError, Result, ScalarValue}; +use crate::{AggregateExpr, PhysicalExpr}; +use datafusion_common::cast::{as_list_array, as_primitive_array}; +use datafusion_common::utils::array_into_list_array; +use datafusion_common::{Result, ScalarValue}; use datafusion_expr::Accumulator; type DistinctScalarValues = ScalarValue; @@ -69,9 +71,15 @@ impl DistinctCount { } } -macro_rules! distinct_count_groups_accumulator { - ($SELF:expr, $TYPE:ident) => {{ - Ok(Box::new(DistinctCountGroupsAccumulator::<$TYPE>::new())) +macro_rules! native_distinct_count_accumulator { + ($TYPE:ident) => {{ + Ok(Box::new(NativeDistinctCountAccumulator::<$TYPE>::new())) + }}; +} + +macro_rules! float_distinct_count_accumulator { + ($TYPE:ident) => {{ + Ok(Box::new(FloatDistinctCountAccumulator::<$TYPE>::new())) }}; } @@ -97,88 +105,57 @@ impl AggregateExpr for DistinctCount { vec![self.expr.clone()] } - fn groups_accumulator_supported(&self) -> bool { - use DataType::*; - matches!( - self.state_data_type, - Int8 | Int16 - | Int32 - | Int64 - | UInt8 - | UInt16 - | UInt32 - | UInt64 - | Float32 - | Float64 - | Decimal128(_, _) - | Decimal256(_, _) - | Date32 - | Date64 - | Time32(_) - | Time64(_) - | Timestamp(_, _) - ) - } - fn create_accumulator(&self) -> Result> { - Ok(Box::new(DistinctCountAccumulator { - values: HashSet::default(), - state_data_type: self.state_data_type.clone(), - })) - } - - fn create_groups_accumulator(&self) -> Result> { use DataType::*; use TimeUnit::*; - match self.state_data_type { - Int8 => distinct_count_groups_accumulator!(self, Int8Type), - Int16 => distinct_count_groups_accumulator!(self, Int16Type), - Int32 => distinct_count_groups_accumulator!(self, Int32Type), - Int64 => distinct_count_groups_accumulator!(self, Int64Type), - UInt8 => distinct_count_groups_accumulator!(self, UInt8Type), - UInt16 => distinct_count_groups_accumulator!(self, UInt16Type), - UInt32 => distinct_count_groups_accumulator!(self, UInt32Type), - UInt64 => distinct_count_groups_accumulator!(self, UInt64Type), - Float32 => distinct_count_groups_accumulator!(self, Float32Type), - Float64 => distinct_count_groups_accumulator!(self, Float64Type), - Decimal128(_, _) => { - distinct_count_groups_accumulator!(self, Decimal128Type) - } - Decimal256(_, _) => { - distinct_count_groups_accumulator!(self, Decimal256Type) - } - Date32 => distinct_count_groups_accumulator!(self, Date32Type), - Date64 => distinct_count_groups_accumulator!(self, Date64Type), + match &self.state_data_type { + Int8 => native_distinct_count_accumulator!(Int8Type), + Int16 => native_distinct_count_accumulator!(Int16Type), + Int32 => native_distinct_count_accumulator!(Int32Type), + Int64 => native_distinct_count_accumulator!(Int64Type), + UInt8 => native_distinct_count_accumulator!(UInt8Type), + UInt16 => native_distinct_count_accumulator!(UInt16Type), + UInt32 => native_distinct_count_accumulator!(UInt32Type), + UInt64 => native_distinct_count_accumulator!(UInt64Type), + Decimal128(_, _) => native_distinct_count_accumulator!(Decimal128Type), + Decimal256(_, _) => native_distinct_count_accumulator!(Decimal256Type), + + Date32 => native_distinct_count_accumulator!(Date32Type), + Date64 => native_distinct_count_accumulator!(Date64Type), Time32(Millisecond) => { - distinct_count_groups_accumulator!(self, Time32MillisecondType) + native_distinct_count_accumulator!(Time32MillisecondType) } Time32(Second) => { - distinct_count_groups_accumulator!(self, Time32SecondType) + native_distinct_count_accumulator!(Time32SecondType) } Time64(Microsecond) => { - distinct_count_groups_accumulator!(self, Time64MicrosecondType) + native_distinct_count_accumulator!(Time64MicrosecondType) } Time64(Nanosecond) => { - distinct_count_groups_accumulator!(self, Time64NanosecondType) + native_distinct_count_accumulator!(Time64NanosecondType) } Timestamp(Microsecond, _) => { - distinct_count_groups_accumulator!(self, TimestampMicrosecondType) + native_distinct_count_accumulator!(TimestampMicrosecondType) } Timestamp(Millisecond, _) => { - distinct_count_groups_accumulator!(self, TimestampMillisecondType) + native_distinct_count_accumulator!(TimestampMillisecondType) } Timestamp(Nanosecond, _) => { - distinct_count_groups_accumulator!(self, TimestampNanosecondType) + native_distinct_count_accumulator!(TimestampNanosecondType) } Timestamp(Second, _) => { - distinct_count_groups_accumulator!(self, TimestampSecondType) + native_distinct_count_accumulator!(TimestampSecondType) } - _ => internal_err!( - "DistinctCountGroupsAccumulator is not supported for {}", - self.state_data_type - ), + Float16 => float_distinct_count_accumulator!(Float16Type), + Float32 => float_distinct_count_accumulator!(Float32Type), + Float64 => float_distinct_count_accumulator!(Float64Type), + + _ => Ok(Box::new(DistinctCountAccumulator { + values: HashSet::default(), + state_data_type: self.state_data_type.clone(), + })), } } @@ -285,174 +262,179 @@ impl Accumulator for DistinctCountAccumulator { } } -struct DistinctCountGroupsAccumulator +#[derive(Debug)] +struct NativeDistinctCountAccumulator where T: ArrowPrimitiveType + Send, + T::Native: Eq + Hash, { - /// Vector for storing unique values sets for each group index - unique_values: Vec>>, + values: HashSet, } -impl DistinctCountGroupsAccumulator +impl NativeDistinctCountAccumulator where T: ArrowPrimitiveType + Send, + T::Native: Eq + Hash, { fn new() -> Self { Self { - unique_values: vec![], + values: HashSet::default(), } } } -impl GroupsAccumulator for DistinctCountGroupsAccumulator +impl Accumulator for NativeDistinctCountAccumulator where - T: ArrowPrimitiveType + Send, + T: ArrowPrimitiveType + Send + Debug, + T::Native: Eq + Hash, { - fn update_batch( - &mut self, - values: &[ArrayRef], - group_indices: &[usize], - opt_filter: Option<&arrow_array::BooleanArray>, - total_num_groups: usize, - ) -> Result<()> { - let values = values[0].as_primitive::(); - - self.unique_values.resize(total_num_groups, HashSet::new()); - - // Update current state from incoming values - match opt_filter { - None => group_indices.iter().zip(values.iter()).for_each( - |(group_index, value)| { - if let Some(value) = value { - self.unique_values[*group_index].insert(Hashable(value)); - } - }, - ), - Some(filter) => group_indices - .iter() - .zip(values.iter()) - .zip(filter.iter()) - .for_each(|((group_index, value), filter_value)| { - if let Some(true) = filter_value { - if let Some(value) = value { - self.unique_values[*group_index].insert(Hashable(value)); - } - } - }), - }; + fn state(&self) -> Result> { + let arr = Arc::new(PrimitiveArray::::from_iter_values( + self.values.iter().cloned(), + )) as ArrayRef; + let list = Arc::new(array_into_list_array(arr)) as ArrayRef; + Ok(vec![ScalarValue::List(list)]) + } + + fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { + if values.is_empty() { + return Ok(()); + } + + let arr = as_primitive_array::(&values[0])?; + arr.iter().for_each(|value| { + if let Some(value) = value { + self.values.insert(value); + } + }); Ok(()) } - fn merge_batch( - &mut self, - values: &[ArrayRef], - group_indices: &[usize], - opt_filter: Option<&arrow_array::BooleanArray>, - total_num_groups: usize, - ) -> Result<()> { + fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { + if states.is_empty() { + return Ok(()); + } assert_eq!( - values.len(), + states.len(), 1, - "DistinctCountGroupsAccumulator::merge_batch requires single array as state argument" + "count_distinct states must be single array" ); - let values = as_list_array(&values[0])?; - - self.unique_values.resize(total_num_groups, HashSet::new()); - - // Update current state with `list.values()` for each list in incoming state. - // It's safe to iterate over values(), as lists in incoming state don't contain - // null values. - match opt_filter { - None => group_indices.iter().zip(values.iter()).for_each( - |(group_idx, maybe_set)| { - if let Some(array) = maybe_set { - self.unique_values[*group_idx].extend( - array - .as_primitive::() - .values() - .iter() - .map(|entry| Hashable(*entry)), - ) - }; - }, - ), - Some(filter) => group_indices - .iter() - .zip(values.iter()) - .zip(filter.iter()) - .for_each(|((group_idx, maybe_set), filter_value)| { - if let Some(true) = filter_value { - if let Some(array) = maybe_set { - self.unique_values[*group_idx].extend( - array - .as_primitive::() - .values() - .iter() - .map(|entry| Hashable(*entry)), - ) - } - } - }), - }; + let arr = as_list_array(&states[0])?; + arr.iter().try_for_each(|maybe_list| { + if let Some(list) = maybe_list { + let list = as_primitive_array::(&list)?; + self.values.extend(list.values()) + }; + Ok(()) + }) + } - Ok(()) + fn evaluate(&self) -> Result { + Ok(ScalarValue::Int64(Some(self.values.len() as i64))) + } + + fn size(&self) -> usize { + std::mem::size_of_val(self) + + std::mem::size_of_val(&self.values) + + (std::mem::size_of::() * self.values.capacity()) } +} - fn evaluate(&mut self, emit_to: crate::EmitTo) -> Result { - let unique_values = emit_to.take_needed(&mut self.unique_values); +#[derive(Debug)] +struct FloatDistinctCountAccumulator +where + T: ArrowPrimitiveType + Send, +{ + values: HashSet, RandomState>, +} - // Final state is represented by Int64 array, where each element is a length of corresponding HashSet from `self.unique_values` - Ok(Arc::new(Int64Array::from( - unique_values - .into_iter() - .map(|set| set.len() as i64) - .collect_vec(), - ))) +impl FloatDistinctCountAccumulator +where + T: ArrowPrimitiveType + Send, +{ + fn new() -> Self { + Self { + values: HashSet::default(), + } } +} - fn state(&mut self, emit_to: crate::EmitTo) -> Result> { - let unique_values = emit_to.take_needed(&mut self.unique_values); +impl Accumulator for FloatDistinctCountAccumulator +where + T: ArrowPrimitiveType + Send + Debug, +{ + fn state(&self) -> Result> { + let arr = Arc::new(PrimitiveArray::::from_iter_values( + self.values.iter().map(|v| v.0), + )) as ArrayRef; + let list = Arc::new(array_into_list_array(arr)) as ArrayRef; + Ok(vec![ScalarValue::List(list)]) + } - // Intermediate state is represented as ListArray, where each element (list) is a collected HashSet from `self.unique_values`. - let uniques = ListArray::from_iter_primitive::( - unique_values - .iter() - .map(|set| Some(set.iter().map(|val| Some(val.0)).collect_vec())) - .collect_vec(), + fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { + if values.is_empty() { + return Ok(()); + } + + let arr = as_primitive_array::(&values[0])?; + arr.iter().for_each(|value| { + if let Some(value) = value { + self.values.insert(Hashable(value)); + } + }); + + Ok(()) + } + + fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { + if states.is_empty() { + return Ok(()); + } + assert_eq!( + states.len(), + 1, + "count_distinct states must be single array" ); - Ok(vec![Arc::new(uniques) as ArrayRef]) + let arr = as_list_array(&states[0])?; + arr.iter().try_for_each(|maybe_list| { + if let Some(list) = maybe_list { + let list = as_primitive_array::(&list)?; + self.values + .extend(list.values().iter().map(|v| Hashable(*v))); + }; + Ok(()) + }) + } + + fn evaluate(&self) -> Result { + Ok(ScalarValue::Int64(Some(self.values.len() as i64))) } fn size(&self) -> usize { - return - // Size of vector of HashSets - self.unique_values.capacity() * std::mem::size_of::>() + - // Each HashSet size - self.unique_values.iter() - .map(|set| set.capacity() * std::mem::size_of::()) - .sum::(); + std::mem::size_of_val(self) + + std::mem::size_of_val(&self.values) + + (std::mem::size_of::() * self.values.capacity()) } } #[cfg(test)] mod tests { use crate::expressions::NoOp; - use crate::EmitTo; use super::*; use arrow::array::{ ArrayRef, BooleanArray, Float32Array, Float64Array, Int16Array, Int32Array, - Int64Array, Int8Array, PrimitiveArray, UInt16Array, UInt32Array, UInt64Array, - UInt8Array, + Int64Array, Int8Array, UInt16Array, UInt32Array, UInt64Array, UInt8Array, }; use arrow::datatypes::DataType; use arrow::datatypes::{ Float32Type, Float64Type, Int16Type, Int32Type, Int64Type, Int8Type, UInt16Type, UInt32Type, UInt64Type, UInt8Type, }; + use arrow_array::Decimal256Array; use arrow_buffer::i256; use datafusion_common::cast::{as_boolean_array, as_list_array, as_primitive_array}; use datafusion_common::internal_err; @@ -615,6 +597,35 @@ mod tests { }}; } + macro_rules! test_count_distinct_update_batch_bigint { + ($ARRAY_TYPE:ident, $DATA_TYPE:ident, $PRIM_TYPE:ty) => {{ + let values: Vec> = vec![ + Some(i256::from(1)), + Some(i256::from(1)), + None, + Some(i256::from(3)), + Some(i256::from(2)), + None, + Some(i256::from(2)), + Some(i256::from(3)), + Some(i256::from(1)), + ]; + + let arrays = vec![Arc::new($ARRAY_TYPE::from(values)) as ArrayRef]; + + let (states, result) = run_update_batch(&arrays)?; + + let mut state_vec = state_to_vec_primitive!(&states[0], $DATA_TYPE); + state_vec.sort(); + + assert_eq!(states.len(), 1); + assert_eq!(state_vec, vec![i256::from(1), i256::from(2), i256::from(3)]); + assert_eq!(result, ScalarValue::Int64(Some(3))); + + Ok(()) + }}; + } + #[test] fn count_distinct_update_batch_i8() -> Result<()> { test_count_distinct_update_batch_numeric!(Int8Array, Int8Type, i8) @@ -665,6 +676,11 @@ mod tests { test_count_distinct_update_batch_floating_point!(Float64Array, Float64Type, f64) } + #[test] + fn count_distinct_update_batch_i256() -> Result<()> { + test_count_distinct_update_batch_bigint!(Decimal256Array, Decimal256Type, i256) + } + #[test] fn count_distinct_update_batch_boolean() -> Result<()> { let get_count = |data: BooleanArray| -> Result<(Vec, i64)> { @@ -810,657 +826,4 @@ mod tests { assert_eq!(result, ScalarValue::Int64(Some(2))); Ok(()) } - - macro_rules! assert_distinct_count_groups_states_eq { - ($ACTUAL:expr, $EXPECTED:expr, $DATA_TYPE: ident) => { - $ACTUAL - .iter() - .zip($EXPECTED.iter()) - .for_each(|(actual, expected)| { - let mut actual = actual - .unwrap() - .as_primitive::<$DATA_TYPE>() - .values() - .to_vec(); - actual.sort(); - - let mut expected = expected.clone(); - expected.sort(); - - assert_eq!(actual, expected); - }); - }; - } - - macro_rules! assert_float_distinct_count_groups_states_eq { - ($ACTUAL:expr, $EXPECTED:expr, $DATA_TYPE: ident) => { - $ACTUAL - .iter() - .zip($EXPECTED.iter()) - .for_each(|(actual, expected)| { - let mut actual = actual - .unwrap() - .as_primitive::<$DATA_TYPE>() - .values() - .to_vec(); - actual.sort_by(|a, b| a.total_cmp(b)); - - let mut expected = expected.clone(); - expected.sort_by(|a, b| a.total_cmp(b)); - - assert_eq!(actual, expected); - }); - }; - } - - macro_rules! integer_count_distinct_groups_accumulator { - ($DATA_TYPE:ident, $PRIM_TYPE:ty, $ACCUM:ident) => { - let group_indices = vec![0, 1, 1, 2, 0, 3, 3, 3, 4]; - let input_values: Vec> = vec![ - Some(1), - Some(1), - Some(1), - None, - Some(2), - Some(4), - Some(5), - None, - Some(7), - ]; - let values = Arc::new(PrimitiveArray::<$DATA_TYPE>::from(input_values)); - - // Prepare accumulator and update it with input values - let mut $ACCUM = DistinctCountGroupsAccumulator::<$DATA_TYPE>::new(); - $ACCUM.update_batch(&[values], &group_indices, None, 5)?; - }; - } - - macro_rules! test_count_distinct_groups_evaluate { - ($DATA_TYPE:ident, $PRIM_TYPE:ty) => { - test_count_distinct_groups_evaluate!($DATA_TYPE, $PRIM_TYPE, EmitTo::All); - test_count_distinct_groups_evaluate!( - $DATA_TYPE, - $PRIM_TYPE, - EmitTo::First(3) - ); - - return Ok(()) - }; - - ($DATA_TYPE:ident, $PRIM_TYPE:ty, $EMIT_TO:expr) => { - integer_count_distinct_groups_accumulator!( - $DATA_TYPE, - $PRIM_TYPE, - accumulator - ); - - // Get accumulator evaluation result - let evaluated = accumulator.evaluate($EMIT_TO)?; - let actual = evaluated.as_primitive::(); - - let mut expected_values = vec![2, 1, 0, 2, 1]; - let expected = Int64Array::from($EMIT_TO.take_needed(&mut expected_values)); - - assert_eq!(*actual, expected); - }; - } - - #[test] - fn count_distinct_groups_evaluate_i8() -> Result<()> { - test_count_distinct_groups_evaluate!(Int8Type, i8); - } - - #[test] - fn count_distinct_groups_evaluate_i16() -> Result<()> { - test_count_distinct_groups_evaluate!(Int16Type, i16); - } - - #[test] - fn count_distinct_groups_evaluate_i32() -> Result<()> { - test_count_distinct_groups_evaluate!(Int32Type, i32); - } - - #[test] - fn count_distinct_groups_evaluate_i64() -> Result<()> { - test_count_distinct_groups_evaluate!(Int64Type, i64); - } - - #[test] - fn count_distinct_groups_evaluate_u8() -> Result<()> { - test_count_distinct_groups_evaluate!(UInt8Type, u8); - } - - #[test] - fn count_distinct_groups_evaluate_u16() -> Result<()> { - test_count_distinct_groups_evaluate!(UInt16Type, u16); - } - - #[test] - fn count_distinct_groups_evaluate_u32() -> Result<()> { - test_count_distinct_groups_evaluate!(UInt32Type, u32); - } - - #[test] - fn count_distinct_groups_evaluate_u64() -> Result<()> { - test_count_distinct_groups_evaluate!(UInt64Type, u64); - } - - #[test] - fn count_distinct_groups_evaluate_i128() -> Result<()> { - test_count_distinct_groups_evaluate!(Decimal128Type, i128); - } - - macro_rules! test_count_distinct_groups_state { - ($DATA_TYPE:ident, $PRIM_TYPE:ty) => { - test_count_distinct_groups_state!($DATA_TYPE, $PRIM_TYPE, EmitTo::All); - test_count_distinct_groups_state!($DATA_TYPE, $PRIM_TYPE, EmitTo::First(3)); - - return Ok(()) - }; - ($DATA_TYPE:ident, $PRIM_TYPE:ty, $EMIT_TO:expr) => { - // Prepare accumulator and update it with input values - integer_count_distinct_groups_accumulator!( - $DATA_TYPE, - $PRIM_TYPE, - accumulator - ); - - // Get result state from accumulator - let state = accumulator.state($EMIT_TO)?; - let actual = as_list_array(&state[0])?; - - let expected: Vec> = - vec![vec![1, 2], vec![1], vec![], vec![4, 5], vec![7]]; - - assert_distinct_count_groups_states_eq!(actual, expected, $DATA_TYPE); - }; - } - - #[test] - fn count_distinct_groups_state_i8() -> Result<()> { - test_count_distinct_groups_state!(Int8Type, i8); - } - - #[test] - fn count_distinct_groups_state_i16() -> Result<()> { - test_count_distinct_groups_state!(Int16Type, i16); - } - - #[test] - fn count_distinct_groups_state_i32() -> Result<()> { - test_count_distinct_groups_state!(Int32Type, i32); - } - - #[test] - fn count_distinct_groups_state_i64() -> Result<()> { - test_count_distinct_groups_state!(Int64Type, i64); - } - - #[test] - fn count_distinct_groups_state_u8() -> Result<()> { - test_count_distinct_groups_state!(UInt8Type, u8); - } - - #[test] - fn count_distinct_groups_state_u16() -> Result<()> { - test_count_distinct_groups_state!(UInt16Type, u16); - } - - #[test] - fn count_distinct_groups_state_u32() -> Result<()> { - test_count_distinct_groups_state!(UInt32Type, u32); - } - - #[test] - fn count_distinct_groups_state_u64() -> Result<()> { - test_count_distinct_groups_state!(UInt64Type, u64); - } - - #[test] - fn count_distinct_groups_state_i128() -> Result<()> { - test_count_distinct_groups_state!(Decimal128Type, i128); - } - - macro_rules! test_count_distinct_groups_merge { - ($DATA_TYPE:ident, $PRIM_TYPE:ty) => { - test_count_distinct_groups_merge!($DATA_TYPE, $PRIM_TYPE, EmitTo::All); - test_count_distinct_groups_merge!($DATA_TYPE, $PRIM_TYPE, EmitTo::First(3)); - - return Ok(()) - }; - ($DATA_TYPE:ident, $PRIM_TYPE:ty, $EMIT_TO:expr) => { - // Prepare accumulator and update it with input values - integer_count_distinct_groups_accumulator!( - $DATA_TYPE, - $PRIM_TYPE, - accumulator - ); - - // Prepare merge input and merge it into accumulators state - let merge_group_indices = vec![0, 1, 2, 3, 5]; - let merge_input_values: Vec>>> = vec![ - Some(vec![Some(10), Some(12)]), - Some(vec![]), - Some(vec![]), - Some(vec![Some(4)]), - Some(vec![Some(8)]), - ]; - let merge_input = Arc::new( - ListArray::from_iter_primitive::<$DATA_TYPE, _, _>(merge_input_values), - ) as ArrayRef; - - accumulator.merge_batch(&[merge_input], &merge_group_indices, None, 6)?; - - // Get result state from accumulator - let state = accumulator.state($EMIT_TO)?; - let actual = as_list_array(&state[0])?; - - let expected: Vec> = vec![ - vec![1, 2, 10, 12], - vec![1], - vec![], - vec![4, 5], - vec![7], - vec![8], - ]; - - assert_distinct_count_groups_states_eq!(actual, expected, $DATA_TYPE); - }; - } - - #[test] - fn count_distinct_groups_merge_i8() -> Result<()> { - test_count_distinct_groups_merge!(Int8Type, i8); - } - - #[test] - fn count_distinct_groups_merge_i16() -> Result<()> { - test_count_distinct_groups_merge!(Int16Type, i16); - } - - #[test] - fn count_distinct_groups_merge_i32() -> Result<()> { - test_count_distinct_groups_merge!(Int32Type, i32); - } - - #[test] - fn count_distinct_groups_merge_i64() -> Result<()> { - test_count_distinct_groups_merge!(Int64Type, i64); - } - - #[test] - fn count_distinct_groups_merge_u8() -> Result<()> { - test_count_distinct_groups_merge!(UInt8Type, u8); - } - - #[test] - fn count_distinct_groups_merge_u16() -> Result<()> { - test_count_distinct_groups_merge!(UInt16Type, u16); - } - - #[test] - fn count_distinct_groups_merge_u32() -> Result<()> { - test_count_distinct_groups_merge!(UInt32Type, u32); - } - - #[test] - fn count_distinct_groups_merge_u64() -> Result<()> { - test_count_distinct_groups_merge!(UInt64Type, u64); - } - - #[test] - fn count_distinct_groups_merge_i128() -> Result<()> { - test_count_distinct_groups_merge!(Decimal128Type, i128); - } - - macro_rules! float_count_distinct_groups_accumulator { - ($DATA_TYPE:ident, $PRIM_TYPE:ty, $ACCUM:ident) => { - let group_indices = vec![0, 1, 1, 2, 0, 3, 3, 3, 4, 5, 5]; - let input_values: Vec> = vec![ - Some(1.2345), - Some(1.234), - Some(1.234), - None, - Some(2.6543), - Some(4.752399), - Some(5.239493), - None, - Some(7.3405874), - Some(<$PRIM_TYPE>::INFINITY), - Some(<$PRIM_TYPE>::NEG_INFINITY), - ]; - let values = Arc::new(PrimitiveArray::<$DATA_TYPE>::from(input_values)); - - // Prepare accumulator and update it with input values - let mut $ACCUM = DistinctCountGroupsAccumulator::<$DATA_TYPE>::new(); - $ACCUM.update_batch(&[values], &group_indices, None, 6)?; - }; - } - - macro_rules! test_float_count_distinct_groups_evaluate { - ($DATA_TYPE:ident, $PRIM_TYPE:ty) => { - test_float_count_distinct_groups_evaluate!( - $DATA_TYPE, - $PRIM_TYPE, - EmitTo::All - ); - test_float_count_distinct_groups_evaluate!( - $DATA_TYPE, - $PRIM_TYPE, - EmitTo::First(3) - ); - - return Ok(()) - }; - - ($DATA_TYPE:ident, $PRIM_TYPE:ty, $EMIT_TO:expr) => { - // Prepare accumulator and update it with input values - float_count_distinct_groups_accumulator!($DATA_TYPE, $PRIM_TYPE, accumulator); - - // Get accumulator evaluation result - let evaluated = accumulator.evaluate($EMIT_TO)?; - let actual = evaluated.as_primitive::(); - - let mut expected_values = vec![2, 1, 0, 2, 1, 2]; - let expected = Int64Array::from($EMIT_TO.take_needed(&mut expected_values)); - - assert_eq!(*actual, expected); - }; - } - - #[test] - fn count_distinct_groups_evaluate_f32() -> Result<()> { - test_float_count_distinct_groups_evaluate!(Float32Type, f32); - } - - #[test] - fn count_distinct_groups_evaluate_f64() -> Result<()> { - test_float_count_distinct_groups_evaluate!(Float64Type, f64); - } - - macro_rules! test_float_count_distinct_groups_state { - ($DATA_TYPE:ident, $PRIM_TYPE:ty) => { - test_float_count_distinct_groups_state!($DATA_TYPE, $PRIM_TYPE, EmitTo::All); - test_float_count_distinct_groups_state!( - $DATA_TYPE, - $PRIM_TYPE, - EmitTo::First(3) - ); - - return Ok(()) - }; - ($DATA_TYPE:ident, $PRIM_TYPE:ty, $EMIT_TO:expr) => { - float_count_distinct_groups_accumulator!($DATA_TYPE, $PRIM_TYPE, accumulator); - - // Get result state from accumulator - let state = accumulator.state($EMIT_TO)?; - let actual = as_list_array(&state[0])?; - - let expected: Vec> = vec![ - vec![1.2345, 2.6543], - vec![1.234], - vec![], - vec![4.752399, 5.239493], - vec![7.3405874], - vec![<$PRIM_TYPE>::INFINITY, <$PRIM_TYPE>::NEG_INFINITY], - ]; - - assert_float_distinct_count_groups_states_eq!(actual, expected, $DATA_TYPE); - }; - } - - #[test] - fn count_distinct_groups_state_f32() -> Result<()> { - test_float_count_distinct_groups_state!(Float32Type, f32); - } - - #[test] - fn count_distinct_groups_state_f64() -> Result<()> { - test_float_count_distinct_groups_state!(Float64Type, f64); - } - - macro_rules! test_float_count_distinct_groups_merge { - ($DATA_TYPE:ident, $PRIM_TYPE:ty) => { - test_float_count_distinct_groups_merge!($DATA_TYPE, $PRIM_TYPE, EmitTo::All); - test_float_count_distinct_groups_merge!( - $DATA_TYPE, - $PRIM_TYPE, - EmitTo::First(3) - ); - - return Ok(()) - }; - ($DATA_TYPE:ident, $PRIM_TYPE:ty, $EMIT_TO:expr) => { - // Prepare accumulator and update it with input values - float_count_distinct_groups_accumulator!($DATA_TYPE, $PRIM_TYPE, accumulator); - - // Prepare merge input and merge it into accumulators state - let merge_group_indices = vec![0, 1, 2, 3, 6]; - let merge_input_values: Vec>>> = vec![ - Some(vec![Some(10.123), Some(12.633434)]), - Some(vec![]), - Some(vec![]), - Some(vec![Some(4.752399)]), - Some(vec![Some(8.23329)]), - ]; - let merge_input = Arc::new( - ListArray::from_iter_primitive::<$DATA_TYPE, _, _>(merge_input_values), - ) as ArrayRef; - - accumulator.merge_batch(&[merge_input], &merge_group_indices, None, 7)?; - - // Get result state from accumulator - let state = accumulator.state($EMIT_TO)?; - let actual = as_list_array(&state[0])?; - - let expected: Vec> = vec![ - vec![1.2345, 2.6543, 10.123, 12.633434], - vec![1.234], - vec![], - vec![4.752399, 5.239493], - vec![7.3405874], - vec![<$PRIM_TYPE>::INFINITY, <$PRIM_TYPE>::NEG_INFINITY], - vec![8.23329], - ]; - - assert_float_distinct_count_groups_states_eq!(actual, expected, $DATA_TYPE); - }; - } - - #[test] - fn count_distinct_groups_merge_f32() -> Result<()> { - test_float_count_distinct_groups_merge!(Float32Type, f32); - } - - #[test] - fn count_distinct_groups_merge_f64() -> Result<()> { - test_float_count_distinct_groups_merge!(Float64Type, f64); - } - - fn bigint_count_distinct_groups_accumulator( - ) -> Result> { - let group_indices = vec![0, 1, 1, 2, 0, 3, 3, 3, 4]; - let input_values: Vec> = vec![ - Some(i256::from(1)), - Some(i256::from(1)), - Some(i256::from(1)), - None, - Some(i256::from(2)), - Some(i256::from(4)), - Some(i256::from(5)), - None, - Some(i256::from(7)), - ]; - let values = Arc::new(PrimitiveArray::::from(input_values)); - - // Prepare accumulator and update it with input values - let mut accumulator = DistinctCountGroupsAccumulator::::new(); - accumulator.update_batch(&[values], &group_indices, None, 5)?; - - Ok(accumulator) - } - - #[test] - fn count_distinct_groups_evaluate_i256() -> Result<()> { - let mut accumulator = bigint_count_distinct_groups_accumulator()?; - - // Get accumulator evaluation result - let evaluated = accumulator.evaluate(EmitTo::All)?; - let actual = evaluated.as_primitive::(); - - let expected = PrimitiveArray::::from(vec![2, 1, 0, 2, 1]); - - // Assert that evaluation result and expected counts for each group are equal - assert_eq!(*actual, expected); - - Ok(()) - } - - #[test] - fn count_distinct_groups_state_i256() -> Result<()> { - let mut accumulator = bigint_count_distinct_groups_accumulator()?; - - // Get accumulator state - let state = accumulator.state(EmitTo::All)?; - let actual = as_list_array(&state[0])?; - - let expected = vec![ - vec![i256::from(1), i256::from(2)], - vec![i256::from(1)], - vec![], - vec![i256::from(4), i256::from(5)], - vec![i256::from(7)], - ]; - - assert_distinct_count_groups_states_eq!(actual, expected, Decimal256Type); - Ok(()) - } - - #[test] - fn count_distinct_groups_merge_i256() -> Result<()> { - let mut accumulator = bigint_count_distinct_groups_accumulator()?; - - // Prepare merge input and merge it into accumulators state - let merge_group_indices = vec![0, 1, 2, 3, 5]; - let merge_input_values = vec![ - Some(vec![Some(i256::from(10)), Some(i256::from(12))]), - Some(vec![]), - Some(vec![]), - Some(vec![Some(i256::from(4))]), - Some(vec![Some(i256::from(8))]), - ]; - let merge_input = Arc::new( - ListArray::from_iter_primitive::(merge_input_values), - ) as ArrayRef; - - accumulator.merge_batch(&[merge_input], &merge_group_indices, None, 6)?; - - // Get result state from accumulator - let state = accumulator.state(EmitTo::All)?; - let actual = as_list_array(&state[0])?; - - let expected = vec![ - vec![i256::from(1), i256::from(2), i256::from(10), i256::from(12)], - vec![i256::from(1)], - vec![], - vec![i256::from(4), i256::from(5)], - vec![i256::from(7)], - vec![i256::from(8)], - ]; - - assert_distinct_count_groups_states_eq!(actual, expected, Decimal256Type); - - Ok(()) - } - - #[test] - fn count_distinct_groups_update_filtered() -> Result<()> { - let group_indices = vec![0, 1, 1, 2, 0, 3, 3, 3, 4]; - let input_values = vec![ - Some(1), - Some(1), - Some(1), - None, - Some(2), - Some(4), - Some(5), - None, - Some(7), - ]; - let values = Arc::new(PrimitiveArray::::from(input_values)); - let filter: BooleanArray = - vec![true, true, false, true, true, true, false, true, false].into(); - - // Prepare accumulator and update it with input values - let mut accumulator = DistinctCountGroupsAccumulator::::new(); - accumulator.update_batch(&[values], &group_indices, Some(&filter), 5)?; - - // Get accumulator state - let state = accumulator.state(EmitTo::All)?; - let actual = as_list_array(&state[0])?; - - let expected = vec![vec![1, 2], vec![1], vec![], vec![4], vec![]]; - - assert_distinct_count_groups_states_eq!(actual, expected, Int32Type); - - Ok(()) - } - - #[test] - fn count_distinct_groups_merge_filtered() -> Result<()> { - let group_indices = vec![0, 1, 1, 2, 0, 3, 3, 3, 4]; - let input_values = vec![ - Some(1), - Some(1), - Some(1), - None, - Some(2), - Some(4), - Some(5), - None, - Some(7), - ]; - let values = Arc::new(PrimitiveArray::::from(input_values)); - - // Prepare accumulator and update it with input values - let mut accumulator = DistinctCountGroupsAccumulator::::new(); - accumulator.update_batch(&[values], &group_indices, None, 5)?; - - // Prepare merge input and merge it into accumulators state - let merge_group_indices = vec![0, 1, 2, 3, 5]; - let merge_input_values = vec![ - Some(vec![Some(10), Some(12)]), - Some(vec![Some(13)]), - Some(vec![]), - Some(vec![Some(4)]), - Some(vec![Some(18)]), - ]; - let merge_input = Arc::new(ListArray::from_iter_primitive::( - merge_input_values, - )) as ArrayRef; - let merge_filter: BooleanArray = vec![true, false, true, true, false].into(); - - accumulator.merge_batch( - &[merge_input], - &merge_group_indices, - Some(&merge_filter), - 6, - )?; - - // Get result state from accumulator - let state = accumulator.state(EmitTo::All)?; - let actual = as_list_array(&state[0])?; - - let expected = vec![ - vec![1, 2, 10, 12], - vec![1], - vec![], - vec![4, 5], - vec![7], - vec![], - ]; - - assert_distinct_count_groups_states_eq!(actual, expected, Int32Type); - Ok(()) - } } diff --git a/datafusion/physical-expr/src/aggregate/utils.rs b/datafusion/physical-expr/src/aggregate/utils.rs index 026a5f3b06d8..d73c46a0f687 100644 --- a/datafusion/physical-expr/src/aggregate/utils.rs +++ b/datafusion/physical-expr/src/aggregate/utils.rs @@ -212,8 +212,8 @@ pub fn get_sort_options(ordering_req: &[PhysicalSortExpr]) -> Vec { ordering_req.iter().map(|item| item.options).collect() } -/// A wrapper around a type to provide hash for primitive arrow types -#[derive(Copy, Clone)] +/// A wrapper around a type to provide hash for floats +#[derive(Copy, Clone, Debug)] pub(crate) struct Hashable(pub T); impl std::hash::Hash for Hashable { From b6772dd1b915fc8e5b40d677b9193ddb031f1c1f Mon Sep 17 00:00:00 2001 From: Eduard Karacharov Date: Thu, 4 Jan 2024 11:25:27 +0200 Subject: [PATCH 5/6] revert hashset to std --- datafusion/physical-expr/src/aggregate/count_distinct.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-expr/src/aggregate/count_distinct.rs b/datafusion/physical-expr/src/aggregate/count_distinct.rs index e95c3385516b..08e16ac05750 100644 --- a/datafusion/physical-expr/src/aggregate/count_distinct.rs +++ b/datafusion/physical-expr/src/aggregate/count_distinct.rs @@ -33,7 +33,7 @@ use std::sync::Arc; use ahash::RandomState; use arrow::array::{Array, ArrayRef}; -use hashbrown::HashSet; +use std::collections::HashSet; use crate::aggregate::utils::{down_cast_any_ref, Hashable}; use crate::expressions::format_state_name; From a3944cfa54c66df0d3d3a593ee9661c429ec2dbd Mon Sep 17 00:00:00 2001 From: Eduard Karacharov Date: Thu, 4 Jan 2024 17:00:12 +0200 Subject: [PATCH 6/6] fixed accumulator size estimation --- .../src/aggregate/count_distinct.rs | 22 +++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/datafusion/physical-expr/src/aggregate/count_distinct.rs b/datafusion/physical-expr/src/aggregate/count_distinct.rs index 08e16ac05750..f7c13948b2dc 100644 --- a/datafusion/physical-expr/src/aggregate/count_distinct.rs +++ b/datafusion/physical-expr/src/aggregate/count_distinct.rs @@ -336,9 +336,18 @@ where } fn size(&self) -> usize { + let estimated_buckets = (self.values.len().checked_mul(8).unwrap_or(usize::MAX) + / 7) + .next_power_of_two(); + + // Size of accumulator + // + size of entry * number of buckets + // + 1 byte for each bucket + // + fixed size of HashSet std::mem::size_of_val(self) + + std::mem::size_of::() * estimated_buckets + + estimated_buckets + std::mem::size_of_val(&self.values) - + (std::mem::size_of::() * self.values.capacity()) } } @@ -414,9 +423,18 @@ where } fn size(&self) -> usize { + let estimated_buckets = (self.values.len().checked_mul(8).unwrap_or(usize::MAX) + / 7) + .next_power_of_two(); + + // Size of accumulator + // + size of entry * number of buckets + // + 1 byte for each bucket + // + fixed size of HashSet std::mem::size_of_val(self) + + std::mem::size_of::() * estimated_buckets + + estimated_buckets + std::mem::size_of_val(&self.values) - + (std::mem::size_of::() * self.values.capacity()) } }