Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 105 additions & 2 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,31 @@ macro_rules! config_namespace {
self.$field_name.visit(v, key.as_str(), desc);
)*
}

fn reset(&mut self, key: &str) -> $crate::error::Result<()> {
let (key, rem) = key.split_once('.').unwrap_or((key, ""));
match key {
$(
stringify!($field_name) => {
#[allow(deprecated)]
{
if rem.is_empty() {
let default_value: $field_type = $default;
self.$field_name = default_value;
Ok(())
} else {
self.$field_name.reset(rem)
}
}
},
)*
_ => $crate::error::_config_err!(
"Config value \"{}\" not found on {}",
key,
stringify!($struct_name)
),
}
}
}
impl Default for $struct_name {
fn default() -> Self {
Expand Down Expand Up @@ -1146,6 +1171,45 @@ impl ConfigField for ConfigOptions {
self.sql_parser.visit(v, "datafusion.sql_parser", "");
self.format.visit(v, "datafusion.format", "");
}

/// Reset a configuration option back to its default value
fn reset(&mut self, key: &str) -> Result<()> {
let Some((prefix, rest)) = key.split_once('.') else {
return _config_err!("could not find config namespace for key \"{key}\"");
};

if prefix != "datafusion" {
return _config_err!("Could not find config namespace \"{prefix}\"");
}

let (section, rem) = rest.split_once('.').unwrap_or((rest, ""));
if rem.is_empty() {
return _config_err!("could not find config field for key \"{key}\"");
}

match section {
"catalog" => self.catalog.reset(rem),
"execution" => self.execution.reset(rem),
"optimizer" => {
if rem == "enable_dynamic_filter_pushdown" {
let defaults = OptimizerOptions::default();
self.optimizer.enable_dynamic_filter_pushdown =
defaults.enable_dynamic_filter_pushdown;
self.optimizer.enable_topk_dynamic_filter_pushdown =
defaults.enable_topk_dynamic_filter_pushdown;
self.optimizer.enable_join_dynamic_filter_pushdown =
defaults.enable_join_dynamic_filter_pushdown;
Ok(())
} else {
self.optimizer.reset(rem)
}
}
"explain" => self.explain.reset(rem),
"sql_parser" => self.sql_parser.reset(rem),
"format" => self.format.reset(rem),
other => _config_err!("Config value \"{other}\" not found on ConfigOptions"),
}
}
}

impl ConfigOptions {
Expand Down Expand Up @@ -1454,6 +1518,10 @@ pub trait ConfigField {
fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str);

fn set(&mut self, key: &str, value: &str) -> Result<()>;

fn reset(&mut self, key: &str) -> Result<()> {
_config_err!("Reset is not supported for this config field, key: {}", key)
}
}

impl<F: ConfigField + Default> ConfigField for Option<F> {
Expand All @@ -1467,6 +1535,15 @@ impl<F: ConfigField + Default> ConfigField for Option<F> {
fn set(&mut self, key: &str, value: &str) -> Result<()> {
self.get_or_insert_with(Default::default).set(key, value)
}

fn reset(&mut self, key: &str) -> Result<()> {
if key.is_empty() {
*self = Default::default();
Ok(())
} else {
self.get_or_insert_with(Default::default).reset(key)
}
}
}

/// Default transformation to parse a [`ConfigField`] for a string.
Expand Down Expand Up @@ -1531,6 +1608,19 @@ macro_rules! config_field {
*self = $transform;
Ok(())
}

fn reset(&mut self, key: &str) -> $crate::error::Result<()> {
if key.is_empty() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems off. Why error if there is a key but don't if there isn't? Seems backwards?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For scalar config fields the caller already strips off the key for the field before invoking reset. At that point key only contains a nested suffix (if any).

  • key.is_empty() → we’re exactly at the scalar value, so resetting means “restore to default”, which we do.
  • non-empty key → someone tried to reset something below a scalar (e.g. foo.bar where foo is a scalar); that’s invalid, so we emit the helpful error.

*self = <$t as Default>::default();
Ok(())
} else {
$crate::error::_config_err!(
"Config field is a scalar {} and does not have nested field \"{}\"",
stringify!($t),
key
)
}
}
}
};
}
Expand Down Expand Up @@ -2539,7 +2629,7 @@ impl ConfigField for ConfigFileDecryptionProperties {
self.footer_signature_verification.set(rem, value.as_ref())
}
_ => _config_err!(
"Config value \"{}\" not found on ConfigFileEncryptionProperties",
"Config value \"{}\" not found on ConfigFileDecryptionProperties",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice catch

key
),
}
Expand Down Expand Up @@ -2853,7 +2943,6 @@ mod tests {
};
use std::any::Any;
use std::collections::HashMap;
use std::sync::Arc;

