diff --git a/datafusion/physical-plan/src/metrics/custom.rs b/datafusion/physical-plan/src/metrics/custom.rs new file mode 100644 index 000000000000..546af6f3335e --- /dev/null +++ b/datafusion/physical-plan/src/metrics/custom.rs @@ -0,0 +1,113 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Custom metric value type. + +use std::{any::Any, fmt::Debug, fmt::Display, sync::Arc}; + +/// A trait for implementing custom metric values. +/// +/// This trait enables defining application- or operator-specific metric types +/// that can be aggregated and displayed alongside standard metrics. These +/// custom metrics integrate with [`MetricValue::Custom`] and support +/// aggregation logic, introspection, and optional numeric representation. +/// +/// # Requirements +/// Implementations of `CustomMetricValue` must satisfy the following: +/// +/// 1. [`Self::aggregate`]: Defines how two metric values are combined +/// 2. [`Self::new_empty`]: Returns a new, zero-value instance for accumulation +/// 3. [`Self::as_any`]: Enables dynamic downcasting for type-specific operations +/// 4. [`Self::as_usize`]: Optionally maps the value to a `usize` (for sorting, display, etc.) +/// 5. [`Self::is_eq`]: Implements comparison between two values, this isn't reusing the std +/// PartialEq trait because this trait is used dynamically in the context of +/// [`MetricValue::Custom`] +/// +/// # Examples +/// ``` +/// # use std::sync::Arc; +/// # use std::fmt::{Debug, Display}; +/// # use std::any::Any; +/// # use std::sync::atomic::{AtomicUsize, Ordering}; +/// +/// # use datafusion_physical_plan::metrics::CustomMetricValue; +/// +/// #[derive(Debug, Default)] +/// struct MyCounter { +/// count: AtomicUsize, +/// } +/// +/// impl Display for MyCounter { +/// fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { +/// write!(f, "count: {}", self.count.load(Ordering::Relaxed)) +/// } +/// } +/// +/// impl CustomMetricValue for MyCounter { +/// fn new_empty(&self) -> Arc { +/// Arc::new(Self::default()) +/// } +/// +/// fn aggregate(&self, other: Arc) { +/// let other = other.as_any().downcast_ref::().unwrap(); +/// self.count.fetch_add(other.count.load(Ordering::Relaxed), Ordering::Relaxed); +/// } +/// +/// fn as_any(&self) -> &dyn Any { +/// self +/// } +/// +/// fn as_usize(&self) -> usize { +/// self.count.load(Ordering::Relaxed) +/// } +/// +/// fn is_eq(&self, other: &Arc) -> bool { +/// let Some(other) = other.as_any().downcast_ref::() else { +/// return false; +/// }; +/// +/// self.count.load(Ordering::Relaxed) == other.count.load(Ordering::Relaxed) +/// } +/// } +/// ``` +/// +/// [`MetricValue::Custom`]: super::MetricValue::Custom +pub trait CustomMetricValue: Display + Debug + Send + Sync { + /// Returns a new, zero-initialized version of this metric value. + /// + /// This value is used during metric aggregation to accumulate results. + fn new_empty(&self) -> Arc; + + /// Merges another metric value into this one. + /// + /// The type of `other` could be of a different custom type as long as it's aggregatable into self. + fn aggregate(&self, other: Arc); + + /// Returns this value as a [`Any`] to support dynamic downcasting. + fn as_any(&self) -> &dyn Any; + + /// Optionally returns a numeric representation of the value, if meaningful. + /// Otherwise will default to zero. + /// + /// This is used for sorting and summarizing metrics. + fn as_usize(&self) -> usize { + 0 + } + + /// Compares this value with another custom value. + fn is_eq(&self, other: &Arc) -> bool; +} diff --git a/datafusion/physical-plan/src/metrics/mod.rs b/datafusion/physical-plan/src/metrics/mod.rs index 2ac7ac1299a0..87783eada8b0 100644 --- a/datafusion/physical-plan/src/metrics/mod.rs +++ b/datafusion/physical-plan/src/metrics/mod.rs @@ -19,6 +19,7 @@ mod baseline; mod builder; +mod custom; mod value; use parking_lot::Mutex; @@ -33,6 +34,7 @@ use datafusion_common::HashMap; // public exports pub use baseline::{BaselineMetrics, RecordOutput, SpillMetrics}; pub use builder::MetricBuilder; +pub use custom::CustomMetricValue; pub use value::{Count, Gauge, MetricValue, ScopedTimerGuard, Time, Timestamp}; /// Something that tracks a value of interest (metric) of a DataFusion @@ -263,6 +265,7 @@ impl MetricsSet { MetricValue::Gauge { name, .. } => name == metric_name, MetricValue::StartTimestamp(_) => false, MetricValue::EndTimestamp(_) => false, + MetricValue::Custom { .. } => false, }) } diff --git a/datafusion/physical-plan/src/metrics/value.rs b/datafusion/physical-plan/src/metrics/value.rs index 249cd5edb133..1cc4a4fbcb05 100644 --- a/datafusion/physical-plan/src/metrics/value.rs +++ b/datafusion/physical-plan/src/metrics/value.rs @@ -17,9 +17,14 @@ //! Value representation of metrics +use super::CustomMetricValue; +use chrono::{DateTime, Utc}; +use datafusion_common::instant::Instant; +use datafusion_execution::memory_pool::human_readable_size; +use parking_lot::Mutex; use std::{ borrow::{Borrow, Cow}, - fmt::Display, + fmt::{Debug, Display}, sync::{ atomic::{AtomicUsize, Ordering}, Arc, @@ -27,11 +32,6 @@ use std::{ time::Duration, }; -use chrono::{DateTime, Utc}; -use datafusion_common::instant::Instant; -use datafusion_execution::memory_pool::human_readable_size; -use parking_lot::Mutex; - /// A counter to record things such as number of input or output rows /// /// Note `clone`ing counters update the same underlying metrics @@ -344,7 +344,7 @@ impl Drop for ScopedTimerGuard<'_> { /// Among other differences, the metric types have different ways to /// logically interpret their underlying values and some metrics are /// so common they are given special treatment. -#[derive(Debug, Clone, PartialEq)] +#[derive(Debug, Clone)] pub enum MetricValue { /// Number of output rows produced: "output_rows" metric OutputRows(Count), @@ -401,6 +401,78 @@ pub enum MetricValue { StartTimestamp(Timestamp), /// The time at which execution ended EndTimestamp(Timestamp), + Custom { + /// The provided name of this metric + name: Cow<'static, str>, + /// A custom implementation of the metric value. + value: Arc, + }, +} + +// Manually implement PartialEq for `MetricValue` because it contains CustomMetricValue in its +// definition which is a dyn trait. This wouldn't allow us to just derive PartialEq. +impl PartialEq for MetricValue { + fn eq(&self, other: &Self) -> bool { + match (self, other) { + (MetricValue::OutputRows(count), MetricValue::OutputRows(other)) => { + count == other + } + (MetricValue::ElapsedCompute(time), MetricValue::ElapsedCompute(other)) => { + time == other + } + (MetricValue::SpillCount(count), MetricValue::SpillCount(other)) => { + count == other + } + (MetricValue::SpilledBytes(count), MetricValue::SpilledBytes(other)) => { + count == other + } + (MetricValue::SpilledRows(count), MetricValue::SpilledRows(other)) => { + count == other + } + ( + MetricValue::CurrentMemoryUsage(gauge), + MetricValue::CurrentMemoryUsage(other), + ) => gauge == other, + ( + MetricValue::Count { name, count }, + MetricValue::Count { + name: other_name, + count: other_count, + }, + ) => name == other_name && count == other_count, + ( + MetricValue::Gauge { name, gauge }, + MetricValue::Gauge { + name: other_name, + gauge: other_gauge, + }, + ) => name == other_name && gauge == other_gauge, + ( + MetricValue::Time { name, time }, + MetricValue::Time { + name: other_name, + time: other_time, + }, + ) => name == other_name && time == other_time, + + ( + MetricValue::StartTimestamp(timestamp), + MetricValue::StartTimestamp(other), + ) => timestamp == other, + (MetricValue::EndTimestamp(timestamp), MetricValue::EndTimestamp(other)) => { + timestamp == other + } + ( + MetricValue::Custom { name, value }, + MetricValue::Custom { + name: other_name, + value: other_value, + }, + ) => name == other_name && value.is_eq(other_value), + // Default case when the two sides do not have the same type. + _ => false, + } + } } impl MetricValue { @@ -418,6 +490,7 @@ impl MetricValue { Self::Time { name, .. } => name.borrow(), Self::StartTimestamp(_) => "start_timestamp", Self::EndTimestamp(_) => "end_timestamp", + Self::Custom { name, .. } => name.borrow(), } } @@ -443,6 +516,7 @@ impl MetricValue { .and_then(|ts| ts.timestamp_nanos_opt()) .map(|nanos| nanos as usize) .unwrap_or(0), + Self::Custom { value, .. } => value.as_usize(), } } @@ -470,6 +544,10 @@ impl MetricValue { }, Self::StartTimestamp(_) => Self::StartTimestamp(Timestamp::new()), Self::EndTimestamp(_) => Self::EndTimestamp(Timestamp::new()), + Self::Custom { name, value } => Self::Custom { + name: name.clone(), + value: value.new_empty(), + }, } } @@ -516,6 +594,14 @@ impl MetricValue { (Self::EndTimestamp(timestamp), Self::EndTimestamp(other_timestamp)) => { timestamp.update_to_max(other_timestamp); } + ( + Self::Custom { value, .. }, + Self::Custom { + value: other_value, .. + }, + ) => { + value.aggregate(Arc::clone(other_value)); + } m @ (_, _) => { panic!( "Mismatched metric types. Can not aggregate {:?} with value {:?}", @@ -540,6 +626,7 @@ impl MetricValue { Self::Time { .. } => 8, Self::StartTimestamp(_) => 9, // show timestamps last Self::EndTimestamp(_) => 10, + Self::Custom { .. } => 11, } } @@ -578,17 +665,103 @@ impl Display for MetricValue { Self::StartTimestamp(timestamp) | Self::EndTimestamp(timestamp) => { write!(f, "{timestamp}") } + Self::Custom { name, value } => { + write!(f, "name:{name} {value}") + } } } } #[cfg(test)] mod tests { + use std::any::Any; + use chrono::TimeZone; use datafusion_execution::memory_pool::units::MB; use super::*; + #[derive(Debug, Default)] + pub struct CustomCounter { + count: AtomicUsize, + } + + impl Display for CustomCounter { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "count: {}", self.count.load(Ordering::Relaxed)) + } + } + + impl CustomMetricValue for CustomCounter { + fn new_empty(&self) -> Arc { + Arc::new(CustomCounter::default()) + } + + fn aggregate(&self, other: Arc) { + let other = other.as_any().downcast_ref::().unwrap(); + self.count + .fetch_add(other.count.load(Ordering::Relaxed), Ordering::Relaxed); + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn is_eq(&self, other: &Arc) -> bool { + let Some(other) = other.as_any().downcast_ref::() else { + return false; + }; + + self.count.load(Ordering::Relaxed) == other.count.load(Ordering::Relaxed) + } + } + + fn new_custom_counter(name: &'static str, value: usize) -> MetricValue { + let custom_counter = CustomCounter::default(); + custom_counter.count.fetch_add(value, Ordering::Relaxed); + let custom_val = MetricValue::Custom { + name: Cow::Borrowed(name), + value: Arc::new(custom_counter), + }; + + custom_val + } + + #[test] + fn test_custom_metric_with_mismatching_names() { + let mut custom_val = new_custom_counter("Hi", 1); + let other_custom_val = new_custom_counter("Hello", 1); + + // Not equal since the name differs. + assert!(other_custom_val != custom_val); + + // Should work even though the name differs + custom_val.aggregate(&other_custom_val); + + let expected_val = new_custom_counter("Hi", 2); + assert!(expected_val == custom_val); + } + + #[test] + fn test_custom_metric() { + let mut custom_val = new_custom_counter("hi", 11); + let other_custom_val = new_custom_counter("hi", 20); + + custom_val.aggregate(&other_custom_val); + + assert!(custom_val != other_custom_val); + + if let MetricValue::Custom { value, .. } = custom_val { + let counter = value + .as_any() + .downcast_ref::() + .expect("Expected CustomCounter"); + assert_eq!(counter.count.load(Ordering::Relaxed), 31); + } else { + panic!("Unexpected value"); + } + } + #[test] fn test_display_output_rows() { let count = Count::new();