Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ opentelemetry-otlp = { version = "0.31", default-features = false, features = [
"metrics",
] }
opentelemetry-stdout = { version = "0.31", default-features = false, features = [
"trace",
"metrics",
"trace",
"metrics",
] }
opentelemetry_sdk = { version = "0.31", default-features = false, features = [
"trace",
Expand Down
52 changes: 49 additions & 3 deletions src/moonlink_service/src/rest_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ use apache_avro::Schema as AvroSchema;
use arrow_ipc::writer::StreamWriter;
use axum::{
error_handling::HandleErrorLayer,
extract::{Path, State},
http::{Method, StatusCode},
extract::{Path, Request, State},
http::{HeaderMap, Method, StatusCode},
middleware::{self, Next},
response::{Json, Response},
routing::{delete, get, post},
BoxError, Router,
Expand Down Expand Up @@ -39,13 +40,26 @@ pub struct ApiState {
pub backend: Arc<moonlink_backend::MoonlinkBackend>,
/// Maps from source table name to schema id.
pub kafka_schema_id_cache: Arc<RwLock<HashMap<String, u64>>>,
/// Bearer token for authentication (cached at startup)
pub bearer_token: Option<String>,
}

impl ApiState {
pub fn new(backend: Arc<moonlink_backend::MoonlinkBackend>) -> Self {
let bearer_token = std::env::var("MOONLINK_REST_TOKEN")
.ok()
.filter(|token| !token.is_empty());

if bearer_token.is_some() {
info!("Bearer token authentication enabled for REST API");
} else {
info!("Bearer token authentication disabled for REST API");
}

Self {
backend,
kafka_schema_id_cache: Arc::new(RwLock::new(HashMap::new())),
bearer_token,
}
}
}
Expand Down Expand Up @@ -386,6 +400,34 @@ fn get_backend_error_status_code(error: &moonlink_backend::Error) -> StatusCode
}
}

async fn auth_middleware(
state: State<ApiState>,
headers: HeaderMap,
request: Request,
next: Next,
) -> Result<Response, (StatusCode, Json<ErrorResponse>)> {
// Use cached bearer token from state (no env var lookup)
let Some(ref auth_token) = state.bearer_token else {
// No token configured, skip auth
return Ok(next.run(request).await);
};

let expected_header = format!("Bearer {}", auth_token);
let auth_header = headers
.get("authorization")
.and_then(|header| header.to_str().ok());

match auth_header {
Some(header) if header == expected_header => Ok(next.run(request).await),
_ => Err((
StatusCode::UNAUTHORIZED,
Json(ErrorResponse {
message: "Invalid or missing bearer token".to_string(),
}),
)),
}
}

/// Create the router with all API endpoints
pub fn create_router(state: ApiState) -> Router {
let timeout_layer = ServiceBuilder::new()
Expand Down Expand Up @@ -421,7 +463,11 @@ pub fn create_router(state: ApiState) -> Router {
.route("/tables/{table}/optimize", post(optimize_table))
.route("/tables/{table}/snapshot", post(create_snapshot))
.route("/tables/{table}/flush", post(flush_table))
.with_state(state)
.with_state(state.clone())
.layer(middleware::from_fn_with_state(
state.clone(),
auth_middleware,
))
.layer(
CorsLayer::new()
.allow_origin(Any)
Expand Down
68 changes: 68 additions & 0 deletions src/moonlink_service/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,74 @@ async fn test_health_check_endpoint() {
ma::assert_gt!(response.timestamp, 0);
}

#[tokio::test]
#[serial]
async fn test_bearer_token_authentication() {
let _guard = TestGuard::new(&get_moonlink_backend_dir());

// Set the environment variable for bearer token authentication
std::env::set_var("MOONLINK_REST_TOKEN", "test-secret-token");

let config = get_service_config();
tokio::spawn(async move {
start_with_config(config).await.unwrap();
});
wait_for_server_ready().await;

let client = reqwest::Client::new();

// Should fail without auth header
let response = client
.get(format!("{REST_ADDR}/health"))
.header("content-type", "application/json")
.send()
.await
.unwrap();

assert_eq!(
response.status(),
reqwest::StatusCode::UNAUTHORIZED,
"Request without auth header should return 401"
);

// Should fail with wrong token format
let response = client
.get(format!("{REST_ADDR}/health"))
.header("content-type", "application/json")
.header("Authorization", "wrong-token")
.send()
.await
.unwrap();

assert_eq!(
response.status(),
reqwest::StatusCode::UNAUTHORIZED,
"Request with wrong token format should return 401"
);

// Should succeed with correct Bearer token
let response = client
.get(format!("{REST_ADDR}/health"))
.header("content-type", "application/json")
.header("Authorization", "Bearer test-secret-token")
.send()
.await
.unwrap();

assert!(
response.status().is_success(),
"Response status is {response:?}"
);

let response: HealthResponse = response.json().await.unwrap();
assert_eq!(response.service, "moonlink-rest-api");
assert_eq!(response.status, "healthy");
ma::assert_gt!(response.timestamp, 0);

// Clean up environment variable
std::env::remove_var("MOONLINK_REST_TOKEN");
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Preserve Environment State in Cleanup Checks

The test unconditionally removes the MOONLINK_REST_TOKEN environment variable in cleanup without preserving its original value. If the environment variable was set before the test ran (e.g., in a production environment or CI/CD), the test will remove it, potentially breaking subsequent code that expects it to be set. The test should save the original value with std::env::var("MOONLINK_REST_TOKEN").ok() before setting it, and restore the original state in cleanup rather than unconditionally removing it.

Fix in Cursor Fix in Web

}

#[tokio::test]
#[serial]
async fn test_schema() {
Expand Down