Skip to content

Commit c5afe67

Browse files
marvinlanhenke authored and findepi committed
Minor: Refactor memory size estimation for HashTable (apache#10748)
* refactor: extract estimate_memory_size
* refactor: cap at usize::MAX
* refactor: use estimate_memory_size
* chore: add examples
* refactor: return Result<usize>; add testcase
* fix: docs
* fix: remove unnecessary checked_div
* fix: remove additional and_then
1 parent 8c211cc commit c5afe67

File tree

4 files changed

+153
-42
lines changed

4 files changed

+153
-42
lines changed
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! This module provides a function to estimate the memory size of a HashTable prior to allocation.
19+
20+
use crate::{DataFusionError, Result};
21+
22+
/// Estimates the memory size required for a hash table prior to allocation.
23+
///
24+
/// # Parameters
25+
/// - `num_elements`: The number of elements expected in the hash table.
26+
/// - `fixed_size`: A fixed overhead size associated with the collection
27+
/// (e.g., HashSet or HashTable).
28+
/// - `T`: The type of elements stored in the hash table.
29+
///
30+
/// # Details
31+
/// This function calculates the estimated memory size by considering:
32+
/// - An overestimation of buckets to keep approximately 1/8 of them empty.
33+
/// - The total memory size is computed as:
34+
/// - The size of each entry (`T`) multiplied by the estimated number of
35+
/// buckets.
36+
/// - One byte overhead for each bucket.
37+
/// - The fixed size overhead of the collection.
38+
/// - If the estimation overflows, we return a [`DataFusionError`]
39+
///
40+
/// # Examples
41+
/// ---
42+
///
43+
/// ## From within a struct
44+
///
45+
/// ```rust
46+
/// # use datafusion_common::utils::memory::estimate_memory_size;
47+
/// # use datafusion_common::Result;
48+
///
49+
/// struct MyStruct<T> {
50+
/// values: Vec<T>,
51+
/// other_data: usize,
52+
/// }
53+
///
54+
/// impl<T> MyStruct<T> {
55+
/// fn size(&self) -> Result<usize> {
56+
/// let num_elements = self.values.len();
57+
/// let fixed_size = std::mem::size_of_val(self) +
58+
/// std::mem::size_of_val(&self.values);
59+
///
60+
/// estimate_memory_size::<T>(num_elements, fixed_size)
61+
/// }
62+
/// }
63+
/// ```
64+
/// ---
65+
/// ## With a simple collection
66+
///
67+
/// ```rust
68+
/// # use datafusion_common::utils::memory::estimate_memory_size;
69+
/// # use std::collections::HashMap;
70+
///
71+
/// let num_rows = 100;
72+
/// let fixed_size = std::mem::size_of::<HashMap<u64, u64>>();
73+
/// let estimated_hashtable_size =
74+
/// estimate_memory_size::<(u64, u64)>(num_rows,fixed_size)
75+
/// .expect("Size estimation failed");
76+
/// ```
77+
pub fn estimate_memory_size<T>(num_elements: usize, fixed_size: usize) -> Result<usize> {
78+
// For the majority of cases hashbrown overestimates the bucket quantity
79+
// to keep ~1/8 of them empty. We take this factor into account by
80+
// multiplying the number of elements by a fixed ratio of 8/7 (~1.14).
81+
// This formula leads to overallocation for small tables (< 8 elements)
82+
// but should be fine overall.
83+
num_elements
84+
.checked_mul(8) // yields None if `num_elements * 8` overflows usize
85+
.and_then(|overestimate| {
86+
let estimated_buckets = (overestimate / 7).next_power_of_two(); // integer division, then round up to a power of two
87+
// Total = size_of::<T>() * number of buckets (entry storage)
88+
// + 1 byte of overhead for each bucket
89+
// + caller-supplied fixed size of the collection (HashSet/HashTable)
90+
std::mem::size_of::<T>()
91+
.checked_mul(estimated_buckets)?
92+
.checked_add(estimated_buckets)?
93+
.checked_add(fixed_size)
94+
})
95+
.ok_or_else(|| { // any overflow in the chain above is reported as an Execution error
96+
DataFusionError::Execution(
97+
"usize overflow while estimating the number of buckets".to_string(),
98+
)
99+
})
100+
}
101+
102+
#[cfg(test)]
103+
mod tests {
104+
use std::collections::HashSet;
105+
106+
use super::estimate_memory_size;
107+
108+
#[test]
109+
fn test_estimate_memory() {
110+
// size (bytes): 48 is what the expected totals below assume. NOTE(review): size_of is target-dependent; confirm on non-64-bit targets.
111+
let fixed_size = std::mem::size_of::<HashSet<u32>>();
112+
113+
// estimated buckets: 16 = (8 * 8 / 7).next_power_of_two(), i.e. 64 / 7 = 9, rounded up to 16
114+
let num_elements = 8;
115+
// size (bytes): 128 = 16 * 4 (u32 entries) + 16 (1 byte per bucket) + 48 (fixed size)
116+
let estimated = estimate_memory_size::<u32>(num_elements, fixed_size).unwrap();
117+
assert_eq!(estimated, 128);
118+
119+
// estimated buckets: 64 = (40 * 8 / 7).next_power_of_two(), i.e. 320 / 7 = 45, rounded up to 64
120+
let num_elements = 40;
121+
// size (bytes): 368 = 64 * 4 (u32 entries) + 64 (1 byte per bucket) + 48 (fixed size)
122+
let estimated = estimate_memory_size::<u32>(num_elements, fixed_size).unwrap();
123+
assert_eq!(estimated, 368);
124+
}
125+
126+
#[test]
127+
fn test_estimate_memory_overflow() {
128+
let num_elements = usize::MAX; // multiplying this by 8 overflows usize, so an error is expected
129+
let fixed_size = std::mem::size_of::<HashSet<u32>>();
130+
let estimated = estimate_memory_size::<u32>(num_elements, fixed_size);
131+
132+
assert!(estimated.is_err());
133+
}
134+
}

datafusion/common/src/utils/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
//! This module provides the bisect function, which implements binary search.
1919
20+
pub mod memory;
2021
pub mod proxy;
2122

2223
use crate::error::{_internal_datafusion_err, _internal_err};

datafusion/physical-expr/src/aggregate/count_distinct/native.rs

Lines changed: 11 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ use arrow_schema::DataType;
3333

3434
use datafusion_common::cast::{as_list_array, as_primitive_array};
3535
use datafusion_common::utils::array_into_list_array;
36+
use datafusion_common::utils::memory::estimate_memory_size;
3637
use datafusion_common::ScalarValue;
3738
use datafusion_expr::Accumulator;
3839

@@ -115,18 +116,11 @@ where
115116
}
116117

