Skip to content

Commit 238a57e

Browse files
committed
Refactor dynamic filter side logic in join types and update related tests
1 parent d67f5db commit 238a57e

File tree

4 files changed

+35
-78
lines changed

4 files changed

+35
-78
lines changed

datafusion/common/src/join_type.rs

Lines changed: 0 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -145,39 +145,6 @@ impl JoinType {
145145
pub fn preserves_right(self) -> bool {
146146
self.preserves(JoinSide::Right)
147147
}
148-
149-
/// Returns the input side eligible for dynamic filter pushdown.
150-
///
151-
/// The side returned here can have a `DynamicFilterPhysicalExpr` pushed
152-
/// into it, allowing values read from the opposite input to prune rows
153-
/// before the join executes. When both inputs must be preserved,
154-
/// dynamic filter pushdown is not supported and [`JoinSide::None`] is
155-
/// returned.
156-
///
157-
/// If neither input is preserving (for example with [`JoinType::Inner`],
158-
/// [`JoinType::LeftSemi`], [`JoinType::RightSemi`],
159-
/// [`JoinType::LeftAnti`], or [`JoinType::RightAnti`]), either side could
160-
/// in principle receive the pushed filter. DataFusion selects the probe
161-
/// side: for [`JoinType::LeftSemi`] and [`JoinType::LeftAnti`] this is the
162-
/// left input, for [`JoinType::RightSemi`] and [`JoinType::RightAnti`] it
163-
/// is the right input, and for other joins the right input is used by
164-
/// default as joins typically treat the right as the probe side.
165-
pub fn dynamic_filter_side(self) -> JoinSide {
166-
use JoinSide::*;
167-
let preserves_left = self.preserves_left();
168-
let preserves_right = self.preserves_right();
169-
170-
match (preserves_left, preserves_right) {
171-
(true, true) => None,
172-
(true, false) => Right,
173-
(false, true) => Left,
174-
(false, false) => match self {
175-
JoinType::LeftSemi | JoinType::LeftAnti => Left,
176-
JoinType::RightSemi | JoinType::RightAnti => Right,
177-
_ => Right,
178-
},
179-
}
180-
}
181148
}
182149

183150
impl Display for JoinType {
@@ -314,42 +281,4 @@ mod tests {
314281
assert!(!JoinType::RightAnti.preserves(Left));
315282
assert!(!JoinType::RightAnti.preserves(Right));
316283
}
317-
318-
#[test]
319-
fn test_dynamic_filter_side() {
320-
use JoinSide::*;
321-
322-
assert_eq!(JoinType::Inner.dynamic_filter_side(), Right);
323-
assert_eq!(JoinType::Left.dynamic_filter_side(), Right);
324-
assert_eq!(JoinType::Right.dynamic_filter_side(), Left);
325-
assert_eq!(JoinType::Full.dynamic_filter_side(), None);
326-
assert_eq!(JoinType::LeftSemi.dynamic_filter_side(), Left);
327-
assert_eq!(JoinType::RightSemi.dynamic_filter_side(), Right);
328-
assert_eq!(JoinType::LeftAnti.dynamic_filter_side(), Left);
329-
assert_eq!(JoinType::RightAnti.dynamic_filter_side(), Right);
330-
assert_eq!(JoinType::LeftMark.dynamic_filter_side(), Right);
331-
assert_eq!(JoinType::RightMark.dynamic_filter_side(), Left);
332-
}
333-
334-
#[test]
335-
fn test_dynamic_filter_side_preservation_logic() {
336-
use JoinSide::*;
337-
338-
for jt in [JoinType::Left, JoinType::LeftMark] {
339-
assert!(jt.preserves_left());
340-
assert!(!jt.preserves_right());
341-
assert_eq!(jt.dynamic_filter_side(), Right);
342-
}
343-
344-
for jt in [JoinType::Right, JoinType::RightMark] {
345-
assert!(!jt.preserves_left());
346-
assert!(jt.preserves_right());
347-
assert_eq!(jt.dynamic_filter_side(), Left);
348-
}
349-
350-
let jt = JoinType::Full;
351-
assert!(jt.preserves_left());
352-
assert!(jt.preserves_right());
353-
assert_eq!(jt.dynamic_filter_side(), None);
354-
}
355284
}

datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1606,7 +1606,7 @@ fn test_hashjoin_handle_child_pushdown_result_dynamic_filter_left() {
16061606
)
16071607
.unwrap();
16081608

1609-
assert_eq!(join.join_type.dynamic_filter_side(), JoinSide::Left);
1609+
assert_eq!(join.dynamic_filter_side(), JoinSide::Left);
16101610

16111611
let keys: Vec<_> = on.iter().map(|(l, _)| l.clone()).collect();
16121612
let df_expr = Arc::new(DynamicFilterPhysicalExpr::new(keys, lit(true)));

datafusion/physical-plan/src/joins/hash_join/exec.rs

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,24 @@ use parking_lot::Mutex;
8686
const HASH_JOIN_SEED: RandomState =
8787
RandomState::with_seeds('J' as u64, 'O' as u64, 'I' as u64, 'N' as u64);
8888

