From 66e0e783b95d08c1b2185887750519a4597350c2 Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Mon, 12 Jun 2023 18:01:30 +0300 Subject: [PATCH 1/7] minor changes --- datafusion-examples/examples/simple_udwf.rs | 19 ++++++++---- datafusion/core/src/physical_plan/udaf.rs | 14 ++++++++- datafusion/expr/src/accumulator.rs | 6 ++++ datafusion/expr/src/partition_evaluator.rs | 17 +++++++++++ .../physical-expr/src/aggregate/average.rs | 10 +++++-- .../physical-expr/src/aggregate/count.rs | 10 +++++-- .../physical-expr/src/aggregate/min_max.rs | 20 +++++++++---- datafusion/physical-expr/src/aggregate/mod.rs | 10 +++---- datafusion/physical-expr/src/aggregate/sum.rs | 10 +++++-- .../physical-expr/src/window/aggregate.rs | 5 ++-- .../physical-expr/src/window/built_in.rs | 10 ++++--- .../window/built_in_window_function_expr.rs | 30 +++++++++---------- .../physical-expr/src/window/lead_lag.rs | 10 +++++-- .../physical-expr/src/window/nth_value.rs | 20 +++++++++---- datafusion/physical-expr/src/window/rank.rs | 10 +++++-- .../physical-expr/src/window/row_number.rs | 10 +++++-- .../src/window/sliding_aggregate.rs | 5 ++-- 17 files changed, 151 insertions(+), 65 deletions(-) diff --git a/datafusion-examples/examples/simple_udwf.rs b/datafusion-examples/examples/simple_udwf.rs index 8b6d66b5de2a9..a61481b761a08 100644 --- a/datafusion-examples/examples/simple_udwf.rs +++ b/datafusion-examples/examples/simple_udwf.rs @@ -21,12 +21,13 @@ use arrow::{ array::{AsArray, Float64Array}, datatypes::Float64Type, }; +use arrow::array::Array; use arrow_schema::DataType; use datafusion::datasource::file_format::options::CsvReadOptions; use datafusion::error::Result; use datafusion::prelude::*; -use datafusion_common::DataFusionError; +use datafusion_common::{DataFusionError, ScalarValue}; use datafusion_expr::{ partition_evaluator::PartitionEvaluator, Signature, Volatility, WindowUDF, }; @@ -198,12 +199,18 @@ impl PartitionEvaluator for MyPartitionEvaluator { fn evaluate_inside_range( &self, - _values: &[arrow::array::ArrayRef], - _range: &std::ops::Range, + values: &[arrow::array::ArrayRef], + range: &std::ops::Range, ) -> Result { - Err(DataFusionError::NotImplemented( - "evaluate_inside_range is not implemented by default".into(), - )) + let first = ScalarValue::try_from_array(&values[0], range.start)?; + // Err(DataFusionError::NotImplemented( + // "evaluate_inside_range is not implemented by default".into(), + // )) + Ok(first) + } + + fn uses_window_frame(&self) -> bool { + false } } diff --git a/datafusion/core/src/physical_plan/udaf.rs b/datafusion/core/src/physical_plan/udaf.rs index d9f52eba77d0c..7397bf190c2df 100644 --- a/datafusion/core/src/physical_plan/udaf.rs +++ b/datafusion/core/src/physical_plan/udaf.rs @@ -28,7 +28,7 @@ use arrow::{ use super::{expressions::format_state_name, Accumulator, AggregateExpr}; use crate::physical_plan::PhysicalExpr; -use datafusion_common::Result; +use datafusion_common::{DataFusionError, Result}; pub use datafusion_expr::AggregateUDF; use datafusion_physical_expr::aggregate::utils::down_cast_any_ref; @@ -106,6 +106,18 @@ impl AggregateExpr for AggregateFunctionExpr { (self.fun.accumulator)(&self.data_type) } + fn create_sliding_accumulator(&self) -> Result> { + let acc = (self.fun.accumulator)(&self.data_type)?; + if acc.supports_bounded_execution() { + Ok(acc) + } else { + Err(DataFusionError::Execution(format!( + "Accumulator: {:?} doesn't support bounded execution", + self.fun.name + ))) + } + } + fn name(&self) -> &str { &self.name } diff --git a/datafusion/expr/src/accumulator.rs b/datafusion/expr/src/accumulator.rs index 7e941d0cff97f..8b2b64caa0079 100644 --- a/datafusion/expr/src/accumulator.rs +++ b/datafusion/expr/src/accumulator.rs @@ -81,6 +81,12 @@ pub trait Accumulator: Send + Sync + Debug { /// Returns the final aggregate value based on its current state. fn evaluate(&self) -> Result; + /// Specifies whether this aggregate function can run using bounded memory. + /// Any accumulator returning "true" needs to implement `retract_batch`. + fn supports_bounded_execution(&self) -> bool { + false + } + /// Allocated size required for this accumulator, in bytes, including `Self`. /// Allocated means that for internal containers such as `Vec`, the `capacity` should be used /// not the `len` diff --git a/datafusion/expr/src/partition_evaluator.rs b/datafusion/expr/src/partition_evaluator.rs index 87261db4addbf..9ca37db57828b 100644 --- a/datafusion/expr/src/partition_evaluator.rs +++ b/datafusion/expr/src/partition_evaluator.rs @@ -224,4 +224,21 @@ pub trait PartitionEvaluator: Debug + Send { "evaluate_inside_range is not implemented by default".into(), )) } + + /// Does the window function use the values from its window frame? + /// + /// If this function returns true, [`Self::create_evaluator`] must + /// implement [`PartitionEvaluator::evaluate_inside_range`] + fn uses_window_frame(&self) -> bool { + false + } + + /// Can the window function be incrementally computed using + /// bounded memory? + /// + /// If this function returns true, [`Self::create_evaluator`] must + /// implement [`PartitionEvaluator::evaluate_stateful`] + fn supports_bounded_execution(&self) -> bool { + false + } } diff --git a/datafusion/physical-expr/src/aggregate/average.rs b/datafusion/physical-expr/src/aggregate/average.rs index 2fe44602d831a..2cf75292ad32c 100644 --- a/datafusion/physical-expr/src/aggregate/average.rs +++ b/datafusion/physical-expr/src/aggregate/average.rs @@ -134,9 +134,9 @@ impl AggregateExpr for Avg { is_row_accumulator_support_dtype(&self.sum_data_type) } - fn supports_bounded_execution(&self) -> bool { - true - } + // fn supports_bounded_execution(&self) -> bool { + // true + // } fn create_row_accumulator( &self, @@ -257,6 +257,10 @@ impl Accumulator for AvgAccumulator { } } + fn supports_bounded_execution(&self) -> bool { + true + } + fn size(&self) -> usize { std::mem::size_of_val(self) - std::mem::size_of_val(&self.sum) + self.sum.size() } diff --git a/datafusion/physical-expr/src/aggregate/count.rs b/datafusion/physical-expr/src/aggregate/count.rs index 15df28b4e38ad..9febd99f75e6e 100644 --- a/datafusion/physical-expr/src/aggregate/count.rs +++ b/datafusion/physical-expr/src/aggregate/count.rs @@ -133,9 +133,9 @@ impl AggregateExpr for Count { true } - fn supports_bounded_execution(&self) -> bool { - true - } + // fn supports_bounded_execution(&self) -> bool { + // true + // } fn create_row_accumulator( &self, @@ -214,6 +214,10 @@ impl Accumulator for CountAccumulator { Ok(ScalarValue::Int64(Some(self.count))) } + fn supports_bounded_execution(&self) -> bool { + true + } + fn size(&self) -> usize { std::mem::size_of_val(self) } diff --git a/datafusion/physical-expr/src/aggregate/min_max.rs b/datafusion/physical-expr/src/aggregate/min_max.rs index f811dae7b5609..6d16f0efdc20d 100644 --- a/datafusion/physical-expr/src/aggregate/min_max.rs +++ b/datafusion/physical-expr/src/aggregate/min_max.rs @@ -125,9 +125,9 @@ impl AggregateExpr for Max { is_row_accumulator_support_dtype(&self.data_type) } - fn supports_bounded_execution(&self) -> bool { - true - } + // fn supports_bounded_execution(&self) -> bool { + // true + // } fn create_row_accumulator( &self, @@ -699,6 +699,10 @@ impl Accumulator for SlidingMaxAccumulator { Ok(self.max.clone()) } + fn supports_bounded_execution(&self) -> bool { + true + } + fn size(&self) -> usize { std::mem::size_of_val(self) - std::mem::size_of_val(&self.max) + self.max.size() } @@ -825,9 +829,9 @@ impl AggregateExpr for Min { is_row_accumulator_support_dtype(&self.data_type) } - fn supports_bounded_execution(&self) -> bool { - true - } + // fn supports_bounded_execution(&self) -> bool { + // true + // } fn create_row_accumulator( &self, @@ -958,6 +962,10 @@ impl Accumulator for SlidingMinAccumulator { Ok(self.min.clone()) } + fn supports_bounded_execution(&self) -> bool { + true + } + fn size(&self) -> usize { std::mem::size_of_val(self) - std::mem::size_of_val(&self.min) + self.min.size() } diff --git a/datafusion/physical-expr/src/aggregate/mod.rs b/datafusion/physical-expr/src/aggregate/mod.rs index 09fd9bcfc524a..1bb04198777e9 100644 --- a/datafusion/physical-expr/src/aggregate/mod.rs +++ b/datafusion/physical-expr/src/aggregate/mod.rs @@ -96,11 +96,11 @@ pub trait AggregateExpr: Send + Sync + Debug + PartialEq { false } - /// Specifies whether this aggregate function can run using bounded memory. - /// Any accumulator returning "true" needs to implement `retract_batch`. - fn supports_bounded_execution(&self) -> bool { - false - } + // /// Specifies whether this aggregate function can run using bounded memory. + // /// Any accumulator returning "true" needs to implement `retract_batch`. + // fn supports_bounded_execution(&self) -> bool { + // false + // } /// RowAccumulator to access/update row-based aggregation state in-place. /// Currently, row accumulator only supports states of fixed-sized type. diff --git a/datafusion/physical-expr/src/aggregate/sum.rs b/datafusion/physical-expr/src/aggregate/sum.rs index 1c70dc67beeb8..bbfb3719f0f68 100644 --- a/datafusion/physical-expr/src/aggregate/sum.rs +++ b/datafusion/physical-expr/src/aggregate/sum.rs @@ -131,9 +131,9 @@ impl AggregateExpr for Sum { is_row_accumulator_support_dtype(&self.data_type) } - fn supports_bounded_execution(&self) -> bool { - true - } + // fn supports_bounded_execution(&self) -> bool { + // true + // } fn create_row_accumulator( &self, @@ -361,6 +361,10 @@ impl Accumulator for SumAccumulator { } } + fn supports_bounded_execution(&self) -> bool { + true + } + fn size(&self) -> usize { std::mem::size_of_val(self) - std::mem::size_of_val(&self.sum) + self.sum.size() } diff --git a/datafusion/physical-expr/src/window/aggregate.rs b/datafusion/physical-expr/src/window/aggregate.rs index c8a4797a52880..f12063fa3b4bd 100644 --- a/datafusion/physical-expr/src/window/aggregate.rs +++ b/datafusion/physical-expr/src/window/aggregate.rs @@ -155,8 +155,9 @@ impl WindowExpr for PlainAggregateWindowExpr { } fn uses_bounded_memory(&self) -> bool { - self.aggregate.supports_bounded_execution() - && !self.window_frame.end_bound.is_unbounded() + let supports_bounded_execution = + self.aggregate.create_sliding_accumulator().is_ok(); + supports_bounded_execution && !self.window_frame.end_bound.is_unbounded() } } diff --git a/datafusion/physical-expr/src/window/built_in.rs b/datafusion/physical-expr/src/window/built_in.rs index 4e5665f0a62ef..a535697f6e239 100644 --- a/datafusion/physical-expr/src/window/built_in.rs +++ b/datafusion/physical-expr/src/window/built_in.rs @@ -36,6 +36,7 @@ use arrow::datatypes::Field; use arrow::record_batch::RecordBatch; use datafusion_common::utils::evaluate_partition_ranges; use datafusion_common::{Result, ScalarValue}; +use datafusion_expr::partition_evaluator::PartitionEvaluator; use datafusion_expr::window_frame_state::WindowFrameContext; use datafusion_expr::WindowFrame; @@ -99,7 +100,7 @@ impl WindowExpr for BuiltInWindowExpr { fn evaluate(&self, batch: &RecordBatch) -> Result { let evaluator = self.expr.create_evaluator()?; let num_rows = batch.num_rows(); - if self.expr.uses_window_frame() { + if evaluator.uses_window_frame() { let sort_options: Vec = self.order_by.iter().map(|o| o.options).collect(); let mut row_wise_results = vec![]; @@ -174,7 +175,7 @@ impl WindowExpr for BuiltInWindowExpr { }; let mut row_wise_results: Vec = vec![]; for idx in state.last_calculated_index..num_rows { - let frame_range = if self.expr.uses_window_frame() { + let frame_range = if evaluator.uses_window_frame() { state .window_frame_ctx .get_or_insert_with(|| { @@ -248,8 +249,9 @@ impl WindowExpr for BuiltInWindowExpr { } fn uses_bounded_memory(&self) -> bool { - self.expr.supports_bounded_execution() - && (!self.expr.uses_window_frame() + let evaluator = self.expr.create_evaluator().unwrap(); + evaluator.supports_bounded_execution() + && (!evaluator.uses_window_frame() || !self.window_frame.end_bound.is_unbounded()) } } diff --git a/datafusion/physical-expr/src/window/built_in_window_function_expr.rs b/datafusion/physical-expr/src/window/built_in_window_function_expr.rs index 9f6694dc77d55..3f513ef1bdf65 100644 --- a/datafusion/physical-expr/src/window/built_in_window_function_expr.rs +++ b/datafusion/physical-expr/src/window/built_in_window_function_expr.rs @@ -77,20 +77,20 @@ pub trait BuiltInWindowFunctionExpr: Send + Sync + std::fmt::Debug { None } - /// Can the window function be incrementally computed using - /// bounded memory? - /// - /// If this function returns true, [`Self::create_evaluator`] must - /// implement [`PartitionEvaluator::evaluate_stateful`] - fn supports_bounded_execution(&self) -> bool { - false - } + // /// Can the window function be incrementally computed using + // /// bounded memory? + // /// + // /// If this function returns true, [`Self::create_evaluator`] must + // /// implement [`PartitionEvaluator::evaluate_stateful`] + // fn supports_bounded_execution(&self) -> bool { + // false + // } - /// Does the window function use the values from its window frame? - /// - /// If this function returns true, [`Self::create_evaluator`] must - /// implement [`PartitionEvaluator::evaluate_inside_range`] - fn uses_window_frame(&self) -> bool { - false - } + // /// Does the window function use the values from its window frame? + // /// + // /// If this function returns true, [`Self::create_evaluator`] must + // /// implement [`PartitionEvaluator::evaluate_inside_range`] + // fn uses_window_frame(&self) -> bool { + // false + // } } diff --git a/datafusion/physical-expr/src/window/lead_lag.rs b/datafusion/physical-expr/src/window/lead_lag.rs index 5414d6dff98c3..bdb7350af20f5 100644 --- a/datafusion/physical-expr/src/window/lead_lag.rs +++ b/datafusion/physical-expr/src/window/lead_lag.rs @@ -110,9 +110,9 @@ impl BuiltInWindowFunctionExpr for WindowShift { })) } - fn supports_bounded_execution(&self) -> bool { - true - } + // fn supports_bounded_execution(&self) -> bool { + // true + // } fn reverse_expr(&self) -> Option> { Some(Arc::new(Self { @@ -231,6 +231,10 @@ impl PartitionEvaluator for WindowShiftEvaluator { let value = &values[0]; shift_with_default_value(value, self.shift_offset, self.default_value.as_ref()) } + + fn supports_bounded_execution(&self) -> bool { + true + } } fn get_default_value( diff --git a/datafusion/physical-expr/src/window/nth_value.rs b/datafusion/physical-expr/src/window/nth_value.rs index 5dbf34bee8543..6e279f2d0d0c7 100644 --- a/datafusion/physical-expr/src/window/nth_value.rs +++ b/datafusion/physical-expr/src/window/nth_value.rs @@ -122,13 +122,13 @@ impl BuiltInWindowFunctionExpr for NthValue { Ok(Box::new(NthValueEvaluator { state })) } - fn supports_bounded_execution(&self) -> bool { - true - } + // fn supports_bounded_execution(&self) -> bool { + // true + // } - fn uses_window_frame(&self) -> bool { - true - } + // fn uses_window_frame(&self) -> bool { + // true + // } fn reverse_expr(&self) -> Option> { let reversed_kind = match self.kind { @@ -215,6 +215,14 @@ impl PartitionEvaluator for NthValueEvaluator { } } } + + fn uses_window_frame(&self) -> bool { + true + } + + fn supports_bounded_execution(&self) -> bool { + true + } } #[cfg(test)] diff --git a/datafusion/physical-expr/src/window/rank.rs b/datafusion/physical-expr/src/window/rank.rs index 81d8c71071081..ba8de5aba34ff 100644 --- a/datafusion/physical-expr/src/window/rank.rs +++ b/datafusion/physical-expr/src/window/rank.rs @@ -100,9 +100,9 @@ impl BuiltInWindowFunctionExpr for Rank { &self.name } - fn supports_bounded_execution(&self) -> bool { - matches!(self.rank_type, RankType::Basic | RankType::Dense) - } + // fn supports_bounded_execution(&self) -> bool { + // matches!(self.rank_type, RankType::Basic | RankType::Dense) + // } fn create_evaluator(&self) -> Result> { Ok(Box::new(RankEvaluator { @@ -221,6 +221,10 @@ impl PartitionEvaluator for RankEvaluator { }; Ok(result) } + + fn supports_bounded_execution(&self) -> bool { + matches!(self.rank_type, RankType::Basic | RankType::Dense) + } } #[cfg(test)] diff --git a/datafusion/physical-expr/src/window/row_number.rs b/datafusion/physical-expr/src/window/row_number.rs index 229504bbd7f27..7e8552252f40c 100644 --- a/datafusion/physical-expr/src/window/row_number.rs +++ b/datafusion/physical-expr/src/window/row_number.rs @@ -65,9 +65,9 @@ impl BuiltInWindowFunctionExpr for RowNumber { Ok(Box::::default()) } - fn supports_bounded_execution(&self) -> bool { - true - } + // fn supports_bounded_execution(&self) -> bool { + // true + // } } #[derive(Default, Debug)] @@ -100,6 +100,10 @@ impl PartitionEvaluator for NumRowsEvaluator { 1..(num_rows as u64) + 1, ))) } + + fn supports_bounded_execution(&self) -> bool { + true + } } #[cfg(test)] diff --git a/datafusion/physical-expr/src/window/sliding_aggregate.rs b/datafusion/physical-expr/src/window/sliding_aggregate.rs index 709f8d23be366..bc5dc92536e9e 100644 --- a/datafusion/physical-expr/src/window/sliding_aggregate.rs +++ b/datafusion/physical-expr/src/window/sliding_aggregate.rs @@ -139,8 +139,9 @@ impl WindowExpr for SlidingAggregateWindowExpr { } fn uses_bounded_memory(&self) -> bool { - self.aggregate.supports_bounded_execution() - && !self.window_frame.end_bound.is_unbounded() + let supports_bounded_execution = + self.aggregate.create_sliding_accumulator().is_ok(); + supports_bounded_execution && !self.window_frame.end_bound.is_unbounded() } } From 6de0cc1d296892fea00120215115c4e47050d4d6 Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Tue, 13 Jun 2023 10:12:50 +0300 Subject: [PATCH 2/7] Use case for first_value (uses window frame) --- datafusion-examples/examples/simple_udwf.rs | 57 ++++++++++++++++++--- 1 file changed, 50 insertions(+), 7 deletions(-) diff --git a/datafusion-examples/examples/simple_udwf.rs b/datafusion-examples/examples/simple_udwf.rs index a61481b761a08..791bf6858fd60 100644 --- a/datafusion-examples/examples/simple_udwf.rs +++ b/datafusion-examples/examples/simple_udwf.rs @@ -17,11 +17,11 @@ use std::sync::Arc; +use arrow::array::Array; use arrow::{ array::{AsArray, Float64Array}, datatypes::Float64Type, }; -use arrow::array::Array; use arrow_schema::DataType; use datafusion::datasource::file_format::options::CsvReadOptions; @@ -54,6 +54,9 @@ async fn main() -> Result<()> { // register the window function with DataFusion so wecan call it ctx.register_udwf(my_average()); + // register the window function with DataFusion so wecan call it + ctx.register_udwf(my_first_value()); + // Use SQL to run the new window function let df = ctx.sql("SELECT * from cars").await?; // print the results @@ -68,6 +71,7 @@ async fn main() -> Result<()> { speed, \ lag(speed, 1) OVER (PARTITION BY car ORDER BY time),\ my_average(speed) OVER (PARTITION BY car ORDER BY time),\ + my_first_value(speed) OVER (PARTITION BY car ORDER BY time),\ time \ from cars", ) @@ -197,21 +201,60 @@ impl PartitionEvaluator for MyPartitionEvaluator { )) } + fn evaluate_inside_range( + &self, + _values: &[arrow::array::ArrayRef], + _range: &std::ops::Range, + ) -> Result { + Err(DataFusionError::NotImplemented( + "evaluate_inside_range is not implemented by default".into(), + )) + } +} + +fn my_first_value() -> WindowUDF { + WindowUDF { + name: String::from("my_first_value"), + // it will take 2 arguments -- the column and the window size + signature: Signature::exact(vec![DataType::Int32], Volatility::Immutable), + return_type: Arc::new(return_type), + partition_evaluator: Arc::new(make_partition_evaluator_first_value), + } +} + +/// Create a partition evaluator for this argument +fn make_partition_evaluator_first_value() -> Result> { + Ok(Box::new(MyFirstValue::new())) +} + +#[derive(Clone, Debug)] +struct MyFirstValue {} + +impl MyFirstValue { + fn new() -> Self { + Self {} + } +} + +// TODO show how to use other evaluate methods +/// These different evaluation methods are called depending on the various settings of WindowUDF +impl PartitionEvaluator for MyFirstValue { + fn get_range(&self, _idx: usize, _n_rows: usize) -> Result> { + Err(DataFusionError::NotImplemented( + "get_range is not implemented for this window function".to_string(), + )) + } + fn evaluate_inside_range( &self, values: &[arrow::array::ArrayRef], range: &std::ops::Range, ) -> Result { let first = ScalarValue::try_from_array(&values[0], range.start)?; - // Err(DataFusionError::NotImplemented( - // "evaluate_inside_range is not implemented by default".into(), - // )) Ok(first) } fn uses_window_frame(&self) -> bool { - false + true } } - -// TODO show how to use other evaluate methods From c61343427bd5b312ed367da0f1d04e11b1e338f0 Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Tue, 13 Jun 2023 10:32:52 +0300 Subject: [PATCH 3/7] Bounded implementation example --- datafusion-examples/examples/simple_udwf.rs | 69 ++++++++++++++++++++- 1 file changed, 66 insertions(+), 3 deletions(-) diff --git a/datafusion-examples/examples/simple_udwf.rs b/datafusion-examples/examples/simple_udwf.rs index 791bf6858fd60..29419dcdb1e37 100644 --- a/datafusion-examples/examples/simple_udwf.rs +++ b/datafusion-examples/examples/simple_udwf.rs @@ -17,7 +17,7 @@ use std::sync::Arc; -use arrow::array::Array; +use arrow::array::{Array, ArrayRef, UInt64Array}; use arrow::{ array::{AsArray, Float64Array}, datatypes::Float64Type, @@ -53,9 +53,8 @@ async fn main() -> Result<()> { // register the window function with DataFusion so wecan call it ctx.register_udwf(my_average()); - - // register the window function with DataFusion so wecan call it ctx.register_udwf(my_first_value()); + ctx.register_udwf(my_odd_row_number()); // Use SQL to run the new window function let df = ctx.sql("SELECT * from cars").await?; @@ -72,6 +71,21 @@ async fn main() -> Result<()> { lag(speed, 1) OVER (PARTITION BY car ORDER BY time),\ my_average(speed) OVER (PARTITION BY car ORDER BY time),\ my_first_value(speed) OVER (PARTITION BY car ORDER BY time),\ + my_odd_row_number(speed) OVER (PARTITION BY car ORDER BY time), \ + time \ + from cars", + ) + .await?; + // print the results + df.show().await?; + + // When all of the window functions support bounded, BoundedWindowAggExec will be used + // In this case evaluate_stateful method will be called + let df = ctx + .sql( + "SELECT car, \ + speed, \ + my_odd_row_number(speed) OVER (PARTITION BY car ORDER BY time), \ time \ from cars", ) @@ -227,6 +241,21 @@ fn make_partition_evaluator_first_value() -> Result> Ok(Box::new(MyFirstValue::new())) } +fn my_odd_row_number() -> WindowUDF { + WindowUDF { + name: String::from("my_odd_row_number"), + // it will take 2 arguments -- the column and the window size + signature: Signature::exact(vec![DataType::Int32], Volatility::Immutable), + return_type: Arc::new(|_| Ok(Arc::new(DataType::UInt64))), + partition_evaluator: Arc::new(make_partition_evaluator_odd_row_number), + } +} + +/// Create a partition evaluator for this argument +fn make_partition_evaluator_odd_row_number() -> Result> { + Ok(Box::new(OddRowNumber::new())) +} + #[derive(Clone, Debug)] struct MyFirstValue {} @@ -258,3 +287,37 @@ impl PartitionEvaluator for MyFirstValue { true } } + +#[derive(Clone, Debug)] +struct OddRowNumber { + row_idx: usize +} + +impl OddRowNumber { + fn new() -> Self { + Self {row_idx: 1} + } +} + +// TODO show how to use other evaluate methods +/// These different evaluation methods are called depending on the various settings of WindowUDF +impl PartitionEvaluator for OddRowNumber { + fn get_range(&self, idx: usize, _n_rows: usize) -> Result> { + Ok(std::ops::Range{start: idx, end: idx+1}) + } + + fn evaluate(&self, _values: &[ArrayRef], num_rows: usize) -> Result { + Ok(Arc::new(UInt64Array::from_iter_values( + (0..(num_rows as u64)).into_iter().map(|val| val*2+1), + ))) + } + + fn evaluate_stateful(&mut self, _values: &[ArrayRef]) -> Result { + self.row_idx += 2; + Ok(ScalarValue::UInt64(Some(self.row_idx as u64))) + } + + fn supports_bounded_execution(&self) -> bool { + true + } +} From 000fc3ce491b787207be2df91a09d344bab9b2bb Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Tue, 13 Jun 2023 10:36:51 +0300 Subject: [PATCH 4/7] remove leftovers --- datafusion-examples/examples/simple_udwf.rs | 13 ++++++++----- datafusion/physical-expr/src/aggregate/average.rs | 4 ---- datafusion/physical-expr/src/aggregate/count.rs | 4 ---- datafusion/physical-expr/src/aggregate/min_max.rs | 8 -------- datafusion/physical-expr/src/aggregate/mod.rs | 6 ------ datafusion/physical-expr/src/aggregate/sum.rs | 4 ---- 6 files changed, 8 insertions(+), 31 deletions(-) diff --git a/datafusion-examples/examples/simple_udwf.rs b/datafusion-examples/examples/simple_udwf.rs index 29419dcdb1e37..b7edc2fab812e 100644 --- a/datafusion-examples/examples/simple_udwf.rs +++ b/datafusion-examples/examples/simple_udwf.rs @@ -290,12 +290,12 @@ impl PartitionEvaluator for MyFirstValue { #[derive(Clone, Debug)] struct OddRowNumber { - row_idx: usize + row_idx: usize, } impl OddRowNumber { fn new() -> Self { - Self {row_idx: 1} + Self { row_idx: 1 } } } @@ -303,18 +303,21 @@ impl OddRowNumber { /// These different evaluation methods are called depending on the various settings of WindowUDF impl PartitionEvaluator for OddRowNumber { fn get_range(&self, idx: usize, _n_rows: usize) -> Result> { - Ok(std::ops::Range{start: idx, end: idx+1}) + Ok(std::ops::Range { + start: idx, + end: idx + 1, + }) } fn evaluate(&self, _values: &[ArrayRef], num_rows: usize) -> Result { Ok(Arc::new(UInt64Array::from_iter_values( - (0..(num_rows as u64)).into_iter().map(|val| val*2+1), + (0..(num_rows as u64)).into_iter().map(|val| val * 2 + 1), ))) } fn evaluate_stateful(&mut self, _values: &[ArrayRef]) -> Result { self.row_idx += 2; - Ok(ScalarValue::UInt64(Some(self.row_idx as u64))) + Ok(ScalarValue::UInt64(Some(self.row_idx as u64))) } fn supports_bounded_execution(&self) -> bool { diff --git a/datafusion/physical-expr/src/aggregate/average.rs b/datafusion/physical-expr/src/aggregate/average.rs index 2cf75292ad32c..97312dcd8c11a 100644 --- a/datafusion/physical-expr/src/aggregate/average.rs +++ b/datafusion/physical-expr/src/aggregate/average.rs @@ -134,10 +134,6 @@ impl AggregateExpr for Avg { is_row_accumulator_support_dtype(&self.sum_data_type) } - // fn supports_bounded_execution(&self) -> bool { - // true - // } - fn create_row_accumulator( &self, start_index: usize, diff --git a/datafusion/physical-expr/src/aggregate/count.rs b/datafusion/physical-expr/src/aggregate/count.rs index 9febd99f75e6e..7bd6892843fe4 100644 --- a/datafusion/physical-expr/src/aggregate/count.rs +++ b/datafusion/physical-expr/src/aggregate/count.rs @@ -133,10 +133,6 @@ impl AggregateExpr for Count { true } - // fn supports_bounded_execution(&self) -> bool { - // true - // } - fn create_row_accumulator( &self, start_index: usize, diff --git a/datafusion/physical-expr/src/aggregate/min_max.rs b/datafusion/physical-expr/src/aggregate/min_max.rs index 6d16f0efdc20d..35a699313e97f 100644 --- a/datafusion/physical-expr/src/aggregate/min_max.rs +++ b/datafusion/physical-expr/src/aggregate/min_max.rs @@ -125,10 +125,6 @@ impl AggregateExpr for Max { is_row_accumulator_support_dtype(&self.data_type) } - // fn supports_bounded_execution(&self) -> bool { - // true - // } - fn create_row_accumulator( &self, start_index: usize, @@ -829,10 +825,6 @@ impl AggregateExpr for Min { is_row_accumulator_support_dtype(&self.data_type) } - // fn supports_bounded_execution(&self) -> bool { - // true - // } - fn create_row_accumulator( &self, start_index: usize, diff --git a/datafusion/physical-expr/src/aggregate/mod.rs b/datafusion/physical-expr/src/aggregate/mod.rs index 1bb04198777e9..7d2316c532a0a 100644 --- a/datafusion/physical-expr/src/aggregate/mod.rs +++ b/datafusion/physical-expr/src/aggregate/mod.rs @@ -96,12 +96,6 @@ pub trait AggregateExpr: Send + Sync + Debug + PartialEq { false } - // /// Specifies whether this aggregate function can run using bounded memory. - // /// Any accumulator returning "true" needs to implement `retract_batch`. - // fn supports_bounded_execution(&self) -> bool { - // false - // } - /// RowAccumulator to access/update row-based aggregation state in-place. /// Currently, row accumulator only supports states of fixed-sized type. /// diff --git a/datafusion/physical-expr/src/aggregate/sum.rs b/datafusion/physical-expr/src/aggregate/sum.rs index bbfb3719f0f68..266dcadfd7387 100644 --- a/datafusion/physical-expr/src/aggregate/sum.rs +++ b/datafusion/physical-expr/src/aggregate/sum.rs @@ -131,10 +131,6 @@ impl AggregateExpr for Sum { is_row_accumulator_support_dtype(&self.data_type) } - // fn supports_bounded_execution(&self) -> bool { - // true - // } - fn create_row_accumulator( &self, start_index: usize, From dc7edf94ba5efeb588d87c584e5e7cf836672c87 Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Tue, 13 Jun 2023 10:44:28 +0300 Subject: [PATCH 5/7] Remove leftovers --- .../src/window/built_in_window_function_expr.rs | 17 ----------------- datafusion/physical-expr/src/window/lead_lag.rs | 4 ---- .../physical-expr/src/window/nth_value.rs | 8 -------- datafusion/physical-expr/src/window/rank.rs | 4 ---- .../physical-expr/src/window/row_number.rs | 4 ---- 5 files changed, 37 deletions(-) diff --git a/datafusion/physical-expr/src/window/built_in_window_function_expr.rs b/datafusion/physical-expr/src/window/built_in_window_function_expr.rs index 3f513ef1bdf65..2d17991ea544d 100644 --- a/datafusion/physical-expr/src/window/built_in_window_function_expr.rs +++ b/datafusion/physical-expr/src/window/built_in_window_function_expr.rs @@ -76,21 +76,4 @@ pub trait BuiltInWindowFunctionExpr: Send + Sync + std::fmt::Debug { fn reverse_expr(&self) -> Option> { None } - - // /// Can the window function be incrementally computed using - // /// bounded memory? - // /// - // /// If this function returns true, [`Self::create_evaluator`] must - // /// implement [`PartitionEvaluator::evaluate_stateful`] - // fn supports_bounded_execution(&self) -> bool { - // false - // } - - // /// Does the window function use the values from its window frame? - // /// - // /// If this function returns true, [`Self::create_evaluator`] must - // /// implement [`PartitionEvaluator::evaluate_inside_range`] - // fn uses_window_frame(&self) -> bool { - // false - // } } diff --git a/datafusion/physical-expr/src/window/lead_lag.rs b/datafusion/physical-expr/src/window/lead_lag.rs index bdb7350af20f5..2c1b6beff3ced 100644 --- a/datafusion/physical-expr/src/window/lead_lag.rs +++ b/datafusion/physical-expr/src/window/lead_lag.rs @@ -110,10 +110,6 @@ impl BuiltInWindowFunctionExpr for WindowShift { })) } - // fn supports_bounded_execution(&self) -> bool { - // true - // } - fn reverse_expr(&self) -> Option> { Some(Arc::new(Self { name: self.name.clone(), diff --git a/datafusion/physical-expr/src/window/nth_value.rs b/datafusion/physical-expr/src/window/nth_value.rs index 6e279f2d0d0c7..f7b23c803e299 100644 --- a/datafusion/physical-expr/src/window/nth_value.rs +++ b/datafusion/physical-expr/src/window/nth_value.rs @@ -122,14 +122,6 @@ impl BuiltInWindowFunctionExpr for NthValue { Ok(Box::new(NthValueEvaluator { state })) } - // fn supports_bounded_execution(&self) -> bool { - // true - // } - - // fn uses_window_frame(&self) -> bool { - // true - // } - fn reverse_expr(&self) -> Option> { let reversed_kind = match self.kind { NthValueKind::First => NthValueKind::Last, diff --git a/datafusion/physical-expr/src/window/rank.rs b/datafusion/physical-expr/src/window/rank.rs index ba8de5aba34ff..98852081a6886 100644 --- a/datafusion/physical-expr/src/window/rank.rs +++ b/datafusion/physical-expr/src/window/rank.rs @@ -100,10 +100,6 @@ impl BuiltInWindowFunctionExpr for Rank { &self.name } - // fn supports_bounded_execution(&self) -> bool { - // matches!(self.rank_type, RankType::Basic | RankType::Dense) - // } - fn create_evaluator(&self) -> Result> { Ok(Box::new(RankEvaluator { state: RankState::default(), diff --git a/datafusion/physical-expr/src/window/row_number.rs b/datafusion/physical-expr/src/window/row_number.rs index 7e8552252f40c..1db7d2528ed05 100644 --- a/datafusion/physical-expr/src/window/row_number.rs +++ b/datafusion/physical-expr/src/window/row_number.rs @@ -64,10 +64,6 @@ impl BuiltInWindowFunctionExpr for RowNumber { fn create_evaluator(&self) -> Result> { Ok(Box::::default()) } - - // fn supports_bounded_execution(&self) -> bool { - // true - // } } #[derive(Default, Debug)] From 84bdf28f0c5d570fc2adcd2a40a012c2fe9bf620 Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Tue, 13 Jun 2023 11:00:28 +0300 Subject: [PATCH 6/7] Change get_range implementation --- datafusion-examples/examples/simple_udwf.rs | 19 ------------------- datafusion/expr/src/partition_evaluator.rs | 18 ++++++++++++++---- datafusion/physical-expr/src/window/rank.rs | 6 ------ .../physical-expr/src/window/row_number.rs | 6 ------ 4 files changed, 14 insertions(+), 35 deletions(-) diff --git a/datafusion-examples/examples/simple_udwf.rs b/datafusion-examples/examples/simple_udwf.rs index b7edc2fab812e..8ab0352c3ba4d 100644 --- a/datafusion-examples/examples/simple_udwf.rs +++ b/datafusion-examples/examples/simple_udwf.rs @@ -151,12 +151,6 @@ impl MyPartitionEvaluator { /// These different evaluation methods are called depending on the various settings of WindowUDF impl PartitionEvaluator for MyPartitionEvaluator { - fn get_range(&self, _idx: usize, _n_rows: usize) -> Result> { - Err(DataFusionError::NotImplemented( - "get_range is not implemented for this window function".to_string(), - )) - } - /// This function is given the values of each partition fn evaluate( &self, @@ -268,12 +262,6 @@ impl MyFirstValue { // TODO show how to use other evaluate methods /// These different evaluation methods are called depending on the various settings of WindowUDF impl PartitionEvaluator for MyFirstValue { - fn get_range(&self, _idx: usize, _n_rows: usize) -> Result> { - Err(DataFusionError::NotImplemented( - "get_range is not implemented for this window function".to_string(), - )) - } - fn evaluate_inside_range( &self, values: &[arrow::array::ArrayRef], @@ -302,13 +290,6 @@ impl OddRowNumber { // TODO show how to use other evaluate methods /// These different evaluation methods are called depending on the various settings of WindowUDF impl PartitionEvaluator for OddRowNumber { - fn get_range(&self, idx: usize, _n_rows: usize) -> Result> { - Ok(std::ops::Range { - start: idx, - end: idx + 1, - }) - } - fn evaluate(&self, _values: &[ArrayRef], num_rows: usize) -> Result { Ok(Arc::new(UInt64Array::from_iter_values( (0..(num_rows as u64)).into_iter().map(|val| val * 2 + 1), diff --git a/datafusion/expr/src/partition_evaluator.rs b/datafusion/expr/src/partition_evaluator.rs index 9ca37db57828b..77409a6df1238 100644 --- a/datafusion/expr/src/partition_evaluator.rs +++ b/datafusion/expr/src/partition_evaluator.rs @@ -147,10 +147,20 @@ pub trait PartitionEvaluator: Debug + Send { /// /// `idx`: is the index of last row for which result is calculated. /// `n_rows`: is the number of rows of the input record batch (Used during bounds check) - fn get_range(&self, _idx: usize, _n_rows: usize) -> Result> { - Err(DataFusionError::NotImplemented( - "get_range is not implemented for this window function".to_string(), - )) + /// If `uses_window_frame` flag is `false`. This method is used to calculate required range for the window function + /// Generally there is no required range, hence by default this returns smallest range(current row). e.g seeing current row + /// is enough to calculate window result (such as row_number, rank, etc) + fn get_range(&self, idx: usize, _n_rows: usize) -> Result> { + if self.uses_window_frame() { + Err(DataFusionError::Execution( + "Range should be calculated from window frame".to_string(), + )) + } else { + Ok(Range { + start: idx, + end: idx + 1, + }) + } } /// Called for window functions that *do not use* values from the diff --git a/datafusion/physical-expr/src/window/rank.rs b/datafusion/physical-expr/src/window/rank.rs index 98852081a6886..100e3ef944730 100644 --- a/datafusion/physical-expr/src/window/rank.rs +++ b/datafusion/physical-expr/src/window/rank.rs @@ -115,12 +115,6 @@ pub(crate) struct RankEvaluator { } impl PartitionEvaluator for RankEvaluator { - fn get_range(&self, idx: usize, _n_rows: usize) -> Result> { - let start = idx; - let end = idx + 1; - Ok(Range { start, end }) - } - fn state( &self, ) -> Result>, DataFusionError> { diff --git a/datafusion/physical-expr/src/window/row_number.rs b/datafusion/physical-expr/src/window/row_number.rs index 1db7d2528ed05..6783a4d5318c5 100644 --- a/datafusion/physical-expr/src/window/row_number.rs +++ b/datafusion/physical-expr/src/window/row_number.rs @@ -79,12 +79,6 @@ impl PartitionEvaluator for NumRowsEvaluator { )))) } - fn get_range(&self, idx: usize, _n_rows: usize) -> Result> { - let start = idx; - let end = idx + 1; - Ok(Range { start, end }) - } - /// evaluate window function result inside given range fn evaluate_stateful(&mut self, _values: &[ArrayRef]) -> Result { self.state.n_rows += 1; From 828de45ff8624751788d22e7673519fba219613a Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Tue, 13 Jun 2023 13:15:47 +0300 Subject: [PATCH 7/7] Combine evaluate_stateful and evaluate_inside_ranges --- datafusion-examples/examples/simple_udwf.rs | 39 +++++--------- datafusion/expr/src/partition_evaluator.rs | 44 +++++++-------- .../physical-expr/src/window/built_in.rs | 9 ++-- .../physical-expr/src/window/lead_lag.rs | 10 ++-- .../physical-expr/src/window/nth_value.rs | 54 +++++++++---------- datafusion/physical-expr/src/window/ntile.rs | 2 +- datafusion/physical-expr/src/window/rank.rs | 6 ++- .../physical-expr/src/window/row_number.rs | 12 +++-- 8 files changed, 84 insertions(+), 92 deletions(-) diff --git a/datafusion-examples/examples/simple_udwf.rs b/datafusion-examples/examples/simple_udwf.rs index 8ab0352c3ba4d..a65a6236c0546 100644 --- a/datafusion-examples/examples/simple_udwf.rs +++ b/datafusion-examples/examples/simple_udwf.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use std::ops::Range; use std::sync::Arc; use arrow::array::{Array, ArrayRef, UInt64Array}; @@ -29,7 +30,8 @@ use datafusion::error::Result; use datafusion::prelude::*; use datafusion_common::{DataFusionError, ScalarValue}; use datafusion_expr::{ - partition_evaluator::PartitionEvaluator, Signature, Volatility, WindowUDF, + partition_evaluator::PartitionEvaluator, Accumulator, Signature, Volatility, + WindowUDF, }; // create local execution context with `cars.csv` registered as a table named `cars` @@ -152,8 +154,8 @@ impl MyPartitionEvaluator { /// These different evaluation methods are called depending on the various settings of WindowUDF impl PartitionEvaluator for MyPartitionEvaluator { /// This function is given the values of each partition - fn evaluate( - &self, + fn evaluate_all( + &mut self, values: &[arrow::array::ArrayRef], _num_rows: usize, ) -> Result { @@ -190,12 +192,13 @@ impl PartitionEvaluator for MyPartitionEvaluator { Ok(Arc::new(new_values)) } - fn evaluate_stateful( + fn evaluate( &mut self, _values: &[arrow::array::ArrayRef], + _range: &std::ops::Range, ) -> Result { Err(DataFusionError::NotImplemented( - "evaluate_stateful is not implemented by default".into(), + "evaluate is not implemented by default".into(), )) } @@ -209,15 +212,6 @@ impl PartitionEvaluator for MyPartitionEvaluator { )) } - fn evaluate_inside_range( - &self, - _values: &[arrow::array::ArrayRef], - _range: &std::ops::Range, - ) -> Result { - Err(DataFusionError::NotImplemented( - "evaluate_inside_range is not implemented by default".into(), - )) - } } fn my_first_value() -> WindowUDF { @@ -262,11 +256,11 @@ impl MyFirstValue { // TODO show how to use other evaluate methods /// These different evaluation methods are called depending on the various settings of WindowUDF impl PartitionEvaluator for MyFirstValue { - fn evaluate_inside_range( - &self, + fn evaluate( + &mut self, values: &[arrow::array::ArrayRef], range: &std::ops::Range, - ) -> Result { + ) -> Result { let first = ScalarValue::try_from_array(&values[0], range.start)?; Ok(first) } @@ -290,15 +284,10 @@ impl OddRowNumber { // TODO show how to use other evaluate methods /// These different evaluation methods are called depending on the various settings of WindowUDF impl PartitionEvaluator for OddRowNumber { - fn evaluate(&self, _values: &[ArrayRef], num_rows: usize) -> Result { - Ok(Arc::new(UInt64Array::from_iter_values( - (0..(num_rows as u64)).into_iter().map(|val| val * 2 + 1), - ))) - } - - fn evaluate_stateful(&mut self, _values: &[ArrayRef]) -> Result { + fn evaluate(&mut self, _values: &[ArrayRef], _range: &Range,) -> Result { + let res = Ok(ScalarValue::UInt64(Some(self.row_idx as u64))); self.row_idx += 2; - Ok(ScalarValue::UInt64(Some(self.row_idx as u64))) + res } fn supports_bounded_execution(&self) -> bool { diff --git a/datafusion/expr/src/partition_evaluator.rs b/datafusion/expr/src/partition_evaluator.rs index 77409a6df1238..f38bc56ceac13 100644 --- a/datafusion/expr/src/partition_evaluator.rs +++ b/datafusion/expr/src/partition_evaluator.rs @@ -80,19 +80,19 @@ pub trait PartitionState { /// /// # Stateless `PartitionEvaluator` /// -/// In this case, [`Self::evaluate`], [`Self::evaluate_with_rank`] or +/// In this case, [`Self::evaluate_all`], [`Self::evaluate_with_rank`] or /// [`Self::evaluate_inside_range`] is called with values for the /// entire partition. /// /// # Stateful `PartitionEvaluator` /// -/// In this case, [`Self::evaluate_stateful`] is called to calculate +/// In this case, [`Self::evaluate`] is called to calculate /// the results of the window function incrementally for each new /// batch, saving and restoring any state needed to do so as /// [`BuiltinWindowState`]. /// /// For example, when computing `ROW_NUMBER` incrementally, -/// [`Self::evaluate_stateful`] will be called multiple times with +/// [`Self::evaluate`] will be called multiple times with /// different batches. For all batches after the first, the output /// `row_number` must start from last `row_number` produced for the /// previous batch. The previous row number is saved and restored as @@ -166,18 +166,27 @@ pub trait PartitionEvaluator: Debug + Send { /// Called for window functions that *do not use* values from the /// the window frame, such as `ROW_NUMBER`, `RANK`, `DENSE_RANK`, /// `PERCENT_RANK`, `CUME_DIST`, `LEAD`, `LAG`). - fn evaluate(&self, _values: &[ArrayRef], _num_rows: usize) -> Result { - Err(DataFusionError::NotImplemented( - "evaluate is not implemented by default".into(), - )) + fn evaluate_all(&mut self, values: &[ArrayRef], num_rows: usize) -> Result { + if !self.uses_window_frame() && self.supports_bounded_execution(){ + let res = (0..num_rows).into_iter().map(|idx| self.evaluate(values, &Range{start: 0, end: 1})).collect::>>()?; + ScalarValue::iter_to_array(res.into_iter()) + }else { + Err(DataFusionError::NotImplemented( + "evaluate_all is not implemented by default".into(), + )) + } } /// Evaluate window function result inside given range. /// /// Only used for stateful evaluation - fn evaluate_stateful(&mut self, _values: &[ArrayRef]) -> Result { + fn evaluate( + &mut self, + _values: &[ArrayRef], + _range: &Range, + ) -> Result { Err(DataFusionError::NotImplemented( - "evaluate_stateful is not implemented by default".into(), + "evaluate is not implemented by default".into(), )) } @@ -220,21 +229,6 @@ pub trait PartitionEvaluator: Debug + Send { )) } - /// Called for window functions that use values from window frame, - /// such as `FIRST_VALUE`, `LAST_VALUE`, `NTH_VALUE` and produce a - /// single value for every row in the partition. - /// - /// Returns a [`ScalarValue`] that is the value of the window function for the entire partition - fn evaluate_inside_range( - &self, - _values: &[ArrayRef], - _range: &Range, - ) -> Result { - Err(DataFusionError::NotImplemented( - "evaluate_inside_range is not implemented by default".into(), - )) - } - /// Does the window function use the values from its window frame? /// /// If this function returns true, [`Self::create_evaluator`] must @@ -247,7 +241,7 @@ pub trait PartitionEvaluator: Debug + Send { /// bounded memory? /// /// If this function returns true, [`Self::create_evaluator`] must - /// implement [`PartitionEvaluator::evaluate_stateful`] + /// implement [`PartitionEvaluator::evaluate`] fn supports_bounded_execution(&self) -> bool { false } diff --git a/datafusion/physical-expr/src/window/built_in.rs b/datafusion/physical-expr/src/window/built_in.rs index a535697f6e239..9b2ad80ce02fa 100644 --- a/datafusion/physical-expr/src/window/built_in.rs +++ b/datafusion/physical-expr/src/window/built_in.rs @@ -98,7 +98,7 @@ impl WindowExpr for BuiltInWindowExpr { } fn evaluate(&self, batch: &RecordBatch) -> Result { - let evaluator = self.expr.create_evaluator()?; + let mut evaluator = self.expr.create_evaluator()?; let num_rows = batch.num_rows(); if evaluator.uses_window_frame() { let sort_options: Vec = @@ -117,7 +117,7 @@ impl WindowExpr for BuiltInWindowExpr { num_rows, idx, )?; - let value = evaluator.evaluate_inside_range(&values, &range)?; + let value = evaluator.evaluate(&values, &range)?; row_wise_results.push(value); last_range = range; } @@ -128,7 +128,7 @@ impl WindowExpr for BuiltInWindowExpr { evaluator.evaluate_with_rank(num_rows, &sort_partition_points) } else { let (values, _) = self.get_values_orderbys(batch)?; - evaluator.evaluate(&values, num_rows) + evaluator.evaluate_all(&values, num_rows) } } @@ -202,7 +202,8 @@ impl WindowExpr for BuiltInWindowExpr { // Update last range state.window_frame_range = frame_range; evaluator.update_state(state, idx, &order_bys, &sort_partition_points)?; - row_wise_results.push(evaluator.evaluate_stateful(&values)?); + row_wise_results + .push(evaluator.evaluate(&values, &state.window_frame_range)?); } let out_col = if row_wise_results.is_empty() { new_empty_array(out_type) diff --git a/datafusion/physical-expr/src/window/lead_lag.rs b/datafusion/physical-expr/src/window/lead_lag.rs index 2c1b6beff3ced..5229ade259501 100644 --- a/datafusion/physical-expr/src/window/lead_lag.rs +++ b/datafusion/physical-expr/src/window/lead_lag.rs @@ -211,7 +211,11 @@ impl PartitionEvaluator for WindowShiftEvaluator { } } - fn evaluate_stateful(&mut self, values: &[ArrayRef]) -> Result { + fn evaluate( + &mut self, + values: &[ArrayRef], + _range: &Range, + ) -> Result { let array = &values[0]; let dtype = array.data_type(); let idx = self.state.idx as i64 - self.shift_offset; @@ -222,7 +226,7 @@ impl PartitionEvaluator for WindowShiftEvaluator { } } - fn evaluate(&self, values: &[ArrayRef], _num_rows: usize) -> Result { + fn evaluate_all(&mut self, values: &[ArrayRef], _num_rows: usize) -> Result { // LEAD, LAG window functions take single column, values will have size 1 let value = &values[0]; shift_with_default_value(value, self.shift_offset, self.default_value.as_ref()) @@ -267,7 +271,7 @@ mod tests { let values = expr.evaluate_args(&batch)?; let result = expr .create_evaluator()? - .evaluate(&values, batch.num_rows())?; + .evaluate_all(&values, batch.num_rows())?; let result = as_int32_array(&result)?; assert_eq!(expected, *result); Ok(()) diff --git a/datafusion/physical-expr/src/window/nth_value.rs b/datafusion/physical-expr/src/window/nth_value.rs index f7b23c803e299..dbdd698587793 100644 --- a/datafusion/physical-expr/src/window/nth_value.rs +++ b/datafusion/physical-expr/src/window/nth_value.rs @@ -173,36 +173,32 @@ impl PartitionEvaluator for NthValueEvaluator { Ok(()) } - fn evaluate_stateful(&mut self, values: &[ArrayRef]) -> Result { - if let Some(ref result) = self.state.finalized_result { - Ok(result.clone()) - } else { - self.evaluate_inside_range(values, &self.state.range) - } - } - - fn evaluate_inside_range( - &self, + fn evaluate( + &mut self, values: &[ArrayRef], range: &Range, ) -> Result { - // FIRST_VALUE, LAST_VALUE, NTH_VALUE window functions take a single column, values will have size 1. - let arr = &values[0]; - let n_range = range.end - range.start; - if n_range == 0 { - // We produce None if the window is empty. - return ScalarValue::try_from(arr.data_type()); - } - match self.state.kind { - NthValueKind::First => ScalarValue::try_from_array(arr, range.start), - NthValueKind::Last => ScalarValue::try_from_array(arr, range.end - 1), - NthValueKind::Nth(n) => { - // We are certain that n > 0. - let index = (n as usize) - 1; - if index >= n_range { - ScalarValue::try_from(arr.data_type()) - } else { - ScalarValue::try_from_array(arr, range.start + index) + if let Some(ref result) = self.state.finalized_result { + Ok(result.clone()) + } else { + // FIRST_VALUE, LAST_VALUE, NTH_VALUE window functions take a single column, values will have size 1. + let arr = &values[0]; + let n_range = range.end - range.start; + if n_range == 0 { + // We produce None if the window is empty. + return ScalarValue::try_from(arr.data_type()); + } + match self.state.kind { + NthValueKind::First => ScalarValue::try_from_array(arr, range.start), + NthValueKind::Last => ScalarValue::try_from_array(arr, range.end - 1), + NthValueKind::Nth(n) => { + // We are certain that n > 0. + let index = (n as usize) - 1; + if index >= n_range { + ScalarValue::try_from(arr.data_type()) + } else { + ScalarValue::try_from_array(arr, range.start + index) + } } } } @@ -238,11 +234,11 @@ mod tests { end: i + 1, }) } - let evaluator = expr.create_evaluator()?; + let mut evaluator = expr.create_evaluator()?; let values = expr.evaluate_args(&batch)?; let result = ranges .iter() - .map(|range| evaluator.evaluate_inside_range(&values, range)) + .map(|range| evaluator.evaluate(&values, range)) .collect::>>()?; let result = ScalarValue::iter_to_array(result.into_iter())?; let result = as_int32_array(&result)?; diff --git a/datafusion/physical-expr/src/window/ntile.rs b/datafusion/physical-expr/src/window/ntile.rs index 6618de423f2b9..da6b4a91233e0 100644 --- a/datafusion/physical-expr/src/window/ntile.rs +++ b/datafusion/physical-expr/src/window/ntile.rs @@ -70,7 +70,7 @@ pub(crate) struct NtileEvaluator { } impl PartitionEvaluator for NtileEvaluator { - fn evaluate(&self, _values: &[ArrayRef], num_rows: usize) -> Result { + fn evaluate_all(&mut self, _values: &[ArrayRef], num_rows: usize) -> Result { let num_rows = num_rows as u64; let mut vec: Vec = Vec::new(); for i in 0..num_rows { diff --git a/datafusion/physical-expr/src/window/rank.rs b/datafusion/physical-expr/src/window/rank.rs index 100e3ef944730..ec28e1a4996c8 100644 --- a/datafusion/physical-expr/src/window/rank.rs +++ b/datafusion/physical-expr/src/window/rank.rs @@ -149,7 +149,11 @@ impl PartitionEvaluator for RankEvaluator { } /// evaluate window function result inside given range - fn evaluate_stateful(&mut self, _values: &[ArrayRef]) -> Result { + fn evaluate( + &mut self, + _values: &[ArrayRef], + _range: &Range, + ) -> Result { match self.rank_type { RankType::Basic => Ok(ScalarValue::UInt64(Some( self.state.last_rank_boundary as u64 + 1, diff --git a/datafusion/physical-expr/src/window/row_number.rs b/datafusion/physical-expr/src/window/row_number.rs index 6783a4d5318c5..f59d18c50cd46 100644 --- a/datafusion/physical-expr/src/window/row_number.rs +++ b/datafusion/physical-expr/src/window/row_number.rs @@ -80,12 +80,16 @@ impl PartitionEvaluator for NumRowsEvaluator { } /// evaluate window function result inside given range - fn evaluate_stateful(&mut self, _values: &[ArrayRef]) -> Result { + fn evaluate( + &mut self, + _values: &[ArrayRef], + _range: &Range, + ) -> Result { self.state.n_rows += 1; Ok(ScalarValue::UInt64(Some(self.state.n_rows as u64))) } - fn evaluate(&self, _values: &[ArrayRef], num_rows: usize) -> Result { + fn evaluate_all(&mut self, _values: &[ArrayRef], num_rows: usize) -> Result { Ok(Arc::new(UInt64Array::from_iter_values( 1..(num_rows as u64) + 1, ))) @@ -114,7 +118,7 @@ mod tests { let values = row_number.evaluate_args(&batch)?; let result = row_number .create_evaluator()? - .evaluate(&values, batch.num_rows())?; + .evaluate_all(&values, batch.num_rows())?; let result = as_uint64_array(&result)?; let result = result.values(); assert_eq!(vec![1, 2, 3, 4, 5, 6, 7, 8], *result); @@ -132,7 +136,7 @@ mod tests { let values = row_number.evaluate_args(&batch)?; let result = row_number .create_evaluator()? - .evaluate(&values, batch.num_rows())?; + .evaluate_all(&values, batch.num_rows())?; let result = as_uint64_array(&result)?; let result = result.values(); assert_eq!(vec![1, 2, 3, 4, 5, 6, 7, 8], *result);