Merged
@@ -36,7 +36,7 @@ use datafusion_physical_expr::binary_map::OutputType;

use hashbrown::raw::RawTable;

- /// Compare GroupValue Rows column by column
+ /// A [`GroupValues`] that stores multiple columns of group values.
Contributor

I'm not very familiar with this, hence the question: what are group values?
Are they the group keys, or the exact values attached to a specific key?

Contributor

Consider the following example: (1, 'a') is one of the group values, and so are (2, 'b') and (3, 'c').

statement ok
create table t(a int, b varchar) as values (1, 'a'), (2, 'b'), (1, 'a'), (3, 'c');

query ITI
select a, b, count(*) from t group by a, b;
----
1 a 2
2 b 1
3 c 1

Contributor

@jayzhan211 jayzhan211 Sep 26, 2024

In the Rows implementation, we convert (1, 'a') to a row and compare against that row. In the Column implementation, we compare column by column, in this case first 1 and then 'a'.
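
To make the difference concrete, here is a minimal sketch of the two comparison strategies for the `(int, varchar)` example above. It is a toy illustration only, not DataFusion's code: `encode_row`, `rows_equal`, and `ColumnStore` are hypothetical names, and the byte encoding is a simplified stand-in for the arrow-rs `Rows` format.

```rust
/// Row-wise (toy version): encode the whole group key into one byte string.
/// The fixed-width integer comes first, so the encoding is unambiguous for
/// this two-column example. This is NOT the real arrow-rs row encoding.
fn encode_row(a: i32, b: &str) -> Vec<u8> {
    let mut row = a.to_be_bytes().to_vec();
    row.extend_from_slice(b.as_bytes());
    row
}

/// Row-wise equality is a single comparison of the encoded bytes.
fn rows_equal(lhs: &[u8], rhs: &[u8]) -> bool {
    lhs == rhs
}

/// Column-wise (toy version): keep the group values in one vector per column.
struct ColumnStore {
    a: Vec<i32>,
    b: Vec<String>,
}

impl ColumnStore {
    /// Compare the stored group at `stored_row` with an incoming row,
    /// going through the columns from left to right. The `&&` stops at the
    /// first column that differs.
    fn row_equals(&self, stored_row: usize, a: i32, b: &str) -> bool {
        self.a[stored_row] == a && self.b[stored_row] == b
    }
}
```

In the column-wise version no intermediate row has to be built for the incoming data, which is the main structural difference between the two approaches in this sketch.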

Contributor Author

This is a great example. I added it to the docs.

pub struct GroupValuesColumn {
/// The output schema
schema: SchemaRef,
@@ -55,7 +55,7 @@ pub struct GroupValuesColumn {
map_size: usize,

/// The actual group by values, stored column-wise. Compare from
- /// the left to right, each column is stored as `ArrayRowEq`.
+ /// the left to right, each column is stored as [`ArrayRowEq`].
/// This has been shown to be faster than the row format
group_values: Vec<Box<dyn ArrayRowEq>>,

@@ -37,12 +37,13 @@ use std::vec;

use datafusion_physical_expr_common::binary_map::{OutputType, INITIAL_BUFFER_CAPACITY};

- /// Trait for group values column-wise row comparison
+ /// Trait for storing a single column of group values in [`GroupValuesColumn`]
///
/// Implementations of this trait store an in-progress collection of group values
/// (similar to various builders in Arrow-rs) that allow for quick comparison to
/// incoming rows.
///
/// [`GroupValuesColumn`]: crate::aggregates::group_values::column_wise::GroupValuesColumn
pub trait ArrayRowEq: Send + Sync {
/// Returns equal if the row stored in this builder at `lhs_row` is equal to
/// the row in `array` at `rhs_row`
@@ -60,11 +61,13 @@ pub trait ArrayRowEq: Send + Sync {
fn take_n(&mut self, n: usize) -> ArrayRef;
}

/// An implementation of [`ArrayRowEq`] for primitive types.
Contributor

was it renamed in #12619?

Contributor Author

Yes, good call -- will fix

pub struct PrimitiveGroupValueBuilder<T: ArrowPrimitiveType> {
group_values: Vec<T::Native>,
nulls: Vec<bool>,
Contributor

Should this be a BooleanArray, so that the null checks are faster and in one place?

Contributor Author

You are foreshadowing :) This is an excellent point. @jayzhan211 and I are working on exactly this topic. #12623
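
For readers following this thread, the idea behind the suggestion can be sketched with a bit-packed null mask instead of a `Vec<bool>`. This is only an illustration under stated assumptions: `NullMask` is a hypothetical type written for this comment, not an arrow-rs or DataFusion API, and it says nothing about how #12623 resolves the question. A `Vec<bool>` spends one byte per row, while the mask below packs 64 rows into one `u64`.

```rust
/// Hypothetical bit-packed null mask: bit `i` is set when row `i` is null.
#[derive(Default)]
struct NullMask {
    bits: Vec<u64>,
    len: usize,
    /// True once at least one null has been pushed (fast non-null path).
    has_null: bool,
}

impl NullMask {
    /// Append the null flag for one more row.
    fn push(&mut self, is_null: bool) {
        // Start a new 64-bit word whenever the previous one is full.
        if self.len % 64 == 0 {
            self.bits.push(0);
        }
        if is_null {
            *self.bits.last_mut().unwrap() |= 1u64 << (self.len % 64);
            self.has_null = true;
        }
        self.len += 1;
    }

    /// Returns true if `row` is null. When no null was ever pushed,
    /// the bitmap is not consulted at all.
    fn is_null(&self, row: usize) -> bool {
        debug_assert!(row < self.len);
        self.has_null && ((self.bits[row / 64] >> (row % 64)) & 1) == 1
    }
}
```

Keeping the `has_null` flag next to the mask preserves the fast path that the struct in this diff already documents.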

- // whether the array contains at least one null, for fast non-null path
+ /// whether the array contains at least one null, for fast non-null path
has_null: bool,
/// Can the input array contain nulls?
nullable: bool,
}

@@ -154,13 +157,14 @@ impl<T: ArrowPrimitiveType> ArrayRowEq for PrimitiveGroupValueBuilder<T> {
}
}

/// An implementation of [`ArrayRowEq`] for binary and utf8 types.
Contributor

renamed?

pub struct ByteGroupValueBuilder<O>
where
O: OffsetSizeTrait,
{
output_type: OutputType,
buffer: BufferBuilder<u8>,
/// Offsets into `buffer` for each distinct value. These offsets are used
/// directly to create the final `GenericBinaryArray`. The `i`th string is
/// stored in the range `offsets[i]..offsets[i+1]` in `buffer`. Null values
/// are stored as a zero length string.
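
The offsets layout described in the doc comment above can be sketched as follows. This is a simplified illustration, not the real `ByteGroupValueBuilder`: `ByteStore` is a hypothetical type, it uses plain `Vec`s instead of `BufferBuilder`, and the separate `nulls` vector is an assumption added here so that a null can be told apart from an empty string.

```rust
/// Toy store for variable-length byte values using one shared buffer.
struct ByteStore {
    /// All values concatenated back to back.
    buffer: Vec<u8>,
    /// Value `i` lives in `buffer[offsets[i]..offsets[i + 1]]`.
    offsets: Vec<usize>,
    /// Assumption for this sketch: one null flag per value.
    nulls: Vec<bool>,
}

impl ByteStore {
    fn new() -> Self {
        // The offsets always start with 0, so they hold `len + 1` entries.
        Self { buffer: Vec::new(), offsets: vec![0], nulls: Vec::new() }
    }

    /// Append a value. A null adds no bytes, so it is stored as a
    /// zero-length range.
    fn push(&mut self, value: Option<&[u8]>) {
        if let Some(bytes) = value {
            self.buffer.extend_from_slice(bytes);
        }
        self.offsets.push(self.buffer.len());
        self.nulls.push(value.is_none());
    }

    /// Returns the bytes of value `i` (empty for a null).
    fn value(&self, i: usize) -> &[u8] {
        &self.buffer[self.offsets[i]..self.offsets[i + 1]]
    }
}
```

Pushing `"foo"`, a null, and `"ab"` in that order yields the offsets `[0, 3, 3, 5]`: the null occupies the empty range `3..3`.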
27 changes: 23 additions & 4 deletions datafusion/physical-plan/src/aggregates/group_values/mod.rs
@@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.

//! [`GroupValues`] trait for storing and interning group keys

use arrow::record_batch::RecordBatch;
use arrow_array::{downcast_primitive, ArrayRef};
use arrow_schema::{DataType, SchemaRef};
@@ -37,18 +39,34 @@ use datafusion_physical_expr::binary_map::OutputType;

mod group_value_row;

- /// An interning store for group keys
+ /// Stores the group values during hash aggregation.
+ ///
+ /// There are multiple specialized implementations of this trait optimized for
+ /// different data types and number of columns, instantiated by
+ /// [`new_group_values`].
+ ///
+ /// Each distinct group in a hash aggregation is identified by a unique group id
+ /// (usize) which is assigned by instances of this trait. Group ids are
+ /// continuous without gaps, starting from 0.
pub trait GroupValues: Send {
- /// Calculates the `groups` for each input row of `cols`
+ /// Calculates the group id for each input row of `cols`, assigning new
+ /// group ids as necessary.
+ ///
+ /// When the function returns, `groups` must contain the group id for each
+ /// row in `cols`.
+ ///
+ /// If a row has the same value as a previous row, the same group id is
+ /// assigned. If a row has a new value, the next available group id is
+ /// assigned.
fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec<usize>) -> Result<()>;

- /// Returns the number of bytes used by this [`GroupValues`]
+ /// Returns the number of bytes of memory used by this [`GroupValues`]
fn size(&self) -> usize;

/// Returns true if this [`GroupValues`] is empty
fn is_empty(&self) -> bool;

- /// The number of values stored in this [`GroupValues`]
+ /// The number of values (distinct group values) stored in this [`GroupValues`]
fn len(&self) -> usize;

/// Emits the group values
@@ -58,6 +76,7 @@ pub trait GroupValues: Send {
fn clear_shrink(&mut self, batch: &RecordBatch);
}
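
The interning contract documented on `intern` above (same value gets the same id, new values get the next id, ids start at 0 with no gaps) can be sketched with a small stand-alone type. This is a minimal illustration, not one of the real implementations: `ToyGroupValues` is hypothetical, it is hard-coded to an `(i32, &str)` key instead of `&[ArrayRef]`, it uses a std `HashMap` rather than a `RawTable`, and clearing `groups` at the start of each call is an assumption of this sketch.

```rust
use std::collections::HashMap;

/// Toy interning store for two-column group keys.
#[derive(Default)]
struct ToyGroupValues {
    /// Maps each distinct group value to its group id.
    ids: HashMap<(i32, String), usize>,
    /// Distinct group values, indexed by group id.
    values: Vec<(i32, String)>,
}

impl ToyGroupValues {
    /// Writes the group id of every input row into `groups`,
    /// assigning new ids as necessary.
    fn intern(&mut self, a: &[i32], b: &[&str], groups: &mut Vec<usize>) {
        groups.clear();
        for (&x, &y) in a.iter().zip(b) {
            let key = (x, y.to_string());
            // The next free id equals the number of groups seen so far,
            // which keeps the ids continuous and starting from 0.
            let next_id = self.values.len();
            let id = *self.ids.entry(key.clone()).or_insert(next_id);
            if id == next_id {
                // First time this value is seen: remember it.
                self.values.push(key);
            }
            groups.push(id);
        }
    }

    /// Number of distinct group values stored.
    fn len(&self) -> usize {
        self.values.len()
    }
}
```

For the rows `(1, 'a'), (2, 'b'), (1, 'a'), (3, 'c')` from the review thread, this sketch fills `groups` with `[0, 1, 0, 2]` and ends with `len() == 3`. A later call that sees `(3, 'c')` again reuses id 2.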

/// Return a specialized implementation of [`GroupValues`] for the given schema.
pub fn new_group_values(schema: SchemaRef) -> Result<Box<dyn GroupValues>> {
if schema.fields.len() == 1 {
let d = schema.fields[0].data_type();
10 changes: 9 additions & 1 deletion datafusion/physical-plan/src/aggregates/group_values/row.rs
@@ -30,6 +30,13 @@ use hashbrown::raw::RawTable;
use std::sync::Arc;

/// A [`GroupValues`] making use of [`Rows`]
///
/// This is a general implementation of [`GroupValues`] that works for any
/// combination of data types and number of columns, including nested types such as
/// structs and lists.
///
/// It uses the arrow-rs [`Rows`] to store the group values, which is a row-wise
/// representation.
pub struct GroupValuesRows {
/// The output schema
schema: SchemaRef,
@@ -219,7 +226,8 @@ impl GroupValues for GroupValuesRows {
}
};

- // TODO: Materialize dictionaries in group keys (#7647)
+ // TODO: Materialize dictionaries in group keys
+ // https://github.com/apache/datafusion/issues/7647
for (field, array) in self.schema.fields.iter().zip(&mut output) {
let expected = field.data_type();
*array = dictionary_encode_if_necessary(