Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions datafusion/core/benches/push_down_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ use arrow::datatypes::{DataType, Field, Schema};
use bytes::{BufMut, BytesMut};
use criterion::{criterion_group, criterion_main, Criterion};
use datafusion::config::ConfigOptions;
use datafusion::prelude::{ParquetReadOptions, SessionContext};
use datafusion::prelude::{ParquetReadOptions, SessionConfig, SessionContext};
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_physical_optimizer::filter_pushdown::FilterPushdown;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_optimizer::{OptimizerContext, PhysicalOptimizerRule};
use datafusion_physical_plan::ExecutionPlan;
use object_store::memory::InMemory;
use object_store::path::Path;
Expand Down Expand Up @@ -106,11 +106,13 @@ fn bench_push_down_filter(c: &mut Criterion) {
config.execution.parquet.pushdown_filters = true;
let plan = BenchmarkPlan { plan, config };
let optimizer = FilterPushdown::new();
let session_config = SessionConfig::from(plan.config.clone());
let optimizer_context = OptimizerContext::new(session_config);

c.bench_function("push_down_filter", |b| {
b.iter(|| {
optimizer
.optimize(Arc::clone(&plan.plan), &plan.config)
.optimize_plan(Arc::clone(&plan.plan), &optimizer_context)
.unwrap();
});
});
Expand Down
7 changes: 5 additions & 2 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ use datafusion_physical_expr::expressions::Literal;
use datafusion_physical_expr::{
create_physical_sort_exprs, LexOrdering, PhysicalSortExpr,
};
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_optimizer::{OptimizerContext, PhysicalOptimizerRule};
use datafusion_physical_plan::empty::EmptyExec;
use datafusion_physical_plan::execution_plan::InvariantLevel;
use datafusion_physical_plan::joins::PiecewiseMergeJoinExec;
Expand Down Expand Up @@ -2271,11 +2271,14 @@ impl DefaultPhysicalPlanner {
// to verify that the plan fulfills the base requirements.
InvariantChecker(InvariantLevel::Always).check(&plan)?;

// Create optimizer context from session state
let optimizer_context = OptimizerContext::new(session_state.config().clone());

let mut new_plan = Arc::clone(&plan);
for optimizer in optimizers {
let before_schema = new_plan.schema();
new_plan = optimizer
.optimize(new_plan, session_state.config_options())
.optimize_plan(new_plan, &optimizer_context)
.map_err(|e| {
DataFusionError::Context(optimizer.name().to_string(), Box::new(e))
})?;
Expand Down
6 changes: 3 additions & 3 deletions datafusion/core/tests/execution/coop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use datafusion_physical_expr::Partitioning;
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
use datafusion_physical_optimizer::ensure_coop::EnsureCooperative;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_optimizer::{OptimizerContext, PhysicalOptimizerRule};
use datafusion_physical_plan::coalesce_batches::CoalesceBatchesExec;
use datafusion_physical_plan::coop::make_cooperative;
use datafusion_physical_plan::filter::FilterExec;
Expand Down Expand Up @@ -810,8 +810,8 @@ async fn query_yields(
task_ctx: Arc<TaskContext>,
) -> Result<(), Box<dyn Error>> {
// Run plan through EnsureCooperative
let optimized =
EnsureCooperative::new().optimize(plan, task_ctx.session_config().options())?;
let optimizer_context = OptimizerContext::new(task_ctx.session_config().clone());
let optimized = EnsureCooperative::new().optimize_plan(plan, &optimizer_context)?;

// Get the stream
let stream = physical_plan::execute_stream(optimized, task_ctx)?;
Expand Down
6 changes: 4 additions & 2 deletions datafusion/core/tests/parquet/file_statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use datafusion_expr::{col, lit, Expr};
use datafusion::datasource::physical_plan::FileScanConfig;
use datafusion_common::config::ConfigOptions;
use datafusion_physical_optimizer::filter_pushdown::FilterPushdown;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_optimizer::{OptimizerContext, PhysicalOptimizerRule};
use datafusion_physical_plan::filter::FilterExec;
use datafusion_physical_plan::ExecutionPlan;
use tempfile::tempdir;
Expand Down Expand Up @@ -84,8 +84,10 @@ async fn check_stats_precision_with_filter_pushdown() {
Arc::new(FilterExec::try_new(physical_filter, exec_with_filter).unwrap())
as Arc<dyn ExecutionPlan>;

let session_config = SessionConfig::from(options.clone());
let optimizer_context = OptimizerContext::new(session_config);
let optimized_exec = FilterPushdown::new()
.optimize(filtered_exec, &options)
.optimize_plan(filtered_exec, &optimizer_context)
.unwrap();

assert!(
Expand Down
21 changes: 14 additions & 7 deletions datafusion/core/tests/physical_optimizer/aggregate_statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,13 @@ use arrow::record_batch::RecordBatch;
use datafusion::datasource::memory::MemorySourceConfig;
use datafusion::datasource::source::DataSourceExec;
use datafusion_common::cast::as_int64_array;
use datafusion_common::config::ConfigOptions;
use datafusion_common::Result;
use datafusion_execution::config::SessionConfig;
use datafusion_execution::TaskContext;
use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::{self, cast};
use datafusion_physical_optimizer::aggregate_statistics::AggregateStatistics;
use datafusion_physical_optimizer::OptimizerContext;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_plan::aggregates::AggregateExec;
use datafusion_physical_plan::aggregates::AggregateMode;
Expand Down Expand Up @@ -67,8 +68,10 @@ async fn assert_count_optim_success(
let task_ctx = Arc::new(TaskContext::default());
let plan: Arc<dyn ExecutionPlan> = Arc::new(plan);

let config = ConfigOptions::new();
let optimized = AggregateStatistics::new().optimize(Arc::clone(&plan), &config)?;
let session_config = SessionConfig::new();
let optimizer_context = OptimizerContext::new(session_config.clone());
let optimized = AggregateStatistics::new()
.optimize_plan(Arc::clone(&plan), &optimizer_context)?;

// A ProjectionExec is a sign that the count optimization was applied
assert!(optimized.as_any().is::<ProjectionExec>());
Expand Down Expand Up @@ -264,8 +267,10 @@ async fn test_count_inexact_stat() -> Result<()> {
Arc::clone(&schema),
)?;

let conf = ConfigOptions::new();
let optimized = AggregateStatistics::new().optimize(Arc::new(final_agg), &conf)?;
let session_config = SessionConfig::new();
let optimizer_context = OptimizerContext::new(session_config.clone());
let optimized = AggregateStatistics::new()
.optimize_plan(Arc::new(final_agg), &optimizer_context)?;

// check that the original ExecutionPlan was not replaced
assert!(optimized.as_any().is::<AggregateExec>());
Expand Down Expand Up @@ -308,8 +313,10 @@ async fn test_count_with_nulls_inexact_stat() -> Result<()> {
Arc::clone(&schema),
)?;

let conf = ConfigOptions::new();
let optimized = AggregateStatistics::new().optimize(Arc::new(final_agg), &conf)?;
let session_config = SessionConfig::new();
let optimizer_context = OptimizerContext::new(session_config.clone());
let optimized = AggregateStatistics::new()
.optimize_plan(Arc::new(final_agg), &optimizer_context)?;

// check that the original ExecutionPlan was not replaced
assert!(optimized.as_any().is::<AggregateExec>());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,15 @@ use std::sync::Arc;
use crate::physical_optimizer::test_utils::parquet_exec;

use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion_common::config::ConfigOptions;
use datafusion_execution::config::SessionConfig;
use datafusion_functions_aggregate::count::count_udaf;
use datafusion_functions_aggregate::sum::sum_udaf;
use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
use datafusion_physical_expr::expressions::{col, lit};
use datafusion_physical_expr::Partitioning;
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use datafusion_physical_optimizer::combine_partial_final_agg::CombinePartialFinalAggregate;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_optimizer::{OptimizerContext, PhysicalOptimizerRule};
use datafusion_physical_plan::aggregates::{
AggregateExec, AggregateMode, PhysicalGroupBy,
};
Expand All @@ -47,8 +47,9 @@ macro_rules! assert_optimized {
($PLAN: expr, @ $EXPECTED_LINES: literal $(,)?) => {
// run optimizer
let optimizer = CombinePartialFinalAggregate {};
let config = ConfigOptions::new();
let optimized = optimizer.optimize($PLAN, &config)?;
let session_config = SessionConfig::new();
let optimizer_context = OptimizerContext::new(session_config.clone());
let optimized = optimizer.optimize_plan($PLAN, &optimizer_context)?;
// Now format correctly
let plan = displayable(optimized.as_ref()).indent(true).to_string();
let actual_lines = plan.trim();
Expand Down
26 changes: 19 additions & 7 deletions datafusion/core/tests/physical_optimizer/enforce_distribution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ use datafusion_physical_expr_common::sort_expr::{
use datafusion_physical_optimizer::enforce_distribution::*;
use datafusion_physical_optimizer::enforce_sorting::EnforceSorting;
use datafusion_physical_optimizer::output_requirements::OutputRequirements;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_optimizer::{OptimizerContext, PhysicalOptimizerRule};
use datafusion_physical_plan::aggregates::{
AggregateExec, AggregateMode, PhysicalGroupBy,
};
Expand Down Expand Up @@ -489,7 +489,9 @@ impl TestConfig {
) -> Result<Arc<dyn ExecutionPlan>> {
// Add the ancillary output requirements operator at the start:
let optimizer = OutputRequirements::new_add_mode();
let mut optimized = optimizer.optimize(plan.clone(), &self.config)?;
let session_config = SessionConfig::from(self.config.clone());
let optimizer_context = OptimizerContext::new(session_config.clone());
let mut optimized = optimizer.optimize_plan(plan.clone(), &optimizer_context)?;

// This file has 2 rules that use tree node, apply these rules to original plan consecutively
// After these operations tree nodes should be in a consistent state.
Expand Down Expand Up @@ -525,21 +527,25 @@ impl TestConfig {
}

for run in optimizers_to_run {
let session_config = SessionConfig::from(self.config.clone());
let optimizer_context = OptimizerContext::new(session_config.clone());
optimized = match run {
Run::Distribution => {
let optimizer = EnforceDistribution::new();
optimizer.optimize(optimized, &self.config)?
optimizer.optimize_plan(optimized, &optimizer_context)?
}
Run::Sorting => {
let optimizer = EnforceSorting::new();
optimizer.optimize(optimized, &self.config)?
optimizer.optimize_plan(optimized, &optimizer_context)?
}
};
}

// Remove the ancillary output requirements operator when done:
let optimizer = OutputRequirements::new_remove_mode();
let optimized = optimizer.optimize(optimized, &self.config)?;
let session_config = SessionConfig::from(self.config.clone());
let optimizer_context = OptimizerContext::new(session_config.clone());
let optimized = optimizer.optimize_plan(optimized, &optimizer_context)?;

Ok(optimized)
}
Expand Down Expand Up @@ -3372,7 +3378,10 @@ SortRequiredExec: [a@0 ASC]
config.execution.target_partitions = 10;
config.optimizer.enable_round_robin_repartition = true;
config.optimizer.prefer_existing_sort = false;
let dist_plan = EnforceDistribution::new().optimize(physical_plan, &config)?;
let session_config = SessionConfig::from(config);
let optimizer_context = OptimizerContext::new(session_config.clone());
let dist_plan =
EnforceDistribution::new().optimize_plan(physical_plan, &optimizer_context)?;
// Since at the start of the rule ordering requirement is not satisfied
// EnforceDistribution rule doesn't satisfy this requirement either.
assert_plan!(dist_plan, @r"
Expand Down Expand Up @@ -3408,7 +3417,10 @@ SortRequiredExec: [a@0 ASC]
config.execution.target_partitions = 10;
config.optimizer.enable_round_robin_repartition = true;
config.optimizer.prefer_existing_sort = false;
let dist_plan = EnforceDistribution::new().optimize(physical_plan, &config)?;
let session_config = SessionConfig::from(config);
let optimizer_context = OptimizerContext::new(session_config.clone());
let dist_plan =
EnforceDistribution::new().optimize_plan(physical_plan, &optimizer_context)?;
// Since at the start of the rule ordering requirement is satisfied
// EnforceDistribution rule satisfy this requirement also.
assert_plan!(dist_plan, @r"
Expand Down
13 changes: 9 additions & 4 deletions datafusion/core/tests/physical_optimizer/enforce_sorting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ use datafusion_physical_optimizer::enforce_sorting::replace_with_order_preservin
use datafusion_physical_optimizer::enforce_sorting::sort_pushdown::{SortPushDown, assign_initial_requirements, pushdown_sorts};
use datafusion_physical_optimizer::enforce_distribution::EnforceDistribution;
use datafusion_physical_optimizer::output_requirements::OutputRequirementExec;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_optimizer::{OptimizerContext, PhysicalOptimizerRule};
use datafusion::prelude::*;
use arrow::array::{Int32Array, RecordBatch};
use arrow::datatypes::{Field};
Expand Down Expand Up @@ -175,8 +175,10 @@ impl EnforceSortingTest {
let input_plan_string = displayable(self.plan.as_ref()).indent(true).to_string();

// Run the actual optimizer
let session_config = SessionConfig::from(config);
let optimizer_context = OptimizerContext::new(session_config.clone());
let optimized_physical_plan = EnforceSorting::new()
.optimize(Arc::clone(&self.plan), &config)
.optimize_plan(Arc::clone(&self.plan), &optimizer_context)
.expect("enforce_sorting failed");

// Get string representation of the plan
Expand Down Expand Up @@ -2363,23 +2365,26 @@ async fn test_commutativity() -> Result<()> {
"#);

let config = ConfigOptions::new();
let session_config = SessionConfig::from(config);
let optimizer_context = OptimizerContext::new(session_config.clone());
let rules = vec![
Arc::new(EnforceDistribution::new()) as Arc<dyn PhysicalOptimizerRule>,
Arc::new(EnforceSorting::new()) as Arc<dyn PhysicalOptimizerRule>,
];
let mut first_plan = orig_plan.clone();
for rule in rules {
first_plan = rule.optimize(first_plan, &config)?;
first_plan = rule.optimize_plan(first_plan, &optimizer_context)?;
}

let optimizer_context2 = OptimizerContext::new(session_config.clone());
let rules = vec![
Arc::new(EnforceSorting::new()) as Arc<dyn PhysicalOptimizerRule>,
Arc::new(EnforceDistribution::new()) as Arc<dyn PhysicalOptimizerRule>,
Arc::new(EnforceSorting::new()) as Arc<dyn PhysicalOptimizerRule>,
];
let mut second_plan = orig_plan.clone();
for rule in rules {
second_plan = rule.optimize(second_plan, &config)?;
second_plan = rule.optimize_plan(second_plan, &optimizer_context2)?;
}

assert_eq!(get_plan_string(&first_plan), get_plan_string(&second_plan));
Expand Down
Loading
Loading