Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 24 additions & 1 deletion services/gateway/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use std::net::{SocketAddr, ToSocketAddrs};
use std::{
net::{SocketAddr, ToSocketAddrs},
time::Duration,
};

use axum::{Router, routing::get};
use broker::RedisClient;
Expand All @@ -16,6 +19,7 @@ use tokio::{net::TcpListener, runtime::Runtime, time::Instant};
use tower_http::cors::{Any, CorsLayer};

const SERVICE_NAME: &str = "gateway";
const POOL_METRICS_INTERVAL_SEC: u64 = 10;

fn main() -> anyhow::Result<()> {
dotenv().ok();
Expand Down Expand Up @@ -76,6 +80,8 @@ async fn async_main() -> anyhow::Result<(), anyhow::Error> {
jwt_config,
};

let redis_for_metrics = state.redis.clone();

// CORS: Allow all origins. JWT authentication (not cookies) is our security boundary.
// If we switch to cookie-based auth, we will need to restrict origins to prevent CSRF.
let cors = CorsLayer::new()
Expand All @@ -90,6 +96,23 @@ async fn async_main() -> anyhow::Result<(), anyhow::Error> {
.layer(cors)
.with_state(state);

tokio::spawn({
async move {
let mut interval =
tokio::time::interval(Duration::from_secs(POOL_METRICS_INTERVAL_SEC));
loop {
interval.tick().await;
let status = redis_for_metrics.pool_status();

metrics::gauge!("redis.pool.size").set(status.size as f64);
metrics::gauge!("redis.pool.available").set(status.available as f64);

let active = status.size - status.available;
metrics::gauge!("redis.pool.active").set(active as f64);
}
}
});

let addr: SocketAddr = format!("0.0.0.0:{}", port).parse()?;
println!("Running on http://{}", addr);

Expand Down
20 changes: 20 additions & 0 deletions services/publish/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use tokio::{net::TcpListener, runtime::Runtime, select, time::Instant};
use tokio_util::sync::CancellationToken;

const SERVICE_NAME: &str = "publish";
const POOL_METRICS_INTERVAL_SEC: u64 = 10;

async fn cleanup_old_stream(
redis: &impl RedisOperations,
Expand Down Expand Up @@ -114,6 +115,8 @@ async fn async_main() -> anyhow::Result<(), anyhow::Error> {
cleanup_config: cleanup_config.clone(),
};

let redis_for_metrics = state.redis.clone();

let mut app = Router::new()
.route("/healthz", get(healthz_handler))
.route("/readyz", get(readyz_handler))
Expand All @@ -132,6 +135,23 @@ async fn async_main() -> anyhow::Result<(), anyhow::Error> {
app = app.layer(cors);
}

tokio::spawn({
async move {
let mut interval =
tokio::time::interval(Duration::from_secs(POOL_METRICS_INTERVAL_SEC));
loop {
interval.tick().await;
let status = redis_for_metrics.pool_status();

metrics::gauge!("redis.pool.size").set(status.size as f64);
metrics::gauge!("redis.pool.available").set(status.available as f64);

let active = status.size - status.available;
metrics::gauge!("redis.pool.active").set(active as f64);
}
}
});

let addr: SocketAddr = format!("0.0.0.0:{}", port).parse()?;
println!("Running on http://{}", addr);

Expand Down
1 change: 1 addition & 0 deletions shared/broker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ edition.workspace = true
[dependencies]
async-trait = { workspace = true }
deadpool-redis = { workspace = true }
metrics = { workspace = true }
mockall = { workspace = true }
redis = { workspace = true }
thiserror = { workspace = true }
Expand Down
7 changes: 7 additions & 0 deletions shared/broker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,15 @@ impl RedisClient {
..Default::default()
};
let pool = cfg.create_pool(Some(deadpool_redis::Runtime::Tokio1))?;

metrics::gauge!("redis.pool.max_size").set(max_size as f64);

Ok(Self { pool })
}

pub fn pool_status(&self) -> deadpool_redis::cluster::Status {
self.pool.status()
}
}

#[async_trait]
Expand Down
Loading