Skip to content

Commit 39d2bdc

Browse files
committed
feat: implement MemoryUsage struct and MemoryExplain trait for memory reporting
1 parent f7169df commit 39d2bdc

File tree

1 file changed

+173
-0
lines changed

1 file changed

+173
-0
lines changed
Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
use serde::Serialize;
2+
3+
/// A node in a memory-usage tree, suitable for pretty-printing or JSON
4+
/// serialization. `MemoryUsage` values can be nested, allowing callers to
5+
/// inspect how memory is distributed across sub components.
6+
#[derive(Debug, Serialize)]
7+
pub struct MemoryUsage {
8+
/// Identifier (e.g. operator name or field)
9+
pub name: String,
10+
/// Approximate total bytes used by this node
11+
pub bytes: usize,
12+
/// Breakdown of sub-components
13+
pub children: Vec<MemoryUsage>,
14+
}
15+
16+
/// Trait for types that can report their approximate memory consumption.
17+
///
18+
/// Implementors should provide a hierarchical [`MemoryUsage`] describing all
19+
/// relevant allocations. The provided [`size`](MemoryExplain::size) method
20+
/// simply returns the top level number of bytes.
21+
///
22+
/// The [`bytes`](MemoryUsage::bytes) field of the value returned by
23+
/// [`explain_memory`](MemoryExplain::explain_memory) must match the value
24+
/// returned by [`size`](MemoryExplain::size).
25+
pub trait MemoryExplain {
26+
/// Returns the total bytes used by `self`.
27+
fn size(&self) -> usize {
28+
self.explain_memory().bytes
29+
}
30+
31+
/// Returns a breakdown of memory usage for `self`.
32+
fn explain_memory(&self) -> MemoryUsage;
33+
}
34+
35+
use crate::accumulator::Accumulator;
36+
use crate::groups_accumulator::GroupsAccumulator;
37+
38+
impl MemoryExplain for dyn Accumulator {
39+
fn explain_memory(&self) -> MemoryUsage {
40+
MemoryUsage {
41+
name: std::any::type_name_of_val(self).to_string(),
42+
bytes: self.size(),
43+
children: vec![],
44+
}
45+
}
46+
}
47+
48+
impl MemoryExplain for dyn GroupsAccumulator {
49+
fn explain_memory(&self) -> MemoryUsage {
50+
MemoryUsage {
51+
name: std::any::type_name_of_val(self).to_string(),
52+
bytes: self.size(),
53+
children: vec![],
54+
}
55+
}
56+
}
57+
58+
#[cfg(test)]
59+
mod tests {
60+
use super::*;
61+
use crate::accumulator::Accumulator;
62+
use crate::groups_accumulator::{EmitTo, GroupsAccumulator};
63+
use arrow::array::{ArrayRef, BooleanArray};
64+
use datafusion_common::{Result, ScalarValue};
65+
66+
#[derive(Debug)]
67+
struct MockAcc {
68+
buf: Vec<u8>,
69+
}
70+
71+
impl Default for MockAcc {
72+
fn default() -> Self {
73+
Self {
74+
buf: Vec::with_capacity(4),
75+
}
76+
}
77+
}
78+
79+
impl Accumulator for MockAcc {
80+
fn update_batch(&mut self, _values: &[ArrayRef]) -> Result<()> {
81+
Ok(())
82+
}
83+
fn evaluate(&mut self) -> Result<ScalarValue> {
84+
Ok(ScalarValue::from(0u64))
85+
}
86+
fn state(&mut self) -> Result<Vec<ScalarValue>> {
87+
Ok(vec![])
88+
}
89+
fn merge_batch(&mut self, _states: &[ArrayRef]) -> Result<()> {
90+
Ok(())
91+
}
92+
fn size(&self) -> usize {
93+
self.buf.capacity()
94+
}
95+
fn supports_retract_batch(&self) -> bool {
96+
false
97+
}
98+
fn retract_batch(&mut self, _values: &[ArrayRef]) -> Result<()> {
99+
Ok(())
100+
}
101+
}
102+
103+
#[test]
104+
fn test_accumulator_memory() {
105+
let acc = MockAcc::default();
106+
let usage = (&acc as &dyn Accumulator).explain_memory();
107+
assert_eq!(usage.bytes, 4);
108+
// Name should be the trait object type name
109+
assert_eq!(
110+
usage.name,
111+
std::any::type_name::<dyn Accumulator>().to_string()
112+
);
113+
}
114+
115+
#[derive(Debug)]
116+
struct MockGroupsAcc {
117+
size: usize,
118+
}
119+
120+
impl GroupsAccumulator for MockGroupsAcc {
121+
fn update_batch(
122+
&mut self,
123+
_values: &[ArrayRef],
124+
_groups: &[usize],
125+
_filter: Option<&BooleanArray>,
126+
_n: usize,
127+
) -> Result<()> {
128+
Ok(())
129+
}
130+
fn evaluate(&mut self, _emit_to: EmitTo) -> Result<ArrayRef> {
131+
Err(datafusion_common::DataFusionError::Internal(
132+
"not used".into(),
133+
))
134+
}
135+
fn state(&mut self, _emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
136+
Ok(vec![])
137+
}
138+
fn merge_batch(
139+
&mut self,
140+
_values: &[ArrayRef],
141+
_groups: &[usize],
142+
_filter: Option<&BooleanArray>,
143+
_n: usize,
144+
) -> Result<()> {
145+
Ok(())
146+
}
147+
fn convert_to_state(
148+
&self,
149+
_values: &[ArrayRef],
150+
_filter: Option<&BooleanArray>,
151+
) -> Result<Vec<ArrayRef>> {
152+
Ok(vec![])
153+
}
154+
fn supports_convert_to_state(&self) -> bool {
155+
false
156+
}
157+
fn size(&self) -> usize {
158+
self.size
159+
}
160+
}
161+
162+
#[test]
163+
fn test_groups_acc_memory() {
164+
let acc = MockGroupsAcc { size: 8 };
165+
let usage = (&acc as &dyn GroupsAccumulator).explain_memory();
166+
assert_eq!(usage.bytes, 8);
167+
// Name should be the trait object type name
168+
assert_eq!(
169+
usage.name,
170+
std::any::type_name::<dyn GroupsAccumulator>().to_string()
171+
);
172+
}
173+
}

0 commit comments

Comments
 (0)