@@ -275,7 +275,7 @@ struct RecursiveQueryStream {
275275 reservation : MemoryReservation ,
276276 /// If the distinct flag is set, then we use this hash table to remove duplicates from result and work tables
277277 distinct_deduplicator : Option < DistinctDeduplicator > ,
278- // // / Metrics.
278+ /// Metrics.
279279 baseline_metrics : BaselineMetrics ,
280280}
281281
@@ -415,7 +415,6 @@ impl Stream for RecursiveQueryStream {
415415 mut self : std:: pin:: Pin < & mut Self > ,
416416 cx : & mut Context < ' _ > ,
417417 ) -> Poll < Option < Self :: Item > > {
418- // TODO: we should use this poll to record some metrics!
419418 if let Some ( static_stream) = & mut self . static_stream {
420419 // While the static term's stream is available, we'll be forwarding the batches from it (also
421420 // saving them for the initial iteration of the recursive term).
@@ -472,10 +471,13 @@ impl DistinctDeduplicator {
472471 } )
473472 }
474473
474+ /// Remove duplicated rows from the given batch, keeping a state between batches.
475+ ///
476+ /// We use a hash table to allocate new group ids for the new rows.
477+ /// [`GroupValues`] allocates increasing group ids.
478+ /// Hence, if groups (i.e., rows) are new, they have ids >= the length before interning, and we keep them.
479+ /// We also detect duplicates by enforcing that group ids are increasing.
475480 fn deduplicate ( & mut self , batch : & RecordBatch ) -> Result < RecordBatch > {
476- // We use the hash table to allocate new group ids.
477- // If they are new, i.e., if they have ids >= length before interning, we keep them.
478- // We also detect duplicates by enforcing that group ids are increasing.
479481 let size_before = self . group_values . len ( ) ;
480482 self . intern_output_buffer . reserve ( batch. num_rows ( ) ) ;
481483 self . group_values
@@ -488,7 +490,7 @@ impl DistinctDeduplicator {
488490 }
489491}
490492
491- /// Return a mask, each element true if the value is greater than all previous ones and greater or equal than the min_value
493+ /// Return a mask, each element being true if, and only if, the element is greater than all previous elements and greater than or equal to the provided `min_value`.
492494fn are_increasing_mask ( values : & [ usize ] , mut min_value : usize ) -> BooleanArray {
493495 let mut output = BooleanBuilder :: with_capacity ( values. len ( ) ) ;
494496 for value in values {
0 commit comments