From 898ca981aa11fa24f11290fa0b685f83f82d3839 Mon Sep 17 00:00:00 2001 From: Ian Woodard <17186604+IanWoodard@users.noreply.github.com> Date: Wed, 19 Nov 2025 09:06:22 -0800 Subject: [PATCH] ref(metrics): Add redis pool metrics --- Cargo.lock | 1 + services/gateway/src/main.rs | 25 ++++++++++++++++++++++++- services/publish/src/main.rs | 20 ++++++++++++++++++++ shared/broker/Cargo.toml | 1 + shared/broker/src/lib.rs | 7 +++++++ 5 files changed, 53 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index 0709039..78213a1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -373,6 +373,7 @@ version = "0.1.0" dependencies = [ "async-trait", "deadpool-redis", + "metrics", "mockall", "redis", "thiserror 2.0.16", diff --git a/services/gateway/src/main.rs b/services/gateway/src/main.rs index a8d423d..8d8a070 100644 --- a/services/gateway/src/main.rs +++ b/services/gateway/src/main.rs @@ -1,4 +1,7 @@ -use std::net::{SocketAddr, ToSocketAddrs}; +use std::{ + net::{SocketAddr, ToSocketAddrs}, + time::Duration, +}; use axum::{Router, routing::get}; use broker::RedisClient; @@ -16,6 +19,7 @@ use tokio::{net::TcpListener, runtime::Runtime, time::Instant}; use tower_http::cors::{Any, CorsLayer}; const SERVICE_NAME: &str = "gateway"; +const POOL_METRICS_INTERVAL_SEC: u64 = 10; fn main() -> anyhow::Result<()> { dotenv().ok(); @@ -76,6 +80,8 @@ async fn async_main() -> anyhow::Result<(), anyhow::Error> { jwt_config, }; + let redis_for_metrics = state.redis.clone(); + // CORS: Allow all origins. JWT authentication (not cookies) is our security boundary. // If we switch to cookie-based auth, we will need to restrict origins to prevent CSRF. let cors = CorsLayer::new() @@ -90,6 +96,23 @@ async fn async_main() -> anyhow::Result<(), anyhow::Error> { .layer(cors) .with_state(state); + tokio::spawn({ + async move { + let mut interval = + tokio::time::interval(Duration::from_secs(POOL_METRICS_INTERVAL_SEC)); + loop { + interval.tick().await; + let status = redis_for_metrics.pool_status(); + + metrics::gauge!("redis.pool.size").set(status.size as f64); + metrics::gauge!("redis.pool.available").set(status.available as f64); + + let active = status.size - status.available; + metrics::gauge!("redis.pool.active").set(active as f64); + } + } + }); + let addr: SocketAddr = format!("0.0.0.0:{}", port).parse()?; println!("Running on http://{}", addr); diff --git a/services/publish/src/main.rs b/services/publish/src/main.rs index 046d466..fd54162 100644 --- a/services/publish/src/main.rs +++ b/services/publish/src/main.rs @@ -23,6 +23,7 @@ use tokio::{net::TcpListener, runtime::Runtime, select, time::Instant}; use tokio_util::sync::CancellationToken; const SERVICE_NAME: &str = "publish"; +const POOL_METRICS_INTERVAL_SEC: u64 = 10; async fn cleanup_old_stream( redis: &impl RedisOperations, @@ -114,6 +115,8 @@ async fn async_main() -> anyhow::Result<(), anyhow::Error> { cleanup_config: cleanup_config.clone(), }; + let redis_for_metrics = state.redis.clone(); + let mut app = Router::new() .route("/healthz", get(healthz_handler)) .route("/readyz", get(readyz_handler)) @@ -132,6 +135,23 @@ async fn async_main() -> anyhow::Result<(), anyhow::Error> { app = app.layer(cors); } + tokio::spawn({ + async move { + let mut interval = + tokio::time::interval(Duration::from_secs(POOL_METRICS_INTERVAL_SEC)); + loop { + interval.tick().await; + let status = redis_for_metrics.pool_status(); + + metrics::gauge!("redis.pool.size").set(status.size as f64); + metrics::gauge!("redis.pool.available").set(status.available as f64); + + let active = status.size - status.available; + metrics::gauge!("redis.pool.active").set(active as f64); + } + } + }); + let addr: SocketAddr = format!("0.0.0.0:{}", port).parse()?; println!("Running on http://{}", addr); diff --git a/shared/broker/Cargo.toml b/shared/broker/Cargo.toml index 0d17e96..85cf1ed 100644 --- a/shared/broker/Cargo.toml +++ b/shared/broker/Cargo.toml @@ -6,6 +6,7 @@ edition.workspace = true [dependencies] async-trait = { workspace = true } deadpool-redis = { workspace = true } +metrics = { workspace = true } mockall = { workspace = true } redis = { workspace = true } thiserror = { workspace = true } diff --git a/shared/broker/src/lib.rs b/shared/broker/src/lib.rs index 9702c01..73ac551 100644 --- a/shared/broker/src/lib.rs +++ b/shared/broker/src/lib.rs @@ -160,8 +160,15 @@ impl RedisClient { ..Default::default() }; let pool = cfg.create_pool(Some(deadpool_redis::Runtime::Tokio1))?; + + metrics::gauge!("redis.pool.max_size").set(max_size as f64); + Ok(Self { pool }) } + + pub fn pool_status(&self) -> deadpool_redis::cluster::Status { + self.pool.status() + } } #[async_trait]