13 changes: 7 additions & 6 deletions datafusion/physical-expr/src/aggregate/count_distinct.rs
@@ -83,6 +83,11 @@ macro_rules! float_distinct_count_accumulator {
}};
}

/// Returns the estimated number of buckets in a hashbrown hashtable.
Contributor:
I think we could potentially re-use the comments here:

https://github.com/apache/arrow-datafusion/blob/819d3577872a082f2aea7a68ae83d68534049662/datafusion/physical-plan/src/joins/hash_join.rs#L734-L749

I think the value of this PR / issue is to consolidate the logic in datafusion/physical-expr/src/aggregate/count_distinct.rs with the logic in arrow-datafusion/datafusion/physical-plan/src/joins/hash_join.rs, with comments explaining the rationale (aka answering @crepererum's comments)

To that end what would you think about:

  1. Add the code to arrow-datafusion/datafusion/physical-plan/src/common.rs
  2. Use the comments from https://github.com/apache/arrow-datafusion/blob/819d3577872a082f2aea7a68ae83d68534049662/datafusion/physical-plan/src/joins/hash_join.rs#L734-L749 to explain the calculation
  3. Change the code in hash_join.rs to use it too

I think this may require changing the signature to something like

/// Estimates the memory allocated by a [`hashbrown::HashTable`].
///
/// (add explanation about size calculation here)
///
/// Note a [`hashbrown::HashSet`] is implemented as a HashTable with a zero-sized value
pub fn estimated_hashtable_size<T>(table: &HashTable<T, RandomState>) -> usize {
...
}

Contributor Author:

I'm 100% cool with the changes!
Thank you all for the review. :)

fn estimated_buckets<T>(hashset: &HashSet<T, RandomState>) -> usize {
(hashset.len().checked_mul(8).unwrap_or(usize::MAX) / 7).next_power_of_two()
}
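The helper above can be sketched in isolation. hashbrown (which also backs std's `HashSet`/`HashMap`) resizes when the load factor would exceed 7/8 and always allocates a power-of-two number of buckets, so `len * 8 / 7` rounded up to the next power of two is a lower bound on the allocated buckets. A minimal, self-contained sketch using std's `HashSet` (names are illustrative, not the PR's exact code):

```rust
use std::collections::HashSet;

/// Estimate the number of buckets backing a hashbrown-based set:
/// hashbrown keeps the load factor <= 7/8 and uses power-of-two
/// bucket counts, so len * 8 / 7 rounded up is a lower bound.
fn estimated_buckets<T>(set: &HashSet<T>) -> usize {
    (set.len().checked_mul(8).unwrap_or(usize::MAX) / 7).next_power_of_two()
}

fn main() {
    let set: HashSet<u32> = (0..100).collect();
    // 100 * 8 / 7 = 114 (integer division); next power of two is 128.
    println!("{}", estimated_buckets(&set));
}
```

Note the `checked_mul` guard: for very large sets `len * 8` could overflow, in which case the estimate saturates rather than panicking.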

impl AggregateExpr for DistinctCount {
/// Return a reference to Any that can be used for downcasting
fn as_any(&self) -> &dyn Any {
@@ -336,9 +341,7 @@ where
}

fn size(&self) -> usize {
-        let estimated_buckets = (self.values.len().checked_mul(8).unwrap_or(usize::MAX)
-            / 7)
-        .next_power_of_two();
+        let estimated_buckets = estimated_buckets(&self.values);

// Size of accumulator
// + size of entry * number of buckets
@@ -423,9 +426,7 @@ where
}

fn size(&self) -> usize {
-        let estimated_buckets = (self.values.len().checked_mul(8).unwrap_or(usize::MAX)
-            / 7)
-        .next_power_of_two();
+        let estimated_buckets = estimated_buckets(&self.values);

// Size of accumulator
// + size of entry * number of buckets
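Putting the pieces together, the `size()` implementations in the diff boil down to something like the following sketch: the accumulator's own size plus one entry slot per estimated bucket. The type and field names here are hypothetical stand-ins, not DataFusion's actual definitions:

```rust
use std::collections::HashSet;
use std::mem::size_of;

/// Hypothetical stand-in for the accumulator's stored value type.
type Value = u64;

/// Hypothetical stand-in for DataFusion's distinct-count accumulator.
struct DistinctAccumulator {
    values: HashSet<Value>,
}

impl DistinctAccumulator {
    /// Sketch of the size() logic in the diff:
    /// size of accumulator + size of entry * number of buckets.
    fn size(&self) -> usize {
        let estimated_buckets = (self
            .values
            .len()
            .checked_mul(8)
            .unwrap_or(usize::MAX)
            / 7)
        .next_power_of_two();
        size_of::<Self>() + size_of::<Value>() * estimated_buckets
    }
}
```

This is the duplicated calculation the review asks to consolidate into a single shared helper (e.g. in physical-plan's common.rs) so both count_distinct.rs and hash_join.rs use one documented implementation.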