diff --git a/src/persist-client/src/internal/compact.rs b/src/persist-client/src/internal/compact.rs index 94808ce188509..f965451715df3 100644 --- a/src/persist-client/src/internal/compact.rs +++ b/src/persist-client/src/internal/compact.rs @@ -24,7 +24,7 @@ use mz_ore::task::spawn; use mz_persist::location::Blob; use mz_persist_types::codec_impls::VecU8Schema; use mz_persist_types::{Codec, Codec64}; -use timely::progress::Timestamp; +use timely::progress::{Antichain, Timestamp}; use timely::PartialOrder; use tokio::sync::mpsc::Sender; use tokio::sync::{mpsc, oneshot, TryAcquireError}; @@ -410,6 +410,31 @@ where schemas: Schemas, ) -> Result, anyhow::Error> { let () = Self::validate_req(&req)?; + + // We introduced a fast-path optimization in https://github.com/MaterializeInc/materialize/pull/15363 + // but had to revert it due to a very scary bug. Here we count how many of our compaction reqs + // could be eligible for the optimization to better understand whether it's worth trying to + // reintroduce it. + let mut single_nonempty_batch = None; + for batch in &req.inputs { + if batch.len > 0 { + match single_nonempty_batch { + None => single_nonempty_batch = Some(batch), + Some(_previous_nonempty_batch) => { + single_nonempty_batch = None; + break; + } + } + } + } + if let Some(single_nonempty_batch) = single_nonempty_batch { + if single_nonempty_batch.runs.len() == 0 + && single_nonempty_batch.desc.since() != &Antichain::from_elem(T::minimum()) + { + metrics.compaction.fast_path_eligible.inc(); + } + } + // compaction needs memory enough for at least 2 runs and 2 in-progress parts assert!(cfg.compaction_memory_bound_bytes >= 4 * cfg.batch.blob_target_size); // reserve space for the in-progress part to be held in-mem representation and columnar diff --git a/src/persist-client/src/internal/metrics.rs b/src/persist-client/src/internal/metrics.rs index 67493d846a457..ae470cd6c1f3b 100644 --- a/src/persist-client/src/internal/metrics.rs +++ b/src/persist-client/src/internal/metrics.rs @@ -660,6 +660,7 @@ pub struct CompactionMetrics { pub(crate) not_all_prefetched: IntCounter, pub(crate) parts_prefetched: IntCounter, pub(crate) parts_waited: IntCounter, + pub(crate) fast_path_eligible: IntCounter, pub(crate) applied_exact_match: IntCounter, pub(crate) applied_subset_match: IntCounter, @@ -748,6 +749,10 @@ impl CompactionMetrics { name: "mz_persist_compaction_parts_waited", help: "count of compaction parts that had to be waited on", )), + fast_path_eligible: registry.register(metric!( + name: "mz_persist_compaction_fast_path_eligible", + help: "count of compaction requests that could have used the fast-path optimization", + )), applied_exact_match: registry.register(metric!( name: "mz_persist_compaction_applied_exact_match", help: "count of merge results that exactly replaced a SpineBatch",