Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions datafusion/core/tests/physical_optimizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,4 @@ mod projection_pushdown;
mod replace_with_order_preserving_variants;
mod sanity_checker;
mod test_utils;
mod window_optimize;
89 changes: 89 additions & 0 deletions datafusion/core/tests/physical_optimizer/window_optimize.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#[cfg(test)]
mod test {
use arrow::array::{Int32Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use datafusion_common::Result;
use datafusion_datasource::memory::MemorySourceConfig;
use datafusion_datasource::source::DataSourceExec;
use datafusion_execution::TaskContext;
use datafusion_expr::WindowFrame;
use datafusion_functions_aggregate::count::count_udaf;
use datafusion_physical_expr::aggregate::AggregateExprBuilder;
use datafusion_physical_expr::expressions::{col, Column};
use datafusion_physical_expr::window::PlainAggregateWindowExpr;
use datafusion_physical_plan::windows::BoundedWindowAggExec;
use datafusion_physical_plan::{common, ExecutionPlan, InputOrderMode};
use std::sync::Arc;

#[tokio::test]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also verified that these tests cover the code change. Without the change in this PR, they fail with:

attempt to subtract with overflow
thread 'physical_optimizer::window_optimize::test::test_window_constant_aggregate' panicked at datafusion/expr/src/window_state.rs:95:13:
attempt to subtract with overflow

async fn test_window_constant_aggregate() -> Result<()> {
let source = mock_data()?;
let schema = source.schema();
let c = Arc::new(Column::new("b", 1));
let cnt = AggregateExprBuilder::new(count_udaf(), vec![c])
.schema(schema.clone())
.alias("t")
.build()?;
let parition = [col("a", &schema)?];
let frame = WindowFrame::new(None);
let plain =
PlainAggregateWindowExpr::new(Arc::new(cnt), &parition, &[], Arc::new(frame));

let bounded_agg_exec = BoundedWindowAggExec::try_new(
vec![Arc::new(plain)],
source,
InputOrderMode::Linear,
true,
)?;
let task_ctx = Arc::new(TaskContext::default());
common::collect(bounded_agg_exec.execute(0, task_ctx)?).await?;

Ok(())
}

pub fn mock_data() -> Result<Arc<DataSourceExec>> {
let schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Int32, true),
Field::new("b", DataType::Int32, true),
]));

let batch = RecordBatch::try_new(
Arc::clone(&schema),
vec![
Arc::new(Int32Array::from(vec![
Some(1),
Some(1),
Some(3),
Some(2),
Some(1),
])),
Arc::new(Int32Array::from(vec![
Some(1),
Some(6),
Some(2),
Some(8),
Some(9),
])),
],
)?;

MemorySourceConfig::try_new_exec(&[vec![batch]], Arc::clone(&schema), None)
}
}
5 changes: 5 additions & 0 deletions datafusion/physical-expr/src/window/window_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,11 @@ pub trait AggregateWindowExpr: WindowExpr {
let values = self.evaluate_args(record_batch)?;

if self.is_constant_in_partition() {
if not_end {
let field = self.field()?;
let out_type = field.data_type();
return Ok(new_empty_array(out_type));
}
accumulator.update_batch(&values)?;
let value = accumulator.evaluate()?;
return value.to_array_of_size(record_batch.num_rows());
Expand Down
Loading