Commit 484cc58

adriangb and Standing-Man authored and committed
Enable physical filter pushdown for hash joins (apache#16954)
1 parent df4efcd commit 484cc58
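
In practice this change targets plans where a TopK (ORDER BY ... LIMIT) or another dynamic filter sits above a hash join. Below is a minimal sketch of the user-facing query shape; the table names, file paths, and the exact physical plan the planner picks are illustrative assumptions rather than part of the commit, while the two config options mirror the ones toggled in the new test further down.

```rust
use datafusion::error::Result;
use datafusion::prelude::{ParquetReadOptions, SessionConfig, SessionContext};

#[tokio::main]
async fn main() -> Result<()> {
    // Enable the same options the new test flips on ConfigOptions.
    let mut config = SessionConfig::new();
    config.options_mut().optimizer.enable_dynamic_filter_pushdown = true;
    config.options_mut().execution.parquet.pushdown_filters = true;
    let ctx = SessionContext::new_with_config(config);

    // Hypothetical Parquet inputs standing in for the build and probe sides.
    ctx.register_parquet("build", "build.parquet", ParquetReadOptions::default())
        .await?;
    ctx.register_parquet("probe", "probe.parquet", ParquetReadOptions::default())
        .await?;

    // ORDER BY ... LIMIT over a join: the TopK's dynamic "current threshold"
    // filter can now travel through the HashJoinExec into the probe-side
    // Parquet scan instead of stopping at the join.
    let df = ctx
        .sql(
            "SELECT p.d, p.e, p.f
             FROM probe AS p
             JOIN build AS b ON b.a = p.d
             ORDER BY p.e
             LIMIT 2",
        )
        .await?;
    df.show().await?;
    Ok(())
}
```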

File tree: 4 files changed (+350, -18 lines)

datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs

Lines changed: 263 additions & 0 deletions
@@ -159,6 +159,269 @@ fn test_pushdown_into_scan_with_config_options() {
     );
 }
 
+#[tokio::test]
+async fn test_dynamic_filter_pushdown_through_hash_join_with_topk() {
+    use datafusion_common::JoinType;
+    use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode};
+
+    // Create build side with limited values
+    let build_batches = vec![record_batch!(
+        ("a", Utf8, ["aa", "ab"]),
+        ("b", Utf8, ["ba", "bb"]),
+        ("c", Float64, [1.0, 2.0])
+    )
+    .unwrap()];
+    let build_side_schema = Arc::new(Schema::new(vec![
+        Field::new("a", DataType::Utf8, false),
+        Field::new("b", DataType::Utf8, false),
+        Field::new("c", DataType::Float64, false),
+    ]));
+    let build_scan = TestScanBuilder::new(Arc::clone(&build_side_schema))
+        .with_support(true)
+        .with_batches(build_batches)
+        .build();
+
+    // Create probe side with more values
+    let probe_batches = vec![record_batch!(
+        ("d", Utf8, ["aa", "ab", "ac", "ad"]),
+        ("e", Utf8, ["ba", "bb", "bc", "bd"]),
+        ("f", Float64, [1.0, 2.0, 3.0, 4.0])
+    )
+    .unwrap()];
+    let probe_side_schema = Arc::new(Schema::new(vec![
+        Field::new("d", DataType::Utf8, false),
+        Field::new("e", DataType::Utf8, false),
+        Field::new("f", DataType::Float64, false),
+    ]));
+    let probe_scan = TestScanBuilder::new(Arc::clone(&probe_side_schema))
+        .with_support(true)
+        .with_batches(probe_batches)
+        .build();
+
+    // Create HashJoinExec
+    let on = vec![(
+        col("a", &build_side_schema).unwrap(),
+        col("d", &probe_side_schema).unwrap(),
+    )];
+    let join = Arc::new(
+        HashJoinExec::try_new(
+            build_scan,
+            probe_scan,
+            on,
+            None,
+            &JoinType::Inner,
+            None,
+            PartitionMode::Partitioned,
+            datafusion_common::NullEquality::NullEqualsNothing,
+        )
+        .unwrap(),
+    );
+
+    let join_schema = join.schema();
+
+    // Finally let's add a SortExec on the outside to test pushdown of dynamic filters
+    let sort_expr =
+        PhysicalSortExpr::new(col("e", &join_schema).unwrap(), SortOptions::default());
+    let plan = Arc::new(
+        SortExec::new(LexOrdering::new(vec![sort_expr]).unwrap(), join)
+            .with_fetch(Some(2)),
+    ) as Arc<dyn ExecutionPlan>;
+
+    let mut config = ConfigOptions::default();
+    config.optimizer.enable_dynamic_filter_pushdown = true;
+    config.execution.parquet.pushdown_filters = true;
+
+    // Apply the FilterPushdown optimizer rule
+    let plan = FilterPushdown::new_post_optimization()
+        .optimize(Arc::clone(&plan), &config)
+        .unwrap();
+
+    // Test that filters are pushed down correctly to each side of the join
+    insta::assert_snapshot!(
+        format_plan_for_test(&plan),
+        @r"
+    - SortExec: TopK(fetch=2), expr=[e@4 ASC], preserve_partitioning=[false]
+    -   HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, d@0)]
+    -     DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
+    -     DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ true ]
+    "
+    );
+
+    // Put some data through the plan to check that the filter is updated to reflect the TopK state
+    let session_ctx = SessionContext::new_with_config(SessionConfig::new());
+    session_ctx.register_object_store(
+        ObjectStoreUrl::parse("test://").unwrap().as_ref(),
+        Arc::new(InMemory::new()),
+    );
+    let state = session_ctx.state();
+    let task_ctx = state.task_ctx();
+    let mut stream = plan.execute(0, Arc::clone(&task_ctx)).unwrap();
+    // Iterate one batch
+    stream.next().await.unwrap().unwrap();
+
+    // Test that filters are pushed down correctly to each side of the join
+    insta::assert_snapshot!(
+        format_plan_for_test(&plan),
+        @r"
+    - SortExec: TopK(fetch=2), expr=[e@4 ASC], preserve_partitioning=[false], filter=[e@4 IS NULL OR e@4 < bb]
+    -   HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, d@0)]
+    -     DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
+    -     DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ e@1 IS NULL OR e@1 < bb ]
+    "
+    );
+}
+
+// Test both static and dynamic filter pushdown in HashJoinExec.
+// Note that static filter pushdown is rare: it should have already happened in the logical optimizer phase.
+// However users may manually construct plans that could result in a FilterExec -> HashJoinExec -> Scan setup.
+// Dynamic filters arise in cases such as nested inner joins or TopK -> HashJoinExec -> Scan setups.
+#[tokio::test]
+async fn test_static_filter_pushdown_through_hash_join() {
+    use datafusion_common::JoinType;
+    use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode};
+
+    // Create build side with limited values
+    let build_batches = vec![record_batch!(
+        ("a", Utf8, ["aa", "ab"]),
+        ("b", Utf8, ["ba", "bb"]),
+        ("c", Float64, [1.0, 2.0])
+    )
+    .unwrap()];
+    let build_side_schema = Arc::new(Schema::new(vec![
+        Field::new("a", DataType::Utf8, false),
+        Field::new("b", DataType::Utf8, false),
+        Field::new("c", DataType::Float64, false),
+    ]));
+    let build_scan = TestScanBuilder::new(Arc::clone(&build_side_schema))
+        .with_support(true)
+        .with_batches(build_batches)
+        .build();
+
+    // Create probe side with more values
+    let probe_batches = vec![record_batch!(
+        ("d", Utf8, ["aa", "ab", "ac", "ad"]),
+        ("e", Utf8, ["ba", "bb", "bc", "bd"]),
+        ("f", Float64, [1.0, 2.0, 3.0, 4.0])
+    )
+    .unwrap()];
+    let probe_side_schema = Arc::new(Schema::new(vec![
+        Field::new("d", DataType::Utf8, false),
+        Field::new("e", DataType::Utf8, false),
+        Field::new("f", DataType::Float64, false),
+    ]));
+    let probe_scan = TestScanBuilder::new(Arc::clone(&probe_side_schema))
+        .with_support(true)
+        .with_batches(probe_batches)
+        .build();
+
+    // Create HashJoinExec
+    let on = vec![(
+        col("a", &build_side_schema).unwrap(),
+        col("d", &probe_side_schema).unwrap(),
+    )];
+    let join = Arc::new(
+        HashJoinExec::try_new(
+            build_scan,
+            probe_scan,
+            on,
+            None,
+            &JoinType::Inner,
+            None,
+            PartitionMode::Partitioned,
+            datafusion_common::NullEquality::NullEqualsNothing,
+        )
+        .unwrap(),
+    );
+
+    // Create filters that can be pushed down to different sides
+    // We need to create filters in the context of the join output schema
+    let join_schema = join.schema();
+
+    // Filter on build side column: a = 'aa'
+    let left_filter = col_lit_predicate("a", "aa", &join_schema);
+    // Filter on probe side column: e = 'ba'
+    let right_filter = col_lit_predicate("e", "ba", &join_schema);
+    // Filter that references both sides: a = d (should not be pushed down)
+    let cross_filter = Arc::new(BinaryExpr::new(
+        col("a", &join_schema).unwrap(),
+        Operator::Eq,
+        col("d", &join_schema).unwrap(),
+    )) as Arc<dyn PhysicalExpr>;
+
+    let filter =
+        Arc::new(FilterExec::try_new(left_filter, Arc::clone(&join) as _).unwrap());
+    let filter = Arc::new(FilterExec::try_new(right_filter, filter).unwrap());
+    let plan = Arc::new(FilterExec::try_new(cross_filter, filter).unwrap())
+        as Arc<dyn ExecutionPlan>;
+
+    // Test that filters are pushed down correctly to each side of the join
+    insta::assert_snapshot!(
+        OptimizationTest::new(Arc::clone(&plan), FilterPushdown::new(), true),
+        @r"
+    OptimizationTest:
+      input:
+        - FilterExec: a@0 = d@3
+        -   FilterExec: e@4 = ba
+        -     FilterExec: a@0 = aa
+        -       HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, d@0)]
+        -         DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
+        -         DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true
+      output:
+        Ok:
+          - FilterExec: a@0 = d@3
+          -   HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, d@0)]
+          -     DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = aa
+          -     DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true, predicate=e@1 = ba
+    "
+    );
+
+    // Test left join - filters should NOT be pushed down
+    let join = Arc::new(
+        HashJoinExec::try_new(
+            TestScanBuilder::new(Arc::clone(&build_side_schema))
+                .with_support(true)
+                .build(),
+            TestScanBuilder::new(Arc::clone(&probe_side_schema))
+                .with_support(true)
+                .build(),
+            vec![(
+                col("a", &build_side_schema).unwrap(),
+                col("d", &probe_side_schema).unwrap(),
+            )],
+            None,
+            &JoinType::Left,
+            None,
+            PartitionMode::Partitioned,
+            datafusion_common::NullEquality::NullEqualsNothing,
+        )
+        .unwrap(),
+    );
+
+    let join_schema = join.schema();
+    let filter = col_lit_predicate("a", "aa", &join_schema);
+    let plan =
+        Arc::new(FilterExec::try_new(filter, join).unwrap()) as Arc<dyn ExecutionPlan>;
+
+    // Test that filters are NOT pushed down for left join
+    insta::assert_snapshot!(
+        OptimizationTest::new(plan, FilterPushdown::new(), true),
+        @r"
+    OptimizationTest:
+      input:
+        - FilterExec: a@0 = aa
+        -   HashJoinExec: mode=Partitioned, join_type=Left, on=[(a@0, d@0)]
+        -     DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
+        -     DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true
+      output:
+        Ok:
+          - FilterExec: a@0 = aa
+          -   HashJoinExec: mode=Partitioned, join_type=Left, on=[(a@0, d@0)]
+          -     DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
+          -     DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true
+    "
+    );
+}
+
 #[test]
 fn test_filter_collapse() {
     // filter should be pushed down into the parquet scan with two filters

datafusion/physical-plan/src/execution_plan.rs

Lines changed: 6 additions & 16 deletions
@@ -17,8 +17,8 @@
 
 pub use crate::display::{DefaultDisplay, DisplayAs, DisplayFormatType, VerboseDisplay};
 use crate::filter_pushdown::{
-    ChildFilterDescription, ChildPushdownResult, FilterDescription, FilterPushdownPhase,
-    FilterPushdownPropagation, PushedDownPredicate,
+    ChildPushdownResult, FilterDescription, FilterPushdownPhase,
+    FilterPushdownPropagation,
 };
 pub use crate::metrics::Metric;
 pub use crate::ordering::InputOrderMode;
@@ -33,7 +33,6 @@ pub use datafusion_physical_expr::window::WindowExpr;
 pub use datafusion_physical_expr::{
     expressions, Distribution, Partitioning, PhysicalExpr,
 };
-use itertools::Itertools;
 
 use std::any::Any;
 use std::fmt::Debug;
@@ -521,19 +520,10 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
         parent_filters: Vec<Arc<dyn PhysicalExpr>>,
         _config: &ConfigOptions,
     ) -> Result<FilterDescription> {
-        // Default implementation: mark all filters as unsupported for all children
-        let mut desc = FilterDescription::new();
-        let child_filters = parent_filters
-            .iter()
-            .map(|f| PushedDownPredicate::unsupported(Arc::clone(f)))
-            .collect_vec();
-        for _ in 0..self.children().len() {
-            desc = desc.with_child(ChildFilterDescription {
-                parent_filters: child_filters.clone(),
-                self_filters: vec![],
-            });
-        }
-        Ok(desc)
+        Ok(FilterDescription::all_unsupported(
+            &parent_filters,
+            &self.children(),
+        ))
     }
 
     /// Handle the result of a child pushdown.
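
The default `ExecutionPlan::gather_filters_for_pushdown` now just delegates to the `FilterDescription::all_unsupported` helper added in filter_pushdown.rs below. A small illustrative sketch of what that default produces follows; the `EmptyExec` children and the `lit(true)` filter are stand-ins, and the public module paths are assumed here rather than taken from this diff.

```rust
use std::sync::Arc;

use arrow_schema::Schema;
use datafusion_physical_expr::expressions::lit;
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_plan::empty::EmptyExec;
use datafusion_physical_plan::filter_pushdown::FilterDescription;
use datafusion_physical_plan::ExecutionPlan;

fn main() {
    // Two dummy children and one (trivially true) parent filter.
    let child: Arc<dyn ExecutionPlan> =
        Arc::new(EmptyExec::new(Arc::new(Schema::empty())));
    let children = vec![&child, &child];
    let parent_filters: Vec<Arc<dyn PhysicalExpr>> = vec![lit(true)];

    let desc = FilterDescription::all_unsupported(&parent_filters, &children);

    // One entry per child, each marking every parent filter as unsupported,
    // so by default nothing is pushed past an operator that does not opt in.
    assert_eq!(desc.parent_filters().len(), 2);
    assert_eq!(desc.parent_filters()[0].len(), 1);
}
```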

datafusion/physical-plan/src/filter_pushdown.rs

Lines changed: 34 additions & 1 deletion
@@ -40,8 +40,9 @@ use std::sync::Arc;
 use datafusion_common::Result;
 use datafusion_physical_expr::utils::{collect_columns, reassign_predicate_columns};
 use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
+use itertools::Itertools;
 
-#[derive(Debug, Clone, Copy)]
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
 pub enum FilterPushdownPhase {
     /// Pushdown that happens before most other optimizations.
     /// This pushdown allows static filters that do not reference any [`ExecutionPlan`]s to be pushed down.
@@ -257,6 +258,19 @@
         }
     }
 
+    /// Create a new [`FilterPushdownPropagation`] that tells the parent node that no filters were pushed down regardless of the child results.
+    pub fn all_unsupported(child_pushdown_result: ChildPushdownResult) -> Self {
+        let filters = child_pushdown_result
+            .parent_filters
+            .into_iter()
+            .map(|_| PushedDown::No)
+            .collect();
+        Self {
+            filters,
+            updated_node: None,
+        }
+    }
+
     /// Create a new [`FilterPushdownPropagation`] with the specified filter support.
     /// This transmits up to our parent node what the result of pushing down the filters into our node and possibly our subtree was.
     pub fn with_parent_pushdown_result(filters: Vec<PushedDown>) -> Self {
@@ -413,6 +427,25 @@
         Ok(desc)
     }
 
+    /// Mark all parent filters as unsupported for all children.
+    pub fn all_unsupported(
+        parent_filters: &[Arc<dyn PhysicalExpr>],
+        children: &[&Arc<dyn crate::ExecutionPlan>],
+    ) -> Self {
+        let mut desc = Self::new();
+        let child_filters = parent_filters
+            .iter()
+            .map(|f| PushedDownPredicate::unsupported(Arc::clone(f)))
+            .collect_vec();
+        for _ in 0..children.len() {
+            desc = desc.with_child(ChildFilterDescription {
+                parent_filters: child_filters.clone(),
+                self_filters: vec![],
+            });
+        }
+        desc
+    }
+
     pub fn parent_filters(&self) -> Vec<Vec<PushedDownPredicate>> {
         self.child_filter_descriptions
             .iter()
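
The matching helper on `FilterPushdownPropagation` gives operators a one-liner for the "nothing was pushed down" answer when handling results coming back from children. A hedged usage sketch follows; the `handle_child_pushdown_result` hook this body would live in is only referenced, not shown, in this diff, so the surrounding function is an assumption.

```rust
use std::sync::Arc;

use datafusion_common::Result;
use datafusion_physical_plan::filter_pushdown::{
    ChildPushdownResult, FilterPushdownPropagation,
};
use datafusion_physical_plan::ExecutionPlan;

/// Body an operator that cannot absorb any parent filters might use when
/// handling child pushdown results.
fn refuse_all_parent_filters(
    child_pushdown_result: ChildPushdownResult,
) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
    // Reports PushedDown::No for every parent filter and leaves the node
    // unchanged (updated_node: None), mirroring the implementation above.
    Ok(FilterPushdownPropagation::all_unsupported(
        child_pushdown_result,
    ))
}
```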
