Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions datafusion/core/src/physical_plan/windows/window_agg_exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -329,13 +329,11 @@ impl WindowAggStream {
fn compute_aggregates(&self) -> Result<RecordBatch> {
// record compute time on drop
let _timer = self.baseline_metrics.elapsed_compute().timer();

if self.batches.is_empty() {
let batch = concat_batches(&self.input.schema(), &self.batches)?;
if batch.num_rows() == 0 {
return Ok(RecordBatch::new_empty(self.schema.clone()));
}

let batch = concat_batches(&self.input.schema(), &self.batches)?;

let partition_by_sort_keys = self
.partition_by_sort_keys
.iter()
Expand Down
30 changes: 30 additions & 0 deletions datafusion/core/tests/sql/window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2385,6 +2385,36 @@ async fn test_window_agg_sort_orderby_reversed_partitionby_reversed_plan() -> Re
Ok(())
}

#[tokio::test]
async fn test_window_agg_low_cardinality() -> Result<()> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

let config = SessionConfig::new()
.with_target_partitions(32);
let ctx = SessionContext::with_config(config);
register_aggregate_csv(&ctx).await?;
let sql = "SELECT
SUM(c4) OVER(PARTITION BY c4 ORDER BY c3 GROUPS BETWEEN 1 PRECEDING AND 3 FOLLOWING) as summation1,
SUM(c5) OVER(PARTITION BY c4 ORDER BY c4 GROUPS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) as summation2
FROM aggregate_test_100
ORDER BY c9
LIMIT 5";

let actual = execute_to_batches(&ctx, sql).await;
let expected = vec![
"+------------+-------------+",
"| summation1 | summation2 |",
"+------------+-------------+",
"| -16110 | 61035129 |",
"| 3917 | -108973366 |",
"| -16974 | 623103518 |",
"| -1114 | -1927628110 |",
"| 15673 | -1899175111 |",
"+------------+-------------+",
];
assert_batches_eq!(expected, &actual);

Ok(())
}

fn write_test_data_to_parquet(tmpdir: &TempDir, n_file: usize) -> Result<()> {
let ts_field = Field::new("ts", DataType::Int32, false);
let inc_field = Field::new("inc_col", DataType::Int32, false);
Expand Down