From 55fcee7fc49173a76ad5376a16a806c979939e8b Mon Sep 17 00:00:00 2001 From: Praveen Kumar Date: Tue, 8 Jul 2025 11:26:29 +0100 Subject: [PATCH 1/4] feat: additional server setup for admin token recovery - new server to only serve admin token regeneration without an admin token has been added - minor refactors to allow reuse of some of the utilities like trace layer for metrics moved to their own functions to allow them to be instantiated for both servers - tests added to check if both the new server works right for regenerating token and also ensure none of the other functionalities are available on the admin token recovery server closes: https://github.com/influxdata/influxdb/issues/26330 --- influxdb3/src/commands/create.rs | 16 +- influxdb3/src/commands/create/token.rs | 55 ++++++- influxdb3/src/commands/serve.rs | 61 +++++++- influxdb3/tests/cli/admin_token.rs | 198 ++++++++++++++++++++++++ influxdb3/tests/cli/api.rs | 79 ++++++---- influxdb3/tests/server/mod.rs | 78 +++++++--- influxdb3_server/src/http.rs | 37 +++++ influxdb3_server/src/lib.rs | 202 +++++++++++++++++++------ 8 files changed, 617 insertions(+), 109 deletions(-) diff --git a/influxdb3/src/commands/create.rs b/influxdb3/src/commands/create.rs index 7549e8c392b..0f7dcf951cb 100644 --- a/influxdb3/src/commands/create.rs +++ b/influxdb3/src/commands/create.rs @@ -78,11 +78,17 @@ impl Config { }) => (host_url, auth_token, ca_cert), SubCommand::Token(create_token_config) => { let host_settings = create_token_config.get_connection_settings()?; - ( - &host_settings.host_url, - &host_settings.auth_token, - &host_settings.ca_cert, - ) + let effective_host_url = + create_token_config.get_effective_host_url(&host_settings.host_url); + // We need to return references, so we'll handle this differently + return Ok({ + let mut client = + Client::new(effective_host_url, host_settings.ca_cert.clone())?; + if let Some(token) = &host_settings.auth_token { + client = client.with_auth_token(token.expose_secret()); + } + client + }); } }; diff --git a/influxdb3/src/commands/create/token.rs b/influxdb3/src/commands/create/token.rs index 404e0de1fe7..00452b4ff2f 100644 --- a/influxdb3/src/commands/create/token.rs +++ b/influxdb3/src/commands/create/token.rs @@ -75,12 +75,16 @@ pub enum TokenOutputFormat { #[derive(Parser, Clone, Debug)] pub struct InfluxDb3ServerConfig { - /// The host URL of the running InfluxDB 3 Core server + /// The host URL of the running InfluxDB 3 Core server. + /// + /// Note: When using --regenerate, the effective default changes to http://127.0.0.1:8182 + /// (admin token recovery endpoint) unless a custom host is specified. #[clap( name = "host", long = "host", default_value = "http://127.0.0.1:8181", - env = "INFLUXDB3_HOST_URL" + env = "INFLUXDB3_HOST_URL", + help = "The host URL of the running InfluxDB 3 Core server (default: http://127.0.0.1:8181, or :8182 with --regenerate)" )] pub host_url: Url, @@ -96,8 +100,15 @@ pub struct InfluxDb3ServerConfig { #[derive(Parser, Debug)] pub struct CreateAdminTokenConfig { - /// Operator token will be regenerated when this is set - #[clap(name = "regenerate", long = "regenerate")] + /// Operator token will be regenerated when this is set. + /// + /// When used without --host, connects to the admin token recovery endpoint (port 8182) + /// instead of the default server endpoint (port 8181). + #[clap( + name = "regenerate", + long = "regenerate", + help = "Regenerate the operator token (uses port 8182 by default instead of 8181)" + )] pub regenerate: bool, // for named admin and permission tokens this is mandatory but not for admin tokens @@ -156,6 +167,31 @@ impl CreateTokenConfig { } } + /// Get the effective host URL for the operation. + /// + /// When `--regenerate` is used and no custom host is provided, this will return + /// the admin token recovery endpoint (port 8182) instead of the default (port 8181). + /// + /// # Examples + /// - `influxdb3 create token --admin` → uses http://127.0.0.1:8181 + /// - `influxdb3 create token --admin --regenerate` → uses http://127.0.0.1:8182 + /// - `influxdb3 create token --admin --regenerate --host http://custom:9999` → uses http://custom:9999 + pub fn get_effective_host_url(&self, default_url: &Url) -> Url { + match &self.admin_config { + Some(admin_config) if admin_config.regenerate => { + // Check if the host URL is the default value (normalize by removing trailing slash) + if default_url.as_str().trim_end_matches('/') == "http://127.0.0.1:8181" { + // Use the admin token recovery endpoint + Url::parse("http://127.0.0.1:8182").expect("hardcoded URL should be valid") + } else { + // User provided a custom URL, use it as-is + default_url.clone() + } + } + _ => default_url.clone(), + } + } + pub fn get_output_format(&self) -> Option<&TokenOutputFormat> { match &self.admin_config { Some(admin_config) => admin_config.format.as_ref(), @@ -208,8 +244,15 @@ impl Args for CreateTokenConfig { impl CommandFactory for CreateTokenConfig { fn command() -> clap::Command { - let admin_sub_cmd = - ClapCommand::new("--admin").override_usage("influxdb3 create token --admin [OPTIONS]"); + let admin_sub_cmd = ClapCommand::new("--admin") + .override_usage("influxdb3 create token --admin [OPTIONS]") + .about("Create or regenerate an admin token") + .long_about( + "Create or regenerate an admin token.\n\n\ + When using --regenerate without specifying --host, the command will \ + connect to the admin token recovery endpoint (http://127.0.0.1:8182) \ + instead of the default server endpoint (http://127.0.0.1:8181).", + ); let all_args = CreateAdminTokenConfig::as_args(); let admin_sub_cmd = admin_sub_cmd.args(all_args); diff --git a/influxdb3/src/commands/serve.rs b/influxdb3/src/commands/serve.rs index 6a4383565a8..1fb48402806 100644 --- a/influxdb3/src/commands/serve.rs +++ b/influxdb3/src/commands/serve.rs @@ -28,7 +28,7 @@ use influxdb3_server::{ CommonServerState, CreateServerArgs, Server, http::HttpApi, query_executor::{CreateQueryExecutorArgs, QueryExecutorImpl}, - serve, + serve, serve_admin_token_regen_endpoint, }; use influxdb3_shutdown::{ShutdownManager, wait_for_signal}; use influxdb3_sys_events::SysEventStore; @@ -82,6 +82,9 @@ pub const DEFAULT_DATA_DIRECTORY_NAME: &str = ".influxdb3"; /// The default bind address for the HTTP API. pub const DEFAULT_HTTP_BIND_ADDR: &str = "0.0.0.0:8181"; +/// The default bind address for admin token regeneration HTTP API. +pub const DEFAULT_ADMIN_TOKEN_REGENERATION_BIND_ADDR: &str = "127.0.0.1:8182"; + pub const DEFAULT_TELEMETRY_ENDPOINT: &str = "https://telemetry.v3.influxdata.com"; #[derive(Debug, Error)] @@ -125,6 +128,9 @@ pub enum Error { #[error("lost HTTP/gRPC service")] LostHttpGrpc, + #[error("lost admin token regen service")] + LostAdminTokenRegen, + #[error("tls requires both a cert and a key file to be passed in to work")] NoCertOrKeyFile, } @@ -179,6 +185,15 @@ pub struct Config { )] pub http_bind_address: SocketAddr, + /// The address on which admin token regeration will be allowed + #[clap( + long = "admin-token-regen-bind", + env = "INFLUXDB3_ADMIN_TOKEN_REGEN_BIND_ADDR", + default_value = DEFAULT_ADMIN_TOKEN_REGENERATION_BIND_ADDR, + action, + )] + pub admin_token_regen_bind_address: SocketAddr, + /// Size of memory pool used during query exec, in megabytes. /// /// Can be given as absolute value or in percentage of the total available memory (e.g. `10%`). @@ -450,6 +465,16 @@ pub struct Config { )] pub tcp_listener_file_path: Option, + /// Provide a file path to write the address that the admin recovery endpoint mounted server is listening on to + /// + /// This is mainly intended for testing purposes and is not considered stable. + #[clap( + long = "admin-token-regen-tcp-listener-file-path", + env = "INFLUXDB3_ADMIN_TOKEN_REGEN_TCP_LISTENER_FILE_PATH", + hide = true + )] + pub admin_token_regen_tcp_listener_file_path: Option, + #[clap( long = "wal-replay-concurrency-limit", env = "INFLUXDB3_WAL_REPLAY_CONCURRENCY_LIMIT" @@ -902,6 +927,10 @@ pub async fn command(config: Config) -> Result<()> { .await .map_err(Error::BindAddress)?; + let admin_token_regen_listener = TcpListener::bind(*config.admin_token_regen_bind_address) + .await + .map_err(Error::BindAddress)?; + let processing_engine = ProcessingEngineManagerImpl::new( setup_processing_engine_env_manager(&config.processing_engine_config), write_buffer.catalog(), @@ -945,6 +974,16 @@ pub async fn command(config: Config) -> Result<()> { Arc::clone(&authorizer), )); + let admin_token_regen_server = Server::new(CreateServerArgs { + common_state: common_state.clone(), + http: Arc::clone(&http), + authorizer: Arc::clone(&authorizer), + listener: admin_token_regen_listener, + cert_file: cert_file.clone(), + key_file: key_file.clone(), + tls_minimum_version: config.tls_minimum_version.into(), + }); + let server = Server::new(CreateServerArgs { common_state, http, @@ -987,6 +1026,7 @@ pub async fn command(config: Config) -> Result<()> { ?paths_without_authz, "setting up server with authz disabled for paths" ); + let frontend = serve( server, frontend_shutdown.clone(), @@ -998,11 +1038,19 @@ pub async fn command(config: Config) -> Result<()> { .fuse(); let backend = shutdown_manager.join().fuse(); + let regen_frontend = serve_admin_token_regen_endpoint( + admin_token_regen_server, + frontend_shutdown.clone(), + config.admin_token_regen_tcp_listener_file_path, + ) + .fuse(); + // pin_mut constructs a Pin<&mut T> from a T by preventing moving the T // from the current stack frame and constructing a Pin<&mut T> to it pin_mut!(signal); pin_mut!(frontend); pin_mut!(backend); + pin_mut!(regen_frontend); let mut res = Ok(()); @@ -1039,6 +1087,17 @@ pub async fn command(config: Config) -> Result<()> { error!("HTTP/gRPC error"); res = res.and(Err(Error::Server(error))); } + }, + regen_result = regen_frontend => match regen_result { + Ok(_) if frontend_shutdown.is_cancelled() => info!("Admin token regeneration service shutdown"), + Ok(_) => { + error!("early admin token regeneration service exit"); + res = res.and(Err(Error::LostAdminTokenRegen)); + } + Err(error) => { + error!("admin token regeneration service error"); + res = res.and(Err(Error::Server(error))); + } } } shutdown_manager.shutdown() diff --git a/influxdb3/tests/cli/admin_token.rs b/influxdb3/tests/cli/admin_token.rs index 42d04313504..dbdc8de060c 100644 --- a/influxdb3/tests/cli/admin_token.rs +++ b/influxdb3/tests/cli/admin_token.rs @@ -123,6 +123,63 @@ async fn test_regenerate_admin_token() { assert_contains!(&res, "Database \"sample_db\" created successfully"); } +#[test_log::test(tokio::test)] +async fn test_regenerate_admin_token_without_auth_using_token_recovery_service() { + let mut server = TestServer::configure() + .with_auth() + .with_no_admin_token() + .spawn() + .await; + let args = &["--tls-ca", "../testing-certs/rootCA.pem"]; + // create the token manually + let result = server + .run(vec!["create", "token", "--admin"], args) + .unwrap(); + + // already has admin token, so it cannot be created again + assert_contains!(&result, "New token created successfully!"); + + let admin_token = parse_token(result); + + // regenerating token is not allowed without admin token going through the main http server + let result = server + .run_with_confirmation( + vec!["create", "token", "--admin"], + &["--regenerate", "--tls-ca", "../testing-certs/rootCA.pem"], + ) + .unwrap(); + + assert_contains!(&result, "Failed to create token"); + + // regenerate token using the admin token recovery server + let result = server + .run_regenerate_with_confirmation( + vec!["create", "token", "--admin"], + &["--regenerate", "--tls-ca", "../testing-certs/rootCA.pem"], + ) + .unwrap(); + assert_contains!(&result, "New token created successfully!"); + + let old_token = admin_token.clone(); + let new_token = parse_token(result); + assert!(old_token != new_token); + + // old token cannot access + let res = server + .set_token(Some(admin_token)) + .create_database("sample_db") + .run() + .err() + .unwrap() + .to_string(); + assert_contains!(&res, "401 Unauthorized"); + + // new token should allow + server.set_token(Some(new_token)); + let res = server.create_database("sample_db").run().unwrap(); + assert_contains!(&res, "Database \"sample_db\" created successfully"); +} + #[test_log::test(tokio::test)] async fn test_delete_token() { let server = TestServer::configure() @@ -528,3 +585,144 @@ async fn test_check_named_admin_token_expiry_works() { .to_string(); assert_contains!(&res, "[401 Unauthorized]"); } + +#[test_log::test(tokio::test)] +async fn test_recovery_service_only_accepts_regenerate_endpoint() { + let server = TestServer::configure() + .with_auth() + .with_no_admin_token() + .spawn() + .await; + + // First create an admin token + let args = &["--tls-ca", "../testing-certs/rootCA.pem"]; + let result = server + .run(vec!["create", "token", "--admin"], args) + .unwrap(); + assert_contains!(&result, "New token created successfully!"); + + // Try to use recovery service for other operations - should fail + // Test creating a database through recovery port + let recovery_addr = server.admin_token_recovery_client_addr(); + let result = server + .run( + vec!["create", "database", "--host", &recovery_addr, "test_db"], + args, + ) + .unwrap_err() + .to_string(); + // Should fail because recovery port doesn't support database operations + assert!(result.contains("error") || result.contains("failed")); + + // Test listing tokens through recovery port + let result = server + .run(vec!["show", "tokens", "--host", &recovery_addr], args) + .unwrap_err() + .to_string(); + assert!(result.contains("error") || result.contains("failed")); +} + +#[test_log::test(tokio::test)] +async fn test_recovery_service_with_auth_disabled() { + // Start server without auth + let server = TestServer::configure().spawn().await; + + // Try to use recovery service when auth is disabled - should fail + let result = server + .run_regenerate_with_confirmation( + vec!["create", "token", "--admin"], + &["--regenerate", "--tls-ca", "../testing-certs/rootCA.pem"], + ) + .unwrap(); + + // Should get an error - recovery service runs but there's no admin token to regenerate + assert_contains!(&result, "missing admin token, cannot update"); +} + +#[test_log::test(tokio::test)] +async fn test_recovery_service_does_not_affect_named_admin_tokens() { + let mut server = TestServer::configure() + .with_auth() + .with_no_admin_token() + .spawn() + .await; + + let args = &["--tls-ca", "../testing-certs/rootCA.pem"]; + + // Create operator token + let result = server + .run(vec!["create", "token", "--admin"], args) + .unwrap(); + assert_contains!(&result, "New token created successfully!"); + let operator_token = parse_token(result); + + // Create a named admin token + let result = server + .run( + vec![ + "create", + "token", + "--admin", + "--name", + "test_admin", + "--token", + &operator_token, + ], + args, + ) + .unwrap(); + assert_contains!(&result, "New token created successfully!"); + let named_admin_token = parse_token(result); + + // Regenerate operator token via recovery service + let result = server + .run_regenerate_with_confirmation( + vec!["create", "token", "--admin"], + &["--regenerate", "--tls-ca", "../testing-certs/rootCA.pem"], + ) + .unwrap(); + assert_contains!(&result, "New token created successfully!"); + let new_operator_token = parse_token(result); + + // Verify old operator token is invalid + server.set_token(Some(operator_token)); + let res = server + .create_database("test_db1") + .run() + .err() + .unwrap() + .to_string(); + assert_contains!(&res, "401 Unauthorized"); + + // Verify named admin token still works + server.set_token(Some(named_admin_token.clone())); + let res = server.create_database("test_db2").run().unwrap(); + assert_contains!(&res, "Database \"test_db2\" created successfully"); + + // Verify new operator token works + server.set_token(Some(new_operator_token)); + let res = server.create_database("test_db3").run().unwrap(); + assert_contains!(&res, "Database \"test_db3\" created successfully"); +} + +#[test_log::test(tokio::test)] +async fn test_recovery_service_cannot_create_new_admin_token() { + let server = TestServer::configure() + .with_auth() + .with_no_admin_token() + .spawn() + .await; + + // Try to create admin token through recovery service (without --regenerate flag) + let recovery_addr = server.admin_token_recovery_client_addr(); + let result = server + .run( + vec!["create", "token", "--admin", "--host", &recovery_addr], + &["--tls-ca", "../testing-certs/rootCA.pem"], + ) + .unwrap_err() + .to_string(); + + // Should fail - recovery service only supports regeneration + assert!(result.contains("error") || result.contains("failed")); +} diff --git a/influxdb3/tests/cli/api.rs b/influxdb3/tests/cli/api.rs index 237821e1d65..1114eff6ccd 100644 --- a/influxdb3/tests/cli/api.rs +++ b/influxdb3/tests/cli/api.rs @@ -36,43 +36,66 @@ impl TestServer { command_args.push("--token"); command_args.push(token); } - let mut child_process = Command::cargo_bin("influxdb3")? - .args(&command_args) - .args(args) - .stdin(Stdio::piped()) - .stdout(Stdio::piped()) - .stderr(Stdio::piped()) - .spawn()?; - - if let Some(input) = input { - let input = input.to_string(); - let mut stdin = child_process.stdin.take().expect("failed to open stdin"); - thread::spawn(move || { - stdin - .write_all(input.as_bytes()) - .expect("cannot write confirmation msg to stdin"); - }); - } - let output = child_process.wait_with_output()?; - - if !output.status.success() { - println!( - "failed to run influxdb3 {} {}", - command_args.join(" "), - args.join(" ") - ); - bail!("{}", String::from_utf8_lossy(&output.stderr)); - } + run_cmd_with_result(args, input, command_args) + } - Ok(String::from_utf8(output.stdout)?.trim().into()) + pub fn run_regenerate_with_confirmation( + &self, + commands: Vec<&str>, + args: &[&str], + ) -> Result { + let client_addr = self.admin_token_recovery_client_addr(); + let mut command_args = commands.clone(); + command_args.push("--host"); + command_args.push(client_addr.as_str()); + run_cmd_with_result(args, Some("yes"), command_args) } + pub fn run(&self, commands: Vec<&str>, args: &[&str]) -> Result { self.run_with_options(commands, args, None) } + pub fn run_with_confirmation(&self, commands: Vec<&str>, args: &[&str]) -> Result { self.run_with_options(commands, args, Some("yes")) } } + +fn run_cmd_with_result( + args: &[&str], + input: Option<&str>, + command_args: Vec<&str>, +) -> std::result::Result { + let mut child_process = Command::cargo_bin("influxdb3")? + .args(&command_args) + .args(args) + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn()?; + + if let Some(input) = input { + let input = input.to_string(); + let mut stdin = child_process.stdin.take().expect("failed to open stdin"); + thread::spawn(move || { + stdin + .write_all(input.as_bytes()) + .expect("cannot write confirmation msg to stdin"); + }); + } + let output = child_process.wait_with_output()?; + + if !output.status.success() { + println!( + "failed to run influxdb3 {} {}", + command_args.join(" "), + args.join(" ") + ); + bail!("{}", String::from_utf8_lossy(&output.stderr)); + } + + Ok(String::from_utf8(output.stdout)?.trim().into()) +} + impl CreateDatabaseQuery<'_> { pub fn run(self) -> Result { self.server.run( diff --git a/influxdb3/tests/server/mod.rs b/influxdb3/tests/server/mod.rs index 43b3be48b03..60a5d85d7d3 100644 --- a/influxdb3/tests/server/mod.rs +++ b/influxdb3/tests/server/mod.rs @@ -266,6 +266,7 @@ impl ConfigProvider for TestConfig { pub struct TestServer { auth_token: Option, bind_addr: String, + admin_token_recovery_bind_addr: String, server_process: Child, http_client: reqwest::Client, stdout: Option>>, @@ -318,11 +319,17 @@ impl TestServer { let tmp_dir = TempDir::new().unwrap(); let tmp_dir_path = tmp_dir.keep(); let tcp_addr_file = tmp_dir_path.join("tcp-listener"); + + let admin_token_recover_tmp_dir = TempDir::new().unwrap(); + let admin_token_recover_tmp_dir_path = admin_token_recover_tmp_dir.keep(); + let tcp_addr_file_2 = admin_token_recover_tmp_dir_path.join("tcp-listener"); + let mut command = Command::cargo_bin("influxdb3").expect("create the influxdb3 command"); let command = command .arg("serve") .arg("--disable-telemetry-upload") .args(["--http-bind", "0.0.0.0:0"]) + .args(["--admin-token-regen-bind", "0.0.0.0:0"]) .args(["--wal-flush-interval", "10ms"]) .args(["--wal-snapshot-size", "1"]) .args([ @@ -331,6 +338,12 @@ impl TestServer { .to_str() .expect("valid tcp listener file path"), ]) + .args([ + "--admin-token-regen-tcp-listener-file-path", + tcp_addr_file_2 + .to_str() + .expect("valid tcp listener file path"), + ]) .args([ "--tls-cert", if config.bad_tls() { @@ -418,28 +431,9 @@ impl TestServer { } } - let bind_addr = loop { - match tokio::fs::File::open(&tcp_addr_file).await { - Ok(mut file) => { - let mut buf = String::new(); - file.read_to_string(&mut buf) - .await - .expect("read from tcp listener file"); - if buf.is_empty() { - tokio::time::sleep(Duration::from_millis(10)).await; - continue; - } else { - break buf; - } - } - Err(error) if matches!(error.kind(), std::io::ErrorKind::NotFound) => { - tokio::time::sleep(Duration::from_millis(10)).await; - } - Err(error) => { - panic!("unexpected error while checking for tcp listener file: {error:?}") - } - } - }; + let bind_addr = find_bind_addr(tcp_addr_file).await; + + let admin_token_recovery_bind_addr = find_bind_addr(tcp_addr_file_2).await; let http_client = reqwest::ClientBuilder::new() .min_tls_version(Version::TLS_1_3) @@ -455,6 +449,7 @@ impl TestServer { let server = Self { auth_token: config.auth_token().map(|s| s.to_owned()), bind_addr, + admin_token_recovery_bind_addr, server_process, http_client, stdout: stdout_handle, @@ -490,14 +485,26 @@ impl TestServer { ) } + /// Get the URL of admin token recovery service for use with an HTTP client + pub fn admin_token_recovery_client_addr(&self) -> String { + format!( + "https://localhost:{}", + self.admin_token_recovery_bind_addr + .split(':') + .nth(1) + .unwrap() + ) + } + /// Get the token for the server pub fn token(&self) -> Option<&String> { self.auth_token.as_ref() } /// Set the token for the server - pub fn set_token(&mut self, token: Option) { + pub fn set_token(&mut self, token: Option) -> &mut Self { self.auth_token = token; + self } /// Get a [`FlightSqlClient`] for making requests to the running service over gRPC @@ -631,6 +638,31 @@ impl TestServer { } } +async fn find_bind_addr(tcp_addr_file_2: std::path::PathBuf) -> String { + loop { + match tokio::fs::File::open(&tcp_addr_file_2).await { + Ok(mut file) => { + let mut buf = String::new(); + file.read_to_string(&mut buf) + .await + .expect("read from tcp listener file"); + if buf.is_empty() { + tokio::time::sleep(Duration::from_millis(10)).await; + continue; + } else { + break buf; + } + } + Err(error) if matches!(error.kind(), std::io::ErrorKind::NotFound) => { + tokio::time::sleep(Duration::from_millis(10)).await; + } + Err(error) => { + panic!("unexpected error while checking for tcp listener file: {error:?}") + } + } + } +} + impl Drop for TestServer { fn drop(&mut self) { self.kill(); diff --git a/influxdb3_server/src/http.rs b/influxdb3_server/src/http.rs index 49b08388c64..3e87f9724eb 100644 --- a/influxdb3_server/src/http.rs +++ b/influxdb3_server/src/http.rs @@ -1857,6 +1857,43 @@ async fn record_batch_stream_to_body( } } +pub(crate) async fn route_admin_token_recovery_request( + http_server: Arc, + req: Request, +) -> Result { + let method = req.method().clone(); + let uri = req.uri().clone(); + trace!(request = ?req,"Processing request"); + let content_length = req.headers().get("content-length").cloned(); + + let response = match (method.clone(), uri.path()) { + (Method::POST, all_paths::API_V3_CONFIGURE_ADMIN_TOKEN_REGENERATE) => { + http_server.regenerate_admin_token(req).await + } + _ => { + let body = bytes_to_response_body("not found"); + Ok(ResponseBuilder::new() + .status(StatusCode::NOT_FOUND) + .body(body) + .unwrap()) + } + }; + + match response { + Ok(mut response) => { + response + .headers_mut() + .insert(ACCESS_CONTROL_ALLOW_ORIGIN, HeaderValue::from_static("*")); + debug!(?response, "Successfully processed request"); + Ok(response) + } + Err(error) => { + error!(%error, %method, path = uri.path(), ?content_length, "Error while handling request"); + Ok(error.into_response()) + } + } +} + pub(crate) async fn route_request( http_server: Arc, mut req: Request, diff --git a/influxdb3_server/src/lib.rs b/influxdb3_server/src/lib.rs index c41a4e88921..6b57f9cb484 100644 --- a/influxdb3_server/src/lib.rs +++ b/influxdb3_server/src/lib.rs @@ -23,6 +23,7 @@ use crate::grpc::make_flight_server; use crate::http::HttpApi; use crate::http::route_request; use authz::Authorizer; +use http::route_admin_token_recovery_request; use hyper::server::conn::AddrIncoming; use hyper::server::conn::AddrStream; use hyper::server::conn::Http; @@ -174,6 +175,101 @@ impl<'a> Server<'a> { } } +/// Creates HTTP trace layer +fn create_http_trace_layer(common_state: &CommonServerState) -> TraceLayer { + let http_metrics = + RequestMetrics::new(Arc::clone(&common_state.metrics), MetricFamily::HttpServer); + TraceLayer::new( + common_state.trace_header_parser.clone(), + Arc::new(http_metrics), + common_state.trace_collector().clone(), + TRACE_HTTP_SERVER_NAME, + trace_http::tower::ServiceProtocol::Http, + ) +} + +/// Creates gRPC trace layer +fn create_grpc_trace_layer(common_state: &CommonServerState) -> TraceLayer { + let grpc_metrics = + RequestMetrics::new(Arc::clone(&common_state.metrics), MetricFamily::GrpcServer); + TraceLayer::new( + common_state.trace_header_parser.clone(), + Arc::new(grpc_metrics), + common_state.trace_collector().clone(), + TRACE_GRPC_SERVER_NAME, + trace_http::tower::ServiceProtocol::Grpc, + ) +} + +pub async fn serve_admin_token_regen_endpoint( + server: Server<'_>, + shutdown: CancellationToken, + tcp_listener_file_path: Option, +) -> Result<()> { + let http_trace_layer = create_http_trace_layer(&server.common_state); + + if let (Some(key_file), Some(cert_file)) = (&server.key_file, &server.cert_file) { + let listener = server.listener; + let tls_min = server.tls_minimum_version; + let http = Arc::clone(&server.http); + + let (addr, certs, key) = setup_tls(listener, key_file, cert_file)?; + info!( + address = %addr.local_addr(), + "starting admin token recovery endpoint with TLS on", + ); + + let http_server = Arc::clone(&http); + let rest_service = hyper::service::make_service_fn(move |_| { + let http_server = Arc::clone(&http_server); + let service = service_fn(move |req: hyper::Request| { + route_admin_token_recovery_request(Arc::clone(&http_server), req) + }); + let service = http_trace_layer.layer(service); + futures::future::ready(Ok::<_, Infallible>(service)) + }); + + write_address_to_file(tcp_listener_file_path, &addr).await?; + + let acceptor = hyper_rustls::TlsAcceptor::builder() + .with_tls_config( + ServerConfig::builder_with_protocol_versions(tls_min) + .with_no_client_auth() + .with_single_cert(certs, key) + .unwrap(), + ) + .with_all_versions_alpn() + .with_incoming(addr); + + hyper::server::Server::builder(acceptor) + .serve(rest_service) + .with_graceful_shutdown(shutdown.cancelled()) + .await?; + } else { + let rest_service = hyper::service::make_service_fn(|_| { + let http_server = Arc::clone(&server.http); + let service = service_fn(move |req: hyper::Request| { + route_admin_token_recovery_request(Arc::clone(&http_server), req) + }); + let service = http_trace_layer.layer(service); + futures::future::ready(Ok::<_, Infallible>(service)) + }); + + let addr = AddrIncoming::from_listener(server.listener)?; + info!( + address = %addr.local_addr(), + "starting admin token recovery endpoint on", + ); + hyper::server::Builder::new(addr, Http::new()) + .tcp_nodelay(true) + .serve(rest_service) + .with_graceful_shutdown(shutdown.cancelled()) + .await?; + } + + Ok(()) +} + pub async fn serve( server: Server<'_>, shutdown: CancellationToken, @@ -182,38 +278,24 @@ pub async fn serve( paths_without_authz: &'static Vec<&'static str>, tcp_listener_file_path: Option, ) -> Result<()> { - let grpc_metrics = RequestMetrics::new( - Arc::clone(&server.common_state.metrics), - MetricFamily::GrpcServer, - ); - let grpc_trace_layer = TraceLayer::new( - server.common_state.trace_header_parser.clone(), - Arc::new(grpc_metrics), - server.common_state.trace_collector().clone(), - TRACE_GRPC_SERVER_NAME, - trace_http::tower::ServiceProtocol::Grpc, - ); + let grpc_trace_layer = create_grpc_trace_layer(&server.common_state); let grpc_service = grpc_trace_layer.layer(make_flight_server( Arc::clone(&server.http.query_executor), Some(server.authorizer()), )); - let http_metrics = RequestMetrics::new( - Arc::clone(&server.common_state.metrics), - MetricFamily::HttpServer, - ); - let http_trace_layer = TraceLayer::new( - server.common_state.trace_header_parser.clone(), - Arc::new(http_metrics), - server.common_state.trace_collector().clone(), - TRACE_HTTP_SERVER_NAME, - trace_http::tower::ServiceProtocol::Http, - ); + let http_trace_layer = create_http_trace_layer(&server.common_state); - if let (Some(key_file), Some(cert_file)) = (&server.key_file, &server.cert_file) { + let key_file = server.key_file.clone(); + let cert_file = server.cert_file.clone(); + + if let (Some(key_file), Some(cert_file)) = (key_file.as_ref(), cert_file.as_ref()) { + let listener = server.listener; + let tls_min = server.tls_minimum_version; + let http = Arc::clone(&server.http); let rest_service = hyper::service::make_service_fn(|conn: &TlsStream| { let remote_addr = conn.io().map(|conn| conn.remote_addr()); - let http_server = Arc::clone(&server.http); + let http_server = Arc::clone(&http); let service = service_fn(move |mut req: hyper::Request| { req.extensions_mut().insert(remote_addr); route_request( @@ -228,22 +310,7 @@ pub async fn serve( }); let hybrid_make_service = hybrid(rest_service, grpc_service); - let mut addr = AddrIncoming::from_listener(server.listener)?; - addr.set_nodelay(true); - let certs = { - let cert_file = File::open(cert_file).unwrap(); - let mut buf_reader = BufReader::new(cert_file); - rustls_pemfile::certs(&mut buf_reader) - .collect::, _>>() - .unwrap() - }; - let key = { - let key_file = File::open(key_file).unwrap(); - let mut buf_reader = BufReader::new(key_file); - rustls_pemfile::private_key(&mut buf_reader) - .unwrap() - .unwrap() - }; + let (addr, certs, key) = setup_tls(listener, key_file, cert_file)?; let timer_end = Instant::now(); let startup_time = timer_end.duration_since(startup_timer); @@ -253,21 +320,18 @@ pub async fn serve( startup_time.as_millis() ); - if let Some(path) = tcp_listener_file_path { - let mut f = tokio::fs::File::create_new(path).await?; - let _ = f.write(addr.local_addr().to_string().as_bytes()).await?; - f.flush().await?; - } + write_address_to_file(tcp_listener_file_path, &addr).await?; let acceptor = hyper_rustls::TlsAcceptor::builder() .with_tls_config( - ServerConfig::builder_with_protocol_versions(server.tls_minimum_version) + ServerConfig::builder_with_protocol_versions(tls_min) .with_no_client_auth() .with_single_cert(certs, key) .unwrap(), ) .with_all_versions_alpn() .with_incoming(addr); + hyper::server::Server::builder(acceptor) .serve(hybrid_make_service) .with_graceful_shutdown(shutdown.cancelled()) @@ -316,6 +380,52 @@ pub async fn serve( Ok(()) } +// This function is only called when running tests to get hold of the server port details as the +// tests start on arbitrary port by passing in 0 as port. This is also called when setting up TLS +// as the tests seem to use TLS by default. +async fn write_address_to_file( + tcp_listener_file_path: Option, + addr: &AddrIncoming, +) -> Result<(), Error> { + if let Some(path) = tcp_listener_file_path { + let mut f = tokio::fs::File::create_new(path).await?; + let _ = f.write(addr.local_addr().to_string().as_bytes()).await?; + f.flush().await?; + }; + Ok(()) +} + +fn setup_tls( + tcp_listener: TcpListener, + key_file: &PathBuf, + cert_file: &PathBuf, +) -> Result< + ( + AddrIncoming, + Vec>, + rustls::pki_types::PrivateKeyDer<'static>, + ), + Error, +> { + let mut addr = AddrIncoming::from_listener(tcp_listener)?; + addr.set_nodelay(true); + let certs = { + let cert_file = File::open(cert_file).unwrap(); + let mut buf_reader = BufReader::new(cert_file); + rustls_pemfile::certs(&mut buf_reader) + .collect::, _>>() + .unwrap() + }; + let key = { + let key_file = File::open(key_file).unwrap(); + let mut buf_reader = BufReader::new(key_file); + rustls_pemfile::private_key(&mut buf_reader) + .unwrap() + .unwrap() + }; + Ok((addr, certs, key)) +} + #[cfg(test)] mod tests { use crate::query_executor::{CreateQueryExecutorArgs, QueryExecutorImpl}; From ea50bae572273afbe879249343bc8ffbd6ff2d46 Mon Sep 17 00:00:00 2001 From: Praveen Kumar Date: Thu, 10 Jul 2025 13:52:38 +0100 Subject: [PATCH 2/4] refactor: tidy ups + extra logging --- influxdb3/src/commands/create.rs | 3 +- influxdb3/src/commands/create/token.rs | 44 +++++++++++++-------- influxdb3/src/commands/serve.rs | 55 +++++++++++++------------- influxdb3/tests/server/mod.rs | 4 +- influxdb3_server/src/http.rs | 1 + influxdb3_server/src/lib.rs | 14 ++++--- 6 files changed, 69 insertions(+), 52 deletions(-) diff --git a/influxdb3/src/commands/create.rs b/influxdb3/src/commands/create.rs index 0f7dcf951cb..1ecfcf62a81 100644 --- a/influxdb3/src/commands/create.rs +++ b/influxdb3/src/commands/create.rs @@ -78,8 +78,7 @@ impl Config { }) => (host_url, auth_token, ca_cert), SubCommand::Token(create_token_config) => { let host_settings = create_token_config.get_connection_settings()?; - let effective_host_url = - create_token_config.get_effective_host_url(&host_settings.host_url); + let effective_host_url = create_token_config.get_effective_host_url(); // We need to return references, so we'll handle this differently return Ok({ let mut client = diff --git a/influxdb3/src/commands/create/token.rs b/influxdb3/src/commands/create/token.rs index 00452b4ff2f..0415f706107 100644 --- a/influxdb3/src/commands/create/token.rs +++ b/influxdb3/src/commands/create/token.rs @@ -77,16 +77,16 @@ pub enum TokenOutputFormat { pub struct InfluxDb3ServerConfig { /// The host URL of the running InfluxDB 3 Core server. /// - /// Note: When using --regenerate, the effective default changes to http://127.0.0.1:8182 - /// (admin token recovery endpoint) unless a custom host is specified. + /// If not specified: + /// - Default is http://127.0.0.1:8181 + /// - With --regenerate, default is http://127.0.0.1:8182 (admin token recovery endpoint) #[clap( name = "host", long = "host", - default_value = "http://127.0.0.1:8181", env = "INFLUXDB3_HOST_URL", - help = "The host URL of the running InfluxDB 3 Core server (default: http://127.0.0.1:8181, or :8182 with --regenerate)" + help = "The host URL of the running InfluxDB 3 Core server" )] - pub host_url: Url, + pub host_url: Option, /// The token for authentication with the InfluxDB 3 Core server to create permissions. /// This will be the admin token to create tokens with permissions @@ -169,26 +169,38 @@ impl CreateTokenConfig { /// Get the effective host URL for the operation. /// - /// When `--regenerate` is used and no custom host is provided, this will return + /// When `--regenerate` is used and no host is specified, this will return /// the admin token recovery endpoint (port 8182) instead of the default (port 8181). /// /// # Examples /// - `influxdb3 create token --admin` → uses http://127.0.0.1:8181 /// - `influxdb3 create token --admin --regenerate` → uses http://127.0.0.1:8182 + /// - `influxdb3 create token --admin --regenerate --host http://127.0.0.1:8181` → uses http://127.0.0.1:8181 /// - `influxdb3 create token --admin --regenerate --host http://custom:9999` → uses http://custom:9999 - pub fn get_effective_host_url(&self, default_url: &Url) -> Url { + pub fn get_effective_host_url(&self) -> Url { match &self.admin_config { - Some(admin_config) if admin_config.regenerate => { - // Check if the host URL is the default value (normalize by removing trailing slash) - if default_url.as_str().trim_end_matches('/') == "http://127.0.0.1:8181" { - // Use the admin token recovery endpoint - Url::parse("http://127.0.0.1:8182").expect("hardcoded URL should be valid") - } else { - // User provided a custom URL, use it as-is - default_url.clone() + Some(admin_config) => { + match &admin_config.host.host_url { + Some(url) => { + // User explicitly provided a host URL, use it as-is + url.clone() + } + None => { + // No host URL provided, use default based on regenerate flag + if admin_config.regenerate { + Url::parse("http://127.0.0.1:8182") + .expect("hardcoded URL should be valid") + } else { + Url::parse("http://127.0.0.1:8181") + .expect("hardcoded URL should be valid") + } + } } } - _ => default_url.clone(), + None => { + // This shouldn't happen in practice, but provide a sensible default + Url::parse("http://127.0.0.1:8181").expect("hardcoded URL should be valid") + } } } diff --git a/influxdb3/src/commands/serve.rs b/influxdb3/src/commands/serve.rs index 1fb48402806..f5fa3be7c6c 100644 --- a/influxdb3/src/commands/serve.rs +++ b/influxdb3/src/commands/serve.rs @@ -28,7 +28,7 @@ use influxdb3_server::{ CommonServerState, CreateServerArgs, Server, http::HttpApi, query_executor::{CreateQueryExecutorArgs, QueryExecutorImpl}, - serve, serve_admin_token_regen_endpoint, + serve, serve_admin_token_recovery_endpoint, }; use influxdb3_shutdown::{ShutdownManager, wait_for_signal}; use influxdb3_sys_events::SysEventStore; @@ -82,8 +82,8 @@ pub const DEFAULT_DATA_DIRECTORY_NAME: &str = ".influxdb3"; /// The default bind address for the HTTP API. pub const DEFAULT_HTTP_BIND_ADDR: &str = "0.0.0.0:8181"; -/// The default bind address for admin token regeneration HTTP API. -pub const DEFAULT_ADMIN_TOKEN_REGENERATION_BIND_ADDR: &str = "127.0.0.1:8182"; +/// The default bind address for admin token recovery HTTP API. +pub const DEFAULT_ADMIN_TOKEN_RECOVERY_BIND_ADDR: &str = "127.0.0.1:8182"; pub const DEFAULT_TELEMETRY_ENDPOINT: &str = "https://telemetry.v3.influxdata.com"; @@ -128,8 +128,8 @@ pub enum Error { #[error("lost HTTP/gRPC service")] LostHttpGrpc, - #[error("lost admin token regen service")] - LostAdminTokenRegen, + #[error("lost admin token recovery service")] + LostAdminTokenRecovery, #[error("tls requires both a cert and a key file to be passed in to work")] NoCertOrKeyFile, @@ -185,14 +185,14 @@ pub struct Config { )] pub http_bind_address: SocketAddr, - /// The address on which admin token regeration will be allowed + /// The HTTP bind address for the admin token recovery endpoint #[clap( - long = "admin-token-regen-bind", - env = "INFLUXDB3_ADMIN_TOKEN_REGEN_BIND_ADDR", - default_value = DEFAULT_ADMIN_TOKEN_REGENERATION_BIND_ADDR, + long = "admin-token-recovery-http-bind", + env = "INFLUXDB3_ADMIN_TOKEN_RECOVERY_HTTP_BIND_ADDR", + default_value = DEFAULT_ADMIN_TOKEN_RECOVERY_BIND_ADDR, action, )] - pub admin_token_regen_bind_address: SocketAddr, + pub admin_token_recovery_bind_address: SocketAddr, /// Size of memory pool used during query exec, in megabytes. /// @@ -469,11 +469,11 @@ pub struct Config { /// /// This is mainly intended for testing purposes and is not considered stable. #[clap( - long = "admin-token-regen-tcp-listener-file-path", - env = "INFLUXDB3_ADMIN_TOKEN_REGEN_TCP_LISTENER_FILE_PATH", + long = "admin-token-recovery-tcp-listener-file-path", + env = "INFLUXDB3_ADMIN_TOKEN_RECOVERY_TCP_LISTENER_FILE_PATH", hide = true )] - pub admin_token_regen_tcp_listener_file_path: Option, + pub admin_token_recovery_tcp_listener_file_path: Option, #[clap( long = "wal-replay-concurrency-limit", @@ -927,9 +927,10 @@ pub async fn command(config: Config) -> Result<()> { .await .map_err(Error::BindAddress)?; - let admin_token_regen_listener = TcpListener::bind(*config.admin_token_regen_bind_address) - .await - .map_err(Error::BindAddress)?; + let admin_token_recovery_listener = + TcpListener::bind(*config.admin_token_recovery_bind_address) + .await + .map_err(Error::BindAddress)?; let processing_engine = ProcessingEngineManagerImpl::new( setup_processing_engine_env_manager(&config.processing_engine_config), @@ -974,11 +975,11 @@ pub async fn command(config: Config) -> Result<()> { Arc::clone(&authorizer), )); - let admin_token_regen_server = Server::new(CreateServerArgs { + let admin_token_recovery_server = Server::new(CreateServerArgs { common_state: common_state.clone(), http: Arc::clone(&http), authorizer: Arc::clone(&authorizer), - listener: admin_token_regen_listener, + listener: admin_token_recovery_listener, cert_file: cert_file.clone(), key_file: key_file.clone(), tls_minimum_version: config.tls_minimum_version.into(), @@ -1038,10 +1039,10 @@ pub async fn command(config: Config) -> Result<()> { .fuse(); let backend = shutdown_manager.join().fuse(); - let regen_frontend = serve_admin_token_regen_endpoint( - admin_token_regen_server, + let recovery_frontend = serve_admin_token_recovery_endpoint( + admin_token_recovery_server, frontend_shutdown.clone(), - config.admin_token_regen_tcp_listener_file_path, + config.admin_token_recovery_tcp_listener_file_path, ) .fuse(); @@ -1050,7 +1051,7 @@ pub async fn command(config: Config) -> Result<()> { pin_mut!(signal); pin_mut!(frontend); pin_mut!(backend); - pin_mut!(regen_frontend); + pin_mut!(recovery_frontend); let mut res = Ok(()); @@ -1088,14 +1089,14 @@ pub async fn command(config: Config) -> Result<()> { res = res.and(Err(Error::Server(error))); } }, - regen_result = regen_frontend => match regen_result { - Ok(_) if frontend_shutdown.is_cancelled() => info!("Admin token regeneration service shutdown"), + recovery_result = recovery_frontend => match recovery_result { + Ok(_) if frontend_shutdown.is_cancelled() => info!("Admin token recovery service shutdown"), Ok(_) => { - error!("early admin token regeneration service exit"); - res = res.and(Err(Error::LostAdminTokenRegen)); + error!("early admin token recovery service exit"); + res = res.and(Err(Error::LostAdminTokenRecovery)); } Err(error) => { - error!("admin token regeneration service error"); + error!("admin token recovery service error"); res = res.and(Err(Error::Server(error))); } } diff --git a/influxdb3/tests/server/mod.rs b/influxdb3/tests/server/mod.rs index 60a5d85d7d3..4f6e7acddb2 100644 --- a/influxdb3/tests/server/mod.rs +++ b/influxdb3/tests/server/mod.rs @@ -329,7 +329,7 @@ impl TestServer { .arg("serve") .arg("--disable-telemetry-upload") .args(["--http-bind", "0.0.0.0:0"]) - .args(["--admin-token-regen-bind", "0.0.0.0:0"]) + .args(["--admin-token-recovery-http-bind", "0.0.0.0:0"]) .args(["--wal-flush-interval", "10ms"]) .args(["--wal-snapshot-size", "1"]) .args([ @@ -339,7 +339,7 @@ impl TestServer { .expect("valid tcp listener file path"), ]) .args([ - "--admin-token-regen-tcp-listener-file-path", + "--admin-token-recovery-tcp-listener-file-path", tcp_addr_file_2 .to_str() .expect("valid tcp listener file path"), diff --git a/influxdb3_server/src/http.rs b/influxdb3_server/src/http.rs index 3e87f9724eb..abb4dcaa828 100644 --- a/influxdb3_server/src/http.rs +++ b/influxdb3_server/src/http.rs @@ -1868,6 +1868,7 @@ pub(crate) async fn route_admin_token_recovery_request( let response = match (method.clone(), uri.path()) { (Method::POST, all_paths::API_V3_CONFIGURE_ADMIN_TOKEN_REGENERATE) => { + info!("Regenerating admin token without password through token recovery API request"); http_server.regenerate_admin_token(req).await } _ => { diff --git a/influxdb3_server/src/lib.rs b/influxdb3_server/src/lib.rs index 6b57f9cb484..2ebe48065ed 100644 --- a/influxdb3_server/src/lib.rs +++ b/influxdb3_server/src/lib.rs @@ -55,6 +55,7 @@ use trace_http::metrics::RequestMetrics; use trace_http::tower::TraceLayer; const TRACE_HTTP_SERVER_NAME: &str = "influxdb3_http"; +const ADMIN_TOKEN_RECOVERY_TRACE_HTTP_SERVER_NAME: &str = "influxdb3_token_recovery_http"; const TRACE_GRPC_SERVER_NAME: &str = "influxdb3_grpc"; #[derive(Debug, Error)] @@ -176,14 +177,14 @@ impl<'a> Server<'a> { } /// Creates HTTP trace layer -fn create_http_trace_layer(common_state: &CommonServerState) -> TraceLayer { +fn create_http_trace_layer(common_state: &CommonServerState, server_name: &str) -> TraceLayer { let http_metrics = RequestMetrics::new(Arc::clone(&common_state.metrics), MetricFamily::HttpServer); TraceLayer::new( common_state.trace_header_parser.clone(), Arc::new(http_metrics), common_state.trace_collector().clone(), - TRACE_HTTP_SERVER_NAME, + server_name, trace_http::tower::ServiceProtocol::Http, ) } @@ -201,12 +202,15 @@ fn create_grpc_trace_layer(common_state: &CommonServerState) -> TraceLayer { ) } -pub async fn serve_admin_token_regen_endpoint( +pub async fn serve_admin_token_recovery_endpoint( server: Server<'_>, shutdown: CancellationToken, tcp_listener_file_path: Option, ) -> Result<()> { - let http_trace_layer = create_http_trace_layer(&server.common_state); + let http_trace_layer = create_http_trace_layer( + &server.common_state, + ADMIN_TOKEN_RECOVERY_TRACE_HTTP_SERVER_NAME, + ); if let (Some(key_file), Some(cert_file)) = (&server.key_file, &server.cert_file) { let listener = server.listener; @@ -284,7 +288,7 @@ pub async fn serve( Some(server.authorizer()), )); - let http_trace_layer = create_http_trace_layer(&server.common_state); + let http_trace_layer = create_http_trace_layer(&server.common_state, TRACE_HTTP_SERVER_NAME); let key_file = server.key_file.clone(); let cert_file = server.cert_file.clone(); From 731ce169afe963d42d38372c780b1ce5f6a46026 Mon Sep 17 00:00:00 2001 From: Praveen Kumar Date: Wed, 16 Jul 2025 13:55:39 +0100 Subject: [PATCH 3/4] refactor: address PR feedback - recovery server now only starts when `--admin-token-recovery-http-bind` is passed in - as soon as regeneration is done, the recovery server shuts itself down - the select! macro logic has been changed such that shutting down recovery server does not shutdown the main server --- influxdb3/src/commands/serve.rs | 125 ++++++++++++++++-------- influxdb3/src/help/serve_all.txt | 5 + influxdb3/tests/cli/admin_token.rs | 151 ++++++++++++++++++++++++++++- influxdb3/tests/server/mod.rs | 65 +++++++++---- influxdb3_server/src/http.rs | 56 ++++++++++- influxdb3_server/src/lib.rs | 49 +++++++--- 6 files changed, 375 insertions(+), 76 deletions(-) diff --git a/influxdb3/src/commands/serve.rs b/influxdb3/src/commands/serve.rs index f5fa3be7c6c..f820710dc56 100644 --- a/influxdb3/src/commands/serve.rs +++ b/influxdb3/src/commands/serve.rs @@ -185,14 +185,18 @@ pub struct Config { )] pub http_bind_address: SocketAddr, - /// The HTTP bind address for the admin token recovery endpoint + /// Enable admin token recovery endpoint on the specified address. + /// Use flag alone for default address (127.0.0.1:8182) or provide a custom address. + /// WARNING: This endpoint allows unauthenticated admin token regeneration - use with caution! #[clap( long = "admin-token-recovery-http-bind", env = "INFLUXDB3_ADMIN_TOKEN_RECOVERY_HTTP_BIND_ADDR", - default_value = DEFAULT_ADMIN_TOKEN_RECOVERY_BIND_ADDR, + num_args = 0..=1, + default_missing_value = DEFAULT_ADMIN_TOKEN_RECOVERY_BIND_ADDR, + help = "Enable admin token recovery endpoint. Use flag alone for default address (127.0.0.1:8182) or with value for custom address", action, )] - pub admin_token_recovery_bind_address: SocketAddr, + pub admin_token_recovery_bind_address: Option, /// Size of memory pool used during query exec, in megabytes. /// @@ -927,10 +931,14 @@ pub async fn command(config: Config) -> Result<()> { .await .map_err(Error::BindAddress)?; - let admin_token_recovery_listener = - TcpListener::bind(*config.admin_token_recovery_bind_address) - .await - .map_err(Error::BindAddress)?; + // Only create recovery listener if explicitly enabled + let admin_token_recovery_listener = if let Some(addr) = config.admin_token_recovery_bind_address + { + info!(%addr, "Admin token recovery endpoint enabled - WARNING: This allows unauthenticated admin token regeneration!"); + Some(TcpListener::bind(*addr).await.map_err(Error::BindAddress)?) + } else { + None + }; let processing_engine = ProcessingEngineManagerImpl::new( setup_processing_engine_env_manager(&config.processing_engine_config), @@ -975,14 +983,17 @@ pub async fn command(config: Config) -> Result<()> { Arc::clone(&authorizer), )); - let admin_token_recovery_server = Server::new(CreateServerArgs { - common_state: common_state.clone(), - http: Arc::clone(&http), - authorizer: Arc::clone(&authorizer), - listener: admin_token_recovery_listener, - cert_file: cert_file.clone(), - key_file: key_file.clone(), - tls_minimum_version: config.tls_minimum_version.into(), + // Only create recovery server if listener was created + let admin_token_recovery_server = admin_token_recovery_listener.map(|listener| { + Server::new(CreateServerArgs { + common_state: common_state.clone(), + http: Arc::clone(&http), + authorizer: Arc::clone(&authorizer), + listener, + cert_file: cert_file.clone(), + key_file: key_file.clone(), + tls_minimum_version: config.tls_minimum_version.into(), + }) }); let server = Server::new(CreateServerArgs { @@ -1039,12 +1050,23 @@ pub async fn command(config: Config) -> Result<()> { .fuse(); let backend = shutdown_manager.join().fuse(); - let recovery_frontend = serve_admin_token_recovery_endpoint( - admin_token_recovery_server, - frontend_shutdown.clone(), - config.admin_token_recovery_tcp_listener_file_path, - ) - .fuse(); + // Only start recovery endpoint if server was created + let recovery_endpoint_enabled = admin_token_recovery_server.is_some(); + let recovery_frontend = if let Some(recovery_server) = admin_token_recovery_server { + futures::future::Either::Left( + serve_admin_token_recovery_endpoint( + recovery_server, + frontend_shutdown.clone(), + config.admin_token_recovery_tcp_listener_file_path, + ) + .fuse(), + ) + } else { + // Provide a future that never completes if recovery endpoint is disabled + futures::future::Either::Right( + futures::future::pending::>().fuse(), + ) + }; // pin_mut constructs a Pin<&mut T> from a T by preventing moving the T // from the current stack frame and constructing a Pin<&mut T> to it @@ -1054,6 +1076,7 @@ pub async fn command(config: Config) -> Result<()> { pin_mut!(recovery_frontend); let mut res = Ok(()); + let mut recovery_endpoint_active = recovery_endpoint_enabled; // Graceful shutdown can be triggered by sending SIGINT or SIGTERM to the // process, or by a background task exiting - most likely with an error @@ -1078,27 +1101,49 @@ pub async fn command(config: Config) -> Result<()> { res = res.and(Err(Error::LostBackend)); } // HTTP/gRPC frontend has stopped - result = frontend => match result { - Ok(_) if frontend_shutdown.is_cancelled() => info!("HTTP/gRPC service shutdown"), - Ok(_) => { - error!("early HTTP/gRPC service exit"); - res = res.and(Err(Error::LostHttpGrpc)); - }, - Err(error) => { - error!("HTTP/gRPC error"); - res = res.and(Err(Error::Server(error))); - } - }, - recovery_result = recovery_frontend => match recovery_result { - Ok(_) if frontend_shutdown.is_cancelled() => info!("Admin token recovery service shutdown"), - Ok(_) => { - error!("early admin token recovery service exit"); - res = res.and(Err(Error::LostAdminTokenRecovery)); + result = frontend => { + match result { + Ok(_) if frontend_shutdown.is_cancelled() => info!("HTTP/gRPC service shutdown"), + Ok(_) => { + error!("early HTTP/gRPC service exit"); + res = res.and(Err(Error::LostHttpGrpc)); + }, + Err(error) => { + error!("HTTP/gRPC error"); + res = res.and(Err(Error::Server(error))); + } } - Err(error) => { - error!("admin token recovery service error"); - res = res.and(Err(Error::Server(error))); + } + recovery_result = recovery_frontend => { + // Only process recovery endpoint results if it was actually enabled and active + if recovery_endpoint_enabled && recovery_endpoint_active { + match recovery_result { + Ok(_) if frontend_shutdown.is_cancelled() => { + info!("Admin token recovery service shutdown"); + // Only break if the main shutdown was also requested + if frontend.is_terminated() { + break; + } + } + Ok(_) => { + // Recovery endpoint can shut down normally after token regeneration + // This is expected behavior and should not cause an error + info!("Admin token recovery service exited normally after token regeneration"); + recovery_endpoint_active = false; + // Since recovery_frontend is a FusedFuture, it won't be polled again + // after completion, so we don't need to do anything else + // Continue the loop - do NOT break or call shutdown + continue; // Skip shutdown_manager.shutdown() for this iteration + } + Err(error) => { + error!(%error, "admin token recovery service error"); + res = res.and(Err(Error::Server(error))); + // Continue running the main server even if recovery endpoint had an error + } + } } + // If recovery endpoint was disabled, this branch will never be taken again + // because pending() futures never complete } } shutdown_manager.shutdown() diff --git a/influxdb3/src/help/serve_all.txt b/influxdb3/src/help/serve_all.txt index 3da47f05abb..630934da58e 100644 --- a/influxdb3/src/help/serve_all.txt +++ b/influxdb3/src/help/serve_all.txt @@ -36,6 +36,11 @@ Examples: list of resources. Valid values are health, ping, and metrics. To disable auth for multiple resources pass in a list, eg. `--disable-authz health,ping` + --admin-token-recovery-http-bind + Address for HTTP API for admin token recovery requests [default: 127.0.0.1:8182] + WARNING: This endpoint allows unauthenticated admin token regeneration - use with caution! + [env: INFLUXDB3_ADMIN_TOKEN_RECOVERY_HTTP_BIND_ADDR] + {} --data-dir Location to store files locally [env: INFLUXDB3_DB_DIR=] diff --git a/influxdb3/tests/cli/admin_token.rs b/influxdb3/tests/cli/admin_token.rs index dbdc8de060c..771d7cfe5d0 100644 --- a/influxdb3/tests/cli/admin_token.rs +++ b/influxdb3/tests/cli/admin_token.rs @@ -128,6 +128,7 @@ async fn test_regenerate_admin_token_without_auth_using_token_recovery_service() let mut server = TestServer::configure() .with_auth() .with_no_admin_token() + .with_recovery_endpoint() .spawn() .await; let args = &["--tls-ca", "../testing-certs/rootCA.pem"]; @@ -591,6 +592,7 @@ async fn test_recovery_service_only_accepts_regenerate_endpoint() { let server = TestServer::configure() .with_auth() .with_no_admin_token() + .with_recovery_endpoint() .spawn() .await; @@ -625,7 +627,10 @@ async fn test_recovery_service_only_accepts_regenerate_endpoint() { #[test_log::test(tokio::test)] async fn test_recovery_service_with_auth_disabled() { // Start server without auth - let server = TestServer::configure().spawn().await; + let server = TestServer::configure() + .with_recovery_endpoint() + .spawn() + .await; // Try to use recovery service when auth is disabled - should fail let result = server @@ -644,6 +649,7 @@ async fn test_recovery_service_does_not_affect_named_admin_tokens() { let mut server = TestServer::configure() .with_auth() .with_no_admin_token() + .with_recovery_endpoint() .spawn() .await; @@ -710,6 +716,7 @@ async fn test_recovery_service_cannot_create_new_admin_token() { let server = TestServer::configure() .with_auth() .with_no_admin_token() + .with_recovery_endpoint() .spawn() .await; @@ -726,3 +733,145 @@ async fn test_recovery_service_cannot_create_new_admin_token() { // Should fail - recovery service only supports regeneration assert!(result.contains("error") || result.contains("failed")); } + +#[test_log::test(tokio::test)] +async fn test_recovery_endpoint_disabled_by_default() { + // Start server without recovery endpoint enabled + let _server = TestServer::configure().spawn().await; + + // Try to connect to the recovery endpoint on default port + let client = reqwest::Client::new(); + let recovery_url = "http://127.0.0.1:8182/api/v3/configure/admin_token/regenerate"; + + // This should fail since the recovery endpoint is not enabled + let result = client.post(recovery_url).send().await; + + // Expect connection refused or similar error + assert!( + result.is_err(), + "Recovery endpoint should not be accessible when not explicitly enabled" + ); +} + +#[test_log::test(tokio::test)] +async fn test_recovery_endpoint_auto_shutdown_after_regeneration() { + // This test verifies that the recovery endpoint works and the main server continues running + let mut server = TestServer::configure() + .with_auth() + .with_no_admin_token() + .with_recovery_endpoint() + .spawn() + .await; + + // Create the initial admin token + let result = server + .run( + vec!["create", "token", "--admin"], + &["--tls-ca", "../testing-certs/rootCA.pem"], + ) + .unwrap(); + assert_contains!(&result, "New token created successfully!"); + let initial_token = parse_token(result); + + // Use the recovery endpoint to regenerate the token + let result = server + .run_regenerate_with_confirmation( + vec!["create", "token", "--admin"], + &["--regenerate", "--tls-ca", "../testing-certs/rootCA.pem"], + ) + .unwrap(); + assert_contains!(&result, "New token created successfully!"); + let new_token = parse_token(result); + + // Use the recovery endpoint to regenerate the token again, recovery server should have been + // shutdown + let result = server + .run_regenerate_with_confirmation( + vec!["create", "token", "--admin"], + &["--regenerate", "--tls-ca", "../testing-certs/rootCA.pem"], + ) + .unwrap(); + assert_contains!(result, "ConnectError"); + + // Verify tokens are different + assert_ne!( + initial_token, new_token, + "Token should have been regenerated" + ); + + // Update the server's token to the new one + server.set_token(Some(new_token.clone())); + + // Verify the main server is still running and new token works + let result = server.create_database("test_db").run(); + assert!( + result.is_ok(), + "Main server should still be running with new token" + ); + + // Verify old token no longer works + server.set_token(Some(initial_token)); + let result = server.create_database("test_db2").run(); + assert!(result.is_err(), "Old token should no longer work"); +} + +#[test_log::test(tokio::test)] +async fn test_main_server_continues_after_recovery_endpoint_shutdown() { + // This test specifically verifies that the main server continues running + // after the recovery endpoint auto-shuts down + let mut server = TestServer::configure() + .with_auth() + .with_no_admin_token() + .with_recovery_endpoint() + .spawn() + .await; + + // Create the initial admin token + let result = server + .run( + vec!["create", "token", "--admin"], + &["--tls-ca", "../testing-certs/rootCA.pem"], + ) + .unwrap(); + assert_contains!(&result, "New token created successfully!"); + let initial_token = parse_token(result); + + // Set the token for future operations + server.set_token(Some(initial_token.clone())); + + // Create a database before using recovery endpoint + let result = server.create_database("before_recovery_db").run(); + assert!( + result.is_ok(), + "Should be able to create database before recovery" + ); + + // Use the recovery endpoint to regenerate the token + let result = server + .run_regenerate_with_confirmation( + vec!["create", "token", "--admin"], + &["--regenerate", "--tls-ca", "../testing-certs/rootCA.pem"], + ) + .unwrap(); + assert_contains!(&result, "New token created successfully!"); + let new_token = parse_token(result); + + // Verify tokens are different + assert_ne!( + initial_token, new_token, + "Token should have been regenerated" + ); + + // Update to new token + server.set_token(Some(new_token.clone())); + + // Wait a bit to ensure recovery endpoint has shut down + tokio::time::sleep(Duration::from_millis(500)).await; + + // Simple check - verify server is still responding by creating one more database + let result = server.create_database("after_recovery_db").run(); + assert!( + result.is_ok(), + "Main server should still be running after recovery endpoint shutdown" + ); +} diff --git a/influxdb3/tests/server/mod.rs b/influxdb3/tests/server/mod.rs index 4f6e7acddb2..4bdc8e897a4 100644 --- a/influxdb3/tests/server/mod.rs +++ b/influxdb3/tests/server/mod.rs @@ -53,6 +53,9 @@ pub trait ConfigProvider: Send + Sync + 'static { /// Get if logs should be captured fn capture_logs(&self) -> bool; + /// Get if recovery endpoint should be enabled + fn recovery_endpoint_enabled(&self) -> bool; + /// Spawn a new [`TestServer`] with this configuration /// /// This will run the `influxdb3 serve` command and bind its HTTP address to a random port @@ -82,6 +85,7 @@ pub struct TestConfig { disable_authz: Vec, gen1_duration: Option, capture_logs: bool, + enable_recovery_endpoint: bool, } impl TestConfig { @@ -107,6 +111,12 @@ impl TestConfig { self } + /// Enable the admin token recovery endpoint + pub fn with_recovery_endpoint(mut self) -> Self { + self.enable_recovery_endpoint = true; + self + } + /// Set a host identifier prefix on the spawned [`TestServer`] pub fn with_node_id>(mut self, node_id: S) -> Self { self.node_id = Some(node_id.into()); @@ -247,6 +257,10 @@ impl ConfigProvider for TestConfig { fn capture_logs(&self) -> bool { self.capture_logs } + + fn recovery_endpoint_enabled(&self) -> bool { + self.enable_recovery_endpoint + } } /// A running instance of the `influxdb3 serve` process @@ -266,7 +280,7 @@ impl ConfigProvider for TestConfig { pub struct TestServer { auth_token: Option, bind_addr: String, - admin_token_recovery_bind_addr: String, + admin_token_recovery_bind_addr: Option, server_process: Child, http_client: reqwest::Client, stdout: Option>>, @@ -325,11 +339,10 @@ impl TestServer { let tcp_addr_file_2 = admin_token_recover_tmp_dir_path.join("tcp-listener"); let mut command = Command::cargo_bin("influxdb3").expect("create the influxdb3 command"); - let command = command + let mut command = command .arg("serve") .arg("--disable-telemetry-upload") .args(["--http-bind", "0.0.0.0:0"]) - .args(["--admin-token-recovery-http-bind", "0.0.0.0:0"]) .args(["--wal-flush-interval", "10ms"]) .args(["--wal-snapshot-size", "1"]) .args([ @@ -337,13 +350,21 @@ impl TestServer { tcp_addr_file .to_str() .expect("valid tcp listener file path"), - ]) - .args([ - "--admin-token-recovery-tcp-listener-file-path", - tcp_addr_file_2 - .to_str() - .expect("valid tcp listener file path"), - ]) + ]); + + // Only add recovery endpoint args if explicitly enabled + if config.recovery_endpoint_enabled() { + command = command + .args(["--admin-token-recovery-http-bind", "0.0.0.0:0"]) + .args([ + "--admin-token-recovery-tcp-listener-file-path", + tcp_addr_file_2 + .to_str() + .expect("valid tcp listener file path"), + ]); + } + + let command = command .args([ "--tls-cert", if config.bad_tls() { @@ -433,7 +454,11 @@ impl TestServer { let bind_addr = find_bind_addr(tcp_addr_file).await; - let admin_token_recovery_bind_addr = find_bind_addr(tcp_addr_file_2).await; + let admin_token_recovery_bind_addr = if config.recovery_endpoint_enabled() { + Some(find_bind_addr(tcp_addr_file_2).await) + } else { + None + }; let http_client = reqwest::ClientBuilder::new() .min_tls_version(Version::TLS_1_3) @@ -487,13 +512,17 @@ impl TestServer { /// Get the URL of admin token recovery service for use with an HTTP client pub fn admin_token_recovery_client_addr(&self) -> String { - format!( - "https://localhost:{}", - self.admin_token_recovery_bind_addr - .split(':') - .nth(1) - .unwrap() - ) + match &self.admin_token_recovery_bind_addr { + Some(addr) => format!("https://localhost:{}", addr.split(':').nth(1).unwrap()), + None => panic!( + "admin_token_recovery_client_addr called on TestServer without recovery endpoint enabled. Use .with_recovery_endpoint() when configuring the test server." + ), + } + } + + /// Check if the recovery endpoint is enabled + pub fn has_recovery_endpoint(&self) -> bool { + self.admin_token_recovery_bind_addr.is_some() } /// Get the token for the server diff --git a/influxdb3_server/src/http.rs b/influxdb3_server/src/http.rs index abb4dcaa828..8a3d9918072 100644 --- a/influxdb3_server/src/http.rs +++ b/influxdb3_server/src/http.rs @@ -558,6 +558,24 @@ pub struct HttpApi { legacy_write_param_unifier: SingleTenantRequestUnifier, } +/// Wrapper for HttpApi used by the recovery endpoint that includes a shutdown token +pub(crate) struct RecoveryHttpApi { + http_api: Arc, + cancellation_token: tokio_util::sync::CancellationToken, +} + +impl RecoveryHttpApi { + pub(crate) fn new( + http_api: Arc, + cancellation_token: tokio_util::sync::CancellationToken, + ) -> Self { + Self { + http_api, + cancellation_token, + } + } +} + impl HttpApi { pub fn new( common_state: CommonServerState, @@ -1857,8 +1875,25 @@ async fn record_batch_stream_to_body( } } +/// This is used to trigger a shutdown when it's dropped. +struct ShutdownTrigger { + token: tokio_util::sync::CancellationToken, +} + +impl ShutdownTrigger { + fn new(token: tokio_util::sync::CancellationToken) -> Self { + Self { token } + } +} + +impl Drop for ShutdownTrigger { + fn drop(&mut self) { + self.token.cancel(); + } +} + pub(crate) async fn route_admin_token_recovery_request( - http_server: Arc, + recovery_api: Arc, req: Request, ) -> Result { let method = req.method().clone(); @@ -1869,7 +1904,24 @@ pub(crate) async fn route_admin_token_recovery_request( let response = match (method.clone(), uri.path()) { (Method::POST, all_paths::API_V3_CONFIGURE_ADMIN_TOKEN_REGENERATE) => { info!("Regenerating admin token without password through token recovery API request"); - http_server.regenerate_admin_token(req).await + let result = recovery_api.http_api.regenerate_admin_token(req).await; + + // If token regeneration was successful, trigger shutdown of the recovery endpoint + if let Ok(response) = result { + info!("Admin token regenerated successfully, shutting down recovery endpoint"); + let cancellation_token = recovery_api.cancellation_token.clone(); + let mut res_builder = ResponseBuilder::new(); + let extensions = res_builder.extensions_mut().unwrap(); + let shutdown_trigger = ShutdownTrigger::new(cancellation_token); + extensions.insert(shutdown_trigger); + + Ok(res_builder + .status(response.status()) + .body(response.into_body()) + .unwrap()) + } else { + result + } } _ => { let body = bytes_to_response_body("not found"); diff --git a/influxdb3_server/src/lib.rs b/influxdb3_server/src/lib.rs index 2ebe48065ed..88ff38d2db9 100644 --- a/influxdb3_server/src/lib.rs +++ b/influxdb3_server/src/lib.rs @@ -21,6 +21,7 @@ mod system_tables; use crate::grpc::make_flight_server; use crate::http::HttpApi; +use crate::http::RecoveryHttpApi; use crate::http::route_request; use authz::Authorizer; use http::route_admin_token_recovery_request; @@ -212,10 +213,19 @@ pub async fn serve_admin_token_recovery_endpoint( ADMIN_TOKEN_RECOVERY_TRACE_HTTP_SERVER_NAME, ); + // Create a dedicated shutdown token for the recovery endpoint + // This allows us to shut down just the recovery endpoint after token regeneration + let recovery_shutdown = CancellationToken::new(); + + // Create the recovery API wrapper with the shutdown token + let recovery_api = Arc::new(RecoveryHttpApi::new( + Arc::clone(&server.http), + recovery_shutdown.clone(), + )); + if let (Some(key_file), Some(cert_file)) = (&server.key_file, &server.cert_file) { let listener = server.listener; let tls_min = server.tls_minimum_version; - let http = Arc::clone(&server.http); let (addr, certs, key) = setup_tls(listener, key_file, cert_file)?; info!( @@ -223,11 +233,10 @@ pub async fn serve_admin_token_recovery_endpoint( "starting admin token recovery endpoint with TLS on", ); - let http_server = Arc::clone(&http); let rest_service = hyper::service::make_service_fn(move |_| { - let http_server = Arc::clone(&http_server); + let recovery_api = Arc::clone(&recovery_api); let service = service_fn(move |req: hyper::Request| { - route_admin_token_recovery_request(Arc::clone(&http_server), req) + route_admin_token_recovery_request(Arc::clone(&recovery_api), req) }); let service = http_trace_layer.layer(service); futures::future::ready(Ok::<_, Infallible>(service)) @@ -245,15 +254,19 @@ pub async fn serve_admin_token_recovery_endpoint( .with_all_versions_alpn() .with_incoming(addr); - hyper::server::Server::builder(acceptor) - .serve(rest_service) - .with_graceful_shutdown(shutdown.cancelled()) - .await?; + // Use both the main shutdown and recovery shutdown tokens + tokio::select! { + res = hyper::server::Server::builder(acceptor).serve(rest_service) => res?, + _ = shutdown.cancelled() => {}, + _ = recovery_shutdown.cancelled() => { + info!("Admin token recovery endpoint shutting down after token regeneration"); + }, + } } else { let rest_service = hyper::service::make_service_fn(|_| { - let http_server = Arc::clone(&server.http); + let recovery_api = Arc::clone(&recovery_api); let service = service_fn(move |req: hyper::Request| { - route_admin_token_recovery_request(Arc::clone(&http_server), req) + route_admin_token_recovery_request(Arc::clone(&recovery_api), req) }); let service = http_trace_layer.layer(service); futures::future::ready(Ok::<_, Infallible>(service)) @@ -264,11 +277,17 @@ pub async fn serve_admin_token_recovery_endpoint( address = %addr.local_addr(), "starting admin token recovery endpoint on", ); - hyper::server::Builder::new(addr, Http::new()) - .tcp_nodelay(true) - .serve(rest_service) - .with_graceful_shutdown(shutdown.cancelled()) - .await?; + + // Use both the main shutdown and recovery shutdown tokens + tokio::select! { + res = hyper::server::Builder::new(addr, Http::new()) + .tcp_nodelay(true) + .serve(rest_service) => res?, + _ = shutdown.cancelled() => {}, + _ = recovery_shutdown.cancelled() => { + info!("Admin token recovery endpoint shutting down after token regeneration"); + }, + } } Ok(()) From 40102ae768504b9142bcd90e98e73e2f57c30686 Mon Sep 17 00:00:00 2001 From: Praveen Kumar Date: Fri, 18 Jul 2025 12:29:18 +0100 Subject: [PATCH 4/4] refactor: host url updates when regenerating token - when `--regenerate` is passed in, `--host` still defaults to the main server. To get to the recovery server, `--host` with the recovery server address should be passed in --- influxdb3/src/commands/create.rs | 7 +- influxdb3/src/commands/create/token.rs | 60 +++--------- influxdb3/tests/cli/admin_token.rs | 124 +++++++++++++++++-------- influxdb3/tests/cli/api.rs | 14 +-- 4 files changed, 102 insertions(+), 103 deletions(-) diff --git a/influxdb3/src/commands/create.rs b/influxdb3/src/commands/create.rs index 1ecfcf62a81..72d930535ae 100644 --- a/influxdb3/src/commands/create.rs +++ b/influxdb3/src/commands/create.rs @@ -78,11 +78,12 @@ impl Config { }) => (host_url, auth_token, ca_cert), SubCommand::Token(create_token_config) => { let host_settings = create_token_config.get_connection_settings()?; - let effective_host_url = create_token_config.get_effective_host_url(); // We need to return references, so we'll handle this differently return Ok({ - let mut client = - Client::new(effective_host_url, host_settings.ca_cert.clone())?; + let mut client = Client::new( + host_settings.host_url.clone(), + host_settings.ca_cert.clone(), + )?; if let Some(token) = &host_settings.auth_token { client = client.with_auth_token(token.expose_secret()); } diff --git a/influxdb3/src/commands/create/token.rs b/influxdb3/src/commands/create/token.rs index 0415f706107..b0f07856e65 100644 --- a/influxdb3/src/commands/create/token.rs +++ b/influxdb3/src/commands/create/token.rs @@ -76,17 +76,14 @@ pub enum TokenOutputFormat { #[derive(Parser, Clone, Debug)] pub struct InfluxDb3ServerConfig { /// The host URL of the running InfluxDB 3 Core server. - /// - /// If not specified: - /// - Default is http://127.0.0.1:8181 - /// - With --regenerate, default is http://127.0.0.1:8182 (admin token recovery endpoint) #[clap( name = "host", + short = 'H', long = "host", env = "INFLUXDB3_HOST_URL", - help = "The host URL of the running InfluxDB 3 Core server" + default_value = "http://127.0.0.1:8181" )] - pub host_url: Option, + pub host_url: Url, /// The token for authentication with the InfluxDB 3 Core server to create permissions. /// This will be the admin token to create tokens with permissions @@ -102,12 +99,13 @@ pub struct InfluxDb3ServerConfig { pub struct CreateAdminTokenConfig { /// Operator token will be regenerated when this is set. /// - /// When used without --host, connects to the admin token recovery endpoint (port 8182) - /// instead of the default server endpoint (port 8181). + /// When used without --host, connects to the default server endpoint (port 8181). + /// To use the admin token recovery endpoint, specify --host with the recovery endpoint address. #[clap( name = "regenerate", long = "regenerate", - help = "Regenerate the operator token (uses port 8182 by default instead of 8181)" + help = "Regenerate the operator token. By default connects to the main server (http://127.0.0.1:8181). + To use the admin token recovery endpoint, specify --host with the recovery endpoint address" )] pub regenerate: bool, @@ -167,43 +165,6 @@ impl CreateTokenConfig { } } - /// Get the effective host URL for the operation. - /// - /// When `--regenerate` is used and no host is specified, this will return - /// the admin token recovery endpoint (port 8182) instead of the default (port 8181). - /// - /// # Examples - /// - `influxdb3 create token --admin` → uses http://127.0.0.1:8181 - /// - `influxdb3 create token --admin --regenerate` → uses http://127.0.0.1:8182 - /// - `influxdb3 create token --admin --regenerate --host http://127.0.0.1:8181` → uses http://127.0.0.1:8181 - /// - `influxdb3 create token --admin --regenerate --host http://custom:9999` → uses http://custom:9999 - pub fn get_effective_host_url(&self) -> Url { - match &self.admin_config { - Some(admin_config) => { - match &admin_config.host.host_url { - Some(url) => { - // User explicitly provided a host URL, use it as-is - url.clone() - } - None => { - // No host URL provided, use default based on regenerate flag - if admin_config.regenerate { - Url::parse("http://127.0.0.1:8182") - .expect("hardcoded URL should be valid") - } else { - Url::parse("http://127.0.0.1:8181") - .expect("hardcoded URL should be valid") - } - } - } - } - None => { - // This shouldn't happen in practice, but provide a sensible default - Url::parse("http://127.0.0.1:8181").expect("hardcoded URL should be valid") - } - } - } - pub fn get_output_format(&self) -> Option<&TokenOutputFormat> { match &self.admin_config { Some(admin_config) => admin_config.format.as_ref(), @@ -261,9 +222,10 @@ impl CommandFactory for CreateTokenConfig { .about("Create or regenerate an admin token") .long_about( "Create or regenerate an admin token.\n\n\ - When using --regenerate without specifying --host, the command will \ - connect to the admin token recovery endpoint (http://127.0.0.1:8182) \ - instead of the default server endpoint (http://127.0.0.1:8181).", + When using --regenerate, the command connects to the default server \ + endpoint (http://127.0.0.1:8181) unless you specify a different --host. \ + To use the admin token recovery endpoint, specify --host with the \ + recovery endpoint address configured via --admin-token-recovery-http-bind.", ); let all_args = CreateAdminTokenConfig::as_args(); let admin_sub_cmd = admin_sub_cmd.args(all_args); diff --git a/influxdb3/tests/cli/admin_token.rs b/influxdb3/tests/cli/admin_token.rs index 771d7cfe5d0..5b5d8a26ed2 100644 --- a/influxdb3/tests/cli/admin_token.rs +++ b/influxdb3/tests/cli/admin_token.rs @@ -1,5 +1,6 @@ use std::time::Duration; +use crate::cli::api::run_cmd_with_result; use crate::server::{ConfigProvider, TestServer, parse_token}; use observability_deps::tracing::info; use serde_json::Value; @@ -153,12 +154,20 @@ async fn test_regenerate_admin_token_without_auth_using_token_recovery_service() assert_contains!(&result, "Failed to create token"); // regenerate token using the admin token recovery server - let result = server - .run_regenerate_with_confirmation( - vec!["create", "token", "--admin"], - &["--regenerate", "--tls-ca", "../testing-certs/rootCA.pem"], - ) - .unwrap(); + let recovery_addr = server.admin_token_recovery_client_addr(); + let result = run_cmd_with_result( + &["--tls-ca", "../testing-certs/rootCA.pem"], + Some("yes"), + vec![ + "create", + "token", + "--admin", + "--regenerate", + "--host", + &recovery_addr, + ], + ) + .unwrap(); assert_contains!(&result, "New token created successfully!"); let old_token = admin_token.clone(); @@ -633,12 +642,20 @@ async fn test_recovery_service_with_auth_disabled() { .await; // Try to use recovery service when auth is disabled - should fail - let result = server - .run_regenerate_with_confirmation( - vec!["create", "token", "--admin"], - &["--regenerate", "--tls-ca", "../testing-certs/rootCA.pem"], - ) - .unwrap(); + let recovery_addr = server.admin_token_recovery_client_addr(); + let result = run_cmd_with_result( + &["--tls-ca", "../testing-certs/rootCA.pem"], + Some("yes"), + vec![ + "create", + "token", + "--admin", + "--regenerate", + "--host", + &recovery_addr, + ], + ) + .unwrap(); // Should get an error - recovery service runs but there's no admin token to regenerate assert_contains!(&result, "missing admin token, cannot update"); @@ -681,12 +698,20 @@ async fn test_recovery_service_does_not_affect_named_admin_tokens() { let named_admin_token = parse_token(result); // Regenerate operator token via recovery service - let result = server - .run_regenerate_with_confirmation( - vec!["create", "token", "--admin"], - &["--regenerate", "--tls-ca", "../testing-certs/rootCA.pem"], - ) - .unwrap(); + let recovery_addr = server.admin_token_recovery_client_addr(); + let result = run_cmd_with_result( + &["--tls-ca", "../testing-certs/rootCA.pem"], + Some("yes"), + vec![ + "create", + "token", + "--admin", + "--regenerate", + "--host", + &recovery_addr, + ], + ) + .unwrap(); assert_contains!(&result, "New token created successfully!"); let new_operator_token = parse_token(result); @@ -737,11 +762,11 @@ async fn test_recovery_service_cannot_create_new_admin_token() { #[test_log::test(tokio::test)] async fn test_recovery_endpoint_disabled_by_default() { // Start server without recovery endpoint enabled - let _server = TestServer::configure().spawn().await; + let _server = TestServer::configure().with_capture_logs().spawn().await; // Try to connect to the recovery endpoint on default port let client = reqwest::Client::new(); - let recovery_url = "http://127.0.0.1:8182/api/v3/configure/admin_token/regenerate"; + let recovery_url = "http://127.0.0.1:8182/api/v3/configure/token/admin/regenerate"; // This should fail since the recovery endpoint is not enabled let result = client.post(recovery_url).send().await; @@ -774,23 +799,38 @@ async fn test_recovery_endpoint_auto_shutdown_after_regeneration() { let initial_token = parse_token(result); // Use the recovery endpoint to regenerate the token - let result = server - .run_regenerate_with_confirmation( - vec!["create", "token", "--admin"], - &["--regenerate", "--tls-ca", "../testing-certs/rootCA.pem"], - ) - .unwrap(); + let recovery_addr = server.admin_token_recovery_client_addr(); + let result = run_cmd_with_result( + &["--tls-ca", "../testing-certs/rootCA.pem"], + Some("yes"), + vec![ + "create", + "token", + "--admin", + "--regenerate", + "--host", + &recovery_addr, + ], + ) + .unwrap(); assert_contains!(&result, "New token created successfully!"); let new_token = parse_token(result); // Use the recovery endpoint to regenerate the token again, recovery server should have been // shutdown - let result = server - .run_regenerate_with_confirmation( - vec!["create", "token", "--admin"], - &["--regenerate", "--tls-ca", "../testing-certs/rootCA.pem"], - ) - .unwrap(); + let result = run_cmd_with_result( + &["--tls-ca", "../testing-certs/rootCA.pem"], + Some("yes"), + vec![ + "create", + "token", + "--admin", + "--regenerate", + "--host", + &recovery_addr, + ], + ) + .unwrap(); assert_contains!(result, "ConnectError"); // Verify tokens are different @@ -847,12 +887,20 @@ async fn test_main_server_continues_after_recovery_endpoint_shutdown() { ); // Use the recovery endpoint to regenerate the token - let result = server - .run_regenerate_with_confirmation( - vec!["create", "token", "--admin"], - &["--regenerate", "--tls-ca", "../testing-certs/rootCA.pem"], - ) - .unwrap(); + let recovery_addr = server.admin_token_recovery_client_addr(); + let result = run_cmd_with_result( + &["--tls-ca", "../testing-certs/rootCA.pem"], + Some("yes"), + vec![ + "create", + "token", + "--admin", + "--regenerate", + "--host", + &recovery_addr, + ], + ) + .unwrap(); assert_contains!(&result, "New token created successfully!"); let new_token = parse_token(result); diff --git a/influxdb3/tests/cli/api.rs b/influxdb3/tests/cli/api.rs index 1114eff6ccd..5a72299e5b9 100644 --- a/influxdb3/tests/cli/api.rs +++ b/influxdb3/tests/cli/api.rs @@ -39,18 +39,6 @@ impl TestServer { run_cmd_with_result(args, input, command_args) } - pub fn run_regenerate_with_confirmation( - &self, - commands: Vec<&str>, - args: &[&str], - ) -> Result { - let client_addr = self.admin_token_recovery_client_addr(); - let mut command_args = commands.clone(); - command_args.push("--host"); - command_args.push(client_addr.as_str()); - run_cmd_with_result(args, Some("yes"), command_args) - } - pub fn run(&self, commands: Vec<&str>, args: &[&str]) -> Result { self.run_with_options(commands, args, None) } @@ -60,7 +48,7 @@ impl TestServer { } } -fn run_cmd_with_result( +pub(super) fn run_cmd_with_result( args: &[&str], input: Option<&str>, command_args: Vec<&str>,