diff --git a/CHANGELOG.md b/CHANGELOG.md index 9926f74f206..c206280c77a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,11 @@ ## Unreleased +**Breaking Changes**: + +- Use a more mature (dog)statsd backend, with proper support for unix sockets and reservoir sampling. + Due to the new backend, all sampling and tag filtering configuration options have been removed. ([#5675](https://github.com/getsentry/relay/pull/5675)) + **Bug Fixes**: - Prevent minidump compression bomb. ([#5613](https://github.com/getsentry/relay/pull/5613)) diff --git a/Cargo.lock b/Cargo.lock index 8dbe813120a..2b7f1cf0681 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -545,15 +545,6 @@ dependencies = [ "libbz2-rs-sys", ] -[[package]] -name = "cadence" -version = "1.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3075f133bee430b7644c54fb629b9b4420346ffa275a45c81a6babe8b09b4f51" -dependencies = [ - "crossbeam-channel", -] - [[package]] name = "cast" version = "0.3.0" @@ -1313,6 +1304,12 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "endian-type" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c34f04666d835ff5d62e058c3995147c06f42fe86ff053337632bca83e42702d" + [[package]] name = "enum-as-inner" version = "0.6.1" @@ -2709,6 +2706,51 @@ dependencies = [ "autocfg", ] +[[package]] +name = "metrics" +version = "0.24.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d5312e9ba3771cfa961b585728215e3d972c950a3eed9252aa093d6301277e8" +dependencies = [ + "ahash", + "portable-atomic", +] + +[[package]] +name = "metrics-exporter-dogstatsd" +version = "0.9.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "961f3712d8a7cfe14caaf74c3af503fe701cee6439ff49a7a3ebd04bf49c0502" +dependencies = [ + "bytes", + "itoa", + "metrics", + "metrics-util", + "ryu", + 
"thiserror 2.0.17", + "tracing", +] + +[[package]] +name = "metrics-util" +version = "0.20.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cdfb1365fea27e6dd9dc1dbc19f570198bc86914533ad639dae939635f096be4" +dependencies = [ + "aho-corasick", + "crossbeam-epoch", + "crossbeam-utils", + "hashbrown 0.16.1", + "indexmap", + "metrics", + "ordered-float", + "quanta", + "radix_trie", + "rand 0.9.2", + "rand_xoshiro", + "sketches-ddsketch", +] + [[package]] name = "mimalloc" version = "0.1.48" @@ -2884,6 +2926,15 @@ version = "1.0.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "650eef8c711430f1a879fdd01d4745a7deea475becfb90269c06775983bbf086" +[[package]] +name = "nibble_vec" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a5d83df9f36fe23f0c3648c6bbb8b0298bb5f1939c8f2704431371f4b84d43" +dependencies = [ + "smallvec", +] + [[package]] name = "nix" version = "0.29.0" @@ -3254,6 +3305,15 @@ dependencies = [ "thiserror 2.0.17", ] +[[package]] +name = "ordered-float" +version = "5.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f4779c6901a562440c3786d08192c6fbda7c1c2060edd10006b05ee35d10f2d" +dependencies = [ + "num-traits", +] + [[package]] name = "os_info" version = "3.9.0" @@ -3762,6 +3822,21 @@ dependencies = [ "cc", ] +[[package]] +name = "quanta" +version = "0.12.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3ab5a9d756f0d97bdc89019bd2e4ea098cf9cde50ee7564dde6b81ccc8f06c7" +dependencies = [ + "crossbeam-utils", + "libc", + "once_cell", + "raw-cpuid", + "wasi 0.11.0+wasi-snapshot-preview1", + "web-sys", + "winapi", +] + [[package]] name = "quick-error" version = "1.2.3" @@ -3794,6 +3869,16 @@ dependencies = [ "scheduled-thread-pool", ] +[[package]] +name = "radix_trie" +version = "0.2.1" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "c069c179fcdc6a2fe24d8d18305cf085fdbd4f922c041943e203685d6a1c58fd" +dependencies = [ + "endian-type", + "nibble_vec", +] + [[package]] name = "rand" version = "0.8.5" @@ -3871,6 +3956,15 @@ dependencies = [ "rand_core 0.9.3", ] +[[package]] +name = "rand_xoshiro" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f703f4665700daf5512dcca5f43afa6af89f09db47fb56be587f80636bda2d41" +dependencies = [ + "rand_core 0.9.3", +] + [[package]] name = "range-map" version = "0.2.0" @@ -3880,6 +3974,15 @@ dependencies = [ "num-traits", ] +[[package]] +name = "raw-cpuid" +version = "11.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "498cd0dc59d73224351ee52a95fee0f1a617a2eae0e7d9d720cc622c73a54186" +dependencies = [ + "bitflags 2.9.4", +] + [[package]] name = "rayon" version = "1.10.0" @@ -4747,12 +4850,9 @@ dependencies = [ name = "relay-statsd" version = "26.2.1" dependencies = [ - "cadence", - "crossbeam-channel", - "parking_lot", - "rand 0.9.2", + "metrics", + "metrics-exporter-dogstatsd", "relay-log", - "statsdproxy", ] [[package]] @@ -5619,6 +5719,12 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "56199f7ddabf13fe5074ce809e7d3f42b42ae711800501b5b16ea82ad029c39d" +[[package]] +name = "sketches-ddsketch" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1e9a774a6c28142ac54bb25d25562e6bcf957493a184f15ad4eebccb23e410a" + [[package]] name = "slab" version = "0.4.9" @@ -5919,20 +6025,6 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" -[[package]] -name = "statsdproxy" -version = "0.4.1" -source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "8ad88f372ad4e60c8235f74feb2fcc047377c55b1872d0b9247b8a6aed8dcba9" -dependencies = [ - "anyhow", - "cadence", - "crc32fast", - "log", - "rand 0.8.5", - "thread_local", -] - [[package]] name = "string_cache" version = "0.8.7" diff --git a/Cargo.toml b/Cargo.toml index 171ff87b77b..06d7b976525 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -95,7 +95,6 @@ brotli = "7.0.0" bytecount = "0.6.9" bytes = "1.10.1" bzip2 = "0.6.0" -cadence = "1.6.0" chrono = { version = "0.4.42", default-features = false, features = [ "std", "serde", @@ -106,7 +105,6 @@ cmake = "0.1.54" console = "0.15.11" cookie = "0.18.1" criterion = "0.5.1" -crossbeam-channel = "0.5.15" data-encoding = "2.9.0" deadpool = "0.12.3" debugid = "0.8.0" @@ -144,6 +142,8 @@ mimalloc = { version = "0.1.48", features = ["v3"] } mime = "0.3.17" minidump = "0.26.0" multer = "3.1.0" +metrics = "0.24.3" +metrics-exporter-dogstatsd = "0.9.6" num-traits = "0.2.19" num_cpus = "1.17.0" objectstore-client = "0.0.18" @@ -205,7 +205,6 @@ smallvec = { version = "1.15.1", features = ["serde"] } socket2 = "0.6.2" sqlparser = "0.60.0" sqlx = { version = "0.8.6", default-features = false } -statsdproxy = { version = "0.4.1", default-features = false } symbolic-common = { version = "12.12.3", default-features = false } symbolic-unreal = { version = "12.12.3", default-features = false } syn = { version = "2.0.106" } diff --git a/relay-cardinality/src/redis/cache.rs b/relay-cardinality/src/redis/cache.rs index fe94d5c78af..bb304d7eb3b 100644 --- a/relay-cardinality/src/redis/cache.rs +++ b/relay-cardinality/src/redis/cache.rs @@ -152,7 +152,7 @@ impl Inner { .extract_if(|scope, cache| cache.current_slot < scope.active_slot(ts)) .count() }); - metric!(counter(CardinalityLimiterCounters::RedisCacheVacuum) += expired as i64); + metric!(counter(CardinalityLimiterCounters::RedisCacheVacuum) += expired as u64); } } diff --git 
a/relay-cardinality/src/redis/state.rs b/relay-cardinality/src/redis/state.rs index 281aa9d8ade..213500b9da5 100644 --- a/relay-cardinality/src/redis/state.rs +++ b/relay-cardinality/src/redis/state.rs @@ -6,7 +6,7 @@ use crate::{ CardinalityLimit, limiter::{Entry, EntryId, Scoping}, redis::quota::{PartialQuotaScoping, QuotaScoping}, - statsd::{CardinalityLimiterCounters, CardinalityLimiterSets}, + statsd::CardinalityLimiterCounters, }; /// Internal state combining relevant entries for the respective quotas. @@ -27,16 +27,14 @@ pub struct LimitState<'a> { /// The original cardinality limit. cardinality_limit: &'a CardinalityLimit, - /// The original/full scoping. - scoping: Scoping, /// Amount of cache hits. - cache_hits: i64, + cache_hits: u64, /// Amount of cache misses. - cache_misses: i64, + cache_misses: u64, /// Amount of accepts, - accepts: i64, + accepts: u64, /// Amount of rejections. - rejections: i64, + rejections: u64, } impl<'a> LimitState<'a> { @@ -49,7 +47,6 @@ impl<'a> LimitState<'a> { scopes: BTreeMap::new(), limit: cardinality_limit.limit, cardinality_limit, - scoping, cache_hits: 0, cache_misses: 0, accepts: 0, @@ -161,15 +158,6 @@ impl Drop for LimitState<'_> { id = &self.cardinality_limit.id, passive = passive, ); - - let organization_id = self.scoping.organization_id; - let status = if self.rejections > 0 { "limited" } else { "ok" }; - metric!( - set(CardinalityLimiterSets::Organizations) = organization_id.value() as i64, - id = &self.cardinality_limit.id, - passive = passive, - status = status, - ) } } diff --git a/relay-cardinality/src/statsd.rs b/relay-cardinality/src/statsd.rs index 3061e51cb43..40f4936fd40 100644 --- a/relay-cardinality/src/statsd.rs +++ b/relay-cardinality/src/statsd.rs @@ -1,6 +1,6 @@ use relay_statsd::TimerMetric; #[cfg(feature = "redis")] -use relay_statsd::{CounterMetric, DistributionMetric, SetMetric}; +use relay_statsd::{CounterMetric, DistributionMetric}; /// Counter metrics for the Relay Cardinality Limiter. 
#[cfg(feature = "redis")] @@ -113,25 +113,3 @@ impl DistributionMetric for CardinalityLimiterDistributions { } } } - -#[cfg(feature = "redis")] -pub enum CardinalityLimiterSets { - /// Set containing all organizations which have had any metric sent through the cardinality - /// limiter. - /// - /// This metric is tagged with: - /// - `id`: The id of the enforced limit. - /// - `status`: Wether the organization was cardinality limited. - #[cfg(feature = "redis")] - Organizations, -} - -#[cfg(feature = "redis")] -impl SetMetric for CardinalityLimiterSets { - fn name(&self) -> &'static str { - match *self { - #[cfg(feature = "redis")] - Self::Organizations => "cardinality.limiter.organizations", - } - } -} diff --git a/relay-config/src/config.rs b/relay-config/src/config.rs index abd9d491551..c6c53f6d161 100644 --- a/relay-config/src/config.rs +++ b/relay-config/src/config.rs @@ -569,28 +569,11 @@ pub struct Metrics { pub default_tags: BTreeMap, /// Tag name to report the hostname to for each metric. Defaults to not sending such a tag. pub hostname_tag: Option, - /// Global sample rate for all emitted metrics between `0.0` and `1.0`. - /// - /// For example, a value of `0.3` means that only 30% of the emitted metrics will be sent. - /// Defaults to `1.0` (100%). - pub sample_rate: f64, /// Interval for periodic metrics emitted from Relay. /// /// Setting it to `0` seconds disables the periodic metrics. /// Defaults to 5 seconds. pub periodic_secs: u64, - /// Whether local metric aggregation using statdsproxy should be enabled. - /// - /// Defaults to `true`. - pub aggregate: bool, - /// Allows emission of metrics with high cardinality tags. - /// - /// High cardinality tags are dynamic values attached to metrics, - /// such as project IDs. When enabled, these tags will be included - /// in the emitted metrics. When disabled, the tags will be omitted. - /// - /// Defaults to `false`. 
- pub allow_high_cardinality_tags: bool, } impl Default for Metrics { @@ -601,10 +584,7 @@ impl Default for Metrics { prefix: "sentry.relay".into(), default_tags: BTreeMap::new(), hostname_tag: None, - sample_rate: 1.0, periodic_secs: 5, - aggregate: true, - allow_high_cardinality_tags: false, } } } @@ -2191,21 +2171,6 @@ impl Config { self.values.metrics.hostname_tag.as_deref() } - /// Returns the global sample rate for all metrics. - pub fn metrics_sample_rate(&self) -> f64 { - self.values.metrics.sample_rate - } - - /// Returns whether local metric aggregation should be enabled. - pub fn metrics_aggregate(&self) -> bool { - self.values.metrics.aggregate - } - - /// Returns whether high cardinality tags should be removed before sending metrics. - pub fn metrics_allow_high_cardinality_tags(&self) -> bool { - self.values.metrics.allow_high_cardinality_tags - } - /// Returns the interval for periodic metrics emitted from Relay. /// /// `None` if periodic metrics are disabled. diff --git a/relay-kafka/src/producer/utils.rs b/relay-kafka/src/producer/utils.rs index 9f048e8e481..7cf8185c9bd 100644 --- a/relay-kafka/src/producer/utils.rs +++ b/relay-kafka/src/producer/utils.rs @@ -94,7 +94,7 @@ impl ClientContext for Context { /// /// This method is only called if `statistics.interval.ms` is configured. 
fn stats(&self, statistics: rdkafka::Statistics) { - let producer_name = &self.producer_name; + let producer_name = self.producer_name.as_str(); relay_statsd::metric!( gauge(KafkaGauges::MessageCount) = statistics.msg_cnt, @@ -206,7 +206,7 @@ impl ProducerContext for Context { metric!( counter(KafkaCounters::ProduceStatusSuccess) += 1, topic = message.topic(), - producer_name = &self.producer_name + producer_name = self.producer_name.as_str(), ); } Err((error, message)) => { @@ -220,7 +220,7 @@ impl ProducerContext for Context { metric!( counter(KafkaCounters::ProduceStatusError) += 1, topic = message.topic(), - producer_name = &self.producer_name + producer_name = self.producer_name.as_str(), ); } } diff --git a/relay-log/src/setup.rs b/relay-log/src/setup.rs index 59df4e4d4eb..5ee602e6b52 100644 --- a/relay-log/src/setup.rs +++ b/relay-log/src/setup.rs @@ -269,6 +269,7 @@ fn get_default_filters() -> EnvFilter { tower_http=TRACE,\ trust_dns_proto=WARN,\ minidump=ERROR,\ + metrics_exporter_dogstatsd::forwarder::sync=OFF,\ ", ); diff --git a/relay-server/src/endpoints/common.rs b/relay-server/src/endpoints/common.rs index 4cd7100ac70..733aa4b7613 100644 --- a/relay-server/src/endpoints/common.rs +++ b/relay-server/src/endpoints/common.rs @@ -430,7 +430,7 @@ fn emit_envelope_metrics(envelope: &Envelope) { is_container = is_container, ); metric!( - counter(RelayCounters::EnvelopeItems) += item.item_count().unwrap_or(1), + counter(RelayCounters::EnvelopeItems) += item.item_count().unwrap_or(1) as u64, item_type = item_type, is_container = is_container, sdk = client_name, diff --git a/relay-server/src/services/buffer/envelope_buffer/mod.rs b/relay-server/src/services/buffer/envelope_buffer/mod.rs index f124c7b6902..087719bab0e 100644 --- a/relay-server/src/services/buffer/envelope_buffer/mod.rs +++ b/relay-server/src/services/buffer/envelope_buffer/mod.rs @@ -79,13 +79,13 @@ impl PolymorphicEnvelopeBuffer { /// Adds an envelope to the buffer. 
pub async fn push(&mut self, envelope: Box) -> Result<(), EnvelopeBufferError> { relay_statsd::metric!( - distribution(RelayDistributions::BufferEnvelopeBodySize, sample = 0.01) = + distribution(RelayDistributions::BufferEnvelopeBodySize) = envelope.items().map(Item::len).sum::() as u64, partition_id = self.partition_tag() ); relay_statsd::metric!( - timer(RelayTimers::BufferPush, sample = 0.01), + timer(RelayTimers::BufferPush), partition_id = self.partition_tag(), { match self { @@ -99,7 +99,7 @@ impl PolymorphicEnvelopeBuffer { /// Returns a reference to the next-in-line envelope. pub async fn peek(&mut self) -> Result { relay_statsd::metric!( - timer(RelayTimers::BufferPeek, sample = 0.01), + timer(RelayTimers::BufferPeek), partition_id = self.partition_tag(), { match self { @@ -113,7 +113,7 @@ impl PolymorphicEnvelopeBuffer { /// Pops the next-in-line envelope. pub async fn pop(&mut self) -> Result>, EnvelopeBufferError> { relay_statsd::metric!( - timer(RelayTimers::BufferPop, sample = 0.01), + timer(RelayTimers::BufferPop), partition_id = self.partition_tag(), { match self { diff --git a/relay-server/src/services/buffer/envelope_stack/sqlite.rs b/relay-server/src/services/buffer/envelope_stack/sqlite.rs index 9d45cf00075..19640b39e6a 100644 --- a/relay-server/src/services/buffer/envelope_stack/sqlite.rs +++ b/relay-server/src/services/buffer/envelope_stack/sqlite.rs @@ -159,7 +159,7 @@ impl EnvelopeStack for SqliteEnvelopeStack { } let encoded_envelope = relay_statsd::metric!( - timer(RelayTimers::BufferEnvelopesSerialization, sample = 0.01), + timer(RelayTimers::BufferEnvelopesSerialization), partition_id = &self.partition_tag, { DatabaseEnvelope::try_from(envelope.as_ref())? 
} ); diff --git a/relay-server/src/services/buffer/envelope_store/sqlite.rs b/relay-server/src/services/buffer/envelope_store/sqlite.rs index 9e4dd4a935a..6af40b586f0 100644 --- a/relay-server/src/services/buffer/envelope_store/sqlite.rs +++ b/relay-server/src/services/buffer/envelope_store/sqlite.rs @@ -113,12 +113,9 @@ impl TryFrom for Box { } = value; if encoded_envelope.starts_with(ZSTD_MAGIC_WORD) { - relay_statsd::metric!( - timer(RelayTimers::BufferEnvelopeDecompression, sample = 0.01), - { - encoded_envelope = zstd::decode_all(&*encoded_envelope)?.into_boxed_slice(); - } - ); + relay_statsd::metric!(timer(RelayTimers::BufferEnvelopeDecompression), { + encoded_envelope = zstd::decode_all(&*encoded_envelope)?.into_boxed_slice(); + }); } let mut envelope = Envelope::parse_bytes(Bytes::from(encoded_envelope))?; @@ -144,19 +141,16 @@ impl<'a> TryFrom<&'a Envelope> for DatabaseEnvelope { let serialized_envelope = value.to_vec()?; relay_statsd::metric!( - distribution(RelayDistributions::BufferEnvelopeSize, sample = 0.01) = - serialized_envelope.len() as u64 + distribution(RelayDistributions::BufferEnvelopeSize) = serialized_envelope.len() as u64 ); - let encoded_envelope = relay_statsd::metric!( - timer(RelayTimers::BufferEnvelopeCompression, sample = 0.01), - { zstd::encode_all(serialized_envelope.as_slice(), Self::COMPRESSION_LEVEL)? } - ); + let encoded_envelope = + relay_statsd::metric!(timer(RelayTimers::BufferEnvelopeCompression), { + zstd::encode_all(serialized_envelope.as_slice(), Self::COMPRESSION_LEVEL)? 
+ }); relay_statsd::metric!( - distribution( - RelayDistributions::BufferEnvelopeSizeCompressed, - sample = 0.01 - ) = encoded_envelope.len() as u64 + distribution(RelayDistributions::BufferEnvelopeSizeCompressed) = + encoded_envelope.len() as u64 ); Ok(DatabaseEnvelope { diff --git a/relay-server/src/services/buffer/mod.rs b/relay-server/src/services/buffer/mod.rs index c85346398b6..985d7849228 100644 --- a/relay-server/src/services/buffer/mod.rs +++ b/relay-server/src/services/buffer/mod.rs @@ -492,7 +492,7 @@ impl EnvelopeBufferService { buffer.mark_ready(&own_key, false); relay_statsd::metric!( counter(RelayCounters::BufferProjectPending) += 1, - partition_id = &partition_tag + partition_id = partition_tag ); return Ok(()); @@ -511,7 +511,7 @@ impl EnvelopeBufferService { buffer.mark_ready(&sampling_key, false); relay_statsd::metric!( counter(RelayCounters::BufferProjectPending) += 1, - partition_id = &partition_tag + partition_id = partition_tag ); return Ok(()); diff --git a/relay-server/src/services/cogs.rs b/relay-server/src/services/cogs.rs index c5b413b1e3f..fdf5ac7f593 100644 --- a/relay-server/src/services/cogs.rs +++ b/relay-server/src/services/cogs.rs @@ -39,7 +39,7 @@ impl CogsService { let amount = match measurement.value { relay_cogs::Value::Time(duration) => { - duration.as_micros().try_into().unwrap_or(i64::MAX) + duration.as_micros().try_into().unwrap_or(u64::MAX) } }; diff --git a/relay-server/src/services/outcome.rs b/relay-server/src/services/outcome.rs index cc7ac4c47f8..b86553dbfc3 100644 --- a/relay-server/src/services/outcome.rs +++ b/relay-server/src/services/outcome.rs @@ -112,9 +112,6 @@ trait TrackOutcomeLike { /// Returns the number of items for that outcome. fn quantity(&self) -> Option; - /// The project id for the outcomes. - fn project_id(&self) -> ProjectId; - /// The category for the outcome. 
fn category(&self) -> DataCategory; } @@ -153,10 +150,6 @@ impl TrackOutcomeLike for TrackOutcome { Some(self.quantity) } - fn project_id(&self) -> ProjectId { - self.scoping.project_id - } - fn category(&self) -> DataCategory { self.category } @@ -885,10 +878,6 @@ impl TrackOutcomeLike for TrackRawOutcome { self.quantity } - fn project_id(&self) -> ProjectId { - self.project_id - } - fn category(&self) -> DataCategory { match self.category { Some(cat) => DataCategory::try_from(cat).unwrap_or(DataCategory::Unknown), @@ -1169,9 +1158,7 @@ impl FromMessage for OutcomeProducer { fn send_outcome_metric(message: &impl TrackOutcomeLike, to: &'static str) { if let Some(quantity) = message.quantity() { metric!( - counter(RelayCounters::OutcomeQuantity) += quantity, - hc.project_id = message.project_id().to_string().as_str(), - hc.reason = message.reason().as_deref().unwrap_or(""), + counter(RelayCounters::OutcomeQuantity) += quantity.into(), category = message.category().name(), outcome = message.tag_name(), to = to, @@ -1180,8 +1167,6 @@ fn send_outcome_metric(message: &impl TrackOutcomeLike, to: &'static str) { metric!( counter(RelayCounters::Outcomes) += 1, reason = message.reason().as_deref().unwrap_or(""), - hc.category = message.category().name(), - hc.project_id = message.project_id().to_string().as_str(), outcome = message.tag_name(), to = to, ); diff --git a/relay-statsd/Cargo.toml b/relay-statsd/Cargo.toml index e321611ead4..073621dc845 100644 --- a/relay-statsd/Cargo.toml +++ b/relay-statsd/Cargo.toml @@ -13,12 +13,10 @@ publish = false workspace = true [dependencies] -cadence = { workspace = true } -crossbeam-channel = { workspace = true } -parking_lot = { workspace = true } -rand = { workspace = true } relay-log = { workspace = true } -statsdproxy = { workspace = true, features = ["cadence"] } + +metrics = { workspace = true } +metrics-exporter-dogstatsd = { workspace = true } [features] default = [] diff --git a/relay-statsd/src/lib.rs 
b/relay-statsd/src/lib.rs index 7e4b990af59..e1e4e144f86 100644 --- a/relay-statsd/src/lib.rs +++ b/relay-statsd/src/lib.rs @@ -17,21 +17,17 @@ //! or the [`metric!`] macro will become a noop. Only when configured, metrics will actually be //! collected. //! -//! To initialize the client, either use [`set_client`] to pass a custom client, or use -//! [`init`] to create a default client with known arguments: +//! To initialize the client, use [`init`] to create a default client with known arguments: //! //! ```no_run //! # use std::collections::BTreeMap; -//! # use relay_statsd::MetricsClientConfig; +//! # use relay_statsd::MetricsConfig; //! -//! relay_statsd::init(MetricsClientConfig { -//! prefix: "myprefix", +//! relay_statsd::init(MetricsConfig { +//! prefix: "myprefix".to_owned(), //! host: "localhost:8125".to_owned(), //! buffer_size: None, //! default_tags: BTreeMap::new(), -//! sample_rate: 1.0.into(), -//! aggregate: true, -//! allow_high_cardinality_tags: false //! }); //! ``` //! @@ -53,334 +49,86 @@ //! //! metric!(counter(MyCounter) += 1); //! ``` -//! -//! ## Manual Usage -//! -//! ``` -//! use relay_statsd::prelude::*; -//! -//! relay_statsd::with_client(|client| { -//! client.count("mymetric", 1).ok(); -//! }); -//! ``` -//! //! 
[Metric Types]: https://github.com/statsd/statsd/blob/master/docs/metric_types.md -use rand::Rng; -pub use statsdproxy::config::DenyTagConfig; - -use cadence::{Metric, MetricBuilder, StatsdClient}; -use parking_lot::RwLock; -use rand::distr::StandardUniform; -use statsdproxy::cadence::StatsdProxyMetricSink; -use statsdproxy::config::AggregateMetricsConfig; -use statsdproxy::middleware::deny_tag::DenyTag; -use std::collections::BTreeMap; -use std::ops::{Deref, DerefMut}; -use std::sync::Arc; -use std::time::Duration; +use metrics_exporter_dogstatsd::{AggregationMode, BuildError, DogStatsDBuilder}; -mod upstream; +use std::{collections::BTreeMap, fmt}; -/// Maximum number of metric events that can be queued before we start dropping them -const METRICS_MAX_QUEUE_SIZE: usize = 100_000; +use crate::mock::MockRecorder; -#[derive(Debug, Clone, Copy)] -pub struct SampleRate(f64); +mod mock; -impl From for SampleRate { - fn from(value: f64) -> Self { - Self(value.clamp(0.0, 1.0)) - } -} - -impl From for f64 { - fn from(val: SampleRate) -> Self { - val.0 - } -} - -/// Client configuration object to store globally. -#[derive(Debug)] -pub struct MetricsClient { - /// The raw statsd client. - pub statsd_client: StatsdClient, - /// Default tags to apply to every metric. - pub default_tags: BTreeMap, - /// Global sample rate. - pub default_sample_rate: SampleRate, - /// Receiver for external listeners. - /// - /// Only available when the client was initialized with `init_basic`. - pub rx: Option>>, +#[doc(hidden)] +pub mod _metrics { + pub use ::metrics::*; } -/// Client configuration used for initialization of [`MetricsClient`]. +/// Client configuration used for initialization of the metrics sub-system. #[derive(Debug)] -pub struct MetricsClientConfig<'a> { +pub struct MetricsConfig { /// Prefix which is appended to all metric names. - pub prefix: &'a str, + pub prefix: String, /// Host of the metrics upstream. 
pub host: String, /// The buffer size to use for the socket. pub buffer_size: Option, /// Tags that are added to all metrics. pub default_tags: BTreeMap, - /// Default sample rate for metrics, between 0.0 (= 0%) and 1.0 (= 100%) - pub sample_rate: SampleRate, - /// If metrics should be batched or send immediately upstream. - pub aggregate: bool, - /// If high cardinality tags should be removed from metrics. - pub allow_high_cardinality_tags: bool, } -impl Deref for MetricsClient { - type Target = StatsdClient; +/// Error returned from [`init`]. +#[derive(Debug)] +pub struct Error(BuildError); - fn deref(&self) -> &StatsdClient { - &self.statsd_client +impl fmt::Display for Error { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.0.fmt(f) } } -impl DerefMut for MetricsClient { - fn deref_mut(&mut self) -> &mut StatsdClient { - &mut self.statsd_client - } -} +impl std::error::Error for Error {} -impl MetricsClient { - /// Send a metric with the default tags defined on this `MetricsClient`. - #[inline(always)] - pub fn send_metric<'a, T>(&'a self, metric: MetricBuilder<'a, '_, T>) - where - T: Metric + From, - { - self.send_metric_with_sample_rate(metric, None) +impl From for Error { + fn from(value: BuildError) -> Self { + Self(value) } - - /// Send a metric with an explicit sample rate that overrides the global sample rate. 
- #[doc(hidden)] - #[inline(always)] - pub fn send_metric_with_sample_rate<'a, T>( - &'a self, - mut metric: MetricBuilder<'a, '_, T>, - sample_rate: Option, - ) where - T: Metric + From, - { - let effective_sample_rate = match sample_rate { - Some(sample_rate) => sample_rate.0.min(self.default_sample_rate.0), - None => self.default_sample_rate.0, - }; - if !Self::should_send(effective_sample_rate) { - return; - } - - for (k, v) in &self.default_tags { - metric = metric.with_tag(k, v); - } - - if effective_sample_rate < 1.0 { - metric = metric.with_sampling_rate(effective_sample_rate); - } - - if let Err(error) = metric.try_send() { - relay_log::error!( - error = &error as &dyn std::error::Error, - maximum_capacity = METRICS_MAX_QUEUE_SIZE, - "Error sending a metric", - ); - } - } - - fn should_send(sample_rate: f64) -> bool { - if sample_rate <= 0.0 { - false - } else if sample_rate >= 1.0 { - true - } else { - // Using thread local RNG and uniform distribution here because Rng::gen_range is - // "optimized for the case that only a single sample is made from the given range". - // See https://docs.rs/rand/0.7.3/rand/distributions/uniform/struct.Uniform.html for more - // details. - let mut rng = rand::rng(); - let s: f64 = rng.sample(StandardUniform); - s <= sample_rate - } - } -} - -static METRICS_CLIENT: RwLock>> = RwLock::new(None); - -thread_local! { - static CURRENT_CLIENT: std::cell::RefCell>> = METRICS_CLIENT.read().clone().into(); -} - -/// Internal prelude for the macro -#[doc(hidden)] -pub mod _pred { - pub use cadence::prelude::*; -} - -/// The metrics prelude that is necessary to use the client. -pub mod prelude { - pub use cadence::prelude::*; -} - -/// Set a new statsd client. -pub fn set_client(client: MetricsClient) { - *METRICS_CLIENT.write() = Some(Arc::new(client)); - CURRENT_CLIENT.with(|cell| cell.replace(METRICS_CLIENT.read().clone())); } /// Set a test client for the period of the called function (only affects the current thread). 
-// TODO: replace usages with `init_basic` pub fn with_capturing_test_client(f: impl FnOnce()) -> Vec { - with_capturing_test_client_sample_rate(1.0, f) -} - -/// Set a test client with a custom global sample rate for the period of the called function. -#[doc(hidden)] -pub fn with_capturing_test_client_sample_rate(sample_rate: f64, f: impl FnOnce()) -> Vec { - let (rx, sink) = cadence::SpyMetricSink::new(); - let test_client = MetricsClient { - statsd_client: StatsdClient::from_sink("", sink), - default_tags: Default::default(), - default_sample_rate: sample_rate.into(), - rx: None, - }; - - CURRENT_CLIENT.with(|cell| { - let old_client = cell.replace(Some(Arc::new(test_client))); - f(); - cell.replace(old_client); - }); - - rx.iter().map(|x| String::from_utf8(x).unwrap()).collect() -} - -// Setup a simple metrics listener. -// -// Returns `None` if the global metrics client has already been configured. -pub fn init_basic() -> Option>> { - CURRENT_CLIENT.with(|cell| { - if cell.borrow().is_none() { - // Setup basic observable metrics sink. - let (receiver, sink) = cadence::SpyMetricSink::new(); - let test_client = MetricsClient { - statsd_client: StatsdClient::from_sink("", sink), - default_tags: Default::default(), - default_sample_rate: 1.0.into(), - rx: Some(receiver.clone()), - }; - cell.replace(Some(Arc::new(test_client))); - } - }); - - CURRENT_CLIENT.with(|cell| { - cell.borrow() - .as_deref() - .and_then(|client| match &client.rx { - Some(rx) => Some(rx.clone()), - None => { - relay_log::error!("Metrics client was already set up."); - None - } - }) - }) -} - -/// Disable the client again. -pub fn disable() { - *METRICS_CLIENT.write() = None; + let recorder = MockRecorder::default(); + metrics::with_local_recorder(&recorder, f); + recorder.consume() } /// Tell the metrics system to report to statsd. 
-pub fn init(config: MetricsClientConfig<'_>) -> std::io::Result<()> { +pub fn init(config: MetricsConfig) -> Result<(), Error> { relay_log::info!("reporting metrics to statsd at {}", config.host); - // Probe the connection to catch misconfigurations early. - if let Err(err) = upstream::Upstream::connect(&config.host, config.buffer_size) { - relay_log::error!( - error = &err as &dyn std::error::Error, - "failed to connect to statsd sink at {}", - config.host - ); - return Err(err); - } - - let sample_rate: f64 = config.sample_rate.into(); - relay_log::debug!( - "metrics sample rate is set to {sample_rate}{}", - if sample_rate == 0.0 { - ", no metrics will be reported" - } else { - "" - } - ); - - let deny_config = DenyTagConfig { - starts_with: match config.allow_high_cardinality_tags { - true => vec![], - false => vec!["hc.".to_owned()], - }, - tags: vec![], - ends_with: vec![], - }; - - let statsd_client = if config.aggregate { - let statsdproxy_sink = StatsdProxyMetricSink::new(move || { - let aggregate = statsdproxy::middleware::aggregate::AggregateMetrics::new( - AggregateMetricsConfig { - aggregate_gauges: true, - aggregate_counters: true, - flush_interval: Duration::from_millis(50), - flush_offset: 0, - max_map_size: None, - }, - upstream::TryUpstream::connect(&config.host, config.buffer_size), - ); - - DenyTag::new(deny_config.clone(), aggregate) - }); - - StatsdClient::from_sink(config.prefix, statsdproxy_sink) - } else { - let statsdproxy_sink = StatsdProxyMetricSink::new(move || { - let upstream = upstream::TryUpstream::connect(&config.host, config.buffer_size); - DenyTag::new(deny_config.clone(), upstream) - }); - StatsdClient::from_sink(config.prefix, statsdproxy_sink) + let default_labels = config + .default_tags + .into_iter() + .map(|(key, value)| metrics::Label::new(key, value)) + .collect(); + + let mut statsd = DogStatsDBuilder::default() + .with_remote_address(&config.host)? 
+ .with_telemetry(true) + .with_aggregation_mode(AggregationMode::Aggressive) + .send_histograms_as_distributions(true) + .with_histogram_sampling(true) + .set_global_prefix(config.prefix) + .with_global_labels(default_labels); + + if let Some(buffer_size) = config.buffer_size { + statsd = statsd.with_maximum_payload_length(buffer_size)?; }; - set_client(MetricsClient { - statsd_client, - default_tags: config.default_tags, - default_sample_rate: config.sample_rate, - rx: None, - }); + statsd.install()?; Ok(()) } -/// Invoke a callback with the current statsd client. -/// -/// If statsd is not configured the callback is not invoked. For the most part -/// the [`metric!`] macro should be used instead. -#[inline(always)] -pub fn with_client(f: F) -> R -where - F: FnOnce(&MetricsClient) -> R, - R: Default, -{ - CURRENT_CLIENT.with(|client| { - if let Some(client) = client.borrow().as_deref() { - f(client) - } else { - R::default() - } - }) -} - /// A metric for capturing timings. /// /// Timings are a positive number of milliseconds between a start and end time. Examples include @@ -434,18 +182,6 @@ where /// process_a(); /// } /// ); -/// -/// // use an explicit sample rate that overrides the global rate -/// -/// // the override will be capped by the globally configured sample rate. -/// metric!(timer(MyTimer::ProcessA, sample = 0.01) = start_time.elapsed()); -/// -/// // timed block with explicit sample rate -/// -/// // the override will be capped by the globally configured sample rate. -/// metric!(timer(MyTimer::ProcessA, sample = 0.01), { -/// process_a(); -/// }); /// ``` pub trait TimerMetric { /// Returns the timer metric name that will be sent to statsd. 
@@ -481,7 +217,7 @@ pub trait TimerMetric { /// /// // add to the counter /// metric!(counter(MyCounter::TotalRequests) += 1); -/// metric!(counter(MyCounter::TotalBytes) += buffer.len() as i64); +/// metric!(counter(MyCounter::TotalBytes) += buffer.len() as u64); /// /// // add to the counter and provide tags /// metric!( @@ -489,16 +225,6 @@ pub trait TimerMetric { /// server = "s1", /// host = "h1" /// ); -/// -/// // subtract from the counter -/// metric!(counter(MyCounter::TotalRequests) -= 1); -/// -/// // subtract from the counter and provide tags -/// metric!( -/// counter(MyCounter::TotalRequests) -= 1, -/// server = "s1", -/// host = "h1" -/// ); /// ``` pub trait CounterMetric { /// Returns the counter metric name that will be sent to statsd. @@ -536,58 +262,12 @@ pub trait CounterMetric { /// server = "server1", /// host = "host1", /// ); -/// -/// // record with an explicit sample rate that overrides the global rate -/// metric!(distribution(QueueSize, sample = 0.01) = queue.len() as u64); /// ``` pub trait DistributionMetric { /// Returns the distribution metric name that will be sent to statsd. fn name(&self) -> &'static str; } -/// A metric for capturing sets. -/// -/// Sets count the number of unique elements in a group. You can use them to, for example, count the -/// unique visitors to your site. 
-/// -/// ## Example -/// -/// ``` -/// use relay_statsd::{metric, SetMetric}; -/// -/// enum MySet { -/// UniqueProjects, -/// UniqueUsers, -/// } -/// -/// impl SetMetric for MySet { -/// fn name(&self) -> &'static str { -/// match self { -/// MySet::UniqueProjects => "unique_projects", -/// MySet::UniqueUsers => "unique_users", -/// } -/// } -/// } -/// -/// # use std::collections::HashSet; -/// let users = HashSet::new(); -/// # let _hint: &HashSet<()> = &users; -/// -/// // use a set metric -/// metric!(set(MySet::UniqueUsers) = users.len() as i64); -/// -/// // use a set metric with tags -/// metric!( -/// set(MySet::UniqueUsers) = users.len() as i64, -/// server = "server1", -/// host = "host1", -/// ); -/// ``` -pub trait SetMetric { - /// Returns the set metric name that will be sent to statsd. - fn name(&self) -> &'static str; -} - /// A metric for capturing gauges. /// /// Gauge values are an instantaneous measurement of a value determined by the client. They do not @@ -620,130 +300,99 @@ pub trait SetMetric { /// server = "server1", /// host = "host1" /// ); +/// +/// // subtract from the gauge +/// metric!(gauge(QueueSize) -= 1); +/// +/// // subtract from the gauge and provide tags +/// metric!( +/// gauge(QueueSize) -= 1, +/// server = "s1", +/// host = "h1" +/// ); /// ``` pub trait GaugeMetric { /// Returns the gauge metric name that will be sent to statsd. fn name(&self) -> &'static str; } +#[doc(hidden)] +#[macro_export] +macro_rules! key_var { + ($id:expr $(,)*) => {{ + let name = $crate::_metrics::KeyName::from_const_str($id); + $crate::_metrics::Key::from_static_labels(name, &[]) + }}; + ($id:expr $(, $k:expr => $v:expr)* $(,)?) 
=> {{ + let name = $crate::_metrics::KeyName::from_const_str($id); + let labels = ::std::vec![ + $($crate::_metrics::Label::new( + $crate::_metrics::SharedString::const_str($k), + $crate::_metrics::SharedString::from_owned($v.into()) + )),* + ]; + + $crate::_metrics::Key::from_parts(name, labels) + }}; +} + /// Emits a metric. /// /// See [crate-level documentation](self) for examples. #[macro_export] macro_rules! metric { // counter increment - (counter($id:expr) += $value:expr $(, $($k:ident).* = $v:expr)* $(,)?) => { + (counter($id:expr) += $value:expr $(, $($k:ident).* = $v:expr)* $(,)?) => {{ match $value { value if value != 0 => { - $crate::with_client(|client| { - use $crate::_pred::*; - client.send_metric( - client.count_with_tags(&$crate::CounterMetric::name(&$id), value) - $(.with_tag(stringify!($($k).*), $v))* - ) - }) - }, - _ => {}, - }; - }; - - // counter decrement - (counter($id:expr) -= $value:expr $(, $($k:ident).* = $v:expr)* $(,)?) => { - match $value { - value if value != 0 => { - $crate::with_client(|client| { - use $crate::_pred::*; - client.send_metric( - client.count_with_tags(&$crate::CounterMetric::name(&$id), -value) - $(.with_tag(stringify!($($k).*), $v))* - ) - }) - }, - _ => {}, - }; - }; + let key = $crate::key_var!($crate::CounterMetric::name(&$id) $(, stringify!($($k).*) => $v)*); + let metadata = $crate::_metrics::metadata_var!(::std::module_path!(), $crate::_metrics::Level::INFO); + $crate::_metrics::with_recorder(|recorder| recorder.register_counter(&key, metadata)) + .increment(value); + } + _ => {} + } + }}; // gauge set - (gauge($id:expr) = $value:expr $(, $($k:ident).* = $v:expr)* $(,)?) 
=> { - $crate::with_client(|client| { - use $crate::_pred::*; - client.send_metric( - client.gauge_with_tags(&$crate::GaugeMetric::name(&$id), $value) - $(.with_tag(stringify!($($k).*), $v))* - ) - }) - }; - - // distribution with explicit sample rate (overrides global sample rate) - (distribution($id:expr, sample = $sample:expr) = $value:expr $(, $($k:ident).* = $v:expr)* $(,)?) => { - $crate::with_client(|client| { - use $crate::_pred::*; - client.send_metric_with_sample_rate( - client.distribution_with_tags(&$crate::DistributionMetric::name(&$id), $value) - $(.with_tag(stringify!($($k).*), $v))*, - Some($sample.into()) - ) - }) - }; - - // distribution (uses global sample rate) - (distribution($id:expr) = $value:expr $(, $($k:ident).* = $v:expr)* $(,)?) => { - $crate::with_client(|client| { - use $crate::_pred::*; - client.send_metric( - client.distribution_with_tags(&$crate::DistributionMetric::name(&$id), $value) - $(.with_tag(stringify!($($k).*), $v))* - ) - }) - }; - - // sets (count unique occurrences of a value per time interval) - (set($id:expr) = $value:expr $(, $($k:ident).* = $v:expr)* $(,)?) => { - $crate::with_client(|client| { - use $crate::_pred::*; - client.send_metric( - client.set_with_tags(&$crate::SetMetric::name(&$id), $value) - $(.with_tag(stringify!($($k).*), $v))* - ) - }) - }; - - // timer value with explicit sample rate (overrides global sample rate) - (timer($id:expr, sample = $sample:expr) = $value:expr $(, $($k:ident).* = $v:expr)* $(,)?) => { - $crate::with_client(|client| { - use $crate::_pred::*; - client.send_metric_with_sample_rate( - // NOTE: cadence distribution support Duration out of the box and converts it to nanos, - // but we want milliseconds for historical reasons. - client.distribution_with_tags(&$crate::TimerMetric::name(&$id), $value.as_nanos() as f64 / 1e6) - $(.with_tag(stringify!($($k).*), $v))*, - Some($sample.into()) - ) - }) - }; + (gauge($id:expr) = $value:expr $(, $($k:ident).* = $v:expr)* $(,)?) 
=> {{ + let key = $crate::key_var!($crate::GaugeMetric::name(&$id) $(, stringify!($($k).*) => $v)*); + let metadata = $crate::_metrics::metadata_var!(::std::module_path!(), $crate::_metrics::Level::INFO); + $crate::_metrics::with_recorder(|recorder| recorder.register_gauge(&key, metadata)) + .set($value as f64); + }}; + // gauge increment + (gauge($id:expr) += $value:expr $(, $($k:ident).* = $v:expr)* $(,)?) => {{ + let key = $crate::key_var!($crate::GaugeMetric::name(&$id) $(, stringify!($($k).*) => $v)*); + let metadata = $crate::_metrics::metadata_var!(::std::module_path!(), $crate::_metrics::Level::INFO); + $crate::_metrics::with_recorder(|recorder| recorder.register_gauge(&key, metadata)) + .increment($value as f64); + }}; + // gauge decrement + (gauge($id:expr) -= $value:expr $(, $($k:ident).* = $v:expr)* $(,)?) => {{ + let key = $crate::key_var!($crate::GaugeMetric::name(&$id) $(, stringify!($($k).*) => $v)*); + let metadata = $crate::_metrics::metadata_var!(::std::module_path!(), $crate::_metrics::Level::INFO); + $crate::_metrics::with_recorder(|recorder| recorder.register_gauge(&key, metadata)) + .decrement($value as f64); + }}; - // timer value (uses global sample rate) - (timer($id:expr) = $value:expr $(, $($k:ident).* = $v:expr)* $(,)?) => { - $crate::with_client(|client| { - use $crate::_pred::*; - client.send_metric( - // NOTE: cadence distribution support Duration out of the box and converts it to nanos, - // but we want milliseconds for historical reasons. - client.distribution_with_tags(&$crate::TimerMetric::name(&$id), $value.as_nanos() as f64 / 1e6) - $(.with_tag(stringify!($($k).*), $v))* - ) - }) - }; + // distribution + (distribution($id:expr) = $value:expr $(, $($k:ident).* = $v:expr)* $(,)?) 
=> {{ + let key = $crate::key_var!($crate::DistributionMetric::name(&$id) $(, stringify!($($k).*) => $v)*); + let metadata = $crate::_metrics::metadata_var!(::std::module_path!(), $crate::_metrics::Level::INFO); + $crate::_metrics::with_recorder(|recorder| recorder.register_histogram(&key, metadata)) + .record($value as f64); + }}; - // timed block with explicit sample rate (overrides global sample rate) - (timer($id:expr, sample = $sample:expr), $($($k:ident).* = $v:expr,)* $block:block) => {{ - let now = std::time::Instant::now(); - let rv = {$block}; - $crate::metric!(timer($id, sample = $sample) = now.elapsed() $(, $($k).* = $v)*); - rv + // timer value + (timer($id:expr) = $value:expr $(, $($k:ident).* = $v:expr)* $(,)?) => {{ + let key = $crate::key_var!($crate::TimerMetric::name(&$id) $(, stringify!($($k).*) => $v)*); + let metadata = $crate::_metrics::metadata_var!(::std::module_path!(), $crate::_metrics::Level::INFO); + $crate::_metrics::with_recorder(|recorder| recorder.register_histogram(&key, metadata)) + .record($value.as_nanos() as f64 / 1e6); }}; - // timed block (uses global sample rate) + // timed block (timer($id:expr), $($($k:ident).* = $v:expr,)* $block:block) => {{ let now = std::time::Instant::now(); let rv = {$block}; @@ -756,13 +405,7 @@ macro_rules! 
metric { mod tests { use std::time::Duration; - use cadence::{NopMetricSink, StatsdClient}; - - use crate::{ - CounterMetric, DistributionMetric, GaugeMetric, MetricsClient, SetMetric, TimerMetric, - set_client, with_capturing_test_client, with_capturing_test_client_sample_rate, - with_client, - }; + use super::*; enum TestGauges { Foo, @@ -794,14 +437,6 @@ mod tests { } } - struct TestSet; - - impl SetMetric for TestSet { - fn name(&self) -> &'static str { - "set" - } - } - struct TestTimer; impl TimerMetric for TestTimer { @@ -834,21 +469,6 @@ mod tests { ) } - #[test] - fn current_client_is_global_client() { - let client1 = with_client(|c| format!("{c:?}")); - set_client(MetricsClient { - statsd_client: StatsdClient::from_sink("", NopMetricSink), - default_tags: Default::default(), - default_sample_rate: 1.0.into(), - rx: None, - }); - let client2 = with_client(|c| format!("{c:?}")); - - // After setting the global client,the current client must change: - assert_ne!(client1, client2); - } - #[test] fn test_counter_tags_with_dots() { let captures = with_capturing_test_client(|| { @@ -858,7 +478,7 @@ mod tests { server = "server1", ); metric!( - counter(TestCounter) -= 5, + counter(TestCounter) += 5, hc.project_id = "567", server = "server1", ); @@ -867,7 +487,7 @@ mod tests { captures, [ "counter:10|c|#hc.project_id:567,server:server1", - "counter:-5|c|#hc.project_id:567,server:server1" + "counter:5|c|#hc.project_id:567,server:server1" ] ); } @@ -899,41 +519,6 @@ mod tests { ); } - #[test] - fn test_distribution_with_explicit_sample_rate() { - let captures = with_capturing_test_client(|| { - metric!(distribution(TestDistribution, sample = 0.999999999) = 123); - }); - assert_eq!(captures, ["distribution:123|d|@0.999999999"]); - } - - #[test] - fn test_distribution_with_explicit_sample_rate_and_tags() { - let captures = with_capturing_test_client(|| { - metric!( - distribution(TestDistribution, sample = 0.999999999) = 456, - server = "server1", - host = "host1", - 
); - }); - assert_eq!( - captures, - ["distribution:456|d|@0.999999999|#server:server1,host:host1"] - ); - } - - #[test] - fn test_set_tags_with_dots() { - let captures = with_capturing_test_client(|| { - metric!( - set(TestSet) = 123, - hc.project_id = "567", - server = "server1", - ); - }); - assert_eq!(captures, ["set:123|s|#hc.project_id:567,server:server1"]); - } - #[test] fn test_timer_tags_with_dots() { let captures = with_capturing_test_client(|| { @@ -966,38 +551,6 @@ mod tests { assert!(captures[0].ends_with("|d|#hc.project_id:567,server:server1")); } - #[test] - fn test_timer_with_explicit_sample_rate() { - let captures = with_capturing_test_client(|| { - let duration = Duration::from_secs(100); - metric!(timer(TestTimer, sample = 0.999999999) = duration); - }); - assert_eq!(captures, ["timer:100000|d|@0.999999999"]); - } - - #[test] - fn test_timer_with_explicit_sample_rate_and_tags() { - let captures = with_capturing_test_client(|| { - let duration = Duration::from_secs(100); - metric!( - timer(TestTimer, sample = 0.999999999) = duration, - server = "server1", - ); - }); - assert_eq!(captures, ["timer:100000|d|@0.999999999|#server:server1"]); - } - - #[test] - fn test_timed_block_with_explicit_sample_rate() { - let captures = with_capturing_test_client(|| { - metric!(timer(TestTimer, sample = 0.999999999), { - // your code could be here - }) - }); - // just check the sample rate to not make this flaky - assert!(captures[0].contains("|d|@0.999999999")); - } - #[test] fn nanos_rounding_error() { let one_day = Duration::from_secs(60 * 60 * 24); @@ -1006,7 +559,7 @@ mod tests { }); // for "short" durations, precision is preserved: - assert_eq!(captures, ["timer:86400000.000001|d"]); + assert_eq!(captures, ["timer:86400000.000001|d|#"]); let one_year = Duration::from_secs(60 * 60 * 24 * 365); let captures = with_capturing_test_client(|| { @@ -1014,58 +567,6 @@ mod tests { }); // for very long durations, precision is lost: - assert_eq!(captures, 
["timer:31536000000|d"]); - } - - #[test] - fn test_local_sample_rate_overrides_global() { - // With global sample rate of 0.999, no @rate is added to the output - // With a local override, the local rate should appear in the output - let captures = with_capturing_test_client_sample_rate(0.999999999, || { - // Without explicit sampling, no @rate in output (global is 1.0) - metric!(distribution(TestDistribution) = 100); - // With explicit sampling, should use local rate (0.01) - metric!(distribution(TestDistribution, sample = 0.999999998) = 200); - }); - assert_eq!(captures.len(), 2); - // First metric has no sample rate - assert_eq!(captures[0], "distribution:100|d|@0.999999999"); - // Second metric uses local sample rate - assert_eq!(captures[1], "distribution:200|d|@0.999999998"); - } - - #[test] - fn test_timer_local_sample_rate_overrides_global() { - // With global sample rate of 1.0, no @rate is added to the output - // With a local override, the local rate should appear in the output - let captures = with_capturing_test_client_sample_rate(1.0, || { - let duration = Duration::from_secs(1); - // Without explicit sampling, no @rate in output (global is 1.0) - metric!(timer(TestTimer) = duration); - // With explicit sampling, should use local rate (0.999999999) - metric!(timer(TestTimer, sample = 0.999999999) = duration); - }); - - assert_eq!(captures.len(), 2); - // First metric has no sample rate (global is 1.0, so it's omitted) - assert_eq!(captures[0], "timer:1000|d"); - // Second metric uses local sample rate - assert_eq!(captures[1], "timer:1000|d|@0.999999999"); - } - - #[test] - fn test_local_sample_rate_capped() { - let captures = with_capturing_test_client_sample_rate(0.0, || { - metric!(distribution(TestDistribution, sample = 0.999999999) = 200); - }); - assert_eq!(captures.len(), 0); - } - - #[test] - fn test_global_disabled() { - let captures = with_capturing_test_client_sample_rate(0.0, || { - metric!(distribution(TestDistribution) = 200); - }); - 
assert_eq!(captures.len(), 0);
+        assert_eq!(captures, ["timer:31536000000|d|#"]);
     }
 }
diff --git a/relay-statsd/src/mock.rs b/relay-statsd/src/mock.rs
new file mode 100644
index 00000000000..fc7cd12e671
--- /dev/null
+++ b/relay-statsd/src/mock.rs
@@ -0,0 +1,98 @@
+use std::sync::{Arc, Mutex, PoisonError};
+
+use metrics::{Counter, Gauge, Histogram, KeyName, SharedString, Unit};
+
+#[derive(Debug, Default)]
+pub struct MockRecorder {
+    inner: Arc<Mutex<Vec<String>>>,
+}
+
+impl MockRecorder {
+    pub fn consume(self) -> Vec<String> {
+        let inner = self.inner.lock().unwrap_or_else(PoisonError::into_inner);
+        inner.clone()
+    }
+}
+
+impl metrics::Recorder for MockRecorder {
+    fn describe_counter(&self, _: KeyName, _: Option<Unit>, _: SharedString) {}
+    fn describe_gauge(&self, _: KeyName, _: Option<Unit>, _: SharedString) {}
+    fn describe_histogram(&self, _: KeyName, _: Option<Unit>, _: SharedString) {}
+
+    fn register_counter(&self, key: &metrics::Key, _: &metrics::Metadata<'_>) -> Counter {
+        Counter::from_arc(Arc::new(MockFn {
+            inner: Arc::clone(&self.inner),
+            key: key.clone(),
+        }))
+    }
+
+    fn register_gauge(&self, key: &metrics::Key, _: &metrics::Metadata<'_>) -> Gauge {
+        Gauge::from_arc(Arc::new(MockFn {
+            inner: Arc::clone(&self.inner),
+            key: key.clone(),
+        }))
+    }
+
+    fn register_histogram(&self, key: &metrics::Key, _: &metrics::Metadata<'_>) -> Histogram {
+        Histogram::from_arc(Arc::new(MockFn {
+            inner: Arc::clone(&self.inner),
+            key: key.clone(),
+        }))
+    }
+}
+
+struct MockFn {
+    inner: Arc<Mutex<Vec<String>>>,
+    key: metrics::Key,
+}
+
+impl metrics::CounterFn for MockFn {
+    fn increment(&self, value: u64) {
+        let mut inner = self.inner.lock().unwrap_or_else(PoisonError::into_inner);
+        inner.push(format_metric("c", &self.key, value));
+    }
+
+    fn absolute(&self, value: u64) {
+        let mut inner = self.inner.lock().unwrap_or_else(PoisonError::into_inner);
+        inner.push(format_metric("c", &self.key, format!("={value}")));
+    }
+}
+
+impl metrics::GaugeFn for MockFn {
+    fn increment(&self, value: f64) {
+        let mut inner = self.inner.lock().unwrap_or_else(PoisonError::into_inner);
+        inner.push(format_metric("g", &self.key, format!("+{value}")));
+    }
+
+    fn decrement(&self, value: f64) {
+        let mut inner = self.inner.lock().unwrap_or_else(PoisonError::into_inner);
+        inner.push(format_metric("g", &self.key, format!("-{value}")));
+    }
+
+    fn set(&self, value: f64) {
+        let mut inner = self.inner.lock().unwrap_or_else(PoisonError::into_inner);
+        inner.push(format_metric("g", &self.key, value));
+    }
+}
+
+impl metrics::HistogramFn for MockFn {
+    fn record(&self, value: f64) {
+        let mut inner = self.inner.lock().unwrap_or_else(PoisonError::into_inner);
+        inner.push(format_metric("d", &self.key, value));
+    }
+}
+
+fn format_metric(t: &str, key: &metrics::Key, value: impl std::fmt::Display) -> String {
+    let name = key.name();
+    let tags = key
+        .labels()
+        .map(|label| {
+            let key = label.key();
+            let value = label.value();
+            format!("{key}:{value}")
+        })
+        .collect::<Vec<_>>()
+        .join(",");
+
+    format!("{name}:{value}|{t}|#{tags}")
+}
diff --git a/relay-statsd/src/upstream.rs b/relay-statsd/src/upstream.rs
deleted file mode 100644
index adef77fefdc..00000000000
--- a/relay-statsd/src/upstream.rs
+++ /dev/null
@@ -1,180 +0,0 @@
-use std::io;
-use std::net::{Ipv4Addr, UdpSocket};
-#[cfg(unix)]
-use std::os::unix::net::UnixDatagram;
-use std::time::{Duration, SystemTime, UNIX_EPOCH};
-
-use statsdproxy::middleware::Middleware;
-use statsdproxy::types::Metric;
-
-pub enum Remote {
-    Udp(UdpSocket),
-    #[cfg(unix)]
-    UnixDatagram(UnixDatagram),
-}
-
-impl Remote {
-    fn connect(addr: &str) -> io::Result<Self> {
-        // Try treating the address as a fully-qualified URL, where the scheme is the transport identifier.
-        if let Some((scheme, path)) = addr.split_once("://") {
-            return match scheme {
-                "udp" => {
-                    let socket = UdpSocket::bind((Ipv4Addr::UNSPECIFIED, 0))?;
-                    socket.connect(path)?;
-                    socket.set_nonblocking(true)?;
-
-                    Ok(Self::Udp(socket))
-                }
-                #[cfg(unix)]
-                "unixgram" => {
-                    let socket = UnixDatagram::unbound()?;
-                    socket.connect(path)?;
-                    socket.set_nonblocking(true)?;
-
-                    Ok(Self::UnixDatagram(socket))
-                }
-                _ => Err(io::Error::other(format!(
-                    "invalid scheme '{scheme}', expected one of 'udp', 'unixgram'"
-                ))),
-            };
-        }
-
-        // If there is no scheme, fall back to a UDP socket
-        let socket = UdpSocket::bind((Ipv4Addr::UNSPECIFIED, 0))?;
-        socket.connect(addr)?;
-        socket.set_nonblocking(true)?;
-
-        Ok(Self::Udp(socket))
-    }
-
-    fn bufsize(&self) -> usize {
-        match self {
-            // The original statsdproxy uses.
-            Self::Udp(_) => 512,
-            #[cfg(unix)]
-            Self::UnixDatagram(_) => 1024,
-        }
-    }
-
-    fn send(&mut self, buf: &[u8]) {
-        let result = match self {
-            Self::Udp(socket) => socket.send(buf).map(drop),
-            #[cfg(unix)]
-            Self::UnixDatagram(socket) => socket.send(buf).map(drop),
-        };
-
-        if let Err(err) = result {
-            relay_log::warn!("failed to send metrics to upstream: {err}");
-        }
-    }
-}
-
-pub struct Upstream {
-    remote: Remote,
-    buffer: Vec<u8>,
-    buf_used: usize,
-    last_sent_at: SystemTime,
-}
-
-impl Upstream {
-    pub fn connect(upstream: &str, bufsize: Option<usize>) -> io::Result<Self> {
-        let remote = Remote::connect(upstream)?;
-
-        Ok(Upstream {
-            buffer: vec![0; bufsize.unwrap_or_else(|| remote.bufsize())],
-            remote,
-            buf_used: 0,
-            last_sent_at: UNIX_EPOCH,
-        })
-    }
-
-    fn flush(&mut self) {
-        if self.buf_used > 0 {
-            self.remote.send(&self.buffer[..self.buf_used]);
-            self.buf_used = 0;
-        }
-        self.last_sent_at = SystemTime::now(); // Annoyingly superfluous call to now().
- } - - fn timed_flush(&mut self) { - let now = SystemTime::now(); - if now - .duration_since(self.last_sent_at) - .map_or(true, |x| x > Duration::from_secs(1)) - { - // We have not sent any metrics in a while. Flush the buffer. - self.flush(); - } - } -} - -impl Drop for Upstream { - fn drop(&mut self) { - self.flush(); - } -} - -impl Middleware for Upstream { - fn submit(&mut self, metric: &mut Metric) { - let metric_len = metric.raw.len(); - if metric_len + 1 > self.buffer.len() - self.buf_used { - // Message bigger than space left in buffer. Flush the buffer. - self.flush(); - } - if metric_len > self.buffer.len() { - // Message too big for the entire buffer, send it and pray. - self.remote.send(&metric.raw); - } else { - // Put the message in the buffer, separating it from the previous message if any. - if self.buf_used > 0 { - self.buffer[self.buf_used] = b'\n'; - self.buf_used += 1; - } - self.buffer[self.buf_used..self.buf_used + metric_len].copy_from_slice(&metric.raw); - self.buf_used += metric_len; - } - // poll gets called before submit, so if the buffer needed to be flushed for time reasons, - // it already was. - } - - fn poll(&mut self) { - self.timed_flush(); - } -} - -/// A [`Upstream`] which falls back to noop operations if connecting to the statsd sink fails. 
-pub enum TryUpstream {
-    Upstream(Upstream),
-    Error,
-}
-
-impl TryUpstream {
-    pub fn connect(upstream: &str, bufsize: Option<usize>) -> Self {
-        match Upstream::connect(upstream, bufsize) {
-            Ok(upstream) => Self::Upstream(upstream),
-            Err(err) => {
-                relay_log::error!(
-                    error = &err as &dyn std::error::Error,
-                    "failed to connect to statsd sink at {upstream}"
-                );
-                Self::Error
-            }
-        }
-    }
-}
-
-impl Middleware for TryUpstream {
-    fn submit(&mut self, metric: &mut Metric) {
-        match self {
-            Self::Upstream(upstream) => upstream.submit(metric),
-            Self::Error => {}
-        }
-    }
-
-    fn poll(&mut self) {
-        match self {
-            Self::Upstream(upstream) => upstream.poll(),
-            Self::Error => {}
-        }
-    }
-}
diff --git a/relay-system/src/runtime/spawn.rs b/relay-system/src/runtime/spawn.rs
index b460cbbd39f..bf65ce70598 100644
--- a/relay-system/src/runtime/spawn.rs
+++ b/relay-system/src/runtime/spawn.rs
@@ -1,7 +1,7 @@
 use futures::Future;
 use tokio::task::JoinHandle;
 
-use crate::statsd::SystemCounters;
+use crate::statsd::SystemGauges;
 use crate::{Service, ServiceObj};
 
 /// Spawns an instrumented task with an automatically generated [`TaskId`].
@@ -82,16 +82,6 @@ impl TaskId {
     pub(crate) fn id(&self) -> &'static str {
         self.id
     }
-
-    fn emit_task_count_metric(&self, cnt: i64) {
-        let Self { id, file, line } = self;
-        relay_statsd::metric!(
-            counter(SystemCounters::RuntimeTaskCount) += cnt,
-            id = id,
-            file = file.unwrap_or_default(),
-            line = line.unwrap_or_default()
-        );
-    }
 }
 
 impl From<&ServiceObj> for TaskId {
@@ -114,14 +104,24 @@ pin_project_lite::pin_project! 
{ impl PinnedDrop for Task { fn drop(this: Pin<&mut Self>) { - this.id.emit_task_count_metric(-1); + relay_statsd::metric!( + gauge(SystemGauges::RuntimeTaskCount) -= 1, + id = this.id.id, + file = this.id.file.unwrap_or_default(), + line = this.id.line.unwrap_or_default() + ); } } } impl Task { fn new(id: TaskId, inner: T) -> Self { - id.emit_task_count_metric(1); + relay_statsd::metric!( + gauge(SystemGauges::RuntimeTaskCount) += 1, + id = id.id, + file = id.file.unwrap_or_default(), + line = id.line.unwrap_or_default() + ); Self { id, inner } } } @@ -159,15 +159,15 @@ mod tests { #[cfg(not(windows))] assert_debug_snapshot!(captures, @r#" [ - "runtime.task.count:1|c|#id:relay-system/src/runtime/spawn.rs:155,file:relay-system/src/runtime/spawn.rs,line:155", - "runtime.task.count:-1|c|#id:relay-system/src/runtime/spawn.rs:155,file:relay-system/src/runtime/spawn.rs,line:155", + "runtime.tasks:+1|g|#id:relay-system/src/runtime/spawn.rs:155,file:relay-system/src/runtime/spawn.rs,line:155", + "runtime.tasks:-1|g|#id:relay-system/src/runtime/spawn.rs:155,file:relay-system/src/runtime/spawn.rs,line:155", ] "#); #[cfg(windows)] assert_debug_snapshot!(captures, @r###" [ - "runtime.task.count:1|c|#id:relay-system\\src\\runtime\\spawn.rs:155,file:relay-system\\src\\runtime\\spawn.rs,line:155", - "runtime.task.count:-1|c|#id:relay-system\\src\\runtime\\spawn.rs:155,file:relay-system\\src\\runtime\\spawn.rs,line:155", + "runtime.tasks:+1|g|#id:relay-system\\src\\runtime\\spawn.rs:155,file:relay-system\\src\\runtime\\spawn.rs,line:155", + "runtime.tasks:-1|g|#id:relay-system\\src\\runtime\\spawn.rs:155,file:relay-system\\src\\runtime\\spawn.rs,line:155", ] "###); } @@ -192,8 +192,8 @@ mod tests { assert_debug_snapshot!(captures, @r#" [ - "runtime.task.count:1|c|#id:relay_system::runtime::spawn::tests::test_spawn_with_custom_id::Foo,file:,line:", - "runtime.task.count:-1|c|#id:relay_system::runtime::spawn::tests::test_spawn_with_custom_id::Foo,file:,line:", + 
"runtime.tasks:+1|g|#id:relay_system::runtime::spawn::tests::test_spawn_with_custom_id::Foo,file:,line:", + "runtime.tasks:-1|g|#id:relay_system::runtime::spawn::tests::test_spawn_with_custom_id::Foo,file:,line:", ] "#); } diff --git a/relay-system/src/statsd.rs b/relay-system/src/statsd.rs index 3898f3469ec..b37746a8609 100644 --- a/relay-system/src/statsd.rs +++ b/relay-system/src/statsd.rs @@ -1,7 +1,7 @@ -use relay_statsd::{CounterMetric, GaugeMetric}; +use relay_statsd::GaugeMetric; -/// Counter metrics for Relay system components. -pub enum SystemCounters { +/// Gauge metrics for Relay system components. +pub enum SystemGauges { /// Number of active runtime tasks. /// /// Every call to [`spawn`](`crate::spawn()`) increases this counter by one, @@ -12,18 +12,6 @@ pub enum SystemCounters { /// - `file`: The source filename where the task is created. /// - `line`: The source line where the task is created within the file. RuntimeTaskCount, -} - -impl CounterMetric for SystemCounters { - fn name(&self) -> &'static str { - match self { - Self::RuntimeTaskCount => "runtime.task.count", - } - } -} - -/// Gauge metrics for Relay system components. -pub enum SystemGauges { /// A number of messages queued in a services inbound message channel. /// /// This metric is emitted once per second for every running service. 
Without backlogs, this @@ -38,6 +26,7 @@ pub enum SystemGauges { impl GaugeMetric for SystemGauges { fn name(&self) -> &'static str { match *self { + Self::RuntimeTaskCount => "runtime.tasks", Self::ServiceBackPressure => "service.back_pressure", } } diff --git a/relay/src/setup.rs b/relay/src/setup.rs index a7e693491bb..8dcbd2f27f3 100644 --- a/relay/src/setup.rs +++ b/relay/src/setup.rs @@ -3,7 +3,7 @@ use anyhow::Context; use anyhow::Result; use relay_config::{Config, RelayMode}; use relay_server::MemoryStat; -use relay_statsd::MetricsClientConfig; +use relay_statsd::MetricsConfig; /// Validates that the `batch_size_bytes` of the configuration is correct and doesn't lead to /// deadlocks in the buffer. @@ -110,14 +110,11 @@ pub fn init_metrics(config: &Config) -> Result<()> { { default_tags.insert(hostname_tag.to_owned(), hostname); } - relay_statsd::init(MetricsClientConfig { - prefix: config.metrics_prefix(), + relay_statsd::init(MetricsConfig { + prefix: config.metrics_prefix().to_owned(), host: host.to_owned(), buffer_size: config.statsd_buffer_size(), default_tags, - sample_rate: config.metrics_sample_rate().into(), - aggregate: config.metrics_aggregate(), - allow_high_cardinality_tags: config.metrics_allow_high_cardinality_tags(), })?; Ok(()) diff --git a/tools/document-metrics/src/main.rs b/tools/document-metrics/src/main.rs index 7e3b3ea5f4d..60c4ebb897e 100644 --- a/tools/document-metrics/src/main.rs +++ b/tools/document-metrics/src/main.rs @@ -24,7 +24,6 @@ enum MetricType { Timer, Counter, Distribution, - Set, Gauge, } @@ -136,8 +135,6 @@ fn get_metric_type(imp: &mut syn::ItemImpl) -> Option { Some(MetricType::Counter) } else if trait_name == "DistributionMetric" { Some(MetricType::Distribution) - } else if trait_name == "SetMetric" { - Some(MetricType::Set) } else if trait_name == "GaugeMetric" { Some(MetricType::Gauge) } else { @@ -300,30 +297,30 @@ mod tests { fn test_parse_metrics() -> Result<()> { let source = r#" /// A metric collection used for 
testing. - pub enum TestSets { + pub enum TestCounters { /// The metric we test. - UniqueSet, + UniqueCounter, /// The metric we test. #[cfg(feature = "conditional")] - ConditionalSet, + ConditionalCounter, /// Another metric we test. #[cfg(cfg_flag)] - ConditionalCompileSet, + ConditionalCompileCounter, /// Yet another metric we test. #[cfg(all(cfg_flag, feature = "conditional"))] - MultiConditionalCompileSet, + MultiConditionalCompileCounter, } - impl SetMetric for TestSets { + impl CounterMetric for TestCounters { fn name(&self) -> &'static str { match self { - Self::UniqueSet => "test.unique", + Self::UniqueCounter => "test.unique", #[cfg(feature = "conditional")] - Self::ConditionalSet => "test.conditional", + Self::ConditionalCounter => "test.conditional", #[cfg(cfg_flag)] - Self::ConditionalCompileSet => "test.conditional_compile", + Self::ConditionalCompileCounter => "test.conditional_compile", #[cfg(all(cfg_flag, feature = "conditional"))] - Self::MultiConditionalCompileSet => "test.multi_conditional_compile" + Self::MultiConditionalCompileCounter => "test.multi_conditional_compile" } } } @@ -333,7 +330,7 @@ mod tests { insta::assert_debug_snapshot!(metrics, @r###" [ Metric { - ty: Set, + ty: Counter, name: "test.conditional", description: "The metric we test.", features: [ @@ -341,7 +338,7 @@ mod tests { ], }, Metric { - ty: Set, + ty: Counter, name: "test.conditional_compile", description: "Another metric we test.", features: [ @@ -349,7 +346,7 @@ mod tests { ], }, Metric { - ty: Set, + ty: Counter, name: "test.multi_conditional_compile", description: "Yet another metric we test.", features: [ @@ -358,7 +355,7 @@ mod tests { ], }, Metric { - ty: Set, + ty: Counter, name: "test.unique", description: "The metric we test.", features: [],