diff --git a/influxdb3/src/commands/create.rs b/influxdb3/src/commands/create.rs index 7549e8c392b..72d930535ae 100644 --- a/influxdb3/src/commands/create.rs +++ b/influxdb3/src/commands/create.rs @@ -78,11 +78,17 @@ impl Config { }) => (host_url, auth_token, ca_cert), SubCommand::Token(create_token_config) => { let host_settings = create_token_config.get_connection_settings()?; - ( - &host_settings.host_url, - &host_settings.auth_token, - &host_settings.ca_cert, - ) + // We need to return references, so we'll handle this differently + return Ok({ + let mut client = Client::new( + host_settings.host_url.clone(), + host_settings.ca_cert.clone(), + )?; + if let Some(token) = &host_settings.auth_token { + client = client.with_auth_token(token.expose_secret()); + } + client + }); } }; diff --git a/influxdb3/src/commands/create/token.rs b/influxdb3/src/commands/create/token.rs index 404e0de1fe7..b0f07856e65 100644 --- a/influxdb3/src/commands/create/token.rs +++ b/influxdb3/src/commands/create/token.rs @@ -75,12 +75,13 @@ pub enum TokenOutputFormat { #[derive(Parser, Clone, Debug)] pub struct InfluxDb3ServerConfig { - /// The host URL of the running InfluxDB 3 Core server + /// The host URL of the running InfluxDB 3 Core server. #[clap( name = "host", + short = 'H', long = "host", - default_value = "http://127.0.0.1:8181", - env = "INFLUXDB3_HOST_URL" + env = "INFLUXDB3_HOST_URL", + default_value = "http://127.0.0.1:8181" )] pub host_url: Url, @@ -96,8 +97,16 @@ pub struct InfluxDb3ServerConfig { #[derive(Parser, Debug)] pub struct CreateAdminTokenConfig { - /// Operator token will be regenerated when this is set - #[clap(name = "regenerate", long = "regenerate")] + /// Operator token will be regenerated when this is set. + /// + /// When used without --host, connects to the default server endpoint (port 8181). + /// To use the admin token recovery endpoint, specify --host with the recovery endpoint address. + #[clap( + name = "regenerate", + long = "regenerate", + help = "Regenerate the operator token. By default connects to the main server (http://127.0.0.1:8181). + To use the admin token recovery endpoint, specify --host with the recovery endpoint address" + )] pub regenerate: bool, // for named admin and permission tokens this is mandatory but not for admin tokens @@ -208,8 +217,16 @@ impl Args for CreateTokenConfig { impl CommandFactory for CreateTokenConfig { fn command() -> clap::Command { - let admin_sub_cmd = - ClapCommand::new("--admin").override_usage("influxdb3 create token --admin [OPTIONS]"); + let admin_sub_cmd = ClapCommand::new("--admin") + .override_usage("influxdb3 create token --admin [OPTIONS]") + .about("Create or regenerate an admin token") + .long_about( + "Create or regenerate an admin token.\n\n\ + When using --regenerate, the command connects to the default server \ + endpoint (http://127.0.0.1:8181) unless you specify a different --host. \ + To use the admin token recovery endpoint, specify --host with the \ + recovery endpoint address configured via --admin-token-recovery-http-bind.", + ); let all_args = CreateAdminTokenConfig::as_args(); let admin_sub_cmd = admin_sub_cmd.args(all_args); diff --git a/influxdb3/src/commands/serve.rs b/influxdb3/src/commands/serve.rs index 6a4383565a8..f820710dc56 100644 --- a/influxdb3/src/commands/serve.rs +++ b/influxdb3/src/commands/serve.rs @@ -28,7 +28,7 @@ use influxdb3_server::{ CommonServerState, CreateServerArgs, Server, http::HttpApi, query_executor::{CreateQueryExecutorArgs, QueryExecutorImpl}, - serve, + serve, serve_admin_token_recovery_endpoint, }; use influxdb3_shutdown::{ShutdownManager, wait_for_signal}; use influxdb3_sys_events::SysEventStore; @@ -82,6 +82,9 @@ pub const DEFAULT_DATA_DIRECTORY_NAME: &str = ".influxdb3"; /// The default bind address for the HTTP API. pub const DEFAULT_HTTP_BIND_ADDR: &str = "0.0.0.0:8181"; +/// The default bind address for admin token recovery HTTP API. +pub const DEFAULT_ADMIN_TOKEN_RECOVERY_BIND_ADDR: &str = "127.0.0.1:8182"; + pub const DEFAULT_TELEMETRY_ENDPOINT: &str = "https://telemetry.v3.influxdata.com"; #[derive(Debug, Error)] @@ -125,6 +128,9 @@ pub enum Error { #[error("lost HTTP/gRPC service")] LostHttpGrpc, + #[error("lost admin token recovery service")] + LostAdminTokenRecovery, + #[error("tls requires both a cert and a key file to be passed in to work")] NoCertOrKeyFile, } @@ -179,6 +185,19 @@ pub struct Config { )] pub http_bind_address: SocketAddr, + /// Enable admin token recovery endpoint on the specified address. + /// Use flag alone for default address (127.0.0.1:8182) or provide a custom address. + /// WARNING: This endpoint allows unauthenticated admin token regeneration - use with caution! + #[clap( + long = "admin-token-recovery-http-bind", + env = "INFLUXDB3_ADMIN_TOKEN_RECOVERY_HTTP_BIND_ADDR", + num_args = 0..=1, + default_missing_value = DEFAULT_ADMIN_TOKEN_RECOVERY_BIND_ADDR, + help = "Enable admin token recovery endpoint. Use flag alone for default address (127.0.0.1:8182) or with value for custom address", + action, + )] + pub admin_token_recovery_bind_address: Option, + /// Size of memory pool used during query exec, in megabytes. /// /// Can be given as absolute value or in percentage of the total available memory (e.g. `10%`). @@ -450,6 +469,16 @@ pub struct Config { )] pub tcp_listener_file_path: Option, + /// Provide a file path to write the address that the admin recovery endpoint mounted server is listening on to + /// + /// This is mainly intended for testing purposes and is not considered stable. + #[clap( + long = "admin-token-recovery-tcp-listener-file-path", + env = "INFLUXDB3_ADMIN_TOKEN_RECOVERY_TCP_LISTENER_FILE_PATH", + hide = true + )] + pub admin_token_recovery_tcp_listener_file_path: Option, + #[clap( long = "wal-replay-concurrency-limit", env = "INFLUXDB3_WAL_REPLAY_CONCURRENCY_LIMIT" @@ -902,6 +931,15 @@ pub async fn command(config: Config) -> Result<()> { .await .map_err(Error::BindAddress)?; + // Only create recovery listener if explicitly enabled + let admin_token_recovery_listener = if let Some(addr) = config.admin_token_recovery_bind_address + { + info!(%addr, "Admin token recovery endpoint enabled - WARNING: This allows unauthenticated admin token regeneration!"); + Some(TcpListener::bind(*addr).await.map_err(Error::BindAddress)?) + } else { + None + }; + let processing_engine = ProcessingEngineManagerImpl::new( setup_processing_engine_env_manager(&config.processing_engine_config), write_buffer.catalog(), @@ -945,6 +983,19 @@ pub async fn command(config: Config) -> Result<()> { Arc::clone(&authorizer), )); + // Only create recovery server if listener was created + let admin_token_recovery_server = admin_token_recovery_listener.map(|listener| { + Server::new(CreateServerArgs { + common_state: common_state.clone(), + http: Arc::clone(&http), + authorizer: Arc::clone(&authorizer), + listener, + cert_file: cert_file.clone(), + key_file: key_file.clone(), + tls_minimum_version: config.tls_minimum_version.into(), + }) + }); + let server = Server::new(CreateServerArgs { common_state, http, @@ -987,6 +1038,7 @@ pub async fn command(config: Config) -> Result<()> { ?paths_without_authz, "setting up server with authz disabled for paths" ); + let frontend = serve( server, frontend_shutdown.clone(), @@ -998,13 +1050,33 @@ pub async fn command(config: Config) -> Result<()> { .fuse(); let backend = shutdown_manager.join().fuse(); + // Only start recovery endpoint if server was created + let recovery_endpoint_enabled = admin_token_recovery_server.is_some(); + let recovery_frontend = if let Some(recovery_server) = admin_token_recovery_server { + futures::future::Either::Left( + serve_admin_token_recovery_endpoint( + recovery_server, + frontend_shutdown.clone(), + config.admin_token_recovery_tcp_listener_file_path, + ) + .fuse(), + ) + } else { + // Provide a future that never completes if recovery endpoint is disabled + futures::future::Either::Right( + futures::future::pending::>().fuse(), + ) + }; + // pin_mut constructs a Pin<&mut T> from a T by preventing moving the T // from the current stack frame and constructing a Pin<&mut T> to it pin_mut!(signal); pin_mut!(frontend); pin_mut!(backend); + pin_mut!(recovery_frontend); let mut res = Ok(()); + let mut recovery_endpoint_active = recovery_endpoint_enabled; // Graceful shutdown can be triggered by sending SIGINT or SIGTERM to the // process, or by a background task exiting - most likely with an error @@ -1029,16 +1101,49 @@ pub async fn command(config: Config) -> Result<()> { res = res.and(Err(Error::LostBackend)); } // HTTP/gRPC frontend has stopped - result = frontend => match result { - Ok(_) if frontend_shutdown.is_cancelled() => info!("HTTP/gRPC service shutdown"), - Ok(_) => { - error!("early HTTP/gRPC service exit"); - res = res.and(Err(Error::LostHttpGrpc)); - }, - Err(error) => { - error!("HTTP/gRPC error"); - res = res.and(Err(Error::Server(error))); + result = frontend => { + match result { + Ok(_) if frontend_shutdown.is_cancelled() => info!("HTTP/gRPC service shutdown"), + Ok(_) => { + error!("early HTTP/gRPC service exit"); + res = res.and(Err(Error::LostHttpGrpc)); + }, + Err(error) => { + error!("HTTP/gRPC error"); + res = res.and(Err(Error::Server(error))); + } + } + } + recovery_result = recovery_frontend => { + // Only process recovery endpoint results if it was actually enabled and active + if recovery_endpoint_enabled && recovery_endpoint_active { + match recovery_result { + Ok(_) if frontend_shutdown.is_cancelled() => { + info!("Admin token recovery service shutdown"); + // Only break if the main shutdown was also requested + if frontend.is_terminated() { + break; + } + } + Ok(_) => { + // Recovery endpoint can shut down normally after token regeneration + // This is expected behavior and should not cause an error + info!("Admin token recovery service exited normally after token regeneration"); + recovery_endpoint_active = false; + // Since recovery_frontend is a FusedFuture, it won't be polled again + // after completion, so we don't need to do anything else + // Continue the loop - do NOT break or call shutdown + continue; // Skip shutdown_manager.shutdown() for this iteration + } + Err(error) => { + error!(%error, "admin token recovery service error"); + res = res.and(Err(Error::Server(error))); + // Continue running the main server even if recovery endpoint had an error + } + } } + // If recovery endpoint was disabled, this branch will never be taken again + // because pending() futures never complete } } shutdown_manager.shutdown() diff --git a/influxdb3/src/help/serve_all.txt b/influxdb3/src/help/serve_all.txt index 3da47f05abb..630934da58e 100644 --- a/influxdb3/src/help/serve_all.txt +++ b/influxdb3/src/help/serve_all.txt @@ -36,6 +36,11 @@ Examples: list of resources. Valid values are health, ping, and metrics. To disable auth for multiple resources pass in a list, eg. `--disable-authz health,ping` + --admin-token-recovery-http-bind + Address for HTTP API for admin token recovery requests [default: 127.0.0.1:8182] + WARNING: This endpoint allows unauthenticated admin token regeneration - use with caution! + [env: INFLUXDB3_ADMIN_TOKEN_RECOVERY_HTTP_BIND_ADDR] + {} --data-dir Location to store files locally [env: INFLUXDB3_DB_DIR=] diff --git a/influxdb3/tests/cli/admin_token.rs b/influxdb3/tests/cli/admin_token.rs index 42d04313504..5b5d8a26ed2 100644 --- a/influxdb3/tests/cli/admin_token.rs +++ b/influxdb3/tests/cli/admin_token.rs @@ -1,5 +1,6 @@ use std::time::Duration; +use crate::cli::api::run_cmd_with_result; use crate::server::{ConfigProvider, TestServer, parse_token}; use observability_deps::tracing::info; use serde_json::Value; @@ -123,6 +124,72 @@ async fn test_regenerate_admin_token() { assert_contains!(&res, "Database \"sample_db\" created successfully"); } +#[test_log::test(tokio::test)] +async fn test_regenerate_admin_token_without_auth_using_token_recovery_service() { + let mut server = TestServer::configure() + .with_auth() + .with_no_admin_token() + .with_recovery_endpoint() + .spawn() + .await; + let args = &["--tls-ca", "../testing-certs/rootCA.pem"]; + // create the token manually + let result = server + .run(vec!["create", "token", "--admin"], args) + .unwrap(); + + // already has admin token, so it cannot be created again + assert_contains!(&result, "New token created successfully!"); + + let admin_token = parse_token(result); + + // regenerating token is not allowed without admin token going through the main http server + let result = server + .run_with_confirmation( + vec!["create", "token", "--admin"], + &["--regenerate", "--tls-ca", "../testing-certs/rootCA.pem"], + ) + .unwrap(); + + assert_contains!(&result, "Failed to create token"); + + // regenerate token using the admin token recovery server + let recovery_addr = server.admin_token_recovery_client_addr(); + let result = run_cmd_with_result( + &["--tls-ca", "../testing-certs/rootCA.pem"], + Some("yes"), + vec![ + "create", + "token", + "--admin", + "--regenerate", + "--host", + &recovery_addr, + ], + ) + .unwrap(); + assert_contains!(&result, "New token created successfully!"); + + let old_token = admin_token.clone(); + let new_token = parse_token(result); + assert!(old_token != new_token); + + // old token cannot access + let res = server + .set_token(Some(admin_token)) + .create_database("sample_db") + .run() + .err() + .unwrap() + .to_string(); + assert_contains!(&res, "401 Unauthorized"); + + // new token should allow + server.set_token(Some(new_token)); + let res = server.create_database("sample_db").run().unwrap(); + assert_contains!(&res, "Database \"sample_db\" created successfully"); +} + #[test_log::test(tokio::test)] async fn test_delete_token() { let server = TestServer::configure() @@ -528,3 +595,331 @@ async fn test_check_named_admin_token_expiry_works() { .to_string(); assert_contains!(&res, "[401 Unauthorized]"); } + +#[test_log::test(tokio::test)] +async fn test_recovery_service_only_accepts_regenerate_endpoint() { + let server = TestServer::configure() + .with_auth() + .with_no_admin_token() + .with_recovery_endpoint() + .spawn() + .await; + + // First create an admin token + let args = &["--tls-ca", "../testing-certs/rootCA.pem"]; + let result = server + .run(vec!["create", "token", "--admin"], args) + .unwrap(); + assert_contains!(&result, "New token created successfully!"); + + // Try to use recovery service for other operations - should fail + // Test creating a database through recovery port + let recovery_addr = server.admin_token_recovery_client_addr(); + let result = server + .run( + vec!["create", "database", "--host", &recovery_addr, "test_db"], + args, + ) + .unwrap_err() + .to_string(); + // Should fail because recovery port doesn't support database operations + assert!(result.contains("error") || result.contains("failed")); + + // Test listing tokens through recovery port + let result = server + .run(vec!["show", "tokens", "--host", &recovery_addr], args) + .unwrap_err() + .to_string(); + assert!(result.contains("error") || result.contains("failed")); +} + +#[test_log::test(tokio::test)] +async fn test_recovery_service_with_auth_disabled() { + // Start server without auth + let server = TestServer::configure() + .with_recovery_endpoint() + .spawn() + .await; + + // Try to use recovery service when auth is disabled - should fail + let recovery_addr = server.admin_token_recovery_client_addr(); + let result = run_cmd_with_result( + &["--tls-ca", "../testing-certs/rootCA.pem"], + Some("yes"), + vec![ + "create", + "token", + "--admin", + "--regenerate", + "--host", + &recovery_addr, + ], + ) + .unwrap(); + + // Should get an error - recovery service runs but there's no admin token to regenerate + assert_contains!(&result, "missing admin token, cannot update"); +} + +#[test_log::test(tokio::test)] +async fn test_recovery_service_does_not_affect_named_admin_tokens() { + let mut server = TestServer::configure() + .with_auth() + .with_no_admin_token() + .with_recovery_endpoint() + .spawn() + .await; + + let args = &["--tls-ca", "../testing-certs/rootCA.pem"]; + + // Create operator token + let result = server + .run(vec!["create", "token", "--admin"], args) + .unwrap(); + assert_contains!(&result, "New token created successfully!"); + let operator_token = parse_token(result); + + // Create a named admin token + let result = server + .run( + vec![ + "create", + "token", + "--admin", + "--name", + "test_admin", + "--token", + &operator_token, + ], + args, + ) + .unwrap(); + assert_contains!(&result, "New token created successfully!"); + let named_admin_token = parse_token(result); + + // Regenerate operator token via recovery service + let recovery_addr = server.admin_token_recovery_client_addr(); + let result = run_cmd_with_result( + &["--tls-ca", "../testing-certs/rootCA.pem"], + Some("yes"), + vec![ + "create", + "token", + "--admin", + "--regenerate", + "--host", + &recovery_addr, + ], + ) + .unwrap(); + assert_contains!(&result, "New token created successfully!"); + let new_operator_token = parse_token(result); + + // Verify old operator token is invalid + server.set_token(Some(operator_token)); + let res = server + .create_database("test_db1") + .run() + .err() + .unwrap() + .to_string(); + assert_contains!(&res, "401 Unauthorized"); + + // Verify named admin token still works + server.set_token(Some(named_admin_token.clone())); + let res = server.create_database("test_db2").run().unwrap(); + assert_contains!(&res, "Database \"test_db2\" created successfully"); + + // Verify new operator token works + server.set_token(Some(new_operator_token)); + let res = server.create_database("test_db3").run().unwrap(); + assert_contains!(&res, "Database \"test_db3\" created successfully"); +} + +#[test_log::test(tokio::test)] +async fn test_recovery_service_cannot_create_new_admin_token() { + let server = TestServer::configure() + .with_auth() + .with_no_admin_token() + .with_recovery_endpoint() + .spawn() + .await; + + // Try to create admin token through recovery service (without --regenerate flag) + let recovery_addr = server.admin_token_recovery_client_addr(); + let result = server + .run( + vec!["create", "token", "--admin", "--host", &recovery_addr], + &["--tls-ca", "../testing-certs/rootCA.pem"], + ) + .unwrap_err() + .to_string(); + + // Should fail - recovery service only supports regeneration + assert!(result.contains("error") || result.contains("failed")); +} + +#[test_log::test(tokio::test)] +async fn test_recovery_endpoint_disabled_by_default() { + // Start server without recovery endpoint enabled + let _server = TestServer::configure().with_capture_logs().spawn().await; + + // Try to connect to the recovery endpoint on default port + let client = reqwest::Client::new(); + let recovery_url = "http://127.0.0.1:8182/api/v3/configure/token/admin/regenerate"; + + // This should fail since the recovery endpoint is not enabled + let result = client.post(recovery_url).send().await; + + // Expect connection refused or similar error + assert!( + result.is_err(), + "Recovery endpoint should not be accessible when not explicitly enabled" + ); +} + +#[test_log::test(tokio::test)] +async fn test_recovery_endpoint_auto_shutdown_after_regeneration() { + // This test verifies that the recovery endpoint works and the main server continues running + let mut server = TestServer::configure() + .with_auth() + .with_no_admin_token() + .with_recovery_endpoint() + .spawn() + .await; + + // Create the initial admin token + let result = server + .run( + vec!["create", "token", "--admin"], + &["--tls-ca", "../testing-certs/rootCA.pem"], + ) + .unwrap(); + assert_contains!(&result, "New token created successfully!"); + let initial_token = parse_token(result); + + // Use the recovery endpoint to regenerate the token + let recovery_addr = server.admin_token_recovery_client_addr(); + let result = run_cmd_with_result( + &["--tls-ca", "../testing-certs/rootCA.pem"], + Some("yes"), + vec![ + "create", + "token", + "--admin", + "--regenerate", + "--host", + &recovery_addr, + ], + ) + .unwrap(); + assert_contains!(&result, "New token created successfully!"); + let new_token = parse_token(result); + + // Use the recovery endpoint to regenerate the token again, recovery server should have been + // shutdown + let result = run_cmd_with_result( + &["--tls-ca", "../testing-certs/rootCA.pem"], + Some("yes"), + vec![ + "create", + "token", + "--admin", + "--regenerate", + "--host", + &recovery_addr, + ], + ) + .unwrap(); + assert_contains!(result, "ConnectError"); + + // Verify tokens are different + assert_ne!( + initial_token, new_token, + "Token should have been regenerated" + ); + + // Update the server's token to the new one + server.set_token(Some(new_token.clone())); + + // Verify the main server is still running and new token works + let result = server.create_database("test_db").run(); + assert!( + result.is_ok(), + "Main server should still be running with new token" + ); + + // Verify old token no longer works + server.set_token(Some(initial_token)); + let result = server.create_database("test_db2").run(); + assert!(result.is_err(), "Old token should no longer work"); +} + +#[test_log::test(tokio::test)] +async fn test_main_server_continues_after_recovery_endpoint_shutdown() { + // This test specifically verifies that the main server continues running + // after the recovery endpoint auto-shuts down + let mut server = TestServer::configure() + .with_auth() + .with_no_admin_token() + .with_recovery_endpoint() + .spawn() + .await; + + // Create the initial admin token + let result = server + .run( + vec!["create", "token", "--admin"], + &["--tls-ca", "../testing-certs/rootCA.pem"], + ) + .unwrap(); + assert_contains!(&result, "New token created successfully!"); + let initial_token = parse_token(result); + + // Set the token for future operations + server.set_token(Some(initial_token.clone())); + + // Create a database before using recovery endpoint + let result = server.create_database("before_recovery_db").run(); + assert!( + result.is_ok(), + "Should be able to create database before recovery" + ); + + // Use the recovery endpoint to regenerate the token + let recovery_addr = server.admin_token_recovery_client_addr(); + let result = run_cmd_with_result( + &["--tls-ca", "../testing-certs/rootCA.pem"], + Some("yes"), + vec![ + "create", + "token", + "--admin", + "--regenerate", + "--host", + &recovery_addr, + ], + ) + .unwrap(); + assert_contains!(&result, "New token created successfully!"); + let new_token = parse_token(result); + + // Verify tokens are different + assert_ne!( + initial_token, new_token, + "Token should have been regenerated" + ); + + // Update to new token + server.set_token(Some(new_token.clone())); + + // Wait a bit to ensure recovery endpoint has shut down + tokio::time::sleep(Duration::from_millis(500)).await; + + // Simple check - verify server is still responding by creating one more database + let result = server.create_database("after_recovery_db").run(); + assert!( + result.is_ok(), + "Main server should still be running after recovery endpoint shutdown" + ); +} diff --git a/influxdb3/tests/cli/api.rs b/influxdb3/tests/cli/api.rs index 237821e1d65..5a72299e5b9 100644 --- a/influxdb3/tests/cli/api.rs +++ b/influxdb3/tests/cli/api.rs @@ -36,43 +36,54 @@ impl TestServer { command_args.push("--token"); command_args.push(token); } - let mut child_process = Command::cargo_bin("influxdb3")? - .args(&command_args) - .args(args) - .stdin(Stdio::piped()) - .stdout(Stdio::piped()) - .stderr(Stdio::piped()) - .spawn()?; - - if let Some(input) = input { - let input = input.to_string(); - let mut stdin = child_process.stdin.take().expect("failed to open stdin"); - thread::spawn(move || { - stdin - .write_all(input.as_bytes()) - .expect("cannot write confirmation msg to stdin"); - }); - } - let output = child_process.wait_with_output()?; - - if !output.status.success() { - println!( - "failed to run influxdb3 {} {}", - command_args.join(" "), - args.join(" ") - ); - bail!("{}", String::from_utf8_lossy(&output.stderr)); - } - - Ok(String::from_utf8(output.stdout)?.trim().into()) + run_cmd_with_result(args, input, command_args) } + pub fn run(&self, commands: Vec<&str>, args: &[&str]) -> Result { self.run_with_options(commands, args, None) } + pub fn run_with_confirmation(&self, commands: Vec<&str>, args: &[&str]) -> Result { self.run_with_options(commands, args, Some("yes")) } } + +pub(super) fn run_cmd_with_result( + args: &[&str], + input: Option<&str>, + command_args: Vec<&str>, +) -> std::result::Result { + let mut child_process = Command::cargo_bin("influxdb3")? + .args(&command_args) + .args(args) + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn()?; + + if let Some(input) = input { + let input = input.to_string(); + let mut stdin = child_process.stdin.take().expect("failed to open stdin"); + thread::spawn(move || { + stdin + .write_all(input.as_bytes()) + .expect("cannot write confirmation msg to stdin"); + }); + } + let output = child_process.wait_with_output()?; + + if !output.status.success() { + println!( + "failed to run influxdb3 {} {}", + command_args.join(" "), + args.join(" ") + ); + bail!("{}", String::from_utf8_lossy(&output.stderr)); + } + + Ok(String::from_utf8(output.stdout)?.trim().into()) +} + impl CreateDatabaseQuery<'_> { pub fn run(self) -> Result { self.server.run( diff --git a/influxdb3/tests/server/mod.rs b/influxdb3/tests/server/mod.rs index 43b3be48b03..4bdc8e897a4 100644 --- a/influxdb3/tests/server/mod.rs +++ b/influxdb3/tests/server/mod.rs @@ -53,6 +53,9 @@ pub trait ConfigProvider: Send + Sync + 'static { /// Get if logs should be captured fn capture_logs(&self) -> bool; + /// Get if recovery endpoint should be enabled + fn recovery_endpoint_enabled(&self) -> bool; + /// Spawn a new [`TestServer`] with this configuration /// /// This will run the `influxdb3 serve` command and bind its HTTP address to a random port @@ -82,6 +85,7 @@ pub struct TestConfig { disable_authz: Vec, gen1_duration: Option, capture_logs: bool, + enable_recovery_endpoint: bool, } impl TestConfig { @@ -107,6 +111,12 @@ impl TestConfig { self } + /// Enable the admin token recovery endpoint + pub fn with_recovery_endpoint(mut self) -> Self { + self.enable_recovery_endpoint = true; + self + } + /// Set a host identifier prefix on the spawned [`TestServer`] pub fn with_node_id>(mut self, node_id: S) -> Self { self.node_id = Some(node_id.into()); @@ -247,6 +257,10 @@ impl ConfigProvider for TestConfig { fn capture_logs(&self) -> bool { self.capture_logs } + + fn recovery_endpoint_enabled(&self) -> bool { + self.enable_recovery_endpoint + } } /// A running instance of the `influxdb3 serve` process @@ -266,6 +280,7 @@ impl ConfigProvider for TestConfig { pub struct TestServer { auth_token: Option, bind_addr: String, + admin_token_recovery_bind_addr: Option, server_process: Child, http_client: reqwest::Client, stdout: Option>>, @@ -318,8 +333,13 @@ impl TestServer { let tmp_dir = TempDir::new().unwrap(); let tmp_dir_path = tmp_dir.keep(); let tcp_addr_file = tmp_dir_path.join("tcp-listener"); + + let admin_token_recover_tmp_dir = TempDir::new().unwrap(); + let admin_token_recover_tmp_dir_path = admin_token_recover_tmp_dir.keep(); + let tcp_addr_file_2 = admin_token_recover_tmp_dir_path.join("tcp-listener"); + let mut command = Command::cargo_bin("influxdb3").expect("create the influxdb3 command"); - let command = command + let mut command = command .arg("serve") .arg("--disable-telemetry-upload") .args(["--http-bind", "0.0.0.0:0"]) @@ -330,7 +350,21 @@ impl TestServer { tcp_addr_file .to_str() .expect("valid tcp listener file path"), - ]) + ]); + + // Only add recovery endpoint args if explicitly enabled + if config.recovery_endpoint_enabled() { + command = command + .args(["--admin-token-recovery-http-bind", "0.0.0.0:0"]) + .args([ + "--admin-token-recovery-tcp-listener-file-path", + tcp_addr_file_2 + .to_str() + .expect("valid tcp listener file path"), + ]); + } + + let command = command .args([ "--tls-cert", if config.bad_tls() { @@ -418,27 +452,12 @@ impl TestServer { } } - let bind_addr = loop { - match tokio::fs::File::open(&tcp_addr_file).await { - Ok(mut file) => { - let mut buf = String::new(); - file.read_to_string(&mut buf) - .await - .expect("read from tcp listener file"); - if buf.is_empty() { - tokio::time::sleep(Duration::from_millis(10)).await; - continue; - } else { - break buf; - } - } - Err(error) if matches!(error.kind(), std::io::ErrorKind::NotFound) => { - tokio::time::sleep(Duration::from_millis(10)).await; - } - Err(error) => { - panic!("unexpected error while checking for tcp listener file: {error:?}") - } - } + let bind_addr = find_bind_addr(tcp_addr_file).await; + + let admin_token_recovery_bind_addr = if config.recovery_endpoint_enabled() { + Some(find_bind_addr(tcp_addr_file_2).await) + } else { + None }; let http_client = reqwest::ClientBuilder::new() @@ -455,6 +474,7 @@ impl TestServer { let server = Self { auth_token: config.auth_token().map(|s| s.to_owned()), bind_addr, + admin_token_recovery_bind_addr, server_process, http_client, stdout: stdout_handle, @@ -490,14 +510,30 @@ impl TestServer { ) } + /// Get the URL of admin token recovery service for use with an HTTP client + pub fn admin_token_recovery_client_addr(&self) -> String { + match &self.admin_token_recovery_bind_addr { + Some(addr) => format!("https://localhost:{}", addr.split(':').nth(1).unwrap()), + None => panic!( + "admin_token_recovery_client_addr called on TestServer without recovery endpoint enabled. Use .with_recovery_endpoint() when configuring the test server." + ), + } + } + + /// Check if the recovery endpoint is enabled + pub fn has_recovery_endpoint(&self) -> bool { + self.admin_token_recovery_bind_addr.is_some() + } + /// Get the token for the server pub fn token(&self) -> Option<&String> { self.auth_token.as_ref() } /// Set the token for the server - pub fn set_token(&mut self, token: Option) { + pub fn set_token(&mut self, token: Option) -> &mut Self { self.auth_token = token; + self } /// Get a [`FlightSqlClient`] for making requests to the running service over gRPC @@ -631,6 +667,31 @@ impl TestServer { } } +async fn find_bind_addr(tcp_addr_file_2: std::path::PathBuf) -> String { + loop { + match tokio::fs::File::open(&tcp_addr_file_2).await { + Ok(mut file) => { + let mut buf = String::new(); + file.read_to_string(&mut buf) + .await + .expect("read from tcp listener file"); + if buf.is_empty() { + tokio::time::sleep(Duration::from_millis(10)).await; + continue; + } else { + break buf; + } + } + Err(error) if matches!(error.kind(), std::io::ErrorKind::NotFound) => { + tokio::time::sleep(Duration::from_millis(10)).await; + } + Err(error) => { + panic!("unexpected error while checking for tcp listener file: {error:?}") + } + } + } +} + impl Drop for TestServer { fn drop(&mut self) { self.kill(); diff --git a/influxdb3_server/src/http.rs b/influxdb3_server/src/http.rs index 49b08388c64..8a3d9918072 100644 --- a/influxdb3_server/src/http.rs +++ b/influxdb3_server/src/http.rs @@ -558,6 +558,24 @@ pub struct HttpApi { legacy_write_param_unifier: SingleTenantRequestUnifier, } +/// Wrapper for HttpApi used by the recovery endpoint that includes a shutdown token +pub(crate) struct RecoveryHttpApi { + http_api: Arc, + cancellation_token: tokio_util::sync::CancellationToken, +} + +impl RecoveryHttpApi { + pub(crate) fn new( + http_api: Arc, + cancellation_token: tokio_util::sync::CancellationToken, + ) -> Self { + Self { + http_api, + cancellation_token, + } + } +} + impl HttpApi { pub fn new( common_state: CommonServerState, @@ -1857,6 +1875,78 @@ async fn record_batch_stream_to_body( } } +/// This is used to trigger a shutdown when it's dropped. +struct ShutdownTrigger { + token: tokio_util::sync::CancellationToken, +} + +impl ShutdownTrigger { + fn new(token: tokio_util::sync::CancellationToken) -> Self { + Self { token } + } +} + +impl Drop for ShutdownTrigger { + fn drop(&mut self) { + self.token.cancel(); + } +} + +pub(crate) async fn route_admin_token_recovery_request( + recovery_api: Arc, + req: Request, +) -> Result { + let method = req.method().clone(); + let uri = req.uri().clone(); + trace!(request = ?req,"Processing request"); + let content_length = req.headers().get("content-length").cloned(); + + let response = match (method.clone(), uri.path()) { + (Method::POST, all_paths::API_V3_CONFIGURE_ADMIN_TOKEN_REGENERATE) => { + info!("Regenerating admin token without password through token recovery API request"); + let result = recovery_api.http_api.regenerate_admin_token(req).await; + + // If token regeneration was successful, trigger shutdown of the recovery endpoint + if let Ok(response) = result { + info!("Admin token regenerated successfully, shutting down recovery endpoint"); + let cancellation_token = recovery_api.cancellation_token.clone(); + let mut res_builder = ResponseBuilder::new(); + let extensions = res_builder.extensions_mut().unwrap(); + let shutdown_trigger = ShutdownTrigger::new(cancellation_token); + extensions.insert(shutdown_trigger); + + Ok(res_builder + .status(response.status()) + .body(response.into_body()) + .unwrap()) + } else { + result + } + } + _ => { + let body = bytes_to_response_body("not found"); + Ok(ResponseBuilder::new() + .status(StatusCode::NOT_FOUND) + .body(body) + .unwrap()) + } + }; + + match response { + Ok(mut response) => { + response + .headers_mut() + .insert(ACCESS_CONTROL_ALLOW_ORIGIN, HeaderValue::from_static("*")); + debug!(?response, "Successfully processed request"); + Ok(response) + } + Err(error) => { + error!(%error, %method, path = uri.path(), ?content_length, "Error while handling request"); + Ok(error.into_response()) + } + } +} + pub(crate) async fn route_request( http_server: Arc, mut req: Request, diff --git a/influxdb3_server/src/lib.rs b/influxdb3_server/src/lib.rs index c41a4e88921..88ff38d2db9 100644 --- a/influxdb3_server/src/lib.rs +++ b/influxdb3_server/src/lib.rs @@ -21,8 +21,10 @@ mod system_tables; use crate::grpc::make_flight_server; use crate::http::HttpApi; +use crate::http::RecoveryHttpApi; use crate::http::route_request; use authz::Authorizer; +use http::route_admin_token_recovery_request; use hyper::server::conn::AddrIncoming; use hyper::server::conn::AddrStream; use hyper::server::conn::Http; @@ -54,6 +56,7 @@ use trace_http::metrics::RequestMetrics; use trace_http::tower::TraceLayer; const TRACE_HTTP_SERVER_NAME: &str = "influxdb3_http"; +const ADMIN_TOKEN_RECOVERY_TRACE_HTTP_SERVER_NAME: &str = "influxdb3_token_recovery_http"; const TRACE_GRPC_SERVER_NAME: &str = "influxdb3_grpc"; #[derive(Debug, Error)] @@ -174,6 +177,122 @@ impl<'a> Server<'a> { } } +/// Creates HTTP trace layer +fn create_http_trace_layer(common_state: &CommonServerState, server_name: &str) -> TraceLayer { + let http_metrics = + RequestMetrics::new(Arc::clone(&common_state.metrics), MetricFamily::HttpServer); + TraceLayer::new( + common_state.trace_header_parser.clone(), + Arc::new(http_metrics), + common_state.trace_collector().clone(), + server_name, + trace_http::tower::ServiceProtocol::Http, + ) +} + +/// Creates gRPC trace layer +fn create_grpc_trace_layer(common_state: &CommonServerState) -> TraceLayer { + let grpc_metrics = + RequestMetrics::new(Arc::clone(&common_state.metrics), MetricFamily::GrpcServer); + TraceLayer::new( + common_state.trace_header_parser.clone(), + Arc::new(grpc_metrics), + common_state.trace_collector().clone(), + TRACE_GRPC_SERVER_NAME, + trace_http::tower::ServiceProtocol::Grpc, + ) +} + +pub async fn serve_admin_token_recovery_endpoint( + server: Server<'_>, + shutdown: CancellationToken, + tcp_listener_file_path: Option, +) -> Result<()> { + let http_trace_layer = create_http_trace_layer( + &server.common_state, + ADMIN_TOKEN_RECOVERY_TRACE_HTTP_SERVER_NAME, + ); + + // Create a dedicated shutdown token for the recovery endpoint + // This allows us to shut down just the recovery endpoint after token regeneration + let recovery_shutdown = CancellationToken::new(); + + // Create the recovery API wrapper with the shutdown token + let recovery_api = Arc::new(RecoveryHttpApi::new( + Arc::clone(&server.http), + recovery_shutdown.clone(), + )); + + if let (Some(key_file), Some(cert_file)) = (&server.key_file, &server.cert_file) { + let listener = server.listener; + let tls_min = server.tls_minimum_version; + + let (addr, certs, key) = setup_tls(listener, key_file, cert_file)?; + info!( + address = %addr.local_addr(), + "starting admin token recovery endpoint with TLS on", + ); + + let rest_service = hyper::service::make_service_fn(move |_| { + let recovery_api = Arc::clone(&recovery_api); + let service = service_fn(move |req: hyper::Request| { + route_admin_token_recovery_request(Arc::clone(&recovery_api), req) + }); + let service = http_trace_layer.layer(service); + futures::future::ready(Ok::<_, Infallible>(service)) + }); + + write_address_to_file(tcp_listener_file_path, &addr).await?; + + let acceptor = hyper_rustls::TlsAcceptor::builder() + .with_tls_config( + ServerConfig::builder_with_protocol_versions(tls_min) + .with_no_client_auth() + .with_single_cert(certs, key) + .unwrap(), + ) + .with_all_versions_alpn() + .with_incoming(addr); + + // Use both the main shutdown and recovery shutdown tokens + tokio::select! { + res = hyper::server::Server::builder(acceptor).serve(rest_service) => res?, + _ = shutdown.cancelled() => {}, + _ = recovery_shutdown.cancelled() => { + info!("Admin token recovery endpoint shutting down after token regeneration"); + }, + } + } else { + let rest_service = hyper::service::make_service_fn(|_| { + let recovery_api = Arc::clone(&recovery_api); + let service = service_fn(move |req: hyper::Request| { + route_admin_token_recovery_request(Arc::clone(&recovery_api), req) + }); + let service = http_trace_layer.layer(service); + futures::future::ready(Ok::<_, Infallible>(service)) + }); + + let addr = AddrIncoming::from_listener(server.listener)?; + info!( + address = %addr.local_addr(), + "starting admin token recovery endpoint on", + ); + + // Use both the main shutdown and recovery shutdown tokens + tokio::select! { + res = hyper::server::Builder::new(addr, Http::new()) + .tcp_nodelay(true) + .serve(rest_service) => res?, + _ = shutdown.cancelled() => {}, + _ = recovery_shutdown.cancelled() => { + info!("Admin token recovery endpoint shutting down after token regeneration"); + }, + } + } + + Ok(()) +} + pub async fn serve( server: Server<'_>, shutdown: CancellationToken, @@ -182,38 +301,24 @@ pub async fn serve( paths_without_authz: &'static Vec<&'static str>, tcp_listener_file_path: Option, ) -> Result<()> { - let grpc_metrics = RequestMetrics::new( - Arc::clone(&server.common_state.metrics), - MetricFamily::GrpcServer, - ); - let grpc_trace_layer = TraceLayer::new( - server.common_state.trace_header_parser.clone(), - Arc::new(grpc_metrics), - server.common_state.trace_collector().clone(), - TRACE_GRPC_SERVER_NAME, - trace_http::tower::ServiceProtocol::Grpc, - ); + let grpc_trace_layer = create_grpc_trace_layer(&server.common_state); let grpc_service = grpc_trace_layer.layer(make_flight_server( Arc::clone(&server.http.query_executor), Some(server.authorizer()), )); - let http_metrics = RequestMetrics::new( - Arc::clone(&server.common_state.metrics), - MetricFamily::HttpServer, - ); - let http_trace_layer = TraceLayer::new( - server.common_state.trace_header_parser.clone(), - Arc::new(http_metrics), - server.common_state.trace_collector().clone(), - TRACE_HTTP_SERVER_NAME, - trace_http::tower::ServiceProtocol::Http, - ); + let http_trace_layer = create_http_trace_layer(&server.common_state, TRACE_HTTP_SERVER_NAME); - if let (Some(key_file), Some(cert_file)) = (&server.key_file, &server.cert_file) { + let key_file = server.key_file.clone(); + let cert_file = server.cert_file.clone(); + + if let (Some(key_file), Some(cert_file)) = (key_file.as_ref(), cert_file.as_ref()) { + let listener = server.listener; + let tls_min = server.tls_minimum_version; + let http = Arc::clone(&server.http); let rest_service = hyper::service::make_service_fn(|conn: &TlsStream| { let remote_addr = conn.io().map(|conn| conn.remote_addr()); - let http_server = Arc::clone(&server.http); + let http_server = Arc::clone(&http); let service = service_fn(move |mut req: hyper::Request| { req.extensions_mut().insert(remote_addr); route_request( @@ -228,22 +333,7 @@ pub async fn serve( }); let hybrid_make_service = hybrid(rest_service, grpc_service); - let mut addr = AddrIncoming::from_listener(server.listener)?; - addr.set_nodelay(true); - let certs = { - let cert_file = File::open(cert_file).unwrap(); - let mut buf_reader = BufReader::new(cert_file); - rustls_pemfile::certs(&mut buf_reader) - .collect::, _>>() - .unwrap() - }; - let key = { - let key_file = File::open(key_file).unwrap(); - let mut buf_reader = BufReader::new(key_file); - rustls_pemfile::private_key(&mut buf_reader) - .unwrap() - .unwrap() - }; + let (addr, certs, key) = setup_tls(listener, key_file, cert_file)?; let timer_end = Instant::now(); let startup_time = timer_end.duration_since(startup_timer); @@ -253,21 +343,18 @@ pub async fn serve( startup_time.as_millis() ); - if let Some(path) = tcp_listener_file_path { - let mut f = tokio::fs::File::create_new(path).await?; - let _ = f.write(addr.local_addr().to_string().as_bytes()).await?; - f.flush().await?; - } + write_address_to_file(tcp_listener_file_path, &addr).await?; let acceptor = hyper_rustls::TlsAcceptor::builder() .with_tls_config( - ServerConfig::builder_with_protocol_versions(server.tls_minimum_version) + ServerConfig::builder_with_protocol_versions(tls_min) .with_no_client_auth() .with_single_cert(certs, key) .unwrap(), ) .with_all_versions_alpn() .with_incoming(addr); + hyper::server::Server::builder(acceptor) .serve(hybrid_make_service) .with_graceful_shutdown(shutdown.cancelled()) @@ -316,6 +403,52 @@ pub async fn serve( Ok(()) } +// This function is only called when running tests to get hold of the server port details as the +// tests start on arbitrary port by passing in 0 as port. This is also called when setting up TLS +// as the tests seem to use TLS by default. +async fn write_address_to_file( + tcp_listener_file_path: Option, + addr: &AddrIncoming, +) -> Result<(), Error> { + if let Some(path) = tcp_listener_file_path { + let mut f = tokio::fs::File::create_new(path).await?; + let _ = f.write(addr.local_addr().to_string().as_bytes()).await?; + f.flush().await?; + }; + Ok(()) +} + +fn setup_tls( + tcp_listener: TcpListener, + key_file: &PathBuf, + cert_file: &PathBuf, +) -> Result< + ( + AddrIncoming, + Vec>, + rustls::pki_types::PrivateKeyDer<'static>, + ), + Error, +> { + let mut addr = AddrIncoming::from_listener(tcp_listener)?; + addr.set_nodelay(true); + let certs = { + let cert_file = File::open(cert_file).unwrap(); + let mut buf_reader = BufReader::new(cert_file); + rustls_pemfile::certs(&mut buf_reader) + .collect::, _>>() + .unwrap() + }; + let key = { + let key_file = File::open(key_file).unwrap(); + let mut buf_reader = BufReader::new(key_file); + rustls_pemfile::private_key(&mut buf_reader) + .unwrap() + .unwrap() + }; + Ok((addr, certs, key)) +} + #[cfg(test)] mod tests { use crate::query_executor::{CreateQueryExecutorArgs, QueryExecutorImpl};