diff --git a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs index c1f7087269b68..a667f0a3c2168 100644 --- a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs +++ b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs @@ -329,13 +329,11 @@ impl WindowAggStream { fn compute_aggregates(&self) -> Result { // record compute time on drop let _timer = self.baseline_metrics.elapsed_compute().timer(); - - if self.batches.is_empty() { + let batch = concat_batches(&self.input.schema(), &self.batches)?; + if batch.num_rows() == 0 { return Ok(RecordBatch::new_empty(self.schema.clone())); } - let batch = concat_batches(&self.input.schema(), &self.batches)?; - let partition_by_sort_keys = self .partition_by_sort_keys .iter() diff --git a/datafusion/core/tests/sql/window.rs b/datafusion/core/tests/sql/window.rs index 08d6f1b32928b..da682abb85099 100644 --- a/datafusion/core/tests/sql/window.rs +++ b/datafusion/core/tests/sql/window.rs @@ -2385,6 +2385,35 @@ async fn test_window_agg_sort_orderby_reversed_partitionby_reversed_plan() -> Re Ok(()) } +#[tokio::test] +async fn test_window_agg_low_cardinality() -> Result<()> { + let config = SessionConfig::new().with_target_partitions(32); + let ctx = SessionContext::with_config(config); + register_aggregate_csv(&ctx).await?; + let sql = "SELECT + SUM(c4) OVER(PARTITION BY c4 ORDER BY c3 GROUPS BETWEEN 1 PRECEDING AND 3 FOLLOWING) as summation1, + SUM(c5) OVER(PARTITION BY c4 ORDER BY c4 GROUPS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) as summation2 + FROM aggregate_test_100 + ORDER BY c9 + LIMIT 5"; + + let actual = execute_to_batches(&ctx, sql).await; + let expected = vec![ + "+------------+-------------+", + "| summation1 | summation2 |", + "+------------+-------------+", + "| -16110 | 61035129 |", + "| 3917 | -108973366 |", + "| -16974 | 623103518 |", + "| -1114 | -1927628110 |", + "| 15673 | -1899175111 |", + "+------------+-------------+", + ]; + assert_batches_eq!(expected, &actual); + + Ok(()) +} + fn write_test_data_to_parquet(tmpdir: &TempDir, n_file: usize) -> Result<()> { let ts_field = Field::new("ts", DataType::Int32, false); let inc_field = Field::new("inc_col", DataType::Int32, false);