diff --git a/datafusion-cli/src/object_storage.rs b/datafusion-cli/src/object_storage.rs
index 1178c0aad6502..85e0009bd2676 100644
--- a/datafusion-cli/src/object_storage.rs
+++ b/datafusion-cli/src/object_storage.rs
@@ -22,7 +22,7 @@ use std::sync::Arc;
 use datafusion::common::config::{
     ConfigEntry, ConfigExtension, ConfigField, ExtensionOptions, TableOptions, Visit,
 };
-use datafusion::common::{exec_datafusion_err, exec_err, internal_err};
+use datafusion::common::{config_err, exec_datafusion_err, exec_err};
 use datafusion::error::{DataFusionError, Result};
 use datafusion::execution::context::SessionState;
 use datafusion::prelude::SessionContext;
@@ -39,17 +39,26 @@ pub async fn get_s3_object_store_builder(
     url: &Url,
     aws_options: &AwsOptions,
 ) -> Result<AmazonS3Builder> {
+    let AwsOptions {
+        access_key_id,
+        secret_access_key,
+        session_token,
+        region,
+        endpoint,
+        allow_http,
+    } = aws_options;
+
     let bucket_name = get_bucket_name(url)?;
     let mut builder = AmazonS3Builder::from_env().with_bucket_name(bucket_name);
 
     if let (Some(access_key_id), Some(secret_access_key)) =
-        (&aws_options.access_key_id, &aws_options.secret_access_key)
+        (access_key_id, secret_access_key)
     {
         builder = builder
             .with_access_key_id(access_key_id)
             .with_secret_access_key(secret_access_key);
 
-        if let Some(session_token) = &aws_options.session_token {
+        if let Some(session_token) = session_token {
             builder = builder.with_token(session_token);
         }
     } else {
@@ -72,10 +81,30 @@ pub async fn get_s3_object_store_builder(
         builder = builder.with_credentials(credentials);
     }
 
-    if let Some(region) = &aws_options.region {
+    if let Some(region) = region {
         builder = builder.with_region(region);
     }
 
+    if let Some(endpoint) = endpoint {
+        // Make a nicer error if the user hasn't allowed HTTP and the endpoint
+        // is HTTP, as the default message is "URL scheme is not allowed"
+        if let Ok(endpoint_url) = Url::try_from(endpoint.as_str()) {
+            if !matches!(allow_http, Some(true)) && endpoint_url.scheme() == "http" {
+                return config_err!(
+                    "Invalid endpoint: {endpoint}. \
+                     HTTP is not allowed for S3 endpoints. \
+                     To allow HTTP, set 'aws.allow_http' to true"
+                );
+            }
+        }
+
+        builder = builder.with_endpoint(endpoint);
+    }
+
+    if let Some(allow_http) = allow_http {
+        builder = builder.with_allow_http(*allow_http);
+    }
+
     Ok(builder)
 }
 
@@ -188,6 +217,8 @@ pub struct AwsOptions {
     pub region: Option<String>,
     /// OSS or COS Endpoint
     pub endpoint: Option<String>,
+    /// Allow HTTP (otherwise will always use https)
+    pub allow_http: Option<bool>,
 }
 
 impl ExtensionOptions for AwsOptions {
@@ -219,11 +250,14 @@
             "region" => {
                 self.region.set(rem, value)?;
             }
-            "oss" | "cos" => {
+            "oss" | "cos" | "endpoint" => {
                 self.endpoint.set(rem, value)?;
            }
+            "allow_http" => {
+                self.allow_http.set(rem, value)?;
+            }
             _ => {
-                return internal_err!("Config value \"{}\" not found on AwsOptions", rem);
+                return config_err!("Config value \"{}\" not found on AwsOptions", rem);
             }
         }
         Ok(())
@@ -262,6 +296,7 @@ impl ExtensionOptions for AwsOptions {
         self.session_token.visit(&mut v, "session_token", "");
         self.region.visit(&mut v, "region", "");
         self.endpoint.visit(&mut v, "endpoint", "");
+        self.allow_http.visit(&mut v, "allow_http", "");
         v.0
     }
 }
@@ -307,7 +342,7 @@ impl ExtensionOptions for GcpOptions {
                 self.application_credentials_path.set(rem, value)?;
             }
             _ => {
-                return internal_err!("Config value \"{}\" not found on GcpOptions", rem);
+                return config_err!("Config value \"{}\" not found on GcpOptions", rem);
             }
         }
         Ok(())
@@ -479,12 +514,21 @@ mod tests {
         let access_key_id = "fake_access_key_id";
         let secret_access_key = "fake_secret_access_key";
         let region = "fake_us-east-2";
+        let endpoint = "endpoint33";
         let session_token = "fake_session_token";
         let location = "s3://bucket/path/file.parquet";
 
         let table_url = ListingTableUrl::parse(location)?;
         let scheme = table_url.scheme();
-        let sql = format!("CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS('aws.access_key_id' '{access_key_id}', 'aws.secret_access_key' '{secret_access_key}', 'aws.region' '{region}', 'aws.session_token' {session_token}) LOCATION '{location}'");
+        let sql = format!(
+            "CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS\
+            ('aws.access_key_id' '{access_key_id}', \
+            'aws.secret_access_key' '{secret_access_key}', \
+            'aws.region' '{region}', \
+            'aws.session_token' {session_token}, \
+            'aws.endpoint' '{endpoint}'\
+            ) LOCATION '{location}'"
+        );
 
         let ctx = SessionContext::new();
         let mut plan = ctx.state().create_logical_plan(&sql).await?;
@@ -501,6 +545,7 @@
             (AmazonS3ConfigKey::AccessKeyId, access_key_id),
             (AmazonS3ConfigKey::SecretAccessKey, secret_access_key),
             (AmazonS3ConfigKey::Region, region),
+            (AmazonS3ConfigKey::Endpoint, endpoint),
             (AmazonS3ConfigKey::Token, session_token),
         ];
         for (key, value) in config {
@@ -513,6 +558,66 @@
         Ok(())
     }
 
+    #[tokio::test]
+    async fn s3_object_store_builder_allow_http_error() -> Result<()> {
+        let access_key_id = "fake_access_key_id";
+        let secret_access_key = "fake_secret_access_key";
+        let endpoint = "http://endpoint33";
+        let location = "s3://bucket/path/file.parquet";
+
+        let table_url = ListingTableUrl::parse(location)?;
+        let scheme = table_url.scheme();
+        let sql = format!(
+            "CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS\
+            ('aws.access_key_id' '{access_key_id}', \
+            'aws.secret_access_key' '{secret_access_key}', \
+            'aws.endpoint' '{endpoint}'\
+            ) LOCATION '{location}'"
+        );
+
+        let ctx = SessionContext::new();
+        let mut plan = ctx.state().create_logical_plan(&sql).await?;
+
+        if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan {
+            register_options(&ctx, scheme);
+            let mut table_options = ctx.state().default_table_options().clone();
+            table_options.alter_with_string_hash_map(&cmd.options)?;
+            let aws_options = table_options.extensions.get::<AwsOptions>().unwrap();
+            let err = get_s3_object_store_builder(table_url.as_ref(), aws_options)
+                .await
+                .unwrap_err();
+
+            assert_eq!(err.to_string(), "Invalid or Unsupported Configuration: Invalid endpoint: http://endpoint33. HTTP is not allowed for S3 endpoints. To allow HTTP, set 'aws.allow_http' to true");
+        } else {
+            return plan_err!("LogicalPlan is not a CreateExternalTable");
+        }
+
+        // Now add `allow_http` to the options and check that it works
+        let sql = format!(
+            "CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS\
+            ('aws.access_key_id' '{access_key_id}', \
+            'aws.secret_access_key' '{secret_access_key}', \
+            'aws.endpoint' '{endpoint}',\
+            'aws.allow_http' 'true'\
+            ) LOCATION '{location}'"
+        );
+
+        let mut plan = ctx.state().create_logical_plan(&sql).await?;
+
+        if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan {
+            register_options(&ctx, scheme);
+            let mut table_options = ctx.state().default_table_options().clone();
+            table_options.alter_with_string_hash_map(&cmd.options)?;
+            let aws_options = table_options.extensions.get::<AwsOptions>().unwrap();
+            // ensure this does not return an error
+            get_s3_object_store_builder(table_url.as_ref(), aws_options).await?;
+        } else {
+            return plan_err!("LogicalPlan is not a CreateExternalTable");
+        }
+
+        Ok(())
+    }
+
     #[tokio::test]
     async fn oss_object_store_builder() -> Result<()> {
         let access_key_id = "fake_access_key_id";