diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index 683983d9e6979..a284012614a66 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -29,7 +29,8 @@ use crate::execution_plan::{Boundedness, CardinalityEffect, EmissionType}; use crate::expressions::PhysicalSortExpr; use crate::limit::LimitStream; use crate::metrics::{ - BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, SpillMetrics, + self, BaselineMetrics, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet, + SpillMetrics, }; use crate::projection::{make_with_child, update_expr, ProjectionExec}; use crate::sorts::streaming_merge::StreamingMergeBuilder; @@ -63,6 +64,9 @@ struct ExternalSorterMetrics { baseline: BaselineMetrics, spill_metrics: SpillMetrics, + /// Peak memory used for buffered data. + /// Calculated as sum of peak memory values across partitions + peak_mem_used: metrics::Gauge, } impl ExternalSorterMetrics { @@ -70,6 +74,7 @@ impl ExternalSorterMetrics { Self { baseline: BaselineMetrics::new(metrics, partition), spill_metrics: SpillMetrics::new(metrics, partition), + peak_mem_used: MetricBuilder::new(metrics).gauge("peak_mem_used", partition), } } } @@ -538,6 +543,7 @@ impl ExternalSorter { self.consume_and_spill_append(&mut globally_sorted_batches) .await?; // reservation is freed in spill() } else { + self.metrics.peak_mem_used.set_max(self.used()); globally_sorted_batches.push(batch); } } @@ -658,6 +664,8 @@ impl ExternalSorter { self.reservation .try_resize(get_reserved_byte_for_record_batch(&batch)) .map_err(Self::err_with_oom_context)?; + // TODO(ding-young) can reservation grow here? + self.metrics.peak_mem_used.set_max(self.used()); let reservation = self.reservation.take(); return self.sort_batch_stream(batch, metrics, reservation); } @@ -743,7 +751,7 @@ impl ExternalSorter { ) -> Result<()> { let size = get_reserved_byte_for_record_batch(input); - match self.reservation.try_grow(size) { + let result = match self.reservation.try_grow(size) { Ok(_) => Ok(()), Err(e) => { if self.in_mem_batches.is_empty() { @@ -756,7 +764,9 @@ impl ExternalSorter { .try_grow(size) .map_err(Self::err_with_oom_context) } - } + }; + self.metrics.peak_mem_used.set_max(self.used()); + result } /// Wraps the error with a context message suggesting settings to tweak.