Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 3 additions & 8 deletions influxdb3/src/commands/serve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,8 @@ use influxdb3_cache::{
use influxdb3_catalog::{CatalogError, catalog::Catalog};
use influxdb3_clap_blocks::plugins::{PackageManager, ProcessingEngineConfig};
use influxdb3_clap_blocks::{
datafusion::IoxQueryDatafusionConfig,
memory_size::MemorySize,
object_store::{ObjectStoreConfig, ObjectStoreType},
socket_addr::SocketAddr,
tokio::TokioDatafusionConfig,
datafusion::IoxQueryDatafusionConfig, memory_size::MemorySize, object_store::ObjectStoreConfig,
socket_addr::SocketAddr, tokio::TokioDatafusionConfig,
};
use influxdb3_process::{
INFLUXDB3_GIT_HASH, INFLUXDB3_VERSION, PROCESS_START_TIME, PROCESS_UUID_STR, ProcessUuidGetter,
Expand Down Expand Up @@ -1112,9 +1109,7 @@ async fn setup_telemetry_store(
let influxdb_pkg_name = env!("CARGO_PKG_NAME");
// Following should show influxdb3-0.1.0
let influx_version = format!("{influxdb_pkg_name}-{influxdb_pkg_version}");
let obj_store_type = object_store_config
.object_store
.unwrap_or(ObjectStoreType::Memory);
let obj_store_type = object_store_config.object_store;
let storage_type = obj_store_type.as_str();

if disable_upload {
Expand Down
2 changes: 1 addition & 1 deletion influxdb3/src/help/serve.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ Examples
{}
--node-id <NODE_ID> Node identifier used as prefix in object store file paths
[env: INFLUXDB3_NODE_IDENTIFIER_PREFIX=]
--object-store <OBJECT_STORE> Which object storage to use. If not specified, defaults to memory.
--object-store <OBJECT_STORE> Which object storage to use.
[env: INFLUXDB3_OBJECT_STORE=]
[possible values: memory, memory-throttled, file, s3, google, azure]

Expand Down
2 changes: 1 addition & 1 deletion influxdb3/src/help/serve_all.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ Examples:
{}
--node-id <NODE_ID> Node identifier used as prefix in object store file paths
[env: INFLUXDB3_NODE_IDENTIFIER_PREFIX=]
--object-store <STORE> Object storage to use [default: memory]
--object-store <STORE> Object storage to use
[possible values: memory, memory-throttled, file, s3, google, azure]

{}
Expand Down
26 changes: 26 additions & 0 deletions influxdb3/tests/cli/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2987,3 +2987,29 @@ async fn test_db_name_cannot_start_with_underscore_on_create_table() {
"db name did not start with a number or letter"
);
}

#[test_log::test(tokio::test)]
async fn test_serve_command_error_msg() {
    // Run `influxdb3 serve` without the now-required `--object-store`
    // flag and capture stderr so we can assert on clap's error message.
    //
    // `output().unwrap()` returns an owned `std::process::Output`, so
    // `.stderr` already moves an owned `Vec<u8>` out of it — the previous
    // trailing `.clone()` was a redundant allocation+copy
    // (clippy::redundant_clone).
    let output = assert_cmd::Command::cargo_bin("influxdb3")
        .unwrap()
        .args(["serve", "--node-id", "node-1"])
        .output()
        .unwrap()
        .stderr;

    // Expected usage line echoed back by clap for the failed invocation.
    let full_cmd =
        "influxdb3 serve --object-store <object-store> --node-id <NODE_IDENTIFIER_PREFIX>";
    assert_object_store_error_msg(output, full_cmd);
}

/// Assert that a CLI error message (raw stderr bytes) both echoes the
/// expected full-command usage line and reports `--object-store` as a
/// missing required argument.
///
/// Panics if `error_output` is not valid UTF-8 or if either expected
/// substring is absent from the rendered message.
fn assert_object_store_error_msg(error_output: Vec<u8>, full_command: &str) {
    // stderr arrives as raw bytes from `std::process::Output`.
    let str_msg = String::from_utf8(error_output).unwrap();
    // clap's usage line should name the full command with its required flags.
    assert_contains!(&str_msg, full_command);

    // clap's "missing required argument" diagnostic.
    // NOTE(review): clap normally indents the flag line of this message;
    // confirm the literal's leading whitespace matches the actual output.
    assert_contains!(
        &str_msg,
        "error: the following required arguments were not provided:
--object-store <object-store>"
    );
}
75 changes: 39 additions & 36 deletions influxdb3_clap_blocks/src/object_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,6 @@ pub enum ParseError {
missing: String,
},

#[snafu(display("Object store not specified"))]
UnspecifiedObjectStore,

// Creating a new S3 object store can fail if the region is *specified* but
// not *parseable* as a rusoto `Region`. The other object store constructors
// don't return `Result`.
Expand Down Expand Up @@ -347,11 +344,11 @@ macro_rules! object_store_config_inner {
/// CLI config for object stores.
#[derive(Debug, Clone, clap::Parser)]
pub struct [<$prefix:camel ObjectStoreConfig>] {
/// Which object storage to use. If not specified, defaults to memory.
/// Which object storage to use.
///
/// Possible values (case insensitive):
///
/// * memory (default): Effectively no object persistence.
/// * memory: Effectively no object persistence.
/// * memorythrottled: Like `memory` but with latency and throughput that somewhat resemble a cloud
/// object store. Useful for testing and benchmarking.
/// * file: Stores objects in the local filesystem. Must also set `--data-dir`.
Expand All @@ -367,9 +364,10 @@ macro_rules! object_store_config_inner {
env = gen_env!($prefix, "INFLUXDB3_OBJECT_STORE"),
ignore_case = true,
action,
required = true,
verbatim_doc_comment
)]
pub object_store: Option<ObjectStoreType>,
pub object_store: ObjectStoreType,

/// Name of the bucket to use for the object store. Must also set
/// `--object-store` to a cloud object storage to have any effect.
Expand Down Expand Up @@ -658,17 +656,21 @@ macro_rules! object_store_config_inner {

/// Create a new instance for all-in-one mode, only allowing some arguments.
pub fn new(database_directory: Option<PathBuf>) -> Self {
match &database_directory {
Some(dir) => info!("Object store: File-based in `{}`", dir.display()),
None => info!("Object store: In-memory"),
}

let object_store = database_directory.as_ref().map(|_| ObjectStoreType::File);
let object_store = match &database_directory {
Some(dir) => {
info!("Object store: File-based in `{}`", dir.display());
ObjectStoreType::File
}
None => {
info!("Object store: In-memory");
ObjectStoreType::Memory
}
};

Self {
aws_access_key_id: Default::default(),
aws_allow_http: Default::default(),
aws_default_region: Default::default(),
aws_default_region: FALLBACK_AWS_REGION.to_string(),
aws_endpoint: Default::default(),
aws_secret_access_key: Default::default(),
aws_session_token: Default::default(),
Expand Down Expand Up @@ -912,19 +914,18 @@ macro_rules! object_store_config_inner {
/// Create config-dependant object store.
pub fn make_object_store(&self) -> Result<Arc<DynObjectStore>, ParseError> {
if let Some(data_dir) = &self.database_directory {
if !matches!(&self.object_store, Some(ObjectStoreType::File)) {
if !matches!(&self.object_store, ObjectStoreType::File) {
warn!(?data_dir, object_store_type=?self.object_store,
"--data-dir / `INFLUXDB3_DB_DIR` ignored. It only affects 'file' object stores");
}
}

let object_store: Arc<DynObjectStore> = match &self.object_store {
None => return Err(ParseError::UnspecifiedObjectStore),
Some(ObjectStoreType::Memory) => {
ObjectStoreType::Memory => {
info!(object_store_type = "Memory", "Object Store");
Arc::new(InMemory::new())
}
Some(ObjectStoreType::MemoryThrottled) => {
ObjectStoreType::MemoryThrottled => {
let config = ThrottleConfig {
// for every call: assume a 100ms latency
wait_delete_per_call: Duration::from_millis(100),
Expand All @@ -945,10 +946,10 @@ macro_rules! object_store_config_inner {
Arc::new(ThrottledStore::new(InMemory::new(), config))
}

Some(ObjectStoreType::Google) => self.new_gcs()?,
Some(ObjectStoreType::S3) => self.new_s3()?,
Some(ObjectStoreType::Azure) => self.new_azure()?,
Some(ObjectStoreType::File) => self.new_local_file_system()?,
ObjectStoreType::Google => self.new_gcs()?,
ObjectStoreType::S3 => self.new_s3()?,
ObjectStoreType::Azure => self.new_azure()?,
ObjectStoreType::File => self.new_local_file_system()?,
};

Ok(object_store)
Expand Down Expand Up @@ -1024,8 +1025,8 @@ pub fn make_presigned_url_signer(
config: &ObjectStoreConfig,
) -> Result<Option<Arc<dyn object_store::signer::Signer>>, ParseError> {
match &config.object_store {
Some(ObjectStoreType::S3) => Ok(Some(Arc::new(config.build_s3_signer()?))),
Some(ObjectStoreType::File) => Ok(Some(Arc::new(LocalUploadSigner::new(config)?))),
ObjectStoreType::S3 => Ok(Some(Arc::new(config.build_s3_signer()?))),
ObjectStoreType::File => Ok(Some(Arc::new(LocalUploadSigner::new(config)?))),
_ => Ok(None),
}
}
Expand All @@ -1037,7 +1038,7 @@ pub fn make_presigned_url_signer(
config: &ObjectStoreConfig,
) -> Result<Option<Arc<dyn object_store::signer::Signer>>, ParseError> {
match &config.object_store {
Some(ObjectStoreType::File) => Ok(Some(Arc::new(LocalUploadSigner::new(config)?))),
ObjectStoreType::File => Ok(Some(Arc::new(LocalUploadSigner::new(config)?))),
_ => Ok(None),
}
}
Expand Down Expand Up @@ -1372,15 +1373,10 @@ mod tests {

#[test]
fn object_store_flag_is_required() {
let configs = vec![
StoreConfigs::Base(ObjectStoreConfig::try_parse_from(["server"]).unwrap()),
StoreConfigs::Source(SourceObjectStoreConfig::try_parse_from(["server"]).unwrap()),
StoreConfigs::Sink(SinkObjectStoreConfig::try_parse_from(["server"]).unwrap()),
];
for config in configs {
let err = config.make_object_store().unwrap_err().to_string();
assert_eq!(err, "Object store not specified");
}
// Since object-store is now required, parsing should fail without it
assert!(ObjectStoreConfig::try_parse_from(["server"]).is_err());
assert!(SourceObjectStoreConfig::try_parse_from(["server"]).is_err());
assert!(SinkObjectStoreConfig::try_parse_from(["server"]).is_err());
}

#[test]
Expand Down Expand Up @@ -1410,7 +1406,8 @@ mod tests {

#[test]
fn default_url_signer_is_none() {
let config = ObjectStoreConfig::try_parse_from(["server"]).unwrap();
let config =
ObjectStoreConfig::try_parse_from(["server", "--object-store", "memory"]).unwrap();

let signer = make_presigned_url_signer(&config).unwrap();
assert!(signer.is_none(), "Expected None, got {signer:?}");
Expand Down Expand Up @@ -1476,8 +1473,14 @@ mod tests {
#[test]
#[cfg(feature = "aws")]
fn valid_s3_endpoint_url() {
ObjectStoreConfig::try_parse_from(["server", "--aws-endpoint", "http://whatever.com"])
.expect("must successfully parse config with absolute AWS endpoint URL");
ObjectStoreConfig::try_parse_from([
"server",
"--aws-endpoint",
"http://whatever.com",
"--object-store",
"s3",
])
.expect("must successfully parse config with absolute AWS endpoint URL");
}

#[test]
Expand Down