feat: Support recursive queries with a distinct 'UNION' #18254
New CSV file:

@@ -0,0 +1,6 @@
+start,end
+1,2
+2,3
+2,4
+2,4
+4,1
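This edge list is exactly the shape of input the distinct `UNION` is meant to handle: the duplicate `2,4` row and the `4,1` back-edge form a cycle, so a recursive query with `UNION ALL` would keep producing rows forever, while a distinct `UNION` reaches a fixpoint and stops. A minimal usage sketch, assuming the file is registered as an `edges` table; the table name, file path, and SQL text are illustrative, not taken from the PR's tests:

```rust
use datafusion::error::Result;
use datafusion::prelude::{CsvReadOptions, SessionContext};

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // Hypothetical path; the file only needs the `start,end` shape shown above.
    ctx.register_csv("edges", "edges.csv", CsvReadOptions::new())
        .await?;

    // Nodes reachable from node 1. The distinct UNION drops rows that were
    // already produced, so the work table eventually empties and the
    // recursion terminates despite the 4 -> 1 cycle.
    let df = ctx
        .sql(
            "WITH RECURSIVE reachable AS (
                 SELECT \"end\" AS node FROM edges WHERE \"start\" = 1
                 UNION
                 SELECT e.\"end\" FROM edges e JOIN reachable r ON e.\"start\" = r.node
             )
             SELECT node FROM reachable",
        )
        .await?;
    df.show().await?;
    Ok(())
}
```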
Second changed file (the recursive query physical operator):

@@ -21,14 +21,18 @@ use std::any::Any;
 use std::sync::Arc;
 use std::task::{Context, Poll};

-use super::work_table::{ReservedBatches, WorkTable, WorkTableExec};
+use crate::aggregates::group_values::{new_group_values, GroupValues};
+use crate::aggregates::order::GroupOrdering;
 use crate::execution_plan::{Boundedness, EmissionType};
+use crate::metrics::RecordOutput;
+use crate::work_table::{ReservedBatches, WorkTable, WorkTableExec};
 use crate::{
     metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet},
     PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics,
 };
 use crate::{DisplayAs, DisplayFormatType, ExecutionPlan};

+use arrow::array::{BooleanArray, BooleanBuilder};
+use arrow::compute::filter_record_batch;
 use arrow::datatypes::SchemaRef;
 use arrow::record_batch::RecordBatch;
 use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
@@ -195,8 +199,9 @@ impl ExecutionPlan for RecursiveQueryExec {
             Arc::clone(&self.work_table),
             Arc::clone(&self.recursive_term),
             static_stream,
+            self.is_distinct,
             baseline_metrics,
-        )))
+        )?))
     }

     fn metrics(&self) -> Option<MetricsSet> {
@@ -268,8 +273,10 @@ struct RecursiveQueryStream {
     buffer: Vec<RecordBatch>,
     /// Tracks the memory used by the buffer
     reservation: MemoryReservation,
+    /// If the distinct flag is set, we use this hash table to remove duplicates from the result and work tables
+    distinct_deduplicator: Option<DistinctDeduplicator>,
     // /// Metrics.
-    _baseline_metrics: BaselineMetrics,
+    baseline_metrics: BaselineMetrics,
 }

 impl RecursiveQueryStream {
@@ -279,12 +286,16 @@ impl RecursiveQueryStream {
         work_table: Arc<WorkTable>,
         recursive_term: Arc<dyn ExecutionPlan>,
         static_stream: SendableRecordBatchStream,
+        is_distinct: bool,
         baseline_metrics: BaselineMetrics,
-    ) -> Self {
+    ) -> Result<Self> {
         let schema = static_stream.schema();
         let reservation =
             MemoryConsumer::new("RecursiveQuery").register(task_context.memory_pool());
-        Self {
+        let distinct_deduplicator = is_distinct
+            .then(|| DistinctDeduplicator::new(Arc::clone(&schema), &task_context))
+            .transpose()?;
+        Ok(Self {
             task_context,
             work_table,
             recursive_term,
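The `is_distinct.then(..).transpose()?` line above is the usual way to build an optional, fallibly constructed helper: `bool::then` yields an `Option<Result<_>>`, and `transpose` flips it into a `Result<Option<_>>` so `?` can propagate the error. A minimal self-contained sketch of the same idiom (the function and values here are made up for illustration):

```rust
// Hypothetical stand-in for `DistinctDeduplicator::new`: any fallible constructor works.
fn fallible_helper() -> Result<String, std::num::ParseIntError> {
    "42".parse::<i32>().map(|n| format!("helper-{n}"))
}

fn build(flag: bool) -> Result<Option<String>, std::num::ParseIntError> {
    let helper = flag
        .then(fallible_helper) // Option<Result<String, _>>: Some(..) only when the flag is set
        .transpose()?;         // Result<Option<String>, _>, then `?` propagates any error
    Ok(helper)
}

fn main() {
    assert_eq!(build(true).unwrap().as_deref(), Some("helper-42"));
    assert_eq!(build(false).unwrap(), None);
}
```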
@@ -293,21 +304,28 @@ impl RecursiveQueryStream {
             schema,
             buffer: vec![],
             reservation,
-            _baseline_metrics: baseline_metrics,
-        }
+            distinct_deduplicator,
+            baseline_metrics,
+        })
     }

     /// Push a clone of the given batch to the in memory buffer, and then return
     /// a poll with it.
     fn push_batch(
         mut self: std::pin::Pin<&mut Self>,
-        batch: RecordBatch,
+        mut batch: RecordBatch,
     ) -> Poll<Option<Result<RecordBatch>>> {
+        let baseline_metrics = self.baseline_metrics.clone();
+        if let Some(deduplicator) = &mut self.distinct_deduplicator {
+            let _timer_guard = baseline_metrics.elapsed_compute().timer();
+            batch = deduplicator.deduplicate(&batch)?;
+        }
         if let Err(e) = self.reservation.try_grow(batch.get_array_memory_size()) {
             return Poll::Ready(Some(Err(e)));
         }

         self.buffer.push(batch.clone());
+        (&batch).record_output(&baseline_metrics);
         Poll::Ready(Some(Ok(batch)))
     }
@@ -434,5 +452,55 @@ impl RecordBatchStream for RecursiveQueryStream {
     }
 }

+/// Deduplicator based on a hash table.
+struct DistinctDeduplicator {
+    /// Grouped rows used for distinct
+    group_values: Box<dyn GroupValues>,
+    reservation: MemoryReservation,
+    intern_output_buffer: Vec<usize>,
+}
+
+impl DistinctDeduplicator {
+    fn new(schema: SchemaRef, task_context: &TaskContext) -> Result<Self> {
+        let group_values = new_group_values(schema, &GroupOrdering::None)?;
+        let reservation = MemoryConsumer::new("RecursiveQueryHashTable")
+            .register(task_context.memory_pool());
+        Ok(Self {
+            group_values,
+            reservation,
+            intern_output_buffer: Vec::new(),
+        })
+    }
+
+    fn deduplicate(&mut self, batch: &RecordBatch) -> Result<RecordBatch> {
+        // We use the hash table to allocate new group ids.
+        // Rows are new if they get ids >= the table length before interning, and we keep them.
+        // We also detect duplicates within the batch by enforcing that kept group ids are increasing.
+        let size_before = self.group_values.len();
+        self.intern_output_buffer.reserve(batch.num_rows());
+        self.group_values
+            .intern(batch.columns(), &mut self.intern_output_buffer)?;
+        let mask = are_increasing_mask(&self.intern_output_buffer, size_before);
+        self.intern_output_buffer.clear();
+        // We update the reservation to reflect the new size of the hash table.
+        self.reservation.try_resize(self.group_values.size())?;
+        Ok(filter_record_batch(batch, &mask)?)
+    }
+}
+
+/// Return a mask where each element is true if the value is greater than all previous values and greater than or equal to `min_value`.
+fn are_increasing_mask(values: &[usize], mut min_value: usize) -> BooleanArray {
+    let mut output = BooleanBuilder::with_capacity(values.len());
+    for value in values {
+        if *value >= min_value {
+            output.append_value(true);
+            min_value = *value + 1; // We want the kept ids to be increasing
+        } else {
+            output.append_value(false);
+        }
+    }
+    output.finish()
+}
+
 #[cfg(test)]
 mod tests {}
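For reference, the deduplication rule can be checked by hand. Below is a plain-`Vec` mirror of `are_increasing_mask` (the real code builds an arrow `BooleanArray`), walking one batch under the assumption that the hash table already holds three distinct rows, so `size_before` is 3 and newly interned rows get ids starting at 3:

```rust
/// Plain-Vec sketch of the `are_increasing_mask` logic above, for hand-checking only.
fn increasing_mask(group_ids: &[usize], mut min_value: usize) -> Vec<bool> {
    group_ids
        .iter()
        .map(|&id| {
            if id >= min_value {
                min_value = id + 1; // kept ids must be strictly increasing
                true
            } else {
                false
            }
        })
        .collect()
}

fn main() {
    // The hash table held 3 distinct rows (ids 0, 1, 2) before this batch,
    // so interning a 4-row batch yields one group id per row:
    //   id 1 -> row already seen in an earlier batch        -> dropped (id < 3)
    //   id 3 -> first occurrence of a new row               -> kept
    //   id 3 -> the same new row repeated inside this batch -> dropped (not increasing)
    //   id 4 -> another new row                             -> kept
    assert_eq!(
        increasing_mask(&[1, 3, 3, 4], 3),
        vec![false, true, false, true]
    );
}
```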