diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 1e1c5d5424b08..f4b2ed6685290 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -325,11 +325,11 @@ config_namespace! { /// Aggregation ratio (number of distinct groups / number of input rows) /// threshold for skipping partial aggregation. If the value is greater /// then partial aggregation will skip aggregation for further input - pub skip_partial_aggregation_probe_ratio_threshold: f64, default = 0.8 + pub skip_partial_aggregation_probe_ratio_threshold: f64, default = 0.1 /// Number of input rows partial aggregation partition should process, before /// aggregation ratio check and trying to switch to skipping aggregation mode - pub skip_partial_aggregation_probe_rows_threshold: usize, default = 100_000 + pub skip_partial_aggregation_probe_rows_threshold: usize, default = 0 /// Should DataFusion use row number estimates at the input to decide /// whether increasing parallelism is beneficial or not. By default, diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs index 998f6184f3213..9517fa6aba68e 100644 --- a/datafusion/physical-plan/src/aggregates/row_hash.rs +++ b/datafusion/physical-plan/src/aggregates/row_hash.rs @@ -137,14 +137,6 @@ struct SkipAggregationProbe { /// Flag indicating further updates of `SkipAggregationProbe` state won't /// make any effect (set either while probing or on probing completion) is_locked: bool, - - /// Number of rows where state was output without aggregation. - /// - /// * If 0, all input rows were aggregated (should_skip was always false) - /// - /// * if greater than zero, the number of rows which were output directly - /// without aggregation - skipped_aggregation_rows: metrics::Count, } impl SkipAggregationProbe { @@ -160,7 +152,6 @@ impl SkipAggregationProbe { probe_ratio_threshold, should_skip: false, is_locked: false, - skipped_aggregation_rows, } } @@ -171,26 +162,17 @@ impl SkipAggregationProbe { /// aggregation ratio and sets `should_skip` flag /// - if `should_skip` is set, locks further state updates fn update_state(&mut self, input_rows: usize, num_groups: usize) { - if self.is_locked { - return; - } self.input_rows += input_rows; self.num_groups = num_groups; if self.input_rows >= self.probe_rows_threshold { self.should_skip = self.num_groups as f64 / self.input_rows as f64 >= self.probe_ratio_threshold; - self.is_locked = true; } } fn should_skip(&self) -> bool { self.should_skip } - - /// Record the number of rows that were output directly without aggregation - fn record_skipped(&mut self, batch: &RecordBatch) { - self.skipped_aggregation_rows.add(batch.num_rows()); - } } /// HashTable based Grouping Aggregator @@ -616,7 +598,7 @@ impl Stream for GroupedHashAggregateStream { // Do the grouping extract_ok!(self.group_aggregate_batch(batch)); - + self.update_skip_aggregation_probe(input_rows); // If we can begin emitting rows, do so, @@ -640,10 +622,10 @@ impl Stream for GroupedHashAggregateStream { break 'reading_input; } - extract_ok!(self.emit_early_if_necessary()); - extract_ok!(self.switch_to_skip_aggregation()); + extract_ok!(self.emit_early_if_necessary()); + timer.done(); } @@ -700,9 +682,9 @@ impl Stream for GroupedHashAggregateStream { match ready!(self.input.poll_next_unpin(cx)) { Some(Ok(batch)) => { let _timer = elapsed_compute.timer(); - if let Some(probe) = self.skip_aggregation_probe.as_mut() { - probe.record_skipped(&batch); - } + // if let Some(probe) = self.skip_aggregation_probe.as_mut() { + // probe.record_skipped(&batch); + // } let states = self.transform_to_states(batch)?; return Poll::Ready(Some(Ok( states.record_output(&self.baseline_metrics)