#[derive(Default, Debug, Clone)]
pub struct TestExtensionConfig {
Expand Down Expand Up @@ -2968,6 +3057,19 @@ mod tests {
assert_eq!(COUNT.load(std::sync::atomic::Ordering::Relaxed), 1);
}

#[test]
fn reset_nested_scalar_reports_helpful_error() {
let mut value = true;
let err = <bool as ConfigField>::reset(&mut value, "nested").unwrap_err();
let message = err.to_string();
assert!(
message.starts_with(
"Invalid or Unsupported Configuration: Config field is a scalar bool and does not have nested field \"nested\""
),
"unexpected error message: {message}"
);
}

#[cfg(feature = "parquet")]
#[test]
fn parquet_table_options() {
Expand All @@ -2990,6 +3092,7 @@ mod tests {
};
use parquet::encryption::decrypt::FileDecryptionProperties;
use parquet::encryption::encrypt::FileEncryptionProperties;
use std::sync::Arc;

let footer_key = b"0123456789012345".to_vec(); // 128bit/16
let column_names = vec!["double_field", "float_field"];
Expand Down
79 changes: 72 additions & 7 deletions datafusion/core/src/execution/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ use crate::{
logical_expr::{
CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateFunction,
CreateMemoryTable, CreateView, DropCatalogSchema, DropFunction, DropTable,
DropView, Execute, LogicalPlan, LogicalPlanBuilder, Prepare, SetVariable,
TableType, UNNAMED_TABLE,
DropView, Execute, LogicalPlan, LogicalPlanBuilder, Prepare, ResetVariable,
SetVariable, TableType, UNNAMED_TABLE,
},
physical_expr::PhysicalExpr,
physical_plan::ExecutionPlan,
Expand All @@ -63,7 +63,7 @@ use datafusion_catalog::MemoryCatalogProvider;
use datafusion_catalog::{
DynamicFileCatalog, TableFunction, TableFunctionImpl, UrlTableFactory,
};
use datafusion_common::config::ConfigOptions;
use datafusion_common::config::{ConfigField, ConfigOptions};
use datafusion_common::metadata::ScalarAndMetadata;
use datafusion_common::{
config::{ConfigExtension, TableOptions},
Expand All @@ -72,7 +72,11 @@ use datafusion_common::{
tree_node::{TreeNodeRecursion, TreeNodeVisitor},
DFSchema, DataFusionError, ParamValues, SchemaReference, TableReference,
};
use datafusion_execution::cache::cache_manager::DEFAULT_METADATA_CACHE_LIMIT;
pub use datafusion_execution::config::SessionConfig;
use datafusion_execution::disk_manager::{
DiskManagerBuilder, DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
};
use datafusion_execution::registry::SerializerRegistry;
pub use datafusion_execution::TaskContext;
pub use datafusion_expr::execution_props::ExecutionProps;
Expand Down Expand Up @@ -711,7 +715,12 @@ impl SessionContext {
}
// TODO what about the other statements (like TransactionStart and TransactionEnd)
LogicalPlan::Statement(Statement::SetVariable(stmt)) => {
self.set_variable(stmt).await
self.set_variable(stmt).await?;
self.return_empty_dataframe()
}
LogicalPlan::Statement(Statement::ResetVariable(stmt)) => {
self.reset_variable(stmt).await?;
self.return_empty_dataframe()
}
LogicalPlan::Statement(Statement::Prepare(Prepare {
name,
Expand Down Expand Up @@ -1069,7 +1078,7 @@ impl SessionContext {
exec_err!("Schema '{schemaref}' doesn't exist.")
}

async fn set_variable(&self, stmt: SetVariable) -> Result<DataFrame> {
async fn set_variable(&self, stmt: SetVariable) -> Result<()> {
let SetVariable {
variable, value, ..
} = stmt;
Expand Down Expand Up @@ -1099,11 +1108,37 @@ impl SessionContext {
for udf in udfs_to_update {
state.register_udf(udf)?;
}
}

drop(state);
Ok(())
}

async fn reset_variable(&self, stmt: ResetVariable) -> Result<()> {
let variable = stmt.variable;
if variable.starts_with("datafusion.runtime.") {
return self.reset_runtime_variable(&variable);
}

self.return_empty_dataframe()
let mut state = self.state.write();
state.config_mut().options_mut().reset(&variable)?;

// Refresh UDFs to ensure configuration-dependent behavior updates
let config_options = state.config().options();
let udfs_to_update: Vec<_> = state
.scalar_functions()
.values()
.filter_map(|udf| {
udf.inner()
.with_updated_config(config_options)
.map(Arc::new)
})
.collect();

for udf in udfs_to_update {
state.register_udf(udf)?;
}

Ok(())
}

fn set_runtime_variable(&self, variable: &str, value: &str) -> Result<()> {
Expand Down Expand Up @@ -1136,6 +1171,36 @@ impl SessionContext {
Ok(())
}

fn reset_runtime_variable(&self, variable: &str) -> Result<()> {
let key = variable.strip_prefix("datafusion.runtime.").unwrap();

let mut state = self.state.write();

let mut builder = RuntimeEnvBuilder::from_runtime_env(state.runtime_env());
match key {
"memory_limit" => {
builder.memory_pool = None;
}
"max_temp_directory_size" => {
builder =
builder.with_max_temp_directory_size(DEFAULT_MAX_TEMP_DIRECTORY_SIZE);
}
"temp_directory" => {
builder.disk_manager_builder = Some(DiskManagerBuilder::default());
}
"metadata_cache_limit" => {
builder = builder.with_metadata_cache_limit(DEFAULT_METADATA_CACHE_LIMIT);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here in cache_manager.rs

}
_ => return plan_err!("Unknown runtime configuration: {variable}"),
};

*state = SessionStateBuilder::from(state.clone())
.with_runtime_env(Arc::new(builder.build()?))
.build();

Ok(())
}

/// Parse memory limit from string to number of bytes
/// Supports formats like '1.5G', '100M', '512K'
///
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/tests/macro_hygiene/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ mod config_field {
impl std::error::Error for E {}

#[allow(dead_code)]
#[derive(Default)]
struct S;

impl std::str::FromStr for S {
Expand Down
2 changes: 1 addition & 1 deletion datafusion/execution/src/cache/cache_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ impl CacheManager {
}
}

const DEFAULT_METADATA_CACHE_LIMIT: usize = 50 * 1024 * 1024; // 50M
pub const DEFAULT_METADATA_CACHE_LIMIT: usize = 50 * 1024 * 1024; // 50M

#[derive(Clone)]
pub struct CacheManagerConfig {
Expand Down
2 changes: 1 addition & 1 deletion datafusion/execution/src/disk_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use tempfile::{Builder, NamedTempFile, TempDir};

use crate::memory_pool::human_readable_size;

const DEFAULT_MAX_TEMP_DIRECTORY_SIZE: u64 = 100 * 1024 * 1024 * 1024; // 100GB
pub const DEFAULT_MAX_TEMP_DIRECTORY_SIZE: u64 = 100 * 1024 * 1024 * 1024; // 100GB

/// Builder pattern for the [DiskManager] structure
#[derive(Clone, Debug)]
Expand Down
5 changes: 3 additions & 2 deletions datafusion/expr/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,9 @@ pub use plan::{
SubqueryAlias, TableScan, ToStringifiedPlan, Union, Unnest, Values, Window,
};
pub use statement::{
Deallocate, Execute, Prepare, SetVariable, Statement, TransactionAccessMode,
TransactionConclusion, TransactionEnd, TransactionIsolationLevel, TransactionStart,
Deallocate, Execute, Prepare, ResetVariable, SetVariable, Statement,
TransactionAccessMode, TransactionConclusion, TransactionEnd,
TransactionIsolationLevel, TransactionStart,
};

pub use datafusion_common::format::ExplainFormat;
Expand Down
12 changes: 12 additions & 0 deletions datafusion/expr/src/logical_plan/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ pub enum Statement {
TransactionEnd(TransactionEnd),
/// Set a Variable
SetVariable(SetVariable),
/// Reset a Variable
ResetVariable(ResetVariable),
/// Prepare a statement and find any bind parameters
/// (e.g. `?`). This is used to implement SQL-prepared statements.
Prepare(Prepare),
Expand Down Expand Up @@ -66,6 +68,7 @@ impl Statement {
Statement::TransactionStart(_) => "TransactionStart",
Statement::TransactionEnd(_) => "TransactionEnd",
Statement::SetVariable(_) => "SetVariable",
Statement::ResetVariable(_) => "ResetVariable",
Statement::Prepare(_) => "Prepare",
Statement::Execute(_) => "Execute",
Statement::Deallocate(_) => "Deallocate",
Expand Down Expand Up @@ -109,6 +112,9 @@ impl Statement {
}) => {
write!(f, "SetVariable: set {variable:?} to {value:?}")
}
Statement::ResetVariable(ResetVariable { variable }) => {
write!(f, "ResetVariable: reset {variable:?}")
}
Statement::Prepare(Prepare { name, fields, .. }) => {
write!(
f,
Expand Down Expand Up @@ -194,6 +200,12 @@ pub struct SetVariable {
pub value: String,
}

/// Reset a configuration variable to its default
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct ResetVariable {
/// The variable name
pub variable: String,
}
/// Prepare a statement but do not execute it. Prepare statements can have 0 or more
/// `Expr::Placeholder` expressions that are filled in during execution
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
Expand Down
Loading