Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions influxdb3/src/commands/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,17 @@ impl Config {
}) => (host_url, auth_token, ca_cert),
SubCommand::Token(create_token_config) => {
let host_settings = create_token_config.get_connection_settings()?;
(
&host_settings.host_url,
&host_settings.auth_token,
&host_settings.ca_cert,
)
// We need to return references, so we'll handle this differently
return Ok({
let mut client = Client::new(
host_settings.host_url.clone(),
host_settings.ca_cert.clone(),
)?;
if let Some(token) = &host_settings.auth_token {
client = client.with_auth_token(token.expose_secret());
}
client
});
}
};

Expand Down
31 changes: 24 additions & 7 deletions influxdb3/src/commands/create/token.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,13 @@ pub enum TokenOutputFormat {

#[derive(Parser, Clone, Debug)]
pub struct InfluxDb3ServerConfig {
/// The host URL of the running InfluxDB 3 Core server
/// The host URL of the running InfluxDB 3 Core server.
#[clap(
name = "host",
short = 'H',
long = "host",
default_value = "http://127.0.0.1:8181",
env = "INFLUXDB3_HOST_URL"
env = "INFLUXDB3_HOST_URL",
default_value = "http://127.0.0.1:8181"
)]
pub host_url: Url,

Expand All @@ -96,8 +97,16 @@ pub struct InfluxDb3ServerConfig {

#[derive(Parser, Debug)]
pub struct CreateAdminTokenConfig {
/// Operator token will be regenerated when this is set
#[clap(name = "regenerate", long = "regenerate")]
/// Operator token will be regenerated when this is set.
///
/// When used without --host, connects to the default server endpoint (port 8181).
/// To use the admin token recovery endpoint, specify --host with the recovery endpoint address.
#[clap(
name = "regenerate",
long = "regenerate",
help = "Regenerate the operator token. By default connects to the main server (http://127.0.0.1:8181).
To use the admin token recovery endpoint, specify --host with the recovery endpoint address"
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice!

)]
pub regenerate: bool,

// for named admin and permission tokens this is mandatory but not for admin tokens
Expand Down Expand Up @@ -208,8 +217,16 @@ impl Args for CreateTokenConfig {

impl CommandFactory for CreateTokenConfig {
fn command() -> clap::Command {
let admin_sub_cmd =
ClapCommand::new("--admin").override_usage("influxdb3 create token --admin [OPTIONS]");
let admin_sub_cmd = ClapCommand::new("--admin")
.override_usage("influxdb3 create token --admin [OPTIONS]")
.about("Create or regenerate an admin token")
.long_about(
"Create or regenerate an admin token.\n\n\
When using --regenerate, the command connects to the default server \
endpoint (http://127.0.0.1:8181) unless you specify a different --host. \
To use the admin token recovery endpoint, specify --host with the \
recovery endpoint address configured via --admin-token-recovery-http-bind.",
);
let all_args = CreateAdminTokenConfig::as_args();
let admin_sub_cmd = admin_sub_cmd.args(all_args);

Expand Down
125 changes: 115 additions & 10 deletions influxdb3/src/commands/serve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use influxdb3_server::{
CommonServerState, CreateServerArgs, Server,
http::HttpApi,
query_executor::{CreateQueryExecutorArgs, QueryExecutorImpl},
serve,
serve, serve_admin_token_recovery_endpoint,
};
use influxdb3_shutdown::{ShutdownManager, wait_for_signal};
use influxdb3_sys_events::SysEventStore;
Expand Down Expand Up @@ -82,6 +82,9 @@ pub const DEFAULT_DATA_DIRECTORY_NAME: &str = ".influxdb3";
/// The default bind address for the HTTP API.
pub const DEFAULT_HTTP_BIND_ADDR: &str = "0.0.0.0:8181";

/// The default bind address for admin token recovery HTTP API.
pub const DEFAULT_ADMIN_TOKEN_RECOVERY_BIND_ADDR: &str = "127.0.0.1:8182";
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure that this matters, but the default here uses 127.0.0.1 vs. 0.0.0.0 for the main bind address

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is intentional, we want it to default to only loop back address. It makes choosing to listen on any other address as an opt-in as this allows the clients with access to this interface regenerate an admin token without password.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice, that makes sense, thanks for clarifying.


pub const DEFAULT_TELEMETRY_ENDPOINT: &str = "https://telemetry.v3.influxdata.com";

#[derive(Debug, Error)]
Expand Down Expand Up @@ -125,6 +128,9 @@ pub enum Error {
#[error("lost HTTP/gRPC service")]
LostHttpGrpc,

#[error("lost admin token recovery service")]
LostAdminTokenRecovery,

#[error("tls requires both a cert and a key file to be passed in to work")]
NoCertOrKeyFile,
}
Expand Down Expand Up @@ -179,6 +185,19 @@ pub struct Config {
)]
pub http_bind_address: SocketAddr,

/// Enable admin token recovery endpoint on the specified address.
/// Use flag alone for default address (127.0.0.1:8182) or provide a custom address.
/// WARNING: This endpoint allows unauthenticated admin token regeneration - use with caution!
#[clap(
long = "admin-token-recovery-http-bind",
env = "INFLUXDB3_ADMIN_TOKEN_RECOVERY_HTTP_BIND_ADDR",
num_args = 0..=1,
default_missing_value = DEFAULT_ADMIN_TOKEN_RECOVERY_BIND_ADDR,
help = "Enable admin token recovery endpoint. Use flag alone for default address (127.0.0.1:8182) or with value for custom address",
action,
)]
pub admin_token_recovery_bind_address: Option<SocketAddr>,