89+
fn dynamic_filter_pushdown_side(join_type: JoinType) -> JoinSide {
90+
use JoinSide::*;
91+
92+
let preserves_left = join_type.preserves_left();
93+
let preserves_right = join_type.preserves_right();
94+
95+
match (preserves_left, preserves_right) {
96+
(true, true) => None,
97+
(true, false) => Right,
98+
(false, true) => Left,
99+
(false, false) => match join_type {
100+
JoinType::LeftSemi | JoinType::LeftAnti => Left,
101+
JoinType::RightSemi | JoinType::RightAnti => Right,
102+
_ => Right,
103+
},
104+
}
105+
}
106+
89107
/// HashTable and input data for the left (build side) of a join
90108
pub(super) struct JoinLeftData {
91109
/// The hash table with indices into `batch`
@@ -496,6 +514,11 @@ impl HashJoinExec {
496514
&self.right
497515
}
498516

517+
/// Preferred input side for installing a dynamic filter.
518+
pub fn dynamic_filter_side(&self) -> JoinSide {
519+
dynamic_filter_pushdown_side(self.join_type)
520+
}
521+
499522
/// Set of common columns used to join on
500523
pub fn on(&self) -> &[(PhysicalExprRef, PhysicalExprRef)] {
501524
&self.on
@@ -927,7 +950,7 @@ impl ExecutionPlan for HashJoinExec {
927950
}
928951

929952
let enable_dynamic_filter_pushdown = self.dynamic_filter.is_some();
930-
let df_side = self.join_type.dynamic_filter_side();
953+
let df_side = self.dynamic_filter_side();
931954

932955
let join_metrics = BuildProbeJoinMetrics::new(partition, &self.metrics);
933956
let left_fut = match self.mode {
@@ -1042,6 +1065,7 @@ impl ExecutionPlan for HashJoinExec {
10421065
on_right,
10431066
self.filter.clone(),
10441067
self.join_type,
1068+
df_side,
10451069
right_stream,
10461070
self.random_state.clone(),
10471071
join_metrics,
@@ -1131,7 +1155,7 @@ impl ExecutionPlan for HashJoinExec {
11311155
parent_filters: Vec<Arc<dyn PhysicalExpr>>,
11321156
config: &ConfigOptions,
11331157
) -> Result<FilterDescription> {
1134-
let df_side = self.join_type.dynamic_filter_side();
1158+
let df_side = self.dynamic_filter_side();
11351159
self.gather_filters_for_pushdown_with_side(phase, parent_filters, config, df_side)
11361160
}
11371161

@@ -1141,7 +1165,7 @@ impl ExecutionPlan for HashJoinExec {
11411165
child_pushdown_result: ChildPushdownResult,
11421166
config: &ConfigOptions,
11431167
) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
1144-
let df_side = self.join_type.dynamic_filter_side();
1168+
let df_side = self.dynamic_filter_side();
11451169
self.handle_child_pushdown_result_with_side(
11461170
phase,
11471171
child_pushdown_result,
@@ -1680,7 +1704,7 @@ mod tests {
16801704
#[case] join_type: JoinType,
16811705
#[case] expected_side: JoinSide,
16821706
) {
1683-
assert_eq!(join_type.dynamic_filter_side(), expected_side);
1707+
assert_eq!(dynamic_filter_pushdown_side(join_type), expected_side);
16841708
}
16851709

16861710
#[test]

datafusion/physical-plan/src/joins/hash_join/stream.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,8 @@ pub(super) struct HashJoinStream {
238238
filter: Option<JoinFilter>,
239239
/// type of the join (left, right, semi, etc)
240240
join_type: JoinType,
241+
/// Preferred input side for dynamic filter installation
242+
dynamic_filter_side: JoinSide,
241243
/// right (probe) input
242244
right: SendableRecordBatchStream,
243245
/// Random state used for hashing initialization
@@ -360,6 +362,7 @@ impl HashJoinStream {
360362
on_right: Vec<PhysicalExprRef>,
361363
filter: Option<JoinFilter>,
362364
join_type: JoinType,
365+
dynamic_filter_side: JoinSide,
363366
right: SendableRecordBatchStream,
364367
random_state: RandomState,
365368
join_metrics: BuildProbeJoinMetrics,
@@ -380,6 +383,7 @@ impl HashJoinStream {
380383
on_right,
381384
filter,
382385
join_type,
386+
dynamic_filter_side,
383387
right,
384388
random_state,
385389
join_metrics,
@@ -469,7 +473,7 @@ impl HashJoinStream {
469473
// Dynamic filter coordination between partitions:
470474
// Report bounds to the accumulator which will handle synchronization and filter updates
471475
if let Some(ref bounds_accumulator) = self.bounds_accumulator {
472-
if self.join_type.dynamic_filter_side() == JoinSide::Right {
476+
if self.dynamic_filter_side == JoinSide::Right {
473477
let bounds_accumulator = Arc::clone(bounds_accumulator);
474478

475479
let left_side_partition_id = match self.mode {
@@ -508,7 +512,7 @@ impl HashJoinStream {
508512
match ready!(self.right.poll_next_unpin(cx)) {
509513
None => {
510514
if let Some(ref bounds_accumulator) = self.bounds_accumulator {
511-
if self.join_type.dynamic_filter_side() == JoinSide::Left {
515+
if self.dynamic_filter_side == JoinSide::Left {
512516
if let Some(accs) = self.probe_bounds_accumulators.take() {
513517
let right_bounds = if self.probe_side_row_count > 0 {
514518
Some(

0 commit comments

Comments
 (0)