Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions datafusion/common/src/join_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ impl JoinType {
| JoinType::RightSemi
| JoinType::LeftAnti
| JoinType::RightAnti
| JoinType::LeftMark
| JoinType::RightMark
)
}
}
Expand Down
5 changes: 2 additions & 3 deletions datafusion/core/tests/fuzz_cases/join_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,6 @@ async fn test_left_mark_join_1k_filtered() {
.await
}

// todo: add JoinTestType::HjSmj after Right mark SortMergeJoin support
#[tokio::test]
async fn test_right_mark_join_1k() {
JoinFuzzTestCase::new(
Expand All @@ -314,7 +313,7 @@ async fn test_right_mark_join_1k() {
JoinType::RightMark,
None,
)
.run_test(&[NljHj], false)
.run_test(&[HjSmj, NljHj], false)
.await
}

Expand All @@ -326,7 +325,7 @@ async fn test_right_mark_join_1k_filtered() {
JoinType::RightMark,
Some(Box::new(col_lt_col_filter)),
)
.run_test(&[NljHj], false)
.run_test(&[HjSmj, NljHj], false)
.await
}

Expand Down
59 changes: 58 additions & 1 deletion datafusion/core/tests/physical_optimizer/join_selection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,61 @@ async fn test_join_with_swap_semi() {
}
}

#[tokio::test]
async fn test_join_with_swap_mark() {
let join_types = [JoinType::LeftMark];
for join_type in join_types {
let (big, small) = create_big_and_small();

let join = HashJoinExec::try_new(
Arc::clone(&big),
Arc::clone(&small),
vec![(
Arc::new(Column::new_with_schema("big_col", &big.schema()).unwrap()),
Arc::new(Column::new_with_schema("small_col", &small.schema()).unwrap()),
)],
None,
&join_type,
None,
PartitionMode::Partitioned,
NullEquality::NullEqualsNothing,
)
.unwrap();

let original_schema = join.schema();

let optimized_join = JoinSelection::new()
.optimize(Arc::new(join), &ConfigOptions::new())
.unwrap();

let swapped_join = optimized_join
.as_any()
.downcast_ref::<HashJoinExec>()
.expect(
"A proj is not required to swap columns back to their original order",
);

assert_eq!(swapped_join.schema().fields().len(), 2);
assert_eq!(
swapped_join
.left()
.partition_statistics(None)
.unwrap()
.total_byte_size,
Precision::Inexact(8192)
);
assert_eq!(
swapped_join
.right()
.partition_statistics(None)
.unwrap()
.total_byte_size,
Precision::Inexact(2097152)
);
assert_eq!(original_schema, swapped_join.schema());
}
}

/// Compare the input plan with the plan after running the probe order optimizer.
macro_rules! assert_optimized {
($EXPECTED_LINES: expr, $PLAN: expr) => {
Expand Down Expand Up @@ -577,8 +632,10 @@ async fn test_nl_join_with_swap(join_type: JoinType) {
join_type,
case::left_semi(JoinType::LeftSemi),
case::left_anti(JoinType::LeftAnti),
case::left_mark(JoinType::LeftMark),
case::right_semi(JoinType::RightSemi),
case::right_anti(JoinType::RightAnti)
case::right_anti(JoinType::RightAnti),
case::right_mark(JoinType::RightMark)
)]
#[tokio::test]
async fn test_nl_join_with_swap_no_proj(join_type: JoinType) {
Expand Down
5 changes: 4 additions & 1 deletion datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1652,7 +1652,10 @@ pub fn build_join_schema(
);

let (schema1, schema2) = match join_type {
JoinType::Right | JoinType::RightSemi | JoinType::RightAnti => (left, right),
JoinType::Right
| JoinType::RightSemi
| JoinType::RightAnti
| JoinType::RightMark => (left, right),
_ => (right, left),
};

Expand Down
15 changes: 10 additions & 5 deletions datafusion/physical-optimizer/src/join_selection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,7 @@ pub fn hash_join_swap_subrule(
| JoinType::Left
| JoinType::LeftSemi
| JoinType::LeftAnti
| JoinType::LeftMark
)
{
input = swap_join_according_to_unboundedness(hash_join)?;
Expand All @@ -549,10 +550,10 @@ pub fn hash_join_swap_subrule(

/// This function swaps sides of a hash join to make it runnable even if one of
/// its inputs are infinite. Note that this is not always possible; i.e.
/// [`JoinType::Full`], [`JoinType::Right`], [`JoinType::RightAnti`] and
/// [`JoinType::RightSemi`] can not run with an unbounded left side, even if
/// we swap join sides. Therefore, we do not consider them here.
/// This function is crate public as it is useful for downstream projects
/// [`JoinType::Full`], [`JoinType::Right`], [`JoinType::RightAnti`],
/// [`JoinType::RightSemi`], and [`JoinType::RightMark`] can not run with an
/// unbounded left side, even if we swap join sides. Therefore, we do not consider
/// them here. This function is crate public as it is useful for downstream projects
/// to implement, or experiment with, their own join selection rules.
pub(crate) fn swap_join_according_to_unboundedness(
hash_join: &HashJoinExec,
Expand All @@ -562,7 +563,11 @@ pub(crate) fn swap_join_according_to_unboundedness(
match (*partition_mode, *join_type) {
(
_,
JoinType::Right | JoinType::RightSemi | JoinType::RightAnti | JoinType::Full,
JoinType::Right
| JoinType::RightSemi
| JoinType::RightAnti
| JoinType::RightMark
| JoinType::Full,
) => internal_err!("{join_type} join cannot be swapped for unbounded input."),
(PartitionMode::Partitioned, _) => {
hash_join.swap_inputs(PartitionMode::Partitioned)
Expand Down
5 changes: 4 additions & 1 deletion datafusion/physical-plan/src/joins/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -618,13 +618,16 @@ impl HashJoinExec {
partition_mode,
self.null_equality(),
)?;
// In case of anti / semi joins or if there is embedded projection in HashJoinExec, output column order is preserved, no need to add projection again

// In case of Anti/Semi/Mark joins or if there is embedded projection in HashJoinExec, output column order is preserved, no need to add projection again
if matches!(
self.join_type(),
JoinType::LeftSemi
| JoinType::RightSemi
| JoinType::LeftAnti
| JoinType::RightAnti
| JoinType::LeftMark
| JoinType::RightMark
) || self.projection.is_some()
{
Ok(Arc::new(new_join))
Expand Down
4 changes: 3 additions & 1 deletion datafusion/physical-plan/src/joins/nested_loop_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -371,14 +371,16 @@ impl NestedLoopJoinExec {
),
)?;

// For Semi/Anti joins, swap result will produce same output schema,
// For Semi/Anti/Mark joins, swap result will produce same output schema,
// no need to wrap them into additional projection
let plan: Arc<dyn ExecutionPlan> = if matches!(
self.join_type(),
JoinType::LeftSemi
| JoinType::RightSemi
| JoinType::LeftAnti
| JoinType::RightAnti
| JoinType::LeftMark
| JoinType::RightMark
) || self.projection.is_some()
{
Arc::new(new_join)
Expand Down
Loading