/// Size of memory pool used during query exec, in megabytes.
///
/// Can be given as absolute value or in percentage of the total available memory (e.g. `10%`).
Expand Down Expand Up @@ -450,6 +469,16 @@ pub struct Config {
)]
pub tcp_listener_file_path: Option<PathBuf>,

/// Provide a file path to write the address that the admin recovery endpoint mounted server is listening on to
///
/// This is mainly intended for testing purposes and is not considered stable.
#[clap(
long = "admin-token-recovery-tcp-listener-file-path",
env = "INFLUXDB3_ADMIN_TOKEN_RECOVERY_TCP_LISTENER_FILE_PATH",
hide = true
)]
pub admin_token_recovery_tcp_listener_file_path: Option<PathBuf>,

#[clap(
long = "wal-replay-concurrency-limit",
env = "INFLUXDB3_WAL_REPLAY_CONCURRENCY_LIMIT"
Expand Down Expand Up @@ -902,6 +931,15 @@ pub async fn command(config: Config) -> Result<()> {
.await
.map_err(Error::BindAddress)?;

// Only create recovery listener if explicitly enabled
let admin_token_recovery_listener = if let Some(addr) = config.admin_token_recovery_bind_address
{
info!(%addr, "Admin token recovery endpoint enabled - WARNING: This allows unauthenticated admin token regeneration!");
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice

Some(TcpListener::bind(*addr).await.map_err(Error::BindAddress)?)
} else {
None
};

let processing_engine = ProcessingEngineManagerImpl::new(
setup_processing_engine_env_manager(&config.processing_engine_config),
write_buffer.catalog(),
Expand Down Expand Up @@ -945,6 +983,19 @@ pub async fn command(config: Config) -> Result<()> {
Arc::clone(&authorizer),
));

// Only create recovery server if listener was created
let admin_token_recovery_server = admin_token_recovery_listener.map(|listener| {
Server::new(CreateServerArgs {
common_state: common_state.clone(),
http: Arc::clone(&http),
authorizer: Arc::clone(&authorizer),
listener,
cert_file: cert_file.clone(),
key_file: key_file.clone(),
tls_minimum_version: config.tls_minimum_version.into(),
})
});

let server = Server::new(CreateServerArgs {
common_state,
http,
Expand Down Expand Up @@ -987,6 +1038,7 @@ pub async fn command(config: Config) -> Result<()> {
?paths_without_authz,
"setting up server with authz disabled for paths"
);

let frontend = serve(
server,
frontend_shutdown.clone(),
Expand All @@ -998,13 +1050,33 @@ pub async fn command(config: Config) -> Result<()> {
.fuse();
let backend = shutdown_manager.join().fuse();

// Only start recovery endpoint if server was created
let recovery_endpoint_enabled = admin_token_recovery_server.is_some();
let recovery_frontend = if let Some(recovery_server) = admin_token_recovery_server {
futures::future::Either::Left(
serve_admin_token_recovery_endpoint(
recovery_server,
frontend_shutdown.clone(),
config.admin_token_recovery_tcp_listener_file_path,
)
.fuse(),
)
} else {
// Provide a future that never completes if recovery endpoint is disabled
futures::future::Either::Right(
futures::future::pending::<Result<(), influxdb3_server::Error>>().fuse(),
)
};

// pin_mut constructs a Pin<&mut T> from a T by preventing moving the T
// from the current stack frame and constructing a Pin<&mut T> to it
pin_mut!(signal);
pin_mut!(frontend);
pin_mut!(backend);
pin_mut!(recovery_frontend);

let mut res = Ok(());
let mut recovery_endpoint_active = recovery_endpoint_enabled;

// Graceful shutdown can be triggered by sending SIGINT or SIGTERM to the
// process, or by a background task exiting - most likely with an error
Expand All @@ -1029,16 +1101,49 @@ pub async fn command(config: Config) -> Result<()> {
res = res.and(Err(Error::LostBackend));
}
// HTTP/gRPC frontend has stopped
result = frontend => match result {
Ok(_) if frontend_shutdown.is_cancelled() => info!("HTTP/gRPC service shutdown"),
Ok(_) => {
error!("early HTTP/gRPC service exit");
res = res.and(Err(Error::LostHttpGrpc));
},
Err(error) => {
error!("HTTP/gRPC error");
res = res.and(Err(Error::Server(error)));
result = frontend => {
match result {
Ok(_) if frontend_shutdown.is_cancelled() => info!("HTTP/gRPC service shutdown"),
Ok(_) => {
error!("early HTTP/gRPC service exit");
res = res.and(Err(Error::LostHttpGrpc));
},
Err(error) => {
error!("HTTP/gRPC error");
res = res.and(Err(Error::Server(error)));
}
}
}
recovery_result = recovery_frontend => {
// Only process recovery endpoint results if it was actually enabled and active
if recovery_endpoint_enabled && recovery_endpoint_active {
match recovery_result {
Ok(_) if frontend_shutdown.is_cancelled() => {
info!("Admin token recovery service shutdown");
// Only break if the main shutdown was also requested
if frontend.is_terminated() {
break;
}
}
Ok(_) => {
// Recovery endpoint can shut down normally after token regeneration
// This is expected behavior and should not cause an error
info!("Admin token recovery service exited normally after token regeneration");
recovery_endpoint_active = false;
// Since recovery_frontend is a FusedFuture, it won't be polled again
// after completion, so we don't need to do anything else
// Continue the loop - do NOT break or call shutdown
continue; // Skip shutdown_manager.shutdown() for this iteration
}
Err(error) => {
error!(%error, "admin token recovery service error");
res = res.and(Err(Error::Server(error)));
// Continue running the main server even if recovery endpoint had an error
}
}
}
// If recovery endpoint was disabled, this branch will never be taken again
// because pending() futures never complete
}
}
shutdown_manager.shutdown()
Expand Down
5 changes: 5 additions & 0 deletions influxdb3/src/help/serve_all.txt
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ Examples:
list of resources. Valid values are health, ping, and metrics.
To disable auth for multiple resources pass in a list, eg.
`--disable-authz health,ping`
--admin-token-recovery-http-bind <ADDR>
Address for HTTP API for admin token recovery requests [default: 127.0.0.1:8182]
WARNING: This endpoint allows unauthenticated admin token regeneration - use with caution!
[env: INFLUXDB3_ADMIN_TOKEN_RECOVERY_HTTP_BIND_ADDR]


{}
--data-dir <DIR> Location to store files locally [env: INFLUXDB3_DB_DIR=]
Expand Down
Loading