Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions datafusion/core/benches/push_down_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ use arrow::datatypes::{DataType, Field, Schema};
use bytes::{BufMut, BytesMut};
use criterion::{criterion_group, criterion_main, Criterion};
use datafusion::config::ConfigOptions;
use datafusion::prelude::{ParquetReadOptions, SessionContext};
use datafusion::prelude::{ParquetReadOptions, SessionConfig, SessionContext};
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_physical_optimizer::filter_pushdown::FilterPushdown;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_optimizer::{OptimizerContext, PhysicalOptimizerRule};
use datafusion_physical_plan::ExecutionPlan;
use object_store::memory::InMemory;
use object_store::path::Path;
Expand Down Expand Up @@ -106,11 +106,13 @@ fn bench_push_down_filter(c: &mut Criterion) {
config.execution.parquet.pushdown_filters = true;
let plan = BenchmarkPlan { plan, config };
let optimizer = FilterPushdown::new();
let session_config = SessionConfig::from(plan.config.clone());
let optimizer_context = OptimizerContext::new_from_session_config(&session_config);

c.bench_function("push_down_filter", |b| {
b.iter(|| {
optimizer
.optimize(Arc::clone(&plan.plan), &plan.config)
.optimize_plan(Arc::clone(&plan.plan), &optimizer_context)
.unwrap();
});
});
Expand Down
8 changes: 6 additions & 2 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ use datafusion_physical_expr::expressions::Literal;
use datafusion_physical_expr::{
create_physical_sort_exprs, LexOrdering, PhysicalSortExpr,
};
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_optimizer::{OptimizerContext, PhysicalOptimizerRule};
use datafusion_physical_plan::empty::EmptyExec;
use datafusion_physical_plan::execution_plan::InvariantLevel;
use datafusion_physical_plan::joins::PiecewiseMergeJoinExec;
Expand Down Expand Up @@ -2271,11 +2271,15 @@ impl DefaultPhysicalPlanner {
// to verify that the plan fulfills the base requirements.
InvariantChecker(InvariantLevel::Always).check(&plan)?;

// Create optimizer context from session state
let optimizer_context =
OptimizerContext::new_from_session_config(&session_state.config().clone());

let mut new_plan = Arc::clone(&plan);
for optimizer in optimizers {
let before_schema = new_plan.schema();
new_plan = optimizer
.optimize(new_plan, session_state.config_options())
.optimize_plan(new_plan, &optimizer_context)
.map_err(|e| {
DataFusionError::Context(optimizer.name().to_string(), Box::new(e))
})?;
Expand Down
7 changes: 4 additions & 3 deletions datafusion/core/tests/execution/coop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use datafusion_physical_expr::Partitioning;
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
use datafusion_physical_optimizer::ensure_coop::EnsureCooperative;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_optimizer::{OptimizerContext, PhysicalOptimizerRule};
use datafusion_physical_plan::coalesce_batches::CoalesceBatchesExec;
use datafusion_physical_plan::coop::make_cooperative;
use datafusion_physical_plan::filter::FilterExec;
Expand Down Expand Up @@ -810,8 +810,9 @@ async fn query_yields(
task_ctx: Arc<TaskContext>,
) -> Result<(), Box<dyn Error>> {
// Run plan through EnsureCooperative
let optimized =
EnsureCooperative::new().optimize(plan, task_ctx.session_config().options())?;
let optimizer_context =
OptimizerContext::new_from_session_config(&task_ctx.session_config().clone());
let optimized = EnsureCooperative::new().optimize_plan(plan, &optimizer_context)?;

// Get the stream
let stream = physical_plan::execute_stream(optimized, task_ctx)?;
Expand Down
6 changes: 4 additions & 2 deletions datafusion/core/tests/parquet/file_statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use datafusion_expr::{col, lit, Expr};
use datafusion::datasource::physical_plan::FileScanConfig;
use datafusion_common::config::ConfigOptions;
use datafusion_physical_optimizer::filter_pushdown::FilterPushdown;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_optimizer::{OptimizerContext, PhysicalOptimizerRule};
use datafusion_physical_plan::filter::FilterExec;
use datafusion_physical_plan::ExecutionPlan;
use tempfile::tempdir;
Expand Down Expand Up @@ -84,8 +84,10 @@ async fn check_stats_precision_with_filter_pushdown() {
Arc::new(FilterExec::try_new(physical_filter, exec_with_filter).unwrap())
as Arc<dyn ExecutionPlan>;

let session_config = SessionConfig::from(options.clone());
let optimizer_context = OptimizerContext::new_from_session_config(&session_config);
let optimized_exec = FilterPushdown::new()
.optimize(filtered_exec, &options)
.optimize_plan(filtered_exec, &optimizer_context)
.unwrap();

assert!(
Expand Down
21 changes: 14 additions & 7 deletions datafusion/core/tests/physical_optimizer/aggregate_statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,13 @@ use arrow::record_batch::RecordBatch;
use datafusion::datasource::memory::MemorySourceConfig;
use datafusion::datasource::source::DataSourceExec;
use datafusion_common::cast::as_int64_array;
use datafusion_common::config::ConfigOptions;
use datafusion_common::Result;
use datafusion_execution::config::SessionConfig;
use datafusion_execution::TaskContext;
use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::{self, cast};
use datafusion_physical_optimizer::aggregate_statistics::AggregateStatistics;
use datafusion_physical_optimizer::OptimizerContext;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_plan::aggregates::AggregateExec;
use datafusion_physical_plan::aggregates::AggregateMode;
Expand Down Expand Up @@ -67,8 +68,10 @@ async fn assert_count_optim_success(
let task_ctx = Arc::new(TaskContext::default());
let plan: Arc<dyn ExecutionPlan> = Arc::new(plan);

let config = ConfigOptions::new();
let optimized = AggregateStatistics::new().optimize(Arc::clone(&plan), &config)?;
let session_config = SessionConfig::new();
let optimizer_context = OptimizerContext::new_from_session_config(&session_config);
let optimized = AggregateStatistics::new()
.optimize_plan(Arc::clone(&plan), &optimizer_context)?;

// A ProjectionExec is a sign that the count optimization was applied
assert!(optimized.as_any().is::<ProjectionExec>());
Expand Down Expand Up @@ -264,8 +267,10 @@ async fn test_count_inexact_stat() -> Result<()> {
Arc::clone(&schema),
)?;

let conf = ConfigOptions::new();
let optimized = AggregateStatistics::new().optimize(Arc::new(final_agg), &conf)?;
let session_config = SessionConfig::new();
let optimizer_context = OptimizerContext::new_from_session_config(&session_config);
let optimized = AggregateStatistics::new()
.optimize_plan(Arc::new(final_agg), &optimizer_context)?;

// check that the original ExecutionPlan was not replaced
assert!(optimized.as_any().is::<AggregateExec>());
Expand Down Expand Up @@ -308,8 +313,10 @@ async fn test_count_with_nulls_inexact_stat() -> Result<()> {
Arc::clone(&schema),
)?;

let conf = ConfigOptions::new();
let optimized = AggregateStatistics::new().optimize(Arc::new(final_agg), &conf)?;
let session_config = SessionConfig::new();
let optimizer_context = OptimizerContext::new_from_session_config(&session_config);
let optimized = AggregateStatistics::new()
.optimize_plan(Arc::new(final_agg), &optimizer_context)?;

// check that the original ExecutionPlan was not replaced
assert!(optimized.as_any().is::<AggregateExec>());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,15 @@ use std::sync::Arc;
use crate::physical_optimizer::test_utils::parquet_exec;

use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion_common::config::ConfigOptions;
use datafusion_execution::config::SessionConfig;
use datafusion_functions_aggregate::count::count_udaf;
use datafusion_functions_aggregate::sum::sum_udaf;
use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
use datafusion_physical_expr::expressions::{col, lit};
use datafusion_physical_expr::Partitioning;
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use datafusion_physical_optimizer::combine_partial_final_agg::CombinePartialFinalAggregate;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_optimizer::{OptimizerContext, PhysicalOptimizerRule};
use datafusion_physical_plan::aggregates::{
AggregateExec, AggregateMode, PhysicalGroupBy,
};
Expand All @@ -47,8 +47,9 @@ macro_rules! assert_optimized {
($PLAN: expr, @ $EXPECTED_LINES: literal $(,)?) => {
// run optimizer
let optimizer = CombinePartialFinalAggregate {};
let config = ConfigOptions::new();
let optimized = optimizer.optimize($PLAN, &config)?;
let session_config = SessionConfig::new();
let optimizer_context = OptimizerContext::new_from_session_config(&session_config);
let optimized = optimizer.optimize_plan($PLAN, &optimizer_context)?;
// Now format correctly
let plan = displayable(optimized.as_ref()).indent(true).to_string();
let actual_lines = plan.trim();
Expand Down
29 changes: 22 additions & 7 deletions datafusion/core/tests/physical_optimizer/enforce_distribution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ use datafusion_physical_expr_common::sort_expr::{
use datafusion_physical_optimizer::enforce_distribution::*;
use datafusion_physical_optimizer::enforce_sorting::EnforceSorting;
use datafusion_physical_optimizer::output_requirements::OutputRequirements;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_optimizer::{OptimizerContext, PhysicalOptimizerRule};
use datafusion_physical_plan::aggregates::{
AggregateExec, AggregateMode, PhysicalGroupBy,
};
Expand Down Expand Up @@ -489,7 +489,10 @@ impl TestConfig {
) -> Result<Arc<dyn ExecutionPlan>> {
// Add the ancillary output requirements operator at the start:
let optimizer = OutputRequirements::new_add_mode();
let mut optimized = optimizer.optimize(plan.clone(), &self.config)?;
let session_config = SessionConfig::from(self.config.clone());
let optimizer_context =
OptimizerContext::new_from_session_config(&session_config);
let mut optimized = optimizer.optimize_plan(plan.clone(), &optimizer_context)?;

// This file has 2 rules that use tree nodes; apply these rules to the original plan consecutively.
// After these operations, the tree nodes should be in a consistent state.
Expand Down Expand Up @@ -525,21 +528,27 @@ impl TestConfig {
}

for run in optimizers_to_run {
let session_config = SessionConfig::from(self.config.clone());
let optimizer_context =
OptimizerContext::new_from_session_config(&session_config);
optimized = match run {
Run::Distribution => {
let optimizer = EnforceDistribution::new();
optimizer.optimize(optimized, &self.config)?
optimizer.optimize_plan(optimized, &optimizer_context)?
}
Run::Sorting => {
let optimizer = EnforceSorting::new();
optimizer.optimize(optimized, &self.config)?
optimizer.optimize_plan(optimized, &optimizer_context)?
}
};
}

// Remove the ancillary output requirements operator when done:
let optimizer = OutputRequirements::new_remove_mode();
let optimized = optimizer.optimize(optimized, &self.config)?;
let session_config = SessionConfig::from(self.config.clone());
let optimizer_context =
OptimizerContext::new_from_session_config(&session_config);
let optimized = optimizer.optimize_plan(optimized, &optimizer_context)?;

Ok(optimized)
}
Expand Down Expand Up @@ -3372,7 +3381,10 @@ SortRequiredExec: [a@0 ASC]
config.execution.target_partitions = 10;
config.optimizer.enable_round_robin_repartition = true;
config.optimizer.prefer_existing_sort = false;
let dist_plan = EnforceDistribution::new().optimize(physical_plan, &config)?;
let session_config = SessionConfig::from(config);
let optimizer_context = OptimizerContext::new_from_session_config(&session_config);
let dist_plan =
EnforceDistribution::new().optimize_plan(physical_plan, &optimizer_context)?;
// Since the ordering requirement is not satisfied at the start of the rule,
// the EnforceDistribution rule doesn't satisfy this requirement either.
assert_plan!(dist_plan, @r"
Expand Down Expand Up @@ -3408,7 +3420,10 @@ SortRequiredExec: [a@0 ASC]
config.execution.target_partitions = 10;
config.optimizer.enable_round_robin_repartition = true;
config.optimizer.prefer_existing_sort = false;
let dist_plan = EnforceDistribution::new().optimize(physical_plan, &config)?;
let session_config = SessionConfig::from(config);
let optimizer_context = OptimizerContext::new_from_session_config(&session_config);
let dist_plan =
EnforceDistribution::new().optimize_plan(physical_plan, &optimizer_context)?;
// Since the ordering requirement is satisfied at the start of the rule,
// the EnforceDistribution rule satisfies this requirement as well.
assert_plan!(dist_plan, @r"
Expand Down
14 changes: 10 additions & 4 deletions datafusion/core/tests/physical_optimizer/enforce_sorting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ use datafusion_physical_optimizer::enforce_sorting::replace_with_order_preservin
use datafusion_physical_optimizer::enforce_sorting::sort_pushdown::{SortPushDown, assign_initial_requirements, pushdown_sorts};
use datafusion_physical_optimizer::enforce_distribution::EnforceDistribution;
use datafusion_physical_optimizer::output_requirements::OutputRequirementExec;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_optimizer::{OptimizerContext, PhysicalOptimizerRule};
use datafusion::prelude::*;
use arrow::array::{Int32Array, RecordBatch};
use arrow::datatypes::{Field};
Expand Down Expand Up @@ -175,8 +175,11 @@ impl EnforceSortingTest {
let input_plan_string = displayable(self.plan.as_ref()).indent(true).to_string();

// Run the actual optimizer
let session_config = SessionConfig::from(config);
let optimizer_context =
OptimizerContext::new_from_session_config(&session_config);
let optimized_physical_plan = EnforceSorting::new()
.optimize(Arc::clone(&self.plan), &config)
.optimize_plan(Arc::clone(&self.plan), &optimizer_context)
.expect("enforce_sorting failed");

// Get string representation of the plan
Expand Down Expand Up @@ -2363,23 +2366,26 @@ async fn test_commutativity() -> Result<()> {
"#);

let config = ConfigOptions::new();
let session_config = SessionConfig::from(config);
let optimizer_context = OptimizerContext::new_from_session_config(&session_config);
let rules = vec![
Arc::new(EnforceDistribution::new()) as Arc<dyn PhysicalOptimizerRule>,
Arc::new(EnforceSorting::new()) as Arc<dyn PhysicalOptimizerRule>,
];
let mut first_plan = orig_plan.clone();
for rule in rules {
first_plan = rule.optimize(first_plan, &config)?;
first_plan = rule.optimize_plan(first_plan, &optimizer_context)?;
}

let optimizer_context2 = OptimizerContext::new_from_session_config(&session_config);
let rules = vec![
Arc::new(EnforceSorting::new()) as Arc<dyn PhysicalOptimizerRule>,
Arc::new(EnforceDistribution::new()) as Arc<dyn PhysicalOptimizerRule>,
Arc::new(EnforceSorting::new()) as Arc<dyn PhysicalOptimizerRule>,
];
let mut second_plan = orig_plan.clone();
for rule in rules {
second_plan = rule.optimize(second_plan, &config)?;
second_plan = rule.optimize_plan(second_plan, &optimizer_context2)?;
}

assert_eq!(get_plan_string(&first_plan), get_plan_string(&second_plan));
Expand Down
Loading
Loading