diff --git a/datafusion/core/benches/push_down_filter.rs b/datafusion/core/benches/push_down_filter.rs index 139fb12c30947..58a259e70afad 100644 --- a/datafusion/core/benches/push_down_filter.rs +++ b/datafusion/core/benches/push_down_filter.rs @@ -19,8 +19,8 @@ use arrow::array::RecordBatch; use arrow::datatypes::{DataType, Field, Schema}; use bytes::{BufMut, BytesMut}; use criterion::{criterion_group, criterion_main, Criterion}; -use datafusion::config::ConfigOptions; use datafusion::prelude::{ParquetReadOptions, SessionContext}; +use datafusion_execution::config::SessionConfig; use datafusion_execution::object_store::ObjectStoreUrl; use datafusion_physical_optimizer::filter_pushdown::FilterPushdown; use datafusion_physical_optimizer::PhysicalOptimizerRule; @@ -88,7 +88,7 @@ async fn create_plan() -> Arc { #[derive(Clone)] struct BenchmarkPlan { plan: Arc, - config: ConfigOptions, + config: SessionConfig, } impl std::fmt::Display for BenchmarkPlan { @@ -102,8 +102,8 @@ fn bench_push_down_filter(c: &mut Criterion) { let plan = tokio::runtime::Runtime::new() .unwrap() .block_on(create_plan()); - let mut config = ConfigOptions::default(); - config.execution.parquet.pushdown_filters = true; + let mut config = SessionConfig::default(); + config.options_mut().execution.parquet.pushdown_filters = true; let plan = BenchmarkPlan { plan, config }; let optimizer = FilterPushdown::new(); diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 0fa17deea1295..5765d0e3870fd 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -2118,7 +2118,7 @@ impl DefaultPhysicalPlanner { for optimizer in optimizers { let before_schema = new_plan.schema(); new_plan = optimizer - .optimize(new_plan, session_state.config_options()) + .optimize(new_plan, session_state.config()) .map_err(|e| { DataFusionError::Context(optimizer.name().to_string(), Box::new(e)) })?; @@ -2441,7 +2441,6 @@ mod tests { use arrow::array::{ArrayRef, DictionaryArray, Int32Array}; use arrow::datatypes::{DataType, Field, Int32Type}; use arrow_schema::SchemaRef; - use datafusion_common::config::ConfigOptions; use datafusion_common::{ assert_contains, DFSchemaRef, TableReference, ToDFSchema as _, }; @@ -3545,7 +3544,7 @@ digraph { fn optimize( &self, plan: Arc, - _config: &ConfigOptions, + _config: &SessionConfig, ) -> Result> { Ok(plan) } diff --git a/datafusion/core/tests/execution/coop.rs b/datafusion/core/tests/execution/coop.rs index b6f406e967509..ebdcc9df19f76 100644 --- a/datafusion/core/tests/execution/coop.rs +++ b/datafusion/core/tests/execution/coop.rs @@ -810,8 +810,7 @@ async fn query_yields( task_ctx: Arc, ) -> Result<(), Box> { // Run plan through EnsureCooperative - let optimized = - EnsureCooperative::new().optimize(plan, task_ctx.session_config().options())?; + let optimized = EnsureCooperative::new().optimize(plan, task_ctx.session_config())?; // Get the stream let stream = physical_plan::execute_stream(optimized, task_ctx)?; diff --git a/datafusion/core/tests/parquet/file_statistics.rs b/datafusion/core/tests/parquet/file_statistics.rs index 64ee92eda2545..96fc24c7b9bf2 100644 --- a/datafusion/core/tests/parquet/file_statistics.rs +++ b/datafusion/core/tests/parquet/file_statistics.rs @@ -38,7 +38,6 @@ use datafusion_execution::runtime_env::RuntimeEnvBuilder; use datafusion_expr::{col, lit, Expr}; use datafusion::datasource::physical_plan::FileScanConfig; -use datafusion_common::config::ConfigOptions; use datafusion_physical_optimizer::filter_pushdown::FilterPushdown; use datafusion_physical_optimizer::PhysicalOptimizerRule; use datafusion_physical_plan::filter::FilterExec; @@ -56,8 +55,13 @@ async fn check_stats_precision_with_filter_pushdown() { let table = get_listing_table(&table_path, None, &opt).await; let (_, _, state) = get_cache_runtime_state(); - let mut options: ConfigOptions = state.config().options().as_ref().clone(); - options.execution.parquet.pushdown_filters = true; + let mut session_config = + SessionConfig::from(state.config().options().as_ref().clone()); + session_config + .options_mut() + .execution + .parquet + .pushdown_filters = true; // Scan without filter, stats are exact let exec = table.scan(&state, None, &[], None).await.unwrap(); @@ -85,7 +89,7 @@ async fn check_stats_precision_with_filter_pushdown() { as Arc; let optimized_exec = FilterPushdown::new() - .optimize(filtered_exec, &options) + .optimize(filtered_exec, &session_config) .unwrap(); assert!( diff --git a/datafusion/core/tests/physical_optimizer/aggregate_statistics.rs b/datafusion/core/tests/physical_optimizer/aggregate_statistics.rs index a79d743cb253d..76483eaa56a71 100644 --- a/datafusion/core/tests/physical_optimizer/aggregate_statistics.rs +++ b/datafusion/core/tests/physical_optimizer/aggregate_statistics.rs @@ -25,8 +25,8 @@ use arrow::record_batch::RecordBatch; use datafusion::datasource::memory::MemorySourceConfig; use datafusion::datasource::source::DataSourceExec; use datafusion_common::cast::as_int64_array; -use datafusion_common::config::ConfigOptions; use datafusion_common::Result; +use datafusion_execution::config::SessionConfig; use datafusion_execution::TaskContext; use datafusion_expr::Operator; use datafusion_physical_expr::expressions::{self, cast}; @@ -67,7 +67,7 @@ async fn assert_count_optim_success( let task_ctx = Arc::new(TaskContext::default()); let plan: Arc = Arc::new(plan); - let config = ConfigOptions::new(); + let config = SessionConfig::new(); let optimized = AggregateStatistics::new().optimize(Arc::clone(&plan), &config)?; // A ProjectionExec is a sign that the count optimization was applied @@ -264,7 +264,7 @@ async fn test_count_inexact_stat() -> Result<()> { Arc::clone(&schema), )?; - let conf = ConfigOptions::new(); + let conf = SessionConfig::new(); let optimized = AggregateStatistics::new().optimize(Arc::new(final_agg), &conf)?; // check that the original ExecutionPlan was not replaced @@ -308,7 +308,7 @@ async fn test_count_with_nulls_inexact_stat() -> Result<()> { Arc::clone(&schema), )?; - let conf = ConfigOptions::new(); + let conf = SessionConfig::new(); let optimized = AggregateStatistics::new().optimize(Arc::new(final_agg), &conf)?; // check that the original ExecutionPlan was not replaced diff --git a/datafusion/core/tests/physical_optimizer/combine_partial_final_agg.rs b/datafusion/core/tests/physical_optimizer/combine_partial_final_agg.rs index 9c76f6ab6f58b..72c3d692897ae 100644 --- a/datafusion/core/tests/physical_optimizer/combine_partial_final_agg.rs +++ b/datafusion/core/tests/physical_optimizer/combine_partial_final_agg.rs @@ -26,7 +26,7 @@ use std::sync::Arc; use crate::physical_optimizer::test_utils::parquet_exec; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; -use datafusion_common::config::ConfigOptions; +use datafusion_execution::config::SessionConfig; use datafusion_functions_aggregate::count::count_udaf; use datafusion_functions_aggregate::sum::sum_udaf; use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr}; @@ -47,7 +47,7 @@ macro_rules! assert_optimized { ($PLAN: expr, @ $EXPECTED_LINES: literal $(,)?) => { // run optimizer let optimizer = CombinePartialFinalAggregate {}; - let config = ConfigOptions::new(); + let config = SessionConfig::new(); let optimized = optimizer.optimize($PLAN, &config)?; // Now format correctly let plan = displayable(optimized.as_ref()).indent(true).to_string(); diff --git a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs index 63111f43806b3..84d0c4e86f5d1 100644 --- a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs @@ -404,41 +404,56 @@ const SORT_DISTRIB_DISTRIB: [Run; 3] = #[derive(Clone)] struct TestConfig { - config: ConfigOptions, + session_config: SessionConfig, } impl Default for TestConfig { fn default() -> Self { - Self { - config: test_suite_default_config_options(), - } + let config = test_suite_default_config_options(); + let session_config = SessionConfig::from(config.clone()); + Self { session_config } } } impl TestConfig { /// If preferred, will not repartition / resort data if it is already sorted. fn with_prefer_existing_sort(mut self) -> Self { - self.config.optimizer.prefer_existing_sort = true; + self.session_config + .options_mut() + .optimizer + .prefer_existing_sort = true; self } /// If preferred, will not attempt to convert Union to Interleave. fn with_prefer_existing_union(mut self) -> Self { - self.config.optimizer.prefer_existing_union = true; + self.session_config + .options_mut() + .optimizer + .prefer_existing_union = true; self } /// If preferred, will repartition file scans. /// Accepts a minimum file size to repartition. fn with_prefer_repartition_file_scans(mut self, file_min_size: usize) -> Self { - self.config.optimizer.repartition_file_scans = true; - self.config.optimizer.repartition_file_min_size = file_min_size; + self.session_config + .options_mut() + .optimizer + .repartition_file_scans = true; + self.session_config + .options_mut() + .optimizer + .repartition_file_min_size = file_min_size; self } /// Set the preferred target partitions for query execution concurrency. fn with_query_execution_partitions(mut self, target_partitions: usize) -> Self { - self.config.execution.target_partitions = target_partitions; + self.session_config + .options_mut() + .execution + .target_partitions = target_partitions; self } @@ -455,13 +470,18 @@ impl TestConfig { // Add the ancillary output requirements operator at the start: let optimizer = OutputRequirements::new_add_mode(); - let mut optimized = optimizer.optimize(plan.clone(), &self.config)?; + let mut optimized = optimizer.optimize(plan.clone(), &self.session_config)?; // This file has 2 rules that use tree node, apply these rules to original plan consecutively // After these operations tree nodes should be in a consistent state. // This code block makes sure that these rules doesn't violate tree node integrity. { - let adjusted = if self.config.optimizer.top_down_join_key_reordering { + let adjusted = if self + .session_config + .options() + .optimizer + .top_down_join_key_reordering + { // Run adjust_input_keys_ordering rule let plan_requirements = PlanWithKeyRequirements::new_default(plan.clone()); @@ -483,7 +503,10 @@ impl TestConfig { // Then run ensure_distribution rule DistributionContext::new_default(adjusted) .transform_up(|distribution_context| { - ensure_distribution(distribution_context, &self.config) + ensure_distribution( + distribution_context, + self.session_config.options(), + ) }) .data() .and_then(check_integrity)?; @@ -494,18 +517,18 @@ impl TestConfig { optimized = match run { Run::Distribution => { let optimizer = EnforceDistribution::new(); - optimizer.optimize(optimized, &self.config)? + optimizer.optimize(optimized, &self.session_config)? } Run::Sorting => { let optimizer = EnforceSorting::new(); - optimizer.optimize(optimized, &self.config)? + optimizer.optimize(optimized, &self.session_config)? } }; } // Remove the ancillary output requirements operator when done: let optimizer = OutputRequirements::new_remove_mode(); - let optimized = optimizer.optimize(optimized, &self.config)?; + let optimized = optimizer.optimize(optimized, &self.session_config)?; // Now format correctly let actual_lines = get_plan_string(&optimized); @@ -3340,10 +3363,13 @@ fn do_not_put_sort_when_input_is_invalid() -> Result<()> { " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - let mut config = ConfigOptions::new(); - config.execution.target_partitions = 10; - config.optimizer.enable_round_robin_repartition = true; - config.optimizer.prefer_existing_sort = false; + let mut config = SessionConfig::new(); + config.options_mut().execution.target_partitions = 10; + config + .options_mut() + .optimizer + .enable_round_robin_repartition = true; + config.options_mut().optimizer.prefer_existing_sort = false; let dist_plan = EnforceDistribution::new().optimize(physical_plan, &config)?; assert_plan_txt!(expected, dist_plan); @@ -3378,10 +3404,13 @@ fn put_sort_when_input_is_valid() -> Result<()> { " DataSourceExec: file_groups={10 groups: [[x:0..20], [y:0..20], [x:20..40], [y:20..40], [x:40..60], [y:40..60], [x:60..80], [y:60..80], [x:80..100], [y:80..100]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet", ]; - let mut config = ConfigOptions::new(); - config.execution.target_partitions = 10; - config.optimizer.enable_round_robin_repartition = true; - config.optimizer.prefer_existing_sort = false; + let mut config = SessionConfig::new(); + config.options_mut().execution.target_partitions = 10; + config + .options_mut() + .optimizer + .enable_round_robin_repartition = true; + config.options_mut().optimizer.prefer_existing_sort = false; let dist_plan = EnforceDistribution::new().optimize(physical_plan, &config)?; assert_plan_txt!(expected, dist_plan); @@ -3503,7 +3532,11 @@ async fn test_distribute_sort_parquet() -> Result<()> { let test_config: TestConfig = TestConfig::default().with_prefer_repartition_file_scans(1000); assert!( - test_config.config.optimizer.repartition_file_scans, + test_config + .session_config + .options() + .optimizer + .repartition_file_scans, "should enable scans to be repartitioned" ); @@ -3542,7 +3575,11 @@ async fn test_distribute_sort_memtable() -> Result<()> { let test_config: TestConfig = TestConfig::default().with_prefer_repartition_file_scans(1000); assert!( - test_config.config.optimizer.repartition_file_scans, + test_config + .session_config + .options() + .optimizer + .repartition_file_scans, "should enable scans to be repartitioned" ); diff --git a/datafusion/core/tests/physical_optimizer/enforce_sorting.rs b/datafusion/core/tests/physical_optimizer/enforce_sorting.rs index ad77a453350f8..52dc2da19e70b 100644 --- a/datafusion/core/tests/physical_optimizer/enforce_sorting.rs +++ b/datafusion/core/tests/physical_optimizer/enforce_sorting.rs @@ -31,9 +31,9 @@ use crate::physical_optimizer::test_utils::{ use arrow::compute::SortOptions; use arrow::datatypes::{DataType, SchemaRef}; -use datafusion_common::config::ConfigOptions; use datafusion_common::tree_node::{TreeNode, TransformedResult}; use datafusion_common::{Result, TableReference}; +use datafusion_execution::config::SessionConfig; use datafusion_datasource::file_scan_config::FileScanConfigBuilder; use datafusion_datasource::source::DataSourceExec; use datafusion_expr_common::operator::Operator; @@ -110,8 +110,9 @@ impl EnforceSortingTest { /// Runs the enforce sorting test and returns a string with the input and /// optimized plan as strings for snapshot comparison using insta pub(crate) fn run(&self) -> String { - let mut config = ConfigOptions::new(); - config.optimizer.repartition_sorts = self.repartition_sorts; + let mut session_config = SessionConfig::new(); + session_config.options_mut().optimizer.repartition_sorts = self.repartition_sorts; + let config = session_config.options(); // This file has 4 rules that use tree node, apply these rules as in the // EnforceSorting::optimize implementation @@ -149,7 +150,7 @@ impl EnforceSortingTest { plan_with_pipeline_fixer, false, true, - &config, + config, ) }) .data() @@ -171,7 +172,7 @@ impl EnforceSortingTest { // Run the actual optimizer let optimized_physical_plan = EnforceSorting::new() - .optimize(Arc::clone(&self.plan), &config) + .optimize(Arc::clone(&self.plan), &session_config) .expect("enforce_sorting failed"); // Get string representation of the plan @@ -2277,7 +2278,7 @@ async fn test_commutativity() -> Result<()> { DataSourceExec: partitions=1, partition_sizes=[0] "#); - let config = ConfigOptions::new(); + let config = SessionConfig::new(); let rules = vec![ Arc::new(EnforceDistribution::new()) as Arc, Arc::new(EnforceSorting::new()) as Arc, diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs index b91c1732260cf..ff8cef7331dd3 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs @@ -239,9 +239,12 @@ async fn test_dynamic_filter_pushdown_through_hash_join_with_topk() { .with_fetch(Some(2)), ) as Arc; - let mut config = ConfigOptions::default(); - config.optimizer.enable_dynamic_filter_pushdown = true; - config.execution.parquet.pushdown_filters = true; + let mut config = SessionConfig::default(); + config + .options_mut() + .optimizer + .enable_dynamic_filter_pushdown = true; + config.options_mut().execution.parquet.pushdown_filters = true; // Apply the FilterPushdown optimizer rule let plan = FilterPushdown::new_post_optimization() @@ -720,10 +723,14 @@ async fn test_topk_dynamic_filter_pushdown() { ); // Actually apply the optimization to the plan and put some data through it to check that the filter is updated to reflect the TopK state - let mut config = ConfigOptions::default(); - config.execution.parquet.pushdown_filters = true; + let mut session_config = SessionConfig::default(); + session_config + .options_mut() + .execution + .parquet + .pushdown_filters = true; let plan = FilterPushdown::new_post_optimization() - .optimize(plan, &config) + .optimize(plan, &session_config) .unwrap(); let config = SessionConfig::new().with_batch_size(2); let session_ctx = SessionContext::new_with_config(config); @@ -803,10 +810,14 @@ async fn test_topk_dynamic_filter_pushdown_multi_column_sort() { ); // Actually apply the optimization to the plan and put some data through it to check that the filter is updated to reflect the TopK state - let mut config = ConfigOptions::default(); - config.execution.parquet.pushdown_filters = true; + let mut session_config = SessionConfig::default(); + session_config + .options_mut() + .execution + .parquet + .pushdown_filters = true; let plan = FilterPushdown::new_post_optimization() - .optimize(plan, &config) + .optimize(plan, &session_config) .unwrap(); let config = SessionConfig::new().with_batch_size(2); let session_ctx = SessionContext::new_with_config(config); @@ -1049,11 +1060,18 @@ async fn test_hashjoin_dynamic_filter_pushdown() { ); // Actually apply the optimization to the plan and execute to see the filter in action - let mut config = ConfigOptions::default(); - config.execution.parquet.pushdown_filters = true; - config.optimizer.enable_dynamic_filter_pushdown = true; + let mut session_config = SessionConfig::default(); + session_config + .options_mut() + .execution + .parquet + .pushdown_filters = true; + session_config + .options_mut() + .optimizer + .enable_dynamic_filter_pushdown = true; let plan = FilterPushdown::new_post_optimization() - .optimize(plan, &config) + .optimize(plan, &session_config) .unwrap(); // Test for https://github.com/apache/datafusion/pull/17371: dynamic filter linking survives `with_new_children` @@ -1277,11 +1295,18 @@ async fn test_hashjoin_dynamic_filter_pushdown_partitioned() { ); // Actually apply the optimization to the plan and execute to see the filter in action - let mut config = ConfigOptions::default(); - config.execution.parquet.pushdown_filters = true; - config.optimizer.enable_dynamic_filter_pushdown = true; + let mut session_config = SessionConfig::default(); + session_config + .options_mut() + .execution + .parquet + .pushdown_filters = true; + session_config + .options_mut() + .optimizer + .enable_dynamic_filter_pushdown = true; let plan = FilterPushdown::new_post_optimization() - .optimize(plan, &config) + .optimize(plan, &session_config) .unwrap(); let config = SessionConfig::new().with_batch_size(10); let session_ctx = SessionContext::new_with_config(config); @@ -1474,11 +1499,18 @@ async fn test_hashjoin_dynamic_filter_pushdown_collect_left() { ); // Actually apply the optimization to the plan and execute to see the filter in action - let mut config = ConfigOptions::default(); - config.execution.parquet.pushdown_filters = true; - config.optimizer.enable_dynamic_filter_pushdown = true; + let mut session_config = SessionConfig::default(); + session_config + .options_mut() + .execution + .parquet + .pushdown_filters = true; + session_config + .options_mut() + .optimizer + .enable_dynamic_filter_pushdown = true; let plan = FilterPushdown::new_post_optimization() - .optimize(plan, &config) + .optimize(plan, &session_config) .unwrap(); let config = SessionConfig::new().with_batch_size(10); let session_ctx = SessionContext::new_with_config(config); @@ -1646,11 +1678,18 @@ async fn test_nested_hashjoin_dynamic_filter_pushdown() { ); // Execute the plan to verify the dynamic filters are properly updated - let mut config = ConfigOptions::default(); - config.execution.parquet.pushdown_filters = true; - config.optimizer.enable_dynamic_filter_pushdown = true; + let mut session_config = SessionConfig::default(); + session_config + .options_mut() + .execution + .parquet + .pushdown_filters = true; + session_config + .options_mut() + .optimizer + .enable_dynamic_filter_pushdown = true; let plan = FilterPushdown::new_post_optimization() - .optimize(outer_join, &config) + .optimize(outer_join, &session_config) .unwrap(); let config = SessionConfig::new().with_batch_size(10); let session_ctx = SessionContext::new_with_config(config); diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs index f05f3f00281d6..37ce0d9508135 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs @@ -18,13 +18,14 @@ use arrow::datatypes::SchemaRef; use arrow::{array::RecordBatch, compute::concat_batches}; use datafusion::{datasource::object_store::ObjectStoreUrl, physical_plan::PhysicalExpr}; -use datafusion_common::{config::ConfigOptions, internal_err, Result, Statistics}; +use datafusion_common::{internal_err, Result, Statistics}; use datafusion_datasource::{ file::FileSource, file_scan_config::FileScanConfig, file_scan_config::FileScanConfigBuilder, file_stream::FileOpenFuture, file_stream::FileOpener, schema_adapter::DefaultSchemaAdapterFactory, schema_adapter::SchemaAdapterFactory, source::DataSourceExec, PartitionedFile, }; +use datafusion_execution::config::SessionConfig; use datafusion_physical_expr_common::physical_expr::fmt_sql; use datafusion_physical_optimizer::PhysicalOptimizerRule; use datafusion_physical_plan::filter::batch_filter; @@ -49,6 +50,7 @@ use std::{ sync::Arc, task::{Context, Poll}, }; + pub struct TestOpener { batches: Vec, batch_size: Option, @@ -219,9 +221,9 @@ impl FileSource for TestSource { fn try_pushdown_filters( &self, mut filters: Vec>, - config: &ConfigOptions, + config: &SessionConfig, ) -> Result>> { - if self.support && config.execution.parquet.pushdown_filters { + if self.support && config.options().execution.parquet.pushdown_filters { if let Some(internal) = self.predicate.as_ref() { filters.push(Arc::clone(internal)); } @@ -380,9 +382,12 @@ impl OptimizationTest { where O: PhysicalOptimizerRule, { - let mut parquet_pushdown_config = ConfigOptions::default(); - parquet_pushdown_config.execution.parquet.pushdown_filters = - allow_pushdown_filters; + let mut parquet_pushdown_config = SessionConfig::default(); + parquet_pushdown_config + .options_mut() + .execution + .parquet + .pushdown_filters = allow_pushdown_filters; let input = format_execution_plan(&input_plan); let input_schema = input_plan.schema(); @@ -519,7 +524,7 @@ impl ExecutionPlan for TestNode { &self, _phase: FilterPushdownPhase, parent_filters: Vec>, - _config: &ConfigOptions, + _config: &SessionConfig, ) -> Result { // Since TestNode marks all parent filters as supported and adds its own filter, // we use from_child to create a description with all parent filters supported @@ -533,7 +538,7 @@ impl ExecutionPlan for TestNode { &self, _phase: FilterPushdownPhase, child_pushdown_result: ChildPushdownResult, - _config: &ConfigOptions, + _config: &SessionConfig, ) -> Result>> { if self.inject_filter { // Add a FilterExec if our own filter was not handled by the child diff --git a/datafusion/core/tests/physical_optimizer/join_selection.rs b/datafusion/core/tests/physical_optimizer/join_selection.rs index f9d3a045469e1..b4f2d1c22b80b 100644 --- a/datafusion/core/tests/physical_optimizer/join_selection.rs +++ b/datafusion/core/tests/physical_optimizer/join_selection.rs @@ -29,6 +29,7 @@ use datafusion_common::config::ConfigOptions; use datafusion_common::{stats::Precision, ColumnStatistics, JoinType, ScalarValue}; use datafusion_common::{JoinSide, NullEquality}; use datafusion_common::{Result, Statistics}; +use datafusion_execution::config::SessionConfig; use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext}; use datafusion_expr::Operator; use datafusion_physical_expr::expressions::col; @@ -227,7 +228,7 @@ async fn test_join_with_swap() { ); let optimized_join = JoinSelection::new() - .optimize(join, &ConfigOptions::new()) + .optimize(join, &SessionConfig::new()) .unwrap(); let swapping_projection = optimized_join @@ -289,7 +290,7 @@ async fn test_left_join_no_swap() { ); let optimized_join = JoinSelection::new() - .optimize(join, &ConfigOptions::new()) + .optimize(join, &SessionConfig::new()) .unwrap(); let swapped_join = optimized_join @@ -339,7 +340,7 @@ async fn test_join_with_swap_semi() { let original_schema = join.schema(); let optimized_join = JoinSelection::new() - .optimize(Arc::new(join), &ConfigOptions::new()) + .optimize(Arc::new(join), &SessionConfig::new()) .unwrap(); let swapped_join = optimized_join @@ -394,7 +395,7 @@ async fn test_join_with_swap_mark() { let original_schema = join.schema(); let optimized_join = JoinSelection::new() - .optimize(Arc::new(join), &ConfigOptions::new()) + .optimize(Arc::new(join), &SessionConfig::new()) .unwrap(); let swapped_join = optimized_join @@ -431,7 +432,7 @@ macro_rules! assert_optimized { let plan = Arc::new($PLAN); let optimized = JoinSelection::new() - .optimize(plan.clone(), &ConfigOptions::new()) + .optimize(plan.clone(), &SessionConfig::new()) .unwrap(); let plan_string = displayable(optimized.as_ref()).indent(true).to_string(); @@ -523,7 +524,7 @@ async fn test_join_no_swap() { ); let optimized_join = JoinSelection::new() - .optimize(join, &ConfigOptions::new()) + .optimize(join, &SessionConfig::new()) .unwrap(); let swapped_join = optimized_join @@ -572,7 +573,7 @@ async fn test_nl_join_with_swap(join_type: JoinType) { ); let optimized_join = JoinSelection::new() - .optimize(join, &ConfigOptions::new()) + .optimize(join, &SessionConfig::new()) .unwrap(); let swapping_projection = optimized_join @@ -652,7 +653,7 @@ async fn test_nl_join_with_swap_no_proj(join_type: JoinType) { let optimized_join = JoinSelection::new() .optimize( Arc::::clone(&join), - &ConfigOptions::new(), + &SessionConfig::new(), ) .unwrap(); @@ -911,7 +912,7 @@ fn check_join_partition_mode( ); let optimized_join = JoinSelection::new() - .optimize(join, &ConfigOptions::new()) + .optimize(join, &SessionConfig::new()) .unwrap(); if !is_swapped { @@ -1556,7 +1557,7 @@ async fn test_join_with_maybe_swap_unbounded_case(t: TestCase) -> Result<()> { )?) as _; let optimized_join_plan = - JoinSelection::new().optimize(Arc::clone(&join), &ConfigOptions::new())?; + JoinSelection::new().optimize(Arc::clone(&join), &SessionConfig::new())?; // If swap did happen let projection_added = optimized_join_plan.as_any().is::(); diff --git a/datafusion/core/tests/physical_optimizer/limit_pushdown.rs b/datafusion/core/tests/physical_optimizer/limit_pushdown.rs index 56d48901f284d..23351e8aeeb2c 100644 --- a/datafusion/core/tests/physical_optimizer/limit_pushdown.rs +++ b/datafusion/core/tests/physical_optimizer/limit_pushdown.rs @@ -24,8 +24,8 @@ use crate::physical_optimizer::test_utils::{ use arrow::compute::SortOptions; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; -use datafusion_common::config::ConfigOptions; use datafusion_common::error::Result; +use datafusion_execution::config::SessionConfig; use datafusion_expr::Operator; use datafusion_physical_expr::expressions::{col, lit, BinaryExpr}; use datafusion_physical_expr::Partitioning; @@ -102,7 +102,7 @@ fn transforms_streaming_table_exec_into_fetching_version_when_skip_is_zero() -> assert_eq!(initial, expected_initial); let after_optimize = - LimitPushdown::new().optimize(global_limit, &ConfigOptions::new())?; + LimitPushdown::new().optimize(global_limit, &SessionConfig::new())?; let expected = [ "StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true, fetch=5" @@ -127,7 +127,7 @@ fn transforms_streaming_table_exec_into_fetching_version_and_keeps_the_global_li assert_eq!(initial, expected_initial); let after_optimize = - LimitPushdown::new().optimize(global_limit, &ConfigOptions::new())?; + LimitPushdown::new().optimize(global_limit, &SessionConfig::new())?; let expected = [ "GlobalLimitExec: skip=2, fetch=5", @@ -163,7 +163,7 @@ fn transforms_coalesce_batches_exec_into_fetching_version_and_removes_local_limi assert_eq!(initial, expected_initial); let after_optimize = - LimitPushdown::new().optimize(global_limit, &ConfigOptions::new())?; + LimitPushdown::new().optimize(global_limit, &SessionConfig::new())?; let expected = [ "CoalescePartitionsExec: fetch=5", @@ -195,7 +195,7 @@ fn pushes_global_limit_exec_through_projection_exec() -> Result<()> { assert_eq!(initial, expected_initial); let after_optimize = - LimitPushdown::new().optimize(global_limit, &ConfigOptions::new())?; + LimitPushdown::new().optimize(global_limit, &SessionConfig::new())?; let expected = [ "ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3]", @@ -228,7 +228,7 @@ fn pushes_global_limit_exec_through_projection_exec_and_transforms_coalesce_batc assert_eq!(initial, expected_initial); let after_optimize = - LimitPushdown::new().optimize(global_limit, &ConfigOptions::new())?; + LimitPushdown::new().optimize(global_limit, &SessionConfig::new())?; let expected = [ "ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3]", @@ -270,7 +270,7 @@ fn pushes_global_limit_into_multiple_fetch_plans() -> Result<()> { assert_eq!(initial, expected_initial); let after_optimize = - LimitPushdown::new().optimize(global_limit, &ConfigOptions::new())?; + LimitPushdown::new().optimize(global_limit, &SessionConfig::new())?; let expected = [ "SortPreservingMergeExec: [c1@0 ASC], fetch=5", @@ -306,7 +306,7 @@ fn keeps_pushed_local_limit_exec_when_there_are_multiple_input_partitions() -> R assert_eq!(initial, expected_initial); let after_optimize = - LimitPushdown::new().optimize(global_limit, &ConfigOptions::new())?; + LimitPushdown::new().optimize(global_limit, &SessionConfig::new())?; let expected = [ "CoalescePartitionsExec: fetch=5", @@ -336,7 +336,7 @@ fn merges_local_limit_with_local_limit() -> Result<()> { assert_eq!(initial, expected_initial); let after_optimize = - LimitPushdown::new().optimize(parent_local_limit, &ConfigOptions::new())?; + LimitPushdown::new().optimize(parent_local_limit, &SessionConfig::new())?; let expected = ["GlobalLimitExec: skip=0, fetch=10", " EmptyExec"]; assert_eq!(get_plan_string(&after_optimize), expected); @@ -361,7 +361,7 @@ fn merges_global_limit_with_global_limit() -> Result<()> { assert_eq!(initial, expected_initial); let after_optimize = - LimitPushdown::new().optimize(parent_global_limit, &ConfigOptions::new())?; + LimitPushdown::new().optimize(parent_global_limit, &SessionConfig::new())?; let expected = ["GlobalLimitExec: skip=20, fetch=20", " EmptyExec"]; assert_eq!(get_plan_string(&after_optimize), expected); @@ -386,7 +386,7 @@ fn merges_global_limit_with_local_limit() -> Result<()> { assert_eq!(initial, expected_initial); let after_optimize = - LimitPushdown::new().optimize(global_limit, &ConfigOptions::new())?; + LimitPushdown::new().optimize(global_limit, &SessionConfig::new())?; let expected = ["GlobalLimitExec: skip=20, fetch=20", " EmptyExec"]; assert_eq!(get_plan_string(&after_optimize), expected); @@ -411,7 +411,7 @@ fn merges_local_limit_with_global_limit() -> Result<()> { assert_eq!(initial, expected_initial); let after_optimize = - LimitPushdown::new().optimize(local_limit, &ConfigOptions::new())?; + LimitPushdown::new().optimize(local_limit, &SessionConfig::new())?; let expected = ["GlobalLimitExec: skip=20, fetch=20", " EmptyExec"]; assert_eq!(get_plan_string(&after_optimize), expected); diff --git a/datafusion/core/tests/physical_optimizer/projection_pushdown.rs b/datafusion/core/tests/physical_optimizer/projection_pushdown.rs index c51a5e02c9c33..16c9765ce6b64 100644 --- a/datafusion/core/tests/physical_optimizer/projection_pushdown.rs +++ b/datafusion/core/tests/physical_optimizer/projection_pushdown.rs @@ -59,6 +59,7 @@ use datafusion_physical_plan::streaming::{PartitionStream, StreamingTableExec}; use datafusion_physical_plan::union::UnionExec; use datafusion_physical_plan::{displayable, ExecutionPlan}; +use datafusion_execution::config::SessionConfig; use insta::assert_snapshot; use itertools::Itertools; @@ -449,7 +450,7 @@ fn test_csv_after_projection() -> Result<()> { ); let after_optimize = - ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?; + ProjectionPushdown::new().optimize(projection, &SessionConfig::new())?; let after_optimize_string = displayable(after_optimize.as_ref()) .indent(true) @@ -487,7 +488,7 @@ fn test_memory_after_projection() -> Result<()> { ); let after_optimize = - ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?; + ProjectionPushdown::new().optimize(projection, &SessionConfig::new())?; let after_optimize_string = displayable(after_optimize.as_ref()) .indent(true) @@ -583,7 +584,7 @@ fn test_streaming_table_after_projection() -> Result<()> { )?) as _; let after_optimize = - ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?; + ProjectionPushdown::new().optimize(projection, &SessionConfig::new())?; let result = after_optimize .as_any() @@ -683,7 +684,7 @@ fn test_projection_after_projection() -> Result<()> { ); let after_optimize = - ProjectionPushdown::new().optimize(top_projection, &ConfigOptions::new())?; + ProjectionPushdown::new().optimize(top_projection, &SessionConfig::new())?; let after_optimize_string = displayable(after_optimize.as_ref()) .indent(true) @@ -751,7 +752,7 @@ fn test_output_req_after_projection() -> Result<()> { ); let after_optimize = - ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?; + ProjectionPushdown::new().optimize(projection, &SessionConfig::new())?; let after_optimize_string = displayable(after_optimize.as_ref()) .indent(true) @@ -842,7 +843,7 @@ fn test_coalesce_partitions_after_projection() -> Result<()> { ); let after_optimize = - ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?; + ProjectionPushdown::new().optimize(projection, &SessionConfig::new())?; let after_optimize_string = displayable(after_optimize.as_ref()) .indent(true) @@ -900,7 +901,7 @@ fn test_filter_after_projection() -> Result<()> { ); let after_optimize = - ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?; + ProjectionPushdown::new().optimize(projection, &SessionConfig::new())?; let after_optimize_string = displayable(after_optimize.as_ref()) .indent(true) @@ -1003,7 +1004,7 @@ fn test_join_after_projection() -> Result<()> { ); let after_optimize = - ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?; + ProjectionPushdown::new().optimize(projection, &SessionConfig::new())?; let after_optimize_string = displayable(after_optimize.as_ref()) .indent(true) @@ -1133,7 +1134,7 @@ fn test_join_after_required_projection() -> Result<()> { ); let after_optimize = - ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?; + ProjectionPushdown::new().optimize(projection, &SessionConfig::new())?; let after_optimize_string = displayable(after_optimize.as_ref()) .indent(true) @@ -1211,7 +1212,7 @@ fn test_nested_loop_join_after_projection() -> Result<()> { ); let after_optimize_string = - ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?; + ProjectionPushdown::new().optimize(projection, &SessionConfig::new())?; let after_optimize_string = displayable(after_optimize_string.as_ref()) .indent(true) .to_string(); @@ -1308,7 +1309,7 @@ fn test_hash_join_after_projection() -> Result<()> { ); let after_optimize = - ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?; + ProjectionPushdown::new().optimize(projection, &SessionConfig::new())?; let after_optimize_string = displayable(after_optimize.as_ref()) .indent(true) .to_string(); @@ -1336,7 +1337,7 @@ fn test_hash_join_after_projection() -> Result<()> { )?); let after_optimize = - ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?; + ProjectionPushdown::new().optimize(projection, &SessionConfig::new())?; let after_optimize_string = displayable(after_optimize.as_ref()) .indent(true) .to_string(); @@ -1389,7 +1390,7 @@ fn test_repartition_after_projection() -> Result<()> { ); let after_optimize = - ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?; + ProjectionPushdown::new().optimize(projection, &SessionConfig::new())?; let after_optimize_string = displayable(after_optimize.as_ref()) .indent(true) @@ -1460,7 +1461,7 @@ fn test_sort_after_projection() -> Result<()> { ); let after_optimize = - ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?; + ProjectionPushdown::new().optimize(projection, &SessionConfig::new())?; let after_optimize_string = displayable(after_optimize.as_ref()) .indent(true) @@ -1514,7 +1515,7 @@ fn test_sort_preserving_after_projection() -> Result<()> { ); let after_optimize = - ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?; + ProjectionPushdown::new().optimize(projection, &SessionConfig::new())?; let after_optimize_string = displayable(after_optimize.as_ref()) .indent(true) @@ -1559,7 +1560,7 @@ fn test_union_after_projection() -> Result<()> { ); let after_optimize = - ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?; + ProjectionPushdown::new().optimize(projection, &SessionConfig::new())?; let after_optimize_string = displayable(after_optimize.as_ref()) .indent(true) @@ -1626,7 +1627,7 @@ fn test_partition_col_projection_pushdown() -> Result<()> { )?); let after_optimize = - ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?; + ProjectionPushdown::new().optimize(projection, &SessionConfig::new())?; let after_optimize_string = displayable(after_optimize.as_ref()) .indent(true) @@ -1672,7 +1673,7 @@ fn test_partition_col_projection_pushdown_expr() -> Result<()> { )?); let after_optimize = - ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?; + ProjectionPushdown::new().optimize(projection, &SessionConfig::new())?; let after_optimize_string = displayable(after_optimize.as_ref()) .indent(true) diff --git a/datafusion/core/tests/physical_optimizer/sanity_checker.rs b/datafusion/core/tests/physical_optimizer/sanity_checker.rs index ce6eb13c86c44..df21ed4cc7bbb 100644 --- a/datafusion/core/tests/physical_optimizer/sanity_checker.rs +++ b/datafusion/core/tests/physical_optimizer/sanity_checker.rs @@ -28,7 +28,6 @@ use arrow::compute::SortOptions; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use datafusion::datasource::stream::{FileStreamProvider, StreamConfig, StreamTable}; use datafusion::prelude::{CsvReadOptions, SessionContext}; -use datafusion_common::config::ConfigOptions; use datafusion_common::{JoinType, Result, ScalarValue}; use datafusion_physical_expr::expressions::{col, Literal}; use datafusion_physical_expr::Partitioning; @@ -39,6 +38,7 @@ use datafusion_physical_plan::repartition::RepartitionExec; use datafusion_physical_plan::{displayable, ExecutionPlan}; use async_trait::async_trait; +use datafusion_execution::config::SessionConfig; async fn register_current_csv( ctx: &SessionContext, @@ -393,7 +393,7 @@ fn create_test_schema2() -> SchemaRef { /// Check if sanity checker should accept or reject plans. fn assert_sanity_check(plan: &Arc, is_sane: bool) { let sanity_checker = SanityCheckPlan::new(); - let opts = ConfigOptions::default(); + let opts = SessionConfig::default(); assert_eq!( sanity_checker.optimize(plan.clone(), &opts).is_ok(), is_sane diff --git a/datafusion/core/tests/physical_optimizer/test_utils.rs b/datafusion/core/tests/physical_optimizer/test_utils.rs index 8ca33f3d4abb9..429a6fa92b98f 100644 --- a/datafusion/core/tests/physical_optimizer/test_utils.rs +++ b/datafusion/core/tests/physical_optimizer/test_utils.rs @@ -29,12 +29,12 @@ use datafusion::datasource::listing::PartitionedFile; use datafusion::datasource::memory::MemorySourceConfig; use datafusion::datasource::physical_plan::ParquetSource; use datafusion::datasource::source::DataSourceExec; -use datafusion_common::config::ConfigOptions; use datafusion_common::stats::Precision; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_common::utils::expr::COUNT_STAR_EXPANSION; use datafusion_common::{ColumnStatistics, JoinType, NullEquality, Result, Statistics}; use datafusion_datasource::file_scan_config::FileScanConfigBuilder; +use datafusion_execution::config::SessionConfig; use datafusion_execution::object_store::ObjectStoreUrl; use datafusion_execution::{SendableRecordBatchStream, TaskContext}; use datafusion_expr::{WindowFrame, WindowFunctionDefinition}; @@ -641,7 +641,7 @@ pub fn build_group_by(input_schema: &SchemaRef, columns: Vec) -> Physica } pub fn get_optimized_plan(plan: &Arc) -> Result { - let config = ConfigOptions::new(); + let config = SessionConfig::new(); let optimized = LimitedDistinctAggregation::new().optimize(Arc::clone(plan), &config)?; diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index dd10363079f91..e5b430ac3622b 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -26,7 +26,6 @@ use crate::opener::ParquetOpener; use crate::row_filter::can_expr_be_pushed_down_with_schemas; use crate::DefaultParquetFileReaderFactory; use crate::ParquetFileReaderFactory; -use datafusion_common::config::ConfigOptions; #[cfg(feature = "parquet_encryption")] use datafusion_common::config::EncryptionFactoryOptions; use datafusion_datasource::as_file_source; @@ -54,6 +53,7 @@ use datafusion_physical_plan::DisplayFormatType; #[cfg(feature = "parquet_encryption")] use datafusion_common::encryption::map_config_decryption_to_decryption; +use datafusion_execution::config::SessionConfig; #[cfg(feature = "parquet_encryption")] use datafusion_execution::parquet_encryption::EncryptionFactory; use itertools::Itertools; @@ -698,7 +698,7 @@ impl FileSource for ParquetSource { fn try_pushdown_filters( &self, filters: Vec>, - config: &ConfigOptions, + config: &SessionConfig, ) -> datafusion_common::Result>> { let Some(file_schema) = self.file_schema.clone() else { return Ok(FilterPushdownPropagation::with_parent_pushdown_result( @@ -712,7 +712,7 @@ impl FileSource for ParquetSource { // By default they are both disabled. // Regardless of pushdown, we will update the predicate to include the filters // because even if scan pushdown is disabled we can still use the filters for stats pruning. - let config_pushdown_enabled = config.execution.parquet.pushdown_filters; + let config_pushdown_enabled = config.options().execution.parquet.pushdown_filters; let table_pushdown_enabled = self.pushdown_filters(); let pushdown_filters = table_pushdown_enabled || config_pushdown_enabled; diff --git a/datafusion/datasource/src/file.rs b/datafusion/datasource/src/file.rs index 7a2cf403fd8d6..7ae2a7f7b199c 100644 --- a/datafusion/datasource/src/file.rs +++ b/datafusion/datasource/src/file.rs @@ -27,13 +27,13 @@ use crate::file_scan_config::FileScanConfig; use crate::file_stream::FileOpener; use crate::schema_adapter::SchemaAdapterFactory; use arrow::datatypes::SchemaRef; -use datafusion_common::config::ConfigOptions; use datafusion_common::{not_impl_err, Result, Statistics}; use datafusion_physical_expr::{LexOrdering, PhysicalExpr}; use datafusion_physical_plan::filter_pushdown::{FilterPushdownPropagation, PushedDown}; use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; use datafusion_physical_plan::DisplayFormatType; +use datafusion_execution::config::SessionConfig; use object_store::ObjectStore; /// Helper function to convert any type implementing FileSource to Arc<dyn FileSource> @@ -122,7 +122,7 @@ pub trait FileSource: Send + Sync { fn try_pushdown_filters( &self, filters: Vec>, - _config: &ConfigOptions, + _config: &SessionConfig, ) -> Result>> { Ok(FilterPushdownPropagation::with_parent_pushdown_result( vec![PushedDown::No; filters.len()], diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index e67e1f8273723..759b897781297 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -35,7 +35,6 @@ use arrow::{ buffer::Buffer, datatypes::{ArrowNativeType, DataType, Field, Schema, SchemaRef, UInt16Type}, }; -use datafusion_common::config::ConfigOptions; use datafusion_common::{ exec_datafusion_err, exec_err, internal_datafusion_err, ColumnStatistics, Constraints, Result, ScalarValue, Statistics, @@ -63,6 +62,7 @@ use std::{ fmt::Result as FmtResult, marker::PhantomData, sync::Arc, }; +use datafusion_execution::config::SessionConfig; use datafusion_physical_expr::equivalence::project_orderings; use datafusion_physical_plan::coop::cooperative; use datafusion_physical_plan::execution_plan::SchedulingType; @@ -666,7 +666,7 @@ impl DataSource for FileScanConfig { fn try_pushdown_filters( &self, filters: Vec>, - config: &ConfigOptions, + config: &SessionConfig, ) -> Result>> { let result = self.file_source.try_pushdown_filters(filters, config)?; match result.updated_node { diff --git a/datafusion/datasource/src/source.rs b/datafusion/datasource/src/source.rs index 20d9a1d6e53f0..c8027a35508f7 100644 --- a/datafusion/datasource/src/source.rs +++ b/datafusion/datasource/src/source.rs @@ -37,6 +37,7 @@ use itertools::Itertools; use crate::file_scan_config::FileScanConfig; use datafusion_common::config::ConfigOptions; use datafusion_common::{Constraints, Result, Statistics}; +use datafusion_execution::config::SessionConfig; use datafusion_execution::{SendableRecordBatchStream, TaskContext}; use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalExpr}; use datafusion_physical_expr_common::sort_expr::LexOrdering; @@ -169,7 +170,7 @@ pub trait DataSource: Send + Sync + Debug { fn try_pushdown_filters( &self, filters: Vec>, - _config: &ConfigOptions, + _config: &SessionConfig, ) -> Result>> { Ok(FilterPushdownPropagation::with_parent_pushdown_result( vec![PushedDown::No; filters.len()], @@ -332,7 +333,7 @@ impl ExecutionPlan for DataSourceExec { &self, _phase: FilterPushdownPhase, child_pushdown_result: ChildPushdownResult, - config: &ConfigOptions, + config: &SessionConfig, ) -> Result>> { // Push any remaining filters into our data source let parent_filters = child_pushdown_result diff --git a/datafusion/physical-optimizer/src/aggregate_statistics.rs b/datafusion/physical-optimizer/src/aggregate_statistics.rs index 672317060d902..de2b5c88fede1 100644 --- a/datafusion/physical-optimizer/src/aggregate_statistics.rs +++ b/datafusion/physical-optimizer/src/aggregate_statistics.rs @@ -16,10 +16,10 @@ // under the License. //! Utilizing exact statistics from sources to avoid scanning data -use datafusion_common::config::ConfigOptions; use datafusion_common::scalar::ScalarValue; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_common::Result; +use datafusion_execution::config::SessionConfig; use datafusion_physical_plan::aggregates::AggregateExec; use datafusion_physical_plan::placeholder_row::PlaceholderRowExec; use datafusion_physical_plan::projection::{ProjectionExec, ProjectionExpr}; @@ -46,7 +46,7 @@ impl PhysicalOptimizerRule for AggregateStatistics { fn optimize( &self, plan: Arc, - config: &ConfigOptions, + config: &SessionConfig, ) -> Result> { if let Some(partial_agg_exec) = take_optimizable(&*plan) { let partial_agg_exec = partial_agg_exec diff --git a/datafusion/physical-optimizer/src/coalesce_async_exec_input.rs b/datafusion/physical-optimizer/src/coalesce_async_exec_input.rs index 0b46c68f2daed..deeecf97c9c95 100644 --- a/datafusion/physical-optimizer/src/coalesce_async_exec_input.rs +++ b/datafusion/physical-optimizer/src/coalesce_async_exec_input.rs @@ -16,9 +16,9 @@ // under the License. use crate::PhysicalOptimizerRule; -use datafusion_common::config::ConfigOptions; use datafusion_common::internal_err; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; +use datafusion_execution::config::SessionConfig; use datafusion_physical_plan::async_func::AsyncFuncExec; use datafusion_physical_plan::coalesce_batches::CoalesceBatchesExec; use datafusion_physical_plan::ExecutionPlan; @@ -39,9 +39,9 @@ impl PhysicalOptimizerRule for CoalesceAsyncExecInput { fn optimize( &self, plan: Arc, - config: &ConfigOptions, + config: &SessionConfig, ) -> datafusion_common::Result> { - let target_batch_size = config.execution.batch_size; + let target_batch_size = config.options().execution.batch_size; plan.transform(|plan| { if let Some(async_exec) = plan.as_any().downcast_ref::() { if async_exec.children().len() != 1 { diff --git a/datafusion/physical-optimizer/src/coalesce_batches.rs b/datafusion/physical-optimizer/src/coalesce_batches.rs index 5cf2c877c61a4..8aa9cee3ac30e 100644 --- a/datafusion/physical-optimizer/src/coalesce_batches.rs +++ b/datafusion/physical-optimizer/src/coalesce_batches.rs @@ -22,8 +22,8 @@ use crate::PhysicalOptimizerRule; use std::sync::Arc; -use datafusion_common::config::ConfigOptions; use datafusion_common::error::Result; +use datafusion_execution::config::SessionConfig; use datafusion_physical_expr::Partitioning; use datafusion_physical_plan::{ coalesce_batches::CoalesceBatchesExec, filter::FilterExec, joins::HashJoinExec, @@ -47,13 +47,14 @@ impl PhysicalOptimizerRule for CoalesceBatches { fn optimize( &self, plan: Arc, - config: &ConfigOptions, + config: &SessionConfig, ) -> Result> { - if !config.execution.coalesce_batches { + let config_options = config.options(); + if !config_options.execution.coalesce_batches { return Ok(plan); } - let target_batch_size = config.execution.batch_size; + let target_batch_size = config_options.execution.batch_size; plan.transform_up(|plan| { let plan_any = plan.as_any(); // The goal here is to detect operators that could produce small batches and only diff --git a/datafusion/physical-optimizer/src/combine_partial_final_agg.rs b/datafusion/physical-optimizer/src/combine_partial_final_agg.rs index 86f7e73e9e359..28614b1f09e77 100644 --- a/datafusion/physical-optimizer/src/combine_partial_final_agg.rs +++ b/datafusion/physical-optimizer/src/combine_partial_final_agg.rs @@ -27,8 +27,8 @@ use datafusion_physical_plan::aggregates::{ use datafusion_physical_plan::ExecutionPlan; use crate::PhysicalOptimizerRule; -use datafusion_common::config::ConfigOptions; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; +use datafusion_execution::config::SessionConfig; use datafusion_physical_expr::aggregate::AggregateFunctionExpr; use datafusion_physical_expr::{physical_exprs_equal, PhysicalExpr}; @@ -51,7 +51,7 @@ impl PhysicalOptimizerRule for CombinePartialFinalAggregate { fn optimize( &self, plan: Arc, - _config: &ConfigOptions, + _config: &SessionConfig, ) -> Result> { plan.transform_down(|plan| { // Check if the plan is AggregateExec diff --git a/datafusion/physical-optimizer/src/enforce_distribution.rs b/datafusion/physical-optimizer/src/enforce_distribution.rs index 898386e2f9880..4a3bf623da8cc 100644 --- a/datafusion/physical-optimizer/src/enforce_distribution.rs +++ b/datafusion/physical-optimizer/src/enforce_distribution.rs @@ -36,6 +36,7 @@ use datafusion_common::config::ConfigOptions; use datafusion_common::error::Result; use datafusion_common::stats::Precision; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; +use datafusion_execution::config::SessionConfig; use datafusion_expr::logical_plan::JoinType; use datafusion_physical_expr::expressions::{Column, NoOp}; use datafusion_physical_expr::utils::map_columns_before_projection; @@ -193,9 +194,11 @@ impl PhysicalOptimizerRule for EnforceDistribution { fn optimize( &self, plan: Arc, - config: &ConfigOptions, + config: &SessionConfig, ) -> Result> { - let top_down_join_key_reordering = config.optimizer.top_down_join_key_reordering; + let config_options = config.options(); + let top_down_join_key_reordering = + config_options.optimizer.top_down_join_key_reordering; let adjusted = if top_down_join_key_reordering { // Run a top-down process to adjust input key ordering recursively @@ -216,7 +219,7 @@ impl PhysicalOptimizerRule for EnforceDistribution { // Distribution enforcement needs to be applied bottom-up. let distribution_context = distribution_context .transform_up(|distribution_context| { - ensure_distribution(distribution_context, config) + ensure_distribution(distribution_context, config_options) }) .data()?; Ok(distribution_context.plan) diff --git a/datafusion/physical-optimizer/src/enforce_sorting/mod.rs b/datafusion/physical-optimizer/src/enforce_sorting/mod.rs index 8a71b28486a2a..43a17cb2f343f 100644 --- a/datafusion/physical-optimizer/src/enforce_sorting/mod.rs +++ b/datafusion/physical-optimizer/src/enforce_sorting/mod.rs @@ -53,10 +53,10 @@ use crate::utils::{ }; use crate::PhysicalOptimizerRule; -use datafusion_common::config::ConfigOptions; use datafusion_common::plan_err; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_common::Result; +use datafusion_execution::config::SessionConfig; use datafusion_physical_expr::{Distribution, Partitioning}; use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement}; use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec; @@ -210,13 +210,14 @@ impl PhysicalOptimizerRule for EnforceSorting { fn optimize( &self, plan: Arc, - config: &ConfigOptions, + config: &SessionConfig, ) -> Result> { + let config_options = config.options(); let plan_requirements = PlanWithCorrespondingSort::new_default(plan); // Execute a bottom-up traversal to enforce sorting requirements, // remove unnecessary sorts, and optimize sort-sensitive operators: let adjusted = plan_requirements.transform_up(ensure_sorting)?.data; - let new_plan = if config.optimizer.repartition_sorts { + let new_plan = if config_options.optimizer.repartition_sorts { let plan_with_coalesce_partitions = PlanWithCorrespondingCoalescePartitions::new_default(adjusted.plan); let parallel = plan_with_coalesce_partitions @@ -234,7 +235,7 @@ impl PhysicalOptimizerRule for EnforceSorting { plan_with_pipeline_fixer, false, true, - config, + config_options, ) }) .data()?; diff --git a/datafusion/physical-optimizer/src/ensure_coop.rs b/datafusion/physical-optimizer/src/ensure_coop.rs index 0c0b63c0b3e79..bfc2f24e4450e 100644 --- a/datafusion/physical-optimizer/src/ensure_coop.rs +++ b/datafusion/physical-optimizer/src/ensure_coop.rs @@ -25,9 +25,9 @@ use std::sync::Arc; use crate::PhysicalOptimizerRule; -use datafusion_common::config::ConfigOptions; use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion}; use datafusion_common::Result; +use datafusion_execution::config::SessionConfig; use datafusion_physical_plan::coop::CooperativeExec; use datafusion_physical_plan::execution_plan::{EvaluationType, SchedulingType}; use datafusion_physical_plan::ExecutionPlan; @@ -65,7 +65,7 @@ impl PhysicalOptimizerRule for EnsureCooperative { fn optimize( &self, plan: Arc, - _config: &ConfigOptions, + _config: &SessionConfig, ) -> Result> { plan.transform_up(|plan| { let is_leaf = plan.children().is_empty(); @@ -96,14 +96,14 @@ impl PhysicalOptimizerRule for EnsureCooperative { #[cfg(test)] mod tests { use super::*; - use datafusion_common::config::ConfigOptions; + use datafusion_execution::config::SessionConfig; use datafusion_physical_plan::{displayable, test::scan_partitioned}; use insta::assert_snapshot; #[tokio::test] async fn test_cooperative_exec_for_custom_exec() { let test_custom_exec = scan_partitioned(1); - let config = ConfigOptions::new(); + let config = SessionConfig::new(); let optimized = EnsureCooperative::new() .optimize(test_custom_exec, &config) .unwrap(); diff --git a/datafusion/physical-optimizer/src/filter_pushdown.rs b/datafusion/physical-optimizer/src/filter_pushdown.rs index 5ee7023ff6ee2..553d39c251ad2 100644 --- a/datafusion/physical-optimizer/src/filter_pushdown.rs +++ b/datafusion/physical-optimizer/src/filter_pushdown.rs @@ -36,7 +36,8 @@ use std::sync::Arc; use crate::PhysicalOptimizerRule; use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion}; -use datafusion_common::{config::ConfigOptions, internal_err, Result}; +use datafusion_common::{internal_err, Result}; +use datafusion_execution::config::SessionConfig; use datafusion_physical_expr::PhysicalExpr; use datafusion_physical_expr_common::physical_expr::is_volatile; use datafusion_physical_plan::filter_pushdown::{ @@ -419,7 +420,7 @@ impl PhysicalOptimizerRule for FilterPushdown { fn optimize( &self, plan: Arc, - config: &ConfigOptions, + config: &SessionConfig, ) -> Result> { Ok( push_down_filters(Arc::clone(&plan), vec![], config, self.phase)? @@ -440,7 +441,7 @@ impl PhysicalOptimizerRule for FilterPushdown { fn push_down_filters( node: Arc, parent_predicates: Vec>, - config: &ConfigOptions, + config: &SessionConfig, phase: FilterPushdownPhase, ) -> Result>> { let mut parent_filter_pushdown_supports: Vec> = diff --git a/datafusion/physical-optimizer/src/join_selection.rs b/datafusion/physical-optimizer/src/join_selection.rs index 1db4d7b30565e..8fff0b2c0f717 100644 --- a/datafusion/physical-optimizer/src/join_selection.rs +++ b/datafusion/physical-optimizer/src/join_selection.rs @@ -28,6 +28,7 @@ use datafusion_common::config::ConfigOptions; use datafusion_common::error::Result; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_common::{internal_err, JoinSide, JoinType}; +use datafusion_execution::config::SessionConfig; use datafusion_expr_common::sort_properties::SortProperties; use datafusion_physical_expr::expressions::Column; use datafusion_physical_expr::LexOrdering; @@ -106,8 +107,9 @@ impl PhysicalOptimizerRule for JoinSelection { fn optimize( &self, plan: Arc, - config: &ConfigOptions, + config: &SessionConfig, ) -> Result> { + let config_options = config.options(); // First, we make pipeline-fixing modifications to joins so as to accommodate // unbounded inputs. Each pipeline-fixing subrule, which is a function // of type `PipelineFixerSubrule`, takes a single [`PipelineStatePropagator`] @@ -118,7 +120,7 @@ impl PhysicalOptimizerRule for JoinSelection { Box::new(hash_join_swap_subrule), ]; let new_plan = plan - .transform_up(|p| apply_subrules(p, &subrules, config)) + .transform_up(|p| apply_subrules(p, &subrules, config_options)) .data()?; // Next, we apply another subrule that tries to optimize joins using any // statistics their inputs might have. @@ -131,9 +133,11 @@ impl PhysicalOptimizerRule for JoinSelection { // do not modify join sides. // - We will also swap left and right sides for cross joins so that the left // side is the small side. - let config = &config.optimizer; - let collect_threshold_byte_size = config.hash_join_single_partition_threshold; - let collect_threshold_num_rows = config.hash_join_single_partition_threshold_rows; + let optimizer_config = &config_options.optimizer; + let collect_threshold_byte_size = + optimizer_config.hash_join_single_partition_threshold; + let collect_threshold_num_rows = + optimizer_config.hash_join_single_partition_threshold_rows; new_plan .transform_up(|plan| { statistical_join_selection_subrule( diff --git a/datafusion/physical-optimizer/src/limit_pushdown.rs b/datafusion/physical-optimizer/src/limit_pushdown.rs index 7469c3af9344c..9742d08321394 100644 --- a/datafusion/physical-optimizer/src/limit_pushdown.rs +++ b/datafusion/physical-optimizer/src/limit_pushdown.rs @@ -23,10 +23,10 @@ use std::sync::Arc; use crate::PhysicalOptimizerRule; -use datafusion_common::config::ConfigOptions; use datafusion_common::error::Result; use datafusion_common::tree_node::{Transformed, TreeNodeRecursion}; use datafusion_common::utils::combine_limit; +use datafusion_execution::config::SessionConfig; use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec; use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; @@ -63,7 +63,7 @@ impl PhysicalOptimizerRule for LimitPushdown { fn optimize( &self, plan: Arc, - _config: &ConfigOptions, + _config: &SessionConfig, ) -> Result> { let global_state = GlobalRequirements { fetch: None, diff --git a/datafusion/physical-optimizer/src/limit_pushdown_past_window.rs b/datafusion/physical-optimizer/src/limit_pushdown_past_window.rs index 1c671cd074886..e4f6d990851b8 100644 --- a/datafusion/physical-optimizer/src/limit_pushdown_past_window.rs +++ b/datafusion/physical-optimizer/src/limit_pushdown_past_window.rs @@ -16,9 +16,9 @@ // under the License. use crate::PhysicalOptimizerRule; -use datafusion_common::config::ConfigOptions; use datafusion_common::tree_node::{Transformed, TreeNode}; use datafusion_common::ScalarValue; +use datafusion_execution::config::SessionConfig; use datafusion_expr::{LimitEffect, WindowFrameBound, WindowFrameUnits}; use datafusion_physical_expr::window::{ PlainAggregateWindowExpr, SlidingAggregateWindowExpr, StandardWindowExpr, @@ -74,9 +74,9 @@ impl PhysicalOptimizerRule for LimitPushPastWindows { fn optimize( &self, original: Arc, - config: &ConfigOptions, + config: &SessionConfig, ) -> datafusion_common::Result> { - if !config.optimizer.enable_window_limits { + if !config.options().optimizer.enable_window_limits { return Ok(original); } let mut ctx = TraverseState::default(); diff --git a/datafusion/physical-optimizer/src/limited_distinct_aggregation.rs b/datafusion/physical-optimizer/src/limited_distinct_aggregation.rs index 3666ff3798b67..9152ceec018ef 100644 --- a/datafusion/physical-optimizer/src/limited_distinct_aggregation.rs +++ b/datafusion/physical-optimizer/src/limited_distinct_aggregation.rs @@ -24,9 +24,9 @@ use datafusion_physical_plan::aggregates::AggregateExec; use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties}; -use datafusion_common::config::ConfigOptions; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_common::Result; +use datafusion_execution::config::SessionConfig; use crate::PhysicalOptimizerRule; use itertools::Itertools; @@ -161,9 +161,13 @@ impl PhysicalOptimizerRule for LimitedDistinctAggregation { fn optimize( &self, plan: Arc, - config: &ConfigOptions, + config: &SessionConfig, ) -> Result> { - if config.optimizer.enable_distinct_aggregation_soft_limit { + if config + .options() + .optimizer + .enable_distinct_aggregation_soft_limit + { plan.transform_down(|plan| { Ok( if let Some(plan) = diff --git a/datafusion/physical-optimizer/src/optimizer.rs b/datafusion/physical-optimizer/src/optimizer.rs index 4d00f1029db71..b0e7419a0a936 100644 --- a/datafusion/physical-optimizer/src/optimizer.rs +++ b/datafusion/physical-optimizer/src/optimizer.rs @@ -38,8 +38,8 @@ use crate::update_aggr_exprs::OptimizeAggregateOrder; use crate::coalesce_async_exec_input::CoalesceAsyncExecInput; use crate::limit_pushdown_past_window::LimitPushPastWindows; -use datafusion_common::config::ConfigOptions; use datafusion_common::Result; +use datafusion_execution::config::SessionConfig; use datafusion_physical_plan::ExecutionPlan; /// `PhysicalOptimizerRule` transforms one ['ExecutionPlan'] into another which @@ -54,7 +54,7 @@ pub trait PhysicalOptimizerRule: Debug { fn optimize( &self, plan: Arc, - config: &ConfigOptions, + config: &SessionConfig, ) -> Result>; /// A human readable name for this optimizer rule diff --git a/datafusion/physical-optimizer/src/output_requirements.rs b/datafusion/physical-optimizer/src/output_requirements.rs index 9e5e980219767..6a4346cb6f0f1 100644 --- a/datafusion/physical-optimizer/src/output_requirements.rs +++ b/datafusion/physical-optimizer/src/output_requirements.rs @@ -26,9 +26,9 @@ use std::sync::Arc; use crate::PhysicalOptimizerRule; -use datafusion_common::config::ConfigOptions; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_common::{Result, Statistics}; +use datafusion_execution::config::SessionConfig; use datafusion_execution::TaskContext; use datafusion_physical_expr::Distribution; use datafusion_physical_expr_common::sort_expr::OrderingRequirements; @@ -306,7 +306,7 @@ impl PhysicalOptimizerRule for OutputRequirements { fn optimize( &self, plan: Arc, - _config: &ConfigOptions, + _config: &SessionConfig, ) -> Result> { match self.mode { RuleMode::Add => require_top_ordering(plan), diff --git a/datafusion/physical-optimizer/src/projection_pushdown.rs b/datafusion/physical-optimizer/src/projection_pushdown.rs index 987e3cb6f713e..c7ab142db0ae6 100644 --- a/datafusion/physical-optimizer/src/projection_pushdown.rs +++ b/datafusion/physical-optimizer/src/projection_pushdown.rs @@ -26,11 +26,11 @@ use datafusion_common::alias::AliasGenerator; use std::collections::HashSet; use std::sync::Arc; -use datafusion_common::config::ConfigOptions; use datafusion_common::tree_node::{ Transformed, TransformedResult, TreeNode, TreeNodeRecursion, }; use datafusion_common::{JoinSide, JoinType, Result}; +use datafusion_execution::config::SessionConfig; use datafusion_physical_expr::expressions::Column; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; use datafusion_physical_plan::joins::utils::{ColumnIndex, JoinFilter}; @@ -60,7 +60,7 @@ impl PhysicalOptimizerRule for ProjectionPushdown { fn optimize( &self, plan: Arc, - _config: &ConfigOptions, + _config: &SessionConfig, ) -> Result> { let alias_generator = AliasGenerator::new(); let plan = plan @@ -447,6 +447,7 @@ fn is_volatile_expression_tree(expr: &dyn PhysicalExpr) -> bool { mod test { use super::*; use arrow::datatypes::{DataType, Field, FieldRef, Schema}; + use datafusion_common::config::ConfigOptions; use datafusion_expr_common::operator::Operator; use datafusion_functions::math::random; use datafusion_physical_expr::expressions::{binary, lit}; diff --git a/datafusion/physical-optimizer/src/sanity_checker.rs b/datafusion/physical-optimizer/src/sanity_checker.rs index acc70d39f057b..593c05b7eacb1 100644 --- a/datafusion/physical-optimizer/src/sanity_checker.rs +++ b/datafusion/physical-optimizer/src/sanity_checker.rs @@ -26,9 +26,10 @@ use std::sync::Arc; use datafusion_common::Result; use datafusion_physical_plan::ExecutionPlan; -use datafusion_common::config::{ConfigOptions, OptimizerOptions}; +use datafusion_common::config::OptimizerOptions; use datafusion_common::plan_err; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; +use datafusion_execution::config::SessionConfig; use datafusion_physical_expr::intervals::utils::{check_support, is_datatype_supported}; use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType}; use datafusion_physical_plan::joins::SymmetricHashJoinExec; @@ -57,9 +58,9 @@ impl PhysicalOptimizerRule for SanityCheckPlan { fn optimize( &self, plan: Arc, - config: &ConfigOptions, + config: &SessionConfig, ) -> Result> { - plan.transform_up(|p| check_plan_sanity(p, &config.optimizer)) + plan.transform_up(|p| check_plan_sanity(p, &config.options().optimizer)) .data() } diff --git a/datafusion/physical-optimizer/src/topk_aggregation.rs b/datafusion/physical-optimizer/src/topk_aggregation.rs index b7505f0df4edb..1c0c93251ec29 100644 --- a/datafusion/physical-optimizer/src/topk_aggregation.rs +++ b/datafusion/physical-optimizer/src/topk_aggregation.rs @@ -21,9 +21,9 @@ use std::sync::Arc; use crate::PhysicalOptimizerRule; use arrow::datatypes::DataType; -use datafusion_common::config::ConfigOptions; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_common::Result; +use datafusion_execution::config::SessionConfig; use datafusion_physical_expr::expressions::Column; use datafusion_physical_plan::aggregates::AggregateExec; use datafusion_physical_plan::execution_plan::CardinalityEffect; @@ -148,9 +148,9 @@ impl PhysicalOptimizerRule for TopKAggregation { fn optimize( &self, plan: Arc, - config: &ConfigOptions, + config: &SessionConfig, ) -> Result> { - if config.optimizer.enable_topk_aggregation { + if config.options().optimizer.enable_topk_aggregation { plan.transform_down(|plan| { Ok(if let Some(plan) = TopKAggregation::transform_sort(&plan) { Transformed::yes(plan) diff --git a/datafusion/physical-optimizer/src/update_aggr_exprs.rs b/datafusion/physical-optimizer/src/update_aggr_exprs.rs index 61bc715592af6..a663b3234080f 100644 --- a/datafusion/physical-optimizer/src/update_aggr_exprs.rs +++ b/datafusion/physical-optimizer/src/update_aggr_exprs.rs @@ -20,9 +20,9 @@ use std::sync::Arc; -use datafusion_common::config::ConfigOptions; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_common::{plan_datafusion_err, Result}; +use datafusion_execution::config::SessionConfig; use datafusion_physical_expr::aggregate::AggregateFunctionExpr; use datafusion_physical_expr::{EquivalenceProperties, PhysicalSortRequirement}; use datafusion_physical_plan::aggregates::{concat_slices, AggregateExec}; @@ -73,7 +73,7 @@ impl PhysicalOptimizerRule for OptimizeAggregateOrder { fn optimize( &self, plan: Arc, - _config: &ConfigOptions, + _config: &SessionConfig, ) -> Result> { plan.transform_up(|plan| { if let Some(aggr_exec) = plan.as_any().downcast_ref::() { diff --git a/datafusion/physical-plan/src/coalesce_batches.rs b/datafusion/physical-plan/src/coalesce_batches.rs index 397bd9a377c35..766085d2e8922 100644 --- a/datafusion/physical-plan/src/coalesce_batches.rs +++ b/datafusion/physical-plan/src/coalesce_batches.rs @@ -40,7 +40,7 @@ use crate::filter_pushdown::{ ChildPushdownResult, FilterDescription, FilterPushdownPhase, FilterPushdownPropagation, }; -use datafusion_common::config::ConfigOptions; +use datafusion_execution::config::SessionConfig; use futures::ready; use futures::stream::{Stream, StreamExt}; @@ -229,7 +229,7 @@ impl ExecutionPlan for CoalesceBatchesExec { &self, _phase: FilterPushdownPhase, parent_filters: Vec>, - _config: &ConfigOptions, + _config: &SessionConfig, ) -> Result { FilterDescription::from_children(parent_filters, &self.children()) } @@ -238,7 +238,7 @@ impl ExecutionPlan for CoalesceBatchesExec { &self, _phase: FilterPushdownPhase, child_pushdown_result: ChildPushdownResult, - _config: &ConfigOptions, + _config: &SessionConfig, ) -> Result>> { Ok(FilterPushdownPropagation::if_all(child_pushdown_result)) } diff --git a/datafusion/physical-plan/src/coalesce_partitions.rs b/datafusion/physical-plan/src/coalesce_partitions.rs index 5869c51b26b8d..4eb41684014f6 100644 --- a/datafusion/physical-plan/src/coalesce_partitions.rs +++ b/datafusion/physical-plan/src/coalesce_partitions.rs @@ -32,8 +32,8 @@ use crate::filter_pushdown::{FilterDescription, FilterPushdownPhase}; use crate::projection::{make_with_child, ProjectionExec}; use crate::{DisplayFormatType, ExecutionPlan, Partitioning}; -use datafusion_common::config::ConfigOptions; use datafusion_common::{internal_err, Result}; +use datafusion_execution::config::SessionConfig; use datafusion_execution::TaskContext; use datafusion_physical_expr::PhysicalExpr; @@ -268,7 +268,7 @@ impl ExecutionPlan for CoalescePartitionsExec { &self, _phase: FilterPushdownPhase, parent_filters: Vec>, - _config: &ConfigOptions, + _config: &SessionConfig, ) -> Result { FilterDescription::from_children(parent_filters, &self.children()) } diff --git a/datafusion/physical-plan/src/coop.rs b/datafusion/physical-plan/src/coop.rs index b62d15e6d2f17..3e1a427831ca7 100644 --- a/datafusion/physical-plan/src/coop.rs +++ b/datafusion/physical-plan/src/coop.rs @@ -65,7 +65,6 @@ //! The optimizer rule currently checks the plan for exchange-like operators and leave operators //! that report [`SchedulingType::NonCooperative`] in their [plan properties](ExecutionPlan::properties). -use datafusion_common::config::ConfigOptions; use datafusion_physical_expr::PhysicalExpr; #[cfg(datafusion_coop = "tokio_fallback")] use futures::Future; @@ -90,6 +89,7 @@ use datafusion_execution::TaskContext; use crate::execution_plan::SchedulingType; use crate::stream::RecordBatchStreamAdapter; +use datafusion_execution::config::SessionConfig; use futures::{Stream, StreamExt}; /// A stream that passes record batches through unchanged while cooperating with the Tokio runtime. @@ -300,7 +300,7 @@ impl ExecutionPlan for CooperativeExec { &self, _phase: FilterPushdownPhase, parent_filters: Vec>, - _config: &ConfigOptions, + _config: &SessionConfig, ) -> Result { FilterDescription::from_children(parent_filters, &self.children()) } @@ -309,7 +309,7 @@ impl ExecutionPlan for CooperativeExec { &self, _phase: FilterPushdownPhase, child_pushdown_result: ChildPushdownResult, - _config: &ConfigOptions, + _config: &SessionConfig, ) -> Result>> { Ok(FilterPushdownPropagation::if_all(child_pushdown_result)) } diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index a70cd9cb0d64d..6e1f5cb72c3ad 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -53,6 +53,7 @@ use datafusion_execution::TaskContext; use datafusion_physical_expr::EquivalenceProperties; use datafusion_physical_expr_common::sort_expr::{LexOrdering, OrderingRequirements}; +use datafusion_execution::config::SessionConfig; use futures::stream::{StreamExt, TryStreamExt}; /// Represent nodes in the DataFusion Physical Plan. @@ -553,7 +554,7 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { &self, _phase: FilterPushdownPhase, parent_filters: Vec>, - _config: &ConfigOptions, + _config: &SessionConfig, ) -> Result { Ok(FilterDescription::all_unsupported( &parent_filters, @@ -644,7 +645,7 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { &self, _phase: FilterPushdownPhase, child_pushdown_result: ChildPushdownResult, - _config: &ConfigOptions, + _config: &SessionConfig, ) -> Result>> { Ok(FilterPushdownPropagation::if_all(child_pushdown_result)) } diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 047c72076e4c6..121b127090339 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -45,7 +45,6 @@ use arrow::compute::filter_record_batch; use arrow::datatypes::{DataType, SchemaRef}; use arrow::record_batch::RecordBatch; use datafusion_common::cast::as_boolean_array; -use datafusion_common::config::ConfigOptions; use datafusion_common::stats::Precision; use datafusion_common::{ internal_err, plan_err, project_schema, DataFusionError, Result, ScalarValue, @@ -61,6 +60,7 @@ use datafusion_physical_expr::{ ConstExpr, ExprBoundaries, PhysicalExpr, }; +use datafusion_execution::config::SessionConfig; use datafusion_physical_expr_common::physical_expr::fmt_sql; use futures::stream::{Stream, StreamExt}; use log::trace; @@ -449,7 +449,7 @@ impl ExecutionPlan for FilterExec { &self, phase: FilterPushdownPhase, parent_filters: Vec>, - _config: &ConfigOptions, + _config: &SessionConfig, ) -> Result { if !matches!(phase, FilterPushdownPhase::Pre) { // For non-pre phase, filters pass through unchanged @@ -478,7 +478,7 @@ impl ExecutionPlan for FilterExec { &self, phase: FilterPushdownPhase, child_pushdown_result: ChildPushdownResult, - _config: &ConfigOptions, + _config: &SessionConfig, ) -> Result>> { if !matches!(phase, FilterPushdownPhase::Pre) { return Ok(FilterPushdownPropagation::if_all(child_pushdown_result)); diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index b5fe5ee5cda14..8dc3751228035 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -60,7 +60,6 @@ use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; use arrow::util::bit_util; use arrow_schema::DataType; -use datafusion_common::config::ConfigOptions; use datafusion_common::utils::memory::estimate_memory_size; use datafusion_common::{ internal_err, plan_err, project_schema, JoinSide, JoinType, NullEquality, Result, @@ -76,6 +75,7 @@ use datafusion_physical_expr::expressions::{lit, DynamicFilterPhysicalExpr}; use datafusion_physical_expr::{PhysicalExpr, PhysicalExprRef}; use ahash::RandomState; +use datafusion_execution::config::SessionConfig; use datafusion_physical_expr_common::physical_expr::fmt_sql; use futures::TryStreamExt; use parking_lot::Mutex; @@ -1112,7 +1112,7 @@ impl ExecutionPlan for HashJoinExec { &self, phase: FilterPushdownPhase, parent_filters: Vec>, - config: &ConfigOptions, + config: &SessionConfig, ) -> Result { // Other types of joins can support *some* filters, but restrictions are complex and error prone. // For now we don't support them. @@ -1137,7 +1137,10 @@ impl ExecutionPlan for HashJoinExec { // Add dynamic filters in Post phase if enabled if matches!(phase, FilterPushdownPhase::Post) - && config.optimizer.enable_join_dynamic_filter_pushdown + && config + .options() + .optimizer + .enable_join_dynamic_filter_pushdown { // Add actual dynamic filter to right side (probe side) let dynamic_filter = Self::create_dynamic_filter(&self.on); @@ -1153,7 +1156,7 @@ impl ExecutionPlan for HashJoinExec { &self, _phase: FilterPushdownPhase, child_pushdown_result: ChildPushdownResult, - _config: &ConfigOptions, + _config: &SessionConfig, ) -> Result>> { // Note: this check shouldn't be necessary because we already marked all parent filters as unsupported for // non-inner joins in `gather_filters_for_pushdown`. diff --git a/datafusion/physical-plan/src/projection.rs b/datafusion/physical-plan/src/projection.rs index 6eea70e1176d3..c38c533c82c76 100644 --- a/datafusion/physical-plan/src/projection.rs +++ b/datafusion/physical-plan/src/projection.rs @@ -41,7 +41,6 @@ use std::task::{Context, Poll}; use arrow::datatypes::{Field, Schema, SchemaRef}; use arrow::record_batch::{RecordBatch, RecordBatchOptions}; -use datafusion_common::config::ConfigOptions; use datafusion_common::stats::Precision; use datafusion_common::tree_node::{ Transformed, TransformedResult, TreeNode, TreeNodeRecursion, @@ -53,6 +52,7 @@ use datafusion_physical_expr::utils::collect_columns; use datafusion_physical_expr_common::physical_expr::{fmt_sql, PhysicalExprRef}; use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement}; +use datafusion_execution::config::SessionConfig; use futures::stream::{Stream, StreamExt}; use log::trace; @@ -370,7 +370,7 @@ impl ExecutionPlan for ProjectionExec { &self, _phase: FilterPushdownPhase, parent_filters: Vec>, - _config: &ConfigOptions, + _config: &SessionConfig, ) -> Result { // TODO: In future, we can try to handle inverting aliases here. // For the time being, we pass through untransformed filters, so filters on aliases are not handled. @@ -382,7 +382,7 @@ impl ExecutionPlan for ProjectionExec { &self, _phase: FilterPushdownPhase, child_pushdown_result: ChildPushdownResult, - _config: &ConfigOptions, + _config: &SessionConfig, ) -> Result>> { Ok(FilterPushdownPropagation::if_all(child_pushdown_result)) } diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index dafde268ba737..84c3c03e9fc3a 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -59,6 +59,7 @@ use crate::filter_pushdown::{ ChildPushdownResult, FilterDescription, FilterPushdownPhase, FilterPushdownPropagation, }; +use datafusion_execution::config::SessionConfig; use futures::stream::Stream; use futures::{FutureExt, StreamExt, TryStreamExt}; use log::trace; @@ -845,7 +846,7 @@ impl ExecutionPlan for RepartitionExec { &self, _phase: FilterPushdownPhase, parent_filters: Vec>, - _config: &ConfigOptions, + _config: &SessionConfig, ) -> Result { FilterDescription::from_children(parent_filters, &self.children()) } @@ -854,7 +855,7 @@ impl ExecutionPlan for RepartitionExec { &self, _phase: FilterPushdownPhase, child_pushdown_result: ChildPushdownResult, - _config: &ConfigOptions, + _config: &SessionConfig, ) -> Result>> { Ok(FilterPushdownPropagation::if_all(child_pushdown_result)) } diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index bd798ab4f54b2..d0ae4562b32cf 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -66,6 +66,7 @@ use datafusion_physical_expr::expressions::{lit, DynamicFilterPhysicalExpr}; use datafusion_physical_expr::LexOrdering; use datafusion_physical_expr::PhysicalExpr; +use datafusion_execution::config::SessionConfig; use futures::{StreamExt, TryStreamExt}; use log::{debug, trace}; @@ -1345,7 +1346,7 @@ impl ExecutionPlan for SortExec { &self, phase: FilterPushdownPhase, parent_filters: Vec>, - config: &datafusion_common::config::ConfigOptions, + config: &SessionConfig, ) -> Result { if !matches!(phase, FilterPushdownPhase::Post) { return FilterDescription::from_children(parent_filters, &self.children()); @@ -1355,7 +1356,11 @@ impl ExecutionPlan for SortExec { ChildFilterDescription::from_child(&parent_filters, self.input())?; if let Some(filter) = &self.filter { - if config.optimizer.enable_topk_dynamic_filter_pushdown { + if config + .options() + .optimizer + .enable_topk_dynamic_filter_pushdown + { child = child.with_self_filter(filter.read().expr()); } } diff --git a/datafusion/physical-plan/src/union.rs b/datafusion/physical-plan/src/union.rs index 164f17edebd31..f901502d422f5 100644 --- a/datafusion/physical-plan/src/union.rs +++ b/datafusion/physical-plan/src/union.rs @@ -43,12 +43,12 @@ use crate::stream::ObservedStream; use arrow::datatypes::{Field, Schema, SchemaRef}; use arrow::record_batch::RecordBatch; -use datafusion_common::config::ConfigOptions; use datafusion_common::stats::Precision; use datafusion_common::{exec_err, internal_datafusion_err, internal_err, Result}; use datafusion_execution::TaskContext; use datafusion_physical_expr::{calculate_union, EquivalenceProperties, PhysicalExpr}; +use datafusion_execution::config::SessionConfig; use futures::Stream; use itertools::Itertools; use log::{debug, trace, warn}; @@ -359,7 +359,7 @@ impl ExecutionPlan for UnionExec { &self, _phase: FilterPushdownPhase, parent_filters: Vec>, - _config: &ConfigOptions, + _config: &SessionConfig, ) -> Result { FilterDescription::from_children(parent_filters, &self.children()) }