From df510e09994e4c2aec9918a34ef32a8e52e4358b Mon Sep 17 00:00:00 2001 From: DreaMer963 Date: Sat, 23 Oct 2021 01:55:25 +0800 Subject: [PATCH] DataFrame supports window function --- datafusion/src/execution/dataframe_impl.rs | 43 +++++++++++++++++++--- datafusion/src/logical_plan/builder.rs | 25 ++++++++++++- datafusion/src/sql/planner.rs | 29 ++------------- 3 files changed, 65 insertions(+), 32 deletions(-) diff --git a/datafusion/src/execution/dataframe_impl.rs b/datafusion/src/execution/dataframe_impl.rs index 18a558ef7114e..a313cc170a404 100644 --- a/datafusion/src/execution/dataframe_impl.rs +++ b/datafusion/src/execution/dataframe_impl.rs @@ -35,6 +35,7 @@ use crate::arrow::util::pretty; use crate::physical_plan::{ execute_stream, execute_stream_partitioned, ExecutionPlan, SendableRecordBatchStream, }; +use crate::sql::utils::find_window_exprs; use async_trait::async_trait; /// Implementation of DataFrame API @@ -75,10 +76,17 @@ impl DataFrame for DataFrameImpl { /// Create a projection based on arbitrary expressions fn select(&self, expr_list: Vec<Expr>) -> Result<Arc<dyn DataFrame>> { - let plan = LogicalPlanBuilder::from(self.to_logical_plan()) - .project(expr_list)? - .build()?; - Ok(Arc::new(DataFrameImpl::new(self.ctx_state.clone(), &plan))) + let window_func_exprs = find_window_exprs(&expr_list); + let plan = if window_func_exprs.is_empty() { + self.to_logical_plan() + } else { + LogicalPlanBuilder::window_plan(self.to_logical_plan(), window_func_exprs)? 
+ }; + let project_plan = LogicalPlanBuilder::from(plan).project(expr_list)?.build()?; + Ok(Arc::new(DataFrameImpl::new( + self.ctx_state.clone(), + &project_plan, + ))) } /// Create a filter based on a predicate expression @@ -233,7 +241,7 @@ mod tests { use crate::execution::options::CsvReadOptions; use crate::logical_plan::*; use crate::physical_plan::functions::Volatility; - use crate::physical_plan::ColumnarValue; + use crate::physical_plan::{window_functions, ColumnarValue}; use crate::{assert_batches_sorted_eq, execution::context::ExecutionContext}; use crate::{physical_plan::functions::ScalarFunctionImplementation, test}; use arrow::datatypes::DataType; @@ -270,6 +278,31 @@ mod tests { Ok(()) } + #[tokio::test] + async fn select_with_window_exprs() -> Result<()> { + // build plan using Table API + let t = test_table().await?; + let first_row = Expr::WindowFunction { + fun: window_functions::WindowFunction::BuiltInWindowFunction( + window_functions::BuiltInWindowFunction::FirstValue, + ), + args: vec![col("aggregate_test_100.c1")], + partition_by: vec![col("aggregate_test_100.c2")], + order_by: vec![], + window_frame: None, + }; + let t2 = t.select(vec![col("c1"), first_row])?; + let plan = t2.to_logical_plan(); + + let sql_plan = create_plan( + "select c1, first_value(c1) over (partition by c2) from aggregate_test_100", + ) + .await?; + + assert_same_plan(&plan, &sql_plan); + Ok(()) + } + #[tokio::test] async fn aggregate() -> Result<()> { // build plan using DataFrame API diff --git a/datafusion/src/logical_plan/builder.rs b/datafusion/src/logical_plan/builder.rs index 3a1d127356588..b25ad63462804 100644 --- a/datafusion/src/logical_plan/builder.rs +++ b/datafusion/src/logical_plan/builder.rs @@ -44,6 +44,7 @@ use crate::logical_plan::{ columnize_expr, normalize_col, normalize_cols, Column, DFField, DFSchema, DFSchemaRef, Partitioning, }; +use crate::sql::utils::group_window_expr_by_sort_keys; /// Default table name for unnamed table pub const 
UNNAMED_TABLE: &str = "?table?"; @@ -327,7 +328,29 @@ impl LogicalPlanBuilder { Ok(Self::from(table_scan)) } - + /// Wrap a plan in a window + pub(crate) fn window_plan( + input: LogicalPlan, + window_exprs: Vec<Expr>, + ) -> Result<LogicalPlan> { + let mut plan = input; + let mut groups = group_window_expr_by_sort_keys(&window_exprs)?; + // sort by sort_key len descending, so that more deeply sorted plans gets nested further + // down as children; to further mimic the behavior of PostgreSQL, we want stable sort + // and a reverse so that tieing sort keys are reversed in order; note that by this rule + // if there's an empty over, it'll be at the top level + groups.sort_by(|(key_a, _), (key_b, _)| key_a.len().cmp(&key_b.len())); + groups.reverse(); + for (_, exprs) in groups { + let window_exprs = exprs.into_iter().cloned().collect::<Vec<_>>(); + // the partition and sort itself is done at physical level, see physical_planner's + // fn create_initial_plan + plan = LogicalPlanBuilder::from(plan) + .window(window_exprs)? + .build()?; + } + Ok(plan) + } /// Apply a projection without alias. 
pub fn project(&self, expr: impl IntoIterator<Item = Expr>) -> Result<Self> { self.project_with_alias(expr, None) diff --git a/datafusion/src/sql/planner.rs b/datafusion/src/sql/planner.rs index 5c1b50107e25b..029fa05185c58 100644 --- a/datafusion/src/sql/planner.rs +++ b/datafusion/src/sql/planner.rs @@ -59,9 +59,8 @@ use super::{ parser::DFParser, utils::{ can_columns_satisfy_exprs, expr_as_column_expr, extract_aliases, - find_aggregate_exprs, find_column_exprs, find_window_exprs, - group_window_expr_by_sort_keys, rebase_expr, resolve_aliases_to_exprs, - resolve_positions_to_exprs, + find_aggregate_exprs, find_column_exprs, find_window_exprs, rebase_expr, + resolve_aliases_to_exprs, resolve_positions_to_exprs, }, }; use crate::logical_plan::builder::project_with_alias; @@ -791,7 +790,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { let plan = if window_func_exprs.is_empty() { plan } else { - self.window(plan, window_func_exprs)? + LogicalPlanBuilder::window_plan(plan, window_func_exprs)? }; let plan = if select.distinct { @@ -838,28 +837,6 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { LogicalPlanBuilder::from(input).project(expr)?.build() } - /// Wrap a plan in a window - fn window(&self, input: LogicalPlan, window_exprs: Vec<Expr>) -> Result<LogicalPlan> { - let mut plan = input; - let mut groups = group_window_expr_by_sort_keys(&window_exprs)?; - // sort by sort_key len descending, so that more deeply sorted plans gets nested further - // down as children; to further mimic the behavior of PostgreSQL, we want stable sort - // and a reverse so that tieing sort keys are reversed in order; note that by this rule - // if there's an empty over, it'll be at the top level - groups.sort_by(|(key_a, _), (key_b, _)| key_a.len().cmp(&key_b.len())); - groups.reverse(); - for (_, exprs) in groups { - let window_exprs = exprs.into_iter().cloned().collect::<Vec<_>>(); - // the partition and sort itself is done at physical level, see physical_planner's - // fn create_initial_plan - plan = 
LogicalPlanBuilder::from(plan) - .window(window_exprs)? - .build()?; - } - - Ok(plan) - } - /// Wrap a plan in an aggregate fn aggregate( &self,