Skip to content
Open
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
d68f883
feat(metrics): Add Unix domain socket support for DogStatsd
phacops Mar 4, 2026
c90c6f9
ref(metrics): DRY up statsd backend initialization in consumer
phacops Mar 4, 2026
c217abc
ref(metrics): Use cadence BufferedUnixMetricSink instead of custom Un…
phacops Mar 4, 2026
bb6f308
fix(metrics): Restore global tags for UDS statsd backend
phacops Mar 4, 2026
63f5ffc
ref(metrics): Replace cadence and statsdproxy with metrics-exporter-d…
phacops Mar 5, 2026
8451a89
Merge remote-tracking branch 'origin/master' into feat/dogstatsd-uds-…
phacops Mar 5, 2026
198adfa
fix(metrics): Remove double prefix in DogStatsDBackend
phacops Mar 5, 2026
981759b
fix(metrics): Default DOGSTATSD_SOCKET_PATH to None
phacops Mar 5, 2026
d52c39a
fix(metrics): Send histograms as DogStatsD histograms, not distributions
phacops Mar 5, 2026
2dc615f
fix(deps): Resolve sentry-core version split in Cargo.lock
phacops Mar 5, 2026
30cc6ef
build(rust): Use latest stable Rust toolchain
phacops Mar 5, 2026
11d0129
ci: Install libcurl-dev for rdkafka-sys compilation
phacops Mar 5, 2026
76d6792
fix(eap): Handle ProcessingError variant in TraceItemType match
phacops Mar 5, 2026
c3bf5da
fix(rust): Address new clippy lints from stable Rust 1.94
phacops Mar 5, 2026
258a665
fix(build): Upgrade rdkafka-sys to 4.10.0 and fix CI build failures
phacops Mar 5, 2026
7bf2c70
test(rust): Update schema snapshots for new kafka schema fields
phacops Mar 5, 2026
9434bef
fix(deps): Deduplicate sentry_protos to single 0.8.2 version
phacops Mar 5, 2026
a6593d4
fix(metrics): Guard UDS path against None socket path
phacops Mar 5, 2026
02dde3b
chore(deps): Align Cargo.toml versions with lockfile resolutions
phacops Mar 5, 2026
431aab5
feat(metrics): Propagate runtime global tags to DogStatsD metrics
phacops Mar 6, 2026
52b5cde
test(metrics): Add tests for runtime global tags
phacops Mar 6, 2026
4aa3be3
Merge branch 'master' into feat/dogstatsd-uds-support
phacops Mar 6, 2026
89f7802
fix(ci): Add libcurl-dev to docs and bump-version jobs, restore Sentr…
phacops Mar 9, 2026
a7ce0a7
Merge remote-tracking branch 'origin/master' into feat/dogstatsd-uds-…
phacops Mar 9, 2026
61a2e18
fix: Update accepted_outcomes_consumer to use DogStatsDBackend
phacops Mar 9, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2,976 changes: 1,533 additions & 1,443 deletions rust_snuba/Cargo.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions rust_snuba/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ crate-type = ["cdylib", "rlib"]
[dependencies]
adler = "1.0.2"
anyhow = { version = "1.0.69", features = ["backtrace"] }
cadence = "1.0.0"
chrono = { version = "0.4.26", features = ["serde"] }
clickhouse = { version = "0.13", features = ["uuid"] }
ctrlc = { version = "3.2.5", features = ["termination"] }
Expand All @@ -33,6 +32,8 @@ futures = "0.3.21"
hyper = "1.2.0"
json-schema-diff = "0.1.7"
md5 = "0.7.0"
metrics = "0.24"
metrics-exporter-dogstatsd = "0.9.6"
parking_lot = "0.12.1"
procspawn = { version = "1.0.0", features = ["json"] }
prost = "0.14"
Expand Down Expand Up @@ -62,7 +63,6 @@ serde = { version = "1.0", features = ["derive"] }
serde_json = { version = "1.0" }
serde_path_to_error = "0.1.15"
serde_with = "3.8.1"
statsdproxy = { version = "0.4.1", features = ["cadence"] }
thiserror = "1.0"
tokio = { version = "1.38.2", features = ["full"] }
tokio-stream = "0.1.15"
Expand Down
2 changes: 1 addition & 1 deletion rust_snuba/rust-toolchain.toml
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
[toolchain]
channel = "1.85.0"
channel = "stable"
2 changes: 2 additions & 0 deletions rust_snuba/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ pub struct EnvConfig {
pub sentry_dsn: Option<String>,
pub dogstatsd_host: Option<String>,
pub dogstatsd_port: Option<u16>,
pub dogstatsd_socket_path: Option<String>,
pub default_retention_days: u16,
pub lower_retention_days: u16,
pub valid_retention_days: HashSet<u16>,
Expand All @@ -117,6 +118,7 @@ impl Default for EnvConfig {
sentry_dsn: None,
dogstatsd_host: None,
dogstatsd_port: None,
dogstatsd_socket_path: None,
default_retention_days: 90,
lower_retention_days: 30,
valid_retention_days: [30, 60, 90].iter().cloned().collect(),
Expand Down
41 changes: 32 additions & 9 deletions rust_snuba/src/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use crate::config;
use crate::factory_v2::ConsumerStrategyFactoryV2;
use crate::logging::{setup_logging, setup_sentry};
use crate::metrics::global_tags::set_global_tag;
use crate::metrics::statsd::StatsDBackend;
use crate::metrics::statsd::DogStatsDBackend;
use crate::processors;
use crate::rebalancing;
use crate::types::{InsertOrReplacement, KafkaMessageMetadata};
Expand Down Expand Up @@ -139,20 +139,43 @@ pub fn consumer_impl(
}

// setup arroyo metrics
if let (Some(host), Some(port)) = (
consumer_config.env.dogstatsd_host,
consumer_config.env.dogstatsd_port,
) {
{
let storage_name = consumer_config
.storages
.iter()
.map(|s| s.name.clone())
.collect::<Vec<_>>()
.join(",");
set_global_tag("storage".to_owned(), storage_name);
set_global_tag("consumer_group".to_owned(), consumer_group.to_owned());

metrics::init(StatsDBackend::new(&host, port, "snuba.consumer")).unwrap();
let tags = [
("storage", storage_name.clone()),
("consumer_group", consumer_group.to_owned()),
];

let backend = if let Some(socket_path) = env_config.dogstatsd_socket_path.clone() {
Some(DogStatsDBackend::new_uds(
&socket_path,
"snuba.consumer",
&tags,
))
} else if let (Some(host), Some(port)) = (
consumer_config.env.dogstatsd_host,
consumer_config.env.dogstatsd_port,
) {
Some(DogStatsDBackend::new_udp(
&host,
port,
"snuba.consumer",
&tags,
))
} else {
None
};

if let Some(backend) = backend {
set_global_tag("storage".to_owned(), storage_name);
set_global_tag("consumer_group".to_owned(), consumer_group.to_owned());
metrics::init(backend).unwrap();
}
}

if !use_rust_processor {
Expand Down
2 changes: 1 addition & 1 deletion rust_snuba/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ pub use config::{
StorageConfig, TopicConfig,
};
pub use factory_v2::ConsumerStrategyFactoryV2;
pub use metrics::statsd::StatsDBackend;
pub use metrics::statsd::DogStatsDBackend;
pub use processors::{ProcessingFunction, ProcessingFunctionType, PROCESSORS};
pub use strategies::noop::Noop;
pub use strategies::python::PythonTransformStep;
Expand Down
121 changes: 1 addition & 120 deletions rust_snuba/src/metrics/global_tags.rs
Original file line number Diff line number Diff line change
@@ -1,125 +1,6 @@
use std::collections::BTreeMap;

use parking_lot::RwLock;
use statsdproxy::middleware::Middleware;
use statsdproxy::types::Metric;

static GLOBAL_TAGS: RwLock<BTreeMap<String, String>> = RwLock::new(BTreeMap::new());

/// Sets a tag on the current Sentry scope.
pub fn set_global_tag(key: String, value: String) {
sentry::configure_scope(|scope| {
scope.set_tag(&key, &value);
});
GLOBAL_TAGS.write().insert(key, value);
}

pub struct AddGlobalTags<'a, M> {
next: M,
global_tags: &'a RwLock<BTreeMap<String, String>>,
}

impl<M> AddGlobalTags<'static, M>
where
M: Middleware,
{
pub fn new(next: M) -> Self {
Self::new_with_tagmap(next, &GLOBAL_TAGS)
}
}

impl<'a, M> AddGlobalTags<'a, M>
where
M: Middleware,
{
fn new_with_tagmap(next: M, global_tags: &'a RwLock<BTreeMap<String, String>>) -> Self {
AddGlobalTags { next, global_tags }
}
}

impl<M> Middleware for AddGlobalTags<'_, M>
where
M: Middleware,
{
fn poll(&mut self) {
self.next.poll()
}

fn submit(&mut self, metric: &mut Metric) {
let global_tags = self.global_tags.read();

if global_tags.is_empty() {
return self.next.submit(metric);
}

let mut tag_buffer: Vec<u8> = Vec::new();
let mut add_comma = false;
match metric.tags() {
Some(tags) if !tags.is_empty() => {
tag_buffer.extend(tags);
add_comma = true;
}
_ => (),
}
for (k, v) in global_tags.iter() {
if add_comma {
tag_buffer.push(b',');
}
tag_buffer.extend(k.as_bytes());
tag_buffer.push(b':');
tag_buffer.extend(v.as_bytes());
add_comma = true;
}

metric.set_tags(&tag_buffer);

self.next.submit(metric)
}
}

#[cfg(test)]
mod tests {
use super::*;

use std::cell::RefCell;
use std::collections::BTreeMap;

use statsdproxy::{middleware::Middleware, types::Metric};

struct FnStep<F>(pub F);

impl<F> Middleware for FnStep<F>
where
F: FnMut(&mut Metric),
{
fn submit(&mut self, metric: &mut Metric) {
(self.0)(metric)
}
}

#[test]
fn test_basic() {
let test_cases = [
// Without tags
("users.online:1|c", "users.online:1|c|#env:prod"),
// With tags
(
"users.online:1|c|#tag1:a",
"users.online:1|c|#tag1:a,env:prod",
),
];

for test_case in test_cases {
let results = RefCell::new(vec![]);
let global_tags = RwLock::new(BTreeMap::from([("env".to_owned(), "prod".to_owned())]));

let step = FnStep(|metric: &mut Metric| results.borrow_mut().push(metric.clone()));
let mut middleware = AddGlobalTags::new_with_tagmap(step, &global_tags);

let mut metric = Metric::new(test_case.0.as_bytes().to_vec());
middleware.submit(&mut metric);
assert_eq!(results.borrow().len(), 1);
let updated_metric = Metric::new(results.borrow_mut()[0].raw.clone());
assert_eq!(updated_metric.raw, test_case.1.as_bytes());
}
}
}
125 changes: 81 additions & 44 deletions rust_snuba/src/metrics/statsd.rs
Original file line number Diff line number Diff line change
@@ -1,56 +1,93 @@
use std::time::Duration;

use sentry_arroyo::metrics::{Metric, MetricSink, Recorder, StatsdRecorder};
use statsdproxy::cadence::StatsdProxyMetricSink;
use statsdproxy::config::AggregateMetricsConfig;
use statsdproxy::middleware::aggregate::AggregateMetrics;
use statsdproxy::middleware::upstream::Upstream;

use crate::metrics::global_tags::AddGlobalTags;
use metrics::Label;
use metrics_exporter_dogstatsd::DogStatsDBuilder;
use sentry_arroyo::metrics::{Metric, MetricType, MetricValue, Recorder};

/// A metrics backend that uses `metrics-exporter-dogstatsd` to send metrics
/// to DogStatsD over UDP or Unix domain sockets. Adapts arroyo's [`Recorder`]
/// trait to the `metrics` crate facade installed by the exporter.
#[derive(Debug)]
pub struct StatsDBackend {
recorder: StatsdRecorder<Wrapper>,
}
pub struct DogStatsDBackend;

impl Recorder for StatsDBackend {
fn record_metric(&self, metric: Metric<'_>) {
self.recorder.record_metric(metric)
impl DogStatsDBackend {
/// Creates a backend that reports to a DogStatsD agent addressed by host and port.
///
/// The address is formatted as `"{host}:{port}"` (no scheme prefix) and passed,
/// together with `prefix` and `tags`, to [`Self::build`].
/// NOTE(review): a scheme-less address presumably selects the exporter's UDP
/// transport — confirm against the `metrics-exporter-dogstatsd` documentation.
///
/// * `host` / `port` - location of the DogStatsD agent.
/// * `prefix` - forwarded to `build`, which sets it as the exporter's global prefix.
/// * `tags` - key/value pairs forwarded to `build`, which turns them into global labels.
///
/// # Panics
///
/// `build` calls `expect` on the results of `with_remote_address` and `install`,
/// so this panics if either of those steps returns an error.
pub fn new_udp(host: &str, port: u16, prefix: &str, tags: &[(&str, String)]) -> Self {
let addr = format!("{}:{}", host, port);
Self::build(&addr, prefix, tags)
}
}

struct Wrapper(Box<dyn cadence::MetricSink + Send + Sync + 'static>);

impl MetricSink for Wrapper {
fn emit(&self, metric: &str) {
let _ = self.0.emit(metric);
/// Creates a backend that reports to a DogStatsD agent through a socket path.
///
/// The address is formatted as `"unixgram://{socket_path}"` and passed, together
/// with `prefix` and `tags`, to [`Self::build`].
/// NOTE(review): the `unixgram://` scheme presumably selects a Unix datagram
/// socket in the exporter — confirm against the `metrics-exporter-dogstatsd`
/// documentation.
///
/// * `socket_path` - filesystem path of the agent's socket, without a scheme.
/// * `prefix` - forwarded to `build`, which sets it as the exporter's global prefix.
/// * `tags` - key/value pairs forwarded to `build`, which turns them into global labels.
///
/// # Panics
///
/// `build` calls `expect` on the results of `with_remote_address` and `install`,
/// so this panics if either of those steps returns an error.
pub fn new_uds(socket_path: &str, prefix: &str, tags: &[(&str, String)]) -> Self {
let addr = format!("unixgram://{}", socket_path);
Self::build(&addr, prefix, tags)
}
}

impl StatsDBackend {
pub fn new(host: &str, port: u16, prefix: &str) -> Self {
let upstream_addr = format!("{}:{}", host, port);
let aggregator_sink = StatsdProxyMetricSink::new(move || {
let upstream = Upstream::new(upstream_addr.clone()).unwrap();
fn build(addr: &str, prefix: &str, tags: &[(&str, String)]) -> Self {
let global_labels: Vec<Label> = tags
.iter()
.map(|(k, v)| Label::new(k.to_string(), v.clone()))
.collect();

let config = AggregateMetricsConfig {
aggregate_counters: true,
flush_offset: 0,
flush_interval: Duration::from_secs(1),
aggregate_gauges: true,
max_map_size: None,
};
let aggregate = AggregateMetrics::new(config, upstream);
DogStatsDBuilder::default()
.with_remote_address(addr)
.expect("invalid DogStatsD address")
.set_global_prefix(prefix)
.with_global_labels(global_labels)
.send_histograms_as_distributions(false)
.install()
.expect("failed to install DogStatsD exporter");

Comment on lines +30 to 38
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: DogStatsDBackend attempts to install a global metrics recorder on every instantiation. Subsequent instantiations in the same process will panic, causing test failures.
Severity: MEDIUM

Suggested Fix

Ensure the metrics recorder installation happens only once per process. This can be achieved by using a once-initialization pattern like OnceCell or LazyLock around the .install() call, or by guarding the call with a check to see if a recorder is already installed.

Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI
agent.
Verify if this is a real issue. If it is, propose a fix; if not, explain why it's not
valid.

Location: rust_snuba/src/metrics/statsd.rs#L30-L38

Potential issue: The `build()` method in `DogStatsDBackend` calls `install()` to set a
global metrics recorder. The `metrics` crate only allows this to be done once per
process. Any subsequent attempt to instantiate `DogStatsDBackend` within the same
process, such as during parallel test execution, will trigger a panic with the message
"failed to install DogStatsD exporter". This will lead to flaky or failing tests and
introduces fragility into the application's metric initialization.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Verified — not a practical concern. DogStatsDBackend is instantiated exactly once per consumer process via metrics::init(). The single test in this module only creates one backend instance. The install() will only be called once per process lifetime.

— Claude Code

// adding global tags *after* aggregation is more performant than trying to do the same
// in cadence, as it means more bytes and more memory to deal with in
// AggregateMetricsConfig
AddGlobalTags::new(aggregate)
});
Self
}
}

let recorder = StatsdRecorder::new(prefix, Wrapper(Box::new(aggregator_sink)));
impl Recorder for DogStatsDBackend {
fn record_metric(&self, metric: Metric<'_>) {
let key: metrics::SharedString = metric.key.to_string().into();
let labels: Vec<Label> = metric
.tags
.iter()
.map(|(k, v)| Label::new(k.to_string(), v.to_string()))
.collect();
let metadata = metrics::Metadata::new(module_path!(), metrics::Level::INFO, None);
let key = metrics::Key::from_parts(key, labels);

Self { recorder }
match metric.ty {
MetricType::Counter => {
let value = match metric.value {
MetricValue::I64(v) => v as u64,
MetricValue::U64(v) => v,
MetricValue::F64(v) => v as u64,
MetricValue::Duration(d) => d.as_millis() as u64,
_ => return,
};
metrics::with_recorder(|rec| {
rec.register_counter(&key, &metadata).increment(value);
});
}
MetricType::Gauge => {
let value = match metric.value {
MetricValue::I64(v) => v as f64,
MetricValue::U64(v) => v as f64,
MetricValue::F64(v) => v,
MetricValue::Duration(d) => d.as_millis() as f64,
_ => return,
};
metrics::with_recorder(|rec| {
rec.register_gauge(&key, &metadata).set(value);
});
}
MetricType::Timer => {
let value = match metric.value {
MetricValue::I64(v) => v as f64,
MetricValue::U64(v) => v as f64,
MetricValue::F64(v) => v,
MetricValue::Duration(d) => d.as_millis() as f64,
_ => return,
};
metrics::with_recorder(|rec| {
rec.register_histogram(&key, &metadata).record(value);
});
}
_ => {}
}
}
}

Expand All @@ -61,8 +98,8 @@ mod tests {
use super::*;

#[test]
fn statsd_metric_backend() {
let backend = StatsDBackend::new("0.0.0.0", 8125, "test");
fn dogstatsd_metric_backend() {
let backend = DogStatsDBackend::new_udp("0.0.0.0", 8125, "test", &[]);

backend.record_metric(metric!(Counter: "a", 1, "tag1" => "value1"));
backend.record_metric(metric!(Gauge: "b", 20, "tag2" => "value2"));
Expand Down
Loading
Loading