Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion datafusion/physical-plan/src/joins/sort_merge_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1209,7 +1209,15 @@ impl SMJStream {
) {
// The reverse of the selection mask. For the rows not pass join filter above,
// we need to join them (left or right) with null rows for outer joins.
let not_mask = compute::not(mask)?;
let not_mask = if mask.null_count() > 0 {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I ran the added test by #9080 again in a new laptop and found this bug. I'm not sure why previously the test passed locally and in CI in #9080. 🤔

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I re-checked the results in sort_merge_join.slt and it should be correct (as it is same as join.slt which is produced by hash join operator).

// If the mask contains nulls, we need to use `prep_null_mask_filter` to
// handle the nulls in the mask as false to produce rows where the mask
// was null itself.
compute::not(&compute::prep_null_mask_filter(mask))?
} else {
compute::not(mask)?
};

let null_joined_batch =
compute::filter_record_batch(&output_batch, &not_mask)?;

Expand Down Expand Up @@ -1254,6 +1262,19 @@ impl SMJStream {

// For full join, we also need to output the null joined rows from the buffered side
if matches!(self.join_type, JoinType::Full) {
// Handle not mask for buffered side further.
// For buffered side, we want to output the rows that are not null joined with
// the streamed side. i.e. the rows that are not null in the `buffered_indices`.
let not_mask = if let Some(nulls) = buffered_indices.nulls() {
let mask = not_mask.values() & nulls.inner();
BooleanArray::new(mask, None)
} else {
not_mask
};

let null_joined_batch =
compute::filter_record_batch(&output_batch, &not_mask)?;

let mut streamed_columns = self
.streamed_schema
.fields()
Expand Down