Skip to content

Commit 614f91d

Browse files
authored
feat(gateway): Adding broker and readyz endpoint (#6)
* feat(publish): Add core publish service with testing in frastructure * adding newline * Fix linting * cleaning up broker logic * feat(gateway): Adding broker and readyz endpoint
1 parent 19498bf commit 614f91d

File tree

5 files changed

+120
-10
lines changed

5 files changed

+120
-10
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

services/gateway/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,11 @@ version = "0.1.0"
44
edition.workspace = true
55

66
[dependencies]
7+
broker = { path = "../../shared/broker" }
78
anyhow = { workspace = true }
89
axum = { workspace = true }
910
dotenvy = { workspace = true }
11+
redis = { workspace = true }
1012
serde = { workspace = true }
1113
tokio = { workspace = true }
1214
tower-http = { workspace = true }
Lines changed: 91 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,30 @@
1+
use std::{collections::HashMap, time::Duration};
2+
13
use axum::{Json, extract::State, http::StatusCode};
4+
use broker::RedisOperations;
25
use serde::Serialize;
6+
use tokio::time::timeout;
7+
8+
use crate::state::{AppState, HealthState};
39

4-
use crate::state::AppState;
10+
const REDIS_PING_TIMEOUT: u64 = 1;
11+
12+
#[derive(Clone, Copy, Debug, PartialEq, Serialize)]
13+
#[serde(rename_all = "lowercase")]
14+
enum Status {
15+
Ok,
16+
Fail,
17+
}
518

619
#[derive(Serialize)]
720
pub struct HealthResponse {
821
status: String,
922
uptime: u64,
1023
}
1124

12-
pub async fn healthz_handler(State(state): State<AppState>) -> (StatusCode, Json<HealthResponse>) {
25+
pub async fn healthz_handler(
26+
State(state): State<HealthState>,
27+
) -> (StatusCode, Json<HealthResponse>) {
1328
let uptime = state.start_time.elapsed().as_secs();
1429
(
1530
StatusCode::OK,
@@ -20,24 +35,91 @@ pub async fn healthz_handler(State(state): State<AppState>) -> (StatusCode, Json
2035
)
2136
}
2237

38+
#[derive(Serialize)]
39+
pub struct ReadyResponse {
40+
status: Status,
41+
dependencies: HashMap<&'static str, Status>,
42+
}
43+
44+
async fn check_readiness<R: RedisOperations>(redis: &R) -> ReadyResponse {
45+
let mut deps = HashMap::new();
46+
let mut status = Status::Ok;
47+
48+
match timeout(Duration::from_secs(REDIS_PING_TIMEOUT), redis.ping()).await {
49+
Ok(Ok(_)) => {
50+
deps.insert("redis", Status::Ok);
51+
}
52+
_ => {
53+
deps.insert("redis", Status::Fail);
54+
status = Status::Fail;
55+
}
56+
}
57+
58+
ReadyResponse {
59+
status,
60+
dependencies: deps,
61+
}
62+
}
63+
64+
pub async fn readyz_handler(State(state): State<AppState>) -> (StatusCode, Json<ReadyResponse>) {
65+
let response = check_readiness(&state.redis).await;
66+
let status_code = match response.status {
67+
Status::Ok => StatusCode::OK,
68+
Status::Fail => StatusCode::SERVICE_UNAVAILABLE,
69+
};
70+
71+
(status_code, Json(response))
72+
}
73+
2374
#[cfg(test)]
2475
mod tests {
76+
use crate::state::HealthState;
77+
2578
use super::*;
2679
use axum::http::StatusCode;
80+
use broker::{Error as BrokerError, MockRedisOperations};
2781
use tokio::time::Instant;
2882

29-
fn mock_app_state() -> AppState {
30-
AppState {
31-
start_time: Instant::now(),
32-
}
33-
}
34-
3583
#[tokio::test]
3684
async fn test_healthz_returns_ok() {
37-
let state = mock_app_state();
85+
let state = HealthState {
86+
start_time: Instant::now(),
87+
};
3888
let (status, response) = healthz_handler(State(state)).await;
3989

4090
assert_eq!(status, StatusCode::OK);
4191
assert_eq!(response.status, "ok");
4292
}
93+
94+
#[tokio::test]
95+
async fn test_readyz_redis_healthy() {
96+
let mut mock_redis = MockRedisOperations::new();
97+
98+
mock_redis
99+
.expect_ping()
100+
.times(1)
101+
.returning(|| Ok("PONG".to_string()));
102+
103+
let response = check_readiness(&mock_redis).await;
104+
105+
assert_eq!(response.status, Status::Ok);
106+
assert_eq!(response.dependencies.get("redis"), Some(&Status::Ok));
107+
}
108+
109+
#[tokio::test]
110+
async fn test_readyz_redis_error() {
111+
let mut mock_redis = MockRedisOperations::new();
112+
113+
mock_redis.expect_ping().times(1).returning(|| {
114+
Err(BrokerError::Redis(redis::RedisError::from((
115+
redis::ErrorKind::IoError,
116+
"Connection refused",
117+
))))
118+
});
119+
120+
let response = check_readiness(&mock_redis).await;
121+
122+
assert_eq!(response.status, Status::Fail);
123+
assert_eq!(response.dependencies.get("redis"), Some(&Status::Fail));
124+
}
43125
}

services/gateway/src/main.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
11
use std::net::SocketAddr;
22

33
use axum::{Router, routing::get};
4+
use broker::RedisClient;
45
use dotenvy::dotenv;
5-
use gateway::{handlers::health::healthz_handler, state::AppState};
6+
use gateway::{
7+
handlers::health::{healthz_handler, readyz_handler},
8+
state::AppState,
9+
};
610
use tokio::{net::TcpListener, runtime::Runtime, time::Instant};
711

812
fn main() -> anyhow::Result<()> {
@@ -13,13 +17,18 @@ fn main() -> anyhow::Result<()> {
1317

1418
async fn async_main() -> anyhow::Result<(), anyhow::Error> {
1519
let port = std::env::var("GATEWAY_PORT")?;
20+
let redis_url = std::env::var("REDIS_URL")?;
21+
22+
let broker = RedisClient::new(redis_url.as_str()).await?;
1623

1724
let state = AppState {
25+
redis: broker,
1826
start_time: Instant::now(),
1927
};
2028

2129
let mut app = Router::new()
2230
.route("/healthz", get(healthz_handler))
31+
.route("/readyz", get(readyz_handler))
2332
.with_state(state);
2433

2534
#[cfg(debug_assertions)]

services/gateway/src/state.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,21 @@
1+
use broker::RedisClient;
12
use tokio::time::Instant;
23

34
#[derive(Clone)]
45
pub struct AppState {
6+
pub redis: RedisClient,
57
pub start_time: Instant,
68
}
9+
10+
#[derive(Clone)]
11+
pub struct HealthState {
12+
pub start_time: Instant,
13+
}
14+
15+
impl axum::extract::FromRef<AppState> for HealthState {
16+
fn from_ref(app: &AppState) -> Self {
17+
Self {
18+
start_time: app.start_time,
19+
}
20+
}
21+
}

0 commit comments

Comments
 (0)