From b3e396bbd3fe7c8df9e781ba659095d90f5c8937 Mon Sep 17 00:00:00 2001 From: Sami Tabet Date: Tue, 27 May 2025 13:35:48 +0200 Subject: [PATCH] feat: Support defining custom MetricValues in PhysicalPlans See this issue: https://github.com/apache/datafusion/issues/16044 The MetricValue enum currently exposes only single-value statistics: counts, gauges, timers, timestamps, and a few hard-coded variants such as SpillCount or OutputRows. However there's often the need for custom metrics when using custom PhysicalPlans. At Datadog for instance we had the need for tracking the distribution of latencies of the sub-queries issued by a given physical plan to be able to pin-point outliers. Similarly tracking the topN slowest sub-queries is something that has been quite useful to help us debug slow queries. This PR allows each user to define their own MetricValue types as long as they are aggregatable. A very basic example is included in the PR using a custom counter. --- .../physical-plan/src/metrics/custom.rs | 113 +++++++++++ datafusion/physical-plan/src/metrics/mod.rs | 3 + datafusion/physical-plan/src/metrics/value.rs | 187 +++++++++++++++++- 3 files changed, 296 insertions(+), 7 deletions(-) create mode 100644 datafusion/physical-plan/src/metrics/custom.rs diff --git a/datafusion/physical-plan/src/metrics/custom.rs b/datafusion/physical-plan/src/metrics/custom.rs new file mode 100644 index 000000000000..546af6f3335e --- /dev/null +++ b/datafusion/physical-plan/src/metrics/custom.rs @@ -0,0 +1,113 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Custom metric value type. + +use std::{any::Any, fmt::Debug, fmt::Display, sync::Arc}; + +/// A trait for implementing custom metric values. +/// +/// This trait enables defining application- or operator-specific metric types +/// that can be aggregated and displayed alongside standard metrics. These +/// custom metrics integrate with [`MetricValue::Custom`] and support +/// aggregation logic, introspection, and optional numeric representation. +/// +/// # Requirements +/// Implementations of `CustomMetricValue` must satisfy the following: +/// +/// 1. [`Self::aggregate`]: Defines how two metric values are combined +/// 2. [`Self::new_empty`]: Returns a new, zero-value instance for accumulation +/// 3. [`Self::as_any`]: Enables dynamic downcasting for type-specific operations +/// 4. [`Self::as_usize`]: Optionally maps the value to a `usize` (for sorting, display, etc.) +/// 5. 
[`Self::is_eq`]: Implements comparison between two values, this isn't reusing the std +/// PartialEq trait because this trait is used dynamically in the context of +/// [`MetricValue::Custom`] +/// +/// # Examples +/// ``` +/// # use std::sync::Arc; +/// # use std::fmt::{Debug, Display}; +/// # use std::any::Any; +/// # use std::sync::atomic::{AtomicUsize, Ordering}; +/// +/// # use datafusion_physical_plan::metrics::CustomMetricValue; +/// +/// #[derive(Debug, Default)] +/// struct MyCounter { +/// count: AtomicUsize, +/// } +/// +/// impl Display for MyCounter { +/// fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { +/// write!(f, "count: {}", self.count.load(Ordering::Relaxed)) +/// } +/// } +/// +/// impl CustomMetricValue for MyCounter { +/// fn new_empty(&self) -> Arc { +/// Arc::new(Self::default()) +/// } +/// +/// fn aggregate(&self, other: Arc) { +/// let other = other.as_any().downcast_ref::().unwrap(); +/// self.count.fetch_add(other.count.load(Ordering::Relaxed), Ordering::Relaxed); +/// } +/// +/// fn as_any(&self) -> &dyn Any { +/// self +/// } +/// +/// fn as_usize(&self) -> usize { +/// self.count.load(Ordering::Relaxed) +/// } +/// +/// fn is_eq(&self, other: &Arc) -> bool { +/// let Some(other) = other.as_any().downcast_ref::() else { +/// return false; +/// }; +/// +/// self.count.load(Ordering::Relaxed) == other.count.load(Ordering::Relaxed) +/// } +/// } +/// ``` +/// +/// [`MetricValue::Custom`]: super::MetricValue::Custom +pub trait CustomMetricValue: Display + Debug + Send + Sync { + /// Returns a new, zero-initialized version of this metric value. + /// + /// This value is used during metric aggregation to accumulate results. + fn new_empty(&self) -> Arc; + + /// Merges another metric value into this one. + /// + /// The type of `other` could be of a different custom type as long as it's aggregatable into self. + fn aggregate(&self, other: Arc); + + /// Returns this value as a [`Any`] to support dynamic downcasting. 
+ fn as_any(&self) -> &dyn Any; + + /// Optionally returns a numeric representation of the value, if meaningful. + /// Otherwise will default to zero. + /// + /// This is used for sorting and summarizing metrics. + fn as_usize(&self) -> usize { + 0 + } + + /// Compares this value with another custom value. + fn is_eq(&self, other: &Arc) -> bool; +} diff --git a/datafusion/physical-plan/src/metrics/mod.rs b/datafusion/physical-plan/src/metrics/mod.rs index 2ac7ac1299a0..87783eada8b0 100644 --- a/datafusion/physical-plan/src/metrics/mod.rs +++ b/datafusion/physical-plan/src/metrics/mod.rs @@ -19,6 +19,7 @@ mod baseline; mod builder; +mod custom; mod value; use parking_lot::Mutex; @@ -33,6 +34,7 @@ use datafusion_common::HashMap; // public exports pub use baseline::{BaselineMetrics, RecordOutput, SpillMetrics}; pub use builder::MetricBuilder; +pub use custom::CustomMetricValue; pub use value::{Count, Gauge, MetricValue, ScopedTimerGuard, Time, Timestamp}; /// Something that tracks a value of interest (metric) of a DataFusion @@ -263,6 +265,7 @@ impl MetricsSet { MetricValue::Gauge { name, .. } => name == metric_name, MetricValue::StartTimestamp(_) => false, MetricValue::EndTimestamp(_) => false, + MetricValue::Custom { .. } => false, }) } diff --git a/datafusion/physical-plan/src/metrics/value.rs b/datafusion/physical-plan/src/metrics/value.rs index 249cd5edb133..1cc4a4fbcb05 100644 --- a/datafusion/physical-plan/src/metrics/value.rs +++ b/datafusion/physical-plan/src/metrics/value.rs @@ -17,9 +17,14 @@ //! 
Value representation of metrics +use super::CustomMetricValue; +use chrono::{DateTime, Utc}; +use datafusion_common::instant::Instant; +use datafusion_execution::memory_pool::human_readable_size; +use parking_lot::Mutex; use std::{ borrow::{Borrow, Cow}, - fmt::Display, + fmt::{Debug, Display}, sync::{ atomic::{AtomicUsize, Ordering}, Arc, @@ -27,11 +32,6 @@ use std::{ time::Duration, }; -use chrono::{DateTime, Utc}; -use datafusion_common::instant::Instant; -use datafusion_execution::memory_pool::human_readable_size; -use parking_lot::Mutex; - /// A counter to record things such as number of input or output rows /// /// Note `clone`ing counters update the same underlying metrics @@ -344,7 +344,7 @@ impl Drop for ScopedTimerGuard<'_> { /// Among other differences, the metric types have different ways to /// logically interpret their underlying values and some metrics are /// so common they are given special treatment. -#[derive(Debug, Clone, PartialEq)] +#[derive(Debug, Clone)] pub enum MetricValue { /// Number of output rows produced: "output_rows" metric OutputRows(Count), @@ -401,6 +401,78 @@ pub enum MetricValue { StartTimestamp(Timestamp), /// The time at which execution ended EndTimestamp(Timestamp), + Custom { + /// The provided name of this metric + name: Cow<'static, str>, + /// A custom implementation of the metric value. + value: Arc, + }, +} + +// Manually implement PartialEq for `MetricValue` because it contains CustomMetricValue in its +// definition which is a dyn trait. This wouldn't allow us to just derive PartialEq. 
+impl PartialEq for MetricValue { + fn eq(&self, other: &Self) -> bool { + match (self, other) { + (MetricValue::OutputRows(count), MetricValue::OutputRows(other)) => { + count == other + } + (MetricValue::ElapsedCompute(time), MetricValue::ElapsedCompute(other)) => { + time == other + } + (MetricValue::SpillCount(count), MetricValue::SpillCount(other)) => { + count == other + } + (MetricValue::SpilledBytes(count), MetricValue::SpilledBytes(other)) => { + count == other + } + (MetricValue::SpilledRows(count), MetricValue::SpilledRows(other)) => { + count == other + } + ( + MetricValue::CurrentMemoryUsage(gauge), + MetricValue::CurrentMemoryUsage(other), + ) => gauge == other, + ( + MetricValue::Count { name, count }, + MetricValue::Count { + name: other_name, + count: other_count, + }, + ) => name == other_name && count == other_count, + ( + MetricValue::Gauge { name, gauge }, + MetricValue::Gauge { + name: other_name, + gauge: other_gauge, + }, + ) => name == other_name && gauge == other_gauge, + ( + MetricValue::Time { name, time }, + MetricValue::Time { + name: other_name, + time: other_time, + }, + ) => name == other_name && time == other_time, + + ( + MetricValue::StartTimestamp(timestamp), + MetricValue::StartTimestamp(other), + ) => timestamp == other, + (MetricValue::EndTimestamp(timestamp), MetricValue::EndTimestamp(other)) => { + timestamp == other + } + ( + MetricValue::Custom { name, value }, + MetricValue::Custom { + name: other_name, + value: other_value, + }, + ) => name == other_name && value.is_eq(other_value), + // Default case when the two sides do not have the same type. + _ => false, + } + } } impl MetricValue { @@ -418,6 +490,7 @@ impl MetricValue { Self::Time { name, .. } => name.borrow(), Self::StartTimestamp(_) => "start_timestamp", Self::EndTimestamp(_) => "end_timestamp", + Self::Custom { name, .. 
} => name.borrow(), } } @@ -443,6 +516,7 @@ impl MetricValue { .and_then(|ts| ts.timestamp_nanos_opt()) .map(|nanos| nanos as usize) .unwrap_or(0), + Self::Custom { value, .. } => value.as_usize(), } } @@ -470,6 +544,10 @@ impl MetricValue { }, Self::StartTimestamp(_) => Self::StartTimestamp(Timestamp::new()), Self::EndTimestamp(_) => Self::EndTimestamp(Timestamp::new()), + Self::Custom { name, value } => Self::Custom { + name: name.clone(), + value: value.new_empty(), + }, } } @@ -516,6 +594,14 @@ impl MetricValue { (Self::EndTimestamp(timestamp), Self::EndTimestamp(other_timestamp)) => { timestamp.update_to_max(other_timestamp); } + ( + Self::Custom { value, .. }, + Self::Custom { + value: other_value, .. + }, + ) => { + value.aggregate(Arc::clone(other_value)); + } m @ (_, _) => { panic!( "Mismatched metric types. Can not aggregate {:?} with value {:?}", @@ -540,6 +626,7 @@ impl MetricValue { Self::Time { .. } => 8, Self::StartTimestamp(_) => 9, // show timestamps last Self::EndTimestamp(_) => 10, + Self::Custom { .. 
} => 11, } } @@ -578,17 +665,103 @@ impl Display for MetricValue { Self::StartTimestamp(timestamp) | Self::EndTimestamp(timestamp) => { write!(f, "{timestamp}") } + Self::Custom { name, value } => { + write!(f, "name:{name} {value}") + } } } } #[cfg(test)] mod tests { + use std::any::Any; + use chrono::TimeZone; use datafusion_execution::memory_pool::units::MB; use super::*; + #[derive(Debug, Default)] + pub struct CustomCounter { + count: AtomicUsize, + } + + impl Display for CustomCounter { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "count: {}", self.count.load(Ordering::Relaxed)) + } + } + + impl CustomMetricValue for CustomCounter { + fn new_empty(&self) -> Arc { + Arc::new(CustomCounter::default()) + } + + fn aggregate(&self, other: Arc) { + let other = other.as_any().downcast_ref::().unwrap(); + self.count + .fetch_add(other.count.load(Ordering::Relaxed), Ordering::Relaxed); + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn is_eq(&self, other: &Arc) -> bool { + let Some(other) = other.as_any().downcast_ref::() else { + return false; + }; + + self.count.load(Ordering::Relaxed) == other.count.load(Ordering::Relaxed) + } + } + + fn new_custom_counter(name: &'static str, value: usize) -> MetricValue { + let custom_counter = CustomCounter::default(); + custom_counter.count.fetch_add(value, Ordering::Relaxed); + let custom_val = MetricValue::Custom { + name: Cow::Borrowed(name), + value: Arc::new(custom_counter), + }; + + custom_val + } + + #[test] + fn test_custom_metric_with_mismatching_names() { + let mut custom_val = new_custom_counter("Hi", 1); + let other_custom_val = new_custom_counter("Hello", 1); + + // Not equal since the name differs. 
+ assert!(other_custom_val != custom_val); + + // Should work even though the name differs + custom_val.aggregate(&other_custom_val); + + let expected_val = new_custom_counter("Hi", 2); + assert!(expected_val == custom_val); + } + + #[test] + fn test_custom_metric() { + let mut custom_val = new_custom_counter("hi", 11); + let other_custom_val = new_custom_counter("hi", 20); + + custom_val.aggregate(&other_custom_val); + + assert!(custom_val != other_custom_val); + + if let MetricValue::Custom { value, .. } = custom_val { + let counter = value + .as_any() + .downcast_ref::() + .expect("Expected CustomCounter"); + assert_eq!(counter.count.load(Ordering::Relaxed), 31); + } else { + panic!("Unexpected value"); + } + } + #[test] fn test_display_output_rows() { let count = Count::new();