263 changes: 263 additions & 0 deletions datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs
@@ -159,6 +159,269 @@ fn test_pushdown_into_scan_with_config_options() {
);
}

#[tokio::test]
async fn test_dynamic_filter_pushdown_through_hash_join_with_topk() {
use datafusion_common::JoinType;
use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode};

// Create build side with limited values
let build_batches = vec![record_batch!(
("a", Utf8, ["aa", "ab"]),
("b", Utf8, ["ba", "bb"]),
("c", Float64, [1.0, 2.0])
)
.unwrap()];
let build_side_schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Utf8, false),
Field::new("b", DataType::Utf8, false),
Field::new("c", DataType::Float64, false),
]));
let build_scan = TestScanBuilder::new(Arc::clone(&build_side_schema))
.with_support(true)
.with_batches(build_batches)
.build();

// Create probe side with more values
let probe_batches = vec![record_batch!(
("d", Utf8, ["aa", "ab", "ac", "ad"]),
("e", Utf8, ["ba", "bb", "bc", "bd"]),
("f", Float64, [1.0, 2.0, 3.0, 4.0])
)
.unwrap()];
let probe_side_schema = Arc::new(Schema::new(vec![
Field::new("d", DataType::Utf8, false),
Field::new("e", DataType::Utf8, false),
Field::new("f", DataType::Float64, false),
]));
let probe_scan = TestScanBuilder::new(Arc::clone(&probe_side_schema))
.with_support(true)
.with_batches(probe_batches)
.build();

// Create HashJoinExec
let on = vec![(
col("a", &build_side_schema).unwrap(),
col("d", &probe_side_schema).unwrap(),
)];
let join = Arc::new(
HashJoinExec::try_new(
build_scan,
probe_scan,
on,
None,
&JoinType::Inner,
None,
PartitionMode::Partitioned,
datafusion_common::NullEquality::NullEqualsNothing,
)
.unwrap(),
);

let join_schema = join.schema();

// Finally, let's add a SortExec on top of the join to test pushdown of dynamic filters
let sort_expr =
PhysicalSortExpr::new(col("e", &join_schema).unwrap(), SortOptions::default());
let plan = Arc::new(
SortExec::new(LexOrdering::new(vec![sort_expr]).unwrap(), join)
.with_fetch(Some(2)),
) as Arc<dyn ExecutionPlan>;

let mut config = ConfigOptions::default();
config.optimizer.enable_dynamic_filter_pushdown = true;
config.execution.parquet.pushdown_filters = true;

// Apply the FilterPushdown optimizer rule
let plan = FilterPushdown::new_post_optimization()
.optimize(Arc::clone(&plan), &config)
.unwrap();

// Test that the dynamic filter placeholder is pushed down to the probe side of the join
insta::assert_snapshot!(
format_plan_for_test(&plan),
@r"
- SortExec: TopK(fetch=2), expr=[e@4 ASC], preserve_partitioning=[false]
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, d@0)]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ true ]
"
);

// Put some data through the plan to check that the filter is updated to reflect the TopK state
let session_ctx = SessionContext::new_with_config(SessionConfig::new());
session_ctx.register_object_store(
ObjectStoreUrl::parse("test://").unwrap().as_ref(),
Arc::new(InMemory::new()),
);
let state = session_ctx.state();
let task_ctx = state.task_ctx();
let mut stream = plan.execute(0, Arc::clone(&task_ctx)).unwrap();
// Iterate one batch
stream.next().await.unwrap().unwrap();

// Test that the dynamic filter has been updated to reflect the TopK state
insta::assert_snapshot!(
format_plan_for_test(&plan),
@r"
- SortExec: TopK(fetch=2), expr=[e@4 ASC], preserve_partitioning=[false], filter=[e@4 IS NULL OR e@4 < bb]
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, d@0)]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ e@1 IS NULL OR e@1 < bb ]
"
);
}
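
The two options exercised above can also be set on a SessionConfig for an end-to-end query. The sketch below is illustrative only: the `build` and `probe` table names and the query are assumptions, not part of this test.

use datafusion::error::Result;
use datafusion::prelude::*;

async fn topk_over_join_example() -> Result<()> {
    // Same switches as the test's ConfigOptions, set on the session config.
    let config = SessionConfig::new()
        .set_bool("datafusion.optimizer.enable_dynamic_filter_pushdown", true)
        .set_bool("datafusion.execution.parquet.pushdown_filters", true);
    let ctx = SessionContext::new_with_config(config);
    // Assumes `build` and `probe` were registered beforehand, e.g. via
    // `ctx.register_parquet(...)`.
    ctx.sql(
        "SELECT * FROM probe JOIN build ON probe.d = build.a \
         ORDER BY probe.e LIMIT 2",
    )
    .await?
    .show()
    .await?;
    Ok(())
}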

// Test both static and dynamic filter pushdown in HashJoinExec.
// Note that static filter pushdown is rare: it should already have happened in the logical optimizer phase
// (a SQL-level sketch of that follows this test).
// However, users may manually construct plans that can result in a FilterExec -> HashJoinExec -> Scan setup.
// Dynamic filters arise in cases such as nested inner joins or TopK -> HashJoinExec -> Scan setups.
#[tokio::test]
async fn test_static_filter_pushdown_through_hash_join() {
use datafusion_common::JoinType;
use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode};

// Create build side with limited values
let build_batches = vec![record_batch!(
("a", Utf8, ["aa", "ab"]),
("b", Utf8, ["ba", "bb"]),
("c", Float64, [1.0, 2.0])
)
.unwrap()];
let build_side_schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Utf8, false),
Field::new("b", DataType::Utf8, false),
Field::new("c", DataType::Float64, false),
]));
let build_scan = TestScanBuilder::new(Arc::clone(&build_side_schema))
.with_support(true)
.with_batches(build_batches)
.build();

// Create probe side with more values
let probe_batches = vec![record_batch!(
("d", Utf8, ["aa", "ab", "ac", "ad"]),
("e", Utf8, ["ba", "bb", "bc", "bd"]),
("f", Float64, [1.0, 2.0, 3.0, 4.0])
)
.unwrap()];
let probe_side_schema = Arc::new(Schema::new(vec![
Field::new("d", DataType::Utf8, false),
Field::new("e", DataType::Utf8, false),
Field::new("f", DataType::Float64, false),
]));
let probe_scan = TestScanBuilder::new(Arc::clone(&probe_side_schema))
.with_support(true)
.with_batches(probe_batches)
.build();

// Create HashJoinExec
let on = vec![(
col("a", &build_side_schema).unwrap(),
col("d", &probe_side_schema).unwrap(),
)];
let join = Arc::new(
HashJoinExec::try_new(
build_scan,
probe_scan,
on,
None,
&JoinType::Inner,
None,
PartitionMode::Partitioned,
datafusion_common::NullEquality::NullEqualsNothing,
)
.unwrap(),
);

// Create filters that can be pushed down to different sides
// We need to create filters in the context of the join output schema
let join_schema = join.schema();

// Filter on build side column: a = 'aa'
let left_filter = col_lit_predicate("a", "aa", &join_schema);
// Filter on probe side column: e = 'ba'
let right_filter = col_lit_predicate("e", "ba", &join_schema);
// Filter that references both sides: a = d (should not be pushed down)
let cross_filter = Arc::new(BinaryExpr::new(
col("a", &join_schema).unwrap(),
Operator::Eq,
col("d", &join_schema).unwrap(),
)) as Arc<dyn PhysicalExpr>;

let filter =
Arc::new(FilterExec::try_new(left_filter, Arc::clone(&join) as _).unwrap());
let filter = Arc::new(FilterExec::try_new(right_filter, filter).unwrap());
let plan = Arc::new(FilterExec::try_new(cross_filter, filter).unwrap())
as Arc<dyn ExecutionPlan>;

// Test that filters are pushed down correctly to each side of the join
insta::assert_snapshot!(
OptimizationTest::new(Arc::clone(&plan), FilterPushdown::new(), true),
@r"
OptimizationTest:
input:
- FilterExec: a@0 = d@3
- FilterExec: e@4 = ba
- FilterExec: a@0 = aa
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, d@0)]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true
output:
Ok:
- FilterExec: a@0 = d@3
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, d@0)]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = aa
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true, predicate=e@1 = ba
"
);

// Test left join - filters should NOT be pushed down
let join = Arc::new(
HashJoinExec::try_new(
TestScanBuilder::new(Arc::clone(&build_side_schema))
.with_support(true)
.build(),
TestScanBuilder::new(Arc::clone(&probe_side_schema))
.with_support(true)
.build(),
vec![(
col("a", &build_side_schema).unwrap(),
col("d", &probe_side_schema).unwrap(),
)],
None,
&JoinType::Left,
None,
PartitionMode::Partitioned,
datafusion_common::NullEquality::NullEqualsNothing,
)
.unwrap(),
);

let join_schema = join.schema();
let filter = col_lit_predicate("a", "aa", &join_schema);
let plan =
Arc::new(FilterExec::try_new(filter, join).unwrap()) as Arc<dyn ExecutionPlan>;

// Test that filters are NOT pushed down for left join
insta::assert_snapshot!(
OptimizationTest::new(plan, FilterPushdown::new(), true),
@r"
OptimizationTest:
input:
- FilterExec: a@0 = aa
- HashJoinExec: mode=Partitioned, join_type=Left, on=[(a@0, d@0)]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true
output:
Ok:
- FilterExec: a@0 = aa
- HashJoinExec: mode=Partitioned, join_type=Left, on=[(a@0, d@0)]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true
"
);
}
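
As the comment above this test notes, single-table predicates written in SQL are normally pushed below the join by the logical optimizer before the physical plan is built. A rough sketch of checking that; the table names are hypothetical and not part of this test:

use datafusion::error::Result;
use datafusion::prelude::*;

async fn static_filters_already_pushed() -> Result<()> {
    let ctx = SessionContext::new();
    // Assumes `build` and `probe` are registered tables with the columns used above.
    let df = ctx
        .sql(
            "SELECT * FROM build JOIN probe ON build.a = probe.d \
             WHERE build.a = 'aa' AND probe.e = 'ba'",
        )
        .await?;
    // The EXPLAIN output shows the single-table predicates attached to the scans
    // rather than sitting in a FilterExec above the join.
    df.explain(false, false)?.show().await?;
    Ok(())
}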

#[test]
fn test_filter_collapse() {
// filter should be pushed down into the parquet scan with two filters
22 changes: 6 additions & 16 deletions datafusion/physical-plan/src/execution_plan.rs
@@ -17,8 +17,8 @@

pub use crate::display::{DefaultDisplay, DisplayAs, DisplayFormatType, VerboseDisplay};
use crate::filter_pushdown::{
-    ChildFilterDescription, ChildPushdownResult, FilterDescription, FilterPushdownPhase,
-    FilterPushdownPropagation, PushedDownPredicate,
+    ChildPushdownResult, FilterDescription, FilterPushdownPhase,
+    FilterPushdownPropagation,
};
pub use crate::metrics::Metric;
pub use crate::ordering::InputOrderMode;
@@ -33,7 +33,6 @@ pub use datafusion_physical_expr::window::WindowExpr;
pub use datafusion_physical_expr::{
expressions, Distribution, Partitioning, PhysicalExpr,
};
-use itertools::Itertools;

use std::any::Any;
use std::fmt::Debug;
@@ -521,19 +520,10 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
parent_filters: Vec<Arc<dyn PhysicalExpr>>,
_config: &ConfigOptions,
) -> Result<FilterDescription> {
-        // Default implementation: mark all filters as unsupported for all children
-        let mut desc = FilterDescription::new();
-        let child_filters = parent_filters
-            .iter()
-            .map(|f| PushedDownPredicate::unsupported(Arc::clone(f)))
-            .collect_vec();
-        for _ in 0..self.children().len() {
-            desc = desc.with_child(ChildFilterDescription {
-                parent_filters: child_filters.clone(),
-                self_filters: vec![],
-            });
-        }
-        Ok(desc)
+        Ok(FilterDescription::all_unsupported(
+            &parent_filters,
+            &self.children(),
+        ))
}

/// Handle the result of a child pushdown.
35 changes: 34 additions & 1 deletion datafusion/physical-plan/src/filter_pushdown.rs
@@ -40,8 +40,9 @@ use std::sync::Arc;
use datafusion_common::Result;
use datafusion_physical_expr::utils::{collect_columns, reassign_predicate_columns};
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
+use itertools::Itertools;

-#[derive(Debug, Clone, Copy)]
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FilterPushdownPhase {
/// Pushdown that happens before most other optimizations.
/// This pushdown allows static filters that do not reference any [`ExecutionPlan`]s to be pushed down.
@@ -257,6 +258,19 @@ impl<T> FilterPushdownPropagation<T> {
}
}

/// Create a new [`FilterPushdownPropagation`] that tells the parent node that no filters were pushed down regardless of the child results.
pub fn all_unsupported(child_pushdown_result: ChildPushdownResult) -> Self {
let filters = child_pushdown_result
.parent_filters
.into_iter()
.map(|_| PushedDown::No)
.collect();
Self {
filters,
updated_node: None,
}
}
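
A sketch of the intended call site for this helper. The `handle_child_pushdown_result` method name, parameter list, and return type below are assumed from the `ExecutionPlan` filter-pushdown API and are not shown in this diff:

// Inside an operator that never absorbs parent filters:
fn handle_child_pushdown_result(
    &self,
    _phase: FilterPushdownPhase,
    child_pushdown_result: ChildPushdownResult,
    _config: &ConfigOptions,
) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
    // Report every parent filter as not pushed down and leave the node
    // itself unchanged (`updated_node` stays `None`).
    Ok(FilterPushdownPropagation::all_unsupported(
        child_pushdown_result,
    ))
}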

/// Create a new [`FilterPushdownPropagation`] with the specified filter support.
/// This transmits up to our parent node what the result of pushing down the filters into our node and possibly our subtree was.
pub fn with_parent_pushdown_result(filters: Vec<PushedDown>) -> Self {
@@ -413,6 +427,25 @@ impl FilterDescription {
Ok(desc)
}

/// Mark all parent filters as unsupported for all children.
pub fn all_unsupported(
parent_filters: &[Arc<dyn PhysicalExpr>],
children: &[&Arc<dyn crate::ExecutionPlan>],
) -> Self {
let mut desc = Self::new();
let child_filters = parent_filters
.iter()
.map(|f| PushedDownPredicate::unsupported(Arc::clone(f)))
.collect_vec();
for _ in 0..children.len() {
desc = desc.with_child(ChildFilterDescription {
parent_filters: child_filters.clone(),
self_filters: vec![],
});
}
desc
}
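
A self-contained sketch of this helper in isolation; the `EmptyExec` placeholder children, the `lit(true)` predicate, and the module paths are assumptions used only for illustration:

use std::sync::Arc;
use arrow::datatypes::{DataType, Field, Schema};
use datafusion_physical_expr::expressions::lit;
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use datafusion_physical_plan::empty::EmptyExec;
use datafusion_physical_plan::filter_pushdown::FilterDescription;
use datafusion_physical_plan::ExecutionPlan;

fn all_unsupported_demo() {
    let schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int32, false)]));
    // Two placeholder children and one trivial predicate.
    let child_a: Arc<dyn ExecutionPlan> = Arc::new(EmptyExec::new(Arc::clone(&schema)));
    let child_b: Arc<dyn ExecutionPlan> = Arc::new(EmptyExec::new(schema));
    let filters: Vec<Arc<dyn PhysicalExpr>> = vec![lit(true)];
    let desc = FilterDescription::all_unsupported(&filters, &[&child_a, &child_b]);
    // One entry per child, each carrying the parent filter marked as unsupported.
    assert_eq!(desc.parent_filters().len(), 2);
}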

pub fn parent_filters(&self) -> Vec<Vec<PushedDownPredicate>> {
self.child_filter_descriptions
.iter()