-
Notifications
You must be signed in to change notification settings - Fork 2
HashJoin Checkpoint Upstream Diff #49
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Signed-off-by: Jay Zhan <[email protected]>
berkaysynnada
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jayzhan-synnada I have marked each diff as remove from this PR or keep it in. You will understand our approach after looking at them. We both try to keep the codes same as much as possible, and to not spoil the upstream code.
After updating according to my suggestions, you can open the datafusion PR with the remaining parts in this PR.
|
|
||
| let test2 = BinaryTestCase { | ||
| source_types: (SourceType::Bounded, SourceType::Unbounded), | ||
| expect_fail: true, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Keep this change
| }; | ||
| let test2 = BinaryTestCase { | ||
| source_types: (SourceType::Bounded, SourceType::Unbounded), | ||
| expect_fail: true, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Keep this change
| probe_threads_counter: AtomicUsize, | ||
| /// Memory reservation that tracks memory used by `hash_map` hash table | ||
| /// `batch`. Cleared on drop. | ||
| _reservation: MemoryReservation, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removing reservation will be sent to the upstream, but upstream does not need batches_hash_values, it will be kept in Aras only
| batch: RecordBatch, | ||
| visited_indices_bitmap: SharedBitmapBuilder, | ||
| probe_threads_counter: AtomicUsize, | ||
| reservation: MemoryReservation, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Keep reservation removal, but remove batches_hash_values
| batch, | ||
| visited_indices_bitmap, | ||
| probe_threads_counter, | ||
| _reservation: reservation, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Keep reservation removal, but remove batches_hash_values
| Mutex::new(visited_indices_bitmap), | ||
| AtomicUsize::new(probe_threads_count), | ||
| reservation, | ||
| batches_hash_values, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Keep reservation removal, but remove batches_hash_values
| } | ||
|
|
||
| /// THIS STRUCT IS COMMON, MODIFIED BY ARAS | ||
| /// |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove this change
| /// THIS STRUCT IS COMMON, MODIFIED BY ARAS | ||
| /// | ||
| /// Container for HashJoinStreamState::ProcessProbeBatch related data | ||
| #[derive(Debug, Clone)] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can send this change to decrease diff
| batch, | ||
| offset: (0, None), | ||
| joined_probe_idx: None, | ||
| }); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove these changes
| fn columns(schema: &Schema) -> Vec<String> { | ||
| schema.fields().iter().map(|f| f.name().clone()).collect() | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove this change
Which issue does this PR close?
Closes #.
Rationale for this change
What changes are included in this PR?
Are these changes tested?
Are there any user-facing changes?