Commit f5d0c56

enhancement(observability): Add metrics to measure total event processing time (#24481)
* chore(core): Add `ingest_timestamp` to `EventMetadata`

  We want to record the time at which events are ingested in order to track latency as each travels through the topology. This is currently recorded for log events using the Vector namespace in the `vector.ingest_timestamp` metadata field, but we want it to be usable for all event types. As such, we need a new field in `struct EventMetadata`.

  This change adds the new field as an `Option` so as to retain sane semantics for `Default` implementations and to avoid extra calls to `Utc::now`. The `SourceSender::send` method sets this field if the source doesn't, so as to ensure complete coverage.

  For backward compatibility, this metadata is still inserted into the Vector log namespace metadata, taken from this new field. Since this metadata is set up before passing the events to the `SourceSender`, the ingest timestamp is set manually in sources that can create Vector namespace logs.

* Add new `EwmaGauge` wrapper to update the EWMA and its gauge in one step

* enhancement(observability): Add metrics to measure total event processing time

  This adds an optional `trait BufferInstrumentation` hook to `struct BufferSender` which is called at the very start of the buffer send path. We use that hook to take the previously added universal event `ingest_timestamp` metadata and from it calculate the total time spent processing the event, including buffering delays. This time is emitted in internal metrics as an `event_processing_time_seconds` histogram and an `event_processing_time_mean_seconds` gauge, the latter using an EWMA to smooth the mean over time.

* Update wording on the lognamespacing tutorial docs.
* Pre-create metrics to avoid extraneous labels
* Fix tests
* Move setting ingest timestamp back into `insert_standard_vector_source_metadata`
* Assert an upper limit on the average latency gauge too
1 parent 249657b commit f5d0c56
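
The measurement described in the commit message can be sketched roughly as follows. This is only an illustration using the standard library: `Metadata` and `processing_time` are hypothetical names, `SystemTime` stands in for the `chrono` timestamp, and clamping a future timestamp to zero is an assumption rather than something the commit states.

```rust
use std::time::{Duration, SystemTime};

/// Hypothetical stand-in for the event metadata; only the relevant field is modelled.
#[derive(Debug, Default, Clone)]
struct Metadata {
    /// Optional, so that `Default` does not need to read the clock.
    ingest_timestamp: Option<SystemTime>,
}

/// Returns the time elapsed since ingestion, or `None` if the event was never stamped.
/// Assumption: a timestamp that lies in the future (clock adjustment) is clamped to zero.
fn processing_time(meta: &Metadata, now: SystemTime) -> Option<Duration> {
    let ingested = meta.ingest_timestamp?;
    Some(now.duration_since(ingested).unwrap_or(Duration::ZERO))
}

fn main() {
    let ingested = SystemTime::UNIX_EPOCH + Duration::from_secs(100);
    let meta = Metadata { ingest_timestamp: Some(ingested) };
    let now = ingested + Duration::from_millis(250);
    // The elapsed time is what would feed a histogram and a smoothed mean gauge.
    println!("{:?}", processing_time(&meta, now));
}
```

Keeping the field optional means an unstamped event simply yields no measurement instead of a misleading zero.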

42 files changed: 673 additions and 67 deletions

Cargo.lock

Lines changed: 1 addition & 0 deletions
Generated file; diff not rendered by default.

Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -213,6 +213,7 @@ serial_test = { version = "3.2" }
 cfg-if.workspace = true
 clap.workspace = true
 clap_complete.workspace = true
+dashmap.workspace = true
 indoc.workspace = true
 paste.workspace = true
 pin-project.workspace = true
Lines changed: 4 additions & 0 deletions
@@ -0,0 +1,4 @@
+Added the `event_processing_time_seconds` histogram and `event_processing_time_mean_seconds` gauge internal_metrics,
+exposing the total time events spend between the originating source and final sink in a topology.
+
+authors: bruceg

docs/tutorials/lognamespacing.md

Lines changed: 3 additions & 1 deletion
@@ -112,7 +112,9 @@ separate namespace to the event data.
 
 The actual value to be placed into the field.
 
-For the ingest timestamp this will be `chrono::Utc::now()`. Source type will be
+The ingest timestamp should be recorded once per batch (e.g. `let now =
+chrono::Utc::now();`) and passed into `insert_standard_vector_source_metadata`,
+which takes care of updating the event metadata. The source type will be
 the `NAME` property of the `Config` struct. `NAME` is provided by the
 `configurable_component` macro. You may need to include `use vector_config::NamedComponent;`.
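
The "once per batch" advice in the updated tutorial text might look like the sketch below. It assumes nothing about the real `insert_standard_vector_source_metadata` signature: `Event` and `stamp_batch` are hypothetical, and `SystemTime` replaces `chrono::Utc::now()` so the example needs only the standard library.

```rust
use std::time::{Duration, SystemTime};

/// Hypothetical event type carrying only the optional ingest timestamp.
#[derive(Debug, Default)]
struct Event {
    ingest_timestamp: Option<SystemTime>,
}

/// Stamps every event in the batch with the same `now`, leaving any
/// timestamp that a source already set untouched.
fn stamp_batch(events: &mut [Event], now: SystemTime) {
    for event in events.iter_mut() {
        if event.ingest_timestamp.is_none() {
            event.ingest_timestamp = Some(now);
        }
    }
}

fn main() {
    // Read the clock once for the whole batch rather than once per event.
    let now = SystemTime::now();
    let mut batch = vec![Event::default(), Event::default()];
    stamp_batch(&mut batch, now);
    assert!(batch.iter().all(|e| e.ingest_timestamp == Some(now)));

    // A pre-stamped event keeps its original timestamp.
    let earlier = now - Duration::from_secs(1);
    let mut mixed = vec![Event { ingest_timestamp: Some(earlier) }];
    stamp_batch(&mut mixed, now);
    assert_eq!(mixed[0].ingest_timestamp, Some(earlier));
}
```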

lib/vector-buffers/src/lib.rs

Lines changed: 7 additions & 0 deletions
@@ -108,6 +108,13 @@ pub trait Bufferable: InMemoryBufferable + Encodable {}
 // Blanket implementation for anything that is already bufferable.
 impl<T> Bufferable for T where T: InMemoryBufferable + Encodable {}
 
+/// Hook for observing items as they are sent into a `BufferSender`.
+pub trait BufferInstrumentation<T: Bufferable>: Send + Sync + 'static {
+    /// Called immediately before the item is emitted to the underlying buffer.
+    /// The underlying type is stored in an `Arc`, so we cannot have `&mut self`.
+    fn on_send(&self, item: &T);
+}
+
 pub trait EventCount {
     fn event_count(&self) -> usize;
 }

lib/vector-buffers/src/topology/channel/limited_queue.rs

Lines changed: 7 additions & 15 deletions
@@ -16,15 +16,10 @@ use crossbeam_queue::{ArrayQueue, SegQueue};
 use futures::Stream;
 use metrics::{Gauge, Histogram, gauge, histogram};
 use tokio::sync::{Notify, OwnedSemaphorePermit, Semaphore, TryAcquireError};
-use vector_common::stats::AtomicEwma;
+use vector_common::stats::EwmaGauge;
 
 use crate::{InMemoryBufferable, config::MemoryBufferSize};
 
-/// The alpha value for the Exponentially Weighted Moving Average (EWMA) calculation. This is a
-/// measure of how much weight to give to the current value versus the previous values. A value of
-/// 0.9 results in a "half life" of 6-7 measurements.
-pub const DEFAULT_EWMA_ALPHA: f64 = 0.9;
-
 /// Error returned by `LimitedSender::send` when the receiver has disconnected.
 #[derive(Debug, PartialEq, Eq)]
 pub struct SendError<T>(pub T);
@@ -114,8 +109,7 @@ impl ChannelMetricMetadata {
 struct Metrics {
     histogram: Histogram,
     gauge: Gauge,
-    mean_gauge: Gauge,
-    ewma: Arc<AtomicEwma>,
+    mean_gauge: EwmaGauge,
     // We hold a handle to the max gauge to avoid it being dropped by the metrics collector, but
     // since the value is static, we never need to update it. The compiler detects this as an unused
     // field, so we need to suppress the warning here.
@@ -150,37 +144,36 @@ impl Metrics {
         let histogram_name = format!("{prefix}_utilization");
         let gauge_name = format!("{prefix}_utilization_level");
         let mean_name = format!("{prefix}_utilization_mean");
-        let ewma = Arc::new(AtomicEwma::new(ewma_alpha.unwrap_or(DEFAULT_EWMA_ALPHA)));
         #[cfg(test)]
         let recorded_values = Arc::new(Mutex::new(Vec::new()));
         if let Some(label_value) = output {
             let max_gauge = gauge!(max_gauge_name, "output" => label_value.clone());
             max_gauge.set(max_value);
+            let mean_gauge_handle = gauge!(mean_name, "output" => label_value.clone());
             // DEPRECATED: buffer-bytes-events-metrics
             let legacy_max_gauge = gauge!(legacy_max_gauge_name, "output" => label_value.clone());
             legacy_max_gauge.set(max_value);
             Self {
                 histogram: histogram!(histogram_name, "output" => label_value.clone()),
                 gauge: gauge!(gauge_name, "output" => label_value.clone()),
-                mean_gauge: gauge!(mean_name, "output" => label_value.clone()),
+                mean_gauge: EwmaGauge::new(mean_gauge_handle, ewma_alpha),
                 max_gauge,
-                ewma,
                 legacy_max_gauge,
                 #[cfg(test)]
                 recorded_values,
             }
         } else {
             let max_gauge = gauge!(max_gauge_name);
             max_gauge.set(max_value);
+            let mean_gauge_handle = gauge!(mean_name);
             // DEPRECATED: buffer-bytes-events-metrics
             let legacy_max_gauge = gauge!(legacy_max_gauge_name);
             legacy_max_gauge.set(max_value);
             Self {
                 histogram: histogram!(histogram_name),
                 gauge: gauge!(gauge_name),
-                mean_gauge: gauge!(mean_name),
+                mean_gauge: EwmaGauge::new(mean_gauge_handle, ewma_alpha),
                 max_gauge,
-                ewma,
                 legacy_max_gauge,
                 #[cfg(test)]
                 recorded_values,
@@ -192,8 +185,7 @@ impl Metrics {
     fn record(&self, value: usize) {
         self.histogram.record(value as f64);
         self.gauge.set(value as f64);
-        let avg = self.ewma.update(value as f64);
-        self.mean_gauge.set(avg);
+        self.mean_gauge.record(value as f64);
         #[cfg(test)]
         if let Ok(mut recorded) = self.recorded_values.lock() {
             recorded.push(value);

lib/vector-buffers/src/topology/channel/mod.rs

Lines changed: 2 additions & 1 deletion
@@ -3,10 +3,11 @@ mod receiver;
 mod sender;
 
 pub use limited_queue::{
-    ChannelMetricMetadata, DEFAULT_EWMA_ALPHA, LimitedReceiver, LimitedSender, SendError, limited,
+    ChannelMetricMetadata, LimitedReceiver, LimitedSender, SendError, limited,
 };
 pub use receiver::*;
 pub use sender::*;
+pub use vector_common::stats::DEFAULT_EWMA_ALPHA;
 
 #[cfg(test)]
 mod tests;

lib/vector-buffers/src/topology/channel/sender.rs

Lines changed: 20 additions & 8 deletions
@@ -8,7 +8,7 @@ use vector_common::internal_event::{InternalEventHandle, Registered, register};
 
 use super::limited_queue::LimitedSender;
 use crate::{
-    Bufferable, WhenFull,
+    BufferInstrumentation, Bufferable, WhenFull,
     buffer_usage_data::BufferUsageHandle,
     internal_events::BufferSendDuration,
     variants::disk_v2::{self, ProductionFilesystem},
@@ -134,9 +134,11 @@ pub struct BufferSender<T: Bufferable> {
     base: SenderAdapter<T>,
     overflow: Option<Box<BufferSender<T>>>,
     when_full: WhenFull,
-    instrumentation: Option<BufferUsageHandle>,
+    usage_instrumentation: Option<BufferUsageHandle>,
     #[derivative(Debug = "ignore")]
     send_duration: Option<Registered<BufferSendDuration>>,
+    #[derivative(Debug = "ignore")]
+    custom_instrumentation: Option<Arc<dyn BufferInstrumentation<T>>>,
 }
 
 impl<T: Bufferable> BufferSender<T> {
@@ -146,8 +148,9 @@ impl<T: Bufferable> BufferSender<T> {
             base,
             overflow: None,
             when_full,
-            instrumentation: None,
+            usage_instrumentation: None,
             send_duration: None,
+            custom_instrumentation: None,
         }
     }
 
@@ -157,8 +160,9 @@ impl<T: Bufferable> BufferSender<T> {
             base,
             overflow: Some(Box::new(overflow)),
             when_full: WhenFull::Overflow,
-            instrumentation: None,
+            usage_instrumentation: None,
             send_duration: None,
+            custom_instrumentation: None,
         }
     }
 
@@ -174,14 +178,19 @@ impl<T: Bufferable> BufferSender<T> {
 
     /// Configures this sender to instrument the items passing through it.
     pub fn with_usage_instrumentation(&mut self, handle: BufferUsageHandle) {
-        self.instrumentation = Some(handle);
+        self.usage_instrumentation = Some(handle);
     }
 
     /// Configures this sender to instrument the send duration.
     pub fn with_send_duration_instrumentation(&mut self, stage: usize, span: &Span) {
         let _enter = span.enter();
         self.send_duration = Some(register(BufferSendDuration { stage }));
     }
+
+    /// Configures this sender to invoke a custom instrumentation hook.
+    pub fn with_custom_instrumentation(&mut self, instrumentation: impl BufferInstrumentation<T>) {
+        self.custom_instrumentation = Some(Arc::new(instrumentation));
+    }
 }
 
 impl<T: Bufferable> BufferSender<T> {
@@ -197,14 +206,17 @@ impl<T: Bufferable> BufferSender<T> {
 
     #[async_recursion]
     pub async fn send(&mut self, item: T, send_reference: Option<Instant>) -> crate::Result<()> {
+        if let Some(instrumentation) = self.custom_instrumentation.as_ref() {
+            instrumentation.on_send(&item);
+        }
         let item_sizing = self
-            .instrumentation
+            .usage_instrumentation
             .as_ref()
             .map(|_| (item.event_count(), item.size_of()));
 
         let mut was_dropped = false;
 
-        if let Some(instrumentation) = self.instrumentation.as_ref()
+        if let Some(instrumentation) = self.usage_instrumentation.as_ref()
             && let Some((item_count, item_size)) = item_sizing
         {
             instrumentation
@@ -229,7 +241,7 @@ impl<T: Bufferable> BufferSender<T> {
             }
         }
 
-        if let Some(instrumentation) = self.instrumentation.as_ref()
+        if let Some(instrumentation) = self.usage_instrumentation.as_ref()
             && let Some((item_count, item_size)) = item_sizing
             && was_dropped
         {
Lines changed: 32 additions & 0 deletions
@@ -0,0 +1,32 @@
+use std::sync::Arc;
+
+use metrics::Gauge;
+
+use super::AtomicEwma;
+
+/// The default alpha parameter used when constructing EWMA-backed gauges.
+pub const DEFAULT_EWMA_ALPHA: f64 = 0.9;
+
+/// Couples a [`Gauge`] with an [`AtomicEwma`] so gauge readings reflect the EWMA.
+#[derive(Clone, Debug)]
+pub struct EwmaGauge {
+    gauge: Gauge,
+    // Note that the `Gauge` internally is equivalent to an `Arc<AtomicF64>` so we need to use the
+    // same semantics for the EWMA calculation as well.
+    ewma: Arc<AtomicEwma>,
+}
+
+impl EwmaGauge {
+    #[must_use]
+    pub fn new(gauge: Gauge, alpha: Option<f64>) -> Self {
+        let alpha = alpha.unwrap_or(DEFAULT_EWMA_ALPHA);
+        let ewma = Arc::new(AtomicEwma::new(alpha));
+        Self { gauge, ewma }
+    }
+
+    /// Records a new value, updates the EWMA, and sets the gauge accordingly.
+    pub fn record(&self, value: f64) {
+        let average = self.ewma.update(value);
+        self.gauge.set(average);
+    }
+}
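
For readers unfamiliar with the smoothing step behind `record`, a minimal EWMA sketch follows. The diff does not show the body of `AtomicEwma::update`, so the formula here is an assumption: it uses the common form in which `alpha` weights the newest sample and the first sample seeds the average. The `Ewma` type is hypothetical and uses a `Mutex` for brevity, where the real type is atomic.

```rust
use std::sync::Mutex;

/// Hypothetical, simplified EWMA. It is not the real `AtomicEwma`,
/// whose update rule is not visible in this diff.
struct Ewma {
    alpha: f64,
    average: Mutex<Option<f64>>,
}

impl Ewma {
    fn new(alpha: f64) -> Self {
        Self { alpha, average: Mutex::new(None) }
    }

    /// Folds a sample into the running average and returns the new average.
    /// Assumed rule: `next = alpha * value + (1 - alpha) * previous`.
    fn update(&self, value: f64) -> f64 {
        let mut average = self.average.lock().unwrap();
        let next = match *average {
            None => value, // the first sample seeds the average
            Some(previous) => self.alpha * value + (1.0 - self.alpha) * previous,
        };
        *average = Some(next);
        next
    }
}

fn main() {
    let ewma = Ewma::new(0.5);
    for sample in [10.0, 20.0, 20.0] {
        // In the wrapper from the diff, this returned value is what the gauge is set to.
        println!("sample = {sample}, average = {}", ewma.update(sample));
    }
}
```

Returning the new average from `update` is what lets a wrapper such as `EwmaGauge` update the average and set the gauge in a single call.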
Lines changed: 4 additions & 0 deletions
@@ -1,5 +1,9 @@
 #![allow(missing_docs)]
 
+pub mod ewma_gauge;
+
+pub use ewma_gauge::{DEFAULT_EWMA_ALPHA, EwmaGauge};
+
 use std::sync::atomic::Ordering;
 
 use crate::atomic::AtomicF64;