117118
fn size(&self) -> usize {
118-
let estimated_buckets = (self.values.len().checked_mul(8).unwrap_or(usize::MAX)
119-
/ 7)
120-
.next_power_of_two();
121-
122-
// Size of accumulator
123-
// + size of entry * number of buckets
124-
// + 1 byte for each bucket
125-
// + fixed size of HashSet
126-
std::mem::size_of_val(self)
127-
+ std::mem::size_of::<T::Native>() * estimated_buckets
128-
+ estimated_buckets
129-
+ std::mem::size_of_val(&self.values)
119+
let num_elements = self.values.len();
120+
let fixed_size =
121+
std::mem::size_of_val(self) + std::mem::size_of_val(&self.values);
122+
123+
estimate_memory_size::<T::Native>(num_elements, fixed_size).unwrap()
130124
}
131125
}
132126

@@ -202,17 +196,10 @@ where
202196
}
203197

204198
fn size(&self) -> usize {
205-
let estimated_buckets = (self.values.len().checked_mul(8).unwrap_or(usize::MAX)
206-
/ 7)
207-
.next_power_of_two();
208-
209-
// Size of accumulator
210-
// + size of entry * number of buckets
211-
// + 1 byte for each bucket
212-
// + fixed size of HashSet
213-
std::mem::size_of_val(self)
214-
+ std::mem::size_of::<T::Native>() * estimated_buckets
215-
+ estimated_buckets
216-
+ std::mem::size_of_val(&self.values)
199+
let num_elements = self.values.len();
200+
let fixed_size =
201+
std::mem::size_of_val(self) + std::mem::size_of_val(&self.values);
202+
203+
estimate_memory_size::<T::Native>(num_elements, fixed_size).unwrap()
217204
}
218205
}

datafusion/physical-plan/src/joins/hash_join.rs

Lines changed: 7 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
//! [`HashJoinExec`] Partitioned Hash Join Operator
1919
2020
use std::fmt;
21-
use std::mem::size_of;
2221
use std::sync::atomic::{AtomicUsize, Ordering};
2322
use std::sync::Arc;
2423
use std::task::Poll;
@@ -59,6 +58,7 @@ use arrow::record_batch::RecordBatch;
5958
use arrow::util::bit_util;
6059
use arrow_array::cast::downcast_array;
6160
use arrow_schema::ArrowError;
61+
use datafusion_common::utils::memory::estimate_memory_size;
6262
use datafusion_common::{
6363
internal_datafusion_err, internal_err, plan_err, project_schema, DataFusionError,
6464
JoinSide, JoinType, Result,
@@ -875,23 +875,12 @@ async fn collect_left_input(
875875

876876
// Estimation of memory size, required for hashtable, prior to allocation.
877877
// Final result can be verified using `RawTable.allocation_info()`
878-
//
879-
// For majority of cases hashbrown overestimates buckets qty to keep ~1/8 of them empty.
880-
// This formula leads to overallocation for small tables (< 8 elements) but fine overall.
881-
let estimated_buckets = (num_rows.checked_mul(8).ok_or_else(|| {
882-
DataFusionError::Execution(
883-
"usize overflow while estimating number of hasmap buckets".to_string(),
884-
)
885-
})? / 7)
886-
.next_power_of_two();
887-
// 16 bytes per `(u64, u64)`
888-
// + 1 byte for each bucket
889-
// + fixed size of JoinHashMap (RawTable + Vec)
890-
let estimated_hastable_size =
891-
16 * estimated_buckets + estimated_buckets + size_of::<JoinHashMap>();
892-
893-
reservation.try_grow(estimated_hastable_size)?;
894-
metrics.build_mem_used.add(estimated_hastable_size);
878+
let fixed_size = std::mem::size_of::<JoinHashMap>();
879+
let estimated_hashtable_size =
880+
estimate_memory_size::<(u64, u64)>(num_rows, fixed_size)?;
881+
882+
reservation.try_grow(estimated_hashtable_size)?;
883+
metrics.build_mem_used.add(estimated_hashtable_size);
895884

896885
let mut hashmap = JoinHashMap::with_capacity(num_rows);
897886
let mut hashes_buffer = Vec::new();

0 commit comments

Comments
 (0)